Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Code cleanup for SchemaRegistry projects #2145

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Confluent.Kafka/IAsyncDeserializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
namespace Confluent.Kafka
{
/// <summary>
/// A deserializer for use with <see cref="Confluent.Kafka.Consumer{TKey,TValue}" />.
/// A deserializer for use with <see cref="Consumer{TKey,TValue}" />.
/// </summary>
public interface IAsyncDeserializer<T>
{
Expand All @@ -32,7 +32,7 @@ public interface IAsyncDeserializer<T>
/// The raw byte data to deserialize.
/// </param>
/// <param name="isNull">
/// True if this is a null value.
/// True if this is a <see langword="null"/> value.
/// </param>
/// <param name="context">
/// Context relevant to the deserialize operation.
Expand Down
5 changes: 2 additions & 3 deletions src/Confluent.Kafka/IAsyncSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@
//
// Refer to LICENSE for more information.

using System;
using System.Threading.Tasks;


namespace Confluent.Kafka
{
/// <summary>
/// Defines a serializer for use with <see cref="Confluent.Kafka.Producer{TKey,TValue}" />.
/// Defines a serializer for use with <see cref="Producer{TKey,TValue}" />.
/// </summary>
public interface IAsyncSerializer<T>
{
Expand All @@ -36,7 +35,7 @@ public interface IAsyncSerializer<T>
/// Context relevant to the serialize operation.
/// </param>
/// <returns>
/// A <see cref="System.Threading.Tasks.Task" /> that
/// A <see cref="Task" /> that
/// completes with the serialized data.
/// </returns>
Task<byte[]> SerializeAsync(T data, SerializationContext context);
Expand Down
6 changes: 3 additions & 3 deletions src/Confluent.Kafka/SerializationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public struct SerializationContext
/// The default SerializationContext value (representing no context defined).
/// </summary>
public static SerializationContext Empty
=> default(SerializationContext);
=> default;

/// <summary>
/// Create a new SerializationContext object instance.
Expand All @@ -41,7 +41,7 @@ public static SerializationContext Empty
/// The collection of message headers (or null). Specifying null or an
/// empty list are equivalent. The order of headers is maintained, and
/// duplicate header keys are allowed.
/// </param>
/// </param>
public SerializationContext(MessageComponentType component, string topic, Headers headers = null)
{
Component = component;
Expand All @@ -53,7 +53,7 @@ public SerializationContext(MessageComponentType component, string topic, Header
/// The topic the data is being written to or read from.
/// </summary>
public string Topic { get; private set; }

/// <summary>
/// The component of the message the serialization operation relates to.
/// </summary>
Expand Down
16 changes: 8 additions & 8 deletions src/Confluent.SchemaRegistry.Serdes.Avro/AvroDeserializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
namespace Confluent.SchemaRegistry.Serdes
{
/// <summary>
/// (async) Avro deserializer. Use this deserializer with GenericRecord,
/// types generated using the avrogen.exe tool or one of the following
/// (async) Avro deserializer. Use this deserializer with <see cref="GenericRecord"/>,
/// types generated using the avrogen.exe tool or one of the following
/// primitive types: int, long, float, double, boolean, string, byte[].
/// </summary>
/// <remarks>
Expand All @@ -39,17 +39,17 @@ public class AvroDeserializer<T> : IAsyncDeserializer<T>
{
private IAvroDeserializerImpl<T> deserializerImpl;

private ISchemaRegistryClient schemaRegistryClient;
private readonly ISchemaRegistryClient schemaRegistryClient;

/// <summary>
/// Initialize a new AvroDeserializer instance.
/// Initialize a new <see cref="AvroDeserializer{T}"/> instance.
/// </summary>
/// <param name="schemaRegistryClient">
/// An implementation of ISchemaRegistryClient used for
/// An implementation of <see cref="ISchemaRegistryClient"/> used for
/// communication with Confluent Schema Registry.
/// </param>
/// <param name="config">
/// Deserializer configuration properties (refer to
/// Deserializer configuration properties (refer to
/// <see cref="AvroDeserializerConfig" />).
/// </param>
public AvroDeserializer(ISchemaRegistryClient schemaRegistryClient, IEnumerable<KeyValuePair<string, string>> config = null)
Expand Down Expand Up @@ -79,13 +79,13 @@ public AvroDeserializer(ISchemaRegistryClient schemaRegistryClient, IEnumerable<
/// The raw byte data to deserialize.
/// </param>
/// <param name="isNull">
/// True if this is a null value.
/// True if this is a <see langword="null"/> value.
/// </param>
/// <param name="context">
/// Context relevant to the deserialize operation.
/// </param>
/// <returns>
/// A <see cref="System.Threading.Tasks.Task" /> that completes
/// A <see cref="Task" /> that completes
/// with the deserialized value.
/// </returns>
public async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool isNull, SerializationContext context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
namespace Confluent.SchemaRegistry.Serdes
{
/// <summary>
/// <see cref="Confluent.SchemaRegistry.Serdes.AvroDeserializer{T}" />
/// <see cref="AvroDeserializer{T}" />
/// configuration properties.
/// </summary>
public class AvroDeserializerConfig : Config {}
Expand Down
48 changes: 24 additions & 24 deletions src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
namespace Confluent.SchemaRegistry.Serdes
{
/// <summary>
/// (async) Avro serializer. Use this serializer with GenericRecord,
/// types generated using the avrogen.exe tool or one of the following
/// (async) Avro serializer. Use this serializer with <see cref="GenericRecord"/>,
/// types generated using the avrogen.exe tool or one of the following
/// primitive types: int, long, float, double, boolean, string, byte[].
/// </summary>
/// <remarks>
Expand All @@ -37,53 +37,53 @@ namespace Confluent.SchemaRegistry.Serdes
/// </remarks>
public class AvroSerializer<T> : IAsyncSerializer<T>
{
private bool autoRegisterSchema = true;
private bool normalizeSchemas = false;
private bool useLatestVersion = false;
private int initialBufferSize = DefaultInitialBufferSize;
private SubjectNameStrategyDelegate subjectNameStrategy = null;
private readonly bool autoRegisterSchema = true;
private readonly bool normalizeSchemas = false;
private readonly bool useLatestVersion = false;
private readonly int initialBufferSize = DefaultInitialBufferSize;
private readonly SubjectNameStrategyDelegate subjectNameStrategy;

private IAvroSerializerImpl<T> serializerImpl;

private ISchemaRegistryClient schemaRegistryClient;
private readonly ISchemaRegistryClient schemaRegistryClient;

/// <summary>
/// The default initial size (in bytes) of buffers used for message
/// The default initial size (in bytes) of buffers used for message
/// serialization.
/// </summary>
public const int DefaultInitialBufferSize = 1024;


/// <summary>
/// Initialize a new instance of the AvroSerializer class.
/// Initialize a new instance of the <see cref="AvroSerializer{T}"/> class.
/// </summary>
[Obsolete("Superseded by AvroSerializer(ISchemaRegistryClient, AvroSerializerConfig)")]
public AvroSerializer(ISchemaRegistryClient schemaRegistryClient, IEnumerable<KeyValuePair<string, string>> config)
{}


/// <summary>
/// Initialize a new instance of the AvroSerializer class.
/// Initialize a new instance of the <see cref="AvroSerializer{T}"/> class.
/// When passed as a parameter to the Confluent.Kafka.Producer constructor,
/// the following configuration properties will be extracted from the producer's
/// configuration property collection:
///
/// avro.serializer.buffer.bytes (default: 128) - Initial size (in bytes) of the buffer
/// used for message serialization. Use a value high enough to avoid resizing
/// the buffer, but small enough to avoid excessive memory use. Inspect the size of
/// the byte array returned by the Serialize method to estimate an appropriate value.
///
/// avro.serializer.buffer.bytes (default: 1024) - Initial size (in bytes) of the buffer
/// used for message serialization. Use a value high enough to avoid resizing
/// the buffer, but small enough to avoid excessive memory use. Inspect the size of
/// the byte array returned by the Serialize method to estimate an appropriate value.
/// Note: each call to serialize creates a new buffer.
///
/// avro.serializer.auto.register.schemas (default: true) - true if the serializer should
/// attempt to auto-register unrecognized schemas with Confluent Schema Registry,
///
/// avro.serializer.auto.register.schemas (default: true) - true if the serializer should
/// attempt to auto-register unrecognized schemas with Confluent Schema Registry,
/// false if not.
/// </summary>
/// <param name="schemaRegistryClient">
/// An implementation of ISchemaRegistryClient used for
/// An implementation of <see cref="ISchemaRegistryClient"/> used for
/// communication with Confluent Schema Registry.
/// </param>
/// <param name="config">
/// Serializer configuration properties (refer to
/// Serializer configuration properties (refer to
/// <see cref="AvroSerializerConfig" />)
/// </param>
public AvroSerializer(ISchemaRegistryClient schemaRegistryClient, AvroSerializerConfig config = null)
Expand Down Expand Up @@ -126,7 +126,7 @@ public AvroSerializer(ISchemaRegistryClient schemaRegistryClient, AvroSerializer
/// <summary>
/// Serialize an instance of type <typeparamref name="T"/> to a byte array in Avro format. The serialized
/// data is preceded by a "magic byte" (1 byte) and the id of the schema as registered
/// in Confluent's Schema Registry (4 bytes, network byte order). This call may block or throw
/// in Confluent's Schema Registry (4 bytes, network byte order). This call may block or throw
/// on first use for a particular topic during schema registration.
/// </summary>
/// <param name="value">
Expand All @@ -136,11 +136,11 @@ public AvroSerializer(ISchemaRegistryClient schemaRegistryClient, AvroSerializer
/// Context relevant to the serialize operation.
/// </param>
/// <returns>
/// A <see cref="System.Threading.Tasks.Task" /> that completes with
/// A <see cref="Task" /> that completes with
/// <paramref name="value" /> serialized as a byte array.
/// </returns>
public async Task<byte[]> SerializeAsync(T value, SerializationContext context)
{
{
try
{
// null needs to treated specially since the client most likely just wants to send
Expand Down
67 changes: 19 additions & 48 deletions src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializerConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
namespace Confluent.SchemaRegistry.Serdes
{
/// <summary>
/// <see cref="Confluent.SchemaRegistry.Serdes.AvroSerializer{T}" />
/// <see cref="AvroSerializer{T}" />
/// configuration properties.
/// </summary>
public class AvroSerializerConfig : Config
{
/// <summary>
/// Configuration property names specific to
/// <see cref="Confluent.SchemaRegistry.Serdes.AvroSerializer{T}" />.
/// Configuration property names specific to
/// <see cref="AvroSerializer{T}" />.
/// </summary>
public static class PropertyNames
{
Expand All @@ -49,15 +49,15 @@ public static class PropertyNames
/// Specifies whether or not the Avro serializer should attempt to auto-register
/// unrecognized schemas with Confluent Schema Registry.
///
/// default: true
/// default: <see langword="true"/>
/// </summary>
public const string AutoRegisterSchemas = "avro.serializer.auto.register.schemas";

/// <summary>
/// Specifies whether to normalize schemas, which will transform schemas
/// to have a consistent format, including ordering properties and references.
///
/// default: false
/// default: <see langword="false"/>
/// </summary>
public const string NormalizeSchemas = "avro.serializer.normalize.schemas";

Expand All @@ -67,13 +67,15 @@ public static class PropertyNames
/// WARNING: There is no check that the latest schema is backwards compatible
/// with the schema of the object being serialized.
///
/// default: false
/// default: <see langword="false"/>
/// </summary>
public const string UseLatestVersion = "avro.serializer.use.latest.version";

/// <summary>
/// The subject name strategy to use for schema registration / lookup.
/// Possible values: <see cref="Confluent.SchemaRegistry.SubjectNameStrategy" />
/// Possible values: <see cref="SchemaRegistry.SubjectNameStrategy" />
///
/// default: <see cref="SubjectNameStrategy.Topic"/>
/// </summary>
public const string SubjectNameStrategy = "avro.serializer.subject.name.strategy";
}
Expand All @@ -92,68 +94,39 @@ public AvroSerializerConfig() { }
public AvroSerializerConfig(IEnumerable<KeyValuePair<string, string>> config) : base(config.ToDictionary(v => v.Key, v => v.Value)) { }


/// <summary>
/// Specifies the initial size (in bytes) of the buffer used for Avro message
/// serialization. Use a value high enough to avoid resizing the buffer, but
/// small enough to avoid excessive memory use. Inspect the size of the byte
/// array returned by the Serialize method to estimate an appropriate value.
/// Note: each call to serialize creates a new buffer.
///
/// default: 1024
/// </summary>
/// <inheritdoc cref="PropertyNames.BufferBytes"/>
public int? BufferBytes
{
get { return GetInt(PropertyNames.BufferBytes); }
set { SetObject(PropertyNames.BufferBytes, value); }
}


/// <summary>
/// Specifies whether or not the Avro serializer should attempt to auto-register
/// unrecognized schemas with Confluent Schema Registry.
///
/// default: true
/// </summary>
/// <inheritdoc cref="PropertyNames.AutoRegisterSchemas"/>
public bool? AutoRegisterSchemas
{
get { return GetBool(PropertyNames.AutoRegisterSchemas); }
set { SetObject(PropertyNames.AutoRegisterSchemas, value); }
}


/// <summary>
/// Specifies whether to normalize schemas, which will transform schemas
/// to have a consistent format, including ordering properties and references.
///
/// default: false
/// </summary>


/// <inheritdoc cref="PropertyNames.NormalizeSchemas"/>
public bool? NormalizeSchemas
{
get { return GetBool(PropertyNames.NormalizeSchemas); }
set { SetObject(PropertyNames.NormalizeSchemas, value); }
}


/// <summary>
/// Specifies whether or not the Avro serializer should use the latest subject
/// version for serialization.
/// WARNING: There is no check that the latest schema is backwards compatible
/// with the schema of the object being serialized.
///
/// default: false
/// </summary>
/// <inheritdoc cref="PropertyNames.UseLatestVersion"/>
public bool? UseLatestVersion
{
get { return GetBool(PropertyNames.UseLatestVersion); }
set { SetObject(PropertyNames.UseLatestVersion, value); }
}


/// <summary>
/// Subject name strategy.
///
/// default: SubjectNameStrategy.Topic
/// </summary>
/// <inheritdoc cref="PropertyNames.SubjectNameStrategy"/>
public SubjectNameStrategy? SubjectNameStrategy
{
get
Expand All @@ -162,8 +135,7 @@ public SubjectNameStrategy? SubjectNameStrategy
if (r == null) { return null; }
else
{
SubjectNameStrategy result;
if (!Enum.TryParse<SubjectNameStrategy>(r, out result))
if (!Enum.TryParse(r, out SubjectNameStrategy result))
throw new ArgumentException(
$"Unknown ${PropertyNames.SubjectNameStrategy} value: {r}.");
else
Expand All @@ -172,10 +144,9 @@ public SubjectNameStrategy? SubjectNameStrategy
}
set
{
if (value == null) { this.properties.Remove(PropertyNames.SubjectNameStrategy); }
else { this.properties[PropertyNames.SubjectNameStrategy] = value.ToString(); }
if (value == null) { properties.Remove(PropertyNames.SubjectNameStrategy); }
else { properties[PropertyNames.SubjectNameStrategy] = value.ToString(); }
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Confluent.Kafka\Confluent.Kafka.csproj" />
<ProjectReference Include="..\Confluent.SchemaRegistry\Confluent.SchemaRegistry.csproj" />
</ItemGroup>

Expand Down
Loading