From 8452969a8afb088947271e64a9db932ff210f21e Mon Sep 17 00:00:00 2001 From: mahajanadhitya <115617755+mahajanadhitya@users.noreply.github.com> Date: Thu, 8 Aug 2024 15:59:53 +0530 Subject: [PATCH] ListGroups API KIP 848 (#7) * rebase commit * change --- examples/AdminClient/Program.cs | 141 +++-- .../Admin/ConsumerGroupListing.cs | 7 +- .../Admin/ListConsumerGroupsOptions.cs | 8 + .../Admin/ListConsumerGroupsReport.cs | 12 +- .../Admin/ListConsumerGroupsResult.cs | 6 +- src/Confluent.Kafka/AdminClient.cs | 561 ++++++++-------- src/Confluent.Kafka/ConsumerGroupType.cs | 40 ++ src/Confluent.Kafka/Impl/LibRdKafka.cs | 597 +++++++++--------- .../Impl/NativeMethods/NativeMethods.cs | 47 +- .../NativeMethods/NativeMethods_Alpine.cs | 47 +- .../NativeMethods/NativeMethods_Centos8.cs | 48 +- src/Confluent.Kafka/Impl/SafeKafkaHandle.cs | 14 +- .../AdminClient_ListDescribeConsumerGroups.cs | 72 ++- .../Tests/Tests.cs | 3 +- .../Tests/Tests.cs | 3 +- 15 files changed, 907 insertions(+), 699 deletions(-) create mode 100644 src/Confluent.Kafka/ConsumerGroupType.cs diff --git a/examples/AdminClient/Program.cs b/examples/AdminClient/Program.cs index e7330ba5b..7e3052a60 100644 --- a/examples/AdminClient/Program.cs +++ b/examples/AdminClient/Program.cs @@ -92,7 +92,7 @@ static async Task CreateTopicAsync(string bootstrapServers, string[] commandArgs { try { - await adminClient.CreateTopicsAsync(new TopicSpecification[] { + await adminClient.CreateTopicsAsync(new TopicSpecification[] { new TopicSpecification { Name = topicName, ReplicationFactor = 1, NumPartitions = 1 } }); } catch (CreateTopicsException e) @@ -168,23 +168,24 @@ static List ParseUserScramCredentialAlterations( { if (args.Length == 0) { - Console.WriteLine("usage: .. alter-user-scram-alterations " + + Console.WriteLine("usage: .. alter-user-scram-alterations " + "UPSERT " + "[UPSERT " + "DELETE ..]"); Environment.ExitCode = 1; return null; } - + var alterations = new List(); - for (int i = 0; i < args.Length;) { + for (int i = 0; i < args.Length;) + { string alterationName = args[i]; if (alterationName == "UPSERT") { if (i + 5 >= args.Length) { throw new ArgumentException( - $"invalid number of arguments for alteration {alterations.Count},"+ + $"invalid number of arguments for alteration {alterations.Count}," + $" expected 5, got {args.Length - i - 1}"); } @@ -218,7 +219,7 @@ static List ParseUserScramCredentialAlterations( if (i + 2 >= args.Length) { throw new ArgumentException( - $"invalid number of arguments for alteration {alterations.Count},"+ + $"invalid number of arguments for alteration {alterations.Count}," + $" expected 2, got {args.Length - i - 1}"); } @@ -246,34 +247,34 @@ static Tuple> ParseListOffsetsArg { if (args.Length == 0) { - Console.WriteLine("usage: .. list-offsets " + + Console.WriteLine("usage: .. list-offsets " + " .."); Environment.ExitCode = 1; return null; } - + var isolationLevel = Enum.Parse(args[0]); var topicPartitionOffsetSpecs = new List(); for (int i = 1; i < args.Length;) { - if (args.Length < i+3) + if (args.Length < i + 3) { throw new ArgumentException($"Invalid number of arguments for topicPartitionOffsetSpec[{topicPartitionOffsetSpecs.Count}]: {args.Length - i}"); } - + string topic = args[i]; var partition = Int32.Parse(args[i + 1]); var offsetSpec = args[i + 2]; if (offsetSpec == "TIMESTAMP") { - if (args.Length < i+4) + if (args.Length < i + 4) { throw new ArgumentException($"Invalid number of arguments for topicPartitionOffsetSpec[{topicPartitionOffsetSpecs.Count}]: {args.Length - i}"); } - + var timestamp = Int64.Parse(args[i + 3]); i = i + 1; - topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec + topicPartitionOffsetSpecs.Add(new TopicPartitionOffsetSpec { TopicPartition = new TopicPartition(topic, new Partition(partition)), OffsetSpec = OffsetSpec.ForTimestamp(timestamp) @@ -281,7 +282,7 @@ static Tuple> ParseListOffsetsArg } else if (offsetSpec == "MAX_TIMESTAMP") { - topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec + topicPartitionOffsetSpecs.Add(new TopicPartitionOffsetSpec { TopicPartition = new TopicPartition(topic, new Partition(partition)), OffsetSpec = OffsetSpec.MaxTimestamp() @@ -289,7 +290,7 @@ static Tuple> ParseListOffsetsArg } else if (offsetSpec == "EARLIEST") { - topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec + topicPartitionOffsetSpecs.Add(new TopicPartitionOffsetSpec { TopicPartition = new TopicPartition(topic, new Partition(partition)), OffsetSpec = OffsetSpec.Earliest() @@ -297,7 +298,7 @@ static Tuple> ParseListOffsetsArg } else if (offsetSpec == "LATEST") { - topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec + topicPartitionOffsetSpecs.Add(new TopicPartitionOffsetSpec { TopicPartition = new TopicPartition(topic, new Partition(partition)), OffsetSpec = OffsetSpec.Latest() @@ -313,9 +314,9 @@ static Tuple> ParseListOffsetsArg return Tuple.Create(isolationLevel, topicPartitionOffsetSpecs); } - static void PrintListOffsetsResultInfos(List ListOffsetsResultInfos) - { - foreach(var listOffsetsResultInfo in ListOffsetsResultInfos) + static void PrintListOffsetsResultInfos(List ListOffsetsResultInfos) + { + foreach (var listOffsetsResultInfo in ListOffsetsResultInfos) { Console.WriteLine(" ListOffsetsResultInfo:"); Console.WriteLine($" TopicPartitionOffsetError: {listOffsetsResultInfo.TopicPartitionOffsetError}"); @@ -493,7 +494,7 @@ static async Task AlterConsumerGroupOffsetsAsync(string bootstrapServers, string { var results = await adminClient.AlterConsumerGroupOffsetsAsync(input); Console.WriteLine("Successfully altered offsets:"); - foreach(var groupResult in results) + foreach (var groupResult in results) { Console.WriteLine(groupResult); } @@ -542,7 +543,7 @@ static async Task ListConsumerGroupOffsetsAsync(string bootstrapServers, string[ return; } } - if(!tpes.Any()) + if (!tpes.Any()) { // In case the list is empty, request offsets for all the partitions. tpes = null; @@ -556,7 +557,7 @@ static async Task ListConsumerGroupOffsetsAsync(string bootstrapServers, string[ { var result = await adminClient.ListConsumerGroupOffsetsAsync(input); Console.WriteLine("Successfully listed offsets:"); - foreach(var groupResult in result) + foreach (var groupResult in result) { Console.WriteLine(groupResult); } @@ -582,35 +583,58 @@ static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] comm { var timeout = TimeSpan.FromSeconds(30); var statesList = new List(); - try + var groupTypesList = new List(); + var isType = false; + var isState = false; + for (int i = 0; i < commandArgs.Length; i++) { - if (commandArgs.Length > 0) + if (commandArgs[i] == "-states") { - timeout = TimeSpan.FromSeconds(Int32.Parse(commandArgs[0])); + if (isState) + { + Console.WriteLine("usage: .. list-consumer-groups [-states ... ] [-types .. ]"); + Environment.ExitCode = 1; + return; + } + isState = true; + } + else if (commandArgs[i] == "-types") + { + if (isType) + { + Console.WriteLine("usage: .. list-consumer-groups [-states ... ] [-types .. ]"); + Environment.ExitCode = 1; + return; + } + isType = true; } - if (commandArgs.Length > 1) + else { - for (int i = 1; i < commandArgs.Length; i++) + if (isState) { statesList.Add(Enum.Parse(commandArgs[i])); } + else if (isType) + { + groupTypesList.Add(Enum.Parse(commandArgs[i])); + } + else + { + Console.WriteLine("usage: .. list-consumer-groups [-states ... ] [-types .. ]"); + Environment.ExitCode = 1; + return; + } } } - catch (SystemException) - { - Console.WriteLine("usage: .. list-consumer-groups [ ... ]"); - Environment.ExitCode = 1; - return; - } - using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) { try { var result = await adminClient.ListConsumerGroupsAsync(new ListConsumerGroupsOptions() - { + { RequestTimeout = timeout, MatchStates = statesList, + MatchGroupTypes = groupTypesList, }); Console.WriteLine(result); } @@ -640,7 +664,7 @@ static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[] var password = commandArgs[1]; var includeAuthorizedOperations = (commandArgs[2] == "1"); var groupNames = commandArgs.Skip(3).ToList(); - + if (string.IsNullOrWhiteSpace(username)) { username = null; @@ -671,7 +695,7 @@ static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[] { try { - var descResult = await adminClient.DescribeConsumerGroupsAsync(groupNames, new DescribeConsumerGroupsOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations}); + var descResult = await adminClient.DescribeConsumerGroupsAsync(groupNames, new DescribeConsumerGroupsOptions() { RequestTimeout = timeout, IncludeAuthorizedOperations = includeAuthorizedOperations }); foreach (var group in descResult.ConsumerGroupDescriptions) { Console.WriteLine($"\n Group: {group.GroupId} {group.Error}"); @@ -705,7 +729,7 @@ static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[] } } } - + static async Task IncrementalAlterConfigsAsync(string bootstrapServers, string[] commandArgs) { var timeout = TimeSpan.FromSeconds(30); @@ -720,8 +744,8 @@ static async Task IncrementalAlterConfigsAsync(string bootstrapServers, string[] { throw new ArgumentException("invalid arguments length"); } - - for (int i = 1; i < commandArgs.Length; i+=3) + + for (int i = 1; i < commandArgs.Length; i += 3) { var resourceType = Enum.Parse(commandArgs[i]); var resourceName = commandArgs[i + 1]; @@ -734,14 +758,14 @@ static async Task IncrementalAlterConfigsAsync(string bootstrapServers, string[] { throw new ArgumentException($"invalid alteration name \"{config}\""); } - + var name = nameOpValue[0]; var opValue = nameOpValue[1].Split(":"); if (opValue.Length != 2) { throw new ArgumentException($"invalid alteration value \"{nameOpValue[1]}\""); } - + var op = Enum.Parse(opValue[0]); var value = opValue[1]; configList.Add(new ConfigEntry @@ -759,7 +783,7 @@ static async Task IncrementalAlterConfigsAsync(string bootstrapServers, string[] configResourceList[resource] = configList; } } - catch (Exception e) when ( + catch (Exception e) when ( e is ArgumentException || e is FormatException ) @@ -769,7 +793,7 @@ e is FormatException Environment.ExitCode = 1; return; } - + using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) { try @@ -797,7 +821,7 @@ e is FormatException } static async Task DescribeUserScramCredentialsAsync(string bootstrapServers, string[] commandArgs) - { + { var users = commandArgs.ToList(); var timeout = TimeSpan.FromSeconds(30); using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) @@ -874,16 +898,17 @@ await adminClient.AlterUserScramCredentialsAsync(alterations, } } - static async Task ListOffsetsAsync(string bootstrapServers, string[] commandArgs) { + static async Task ListOffsetsAsync(string bootstrapServers, string[] commandArgs) + { var listOffsetsArgs = ParseListOffsetsArgs(commandArgs); if (listOffsetsArgs == null) { return; } - + var isolationLevel = listOffsetsArgs.Item1; var topicPartitionOffsets = listOffsetsArgs.Item2; - + var timeout = TimeSpan.FromSeconds(30); - ListOffsetsOptions options = new ListOffsetsOptions(){ RequestTimeout = timeout, IsolationLevel = isolationLevel }; + ListOffsetsOptions options = new ListOffsetsOptions() { RequestTimeout = timeout, IsolationLevel = isolationLevel }; using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) { @@ -916,7 +941,7 @@ static void PrintTopicDescriptions(List topicDescriptions, boo foreach (var partition in topic.Partitions) { Console.WriteLine($" Partition ID: {partition.Partition} with leader: {partition.Leader}"); - if(!partition.ISR.Any()) + if (!partition.ISR.Any()) { Console.WriteLine(" There is no In-Sync-Replica broker for the partition"); } @@ -926,7 +951,7 @@ static void PrintTopicDescriptions(List topicDescriptions, boo Console.WriteLine($" The In-Sync-Replica brokers are: {isrs}"); } - if(!partition.Replicas.Any()) + if (!partition.Replicas.Any()) { Console.WriteLine(" There is no Replica broker for the partition"); } @@ -935,7 +960,7 @@ static void PrintTopicDescriptions(List topicDescriptions, boo string replicas = string.Join("; ", partition.Replicas); Console.WriteLine($" The Replica brokers are: {replicas}"); } - + } Console.WriteLine($" Is internal: {topic.IsInternal}"); if (includeAuthorizedOperations) @@ -954,7 +979,7 @@ static async Task DescribeTopicsAsync(string bootstrapServers, string[] commandA Environment.ExitCode = 1; return; } - + var username = commandArgs[0]; var password = commandArgs[1]; var includeAuthorizedOperations = (commandArgs[2] == "1"); @@ -991,7 +1016,7 @@ static async Task DescribeTopicsAsync(string bootstrapServers, string[] commandA { var descResult = await adminClient.DescribeTopicsAsync( TopicCollection.OfTopicNames(topicNames), - new DescribeTopicsOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations}); + new DescribeTopicsOptions() { RequestTimeout = timeout, IncludeAuthorizedOperations = includeAuthorizedOperations }); PrintTopicDescriptions(descResult.TopicDescriptions, includeAuthorizedOperations); } catch (DescribeTopicsException e) @@ -1041,11 +1066,11 @@ static async Task DescribeClusterAsync(string bootstrapServers, string[] command { try { - var descResult = await adminClient.DescribeClusterAsync(new DescribeClusterOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations}); - + var descResult = await adminClient.DescribeClusterAsync(new DescribeClusterOptions() { RequestTimeout = timeout, IncludeAuthorizedOperations = includeAuthorizedOperations }); + Console.WriteLine($" Cluster Id: {descResult.ClusterId}\n Controller: {descResult.Controller}"); Console.WriteLine(" Nodes:"); - foreach(var node in descResult.Nodes) + foreach (var node in descResult.Nodes) { Console.WriteLine($" {node}"); } @@ -1072,7 +1097,7 @@ public static async Task Main(string[] args) "list-groups", "metadata", "library-version", "create-topic", "create-acls", "list-consumer-groups", "describe-consumer-groups", "list-consumer-group-offsets", "alter-consumer-group-offsets", - "incremental-alter-configs", "describe-user-scram-credentials", + "incremental-alter-configs", "describe-user-scram-credentials", "alter-user-scram-credentials", "describe-topics", "describe-cluster", "list-offsets" }) + diff --git a/src/Confluent.Kafka/Admin/ConsumerGroupListing.cs b/src/Confluent.Kafka/Admin/ConsumerGroupListing.cs index 27caece2f..c51f2e2f3 100644 --- a/src/Confluent.Kafka/Admin/ConsumerGroupListing.cs +++ b/src/Confluent.Kafka/Admin/ConsumerGroupListing.cs @@ -32,6 +32,11 @@ public class ConsumerGroupListing /// public ConsumerGroupState State { get; set; } + /// + /// The group type of the consumer group. + /// + public ConsumerGroupType GroupType { get; set; } + /// /// Whether the consumer group is simple or not. /// @@ -42,7 +47,7 @@ public class ConsumerGroupListing /// public override string ToString() { - return $"{GroupId}, State = {State}, IsSimpleConsumerGroup = {IsSimpleConsumerGroup}"; + return $"{GroupId}, State = {State}, IsSimpleConsumerGroup = {IsSimpleConsumerGroup}, Type = {GroupType}"; } } } diff --git a/src/Confluent.Kafka/Admin/ListConsumerGroupsOptions.cs b/src/Confluent.Kafka/Admin/ListConsumerGroupsOptions.cs index 1bc23b500..91b91d159 100644 --- a/src/Confluent.Kafka/Admin/ListConsumerGroupsOptions.cs +++ b/src/Confluent.Kafka/Admin/ListConsumerGroupsOptions.cs @@ -41,5 +41,13 @@ public class ListConsumerGroupsOptions /// Default: null /// public IEnumerable MatchStates { get; set; } = null; + + /// + /// An enumerable with the group types to query, null to query for all + /// the group types. + /// + /// Default: null + /// + public IEnumerable MatchGroupTypes { get; set; } = null; } } diff --git a/src/Confluent.Kafka/Admin/ListConsumerGroupsReport.cs b/src/Confluent.Kafka/Admin/ListConsumerGroupsReport.cs index fd1ea92a7..ab0807175 100644 --- a/src/Confluent.Kafka/Admin/ListConsumerGroupsReport.cs +++ b/src/Confluent.Kafka/Admin/ListConsumerGroupsReport.cs @@ -35,14 +35,18 @@ public class ListConsumerGroupsReport /// /// Returns a human readable representation of this object. /// - public override string ToString() { + public override string ToString() + { string res = "Groups:\n"; - foreach (ConsumerGroupListing cgl in Valid) { + foreach (ConsumerGroupListing cgl in Valid) + { res += "\t" + cgl.ToString() + "\n"; } - if (Errors.Count != 0) { + if (Errors.Count != 0) + { res += "Errors:\n"; - foreach (Error err in Errors) { + foreach (Error err in Errors) + { res += "\t" + err.ToString() + "\n"; } } diff --git a/src/Confluent.Kafka/Admin/ListConsumerGroupsResult.cs b/src/Confluent.Kafka/Admin/ListConsumerGroupsResult.cs index 71c9dd19d..490088752 100644 --- a/src/Confluent.Kafka/Admin/ListConsumerGroupsResult.cs +++ b/src/Confluent.Kafka/Admin/ListConsumerGroupsResult.cs @@ -30,9 +30,11 @@ public class ListConsumerGroupsResult /// /// Returns a human readable representation of this object. /// - public override string ToString() { + public override string ToString() + { string res = "Groups:\n"; - foreach (ConsumerGroupListing cgl in Valid) { + foreach (ConsumerGroupListing cgl in Valid) + { res += "\t" + cgl.ToString() + "\n"; } return res; diff --git a/src/Confluent.Kafka/AdminClient.cs b/src/Confluent.Kafka/AdminClient.cs index ee9a07429..bca989fdb 100644 --- a/src/Confluent.Kafka/AdminClient.cs +++ b/src/Confluent.Kafka/AdminClient.cs @@ -55,13 +55,13 @@ private List extractTopicResults(IntPtr topicResultsPtr, int IntPtr[] topicResultsPtrArr = new IntPtr[topicResultsCount]; Marshal.Copy(topicResultsPtr, topicResultsPtrArr, 0, topicResultsCount); - return topicResultsPtrArr.Select(topicResultPtr => new CreateTopicReport - { - Topic = PtrToStringUTF8(Librdkafka.topic_result_name(topicResultPtr)), - Error = new Error( - Librdkafka.topic_result_error(topicResultPtr), + return topicResultsPtrArr.Select(topicResultPtr => new CreateTopicReport + { + Topic = PtrToStringUTF8(Librdkafka.topic_result_name(topicResultPtr)), + Error = new Error( + Librdkafka.topic_result_error(topicResultPtr), PtrToStringUTF8(Librdkafka.topic_result_error_string(topicResultPtr))) - }).ToList(); + }).ToList(); } private ConfigEntryResult extractConfigEntry(IntPtr configEntryPtr) @@ -74,7 +74,7 @@ private ConfigEntryResult extractConfigEntry(IntPtr configEntryPtr) Marshal.Copy(synonymsPtr, synonymsPtrArr, 0, (int)synonymsCount); synonyms = synonymsPtrArr .Select(synonymPtr => extractConfigEntry(synonymPtr)) - .Select(e => new ConfigSynonym { Name = e.Name, Value = e.Value, Source = e.Source } ) + .Select(e => new ConfigSynonym { Name = e.Name, Value = e.Value, Source = e.Source }) .ToList(); } @@ -113,7 +113,8 @@ private List extractResultConfigs(IntPtr configResourcesP .Select(configEntryPtr => extractConfigEntry(configEntryPtr)) .ToDictionary(e => e.Name); - result.Add(new DescribeConfigsReport { + result.Add(new DescribeConfigsReport + { ConfigResource = new ConfigResource { Name = resourceName, Type = resourceConfigType }, Entries = configEntries, Error = new Error(errorCode, errorReason) @@ -143,7 +144,7 @@ private List extractCreateAclReports(IntPtr aclResultsPtr, int Marshal.Copy(aclResultsPtr, aclsResultsPtrArr, 0, aclResultsCount); return aclsResultsPtrArr.Select(aclResultPtr => - new CreateAclReport + new CreateAclReport { Error = new Error(Librdkafka.acl_result_error(aclResultPtr), false) } @@ -152,7 +153,7 @@ private List extractCreateAclReports(IntPtr aclResultsPtr, int private List extractAclBindings(IntPtr aclBindingsPtr, int aclBindingsCnt) { - if (aclBindingsCnt == 0) { return new List {}; } + if (aclBindingsCnt == 0) { return new List { }; } IntPtr[] aclBindingsPtrArr = new IntPtr[aclBindingsCnt]; Marshal.Copy(aclBindingsPtr, aclBindingsPtrArr, 0, aclBindingsCnt); @@ -185,7 +186,7 @@ private DescribeAclsReport extractDescribeAclsReport(IntPtr resultPtr) return new DescribeAclsReport { Error = new Error(errCode, errString, false), - AclBindings = extractAclBindings(resultAcls, (int) resultAclCntPtr) + AclBindings = extractAclBindings(resultAcls, (int)resultAclCntPtr) }; } @@ -196,13 +197,14 @@ private List extractDeleteAclsReports(IntPtr resultPtr) IntPtr[] resultResponsesPtrArr = new IntPtr[(int)resultResponsesCntPtr]; Marshal.Copy(resultResponsesPtr, resultResponsesPtrArr, 0, (int)resultResponsesCntPtr); - return resultResponsesPtrArr.Select(resultResponsePtr => { + return resultResponsesPtrArr.Select(resultResponsePtr => + { var matchingAcls = Librdkafka.DeleteAcls_result_response_matching_acls( resultResponsePtr, out UIntPtr resultResponseAclCntPtr); - return new DeleteAclsReport + return new DeleteAclsReport { Error = new Error(Librdkafka.DeleteAcls_result_response_error(resultResponsePtr), false), - AclBindings = extractAclBindings(matchingAcls, (int) resultResponseAclCntPtr) + AclBindings = extractAclBindings(matchingAcls, (int)resultResponseAclCntPtr) }; }).ToList(); } @@ -230,12 +232,14 @@ private List extractListConsumerGroupOffsetsResu IntPtr[] resultGroupsPtrArr = new IntPtr[(int)resultCountPtr]; Marshal.Copy(resultGroupsPtr, resultGroupsPtrArr, 0, (int)resultCountPtr); - return resultGroupsPtrArr.Select(resultGroupPtr => { + return resultGroupsPtrArr.Select(resultGroupPtr => + { // Construct the TopicPartitionOffsetError list from internal list. var partitionsPtr = Librdkafka.group_result_partitions(resultGroupPtr); - return new ListConsumerGroupOffsetsReport { + return new ListConsumerGroupOffsetsReport + { Group = PtrToStringUTF8(Librdkafka.group_result_name(resultGroupPtr)), Error = new Error(Librdkafka.group_result_error(resultGroupPtr), false), Partitions = SafeKafkaHandle.GetTopicPartitionOffsetErrorList(partitionsPtr), @@ -249,12 +253,14 @@ private List extractAlterConsumerGroupOffsetsRe IntPtr[] resultGroupsPtrArr = new IntPtr[(int)resultCountPtr]; Marshal.Copy(resultGroupsPtr, resultGroupsPtrArr, 0, (int)resultCountPtr); - return resultGroupsPtrArr.Select(resultGroupPtr => { + return resultGroupsPtrArr.Select(resultGroupPtr => + { // Construct the TopicPartitionOffsetError list from internal list. var partitionsPtr = Librdkafka.group_result_partitions(resultGroupPtr); - return new AlterConsumerGroupOffsetsReport { + return new AlterConsumerGroupOffsetsReport + { Group = PtrToStringUTF8(Librdkafka.group_result_name(resultGroupPtr)), Error = new Error(Librdkafka.group_result_error(resultGroupPtr), false), Partitions = SafeKafkaHandle.GetTopicPartitionOffsetErrorList(partitionsPtr), @@ -275,13 +281,15 @@ private ListConsumerGroupsReport extractListConsumerGroupsResults(IntPtr resultP { IntPtr[] consumerGroupListingPtrArr = new IntPtr[(int)resultCountPtr]; Marshal.Copy(validResultsPtr, consumerGroupListingPtrArr, 0, (int)resultCountPtr); - result.Valid = consumerGroupListingPtrArr.Select(cglPtr => { + result.Valid = consumerGroupListingPtrArr.Select(cglPtr => + { return new ConsumerGroupListing() { GroupId = PtrToStringUTF8(Librdkafka.ConsumerGroupListing_group_id(cglPtr)), IsSimpleConsumerGroup = (int)Librdkafka.ConsumerGroupListing_is_simple_consumer_group(cglPtr) == 1, - State = Librdkafka.ConsumerGroupListing_state(cglPtr), + State = (ConsumerGroupState)Librdkafka.ConsumerGroupListing_state(cglPtr), + GroupType = (ConsumerGroupType)Librdkafka.ConsumerGroupListing_type(cglPtr), }; }).ToList(); } @@ -313,7 +321,8 @@ private DescribeConsumerGroupsReport extractDescribeConsumerGroupsResults(IntPtr IntPtr[] groupPtrArr = new IntPtr[(int)groupsCountPtr]; Marshal.Copy(groupsPtr, groupPtrArr, 0, (int)groupsCountPtr); - result.ConsumerGroupDescriptions = groupPtrArr.Select(groupPtr => { + result.ConsumerGroupDescriptions = groupPtrArr.Select(groupPtr => + { var coordinatorPtr = Librdkafka.ConsumerGroupDescription_coordinator(groupPtr); var coordinator = extractNode(coordinatorPtr); @@ -347,7 +356,7 @@ private DescribeConsumerGroupsReport extractDescribeConsumerGroupsResults(IntPtr var authorizedOperations = extractAuthorizedOperations( Librdkafka.ConsumerGroupDescription_authorized_operations(groupPtr, out UIntPtr authorizedOperationCount), - (int) authorizedOperationCount); + (int)authorizedOperationCount); var desc = new ConsumerGroupDescription() { @@ -374,42 +383,43 @@ private DescribeConsumerGroupsReport extractDescribeConsumerGroupsResults(IntPtr private DescribeUserScramCredentialsReport extractDescribeUserScramCredentialsResult(IntPtr eventPtr) { var report = new DescribeUserScramCredentialsReport(); - + var resultDescriptionsPtr = Librdkafka.DescribeUserScramCredentials_result_descriptions( eventPtr, out UIntPtr resultDescriptionCntPtr); IntPtr[] resultDescriptionsPtrArr = new IntPtr[(int)resultDescriptionCntPtr]; Marshal.Copy(resultDescriptionsPtr, resultDescriptionsPtrArr, 0, (int)resultDescriptionCntPtr); - + var descriptions = resultDescriptionsPtrArr.Select(resultDescriptionPtr => { var description = new UserScramCredentialsDescription(); - + var user = PtrToStringUTF8(Librdkafka.UserScramCredentialsDescription_user(resultDescriptionPtr)); IntPtr cError = Librdkafka.UserScramCredentialsDescription_error(resultDescriptionPtr); var error = new Error(cError, false); var scramCredentialInfos = new List(); - if (Librdkafka.error_code(cError)==0) + if (Librdkafka.error_code(cError) == 0) { int numCredentials = Librdkafka.UserScramCredentialsDescription_scramcredentialinfo_count(resultDescriptionPtr); - for(int j=0; j extractAlterUserScramCredentialsRe IntPtr[] resultResponsesPtrArr = new IntPtr[(int)resultResponsesCntPtr]; Marshal.Copy(resultResponsesPtr, resultResponsesPtrArr, 0, (int)resultResponsesCntPtr); - - return resultResponsesPtrArr.Select(resultResponsePtr => { - var user = + + return resultResponsesPtrArr.Select(resultResponsePtr => + { + var user = PtrToStringUTF8( Librdkafka.AlterUserScramCredentials_result_response_user(resultResponsePtr)); var error = new Error(Librdkafka.AlterUserScramCredentials_result_response_error(resultResponsePtr), false); - return new AlterUserScramCredentialsReport + return new AlterUserScramCredentialsReport { User = user, Error = error @@ -442,18 +453,19 @@ private List extractTopicPartitionInfo(IntPtr topicPartition { if (topicPartitionInfosCount == 0) return new List(); - + IntPtr[] topicPartitionInfos = new IntPtr[topicPartitionInfosCount]; Marshal.Copy(topicPartitionInfosPtr, topicPartitionInfos, 0, topicPartitionInfosCount); - return topicPartitionInfos.Select(topicPartitionInfoPtr => { + return topicPartitionInfos.Select(topicPartitionInfoPtr => + { return new TopicPartitionInfo { ISR = extractNodeList( Librdkafka.TopicPartitionInfo_isr(topicPartitionInfoPtr, out UIntPtr isrCount ), - (int) isrCount + (int)isrCount ), Leader = extractNode(Librdkafka.TopicPartitionInfo_leader(topicPartitionInfoPtr)), Partition = Librdkafka.TopicPartitionInfo_partition(topicPartitionInfoPtr), @@ -461,7 +473,7 @@ out UIntPtr isrCount Librdkafka.TopicPartitionInfo_replicas(topicPartitionInfoPtr, out UIntPtr replicasCount ), - (int) replicasCount + (int)replicasCount ), }; }).ToList(); @@ -493,8 +505,8 @@ private DescribeTopicsReport extractDescribeTopicsResults(IntPtr resultPtr) Librdkafka.TopicDescription_authorized_operations( topicPtr, out UIntPtr authorizedOperationCount), - (int) authorizedOperationCount); - + (int)authorizedOperationCount); + return new TopicDescription() { Name = topicName, @@ -505,7 +517,7 @@ private DescribeTopicsReport extractDescribeTopicsResults(IntPtr resultPtr) Partitions = extractTopicPartitionInfo( Librdkafka.TopicDescription_partitions(topicPtr, out UIntPtr partitionsCount), - (int) partitionsCount + (int)partitionsCount ), }; }).ToList(); @@ -523,15 +535,15 @@ private Uuid extractUuid(IntPtr uuidPtr) Librdkafka.Uuid_most_significant_bits(uuidPtr), Librdkafka.Uuid_least_significant_bits(uuidPtr) ); - } - + } + private Node extractNode(IntPtr nodePtr) { if (nodePtr == IntPtr.Zero) { return null; } - + return new Node() { Id = (int)Librdkafka.Node_id(nodePtr), @@ -558,11 +570,11 @@ private unsafe List extractAuthorizedOperations(IntPtr authorizedO { return null; } - + List authorizedOperations = new List(authorizedOperationCount); for (int i = 0; i < authorizedOperationCount; i++) { - AclOperation *aclOperationPtr = ((AclOperation *) authorizedOperationsPtr.ToPointer()) + i; + AclOperation* aclOperationPtr = ((AclOperation*)authorizedOperationsPtr.ToPointer()) + i; authorizedOperations.Add( *aclOperationPtr); } @@ -577,13 +589,13 @@ private DescribeClusterResult extractDescribeClusterResult(IntPtr resultPtr) var nodes = extractNodeList( Librdkafka.DescribeCluster_result_nodes(resultPtr, out UIntPtr nodeCount), - (int) nodeCount); + (int)nodeCount); List authorizedOperations = extractAuthorizedOperations( Librdkafka.DescribeCluster_result_authorized_operations( resultPtr, out UIntPtr authorizedOperationCount), - (int) authorizedOperationCount); + (int)authorizedOperationCount); return new DescribeClusterResult() { @@ -597,15 +609,15 @@ private DescribeClusterResult extractDescribeClusterResult(IntPtr resultPtr) private ListOffsetsReport extractListOffsetsReport(IntPtr resultPtr) { var resultInfosPtr = Librdkafka.ListOffsets_result_infos(resultPtr, out UIntPtr resulInfosCntPtr); - + IntPtr[] resultResponsesPtrArr = new IntPtr[(int)resulInfosCntPtr]; if ((int)resulInfosCntPtr > 0) { Marshal.Copy(resultInfosPtr, resultResponsesPtrArr, 0, (int)resulInfosCntPtr); - } - + } + ErrorCode reportErrorCode = ErrorCode.NoError; - var listOffsetsResultInfos = resultResponsesPtrArr.Select(resultResponsePtr => + var listOffsetsResultInfos = resultResponsesPtrArr.Select(resultResponsePtr => { long timestamp = Librdkafka.ListOffsetsResultInfo_timestamp(resultResponsePtr); IntPtr c_topic_partition = Librdkafka.ListOffsetsResultInfo_topic_partition(resultResponsePtr); @@ -679,7 +691,7 @@ private Task StartPollTask(CancellationToken ct) { if (errorCode != ErrorCode.NoError) { - Task.Run(() => + Task.Run(() => ((TaskCompletionSource>)adminClientResult).TrySetException( new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); break; @@ -690,13 +702,13 @@ private Task StartPollTask(CancellationToken ct) if (result.Any(r => r.Error.IsError)) { - Task.Run(() => + Task.Run(() => ((TaskCompletionSource>)adminClientResult).TrySetException( new CreateTopicsException(result))); } else { - Task.Run(() => + Task.Run(() => ((TaskCompletionSource>)adminClientResult).TrySetResult(result)); } } @@ -741,8 +753,8 @@ private Task StartPollTask(CancellationToken ct) } var result = extractDeleteGroupsReport(eventPtr); - - if(result.Any(r => r.Error.IsError)) + + if (result.Any(r => r.Error.IsError)) { Task.Run(() => ((TaskCompletionSource>)adminClientResult).TrySetException( @@ -835,7 +847,7 @@ private Task StartPollTask(CancellationToken ct) else { Task.Run(() => - ((TaskCompletionSource>) adminClientResult).TrySetResult(result)); + ((TaskCompletionSource>)adminClientResult).TrySetResult(result)); } } break; @@ -863,12 +875,12 @@ private Task StartPollTask(CancellationToken ct) Task.Run(() => ((TaskCompletionSource>)adminClientResult).TrySetResult( result.Select(a => new DeleteRecordsResult - { - Topic = a.Topic, - Partition = a.Partition, - Offset = a.Offset, - Error = a.Error // internal, not exposed in success case. - }).ToList())); + { + Topic = a.Topic, + Partition = a.Partition, + Offset = a.Offset, + Error = a.Error // internal, not exposed in success case. + }).ToList())); } } break; @@ -909,7 +921,7 @@ private Task StartPollTask(CancellationToken ct) { if (errorCode != ErrorCode.NoError) { - Task.Run(() => + Task.Run(() => ((TaskCompletionSource)adminClientResult).TrySetException( new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); break; @@ -921,13 +933,13 @@ private Task StartPollTask(CancellationToken ct) if (reports.Any(r => r.Error.IsError)) { - Task.Run(() => + Task.Run(() => ((TaskCompletionSource)adminClientResult).TrySetException( new CreateAclsException(reports))); } else { - Task.Run(() => + Task.Run(() => ((TaskCompletionSource)adminClientResult).TrySetResult(null)); } } @@ -936,7 +948,7 @@ private Task StartPollTask(CancellationToken ct) { if (errorCode != ErrorCode.NoError) { - Task.Run(() => + Task.Run(() => ((TaskCompletionSource)adminClientResult).TrySetException( new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); break; @@ -946,7 +958,7 @@ private Task StartPollTask(CancellationToken ct) if (report.Error.IsError) { - Task.Run(() => + Task.Run(() => ((TaskCompletionSource)adminClientResult).TrySetException( new DescribeAclsException(report))); } @@ -956,16 +968,16 @@ private Task StartPollTask(CancellationToken ct) { AclBindings = report.AclBindings }; - Task.Run(() => + Task.Run(() => ((TaskCompletionSource)adminClientResult).TrySetResult(result)); } } - break; + break; case Librdkafka.EventType.DeleteAcls_Result: { if (errorCode != ErrorCode.NoError) { - Task.Run(() => + Task.Run(() => ((TaskCompletionSource>)adminClientResult).TrySetException( new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); break; @@ -975,54 +987,54 @@ private Task StartPollTask(CancellationToken ct) if (reports.Any(r => r.Error.IsError)) { - Task.Run(() => + Task.Run(() => ((TaskCompletionSource>)adminClientResult).TrySetException( new DeleteAclsException(reports))); } else { var results = reports.Select(report => new DeleteAclsResult - { - AclBindings = report.AclBindings - }).ToList(); - Task.Run(() => + { + AclBindings = report.AclBindings + }).ToList(); + Task.Run(() => ((TaskCompletionSource>)adminClientResult).TrySetResult(results)); } } - break; + break; case Librdkafka.EventType.AlterConsumerGroupOffsets_Result: - { - if (errorCode != ErrorCode.NoError) { - Task.Run(() => - ((TaskCompletionSource>)adminClientResult).TrySetException( - new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); + if (errorCode != ErrorCode.NoError) + { + Task.Run(() => + ((TaskCompletionSource>)adminClientResult).TrySetException( + new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); break; + } + var results = extractAlterConsumerGroupOffsetsResults(eventPtr); + if (results.Any(r => r.Error.IsError) || results.Any(r => r.Partitions.Any(p => p.Error.IsError))) + { + Task.Run(() => + ((TaskCompletionSource>)adminClientResult).TrySetException( + new AlterConsumerGroupOffsetsException(results))); + } + else + { + Task.Run(() => + ((TaskCompletionSource>)adminClientResult).TrySetResult( + results + .Select(r => new AlterConsumerGroupOffsetsResult() + { + Group = r.Group, + Partitions = r.Partitions + }) + .ToList() + )); + } + break; } - var results = extractAlterConsumerGroupOffsetsResults(eventPtr); - if (results.Any(r => r.Error.IsError) || results.Any(r => r.Partitions.Any(p => p.Error.IsError))) - { - Task.Run(() => - ((TaskCompletionSource>)adminClientResult).TrySetException( - new AlterConsumerGroupOffsetsException(results))); - } - else - { - Task.Run(() => - ((TaskCompletionSource>)adminClientResult).TrySetResult( - results - .Select(r => new AlterConsumerGroupOffsetsResult() - { - Group = r.Group, - Partitions = r.Partitions - }) - .ToList() - )); - } - break; - } - + case Librdkafka.EventType.IncrementalAlterConfigs_Result: { if (errorCode != ErrorCode.NoError) @@ -1047,10 +1059,10 @@ private Task StartPollTask(CancellationToken ct) else { Task.Run(() => - ((TaskCompletionSource>) adminClientResult).TrySetResult( + ((TaskCompletionSource>)adminClientResult).TrySetResult( result.Select(r => new IncrementalAlterConfigsResult { - ConfigResource = r.ConfigResource, + ConfigResource = r.ConfigResource, }).ToList() )); } @@ -1058,200 +1070,200 @@ private Task StartPollTask(CancellationToken ct) break; case Librdkafka.EventType.ListConsumerGroupOffsets_Result: - { - if (errorCode != ErrorCode.NoError) { - Task.Run(() => - ((TaskCompletionSource>)adminClientResult).TrySetException( - new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); + if (errorCode != ErrorCode.NoError) + { + Task.Run(() => + ((TaskCompletionSource>)adminClientResult).TrySetException( + new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); break; + } + var results = extractListConsumerGroupOffsetsResults(eventPtr); + if (results.Any(r => r.Error.IsError) || results.Any(r => r.Partitions.Any(p => p.Error.IsError))) + { + Task.Run(() => + ((TaskCompletionSource>)adminClientResult).TrySetException( + new ListConsumerGroupOffsetsException(results))); + } + else + { + Task.Run(() => + ((TaskCompletionSource>)adminClientResult).TrySetResult( + results + .Select(r => new ListConsumerGroupOffsetsResult() { Group = r.Group, Partitions = r.Partitions }) + .ToList() + )); + } + break; } - var results = extractListConsumerGroupOffsetsResults(eventPtr); - if (results.Any(r => r.Error.IsError) || results.Any(r => r.Partitions.Any(p => p.Error.IsError))) - { - Task.Run(() => - ((TaskCompletionSource>)adminClientResult).TrySetException( - new ListConsumerGroupOffsetsException(results))); - } - else - { - Task.Run(() => - ((TaskCompletionSource>)adminClientResult).TrySetResult( - results - .Select(r => new ListConsumerGroupOffsetsResult() { Group = r.Group, Partitions = r.Partitions }) - .ToList() - )); - } - break; - } case Librdkafka.EventType.ListConsumerGroups_Result: - { - if (errorCode != ErrorCode.NoError) { - Task.Run(() => - ((TaskCompletionSource)adminClientResult).TrySetException( - new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); + if (errorCode != ErrorCode.NoError) + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetException( + new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); break; + } + var results = extractListConsumerGroupsResults(eventPtr); + if (results.Errors.Count() != 0) + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetException( + new ListConsumerGroupsException(results))); + } + else + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetResult( + new ListConsumerGroupsResult() { Valid = results.Valid } + )); + } + break; } - var results = extractListConsumerGroupsResults(eventPtr); - if (results.Errors.Count() != 0) - { - Task.Run(() => - ((TaskCompletionSource)adminClientResult).TrySetException( - new ListConsumerGroupsException(results))); - } - else - { - Task.Run(() => - ((TaskCompletionSource)adminClientResult).TrySetResult( - new ListConsumerGroupsResult() { Valid = results.Valid } - )); - } - break; - } case Librdkafka.EventType.DescribeConsumerGroups_Result: - { - if (errorCode != ErrorCode.NoError) { - Task.Run(() => - ((TaskCompletionSource)adminClientResult).TrySetException( - new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); + if (errorCode != ErrorCode.NoError) + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetException( + new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); break; + } + var results = extractDescribeConsumerGroupsResults(eventPtr); + if (results.ConsumerGroupDescriptions.Any(desc => desc.Error.IsError)) + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetException( + new DescribeConsumerGroupsException(results))); + } + else + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetResult( + new DescribeConsumerGroupsResult() { ConsumerGroupDescriptions = results.ConsumerGroupDescriptions } + )); + } + break; } - var results = extractDescribeConsumerGroupsResults(eventPtr); - if (results.ConsumerGroupDescriptions.Any(desc => desc.Error.IsError)) - { - Task.Run(() => - ((TaskCompletionSource)adminClientResult).TrySetException( - new DescribeConsumerGroupsException(results))); - } - else - { - Task.Run(() => - ((TaskCompletionSource)adminClientResult).TrySetResult( - new DescribeConsumerGroupsResult() { ConsumerGroupDescriptions = results.ConsumerGroupDescriptions } - )); - } - break; - } case Librdkafka.EventType.DescribeUserScramCredentials_Result: - { - if (errorCode != ErrorCode.NoError) { - Task.Run(() => - ((TaskCompletionSource)adminClientResult).TrySetException( - new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); + if (errorCode != ErrorCode.NoError) + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetException( + new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); break; - } - var results = extractDescribeUserScramCredentialsResult(eventPtr); - if (results.UserScramCredentialsDescriptions.Any(desc => desc.Error.IsError)) - { - Task.Run(() => - ((TaskCompletionSource)adminClientResult).TrySetException( - new DescribeUserScramCredentialsException(results))); - } - else - { - Task.Run(() => - ((TaskCompletionSource)adminClientResult).TrySetResult( - new DescribeUserScramCredentialsResult() { UserScramCredentialsDescriptions = results.UserScramCredentialsDescriptions } - )); - } - break; + } + var results = extractDescribeUserScramCredentialsResult(eventPtr); + if (results.UserScramCredentialsDescriptions.Any(desc => desc.Error.IsError)) + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetException( + new DescribeUserScramCredentialsException(results))); + } + else + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetResult( + new DescribeUserScramCredentialsResult() { UserScramCredentialsDescriptions = results.UserScramCredentialsDescriptions } + )); + } + break; - } + } case Librdkafka.EventType.AlterUserScramCredentials_Result: - { - if (errorCode != ErrorCode.NoError) { - Task.Run(() => - ((TaskCompletionSource)adminClientResult).TrySetException( - new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); + if (errorCode != ErrorCode.NoError) + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetException( + new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); break; - } - - var results = extractAlterUserScramCredentialsResults(eventPtr); + } - if (results.Any(r => r.Error.IsError)) - { - Task.Run(() => - ((TaskCompletionSource)adminClientResult).TrySetException( - new AlterUserScramCredentialsException(results))); - } - else - { - Task.Run(() => - ((TaskCompletionSource)adminClientResult).TrySetResult(null)); + var results = extractAlterUserScramCredentialsResults(eventPtr); + + if (results.Any(r => r.Error.IsError)) + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetException( + new AlterUserScramCredentialsException(results))); + } + else + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetResult(null)); + } + + break; } - - break; - } case Librdkafka.EventType.DescribeTopics_Result: - { - if (errorCode != ErrorCode.NoError) { - Task.Run(() => - ((TaskCompletionSource)adminClientResult).TrySetException( - new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); + if (errorCode != ErrorCode.NoError) + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetException( + new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); break; + } + var results = extractDescribeTopicsResults(eventPtr); + if (results.TopicDescriptions.Any(desc => desc.Error.IsError)) + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetException( + new DescribeTopicsException(results))); + } + else + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetResult( + new DescribeTopicsResult() { TopicDescriptions = results.TopicDescriptions } + )); + } + break; } - var results = extractDescribeTopicsResults(eventPtr); - if (results.TopicDescriptions.Any(desc => desc.Error.IsError)) - { - Task.Run(() => - ((TaskCompletionSource)adminClientResult).TrySetException( - new DescribeTopicsException(results))); - } - else - { - Task.Run(() => - ((TaskCompletionSource)adminClientResult).TrySetResult( - new DescribeTopicsResult() { TopicDescriptions = results.TopicDescriptions } - )); - } - break; - } case Librdkafka.EventType.DescribeCluster_Result: - { - if (errorCode != ErrorCode.NoError) { - Task.Run(() => - ((TaskCompletionSource)adminClientResult).TrySetException( - new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); + if (errorCode != ErrorCode.NoError) + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetException( + new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); break; + } + var res = extractDescribeClusterResult(eventPtr); + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetResult(res)); + break; } - var res = extractDescribeClusterResult(eventPtr); - Task.Run(() => - ((TaskCompletionSource)adminClientResult).TrySetResult(res)); - break; - } case Librdkafka.EventType.ListOffsets_Result: - { - if (errorCode != ErrorCode.NoError) { - Task.Run(() => - ((TaskCompletionSource)adminClientResult).TrySetException( - new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); + if (errorCode != ErrorCode.NoError) + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetException( + new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); break; + } + ListOffsetsReport report = extractListOffsetsReport(eventPtr); + if (report.Error.IsError) + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetException( + new ListOffsetsException(report))); + } + else + { + var result = new ListOffsetsResult() { ResultInfos = report.ResultInfos }; + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetResult( + result)); + } + break; } - ListOffsetsReport report = extractListOffsetsReport(eventPtr); - if (report.Error.IsError) - { - Task.Run(() => - ((TaskCompletionSource)adminClientResult).TrySetException( - new ListOffsetsException(report))); - } - else - { - var result = new ListOffsetsResult() { ResultInfos = report.ResultInfos }; - Task.Run(() => - ((TaskCompletionSource)adminClientResult).TrySetResult( - result)); - } - break; - } default: // Should never happen. throw new InvalidOperationException($"Unknown result type: {type}"); @@ -1276,7 +1288,7 @@ private Task StartPollTask(CancellationToken ct) } } } - catch (OperationCanceledException) {} + catch (OperationCanceledException) { } }, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default); @@ -1467,7 +1479,7 @@ private SafeKafkaHandle kafkaHandle /// internal AdminClient(Handle handle) { - Config.ExtractCancellationDelayMaxMs(new AdminClientConfig(), out this.cancellationDelayMaxMs); + Config.ExtractCancellationDelayMaxMs(new AdminClientConfig(), out this.cancellationDelayMaxMs); this.ownedClient = null; this.handle = handle; Init(); @@ -1490,9 +1502,9 @@ internal AdminClient(AdminClientBuilder builder) if (builder.StatisticsHandler != null) { producerBuilder.SetStatisticsHandler((_, stats) => builder.StatisticsHandler(this, stats)); } if (builder.OAuthBearerTokenRefreshHandler != null) { producerBuilder.SetOAuthBearerTokenRefreshHandler(builder.OAuthBearerTokenRefreshHandler); } this.ownedClient = producerBuilder.Build(); - + this.handle = new Handle - { + { Owner = this, LibrdkafkaHandle = ownedClient.Handle.LibrdkafkaHandle }; @@ -1768,7 +1780,8 @@ public Task DescribeClusterAsync(DescribeClusterOptions o /// /// Refer to /// - public Task ListOffsetsAsync(IEnumerable topicPartitionOffsetSpecs,ListOffsetsOptions options = null) { + public Task ListOffsetsAsync(IEnumerable topicPartitionOffsetSpecs, ListOffsetsOptions options = null) + { var completionSource = new TaskCompletionSource(); var gch = GCHandle.Alloc(completionSource); Handle.LibrdkafkaHandle.ListOffsets( diff --git a/src/Confluent.Kafka/ConsumerGroupType.cs b/src/Confluent.Kafka/ConsumerGroupType.cs new file mode 100644 index 000000000..61bc3e4b6 --- /dev/null +++ b/src/Confluent.Kafka/ConsumerGroupType.cs @@ -0,0 +1,40 @@ +// Copyright 2024 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + + +namespace Confluent.Kafka +{ + /// + /// Enumerates the different consumer group types. + /// + public enum ConsumerGroupType : int + { + /// + /// Unknown + /// + Unknown = 0, + + /// + /// Consumer + /// + Consumer = 1, + + /// + /// Classic + /// + Classic = 2, + }; +} diff --git a/src/Confluent.Kafka/Impl/LibRdKafka.cs b/src/Confluent.Kafka/Impl/LibRdKafka.cs index 2f542f7ef..817f68013 100644 --- a/src/Confluent.Kafka/Impl/LibRdKafka.cs +++ b/src/Confluent.Kafka/Impl/LibRdKafka.cs @@ -57,7 +57,7 @@ internal enum AdminOp Any = 0, CreateTopics = 1, DeleteTopics = 2, - CreatePartitions= 3, + CreatePartitions = 3, AlterConfigs = 4, DescribeConfigs = 5, DeleteRecords = 6, @@ -300,6 +300,7 @@ static bool SetDelegates(Type nativeMethodsClass) _AdminOptions_set_require_stable_offsets = (Func)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_require_stable_offsets").CreateDelegate(typeof(Func)); _AdminOptions_set_include_authorized_operations = (Func)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_include_authorized_operations").CreateDelegate(typeof(Func)); _AdminOptions_set_match_consumer_group_states = (Func)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_match_consumer_group_states").CreateDelegate(typeof(Func)); + _AdminOptions_set_match_consumer_group_types = (Func)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_match_consumer_group_types").CreateDelegate(typeof(Func)); _AdminOptions_set_isolation_level = (Func)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_isolation_level").CreateDelegate(typeof(Func)); _NewTopic_new = (Func)methods.Single(m => m.Name == "rd_kafka_NewTopic_new").CreateDelegate(typeof(Func)); @@ -406,36 +407,38 @@ static bool SetDelegates(Type nativeMethodsClass) _ListConsumerGroupOffsets_result_groups = (_ListConsumerGroupOffsets_result_groups_delegate)methods.Single(m => m.Name == "rd_kafka_ListConsumerGroupOffsets_result_groups").CreateDelegate(typeof(_ListConsumerGroupOffsets_result_groups_delegate)); _ListConsumerGroupOffsets = (_ListConsumerGroupOffsets_delegate)methods.Single(m => m.Name == "rd_kafka_ListConsumerGroupOffsets").CreateDelegate(typeof(_ListConsumerGroupOffsets_delegate)); - _ListConsumerGroups = (_ListConsumerGroups_delegate)methods.Single(m => m.Name == "rd_kafka_ListConsumerGroups").CreateDelegate(typeof (_ListConsumerGroups_delegate)); - _ConsumerGroupListing_group_id = (_ConsumerGroupListing_group_id_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_group_id").CreateDelegate(typeof (_ConsumerGroupListing_group_id_delegate)); - _ConsumerGroupListing_is_simple_consumer_group = (_ConsumerGroupListing_is_simple_consumer_group_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_is_simple_consumer_group").CreateDelegate(typeof (_ConsumerGroupListing_is_simple_consumer_group_delegate)); - _ConsumerGroupListing_state = (_ConsumerGroupListing_state_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_state").CreateDelegate(typeof (_ConsumerGroupListing_state_delegate)); - _ListConsumerGroups_result_valid = (_ListConsumerGroups_result_valid_delegate)methods.Single(m => m.Name == "rd_kafka_ListConsumerGroups_result_valid").CreateDelegate(typeof (_ListConsumerGroups_result_valid_delegate)); - _ListConsumerGroups_result_errors = (_ListConsumerGroups_result_errors_delegate)methods.Single(m => m.Name == "rd_kafka_ListConsumerGroups_result_errors").CreateDelegate(typeof (_ListConsumerGroups_result_errors_delegate)); - - _DescribeConsumerGroups = (_DescribeConsumerGroups_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeConsumerGroups").CreateDelegate(typeof (_DescribeConsumerGroups_delegate)); - _DescribeConsumerGroups_result_groups = (_DescribeConsumerGroups_result_groups_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeConsumerGroups_result_groups").CreateDelegate(typeof (_DescribeConsumerGroups_result_groups_delegate)); - _ConsumerGroupDescription_group_id = (_ConsumerGroupDescription_group_id_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_group_id").CreateDelegate(typeof (_ConsumerGroupDescription_group_id_delegate)); - _ConsumerGroupDescription_error = (_ConsumerGroupDescription_error_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_error").CreateDelegate(typeof (_ConsumerGroupDescription_error_delegate)); - _ConsumerGroupDescription_is_simple_consumer_group = (_ConsumerGroupDescription_is_simple_consumer_group_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_is_simple_consumer_group").CreateDelegate(typeof (_ConsumerGroupDescription_is_simple_consumer_group_delegate)); - _ConsumerGroupDescription_partition_assignor = (_ConsumerGroupDescription_partition_assignor_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_partition_assignor").CreateDelegate(typeof (_ConsumerGroupDescription_partition_assignor_delegate)); - _ConsumerGroupDescription_state = (_ConsumerGroupDescription_state_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_state").CreateDelegate(typeof (_ConsumerGroupDescription_state_delegate)); - _ConsumerGroupDescription_coordinator = (_ConsumerGroupDescription_coordinator_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_coordinator").CreateDelegate(typeof (_ConsumerGroupDescription_coordinator_delegate)); - _ConsumerGroupDescription_member_count = (_ConsumerGroupDescription_member_count_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_member_count").CreateDelegate(typeof (_ConsumerGroupDescription_member_count_delegate)); - _ConsumerGroupDescription_authorized_operations = (_ConsumerGroupDescription_authorized_operations_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_authorized_operations").CreateDelegate(typeof (_ConsumerGroupDescription_authorized_operations_delegate)); - _ConsumerGroupDescription_member = (_ConsumerGroupDescription_member_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_member").CreateDelegate(typeof (_ConsumerGroupDescription_member_delegate)); - _MemberDescription_client_id = (_MemberDescription_client_id_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_client_id").CreateDelegate(typeof (_MemberDescription_client_id_delegate)); - _MemberDescription_group_instance_id = (_MemberDescription_group_instance_id_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_group_instance_id").CreateDelegate(typeof (_MemberDescription_group_instance_id_delegate)); - _MemberDescription_consumer_id = (_MemberDescription_consumer_id_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_consumer_id").CreateDelegate(typeof (_MemberDescription_consumer_id_delegate)); - _MemberDescription_host = (_MemberDescription_host_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_host").CreateDelegate(typeof (_MemberDescription_host_delegate)); - _MemberDescription_assignment = (_MemberDescription_assignment_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_assignment").CreateDelegate(typeof (_MemberDescription_assignment_delegate)); - _MemberAssignment_partitions = (_MemberAssignment_partitions_delegate)methods.Single(m => m.Name == "rd_kafka_MemberAssignment_partitions").CreateDelegate(typeof (_MemberAssignment_partitions_delegate)); - _Node_id = (_Node_id_delegate)methods.Single(m => m.Name == "rd_kafka_Node_id").CreateDelegate(typeof (_Node_id_delegate)); - _Node_host = (_Node_host_delegate)methods.Single(m => m.Name == "rd_kafka_Node_host").CreateDelegate(typeof (_Node_host_delegate)); - _Node_port = (_Node_port_delegate)methods.Single(m => m.Name == "rd_kafka_Node_port").CreateDelegate(typeof (_Node_port_delegate)); - _Node_rack = (_Node_rack_delegate)methods.Single(m => m.Name == "rd_kafka_Node_rack").CreateDelegate(typeof (_Node_rack_delegate)); - - _DescribeUserScramCredentials = (_DescribeUserScramCredentials_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeUserScramCredentials").CreateDelegate(typeof (_DescribeUserScramCredentials_delegate)); + _ListConsumerGroups = (_ListConsumerGroups_delegate)methods.Single(m => m.Name == "rd_kafka_ListConsumerGroups").CreateDelegate(typeof(_ListConsumerGroups_delegate)); + _ConsumerGroupListing_group_id = (_ConsumerGroupListing_group_id_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_group_id").CreateDelegate(typeof(_ConsumerGroupListing_group_id_delegate)); + _ConsumerGroupListing_is_simple_consumer_group = (_ConsumerGroupListing_is_simple_consumer_group_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_is_simple_consumer_group").CreateDelegate(typeof(_ConsumerGroupListing_is_simple_consumer_group_delegate)); + _ConsumerGroupListing_state = (_ConsumerGroupListing_state_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_state").CreateDelegate(typeof(_ConsumerGroupListing_state_delegate)); + _ConsumerGroupListing_type = (_ConsumerGroupListing_type_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_type").CreateDelegate(typeof(_ConsumerGroupListing_type_delegate)); + + _ListConsumerGroups_result_valid = (_ListConsumerGroups_result_valid_delegate)methods.Single(m => m.Name == "rd_kafka_ListConsumerGroups_result_valid").CreateDelegate(typeof(_ListConsumerGroups_result_valid_delegate)); + _ListConsumerGroups_result_errors = (_ListConsumerGroups_result_errors_delegate)methods.Single(m => m.Name == "rd_kafka_ListConsumerGroups_result_errors").CreateDelegate(typeof(_ListConsumerGroups_result_errors_delegate)); + + _DescribeConsumerGroups = (_DescribeConsumerGroups_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeConsumerGroups").CreateDelegate(typeof(_DescribeConsumerGroups_delegate)); + _DescribeConsumerGroups_result_groups = (_DescribeConsumerGroups_result_groups_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeConsumerGroups_result_groups").CreateDelegate(typeof(_DescribeConsumerGroups_result_groups_delegate)); + _ConsumerGroupDescription_group_id = (_ConsumerGroupDescription_group_id_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_group_id").CreateDelegate(typeof(_ConsumerGroupDescription_group_id_delegate)); + _ConsumerGroupDescription_error = (_ConsumerGroupDescription_error_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_error").CreateDelegate(typeof(_ConsumerGroupDescription_error_delegate)); + _ConsumerGroupDescription_is_simple_consumer_group = (_ConsumerGroupDescription_is_simple_consumer_group_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_is_simple_consumer_group").CreateDelegate(typeof(_ConsumerGroupDescription_is_simple_consumer_group_delegate)); + _ConsumerGroupDescription_partition_assignor = (_ConsumerGroupDescription_partition_assignor_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_partition_assignor").CreateDelegate(typeof(_ConsumerGroupDescription_partition_assignor_delegate)); + _ConsumerGroupDescription_state = (_ConsumerGroupDescription_state_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_state").CreateDelegate(typeof(_ConsumerGroupDescription_state_delegate)); + _ConsumerGroupDescription_coordinator = (_ConsumerGroupDescription_coordinator_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_coordinator").CreateDelegate(typeof(_ConsumerGroupDescription_coordinator_delegate)); + _ConsumerGroupDescription_member_count = (_ConsumerGroupDescription_member_count_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_member_count").CreateDelegate(typeof(_ConsumerGroupDescription_member_count_delegate)); + _ConsumerGroupDescription_authorized_operations = (_ConsumerGroupDescription_authorized_operations_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_authorized_operations").CreateDelegate(typeof(_ConsumerGroupDescription_authorized_operations_delegate)); + _ConsumerGroupDescription_member = (_ConsumerGroupDescription_member_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_member").CreateDelegate(typeof(_ConsumerGroupDescription_member_delegate)); + _MemberDescription_client_id = (_MemberDescription_client_id_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_client_id").CreateDelegate(typeof(_MemberDescription_client_id_delegate)); + _MemberDescription_group_instance_id = (_MemberDescription_group_instance_id_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_group_instance_id").CreateDelegate(typeof(_MemberDescription_group_instance_id_delegate)); + _MemberDescription_consumer_id = (_MemberDescription_consumer_id_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_consumer_id").CreateDelegate(typeof(_MemberDescription_consumer_id_delegate)); + _MemberDescription_host = (_MemberDescription_host_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_host").CreateDelegate(typeof(_MemberDescription_host_delegate)); + _MemberDescription_assignment = (_MemberDescription_assignment_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_assignment").CreateDelegate(typeof(_MemberDescription_assignment_delegate)); + _MemberAssignment_partitions = (_MemberAssignment_partitions_delegate)methods.Single(m => m.Name == "rd_kafka_MemberAssignment_partitions").CreateDelegate(typeof(_MemberAssignment_partitions_delegate)); + _Node_id = (_Node_id_delegate)methods.Single(m => m.Name == "rd_kafka_Node_id").CreateDelegate(typeof(_Node_id_delegate)); + _Node_host = (_Node_host_delegate)methods.Single(m => m.Name == "rd_kafka_Node_host").CreateDelegate(typeof(_Node_host_delegate)); + _Node_port = (_Node_port_delegate)methods.Single(m => m.Name == "rd_kafka_Node_port").CreateDelegate(typeof(_Node_port_delegate)); + _Node_rack = (_Node_rack_delegate)methods.Single(m => m.Name == "rd_kafka_Node_rack").CreateDelegate(typeof(_Node_rack_delegate)); + + _DescribeUserScramCredentials = (_DescribeUserScramCredentials_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeUserScramCredentials").CreateDelegate(typeof(_DescribeUserScramCredentials_delegate)); _DescribeUserScramCredentials_result_descriptions = (_DescribeUserScramCredentials_result_descriptions_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeUserScramCredentials_result_descriptions").CreateDelegate(typeof(_DescribeUserScramCredentials_result_descriptions_delegate)); _UserScramCredentialsDescription_user = (_UserScramCredentialsDescription_user_delegate)methods.Single(m => m.Name == "rd_kafka_UserScramCredentialsDescription_user").CreateDelegate(typeof(_UserScramCredentialsDescription_user_delegate)); _UserScramCredentialsDescription_error = (_UserScramCredentialsDescription_error_delegate)methods.Single(m => m.Name == "rd_kafka_UserScramCredentialsDescription_error").CreateDelegate(typeof(_UserScramCredentialsDescription_error_delegate)); @@ -444,39 +447,39 @@ static bool SetDelegates(Type nativeMethodsClass) _ScramCredentialInfo_mechanism = (_ScramCredentialInfo_mechanism_delegate)methods.Single(m => m.Name == "rd_kafka_ScramCredentialInfo_mechanism").CreateDelegate(typeof(_ScramCredentialInfo_mechanism_delegate)); _ScramCredentialInfo_iterations = (_ScramCredentialInfo_iterations_delegate)methods.Single(m => m.Name == "rd_kafka_ScramCredentialInfo_iterations").CreateDelegate(typeof(_ScramCredentialInfo_iterations_delegate)); - _UserScramCredentialUpsertion_new = (_UserScramCredentialUpsertion_new_delegate)methods.Single(m => m.Name == "rd_kafka_UserScramCredentialUpsertion_new").CreateDelegate(typeof (_UserScramCredentialUpsertion_new_delegate)); - _UserScramCredentialDeletion_new = (_UserScramCredentialDeletion_new_delegate)methods.Single(m => m.Name == "rd_kafka_UserScramCredentialDeletion_new").CreateDelegate(typeof (_UserScramCredentialDeletion_new_delegate)); - _UserScramCredentialAlteration_destroy = (_UserScramCredentialAlteration_destroy_delegate)methods.Single(m => m.Name == "rd_kafka_UserScramCredentialAlteration_destroy").CreateDelegate(typeof (_UserScramCredentialAlteration_destroy_delegate)); - _AlterUserScramCredentials = (_AlterUserScramCredentials_delegate)methods.Single(m => m.Name == "rd_kafka_AlterUserScramCredentials").CreateDelegate(typeof (_AlterUserScramCredentials_delegate)); + _UserScramCredentialUpsertion_new = (_UserScramCredentialUpsertion_new_delegate)methods.Single(m => m.Name == "rd_kafka_UserScramCredentialUpsertion_new").CreateDelegate(typeof(_UserScramCredentialUpsertion_new_delegate)); + _UserScramCredentialDeletion_new = (_UserScramCredentialDeletion_new_delegate)methods.Single(m => m.Name == "rd_kafka_UserScramCredentialDeletion_new").CreateDelegate(typeof(_UserScramCredentialDeletion_new_delegate)); + _UserScramCredentialAlteration_destroy = (_UserScramCredentialAlteration_destroy_delegate)methods.Single(m => m.Name == "rd_kafka_UserScramCredentialAlteration_destroy").CreateDelegate(typeof(_UserScramCredentialAlteration_destroy_delegate)); + _AlterUserScramCredentials = (_AlterUserScramCredentials_delegate)methods.Single(m => m.Name == "rd_kafka_AlterUserScramCredentials").CreateDelegate(typeof(_AlterUserScramCredentials_delegate)); _AlterUserScramCredentials_result_responses = (_AlterUserScramCredentials_result_responses_delegate)methods.Single(m => m.Name == "rd_kafka_AlterUserScramCredentials_result_responses").CreateDelegate(typeof(_AlterUserScramCredentials_result_responses_delegate)); _AlterUserScramCredentials_result_response_user = (_AlterUserScramCredentials_result_response_user_delegate)methods.Single(m => m.Name == "rd_kafka_AlterUserScramCredentials_result_response_user").CreateDelegate(typeof(_AlterUserScramCredentials_result_response_user_delegate)); _AlterUserScramCredentials_result_response_error = (_AlterUserScramCredentials_result_response_error_delegate)methods.Single(m => m.Name == "rd_kafka_AlterUserScramCredentials_result_response_error").CreateDelegate(typeof(_AlterUserScramCredentials_result_response_error_delegate)); - - _ListOffsets = (_ListOffsets_delegate)methods.Single(m => m.Name == "rd_kafka_ListOffsets").CreateDelegate(typeof (_ListOffsets_delegate)); - _ListOffsets_result_infos = (_ListOffsets_result_infos_delegate)methods.Single(m => m.Name == "rd_kafka_ListOffsets_result_infos").CreateDelegate(typeof (_ListOffsets_result_infos_delegate)); - _ListOffsetsResultInfo_timestamp = (_ListOffsetsResultInfo_timestamp_delegate)methods.Single(m => m.Name == "rd_kafka_ListOffsetsResultInfo_timestamp").CreateDelegate(typeof (_ListOffsetsResultInfo_timestamp_delegate)); - _ListOffsetsResultInfo_topic_partition = (_ListOffsetsResultInfo_topic_partition_delegate)methods.Single(m => m.Name == "rd_kafka_ListOffsetsResultInfo_topic_partition").CreateDelegate(typeof (_ListOffsetsResultInfo_topic_partition_delegate)); - - _DescribeTopics = (_DescribeTopics_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeTopics").CreateDelegate(typeof (_DescribeTopics_delegate)); - _DescribeTopics_result_topics = (_DescribeTopics_result_topics_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeTopics_result_topics").CreateDelegate(typeof (_DescribeTopics_result_topics_delegate)); - _TopicCollection_of_topic_names = (_TopicCollection_of_topic_names_delegate)methods.Single(m => m.Name == "rd_kafka_TopicCollection_of_topic_names").CreateDelegate(typeof (_TopicCollection_of_topic_names_delegate)); - _TopicCollection_destroy = (_TopicCollection_destroy_delegate)methods.Single(m => m.Name == "rd_kafka_TopicCollection_destroy").CreateDelegate(typeof (_TopicCollection_destroy_delegate)); - _TopicDescription_error = (_TopicDescription_error_delegate)methods.Single(m => m.Name == "rd_kafka_TopicDescription_error").CreateDelegate(typeof (_TopicDescription_error_delegate)); - _TopicDescription_name = (_TopicDescription_name_delegate)methods.Single(m => m.Name == "rd_kafka_TopicDescription_name").CreateDelegate(typeof (_TopicDescription_name_delegate)); - _TopicDescription_topic_id = (_TopicDescription_topic_id_delegate)methods.Single(m => m.Name == "rd_kafka_TopicDescription_topic_id").CreateDelegate(typeof (_TopicDescription_topic_id_delegate)); - _TopicDescription_partitions = (_TopicDescription_partitions_delegate)methods.Single(m => m.Name == "rd_kafka_TopicDescription_partitions").CreateDelegate(typeof (_TopicDescription_partitions_delegate)); - _TopicDescription_is_internal = (_TopicDescription_is_internal_delegate)methods.Single(m => m.Name == "rd_kafka_TopicDescription_is_internal").CreateDelegate(typeof (_TopicDescription_is_internal_delegate)); - _TopicDescription_authorized_operations = (_TopicDescription_authorized_operations_delegate)methods.Single(m => m.Name == "rd_kafka_TopicDescription_authorized_operations").CreateDelegate(typeof (_TopicDescription_authorized_operations_delegate)); - _TopicPartitionInfo_isr = (_TopicPartitionInfo_isr_delegate)methods.Single(m => m.Name == "rd_kafka_TopicPartitionInfo_isr").CreateDelegate(typeof (_TopicPartitionInfo_isr_delegate)); - _TopicPartitionInfo_leader = (_TopicPartitionInfo_leader_delegate)methods.Single(m => m.Name == "rd_kafka_TopicPartitionInfo_leader").CreateDelegate(typeof (_TopicPartitionInfo_leader_delegate)); - _TopicPartitionInfo_partition = (_TopicPartitionInfo_partition_delegate)methods.Single(m => m.Name == "rd_kafka_TopicPartitionInfo_partition").CreateDelegate(typeof (_TopicPartitionInfo_partition_delegate)); - _TopicPartitionInfo_replicas = (_TopicPartitionInfo_replicas_delegate)methods.Single(m => m.Name == "rd_kafka_TopicPartitionInfo_replicas").CreateDelegate(typeof (_TopicPartitionInfo_replicas_delegate)); - - _DescribeCluster = (_DescribeCluster_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeCluster").CreateDelegate(typeof (_DescribeCluster_delegate)); - _DescribeCluster_result_nodes = (_DescribeCluster_result_nodes_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeCluster_result_nodes").CreateDelegate(typeof (_DescribeCluster_result_nodes_delegate)); - _DescribeCluster_result_authorized_operations = (_DescribeCluster_result_authorized_operations_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeCluster_result_authorized_operations").CreateDelegate(typeof (_DescribeCluster_result_authorized_operations_delegate)); - _DescribeCluster_result_controller = (_DescribeCluster_result_controller_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeCluster_result_controller").CreateDelegate(typeof (_DescribeCluster_result_controller_delegate)); - _DescribeCluster_result_cluster_id = (_DescribeCluster_result_cluster_id_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeCluster_result_cluster_id").CreateDelegate(typeof (_DescribeCluster_result_cluster_id_delegate)); + + _ListOffsets = (_ListOffsets_delegate)methods.Single(m => m.Name == "rd_kafka_ListOffsets").CreateDelegate(typeof(_ListOffsets_delegate)); + _ListOffsets_result_infos = (_ListOffsets_result_infos_delegate)methods.Single(m => m.Name == "rd_kafka_ListOffsets_result_infos").CreateDelegate(typeof(_ListOffsets_result_infos_delegate)); + _ListOffsetsResultInfo_timestamp = (_ListOffsetsResultInfo_timestamp_delegate)methods.Single(m => m.Name == "rd_kafka_ListOffsetsResultInfo_timestamp").CreateDelegate(typeof(_ListOffsetsResultInfo_timestamp_delegate)); + _ListOffsetsResultInfo_topic_partition = (_ListOffsetsResultInfo_topic_partition_delegate)methods.Single(m => m.Name == "rd_kafka_ListOffsetsResultInfo_topic_partition").CreateDelegate(typeof(_ListOffsetsResultInfo_topic_partition_delegate)); + + _DescribeTopics = (_DescribeTopics_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeTopics").CreateDelegate(typeof(_DescribeTopics_delegate)); + _DescribeTopics_result_topics = (_DescribeTopics_result_topics_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeTopics_result_topics").CreateDelegate(typeof(_DescribeTopics_result_topics_delegate)); + _TopicCollection_of_topic_names = (_TopicCollection_of_topic_names_delegate)methods.Single(m => m.Name == "rd_kafka_TopicCollection_of_topic_names").CreateDelegate(typeof(_TopicCollection_of_topic_names_delegate)); + _TopicCollection_destroy = (_TopicCollection_destroy_delegate)methods.Single(m => m.Name == "rd_kafka_TopicCollection_destroy").CreateDelegate(typeof(_TopicCollection_destroy_delegate)); + _TopicDescription_error = (_TopicDescription_error_delegate)methods.Single(m => m.Name == "rd_kafka_TopicDescription_error").CreateDelegate(typeof(_TopicDescription_error_delegate)); + _TopicDescription_name = (_TopicDescription_name_delegate)methods.Single(m => m.Name == "rd_kafka_TopicDescription_name").CreateDelegate(typeof(_TopicDescription_name_delegate)); + _TopicDescription_topic_id = (_TopicDescription_topic_id_delegate)methods.Single(m => m.Name == "rd_kafka_TopicDescription_topic_id").CreateDelegate(typeof(_TopicDescription_topic_id_delegate)); + _TopicDescription_partitions = (_TopicDescription_partitions_delegate)methods.Single(m => m.Name == "rd_kafka_TopicDescription_partitions").CreateDelegate(typeof(_TopicDescription_partitions_delegate)); + _TopicDescription_is_internal = (_TopicDescription_is_internal_delegate)methods.Single(m => m.Name == "rd_kafka_TopicDescription_is_internal").CreateDelegate(typeof(_TopicDescription_is_internal_delegate)); + _TopicDescription_authorized_operations = (_TopicDescription_authorized_operations_delegate)methods.Single(m => m.Name == "rd_kafka_TopicDescription_authorized_operations").CreateDelegate(typeof(_TopicDescription_authorized_operations_delegate)); + _TopicPartitionInfo_isr = (_TopicPartitionInfo_isr_delegate)methods.Single(m => m.Name == "rd_kafka_TopicPartitionInfo_isr").CreateDelegate(typeof(_TopicPartitionInfo_isr_delegate)); + _TopicPartitionInfo_leader = (_TopicPartitionInfo_leader_delegate)methods.Single(m => m.Name == "rd_kafka_TopicPartitionInfo_leader").CreateDelegate(typeof(_TopicPartitionInfo_leader_delegate)); + _TopicPartitionInfo_partition = (_TopicPartitionInfo_partition_delegate)methods.Single(m => m.Name == "rd_kafka_TopicPartitionInfo_partition").CreateDelegate(typeof(_TopicPartitionInfo_partition_delegate)); + _TopicPartitionInfo_replicas = (_TopicPartitionInfo_replicas_delegate)methods.Single(m => m.Name == "rd_kafka_TopicPartitionInfo_replicas").CreateDelegate(typeof(_TopicPartitionInfo_replicas_delegate)); + + _DescribeCluster = (_DescribeCluster_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeCluster").CreateDelegate(typeof(_DescribeCluster_delegate)); + _DescribeCluster_result_nodes = (_DescribeCluster_result_nodes_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeCluster_result_nodes").CreateDelegate(typeof(_DescribeCluster_result_nodes_delegate)); + _DescribeCluster_result_authorized_operations = (_DescribeCluster_result_authorized_operations_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeCluster_result_authorized_operations").CreateDelegate(typeof(_DescribeCluster_result_authorized_operations_delegate)); + _DescribeCluster_result_controller = (_DescribeCluster_result_controller_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeCluster_result_controller").CreateDelegate(typeof(_DescribeCluster_result_controller_delegate)); + _DescribeCluster_result_cluster_id = (_DescribeCluster_result_cluster_id_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeCluster_result_cluster_id").CreateDelegate(typeof(_DescribeCluster_result_cluster_id_delegate)); _topic_result_error = (Func)methods.Single(m => m.Name == "rd_kafka_topic_result_error").CreateDelegate(typeof(Func)); _topic_result_error_string = (Func)methods.Single(m => m.Name == "rd_kafka_topic_result_error_string").CreateDelegate(typeof(Func)); @@ -1166,7 +1169,7 @@ internal static ErrorCode resume_partitions(IntPtr rk, IntPtr partitions) private static Func _seek; internal static ErrorCode seek(IntPtr rkt, int partition, long offset, IntPtr timeout_ms) => _seek(rkt, partition, offset, timeout_ms); - + private static Func _seek_partitions; internal static IntPtr seek_partitions(IntPtr rkt, IntPtr partitions, IntPtr timeout_ms) => _seek_partitions(rkt, partitions, timeout_ms); @@ -1197,18 +1200,18 @@ internal static unsafe ErrorCode produceva( IntPtr msg_opaque) { IntPtr topicStrPtr = Marshal.StringToHGlobalAnsi(topic); - + try { rd_kafka_vu* vus = stackalloc rd_kafka_vu[] { - new rd_kafka_vu() {vt = rd_kafka_vtype.Topic, data = new vu_data() {topic = topicStrPtr}}, - new rd_kafka_vu() {vt = rd_kafka_vtype.Partition, data = new vu_data() {partition = partition}}, - new rd_kafka_vu() {vt = rd_kafka_vtype.MsgFlags, data = new vu_data() {msgflags = msgflags}}, - new rd_kafka_vu() {vt = rd_kafka_vtype.Value, data = new vu_data() {val = new ptr_and_size() {ptr = val, size = len}}}, - new rd_kafka_vu() {vt = rd_kafka_vtype.Key, data = new vu_data() {key = new ptr_and_size() {ptr = key, size = keylen}}}, - new rd_kafka_vu() {vt = rd_kafka_vtype.Timestamp, data = new vu_data() {timestamp = timestamp}}, - new rd_kafka_vu() {vt = rd_kafka_vtype.Headers, data = new vu_data() {headers = headers}}, - new rd_kafka_vu() {vt = rd_kafka_vtype.Opaque, data = new vu_data() {opaque = msg_opaque}}, + new rd_kafka_vu() { vt = rd_kafka_vtype.Topic, data = new vu_data() { topic = topicStrPtr } }, + new rd_kafka_vu() { vt = rd_kafka_vtype.Partition, data = new vu_data() { partition = partition } }, + new rd_kafka_vu() { vt = rd_kafka_vtype.MsgFlags, data = new vu_data() { msgflags = msgflags } }, + new rd_kafka_vu() { vt = rd_kafka_vtype.Value, data = new vu_data() { val = new ptr_and_size() { ptr = val, size = len } } }, + new rd_kafka_vu() { vt = rd_kafka_vtype.Key, data = new vu_data() { key = new ptr_and_size() { ptr = key, size = keylen } } }, + new rd_kafka_vu() { vt = rd_kafka_vtype.Timestamp, data = new vu_data() { timestamp = timestamp } }, + new rd_kafka_vu() { vt = rd_kafka_vtype.Headers, data = new vu_data() { headers = headers } }, + new rd_kafka_vu() { vt = rd_kafka_vtype.Opaque, data = new vu_data() { opaque = msg_opaque } }, }; IntPtr result = _produceva(rk, vus, new IntPtr(8)); @@ -1340,6 +1343,10 @@ internal static IntPtr AdminOptions_set_include_authorized_operations( internal static IntPtr AdminOptions_set_match_consumer_group_states(IntPtr options, ConsumerGroupState[] states, UIntPtr statesCnt) => _AdminOptions_set_match_consumer_group_states(options, states, statesCnt); + private static Func _AdminOptions_set_match_consumer_group_types; + internal static IntPtr AdminOptions_set_match_consumer_group_types(IntPtr options, ConsumerGroupType[] groupTypes, UIntPtr groupTypesCnt) + => _AdminOptions_set_match_consumer_group_types(options, groupTypes, groupTypesCnt); + private static Func _AdminOptions_set_isolation_level; internal static IntPtr AdminOptions_set_isolation_level(IntPtr options, IntPtr IsolationLevel) => _AdminOptions_set_isolation_level(options, IsolationLevel); @@ -1481,7 +1488,7 @@ internal static IntPtr ConfigEntry_name( IntPtr entry) => _ConfigEntry_name(entry); private static Func _ConfigEntry_value; - internal static IntPtr ConfigEntry_value ( + internal static IntPtr ConfigEntry_value( IntPtr entry) => _ConfigEntry_value(entry); private static Func _ConfigEntry_source; @@ -1501,7 +1508,7 @@ internal static IntPtr ConfigEntry_is_sensitive( IntPtr entry) => _ConfigEntry_is_sensitive(entry); private static Func _ConfigEntry_is_synonym; - internal static IntPtr ConfigEntry_is_synonym ( + internal static IntPtr ConfigEntry_is_synonym( IntPtr entry) => _ConfigEntry_is_synonym(entry); private delegate IntPtr _ConfigEntry_synonyms_delegate(IntPtr entry, out UIntPtr cntp); @@ -1543,7 +1550,7 @@ internal static ErrorCode ConfigResource_delete_config( private static Func _ConfigResource_add_incremental_config; internal static IntPtr ConfigResource_add_incremental_config( IntPtr config, - string name, + string name, AlterConfigOpType optype, string value) => _ConfigResource_add_incremental_config(config, name, optype, value); @@ -1572,7 +1579,7 @@ internal static IntPtr ConfigResource_error_string( private static Action _AlterConfigs; - internal static void AlterConfigs ( + internal static void AlterConfigs( IntPtr rk, IntPtr[] configs, UIntPtr config_cnt, @@ -1584,9 +1591,9 @@ internal static void AlterConfigs ( internal static IntPtr AlterConfigs_result_resources( IntPtr result, out UIntPtr cntp) => _AlterConfigs_result_resources(result, out cntp); - + private static Action _IncrementalAlterConfigs; - internal static void IncrementalAlterConfigs ( + internal static void IncrementalAlterConfigs( IntPtr rk, IntPtr[] configs, UIntPtr config_cnt, @@ -1600,7 +1607,7 @@ internal static IntPtr IncrementalAlterConfigs_result_resources( out UIntPtr cntp) => _IncrementalAlterConfigs_result_resources(result, out cntp); private static Action _DescribeConfigs; - internal static void DescribeConfigs ( + internal static void DescribeConfigs( IntPtr rk, IntPtr[] configs, UIntPtr config_cnt, @@ -1663,7 +1670,7 @@ internal static IntPtr DeleteConsumerGroupOffsets_result_groups( // ACLs // private delegate IntPtr _AclBinding_new_delegate(ResourceType restype, string name, ResourcePatternType resource_pattern_type, string principal, string host, AclOperation operation, AclPermissionType permission_type, StringBuilder errstr, UIntPtr errstr_size); - private static _AclBinding_new_delegate _AclBinding_new; + private static _AclBinding_new_delegate _AclBinding_new; internal static IntPtr AclBinding_new( ResourceType restype, string name, @@ -1677,7 +1684,7 @@ UIntPtr errstr_size ) => _AclBinding_new(restype, name, resource_pattern_type, principal, host, operation, permission_type, errstr, errstr_size); private delegate IntPtr _AclBindingFilter_new_delegate(ResourceType restype, string name, ResourcePatternType resource_pattern_type, string principal, string host, AclOperation operation, AclPermissionType permission_type, StringBuilder errstr, UIntPtr errstr_size); - private static _AclBindingFilter_new_delegate _AclBindingFilter_new; + private static _AclBindingFilter_new_delegate _AclBindingFilter_new; internal static IntPtr AclBindingFilter_new( ResourceType restype, string name, @@ -1863,123 +1870,129 @@ internal static IntPtr ListConsumerGroupOffsets_result_groups( out UIntPtr groupsTopicPartitionsCount ) => _ListConsumerGroupOffsets_result_groups(resultResponse, out groupsTopicPartitionsCount); - private delegate void _ListConsumerGroups_delegate(IntPtr handle, IntPtr optionsPtr, IntPtr resultQueuePtr); - private static _ListConsumerGroups_delegate _ListConsumerGroups; - internal static void ListConsumerGroups(IntPtr handle, IntPtr optionsPtr, IntPtr resultQueuePtr) - => _ListConsumerGroups(handle, optionsPtr, resultQueuePtr); - - private delegate IntPtr _ConsumerGroupListing_group_id_delegate(IntPtr grplist); - private static _ConsumerGroupListing_group_id_delegate _ConsumerGroupListing_group_id; - internal static IntPtr ConsumerGroupListing_group_id(IntPtr grplist) - => _ConsumerGroupListing_group_id(grplist); - - private delegate IntPtr _ConsumerGroupListing_is_simple_consumer_group_delegate(IntPtr grplist); - private static _ConsumerGroupListing_is_simple_consumer_group_delegate _ConsumerGroupListing_is_simple_consumer_group; - internal static IntPtr ConsumerGroupListing_is_simple_consumer_group(IntPtr grplist) - => _ConsumerGroupListing_is_simple_consumer_group(grplist); - - private delegate ConsumerGroupState _ConsumerGroupListing_state_delegate(IntPtr grplist); - private static _ConsumerGroupListing_state_delegate _ConsumerGroupListing_state; - internal static ConsumerGroupState ConsumerGroupListing_state(IntPtr grplist) - => _ConsumerGroupListing_state(grplist); - - private delegate IntPtr _ListConsumerGroups_result_valid_delegate(IntPtr result, out UIntPtr cntp); - private static _ListConsumerGroups_result_valid_delegate _ListConsumerGroups_result_valid; - internal static IntPtr ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp) - => _ListConsumerGroups_result_valid(result, out cntp); - - private delegate IntPtr _ListConsumerGroups_result_errors_delegate(IntPtr result, out UIntPtr cntp); - private static _ListConsumerGroups_result_errors_delegate _ListConsumerGroups_result_errors; - internal static IntPtr ListConsumerGroups_result_errors(IntPtr result, out UIntPtr cntp) - => _ListConsumerGroups_result_errors(result, out cntp); - - private delegate void _DescribeConsumerGroups_delegate( - IntPtr handle, [MarshalAs(UnmanagedType.LPArray)] string[] groups, UIntPtr groupsCnt, IntPtr optionsPtr, IntPtr resultQueuePtr); - private static _DescribeConsumerGroups_delegate _DescribeConsumerGroups; - internal static void DescribeConsumerGroups( - IntPtr handle, [MarshalAs(UnmanagedType.LPArray)] string[] groups, UIntPtr groupsCnt, IntPtr optionsPtr, IntPtr resultQueuePtr) - => _DescribeConsumerGroups(handle, groups, groupsCnt, optionsPtr, resultQueuePtr); - - private delegate IntPtr _DescribeConsumerGroups_result_groups_delegate(IntPtr result, out UIntPtr cntp); - private static _DescribeConsumerGroups_result_groups_delegate _DescribeConsumerGroups_result_groups; - internal static IntPtr DescribeConsumerGroups_result_groups(IntPtr result, out UIntPtr cntp) - => _DescribeConsumerGroups_result_groups(result, out cntp); - - private delegate IntPtr _ConsumerGroupDescription_group_id_delegate(IntPtr grpdesc); - private static _ConsumerGroupDescription_group_id_delegate _ConsumerGroupDescription_group_id; - internal static IntPtr ConsumerGroupDescription_group_id(IntPtr grpdesc) - => _ConsumerGroupDescription_group_id(grpdesc); - - private delegate IntPtr _ConsumerGroupDescription_error_delegate(IntPtr grpdesc); - private static _ConsumerGroupDescription_error_delegate _ConsumerGroupDescription_error; - internal static IntPtr ConsumerGroupDescription_error(IntPtr grpdesc) - => _ConsumerGroupDescription_error(grpdesc); - - private delegate int _ConsumerGroupDescription_is_simple_consumer_group_delegate(IntPtr grpdesc); - private static _ConsumerGroupDescription_is_simple_consumer_group_delegate _ConsumerGroupDescription_is_simple_consumer_group; - internal static int ConsumerGroupDescription_is_simple_consumer_group(IntPtr grpdesc) - => _ConsumerGroupDescription_is_simple_consumer_group(grpdesc); - - private delegate IntPtr _ConsumerGroupDescription_partition_assignor_delegate(IntPtr grpdesc); - private static _ConsumerGroupDescription_partition_assignor_delegate _ConsumerGroupDescription_partition_assignor; - internal static IntPtr ConsumerGroupDescription_partition_assignor(IntPtr grpdesc) - => _ConsumerGroupDescription_partition_assignor(grpdesc); - - private delegate ConsumerGroupState _ConsumerGroupDescription_state_delegate(IntPtr grpdesc); - private static _ConsumerGroupDescription_state_delegate _ConsumerGroupDescription_state; - internal static ConsumerGroupState ConsumerGroupDescription_state(IntPtr grpdesc) { + private delegate void _ListConsumerGroups_delegate(IntPtr handle, IntPtr optionsPtr, IntPtr resultQueuePtr); + private static _ListConsumerGroups_delegate _ListConsumerGroups; + internal static void ListConsumerGroups(IntPtr handle, IntPtr optionsPtr, IntPtr resultQueuePtr) + => _ListConsumerGroups(handle, optionsPtr, resultQueuePtr); + + private delegate IntPtr _ConsumerGroupListing_group_id_delegate(IntPtr grplist); + private static _ConsumerGroupListing_group_id_delegate _ConsumerGroupListing_group_id; + internal static IntPtr ConsumerGroupListing_group_id(IntPtr grplist) + => _ConsumerGroupListing_group_id(grplist); + + private delegate IntPtr _ConsumerGroupListing_is_simple_consumer_group_delegate(IntPtr grplist); + private static _ConsumerGroupListing_is_simple_consumer_group_delegate _ConsumerGroupListing_is_simple_consumer_group; + internal static IntPtr ConsumerGroupListing_is_simple_consumer_group(IntPtr grplist) + => _ConsumerGroupListing_is_simple_consumer_group(grplist); + + private delegate ConsumerGroupState _ConsumerGroupListing_state_delegate(IntPtr grplist); + private static _ConsumerGroupListing_state_delegate _ConsumerGroupListing_state; + internal static ConsumerGroupState ConsumerGroupListing_state(IntPtr grplist) + => _ConsumerGroupListing_state(grplist); + + private delegate ConsumerGroupType _ConsumerGroupListing_type_delegate(IntPtr grplist); + private static _ConsumerGroupListing_type_delegate _ConsumerGroupListing_type; + internal static ConsumerGroupType ConsumerGroupListing_type(IntPtr grplist) + => _ConsumerGroupListing_type(grplist); + + private delegate IntPtr _ListConsumerGroups_result_valid_delegate(IntPtr result, out UIntPtr cntp); + private static _ListConsumerGroups_result_valid_delegate _ListConsumerGroups_result_valid; + internal static IntPtr ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp) + => _ListConsumerGroups_result_valid(result, out cntp); + + private delegate IntPtr _ListConsumerGroups_result_errors_delegate(IntPtr result, out UIntPtr cntp); + private static _ListConsumerGroups_result_errors_delegate _ListConsumerGroups_result_errors; + internal static IntPtr ListConsumerGroups_result_errors(IntPtr result, out UIntPtr cntp) + => _ListConsumerGroups_result_errors(result, out cntp); + + private delegate void _DescribeConsumerGroups_delegate( + IntPtr handle, [MarshalAs(UnmanagedType.LPArray)] string[] groups, UIntPtr groupsCnt, IntPtr optionsPtr, IntPtr resultQueuePtr); + private static _DescribeConsumerGroups_delegate _DescribeConsumerGroups; + internal static void DescribeConsumerGroups( + IntPtr handle, [MarshalAs(UnmanagedType.LPArray)] string[] groups, UIntPtr groupsCnt, IntPtr optionsPtr, IntPtr resultQueuePtr) + => _DescribeConsumerGroups(handle, groups, groupsCnt, optionsPtr, resultQueuePtr); + + private delegate IntPtr _DescribeConsumerGroups_result_groups_delegate(IntPtr result, out UIntPtr cntp); + private static _DescribeConsumerGroups_result_groups_delegate _DescribeConsumerGroups_result_groups; + internal static IntPtr DescribeConsumerGroups_result_groups(IntPtr result, out UIntPtr cntp) + => _DescribeConsumerGroups_result_groups(result, out cntp); + + private delegate IntPtr _ConsumerGroupDescription_group_id_delegate(IntPtr grpdesc); + private static _ConsumerGroupDescription_group_id_delegate _ConsumerGroupDescription_group_id; + internal static IntPtr ConsumerGroupDescription_group_id(IntPtr grpdesc) + => _ConsumerGroupDescription_group_id(grpdesc); + + private delegate IntPtr _ConsumerGroupDescription_error_delegate(IntPtr grpdesc); + private static _ConsumerGroupDescription_error_delegate _ConsumerGroupDescription_error; + internal static IntPtr ConsumerGroupDescription_error(IntPtr grpdesc) + => _ConsumerGroupDescription_error(grpdesc); + + private delegate int _ConsumerGroupDescription_is_simple_consumer_group_delegate(IntPtr grpdesc); + private static _ConsumerGroupDescription_is_simple_consumer_group_delegate _ConsumerGroupDescription_is_simple_consumer_group; + internal static int ConsumerGroupDescription_is_simple_consumer_group(IntPtr grpdesc) + => _ConsumerGroupDescription_is_simple_consumer_group(grpdesc); + + private delegate IntPtr _ConsumerGroupDescription_partition_assignor_delegate(IntPtr grpdesc); + private static _ConsumerGroupDescription_partition_assignor_delegate _ConsumerGroupDescription_partition_assignor; + internal static IntPtr ConsumerGroupDescription_partition_assignor(IntPtr grpdesc) + => _ConsumerGroupDescription_partition_assignor(grpdesc); + + private delegate ConsumerGroupState _ConsumerGroupDescription_state_delegate(IntPtr grpdesc); + private static _ConsumerGroupDescription_state_delegate _ConsumerGroupDescription_state; + internal static ConsumerGroupState ConsumerGroupDescription_state(IntPtr grpdesc) + { return _ConsumerGroupDescription_state(grpdesc); - } - - private delegate IntPtr _ConsumerGroupDescription_coordinator_delegate(IntPtr grpdesc); - private static _ConsumerGroupDescription_coordinator_delegate _ConsumerGroupDescription_coordinator; - internal static IntPtr ConsumerGroupDescription_coordinator(IntPtr grpdesc) - => _ConsumerGroupDescription_coordinator(grpdesc); - - private delegate IntPtr _ConsumerGroupDescription_member_count_delegate(IntPtr grpdesc); - private static _ConsumerGroupDescription_member_count_delegate _ConsumerGroupDescription_member_count; - internal static IntPtr ConsumerGroupDescription_member_count(IntPtr grpdesc) - => _ConsumerGroupDescription_member_count(grpdesc); - - private delegate IntPtr _ConsumerGroupDescription_authorized_operations_delegate(IntPtr grpdesc, out UIntPtr cntp); - private static _ConsumerGroupDescription_authorized_operations_delegate _ConsumerGroupDescription_authorized_operations; - internal static IntPtr ConsumerGroupDescription_authorized_operations(IntPtr grpdesc, out UIntPtr cntp) - => _ConsumerGroupDescription_authorized_operations(grpdesc, out cntp); - - private delegate IntPtr _ConsumerGroupDescription_member_delegate(IntPtr grpdesc, IntPtr idx); - private static _ConsumerGroupDescription_member_delegate _ConsumerGroupDescription_member; - internal static IntPtr ConsumerGroupDescription_member(IntPtr grpdesc, IntPtr idx) - => _ConsumerGroupDescription_member(grpdesc, idx); - - private delegate IntPtr _MemberDescription_client_id_delegate(IntPtr member); - private static _MemberDescription_client_id_delegate _MemberDescription_client_id; - internal static IntPtr MemberDescription_client_id(IntPtr member) - => _MemberDescription_client_id(member); - - private delegate IntPtr _MemberDescription_group_instance_id_delegate(IntPtr member); - private static _MemberDescription_group_instance_id_delegate _MemberDescription_group_instance_id; - internal static IntPtr MemberDescription_group_instance_id(IntPtr member) - => _MemberDescription_group_instance_id(member); - - private delegate IntPtr _MemberDescription_consumer_id_delegate(IntPtr member); - private static _MemberDescription_consumer_id_delegate _MemberDescription_consumer_id; - internal static IntPtr MemberDescription_consumer_id(IntPtr member) - => _MemberDescription_consumer_id(member); - - private delegate IntPtr _MemberDescription_host_delegate(IntPtr member); - private static _MemberDescription_host_delegate _MemberDescription_host; - internal static IntPtr MemberDescription_host(IntPtr member) - => _MemberDescription_host(member); - - private delegate IntPtr _MemberDescription_assignment_delegate(IntPtr member); - private static _MemberDescription_assignment_delegate _MemberDescription_assignment; - internal static IntPtr MemberDescription_assignment(IntPtr member) - => _MemberDescription_assignment(member); - - private delegate IntPtr _MemberAssignment_partitions_delegate(IntPtr assignment); - private static _MemberAssignment_partitions_delegate _MemberAssignment_partitions; - internal static IntPtr MemberAssignment_topic_partitions(IntPtr assignment) - => _MemberAssignment_partitions(assignment); + } + + private delegate IntPtr _ConsumerGroupDescription_coordinator_delegate(IntPtr grpdesc); + private static _ConsumerGroupDescription_coordinator_delegate _ConsumerGroupDescription_coordinator; + internal static IntPtr ConsumerGroupDescription_coordinator(IntPtr grpdesc) + => _ConsumerGroupDescription_coordinator(grpdesc); + + private delegate IntPtr _ConsumerGroupDescription_member_count_delegate(IntPtr grpdesc); + private static _ConsumerGroupDescription_member_count_delegate _ConsumerGroupDescription_member_count; + internal static IntPtr ConsumerGroupDescription_member_count(IntPtr grpdesc) + => _ConsumerGroupDescription_member_count(grpdesc); + + private delegate IntPtr _ConsumerGroupDescription_authorized_operations_delegate(IntPtr grpdesc, out UIntPtr cntp); + private static _ConsumerGroupDescription_authorized_operations_delegate _ConsumerGroupDescription_authorized_operations; + internal static IntPtr ConsumerGroupDescription_authorized_operations(IntPtr grpdesc, out UIntPtr cntp) + => _ConsumerGroupDescription_authorized_operations(grpdesc, out cntp); + + private delegate IntPtr _ConsumerGroupDescription_member_delegate(IntPtr grpdesc, IntPtr idx); + private static _ConsumerGroupDescription_member_delegate _ConsumerGroupDescription_member; + internal static IntPtr ConsumerGroupDescription_member(IntPtr grpdesc, IntPtr idx) + => _ConsumerGroupDescription_member(grpdesc, idx); + + private delegate IntPtr _MemberDescription_client_id_delegate(IntPtr member); + private static _MemberDescription_client_id_delegate _MemberDescription_client_id; + internal static IntPtr MemberDescription_client_id(IntPtr member) + => _MemberDescription_client_id(member); + + private delegate IntPtr _MemberDescription_group_instance_id_delegate(IntPtr member); + private static _MemberDescription_group_instance_id_delegate _MemberDescription_group_instance_id; + internal static IntPtr MemberDescription_group_instance_id(IntPtr member) + => _MemberDescription_group_instance_id(member); + + private delegate IntPtr _MemberDescription_consumer_id_delegate(IntPtr member); + private static _MemberDescription_consumer_id_delegate _MemberDescription_consumer_id; + internal static IntPtr MemberDescription_consumer_id(IntPtr member) + => _MemberDescription_consumer_id(member); + + private delegate IntPtr _MemberDescription_host_delegate(IntPtr member); + private static _MemberDescription_host_delegate _MemberDescription_host; + internal static IntPtr MemberDescription_host(IntPtr member) + => _MemberDescription_host(member); + + private delegate IntPtr _MemberDescription_assignment_delegate(IntPtr member); + private static _MemberDescription_assignment_delegate _MemberDescription_assignment; + internal static IntPtr MemberDescription_assignment(IntPtr member) + => _MemberDescription_assignment(member); + + private delegate IntPtr _MemberAssignment_partitions_delegate(IntPtr assignment); + private static _MemberAssignment_partitions_delegate _MemberAssignment_partitions; + internal static IntPtr MemberAssignment_topic_partitions(IntPtr assignment) + => _MemberAssignment_partitions(assignment); private delegate IntPtr _Node_id_delegate(IntPtr node); private static _Node_id_delegate _Node_id; @@ -2000,23 +2013,23 @@ internal static IntPtr MemberAssignment_topic_partitions(IntPtr assignment) private delegate void _ListOffsets_delegate(IntPtr handle, IntPtr topic_partition_list, IntPtr options, IntPtr resultQueuePtr); private static _ListOffsets_delegate _ListOffsets; internal static void ListOffsets(IntPtr handle, IntPtr topic_partition_list, IntPtr options, IntPtr resultQueuePtr) - => _ListOffsets(handle,topic_partition_list,options, resultQueuePtr); - - private delegate IntPtr _ListOffsets_result_infos_delegate(IntPtr resultPtr,out UIntPtr cntp); + => _ListOffsets(handle, topic_partition_list, options, resultQueuePtr); + + private delegate IntPtr _ListOffsets_result_infos_delegate(IntPtr resultPtr, out UIntPtr cntp); private static _ListOffsets_result_infos_delegate _ListOffsets_result_infos; - internal static IntPtr ListOffsets_result_infos(IntPtr resultPtr,out UIntPtr cntp) + internal static IntPtr ListOffsets_result_infos(IntPtr resultPtr, out UIntPtr cntp) => _ListOffsets_result_infos(resultPtr, out cntp); - + private delegate long _ListOffsetsResultInfo_timestamp_delegate(IntPtr element); private static _ListOffsetsResultInfo_timestamp_delegate _ListOffsetsResultInfo_timestamp; internal static long ListOffsetsResultInfo_timestamp(IntPtr element) => _ListOffsetsResultInfo_timestamp(element); - + private delegate IntPtr _ListOffsetsResultInfo_topic_partition_delegate(IntPtr element); private static _ListOffsetsResultInfo_topic_partition_delegate _ListOffsetsResultInfo_topic_partition; internal static IntPtr ListOffsetsResultInfo_topic_partition(IntPtr element) => _ListOffsetsResultInfo_topic_partition(element); - + private static Func _topic_result_error; internal static ErrorCode topic_result_error(IntPtr topicres) => _topic_result_error(topicres); @@ -2034,18 +2047,18 @@ internal static IntPtr ListOffsetsResultInfo_topic_partition(IntPtr element) private static Func _group_result_partitions; internal static IntPtr group_result_partitions(IntPtr groupres) => _group_result_partitions(groupres); - + // // User SCRAM credentials // - + private delegate void _DescribeUserScramCredentials_delegate( IntPtr handle, [MarshalAs(UnmanagedType.LPArray)] string[] users, UIntPtr usersCnt, IntPtr optionsPtr, IntPtr resultQueuePtr); private static _DescribeUserScramCredentials_delegate _DescribeUserScramCredentials; internal static void DescribeUserScramCredentials( IntPtr handle, [MarshalAs(UnmanagedType.LPArray)] string[] users, UIntPtr usersCnt, IntPtr optionsPtr, IntPtr resultQueuePtr) => _DescribeUserScramCredentials(handle, users, usersCnt, optionsPtr, resultQueuePtr); - + private delegate IntPtr _DescribeUserScramCredentials_result_descriptions_delegate( IntPtr event_result, out UIntPtr cntp); private static _DescribeUserScramCredentials_result_descriptions_delegate _DescribeUserScramCredentials_result_descriptions; @@ -2079,7 +2092,7 @@ private delegate IntPtr _UserScramCredentialsDescription_scramcredentialinfo_del private static _UserScramCredentialsDescription_scramcredentialinfo_delegate _UserScramCredentialsDescription_scramcredentialinfo; internal static IntPtr UserScramCredentialsDescription_scramcredentialinfo( IntPtr description, int i) - => _UserScramCredentialsDescription_scramcredentialinfo(description,i); + => _UserScramCredentialsDescription_scramcredentialinfo(description, i); private delegate ScramMechanism _ScramCredentialInfo_mechanism_delegate( IntPtr scramcredentialinfo); @@ -2109,8 +2122,8 @@ private delegate IntPtr _UserScramCredentialDeletion_new_delegate( string user, ScramMechanism mechanism); private static _UserScramCredentialDeletion_new_delegate _UserScramCredentialDeletion_new; internal static IntPtr UserScramCredentialDeletion_new( - string user,ScramMechanism mechanism) - => _UserScramCredentialDeletion_new(user,mechanism); + string user, ScramMechanism mechanism) + => _UserScramCredentialDeletion_new(user, mechanism); private delegate void _UserScramCredentialAlteration_destroy_delegate( IntPtr alteration); @@ -2118,7 +2131,7 @@ private delegate void _UserScramCredentialAlteration_destroy_delegate( internal static void UserScramCredentialAlteration_destroy( IntPtr alteration) => _UserScramCredentialAlteration_destroy(alteration); - + private delegate ErrorCode _AlterUserScramCredentials_delegate( IntPtr handle, IntPtr[] alterations, UIntPtr alterationsCnt, IntPtr optionsPtr, IntPtr resultQueuePtr); @@ -2170,88 +2183,88 @@ private delegate void _TopicCollection_destroy_delegate( internal static void TopicCollection_destroy(IntPtr topic_collection) => _TopicCollection_destroy(topic_collection); - private delegate IntPtr _DescribeTopics_result_topics_delegate(IntPtr result, out UIntPtr cntp); - private static _DescribeTopics_result_topics_delegate _DescribeTopics_result_topics; - internal static IntPtr DescribeTopics_result_topics(IntPtr result, out UIntPtr cntp) - => _DescribeTopics_result_topics(result, out cntp); - - private delegate IntPtr _TopicDescription_error_delegate(IntPtr topicdesc); - private static _TopicDescription_error_delegate _TopicDescription_error; - internal static IntPtr TopicDescription_error(IntPtr topicdesc) - => _TopicDescription_error(topicdesc); - - private delegate IntPtr _TopicDescription_name_delegate(IntPtr topicdesc); - private static _TopicDescription_name_delegate _TopicDescription_name; - internal static IntPtr TopicDescription_name(IntPtr topicdesc) - => _TopicDescription_name(topicdesc); - - - private delegate IntPtr _TopicDescription_topic_id_delegate(IntPtr topicdesc); - private static _TopicDescription_topic_id_delegate _TopicDescription_topic_id; - internal static IntPtr TopicDescription_topic_id(IntPtr topicdesc) - => _TopicDescription_topic_id(topicdesc); - - private delegate IntPtr _TopicDescription_partitions_delegate(IntPtr topicdesc, out UIntPtr cntp); - private static _TopicDescription_partitions_delegate _TopicDescription_partitions; - internal static IntPtr TopicDescription_partitions(IntPtr topicdesc, out UIntPtr cntp) - => _TopicDescription_partitions(topicdesc, out cntp); - - private delegate IntPtr _TopicDescription_is_internal_delegate(IntPtr topicdesc); - private static _TopicDescription_is_internal_delegate _TopicDescription_is_internal; - internal static IntPtr TopicDescription_is_internal(IntPtr topicdesc) - => _TopicDescription_is_internal(topicdesc); - - private delegate IntPtr _TopicDescription_authorized_operations_delegate(IntPtr topicdesc, out UIntPtr cntp); - private static _TopicDescription_authorized_operations_delegate _TopicDescription_authorized_operations; - internal static IntPtr TopicDescription_authorized_operations(IntPtr topicdesc, out UIntPtr cntp) - => _TopicDescription_authorized_operations(topicdesc, out cntp); - - private delegate IntPtr _TopicPartitionInfo_isr_delegate(IntPtr topic_partition_info, out UIntPtr cntp); - private static _TopicPartitionInfo_isr_delegate _TopicPartitionInfo_isr; - internal static IntPtr TopicPartitionInfo_isr(IntPtr topic_partition_info, out UIntPtr cntp) - => _TopicPartitionInfo_isr(topic_partition_info, out cntp); - - private delegate IntPtr _TopicPartitionInfo_leader_delegate(IntPtr topic_partition_info); - private static _TopicPartitionInfo_leader_delegate _TopicPartitionInfo_leader; - internal static IntPtr TopicPartitionInfo_leader(IntPtr topic_partition_info) - => _TopicPartitionInfo_leader(topic_partition_info); - - private delegate int _TopicPartitionInfo_partition_delegate(IntPtr topic_partition_info); - private static _TopicPartitionInfo_partition_delegate _TopicPartitionInfo_partition; - internal static int TopicPartitionInfo_partition(IntPtr topic_partition_info) - => _TopicPartitionInfo_partition(topic_partition_info); - - private delegate IntPtr _TopicPartitionInfo_replicas_delegate(IntPtr topic_partition_info, out UIntPtr cntp); - private static _TopicPartitionInfo_replicas_delegate _TopicPartitionInfo_replicas; - internal static IntPtr TopicPartitionInfo_replicas(IntPtr topic_partition_info, out UIntPtr cntp) - => _TopicPartitionInfo_replicas(topic_partition_info, out cntp); - - private delegate void _DescribeCluster_delegate( - IntPtr handle, IntPtr optionsPtr, IntPtr resultQueuePtr); - private static _DescribeCluster_delegate _DescribeCluster; - internal static void DescribeCluster( - IntPtr handle, IntPtr optionsPtr, IntPtr resultQueuePtr) - => _DescribeCluster(handle, optionsPtr, resultQueuePtr); - - private delegate IntPtr _DescribeCluster_result_nodes_delegate(IntPtr result, out UIntPtr cntp); - private static _DescribeCluster_result_nodes_delegate _DescribeCluster_result_nodes; - internal static IntPtr DescribeCluster_result_nodes(IntPtr result, out UIntPtr cntp) - => _DescribeCluster_result_nodes(result, out cntp); - - private delegate IntPtr _DescribeCluster_result_authorized_operations_delegate(IntPtr result, out UIntPtr cntp); - private static _DescribeCluster_result_authorized_operations_delegate _DescribeCluster_result_authorized_operations; - internal static IntPtr DescribeCluster_result_authorized_operations(IntPtr result, out UIntPtr cntp) - => _DescribeCluster_result_authorized_operations(result, out cntp); - - private delegate IntPtr _DescribeCluster_result_controller_delegate(IntPtr result); - private static _DescribeCluster_result_controller_delegate _DescribeCluster_result_controller; - internal static IntPtr DescribeCluster_result_controller(IntPtr result) - => _DescribeCluster_result_controller(result); - - private delegate IntPtr _DescribeCluster_result_cluster_id_delegate(IntPtr result); - private static _DescribeCluster_result_cluster_id_delegate _DescribeCluster_result_cluster_id; - internal static IntPtr DescribeCluster_result_cluster_id(IntPtr result) - => _DescribeCluster_result_cluster_id(result); + private delegate IntPtr _DescribeTopics_result_topics_delegate(IntPtr result, out UIntPtr cntp); + private static _DescribeTopics_result_topics_delegate _DescribeTopics_result_topics; + internal static IntPtr DescribeTopics_result_topics(IntPtr result, out UIntPtr cntp) + => _DescribeTopics_result_topics(result, out cntp); + + private delegate IntPtr _TopicDescription_error_delegate(IntPtr topicdesc); + private static _TopicDescription_error_delegate _TopicDescription_error; + internal static IntPtr TopicDescription_error(IntPtr topicdesc) + => _TopicDescription_error(topicdesc); + + private delegate IntPtr _TopicDescription_name_delegate(IntPtr topicdesc); + private static _TopicDescription_name_delegate _TopicDescription_name; + internal static IntPtr TopicDescription_name(IntPtr topicdesc) + => _TopicDescription_name(topicdesc); + + + private delegate IntPtr _TopicDescription_topic_id_delegate(IntPtr topicdesc); + private static _TopicDescription_topic_id_delegate _TopicDescription_topic_id; + internal static IntPtr TopicDescription_topic_id(IntPtr topicdesc) + => _TopicDescription_topic_id(topicdesc); + + private delegate IntPtr _TopicDescription_partitions_delegate(IntPtr topicdesc, out UIntPtr cntp); + private static _TopicDescription_partitions_delegate _TopicDescription_partitions; + internal static IntPtr TopicDescription_partitions(IntPtr topicdesc, out UIntPtr cntp) + => _TopicDescription_partitions(topicdesc, out cntp); + + private delegate IntPtr _TopicDescription_is_internal_delegate(IntPtr topicdesc); + private static _TopicDescription_is_internal_delegate _TopicDescription_is_internal; + internal static IntPtr TopicDescription_is_internal(IntPtr topicdesc) + => _TopicDescription_is_internal(topicdesc); + + private delegate IntPtr _TopicDescription_authorized_operations_delegate(IntPtr topicdesc, out UIntPtr cntp); + private static _TopicDescription_authorized_operations_delegate _TopicDescription_authorized_operations; + internal static IntPtr TopicDescription_authorized_operations(IntPtr topicdesc, out UIntPtr cntp) + => _TopicDescription_authorized_operations(topicdesc, out cntp); + + private delegate IntPtr _TopicPartitionInfo_isr_delegate(IntPtr topic_partition_info, out UIntPtr cntp); + private static _TopicPartitionInfo_isr_delegate _TopicPartitionInfo_isr; + internal static IntPtr TopicPartitionInfo_isr(IntPtr topic_partition_info, out UIntPtr cntp) + => _TopicPartitionInfo_isr(topic_partition_info, out cntp); + + private delegate IntPtr _TopicPartitionInfo_leader_delegate(IntPtr topic_partition_info); + private static _TopicPartitionInfo_leader_delegate _TopicPartitionInfo_leader; + internal static IntPtr TopicPartitionInfo_leader(IntPtr topic_partition_info) + => _TopicPartitionInfo_leader(topic_partition_info); + + private delegate int _TopicPartitionInfo_partition_delegate(IntPtr topic_partition_info); + private static _TopicPartitionInfo_partition_delegate _TopicPartitionInfo_partition; + internal static int TopicPartitionInfo_partition(IntPtr topic_partition_info) + => _TopicPartitionInfo_partition(topic_partition_info); + + private delegate IntPtr _TopicPartitionInfo_replicas_delegate(IntPtr topic_partition_info, out UIntPtr cntp); + private static _TopicPartitionInfo_replicas_delegate _TopicPartitionInfo_replicas; + internal static IntPtr TopicPartitionInfo_replicas(IntPtr topic_partition_info, out UIntPtr cntp) + => _TopicPartitionInfo_replicas(topic_partition_info, out cntp); + + private delegate void _DescribeCluster_delegate( + IntPtr handle, IntPtr optionsPtr, IntPtr resultQueuePtr); + private static _DescribeCluster_delegate _DescribeCluster; + internal static void DescribeCluster( + IntPtr handle, IntPtr optionsPtr, IntPtr resultQueuePtr) + => _DescribeCluster(handle, optionsPtr, resultQueuePtr); + + private delegate IntPtr _DescribeCluster_result_nodes_delegate(IntPtr result, out UIntPtr cntp); + private static _DescribeCluster_result_nodes_delegate _DescribeCluster_result_nodes; + internal static IntPtr DescribeCluster_result_nodes(IntPtr result, out UIntPtr cntp) + => _DescribeCluster_result_nodes(result, out cntp); + + private delegate IntPtr _DescribeCluster_result_authorized_operations_delegate(IntPtr result, out UIntPtr cntp); + private static _DescribeCluster_result_authorized_operations_delegate _DescribeCluster_result_authorized_operations; + internal static IntPtr DescribeCluster_result_authorized_operations(IntPtr result, out UIntPtr cntp) + => _DescribeCluster_result_authorized_operations(result, out cntp); + + private delegate IntPtr _DescribeCluster_result_controller_delegate(IntPtr result); + private static _DescribeCluster_result_controller_delegate _DescribeCluster_result_controller; + internal static IntPtr DescribeCluster_result_controller(IntPtr result) + => _DescribeCluster_result_controller(result); + + private delegate IntPtr _DescribeCluster_result_cluster_id_delegate(IntPtr result); + private static _DescribeCluster_result_cluster_id_delegate _DescribeCluster_result_cluster_id; + internal static IntPtr DescribeCluster_result_cluster_id(IntPtr result) + => _DescribeCluster_result_cluster_id(result); // // Queues diff --git a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs index 0fac01c87..32c4d84e0 100644 --- a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs +++ b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs @@ -558,12 +558,18 @@ internal static extern IntPtr rd_kafka_AdminOptions_set_isolation_level( IntPtr options, IntPtr isolation_level); - [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_states( IntPtr options, ConsumerGroupState[] states, UIntPtr statesCnt); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_types( + IntPtr options, + ConsumerGroupType[] groupTypes, + UIntPtr groupTypesCnt); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_NewTopic_new( [MarshalAs(UnmanagedType.LPStr)] string topic, @@ -722,7 +728,7 @@ internal static extern IntPtr rd_kafka_ConfigEntry_name( /* rd_kafka_ConfigEntry_t * */ IntPtr entry); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] - internal static extern IntPtr rd_kafka_ConfigEntry_value ( + internal static extern IntPtr rd_kafka_ConfigEntry_value( /* rd_kafka_ConfigEntry_t * */ IntPtr entry); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] @@ -742,7 +748,7 @@ internal static extern IntPtr rd_kafka_ConfigEntry_is_sensitive( /* rd_kafka_ConfigEntry_t * */ IntPtr entry); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] - internal static extern IntPtr rd_kafka_ConfigEntry_is_synonym ( + internal static extern IntPtr rd_kafka_ConfigEntry_is_synonym( /* rd_kafka_ConfigEntry_t * */ IntPtr entry); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] @@ -811,7 +817,7 @@ internal static extern IntPtr rd_kafka_ConfigResource_error_string( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] - internal static extern void rd_kafka_AlterConfigs ( + internal static extern void rd_kafka_AlterConfigs( /* rd_kafka_t * */ IntPtr rk, /* rd_kafka_ConfigResource_t ** */ IntPtr[] configs, UIntPtr config_cnt, @@ -822,7 +828,7 @@ internal static extern void rd_kafka_AlterConfigs ( internal static extern /* rd_kafka_ConfigResource_t ** */ IntPtr rd_kafka_AlterConfigs_result_resources( /* rd_kafka_AlterConfigs_result_t * */ IntPtr result, out UIntPtr cntp); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern void rd_kafka_IncrementalAlterConfigs( /* rd_kafka_t * */ IntPtr rk, @@ -830,14 +836,14 @@ internal static extern void rd_kafka_IncrementalAlterConfigs( UIntPtr config_cnt, /* rd_kafka_AdminOptions_t * */ IntPtr options, /* rd_kafka_queue_t * */ IntPtr rkqu); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern /* rd_kafka_ConfigResource_t ** */ IntPtr rd_kafka_IncrementalAlterConfigs_result_resources( /* rd_kafka_IncrementalAlterConfigs_result_t * */ IntPtr result, out UIntPtr cntp); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] - internal static extern void rd_kafka_DescribeConfigs ( + internal static extern void rd_kafka_DescribeConfigs( /* rd_kafka_t * */ IntPtr rk, /* rd_kafka_ConfigResource_t ***/ IntPtr[] configs, UIntPtr config_cnt, @@ -1032,6 +1038,9 @@ internal static extern void rd_kafka_ListConsumerGroups( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern ConsumerGroupState rd_kafka_ConsumerGroupListing_state(IntPtr grplist); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern ConsumerGroupType rd_kafka_ConsumerGroupListing_type(IntPtr grplist); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp); @@ -1132,7 +1141,7 @@ internal static extern void rd_kafka_DescribeUserScramCredentials( UIntPtr usersCnt, IntPtr optionsPtr, IntPtr resultQueuePtr); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern ErrorCode rd_kafka_AlterUserScramCredentials( IntPtr handle, @@ -1140,7 +1149,7 @@ internal static extern ErrorCode rd_kafka_AlterUserScramCredentials( UIntPtr alterationsCnt, IntPtr optionsPtr, IntPtr resultQueuePtr); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_UserScramCredentialDeletion_new( string user, @@ -1159,7 +1168,7 @@ internal static extern IntPtr rd_kafka_UserScramCredentialUpsertion_new( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern void rd_kafka_UserScramCredentialAlteration_destroy( IntPtr alteration); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_DescribeUserScramCredentials_result_descriptions( IntPtr event_result, @@ -1174,24 +1183,24 @@ internal static extern IntPtr rd_kafka_DescribeUserScramCredentials_result_descr [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern int rd_kafka_UserScramCredentialsDescription_scramcredentialinfo_count(IntPtr description); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_UserScramCredentialsDescription_scramcredentialinfo(IntPtr description, int i); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern ScramMechanism rd_kafka_ScramCredentialInfo_mechanism(IntPtr scramcredentialinfo); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern int rd_kafka_ScramCredentialInfo_iterations(IntPtr scramcredentialinfo); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_AlterUserScramCredentials_result_responses( IntPtr event_result, out UIntPtr cntp); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_AlterUserScramCredentials_result_response_user(IntPtr element); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_AlterUserScramCredentials_result_response_error(IntPtr element); @@ -1213,14 +1222,14 @@ internal static extern void rd_kafka_DescribeTopics( IntPtr topicCollection, IntPtr optionsPtr, IntPtr resultQueuePtr); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_DescribeTopics_result_topics(IntPtr result, out UIntPtr cntp); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_TopicCollection_of_topic_names([MarshalAs(UnmanagedType.LPArray)] string[] topics, UIntPtr topicsCnt); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern void rd_kafka_TopicCollection_destroy(IntPtr topic_collection); @@ -1247,7 +1256,7 @@ internal static extern IntPtr rd_kafka_TopicCollection_of_topic_names([MarshalAs [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_TopicPartitionInfo_leader(IntPtr topic_partition_info); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern int rd_kafka_TopicPartitionInfo_partition(IntPtr topic_partition_info); diff --git a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs index ee40ec23b..d69b3c36c 100644 --- a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs +++ b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs @@ -562,12 +562,18 @@ internal static extern IntPtr rd_kafka_AdminOptions_set_isolation_level( IntPtr options, IntPtr isolation_level); - [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_states( IntPtr options, ConsumerGroupState[] states, UIntPtr statesCnt); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_types( + IntPtr options, + ConsumerGroupType[] groupTypes, + UIntPtr groupTypesCnt); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_NewTopic_new( [MarshalAs(UnmanagedType.LPStr)] string topic, @@ -726,7 +732,7 @@ internal static extern IntPtr rd_kafka_ConfigEntry_name( /* rd_kafka_ConfigEntry_t * */ IntPtr entry); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] - internal static extern IntPtr rd_kafka_ConfigEntry_value ( + internal static extern IntPtr rd_kafka_ConfigEntry_value( /* rd_kafka_ConfigEntry_t * */ IntPtr entry); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] @@ -746,7 +752,7 @@ internal static extern IntPtr rd_kafka_ConfigEntry_is_sensitive( /* rd_kafka_ConfigEntry_t * */ IntPtr entry); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] - internal static extern IntPtr rd_kafka_ConfigEntry_is_synonym ( + internal static extern IntPtr rd_kafka_ConfigEntry_is_synonym( /* rd_kafka_ConfigEntry_t * */ IntPtr entry); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] @@ -815,7 +821,7 @@ internal static extern IntPtr rd_kafka_ConfigResource_error_string( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] - internal static extern void rd_kafka_AlterConfigs ( + internal static extern void rd_kafka_AlterConfigs( /* rd_kafka_t * */ IntPtr rk, /* rd_kafka_ConfigResource_t ** */ IntPtr[] configs, UIntPtr config_cnt, @@ -826,7 +832,7 @@ internal static extern void rd_kafka_AlterConfigs ( internal static extern /* rd_kafka_ConfigResource_t ** */ IntPtr rd_kafka_AlterConfigs_result_resources( /* rd_kafka_AlterConfigs_result_t * */ IntPtr result, out UIntPtr cntp); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern void rd_kafka_IncrementalAlterConfigs( /* rd_kafka_t * */ IntPtr rk, @@ -834,14 +840,14 @@ internal static extern void rd_kafka_IncrementalAlterConfigs( UIntPtr config_cnt, /* rd_kafka_AdminOptions_t * */ IntPtr options, /* rd_kafka_queue_t * */ IntPtr rkqu); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern /* rd_kafka_ConfigResource_t ** */ IntPtr rd_kafka_IncrementalAlterConfigs_result_resources( /* rd_kafka_IncrementalAlterConfigs_result_t * */ IntPtr result, out UIntPtr cntp); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] - internal static extern void rd_kafka_DescribeConfigs ( + internal static extern void rd_kafka_DescribeConfigs( /* rd_kafka_t * */ IntPtr rk, /* rd_kafka_ConfigResource_t ***/ IntPtr[] configs, UIntPtr config_cnt, @@ -1036,6 +1042,9 @@ internal static extern void rd_kafka_ListConsumerGroups( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern ConsumerGroupState rd_kafka_ConsumerGroupListing_state(IntPtr grplist); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern ConsumerGroupType rd_kafka_ConsumerGroupListing_type(IntPtr grplist); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp); @@ -1136,7 +1145,7 @@ internal static extern void rd_kafka_DescribeUserScramCredentials( UIntPtr usersCnt, IntPtr optionsPtr, IntPtr resultQueuePtr); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern ErrorCode rd_kafka_AlterUserScramCredentials( IntPtr handle, @@ -1144,7 +1153,7 @@ internal static extern ErrorCode rd_kafka_AlterUserScramCredentials( UIntPtr alterationsCnt, IntPtr optionsPtr, IntPtr resultQueuePtr); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_UserScramCredentialDeletion_new( string user, @@ -1163,7 +1172,7 @@ internal static extern IntPtr rd_kafka_UserScramCredentialUpsertion_new( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern void rd_kafka_UserScramCredentialAlteration_destroy( IntPtr alteration); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_DescribeUserScramCredentials_result_descriptions( IntPtr event_result, @@ -1178,24 +1187,24 @@ internal static extern IntPtr rd_kafka_DescribeUserScramCredentials_result_descr [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern int rd_kafka_UserScramCredentialsDescription_scramcredentialinfo_count(IntPtr description); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_UserScramCredentialsDescription_scramcredentialinfo(IntPtr description, int i); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern ScramMechanism rd_kafka_ScramCredentialInfo_mechanism(IntPtr scramcredentialinfo); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern int rd_kafka_ScramCredentialInfo_iterations(IntPtr scramcredentialinfo); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_AlterUserScramCredentials_result_responses( IntPtr event_result, out UIntPtr cntp); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_AlterUserScramCredentials_result_response_user(IntPtr element); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_AlterUserScramCredentials_result_response_error(IntPtr element); @@ -1217,14 +1226,14 @@ internal static extern void rd_kafka_DescribeTopics( IntPtr topicCollection, IntPtr optionsPtr, IntPtr resultQueuePtr); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_DescribeTopics_result_topics(IntPtr result, out UIntPtr cntp); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_TopicCollection_of_topic_names([MarshalAs(UnmanagedType.LPArray)] string[] topics, UIntPtr topicsCnt); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern void rd_kafka_TopicCollection_destroy(IntPtr topic_collection); @@ -1251,7 +1260,7 @@ internal static extern IntPtr rd_kafka_TopicCollection_of_topic_names([MarshalAs [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_TopicPartitionInfo_leader(IntPtr topic_partition_info); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern int rd_kafka_TopicPartitionInfo_partition(IntPtr topic_partition_info); diff --git a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos8.cs b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos8.cs index 5032651f5..8abe11bf3 100644 --- a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos8.cs +++ b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos8.cs @@ -562,12 +562,19 @@ internal static extern IntPtr rd_kafka_AdminOptions_set_isolation_level( IntPtr options, IntPtr isolation_level); - [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_states( IntPtr options, ConsumerGroupState[] states, UIntPtr statesCnt); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_types( + IntPtr options, + ConsumerGroupType[] groupTypes, + UIntPtr groupTypesCnt); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_NewTopic_new( [MarshalAs(UnmanagedType.LPStr)] string topic, @@ -726,7 +733,7 @@ internal static extern IntPtr rd_kafka_ConfigEntry_name( /* rd_kafka_ConfigEntry_t * */ IntPtr entry); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] - internal static extern IntPtr rd_kafka_ConfigEntry_value ( + internal static extern IntPtr rd_kafka_ConfigEntry_value( /* rd_kafka_ConfigEntry_t * */ IntPtr entry); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] @@ -746,7 +753,7 @@ internal static extern IntPtr rd_kafka_ConfigEntry_is_sensitive( /* rd_kafka_ConfigEntry_t * */ IntPtr entry); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] - internal static extern IntPtr rd_kafka_ConfigEntry_is_synonym ( + internal static extern IntPtr rd_kafka_ConfigEntry_is_synonym( /* rd_kafka_ConfigEntry_t * */ IntPtr entry); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] @@ -815,7 +822,7 @@ internal static extern IntPtr rd_kafka_ConfigResource_error_string( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] - internal static extern void rd_kafka_AlterConfigs ( + internal static extern void rd_kafka_AlterConfigs( /* rd_kafka_t * */ IntPtr rk, /* rd_kafka_ConfigResource_t ** */ IntPtr[] configs, UIntPtr config_cnt, @@ -826,7 +833,7 @@ internal static extern void rd_kafka_AlterConfigs ( internal static extern /* rd_kafka_ConfigResource_t ** */ IntPtr rd_kafka_AlterConfigs_result_resources( /* rd_kafka_AlterConfigs_result_t * */ IntPtr result, out UIntPtr cntp); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern void rd_kafka_IncrementalAlterConfigs( /* rd_kafka_t * */ IntPtr rk, @@ -834,14 +841,14 @@ internal static extern void rd_kafka_IncrementalAlterConfigs( UIntPtr config_cnt, /* rd_kafka_AdminOptions_t * */ IntPtr options, /* rd_kafka_queue_t * */ IntPtr rkqu); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern /* rd_kafka_ConfigResource_t ** */ IntPtr rd_kafka_IncrementalAlterConfigs_result_resources( /* rd_kafka_IncrementalAlterConfigs_result_t * */ IntPtr result, out UIntPtr cntp); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] - internal static extern void rd_kafka_DescribeConfigs ( + internal static extern void rd_kafka_DescribeConfigs( /* rd_kafka_t * */ IntPtr rk, /* rd_kafka_ConfigResource_t ***/ IntPtr[] configs, UIntPtr config_cnt, @@ -1036,6 +1043,9 @@ internal static extern void rd_kafka_ListConsumerGroups( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern ConsumerGroupState rd_kafka_ConsumerGroupListing_state(IntPtr grplist); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern ConsumerGroupType rd_kafka_ConsumerGroupListing_type(IntPtr grplist); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp); @@ -1136,7 +1146,7 @@ internal static extern void rd_kafka_DescribeUserScramCredentials( UIntPtr usersCnt, IntPtr optionsPtr, IntPtr resultQueuePtr); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern ErrorCode rd_kafka_AlterUserScramCredentials( IntPtr handle, @@ -1144,7 +1154,7 @@ internal static extern ErrorCode rd_kafka_AlterUserScramCredentials( UIntPtr alterationsCnt, IntPtr optionsPtr, IntPtr resultQueuePtr); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_UserScramCredentialDeletion_new( string user, @@ -1163,7 +1173,7 @@ internal static extern IntPtr rd_kafka_UserScramCredentialUpsertion_new( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern void rd_kafka_UserScramCredentialAlteration_destroy( IntPtr alteration); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_DescribeUserScramCredentials_result_descriptions( IntPtr event_result, @@ -1178,24 +1188,24 @@ internal static extern IntPtr rd_kafka_DescribeUserScramCredentials_result_descr [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern int rd_kafka_UserScramCredentialsDescription_scramcredentialinfo_count(IntPtr description); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_UserScramCredentialsDescription_scramcredentialinfo(IntPtr description, int i); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern ScramMechanism rd_kafka_ScramCredentialInfo_mechanism(IntPtr scramcredentialinfo); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern int rd_kafka_ScramCredentialInfo_iterations(IntPtr scramcredentialinfo); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_AlterUserScramCredentials_result_responses( IntPtr event_result, out UIntPtr cntp); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_AlterUserScramCredentials_result_response_user(IntPtr element); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_AlterUserScramCredentials_result_response_error(IntPtr element); @@ -1217,14 +1227,14 @@ internal static extern void rd_kafka_DescribeTopics( IntPtr topicCollection, IntPtr optionsPtr, IntPtr resultQueuePtr); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_DescribeTopics_result_topics(IntPtr result, out UIntPtr cntp); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_TopicCollection_of_topic_names([MarshalAs(UnmanagedType.LPArray)] string[] topics, UIntPtr topicsCnt); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern void rd_kafka_TopicCollection_destroy(IntPtr topic_collection); @@ -1251,7 +1261,7 @@ internal static extern IntPtr rd_kafka_TopicCollection_of_topic_names([MarshalAs [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_TopicPartitionInfo_leader(IntPtr topic_partition_info); - + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern int rd_kafka_TopicPartitionInfo_partition(IntPtr topic_partition_info); diff --git a/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs b/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs index bd553b304..fff8943a4 100644 --- a/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs +++ b/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs @@ -1472,6 +1472,15 @@ private void setOption_MatchConsumerGroupStates(IntPtr optionsPtr, ConsumerGroup } } + private void setOption_MatchConsumerGroupTypes(IntPtr optionsPtr, ConsumerGroupType[] groupTypes) + { + var error = Librdkafka.AdminOptions_set_match_consumer_group_types(optionsPtr, groupTypes, (UIntPtr)groupTypes.Count()); + if (error != IntPtr.Zero) + { + throw new KafkaException(new Error(error, true)); + } + } + private void setOption_IsolationLevel(IntPtr optionsPtr, IsolationLevel IsolationLevel) { var rError = Librdkafka.AdminOptions_set_isolation_level(optionsPtr, (IntPtr)(int)IsolationLevel); @@ -2337,8 +2346,11 @@ internal void ListConsumerGroups( { setOption_MatchConsumerGroupStates(optionsPtr, options.MatchStates.ToArray()); } + if (options.MatchGroupTypes != null) + { + setOption_MatchConsumerGroupTypes(optionsPtr, options.MatchGroupTypes.ToArray()); + } setOption_completionSource(optionsPtr, completionSourcePtr); - // Call ListConsumerGroups (async). Librdkafka.ListConsumerGroups(handle, optionsPtr, resultQueuePtr); } diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_ListDescribeConsumerGroups.cs b/test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_ListDescribeConsumerGroups.cs index 67251d01a..d662a2b5a 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_ListDescribeConsumerGroups.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_ListDescribeConsumerGroups.cs @@ -63,22 +63,76 @@ private void checkConsumerGroupDescription( [Theory, MemberData(nameof(KafkaParameters))] public void AdminClient_ListDescribeConsumerGroups(string bootstrapServers) { + var groupID = Guid.NewGuid().ToString(); + var nonExistentGroupID = Guid.NewGuid().ToString(); if (!TestConsumerGroupProtocol.IsClassic()) { - LogToFile("KIP 848 Admin operations changes still aren't " + - "available"); + LogToFile("start AdminClient_ListDescribeConsumerGroups with Consumer Protocol"); + const string clientID = "test.client"; + + // Create an AdminClient here - we need it throughout the test. + using (var adminClient = new AdminClientBuilder(new AdminClientConfig + { + BootstrapServers = bootstrapServers + }).Build()) + { + var listOptionsWithTimeout = new Admin.ListConsumerGroupsOptions() { RequestTimeout = TimeSpan.FromSeconds(30) }; + var listOptionsWithClassic = new Admin.ListConsumerGroupsOptions() { RequestTimeout = TimeSpan.FromSeconds(30), MatchGroupTypes = new List { ConsumerGroupType.Classic } }; + var listOptionsWithConsumer = new Admin.ListConsumerGroupsOptions() { RequestTimeout = TimeSpan.FromSeconds(30), MatchGroupTypes = new List { ConsumerGroupType.Consumer } }; + // We should not have any group initially. + var groups = adminClient.ListConsumerGroupsAsync().Result; + Assert.Empty(groups.Valid.Where(group => group.GroupId == groupID)); + Assert.Empty(groups.Valid.Where(group => group.GroupId == nonExistentGroupID)); + + // Ensure that the partitioned topic we are using has exactly two partitions. + Assert.Equal(2, partitionedTopicNumPartitions); + + var consumerConfig = new ConsumerConfig + { + GroupId = groupID, + BootstrapServers = bootstrapServers, + SessionTimeoutMs = 6000, + PartitionAssignmentStrategy = PartitionAssignmentStrategy.Range, + ClientId = clientID, + + }; + var consumer = new TestConsumerBuilder(consumerConfig).Build(); + consumer.Subscribe(new string[] { partitionedTopic }); + // Wait for rebalance. + consumer.Consume(TimeSpan.FromSeconds(10)); + + // No Consumer Group Type should be present as we passed the Classic Type option + groups = adminClient.ListConsumerGroupsAsync(listOptionsWithClassic).Result; + Assert.Empty(groups.Valid.Where(group => group.GroupType == ConsumerGroupType.Consumer)); + + // Our Consumer Group should be listed with Consumer Type option + groups = adminClient.ListConsumerGroupsAsync(listOptionsWithConsumer).Result; + Assert.Single(groups.Valid.Where(group => group.GroupId == groupID)); + Assert.Empty(groups.Valid.Where(group => group.GroupId == nonExistentGroupID)); + + var group = groups.Valid.Find(group => group.GroupId == groupID); + Assert.Equal(ConsumerGroupState.Stable, group.State); + Assert.False(group.IsSimpleConsumerGroup); + Assert.Equal(ConsumerGroupType.Consumer, group.GroupType); + + consumer.Close(); + consumer.Dispose(); + } + + Assert.Equal(0, Library.HandleCount); + LogToFile("end AdminClient_ListDescribeConsumerGroups with Consumer Protocol"); return; } LogToFile("start AdminClient_ListDescribeConsumerGroups"); - var groupID = Guid.NewGuid().ToString(); - var nonExistentGroupID = Guid.NewGuid().ToString(); const string clientID1 = "test.client.1"; const string clientID2 = "test.client.2"; // Create an AdminClient here - we need it throughout the test. - using (var adminClient = new AdminClientBuilder(new AdminClientConfig { - BootstrapServers = bootstrapServers }).Build()) + using (var adminClient = new AdminClientBuilder(new AdminClientConfig + { + BootstrapServers = bootstrapServers + }).Build()) { var listOptionsWithTimeout = new Admin.ListConsumerGroupsOptions() { RequestTimeout = TimeSpan.FromSeconds(30) }; var describeOptionsWithTimeout = new Admin.DescribeConsumerGroupsOptions() @@ -169,8 +223,10 @@ public void AdminClient_ListDescribeConsumerGroups(string bootstrapServers) // Check the 'States' option by listing Stable consumer groups, which shouldn't // include `groupID`. groups = adminClient.ListConsumerGroupsAsync(new Admin.ListConsumerGroupsOptions() - { MatchStates = new List() { ConsumerGroupState.Stable }, - RequestTimeout = TimeSpan.FromSeconds(30) }).Result; + { + MatchStates = new List() { ConsumerGroupState.Stable }, + RequestTimeout = TimeSpan.FromSeconds(30) + }).Result; Assert.Empty(groups.Valid.Where(group => group.GroupId == groupID)); descResult = adminClient.DescribeConsumerGroupsAsync( diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Tests.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Tests.cs index a6b2b1ed2..ca75d0bb0 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Tests.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Tests.cs @@ -174,7 +174,8 @@ public static IEnumerable OAuthBearerKafkaParameters() } return oAuthBearerKafkaParameters; } - public static bool semaphoreSkipFlakyTests(){ + public static bool semaphoreSkipFlakyTests() + { string onSemaphore = Environment.GetEnvironmentVariable("SEMAPHORE_SKIP_FLAKY_TESTS"); if (onSemaphore != null) { diff --git a/test/Confluent.SchemaRegistry.IntegrationTests/Tests/Tests.cs b/test/Confluent.SchemaRegistry.IntegrationTests/Tests/Tests.cs index 71aeb8fa5..12a749922 100644 --- a/test/Confluent.SchemaRegistry.IntegrationTests/Tests/Tests.cs +++ b/test/Confluent.SchemaRegistry.IntegrationTests/Tests/Tests.cs @@ -57,7 +57,8 @@ public static IEnumerable SchemaRegistryParameters() } return schemaRegistryParameters; } - public static bool semaphoreSkipFlakyTests(){ + public static bool semaphoreSkipFlakyTests() + { string onSemaphore = Environment.GetEnvironmentVariable("SEMAPHORE_SKIP_FLAKY_TESTS"); if (onSemaphore != null) {