Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP 848 ListGroups API #2245

Open
wants to merge 6 commits into
base: dev_fix_formatting
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 55 additions & 13 deletions examples/AdminClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -583,27 +583,67 @@ static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] comm
{
var timeout = TimeSpan.FromSeconds(30);
var statesList = new List<ConsumerGroupState>();
try
var groupTypesList = new List<ConsumerGroupType>();
var isType = false;
var isState = false;
foreach (var commandArg in commandArgs)
{
if (commandArgs.Length > 0)
if (commandArg == "-states")
{
timeout = TimeSpan.FromSeconds(Int32.Parse(commandArgs[0]));
if (isState)
{
Console.WriteLine("usage: .. <bootstrapServers> list-consumer-groups [-states <match_state_1> <match_state_2> ... <match_state_N>] [-types <group_type_1> .. <group_type_M>]");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve the error message, state that this was a Duplicate states argument. Same for types

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No the duplicate state is Invalid args but still the example will run and yield a error like Invalid args.
This is basically error with exit code so user has put in wrong input format for the example.
like -state_cnt=2 state_1 -group_type_cnt=..
The above will error because only 1 state was given.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please suggest something if you feel it is still not sufficient.

Environment.ExitCode = 1;
return;
}
isState = true;
}
if (commandArgs.Length > 1)
else if (commandArg == "-types")
{
for (int i = 1; i < commandArgs.Length; i++)
if (isType)
{
statesList.Add(Enum.Parse<ConsumerGroupState>(commandArgs[i]));
Console.WriteLine("usage: .. <bootstrapServers> list-consumer-groups [-states <match_state_1> <match_state_2> ... <match_state_N>] [-types <group_type_1> .. <group_type_M>]");
Environment.ExitCode = 1;
return;
}
isType = true;
}
else
{
if (isState)
{
try
{
statesList.Add(Enum.Parse<ConsumerGroupState>(commandArg));
}
catch (Exception e)
{
Console.WriteLine("usage: .. <bootstrapServers> list-consumer-groups [-states <match_state_1> <match_state_2> ... <match_state_N>] [-types <group_type_1> .. <group_type_M>]");
Environment.ExitCode = 1;
return;
}
}
else if (isType)
{
try
{
groupTypesList.Add(Enum.Parse<ConsumerGroupType>(commandArg));
}
catch (Exception e)
{
Console.WriteLine("usage: .. <bootstrapServers> list-consumer-groups [-states <match_state_1> <match_state_2> ... <match_state_N>] [-types <group_type_1> .. <group_type_M>]");
Environment.ExitCode = 1;
return;
}
}
else
{
Console.WriteLine("usage: .. <bootstrapServers> list-consumer-groups [-states <match_state_1> <match_state_2> ... <match_state_N>] [-types <group_type_1> .. <group_type_M>]");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing <timeout_seconds>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None of the usage in codebase mentions timeout seconds.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the examples convention I am saying

Copy link
Member

@anchitj anchitj Aug 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the remove line 602(just below this comment), it had < timeout_seconds>

Environment.ExitCode = 1;
return;
}
}
}
catch (SystemException)
{
Console.WriteLine("usage: .. <bootstrapServers> list-consumer-groups [<timeout_seconds> <match_state_1> <match_state_2> ... <match_state_N>]");
Environment.ExitCode = 1;
return;
}

using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
{
try
Expand All @@ -612,7 +652,9 @@ static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] comm
{
RequestTimeout = timeout,
MatchStates = statesList,
MatchGroupTypes = groupTypesList,
});
Console.WriteLine("The Broker Response depends on the Broker Version Being used, all the Request Options may not be used for the final request");
Console.WriteLine(result);
}
catch (KafkaException e)
Expand Down
7 changes: 6 additions & 1 deletion src/Confluent.Kafka/Admin/ConsumerGroupListing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public class ConsumerGroupListing
/// </summary>
public ConsumerGroupState State { get; set; }

/// <summary>
/// The group type of the consumer group.
/// </summary>
public ConsumerGroupType GroupType { get; set; }

/// <summary>
/// Whether the consumer group is simple or not.
/// </summary>
Expand All @@ -42,7 +47,7 @@ public class ConsumerGroupListing
/// </summary>
public override string ToString()
{
return $"{GroupId}, State = {State}, IsSimpleConsumerGroup = {IsSimpleConsumerGroup}";
return $"{GroupId}, State = {State}, IsSimpleConsumerGroup = {IsSimpleConsumerGroup}, Type = {GroupType}";
}
}
}
8 changes: 8 additions & 0 deletions src/Confluent.Kafka/Admin/ListConsumerGroupsOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,13 @@ public class ListConsumerGroupsOptions
/// Default: null
/// </summary>
public IEnumerable<ConsumerGroupState> MatchStates { get; set; } = null;

/// <summary>
/// An enumerable with the group types to query, null to query for all
/// the group types.
///
/// Default: null
/// </summary>
public IEnumerable<ConsumerGroupType> MatchGroupTypes { get; set; } = null;
}
}
3 changes: 2 additions & 1 deletion src/Confluent.Kafka/AdminClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ private ListConsumerGroupsReport extractListConsumerGroupsResults(IntPtr resultP
GroupId = PtrToStringUTF8(Librdkafka.ConsumerGroupListing_group_id(cglPtr)),
IsSimpleConsumerGroup =
(int)Librdkafka.ConsumerGroupListing_is_simple_consumer_group(cglPtr) == 1,
State = Librdkafka.ConsumerGroupListing_state(cglPtr),
State = (ConsumerGroupState)Librdkafka.ConsumerGroupListing_state(cglPtr),
GroupType = (ConsumerGroupType)Librdkafka.ConsumerGroupListing_type(cglPtr),
};
}).ToList();
}
Expand Down
40 changes: 40 additions & 0 deletions src/Confluent.Kafka/ConsumerGroupType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.


namespace Confluent.Kafka
{
/// <summary>
/// Enumerates the different consumer group types.
/// </summary>
public enum ConsumerGroupType : int
{
/// <summary>
/// Unknown
/// </summary>
Unknown = 0,

/// <summary>
/// Consumer
/// </summary>
Consumer = 1,

/// <summary>
/// Classic
/// </summary>
Classic = 2,
};
}
12 changes: 12 additions & 0 deletions src/Confluent.Kafka/Impl/LibRdKafka.cs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ static bool SetDelegates(Type nativeMethodsClass)
_AdminOptions_set_require_stable_offsets = (Func<IntPtr, IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_require_stable_offsets").CreateDelegate(typeof(Func<IntPtr, IntPtr, IntPtr>));
_AdminOptions_set_include_authorized_operations = (Func<IntPtr, IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_include_authorized_operations").CreateDelegate(typeof(Func<IntPtr, IntPtr, IntPtr>));
_AdminOptions_set_match_consumer_group_states = (Func<IntPtr, ConsumerGroupState[], UIntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_match_consumer_group_states").CreateDelegate(typeof(Func<IntPtr, ConsumerGroupState[], UIntPtr, IntPtr>));
_AdminOptions_set_match_consumer_group_types = (Func<IntPtr, ConsumerGroupType[], UIntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_match_consumer_group_types").CreateDelegate(typeof(Func<IntPtr, ConsumerGroupType[], UIntPtr, IntPtr>));
_AdminOptions_set_isolation_level = (Func<IntPtr, IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_isolation_level").CreateDelegate(typeof(Func<IntPtr, IntPtr, IntPtr>));

_NewTopic_new = (Func<string, IntPtr, IntPtr, StringBuilder, UIntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_NewTopic_new").CreateDelegate(typeof(Func<string, IntPtr, IntPtr, StringBuilder, UIntPtr, IntPtr>));
Expand Down Expand Up @@ -410,6 +411,8 @@ static bool SetDelegates(Type nativeMethodsClass)
_ConsumerGroupListing_group_id = (_ConsumerGroupListing_group_id_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_group_id").CreateDelegate(typeof(_ConsumerGroupListing_group_id_delegate));
_ConsumerGroupListing_is_simple_consumer_group = (_ConsumerGroupListing_is_simple_consumer_group_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_is_simple_consumer_group").CreateDelegate(typeof(_ConsumerGroupListing_is_simple_consumer_group_delegate));
_ConsumerGroupListing_state = (_ConsumerGroupListing_state_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_state").CreateDelegate(typeof(_ConsumerGroupListing_state_delegate));
_ConsumerGroupListing_type = (_ConsumerGroupListing_type_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_type").CreateDelegate(typeof(_ConsumerGroupListing_type_delegate));

_ListConsumerGroups_result_valid = (_ListConsumerGroups_result_valid_delegate)methods.Single(m => m.Name == "rd_kafka_ListConsumerGroups_result_valid").CreateDelegate(typeof(_ListConsumerGroups_result_valid_delegate));
_ListConsumerGroups_result_errors = (_ListConsumerGroups_result_errors_delegate)methods.Single(m => m.Name == "rd_kafka_ListConsumerGroups_result_errors").CreateDelegate(typeof(_ListConsumerGroups_result_errors_delegate));

Expand Down Expand Up @@ -1340,6 +1343,10 @@ internal static IntPtr AdminOptions_set_include_authorized_operations(
internal static IntPtr AdminOptions_set_match_consumer_group_states(IntPtr options, ConsumerGroupState[] states, UIntPtr statesCnt)
=> _AdminOptions_set_match_consumer_group_states(options, states, statesCnt);

private static Func<IntPtr, ConsumerGroupType[], UIntPtr, IntPtr> _AdminOptions_set_match_consumer_group_types;
internal static IntPtr AdminOptions_set_match_consumer_group_types(IntPtr options, ConsumerGroupType[] groupTypes, UIntPtr groupTypesCnt)
=> _AdminOptions_set_match_consumer_group_types(options, groupTypes, groupTypesCnt);

private static Func<IntPtr, IntPtr, IntPtr> _AdminOptions_set_isolation_level;
internal static IntPtr AdminOptions_set_isolation_level(IntPtr options, IntPtr IsolationLevel)
=> _AdminOptions_set_isolation_level(options, IsolationLevel);
Expand Down Expand Up @@ -1883,6 +1890,11 @@ internal static IntPtr ConsumerGroupListing_is_simple_consumer_group(IntPtr grpl
internal static ConsumerGroupState ConsumerGroupListing_state(IntPtr grplist)
=> _ConsumerGroupListing_state(grplist);

private delegate ConsumerGroupType _ConsumerGroupListing_type_delegate(IntPtr grplist);
private static _ConsumerGroupListing_type_delegate _ConsumerGroupListing_type;
internal static ConsumerGroupType ConsumerGroupListing_type(IntPtr grplist)
=> _ConsumerGroupListing_type(grplist);

private delegate IntPtr _ListConsumerGroups_result_valid_delegate(IntPtr result, out UIntPtr cntp);
private static _ListConsumerGroups_result_valid_delegate _ListConsumerGroups_result_valid;
internal static IntPtr ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp)
Expand Down
9 changes: 9 additions & 0 deletions src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,12 @@ internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_sta
ConsumerGroupState[] states,
UIntPtr statesCnt);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_types(
IntPtr options,
ConsumerGroupType[] groupTypes,
UIntPtr groupTypesCnt);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_NewTopic_new(
[MarshalAs(UnmanagedType.LPStr)] string topic,
Expand Down Expand Up @@ -1032,6 +1038,9 @@ internal static extern void rd_kafka_ListConsumerGroups(
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern ConsumerGroupState rd_kafka_ConsumerGroupListing_state(IntPtr grplist);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern ConsumerGroupType rd_kafka_ConsumerGroupListing_type(IntPtr grplist);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,12 @@ internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_sta
ConsumerGroupState[] states,
UIntPtr statesCnt);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_types(
IntPtr options,
ConsumerGroupType[] groupTypes,
UIntPtr groupTypesCnt);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_NewTopic_new(
[MarshalAs(UnmanagedType.LPStr)] string topic,
Expand Down Expand Up @@ -1036,6 +1042,9 @@ internal static extern void rd_kafka_ListConsumerGroups(
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern ConsumerGroupState rd_kafka_ConsumerGroupListing_state(IntPtr grplist);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern ConsumerGroupType rd_kafka_ConsumerGroupListing_type(IntPtr grplist);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,12 @@ internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_sta
ConsumerGroupState[] states,
UIntPtr statesCnt);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_types(
IntPtr options,
ConsumerGroupType[] groupTypes,
UIntPtr groupTypesCnt);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_NewTopic_new(
[MarshalAs(UnmanagedType.LPStr)] string topic,
Expand Down Expand Up @@ -1036,6 +1042,9 @@ internal static extern void rd_kafka_ListConsumerGroups(
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern ConsumerGroupState rd_kafka_ConsumerGroupListing_state(IntPtr grplist);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern ConsumerGroupType rd_kafka_ConsumerGroupListing_type(IntPtr grplist);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp);

Expand Down
14 changes: 13 additions & 1 deletion src/Confluent.Kafka/Impl/SafeKafkaHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1474,6 +1474,15 @@ private void setOption_MatchConsumerGroupStates(IntPtr optionsPtr, ConsumerGroup
}
}

private void setOption_MatchConsumerGroupTypes(IntPtr optionsPtr, ConsumerGroupType[] groupTypes)
{
var error = Librdkafka.AdminOptions_set_match_consumer_group_types(optionsPtr, groupTypes, (UIntPtr)groupTypes.Count());
if (error != IntPtr.Zero)
{
throw new KafkaException(new Error(error, true));
}
}

private void setOption_IsolationLevel(IntPtr optionsPtr, IsolationLevel IsolationLevel)
{
var rError = Librdkafka.AdminOptions_set_isolation_level(optionsPtr, (IntPtr)(int)IsolationLevel);
Expand Down Expand Up @@ -2339,8 +2348,11 @@ internal void ListConsumerGroups(
{
setOption_MatchConsumerGroupStates(optionsPtr, options.MatchStates.ToArray());
}
if (options.MatchGroupTypes != null)
{
setOption_MatchConsumerGroupTypes(optionsPtr, options.MatchGroupTypes.ToArray());
}
setOption_completionSource(optionsPtr, completionSourcePtr);

// Call ListConsumerGroups (async).
Librdkafka.ListConsumerGroups(handle, optionsPtr, resultQueuePtr);
}
Expand Down
Loading