diff --git a/examples/AdminClient/Program.cs b/examples/AdminClient/Program.cs index 7b3f101ec..de8950bc9 100644 --- a/examples/AdminClient/Program.cs +++ b/examples/AdminClient/Program.cs @@ -583,27 +583,67 @@ static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] comm { var timeout = TimeSpan.FromSeconds(30); var statesList = new List(); - try + var groupTypesList = new List(); + var isType = false; + var isState = false; + foreach (var commandArg in commandArgs) { - if (commandArgs.Length > 0) + if (commandArg == "-states") { - timeout = TimeSpan.FromSeconds(Int32.Parse(commandArgs[0])); + if (isState) + { + Console.WriteLine("usage: .. list-consumer-groups [-states ... ] [-types .. ]"); + Environment.ExitCode = 1; + return; + } + isState = true; } - if (commandArgs.Length > 1) + else if (commandArg == "-types") { - for (int i = 1; i < commandArgs.Length; i++) + if (isType) { - statesList.Add(Enum.Parse(commandArgs[i])); + Console.WriteLine("usage: .. list-consumer-groups [-states ... ] [-types .. ]"); + Environment.ExitCode = 1; + return; + } + isType = true; + } + else + { + if (isState) + { + try + { + statesList.Add(Enum.Parse(commandArg)); + } + catch (Exception e) + { + Console.WriteLine("usage: .. list-consumer-groups [-states ... ] [-types .. ]"); + Environment.ExitCode = 1; + return; + } + } + else if (isType) + { + try + { + groupTypesList.Add(Enum.Parse(commandArg)); + } + catch (Exception e) + { + Console.WriteLine("usage: .. list-consumer-groups [-states ... ] [-types .. ]"); + Environment.ExitCode = 1; + return; + } + } + else + { + Console.WriteLine("usage: .. list-consumer-groups [-states ... ] [-types .. ]"); + Environment.ExitCode = 1; + return; } } } - catch (SystemException) - { - Console.WriteLine("usage: .. list-consumer-groups [ ... ]"); - Environment.ExitCode = 1; - return; - } - using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) { try @@ -612,7 +652,9 @@ static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] comm { RequestTimeout = timeout, MatchStates = statesList, + MatchGroupTypes = groupTypesList, }); + Console.WriteLine("The Broker Response depends on the Broker Version Being used, all the Request Options may not be used for the final request"); Console.WriteLine(result); } catch (KafkaException e) diff --git a/src/Confluent.Kafka/Admin/ConsumerGroupListing.cs b/src/Confluent.Kafka/Admin/ConsumerGroupListing.cs index 27caece2f..c51f2e2f3 100644 --- a/src/Confluent.Kafka/Admin/ConsumerGroupListing.cs +++ b/src/Confluent.Kafka/Admin/ConsumerGroupListing.cs @@ -32,6 +32,11 @@ public class ConsumerGroupListing /// public ConsumerGroupState State { get; set; } + /// + /// The group type of the consumer group. + /// + public ConsumerGroupType GroupType { get; set; } + /// /// Whether the consumer group is simple or not. /// @@ -42,7 +47,7 @@ public class ConsumerGroupListing /// public override string ToString() { - return $"{GroupId}, State = {State}, IsSimpleConsumerGroup = {IsSimpleConsumerGroup}"; + return $"{GroupId}, State = {State}, IsSimpleConsumerGroup = {IsSimpleConsumerGroup}, Type = {GroupType}"; } } } diff --git a/src/Confluent.Kafka/Admin/ListConsumerGroupsOptions.cs b/src/Confluent.Kafka/Admin/ListConsumerGroupsOptions.cs index 1bc23b500..91b91d159 100644 --- a/src/Confluent.Kafka/Admin/ListConsumerGroupsOptions.cs +++ b/src/Confluent.Kafka/Admin/ListConsumerGroupsOptions.cs @@ -41,5 +41,13 @@ public class ListConsumerGroupsOptions /// Default: null /// public IEnumerable MatchStates { get; set; } = null; + + /// + /// An enumerable with the group types to query, null to query for all + /// the group types. + /// + /// Default: null + /// + public IEnumerable MatchGroupTypes { get; set; } = null; } } diff --git a/src/Confluent.Kafka/AdminClient.cs b/src/Confluent.Kafka/AdminClient.cs index 55f816916..bca989fdb 100644 --- a/src/Confluent.Kafka/AdminClient.cs +++ b/src/Confluent.Kafka/AdminClient.cs @@ -288,7 +288,8 @@ private ListConsumerGroupsReport extractListConsumerGroupsResults(IntPtr resultP GroupId = PtrToStringUTF8(Librdkafka.ConsumerGroupListing_group_id(cglPtr)), IsSimpleConsumerGroup = (int)Librdkafka.ConsumerGroupListing_is_simple_consumer_group(cglPtr) == 1, - State = Librdkafka.ConsumerGroupListing_state(cglPtr), + State = (ConsumerGroupState)Librdkafka.ConsumerGroupListing_state(cglPtr), + GroupType = (ConsumerGroupType)Librdkafka.ConsumerGroupListing_type(cglPtr), }; }).ToList(); } diff --git a/src/Confluent.Kafka/ConsumerGroupType.cs b/src/Confluent.Kafka/ConsumerGroupType.cs new file mode 100644 index 000000000..61bc3e4b6 --- /dev/null +++ b/src/Confluent.Kafka/ConsumerGroupType.cs @@ -0,0 +1,40 @@ +// Copyright 2024 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + + +namespace Confluent.Kafka +{ + /// + /// Enumerates the different consumer group types. + /// + public enum ConsumerGroupType : int + { + /// + /// Unknown + /// + Unknown = 0, + + /// + /// Consumer + /// + Consumer = 1, + + /// + /// Classic + /// + Classic = 2, + }; +} diff --git a/src/Confluent.Kafka/Impl/LibRdKafka.cs b/src/Confluent.Kafka/Impl/LibRdKafka.cs index 1a6504d45..817f68013 100644 --- a/src/Confluent.Kafka/Impl/LibRdKafka.cs +++ b/src/Confluent.Kafka/Impl/LibRdKafka.cs @@ -300,6 +300,7 @@ static bool SetDelegates(Type nativeMethodsClass) _AdminOptions_set_require_stable_offsets = (Func)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_require_stable_offsets").CreateDelegate(typeof(Func)); _AdminOptions_set_include_authorized_operations = (Func)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_include_authorized_operations").CreateDelegate(typeof(Func)); _AdminOptions_set_match_consumer_group_states = (Func)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_match_consumer_group_states").CreateDelegate(typeof(Func)); + _AdminOptions_set_match_consumer_group_types = (Func)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_match_consumer_group_types").CreateDelegate(typeof(Func)); _AdminOptions_set_isolation_level = (Func)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_isolation_level").CreateDelegate(typeof(Func)); _NewTopic_new = (Func)methods.Single(m => m.Name == "rd_kafka_NewTopic_new").CreateDelegate(typeof(Func)); @@ -410,6 +411,8 @@ static bool SetDelegates(Type nativeMethodsClass) _ConsumerGroupListing_group_id = (_ConsumerGroupListing_group_id_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_group_id").CreateDelegate(typeof(_ConsumerGroupListing_group_id_delegate)); _ConsumerGroupListing_is_simple_consumer_group = (_ConsumerGroupListing_is_simple_consumer_group_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_is_simple_consumer_group").CreateDelegate(typeof(_ConsumerGroupListing_is_simple_consumer_group_delegate)); _ConsumerGroupListing_state = (_ConsumerGroupListing_state_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_state").CreateDelegate(typeof(_ConsumerGroupListing_state_delegate)); + _ConsumerGroupListing_type = (_ConsumerGroupListing_type_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_type").CreateDelegate(typeof(_ConsumerGroupListing_type_delegate)); + _ListConsumerGroups_result_valid = (_ListConsumerGroups_result_valid_delegate)methods.Single(m => m.Name == "rd_kafka_ListConsumerGroups_result_valid").CreateDelegate(typeof(_ListConsumerGroups_result_valid_delegate)); _ListConsumerGroups_result_errors = (_ListConsumerGroups_result_errors_delegate)methods.Single(m => m.Name == "rd_kafka_ListConsumerGroups_result_errors").CreateDelegate(typeof(_ListConsumerGroups_result_errors_delegate)); @@ -1340,6 +1343,10 @@ internal static IntPtr AdminOptions_set_include_authorized_operations( internal static IntPtr AdminOptions_set_match_consumer_group_states(IntPtr options, ConsumerGroupState[] states, UIntPtr statesCnt) => _AdminOptions_set_match_consumer_group_states(options, states, statesCnt); + private static Func _AdminOptions_set_match_consumer_group_types; + internal static IntPtr AdminOptions_set_match_consumer_group_types(IntPtr options, ConsumerGroupType[] groupTypes, UIntPtr groupTypesCnt) + => _AdminOptions_set_match_consumer_group_types(options, groupTypes, groupTypesCnt); + private static Func _AdminOptions_set_isolation_level; internal static IntPtr AdminOptions_set_isolation_level(IntPtr options, IntPtr IsolationLevel) => _AdminOptions_set_isolation_level(options, IsolationLevel); @@ -1883,6 +1890,11 @@ internal static IntPtr ConsumerGroupListing_is_simple_consumer_group(IntPtr grpl internal static ConsumerGroupState ConsumerGroupListing_state(IntPtr grplist) => _ConsumerGroupListing_state(grplist); + private delegate ConsumerGroupType _ConsumerGroupListing_type_delegate(IntPtr grplist); + private static _ConsumerGroupListing_type_delegate _ConsumerGroupListing_type; + internal static ConsumerGroupType ConsumerGroupListing_type(IntPtr grplist) + => _ConsumerGroupListing_type(grplist); + private delegate IntPtr _ListConsumerGroups_result_valid_delegate(IntPtr result, out UIntPtr cntp); private static _ListConsumerGroups_result_valid_delegate _ListConsumerGroups_result_valid; internal static IntPtr ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp) diff --git a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs index 80e61b14d..c58cb3ffc 100644 --- a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs +++ b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs @@ -564,6 +564,12 @@ internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_sta ConsumerGroupState[] states, UIntPtr statesCnt); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_types( + IntPtr options, + ConsumerGroupType[] groupTypes, + UIntPtr groupTypesCnt); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_NewTopic_new( [MarshalAs(UnmanagedType.LPStr)] string topic, @@ -1032,6 +1038,9 @@ internal static extern void rd_kafka_ListConsumerGroups( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern ConsumerGroupState rd_kafka_ConsumerGroupListing_state(IntPtr grplist); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern ConsumerGroupType rd_kafka_ConsumerGroupListing_type(IntPtr grplist); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp); diff --git a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs index 00c159c58..afab73b69 100644 --- a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs +++ b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs @@ -568,6 +568,12 @@ internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_sta ConsumerGroupState[] states, UIntPtr statesCnt); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_types( + IntPtr options, + ConsumerGroupType[] groupTypes, + UIntPtr groupTypesCnt); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_NewTopic_new( [MarshalAs(UnmanagedType.LPStr)] string topic, @@ -1036,6 +1042,9 @@ internal static extern void rd_kafka_ListConsumerGroups( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern ConsumerGroupState rd_kafka_ConsumerGroupListing_state(IntPtr grplist); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern ConsumerGroupType rd_kafka_ConsumerGroupListing_type(IntPtr grplist); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp); diff --git a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos8.cs b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos8.cs index 3561559f5..152653c3e 100644 --- a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos8.cs +++ b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos8.cs @@ -568,6 +568,12 @@ internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_sta ConsumerGroupState[] states, UIntPtr statesCnt); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_types( + IntPtr options, + ConsumerGroupType[] groupTypes, + UIntPtr groupTypesCnt); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_NewTopic_new( [MarshalAs(UnmanagedType.LPStr)] string topic, @@ -1036,6 +1042,9 @@ internal static extern void rd_kafka_ListConsumerGroups( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern ConsumerGroupState rd_kafka_ConsumerGroupListing_state(IntPtr grplist); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern ConsumerGroupType rd_kafka_ConsumerGroupListing_type(IntPtr grplist); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp); diff --git a/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs b/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs index 21716bf23..4cce3a7c8 100644 --- a/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs +++ b/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs @@ -1474,6 +1474,15 @@ private void setOption_MatchConsumerGroupStates(IntPtr optionsPtr, ConsumerGroup } } + private void setOption_MatchConsumerGroupTypes(IntPtr optionsPtr, ConsumerGroupType[] groupTypes) + { + var error = Librdkafka.AdminOptions_set_match_consumer_group_types(optionsPtr, groupTypes, (UIntPtr)groupTypes.Count()); + if (error != IntPtr.Zero) + { + throw new KafkaException(new Error(error, true)); + } + } + private void setOption_IsolationLevel(IntPtr optionsPtr, IsolationLevel IsolationLevel) { var rError = Librdkafka.AdminOptions_set_isolation_level(optionsPtr, (IntPtr)(int)IsolationLevel); @@ -2339,8 +2348,11 @@ internal void ListConsumerGroups( { setOption_MatchConsumerGroupStates(optionsPtr, options.MatchStates.ToArray()); } + if (options.MatchGroupTypes != null) + { + setOption_MatchConsumerGroupTypes(optionsPtr, options.MatchGroupTypes.ToArray()); + } setOption_completionSource(optionsPtr, completionSourcePtr); - // Call ListConsumerGroups (async). Librdkafka.ListConsumerGroups(handle, optionsPtr, resultQueuePtr); } diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_ListDescribeConsumerGroups.cs b/test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_ListDescribeConsumerGroups.cs index 9e596cae2..d22ff4483 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_ListDescribeConsumerGroups.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_ListDescribeConsumerGroups.cs @@ -63,16 +63,69 @@ private void checkConsumerGroupDescription( [Theory, MemberData(nameof(KafkaParameters))] public void AdminClient_ListDescribeConsumerGroups(string bootstrapServers) { + var groupId = Guid.NewGuid().ToString(); + var nonExistentGroupId = Guid.NewGuid().ToString(); if (!TestConsumerGroupProtocol.IsClassic()) { - LogToFile("KIP 848 Admin operations changes still aren't " + - "available"); + LogToFile("start AdminClient_ListDescribeConsumerGroups with Consumer Protocol"); + const string clientId = "test.client"; + + // Create an AdminClient here - we need it throughout the test. + using (var adminClient = new AdminClientBuilder(new AdminClientConfig + { + BootstrapServers = bootstrapServers + }).Build()) + { + var listOptionsWithTimeout = new Admin.ListConsumerGroupsOptions() { RequestTimeout = TimeSpan.FromSeconds(30) }; + var listOptionsWithClassic = new Admin.ListConsumerGroupsOptions() { RequestTimeout = TimeSpan.FromSeconds(30), MatchGroupTypes = new List { ConsumerGroupType.Classic } }; + var listOptionsWithConsumer = new Admin.ListConsumerGroupsOptions() { RequestTimeout = TimeSpan.FromSeconds(30), MatchGroupTypes = new List { ConsumerGroupType.Consumer } }; + // We should not have any group initially. + var groups = adminClient.ListConsumerGroupsAsync().Result; + Assert.Empty(groups.Valid.Where(group => group.GroupId == groupId)); + Assert.Empty(groups.Valid.Where(group => group.GroupId == nonExistentGroupId)); + + // Ensure that the partitioned topic we are using has exactly two partitions. + Assert.Equal(2, partitionedTopicNumPartitions); + + var consumerConfig = new ConsumerConfig + { + GroupId = groupId, + BootstrapServers = bootstrapServers, + SessionTimeoutMs = 6000, + PartitionAssignmentStrategy = PartitionAssignmentStrategy.Range, + ClientId = clientId, + + }; + var consumer = new TestConsumerBuilder(consumerConfig).Build(); + consumer.Subscribe(new string[] { partitionedTopic }); + // Wait for rebalance. + consumer.Consume(TimeSpan.FromSeconds(10)); + + // Our Consumer Group should not be present with Classic Type option + groups = adminClient.ListConsumerGroupsAsync(listOptionsWithClassic).Result; + Assert.Empty(groups.Valid.Where(group => group.GroupType != ConsumerGroupType.Classic)); + Assert.Empty(groups.Valid.Where(group => group.GroupId == groupId)); + + // Our Consumer Group should be present with Consumer Type option + groups = adminClient.ListConsumerGroupsAsync(listOptionsWithConsumer).Result; + Assert.Empty(groups.Valid.Where(group => group.GroupType != ConsumerGroupType.Consumer)); + Assert.Single(groups.Valid.Where(group => group.GroupId == groupId)); + Assert.Empty(groups.Valid.Where(group => group.GroupId == nonExistentGroupId)); + + var group = groups.Valid.Find(group => group.GroupId == groupId); + Assert.Equal(ConsumerGroupState.Stable, group.State); + Assert.False(group.IsSimpleConsumerGroup); + + consumer.Close(); + consumer.Dispose(); + } + + Assert.Equal(0, Library.HandleCount); + LogToFile("end AdminClient_ListDescribeConsumerGroups with Consumer Protocol"); return; } LogToFile("start AdminClient_ListDescribeConsumerGroups"); - var groupID = Guid.NewGuid().ToString(); - var nonExistentGroupID = Guid.NewGuid().ToString(); const string clientID1 = "test.client.1"; const string clientID2 = "test.client.2"; @@ -91,8 +144,8 @@ public void AdminClient_ListDescribeConsumerGroups(string bootstrapServers) // We should not have any group initially. var groups = adminClient.ListConsumerGroupsAsync().Result; - Assert.Empty(groups.Valid.Where(group => group.GroupId == groupID)); - Assert.Empty(groups.Valid.Where(group => group.GroupId == nonExistentGroupID)); + Assert.Empty(groups.Valid.Where(group => group.GroupId == groupId)); + Assert.Empty(groups.Valid.Where(group => group.GroupId == nonExistentGroupId)); // Ensure that the partitioned topic we are using has exactly two partitions. Assert.Equal(2, partitionedTopicNumPartitions); @@ -100,7 +153,7 @@ public void AdminClient_ListDescribeConsumerGroups(string bootstrapServers) // 1. One consumer group with one client. var consumerConfig = new ConsumerConfig { - GroupId = groupID, + GroupId = groupId, BootstrapServers = bootstrapServers, SessionTimeoutMs = 6000, PartitionAssignmentStrategy = PartitionAssignmentStrategy.Range, @@ -113,16 +166,16 @@ public void AdminClient_ListDescribeConsumerGroups(string bootstrapServers) consumer1.Consume(TimeSpan.FromSeconds(10)); groups = adminClient.ListConsumerGroupsAsync(listOptionsWithTimeout).Result; - Assert.Single(groups.Valid.Where(group => group.GroupId == groupID)); - Assert.Empty(groups.Valid.Where(group => group.GroupId == nonExistentGroupID)); - var group = groups.Valid.Find(group => group.GroupId == groupID); + Assert.Single(groups.Valid.Where(group => group.GroupId == groupId)); + Assert.Empty(groups.Valid.Where(group => group.GroupId == nonExistentGroupId)); + var group = groups.Valid.Find(group => group.GroupId == groupId); Assert.Equal(ConsumerGroupState.Stable, group.State); Assert.False(group.IsSimpleConsumerGroup); var descResult = adminClient.DescribeConsumerGroupsAsync( - new List() { groupID }, + new List() { groupId }, describeOptionsWithTimeout).Result; - var groupDesc = descResult.ConsumerGroupDescriptions.Find(group => group.GroupId == groupID); + var groupDesc = descResult.ConsumerGroupDescriptions.Find(group => group.GroupId == groupId); var clientIdToToppars = new Dictionary>(); clientIdToToppars[clientID1] = new List() { @@ -130,7 +183,7 @@ public void AdminClient_ListDescribeConsumerGroups(string bootstrapServers) new TopicPartition(partitionedTopic, 1), }; checkConsumerGroupDescription( - groupDesc, ConsumerGroupState.Stable, "range", groupID, clientIdToToppars); + groupDesc, ConsumerGroupState.Stable, "range", groupId, clientIdToToppars); // 2. One consumer group with two clients. consumerConfig.ClientId = clientID2; @@ -145,10 +198,10 @@ public void AdminClient_ListDescribeConsumerGroups(string bootstrapServers) consumer2.Consume(TimeSpan.FromSeconds(1)); descResult = adminClient.DescribeConsumerGroupsAsync( - new List() { groupID }, + new List() { groupId }, describeOptionsWithTimeout).Result; - Assert.Single(descResult.ConsumerGroupDescriptions.Where(group => group.GroupId == groupID)); - groupDesc = descResult.ConsumerGroupDescriptions.Find(group => group.GroupId == groupID); + Assert.Single(descResult.ConsumerGroupDescriptions.Where(group => group.GroupId == groupId)); + groupDesc = descResult.ConsumerGroupDescriptions.Find(group => group.GroupId == groupId); state = groupDesc.State; } @@ -159,7 +212,7 @@ public void AdminClient_ListDescribeConsumerGroups(string bootstrapServers) new TopicPartition(partitionedTopic, 1) }; checkConsumerGroupDescription( - groupDesc, ConsumerGroupState.Stable, "range", groupID, clientIdToToppars); + groupDesc, ConsumerGroupState.Stable, "range", groupId, clientIdToToppars); // 3. Empty consumer group. consumer1.Close(); @@ -175,16 +228,16 @@ public void AdminClient_ListDescribeConsumerGroups(string bootstrapServers) MatchStates = new List() { ConsumerGroupState.Stable }, RequestTimeout = TimeSpan.FromSeconds(30) }).Result; - Assert.Empty(groups.Valid.Where(group => group.GroupId == groupID)); + Assert.Empty(groups.Valid.Where(group => group.GroupId == groupId)); descResult = adminClient.DescribeConsumerGroupsAsync( - new List() { groupID }, + new List() { groupId }, describeOptionsWithTimeout).Result; - Assert.Single(descResult.ConsumerGroupDescriptions.Where(group => group.GroupId == groupID)); - groupDesc = descResult.ConsumerGroupDescriptions.Find(group => group.GroupId == groupID); + Assert.Single(descResult.ConsumerGroupDescriptions.Where(group => group.GroupId == groupId)); + groupDesc = descResult.ConsumerGroupDescriptions.Find(group => group.GroupId == groupId); clientIdToToppars = new Dictionary>(); checkConsumerGroupDescription( - groupDesc, ConsumerGroupState.Empty, "", groupID, clientIdToToppars); + groupDesc, ConsumerGroupState.Empty, "", groupId, clientIdToToppars); } Assert.Equal(0, Library.HandleCount);