Skip to content

Commit

Permalink
ListGroups API KIP 848 (#7)
Browse files Browse the repository at this point in the history
* rebase commit

* change
  • Loading branch information
mahajanadhitya authored Aug 8, 2024
1 parent 385429a commit 8452969
Show file tree
Hide file tree
Showing 15 changed files with 907 additions and 699 deletions.
141 changes: 83 additions & 58 deletions examples/AdminClient/Program.cs

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion src/Confluent.Kafka/Admin/ConsumerGroupListing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public class ConsumerGroupListing
/// </summary>
public ConsumerGroupState State { get; set; }

/// <summary>
/// The group type of the consumer group.
/// </summary>
public ConsumerGroupType GroupType { get; set; }

/// <summary>
/// Whether the consumer group is simple or not.
/// </summary>
Expand All @@ -42,7 +47,7 @@ public class ConsumerGroupListing
/// </summary>
public override string ToString()
{
return $"{GroupId}, State = {State}, IsSimpleConsumerGroup = {IsSimpleConsumerGroup}";
return $"{GroupId}, State = {State}, IsSimpleConsumerGroup = {IsSimpleConsumerGroup}, Type = {GroupType}";
}
}
}
8 changes: 8 additions & 0 deletions src/Confluent.Kafka/Admin/ListConsumerGroupsOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,13 @@ public class ListConsumerGroupsOptions
/// Default: null
/// </summary>
public IEnumerable<ConsumerGroupState> MatchStates { get; set; } = null;

/// <summary>
/// An enumerable with the group types to query, null to query for all
/// the group types.
///
/// Default: null
/// </summary>
public IEnumerable<ConsumerGroupType> MatchGroupTypes { get; set; } = null;
}
}
12 changes: 8 additions & 4 deletions src/Confluent.Kafka/Admin/ListConsumerGroupsReport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,18 @@ public class ListConsumerGroupsReport
/// <summary>
/// Returns a human readable representation of this object.
/// </summary>
public override string ToString() {
public override string ToString()
{
string res = "Groups:\n";
foreach (ConsumerGroupListing cgl in Valid) {
foreach (ConsumerGroupListing cgl in Valid)
{
res += "\t" + cgl.ToString() + "\n";
}
if (Errors.Count != 0) {
if (Errors.Count != 0)
{
res += "Errors:\n";
foreach (Error err in Errors) {
foreach (Error err in Errors)
{
res += "\t" + err.ToString() + "\n";
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/Confluent.Kafka/Admin/ListConsumerGroupsResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ public class ListConsumerGroupsResult
/// <summary>
/// Returns a human readable representation of this object.
/// </summary>
public override string ToString() {
public override string ToString()
{
string res = "Groups:\n";
foreach (ConsumerGroupListing cgl in Valid) {
foreach (ConsumerGroupListing cgl in Valid)
{
res += "\t" + cgl.ToString() + "\n";
}
return res;
Expand Down
561 changes: 287 additions & 274 deletions src/Confluent.Kafka/AdminClient.cs

Large diffs are not rendered by default.

40 changes: 40 additions & 0 deletions src/Confluent.Kafka/ConsumerGroupType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.


namespace Confluent.Kafka
{
/// <summary>
/// Enumerates the different consumer group types.
/// </summary>
public enum ConsumerGroupType : int
{
/// <summary>
/// Unknown
/// </summary>
Unknown = 0,

/// <summary>
/// Consumer
/// </summary>
Consumer = 1,

/// <summary>
/// Classic
/// </summary>
Classic = 2,
};
}
597 changes: 305 additions & 292 deletions src/Confluent.Kafka/Impl/LibRdKafka.cs

Large diffs are not rendered by default.

47 changes: 28 additions & 19 deletions src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -558,12 +558,18 @@ internal static extern IntPtr rd_kafka_AdminOptions_set_isolation_level(
IntPtr options,
IntPtr isolation_level);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_states(
IntPtr options,
ConsumerGroupState[] states,
UIntPtr statesCnt);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_types(
IntPtr options,
ConsumerGroupType[] groupTypes,
UIntPtr groupTypesCnt);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_NewTopic_new(
[MarshalAs(UnmanagedType.LPStr)] string topic,
Expand Down Expand Up @@ -722,7 +728,7 @@ internal static extern IntPtr rd_kafka_ConfigEntry_name(
/* rd_kafka_ConfigEntry_t * */ IntPtr entry);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_ConfigEntry_value (
internal static extern IntPtr rd_kafka_ConfigEntry_value(
/* rd_kafka_ConfigEntry_t * */ IntPtr entry);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
Expand All @@ -742,7 +748,7 @@ internal static extern IntPtr rd_kafka_ConfigEntry_is_sensitive(
/* rd_kafka_ConfigEntry_t * */ IntPtr entry);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_ConfigEntry_is_synonym (
internal static extern IntPtr rd_kafka_ConfigEntry_is_synonym(
/* rd_kafka_ConfigEntry_t * */ IntPtr entry);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
Expand Down Expand Up @@ -811,7 +817,7 @@ internal static extern IntPtr rd_kafka_ConfigResource_error_string(


[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern void rd_kafka_AlterConfigs (
internal static extern void rd_kafka_AlterConfigs(
/* rd_kafka_t * */ IntPtr rk,
/* rd_kafka_ConfigResource_t ** */ IntPtr[] configs,
UIntPtr config_cnt,
Expand All @@ -822,22 +828,22 @@ internal static extern void rd_kafka_AlterConfigs (
internal static extern /* rd_kafka_ConfigResource_t ** */ IntPtr rd_kafka_AlterConfigs_result_resources(
/* rd_kafka_AlterConfigs_result_t * */ IntPtr result,
out UIntPtr cntp);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern void rd_kafka_IncrementalAlterConfigs(
/* rd_kafka_t * */ IntPtr rk,
/* rd_kafka_ConfigResource_t ** */ IntPtr[] configs,
UIntPtr config_cnt,
/* rd_kafka_AdminOptions_t * */ IntPtr options,
/* rd_kafka_queue_t * */ IntPtr rkqu);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern /* rd_kafka_ConfigResource_t ** */ IntPtr rd_kafka_IncrementalAlterConfigs_result_resources(
/* rd_kafka_IncrementalAlterConfigs_result_t * */ IntPtr result,
out UIntPtr cntp);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern void rd_kafka_DescribeConfigs (
internal static extern void rd_kafka_DescribeConfigs(
/* rd_kafka_t * */ IntPtr rk,
/* rd_kafka_ConfigResource_t ***/ IntPtr[] configs,
UIntPtr config_cnt,
Expand Down Expand Up @@ -1032,6 +1038,9 @@ internal static extern void rd_kafka_ListConsumerGroups(
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern ConsumerGroupState rd_kafka_ConsumerGroupListing_state(IntPtr grplist);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern ConsumerGroupType rd_kafka_ConsumerGroupListing_type(IntPtr grplist);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp);

Expand Down Expand Up @@ -1132,15 +1141,15 @@ internal static extern void rd_kafka_DescribeUserScramCredentials(
UIntPtr usersCnt,
IntPtr optionsPtr,
IntPtr resultQueuePtr);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern ErrorCode rd_kafka_AlterUserScramCredentials(
IntPtr handle,
IntPtr[] alterations,
UIntPtr alterationsCnt,
IntPtr optionsPtr,
IntPtr resultQueuePtr);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_UserScramCredentialDeletion_new(
string user,
Expand All @@ -1159,7 +1168,7 @@ internal static extern IntPtr rd_kafka_UserScramCredentialUpsertion_new(
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern void rd_kafka_UserScramCredentialAlteration_destroy(
IntPtr alteration);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_DescribeUserScramCredentials_result_descriptions(
IntPtr event_result,
Expand All @@ -1174,24 +1183,24 @@ internal static extern IntPtr rd_kafka_DescribeUserScramCredentials_result_descr

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern int rd_kafka_UserScramCredentialsDescription_scramcredentialinfo_count(IntPtr description);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_UserScramCredentialsDescription_scramcredentialinfo(IntPtr description, int i);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern ScramMechanism rd_kafka_ScramCredentialInfo_mechanism(IntPtr scramcredentialinfo);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern int rd_kafka_ScramCredentialInfo_iterations(IntPtr scramcredentialinfo);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_AlterUserScramCredentials_result_responses(
IntPtr event_result,
out UIntPtr cntp);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_AlterUserScramCredentials_result_response_user(IntPtr element);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_AlterUserScramCredentials_result_response_error(IntPtr element);

Expand All @@ -1213,14 +1222,14 @@ internal static extern void rd_kafka_DescribeTopics(
IntPtr topicCollection,
IntPtr optionsPtr,
IntPtr resultQueuePtr);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_DescribeTopics_result_topics(IntPtr result, out UIntPtr cntp);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_TopicCollection_of_topic_names([MarshalAs(UnmanagedType.LPArray)] string[] topics,
UIntPtr topicsCnt);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern void rd_kafka_TopicCollection_destroy(IntPtr topic_collection);

Expand All @@ -1247,7 +1256,7 @@ internal static extern IntPtr rd_kafka_TopicCollection_of_topic_names([MarshalAs

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_TopicPartitionInfo_leader(IntPtr topic_partition_info);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern int rd_kafka_TopicPartitionInfo_partition(IntPtr topic_partition_info);

Expand Down
Loading

0 comments on commit 8452969

Please sign in to comment.