Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a UTF-8 statistics handler #2225

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Confluent.Kafka/AdminClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,9 @@ internal AdminClient(AdminClientBuilder builder)
var producerBuilder = new ProducerBuilder<Null, Null>(config);
if (builder.LogHandler != null) { producerBuilder.SetLogHandler((_, logMessage) => builder.LogHandler(this, logMessage)); }
if (builder.ErrorHandler != null) { producerBuilder.SetErrorHandler((_, error) => builder.ErrorHandler(this, error)); }
#if NET6_0_OR_GREATER
if (builder.StatisticsUtf8Handler != null) { producerBuilder.SetStatisticsUtf8Handler((stats, _) => builder.StatisticsUtf8Handler(stats, this)); }
#endif
if (builder.StatisticsHandler != null) { producerBuilder.SetStatisticsHandler((_, stats) => builder.StatisticsHandler(this, stats)); }
if (builder.OAuthBearerTokenRefreshHandler != null) { producerBuilder.SetOAuthBearerTokenRefreshHandler(builder.OAuthBearerTokenRefreshHandler); }
this.ownedClient = producerBuilder.Build();
Expand Down
28 changes: 28 additions & 0 deletions src/Confluent.Kafka/AdminClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// Refer to LICENSE for more information.

using System;
using System.Buffers;
using System.Collections.Generic;


Expand All @@ -40,6 +41,15 @@ public class AdminClientBuilder
/// </summary>
internal protected Action<IAdminClient, LogMessage> LogHandler { get; set; }

#if NET6_0_OR_GREATER
/// <summary>
/// The configured statistics handler. Unlike <see cref="StatisticsHandler"/>, this handler gives access
/// to the raw UTF-8 which is more performance friendly to UTF-8 parsers such as System.Text.Json. Also,
/// if <see cref="StatisticsHandler"/> is not set, the JSON string allocation is completely avoided.
/// </summary>
internal protected ReadOnlySpanAction<byte, IAdminClient> StatisticsUtf8Handler { get; set; }
#endif

/// <summary>
/// The configured statistics handler.
/// </summary>
Expand All @@ -65,6 +75,24 @@ public AdminClientBuilder(IEnumerable<KeyValuePair<string, string>> config)
this.Config = config;
}

#if NET6_0_OR_GREATER
/// <summary>
/// Set the handler to call on statistics events. Unlike <see cref="SetStatisticsHandler"/>, this handler
/// gives access to the raw UTF-8 which is more performance friendly to UTF-8 parsers such as
/// System.Text.Json. Also, it doesn't allocate a potentially large string for the JSON.
/// </summary>
public AdminClientBuilder SetStatisticsUtf8Handler(
ReadOnlySpanAction<byte, IAdminClient> statisticsHandler)
{
if (this.StatisticsUtf8Handler != null)
{
throw new InvalidOperationException("Statistics handler may not be specified more than once.");
}
this.StatisticsUtf8Handler = statisticsHandler;
return this;
}
#endif

/// <summary>
/// Set the handler to call on statistics events. Statistics are provided
/// as a JSON formatted string as defined here:
Expand Down
29 changes: 28 additions & 1 deletion src/Confluent.Kafka/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// Refer to LICENSE for more information.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
Expand All @@ -39,6 +40,9 @@ internal class Config
internal IEnumerable<KeyValuePair<string, string>> config;
internal Action<Error> errorHandler;
internal Action<LogMessage> logHandler;
#if NET6_0_OR_GREATER
internal ReadOnlySpanAction<byte, object> statisticsUtf8Handler;
#endif
internal Action<string> statisticsHandler;
internal Action<CommittedOffsets> offsetsCommittedHandler;
internal Action<string> oAuthBearerTokenRefreshHandler;
Expand Down Expand Up @@ -104,6 +108,9 @@ private void ErrorCallback(IntPtr rk, ErrorCode err, string reason, IntPtr opaqu
}
}

#if NET6_0_OR_GREATER
private ReadOnlySpanAction<byte, object> statisticsUtf8Handler;
#endif
private Action<string> statisticsHandler;
private Librdkafka.StatsDelegate statisticsCallbackDelegate;
private int StatisticsCallback(IntPtr rk, IntPtr json, UIntPtr json_len, IntPtr opaque)
Expand All @@ -112,7 +119,20 @@ private int StatisticsCallback(IntPtr rk, IntPtr json, UIntPtr json_len, IntPtr
try
{
// Ensure registered handlers are never called as a side-effect of Dispose/Finalize (prevents deadlocks in common scenarios).
statisticsHandler?.Invoke(Util.Marshal.PtrToStringUTF8(json, json_len));
#if NET6_0_OR_GREATER
if (statisticsUtf8Handler != null)
{
unsafe
{
statisticsUtf8Handler.Invoke(new ReadOnlySpan<byte>(json.ToPointer(), (int)json_len), null);
}
}
#endif

if (statisticsHandler != null)
{
statisticsHandler.Invoke(Util.Marshal.PtrToStringUTF8(json, json_len));
}
}
catch (Exception e)
{
Expand Down Expand Up @@ -628,6 +648,9 @@ internal Consumer(ConsumerBuilder<TKey, TValue> builder)
{
var baseConfig = builder.ConstructBaseConfig(this);

#if NET6_0_OR_GREATER
this.statisticsUtf8Handler = baseConfig.statisticsUtf8Handler;
#endif
this.statisticsHandler = baseConfig.statisticsHandler;
this.logHandler = baseConfig.logHandler;
this.errorHandler = baseConfig.errorHandler;
Expand Down Expand Up @@ -710,7 +733,11 @@ internal Consumer(ConsumerBuilder<TKey, TValue> builder)
{
Librdkafka.conf_set_log_cb(configPtr, logCallbackDelegate);
}
#if NET6_0_OR_GREATER
if (statisticsUtf8Handler != null || statisticsHandler != null)
#else
if (statisticsHandler != null)
#endif
{
Librdkafka.conf_set_stats_cb(configPtr, statisticsCallbackDelegate);
}
Expand Down
36 changes: 35 additions & 1 deletion src/Confluent.Kafka/ConsumerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// Refer to LICENSE for more information.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;

Expand All @@ -41,8 +42,18 @@ public class ConsumerBuilder<TKey, TValue>
/// </summary>
internal protected Action<IConsumer<TKey, TValue>, LogMessage> LogHandler { get; set; }

#if NET6_0_OR_GREATER
/// <summary>
/// The configured statistics handler.
/// The configured statistics handler. Unlike <see cref="StatisticsHandler"/>, this handler gives access
/// to the raw UTF-8 which is more performance friendly to UTF-8 parsers such as System.Text.Json. Also,
/// if <see cref="StatisticsHandler"/> is not set, the JSON string allocation is completely avoided.
/// </summary>
internal protected ReadOnlySpanAction<byte, IConsumer<TKey, TValue>> StatisticsUtf8Handler { get; set; }
#endif

/// <summary>
/// The configured statistics handler. With .NET 6+, prefer the <see cref="StatisticsUtf8Handler"/> which
/// doesn't allocate the JSON string and gives access to the raw UTF-8 bytes.
/// </summary>
internal protected Action<IConsumer<TKey, TValue>, string> StatisticsHandler { get; set; }

Expand Down Expand Up @@ -98,6 +109,11 @@ internal Consumer<TKey,TValue>.Config ConstructBaseConfig(Consumer<TKey, TValue>
logHandler = this.LogHandler == null
? default(Action<LogMessage>)
: logMessage => this.LogHandler(consumer, logMessage),
#if NET6_0_OR_GREATER
statisticsUtf8Handler = this.StatisticsUtf8Handler == null
? default(ReadOnlySpanAction<byte, object>)
: (stats, _) => this.StatisticsUtf8Handler(stats, consumer),
#endif
statisticsHandler = this.StatisticsHandler == null
? default(Action<string>)
: stats => this.StatisticsHandler(consumer, stats),
Expand Down Expand Up @@ -136,6 +152,24 @@ public ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)
this.Config = config;
}

#if NET6_0_OR_GREATER
/// <summary>
/// Set the handler to call on statistics events. Unlike <see cref="SetStatisticsHandler"/>, this handler
/// gives access to the raw UTF-8 which is more performance friendly to UTF-8 parsers such as
/// System.Text.Json. Also, it doesn't allocate a potentially large string for the JSON.
/// </summary>
public ConsumerBuilder<TKey, TValue> SetStatisticsUtf8Handler(
ReadOnlySpanAction<byte, IConsumer<TKey, TValue>> statisticsHandler)
{
if (this.StatisticsUtf8Handler != null)
{
throw new InvalidOperationException("Statistics handler may not be specified more than once.");
}
this.StatisticsUtf8Handler = statisticsHandler;
return this;
}
#endif

/// <summary>
/// Set the handler to call on statistics events. Statistics
/// are provided as a JSON formatted string as defined here:
Expand Down
38 changes: 32 additions & 6 deletions src/Confluent.Kafka/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// Refer to LICENSE for more information.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
Expand All @@ -36,6 +37,9 @@ internal class Config
public IEnumerable<KeyValuePair<string, string>> config;
public Action<Error> errorHandler;
public Action<LogMessage> logHandler;
#if NET6_0_OR_GREATER
internal ReadOnlySpanAction<byte, object> statisticsUtf8Handler;
#endif
public Action<string> statisticsHandler;
public Action<string> oAuthBearerTokenRefreshHandler;
public Dictionary<string, PartitionerDelegate> partitioners;
Expand Down Expand Up @@ -139,23 +143,38 @@ private void ErrorCallback(IntPtr rk, ErrorCode err, string reason, IntPtr opaqu
}


#if NET6_0_OR_GREATER
private ReadOnlySpanAction<byte, object> statisticsUtf8Handler;
#endif
private Action<string> statisticsHandler;
private Librdkafka.StatsDelegate statisticsCallbackDelegate;
private int StatisticsCallback(IntPtr rk, IntPtr json, UIntPtr json_len, IntPtr opaque)
{
// Ensure registered handlers are never called as a side-effect of Dispose/Finalize (prevents deadlocks in common scenarios).
if (ownedKafkaHandle.IsClosed) { return 0; }

try
{
statisticsHandler?.Invoke(Util.Marshal.PtrToStringUTF8(json));
// Ensure registered handlers are never called as a side-effect of Dispose/Finalize (prevents deadlocks in common scenarios).
#if NET6_0_OR_GREATER
if (statisticsUtf8Handler != null)
{
unsafe
{
statisticsUtf8Handler.Invoke(new ReadOnlySpan<byte>(json.ToPointer(), (int)json_len), null);
}
}
#endif

if (statisticsHandler != null)
{
statisticsHandler.Invoke(Util.Marshal.PtrToStringUTF8(json, json_len));
}
}
catch (Exception e)
{
handlerException = e;
}

return 0; // instruct librdkafka to immediately free the json ptr.
return 0; // instruct librdkafka to immediately free the json ptr
}

private Action<string> oAuthBearerTokenRefreshHandler;
Expand Down Expand Up @@ -399,8 +418,8 @@ public void Flush(CancellationToken cancellationToken)
throw new OperationCanceledException();
}
}
}
}


/// <inheritdoc/>
public void Dispose()
Expand Down Expand Up @@ -576,6 +595,9 @@ internal Producer(ProducerBuilder<TKey, TValue> builder)
// TODO: Hijack the "delivery.report.only.error" configuration parameter and add functionality to enforce that Tasks
// that never complete are never created when this is set to true.

#if NET6_0_OR_GREATER
this.statisticsUtf8Handler = baseConfig.statisticsUtf8Handler;
#endif
this.statisticsHandler = baseConfig.statisticsHandler;
this.logHandler = baseConfig.logHandler;
this.errorHandler = baseConfig.errorHandler;
Expand Down Expand Up @@ -675,7 +697,11 @@ internal Producer(ProducerBuilder<TKey, TValue> builder)
{
Librdkafka.conf_set_log_cb(configPtr, logCallbackDelegate);
}
#if NET6_0_OR_GREATER
if (statisticsUtf8Handler != null || statisticsHandler != null)
#else
if (statisticsHandler != null)
#endif
{
Librdkafka.conf_set_stats_cb(configPtr, statisticsCallbackDelegate);
}
Expand Down
33 changes: 33 additions & 0 deletions src/Confluent.Kafka/ProducerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// Refer to LICENSE for more information.

using System;
using System.Buffers;
using System.Collections.Generic;


Expand Down Expand Up @@ -74,6 +75,15 @@ public class ProducerBuilder<TKey, TValue>
/// </summary>
internal protected Action<IProducer<TKey, TValue>, LogMessage> LogHandler { get; set; }

#if NET6_0_OR_GREATER
/// <summary>
/// The configured statistics handler. Unlike <see cref="StatisticsHandler"/>, this handler gives access
/// to the raw UTF-8 which is more performance friendly to UTF-8 parsers such as System.Text.Json. Also,
/// if <see cref="StatisticsHandler"/> is not set, the JSON string allocation is completely avoided.
/// </summary>
internal protected ReadOnlySpanAction<byte, IProducer<TKey, TValue>> StatisticsUtf8Handler { get; set; }
#endif

/// <summary>
/// The configured statistics handler.
/// </summary>
Expand Down Expand Up @@ -125,6 +135,11 @@ internal Producer<TKey,TValue>.Config ConstructBaseConfig(Producer<TKey, TValue>
logHandler = this.LogHandler == null
? default(Action<LogMessage>)
: logMessage => this.LogHandler(producer, logMessage),
#if NET6_0_OR_GREATER
statisticsUtf8Handler = this.StatisticsUtf8Handler == null
? default(ReadOnlySpanAction<byte, object>)
: (stats, _) => this.StatisticsUtf8Handler(stats, producer),
#endif
statisticsHandler = this.StatisticsHandler == null
? default(Action<string>)
: stats => this.StatisticsHandler(producer, stats),
Expand All @@ -148,6 +163,24 @@ public ProducerBuilder(IEnumerable<KeyValuePair<string, string>> config)
this.Config = config;
}

#if NET6_0_OR_GREATER
/// <summary>
/// Set the handler to call on statistics events. Unlike <see cref="SetStatisticsHandler"/>, this handler
/// gives access to the raw UTF-8 which is more performance friendly to UTF-8 parsers such as
/// System.Text.Json. Also, it doesn't allocate a potentially large string for the JSON.
/// </summary>
public ProducerBuilder<TKey, TValue> SetStatisticsUtf8Handler(
ReadOnlySpanAction<byte, IProducer<TKey, TValue>> statisticsHandler)
{
if (this.StatisticsUtf8Handler != null)
{
throw new InvalidOperationException("Statistics handler may not be specified more than once.");
}
this.StatisticsUtf8Handler = statisticsHandler;
return this;
}
#endif

/// <summary>
/// Set the handler to call on statistics events. Statistics are provided as
/// a JSON formatted string as defined here:
Expand Down
Loading