diff --git a/src/Confluent.Kafka/IAsyncDeserializer.cs b/src/Confluent.Kafka/IAsyncDeserializer.cs index 35a38a427..2e0147a03 100644 --- a/src/Confluent.Kafka/IAsyncDeserializer.cs +++ b/src/Confluent.Kafka/IAsyncDeserializer.cs @@ -21,7 +21,7 @@ namespace Confluent.Kafka { /// - /// A deserializer for use with . + /// A deserializer for use with . /// public interface IAsyncDeserializer { @@ -32,7 +32,7 @@ public interface IAsyncDeserializer /// The raw byte data to deserialize. /// /// - /// True if this is a null value. + /// True if this is a value. /// /// /// Context relevant to the deserialize operation. diff --git a/src/Confluent.Kafka/IAsyncSerializer.cs b/src/Confluent.Kafka/IAsyncSerializer.cs index 8be1431a7..612a0f7cb 100644 --- a/src/Confluent.Kafka/IAsyncSerializer.cs +++ b/src/Confluent.Kafka/IAsyncSerializer.cs @@ -14,14 +14,13 @@ // // Refer to LICENSE for more information. -using System; using System.Threading.Tasks; namespace Confluent.Kafka { /// - /// Defines a serializer for use with . + /// Defines a serializer for use with . /// public interface IAsyncSerializer { @@ -36,7 +35,7 @@ public interface IAsyncSerializer /// Context relevant to the serialize operation. /// /// - /// A that + /// A that /// completes with the serialized data. /// Task SerializeAsync(T data, SerializationContext context); diff --git a/src/Confluent.Kafka/SerializationContext.cs b/src/Confluent.Kafka/SerializationContext.cs index a8b20f857..c3b000822 100644 --- a/src/Confluent.Kafka/SerializationContext.cs +++ b/src/Confluent.Kafka/SerializationContext.cs @@ -26,7 +26,7 @@ public struct SerializationContext /// The default SerializationContext value (representing no context defined). /// public static SerializationContext Empty - => default(SerializationContext); + => default; /// /// Create a new SerializationContext object instance. @@ -41,7 +41,7 @@ public static SerializationContext Empty /// The collection of message headers (or null). Specifying null or an /// empty list are equivalent. The order of headers is maintained, and /// duplicate header keys are allowed. - /// + /// public SerializationContext(MessageComponentType component, string topic, Headers headers = null) { Component = component; @@ -53,7 +53,7 @@ public SerializationContext(MessageComponentType component, string topic, Header /// The topic the data is being written to or read from. /// public string Topic { get; private set; } - + /// /// The component of the message the serialization operation relates to. /// diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/AvroDeserializer.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/AvroDeserializer.cs index 5616b04c1..ceae4ba15 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/AvroDeserializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/AvroDeserializer.cs @@ -25,8 +25,8 @@ namespace Confluent.SchemaRegistry.Serdes { /// - /// (async) Avro deserializer. Use this deserializer with GenericRecord, - /// types generated using the avrogen.exe tool or one of the following + /// (async) Avro deserializer. Use this deserializer with , + /// types generated using the avrogen.exe tool or one of the following /// primitive types: int, long, float, double, boolean, string, byte[]. /// /// @@ -39,17 +39,17 @@ public class AvroDeserializer : IAsyncDeserializer { private IAvroDeserializerImpl deserializerImpl; - private ISchemaRegistryClient schemaRegistryClient; + private readonly ISchemaRegistryClient schemaRegistryClient; /// - /// Initialize a new AvroDeserializer instance. + /// Initialize a new instance. /// /// - /// An implementation of ISchemaRegistryClient used for + /// An implementation of used for /// communication with Confluent Schema Registry. /// /// - /// Deserializer configuration properties (refer to + /// Deserializer configuration properties (refer to /// ). /// public AvroDeserializer(ISchemaRegistryClient schemaRegistryClient, IEnumerable> config = null) @@ -79,13 +79,13 @@ public AvroDeserializer(ISchemaRegistryClient schemaRegistryClient, IEnumerable< /// The raw byte data to deserialize. /// /// - /// True if this is a null value. + /// True if this is a value. /// /// /// Context relevant to the deserialize operation. /// /// - /// A that completes + /// A that completes /// with the deserialized value. /// public async Task DeserializeAsync(ReadOnlyMemory data, bool isNull, SerializationContext context) diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/AvroDeserializerConfig.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/AvroDeserializerConfig.cs index 93dd140c1..5973469cc 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/AvroDeserializerConfig.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/AvroDeserializerConfig.cs @@ -20,7 +20,7 @@ namespace Confluent.SchemaRegistry.Serdes { /// - /// + /// /// configuration properties. /// public class AvroDeserializerConfig : Config {} diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializer.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializer.cs index 4901b9386..ccdbe0150 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializer.cs @@ -25,8 +25,8 @@ namespace Confluent.SchemaRegistry.Serdes { /// - /// (async) Avro serializer. Use this serializer with GenericRecord, - /// types generated using the avrogen.exe tool or one of the following + /// (async) Avro serializer. Use this serializer with , + /// types generated using the avrogen.exe tool or one of the following /// primitive types: int, long, float, double, boolean, string, byte[]. /// /// @@ -37,25 +37,25 @@ namespace Confluent.SchemaRegistry.Serdes /// public class AvroSerializer : IAsyncSerializer { - private bool autoRegisterSchema = true; - private bool normalizeSchemas = false; - private bool useLatestVersion = false; - private int initialBufferSize = DefaultInitialBufferSize; - private SubjectNameStrategyDelegate subjectNameStrategy = null; + private readonly bool autoRegisterSchema = true; + private readonly bool normalizeSchemas = false; + private readonly bool useLatestVersion = false; + private readonly int initialBufferSize = DefaultInitialBufferSize; + private readonly SubjectNameStrategyDelegate subjectNameStrategy; private IAvroSerializerImpl serializerImpl; - private ISchemaRegistryClient schemaRegistryClient; + private readonly ISchemaRegistryClient schemaRegistryClient; /// - /// The default initial size (in bytes) of buffers used for message + /// The default initial size (in bytes) of buffers used for message /// serialization. /// public const int DefaultInitialBufferSize = 1024; /// - /// Initialize a new instance of the AvroSerializer class. + /// Initialize a new instance of the class. /// [Obsolete("Superseded by AvroSerializer(ISchemaRegistryClient, AvroSerializerConfig)")] public AvroSerializer(ISchemaRegistryClient schemaRegistryClient, IEnumerable> config) @@ -63,27 +63,27 @@ public AvroSerializer(ISchemaRegistryClient schemaRegistryClient, IEnumerable - /// Initialize a new instance of the AvroSerializer class. + /// Initialize a new instance of the class. /// When passed as a parameter to the Confluent.Kafka.Producer constructor, /// the following configuration properties will be extracted from the producer's /// configuration property collection: - /// - /// avro.serializer.buffer.bytes (default: 128) - Initial size (in bytes) of the buffer - /// used for message serialization. Use a value high enough to avoid resizing - /// the buffer, but small enough to avoid excessive memory use. Inspect the size of - /// the byte array returned by the Serialize method to estimate an appropriate value. + /// + /// avro.serializer.buffer.bytes (default: 1024) - Initial size (in bytes) of the buffer + /// used for message serialization. Use a value high enough to avoid resizing + /// the buffer, but small enough to avoid excessive memory use. Inspect the size of + /// the byte array returned by the Serialize method to estimate an appropriate value. /// Note: each call to serialize creates a new buffer. - /// - /// avro.serializer.auto.register.schemas (default: true) - true if the serializer should - /// attempt to auto-register unrecognized schemas with Confluent Schema Registry, + /// + /// avro.serializer.auto.register.schemas (default: true) - true if the serializer should + /// attempt to auto-register unrecognized schemas with Confluent Schema Registry, /// false if not. /// /// - /// An implementation of ISchemaRegistryClient used for + /// An implementation of used for /// communication with Confluent Schema Registry. /// /// - /// Serializer configuration properties (refer to + /// Serializer configuration properties (refer to /// ) /// public AvroSerializer(ISchemaRegistryClient schemaRegistryClient, AvroSerializerConfig config = null) @@ -126,7 +126,7 @@ public AvroSerializer(ISchemaRegistryClient schemaRegistryClient, AvroSerializer /// /// Serialize an instance of type to a byte array in Avro format. The serialized /// data is preceded by a "magic byte" (1 byte) and the id of the schema as registered - /// in Confluent's Schema Registry (4 bytes, network byte order). This call may block or throw + /// in Confluent's Schema Registry (4 bytes, network byte order). This call may block or throw /// on first use for a particular topic during schema registration. /// /// @@ -136,11 +136,11 @@ public AvroSerializer(ISchemaRegistryClient schemaRegistryClient, AvroSerializer /// Context relevant to the serialize operation. /// /// - /// A that completes with + /// A that completes with /// serialized as a byte array. /// public async Task SerializeAsync(T value, SerializationContext context) - { + { try { // null needs to treated specially since the client most likely just wants to send diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializerConfig.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializerConfig.cs index d89a408c0..8eac50490 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializerConfig.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializerConfig.cs @@ -23,14 +23,14 @@ namespace Confluent.SchemaRegistry.Serdes { /// - /// + /// /// configuration properties. /// public class AvroSerializerConfig : Config { /// - /// Configuration property names specific to - /// . + /// Configuration property names specific to + /// . /// public static class PropertyNames { @@ -49,7 +49,7 @@ public static class PropertyNames /// Specifies whether or not the Avro serializer should attempt to auto-register /// unrecognized schemas with Confluent Schema Registry. /// - /// default: true + /// default: /// public const string AutoRegisterSchemas = "avro.serializer.auto.register.schemas"; @@ -57,7 +57,7 @@ public static class PropertyNames /// Specifies whether to normalize schemas, which will transform schemas /// to have a consistent format, including ordering properties and references. /// - /// default: false + /// default: /// public const string NormalizeSchemas = "avro.serializer.normalize.schemas"; @@ -67,13 +67,15 @@ public static class PropertyNames /// WARNING: There is no check that the latest schema is backwards compatible /// with the schema of the object being serialized. /// - /// default: false + /// default: /// public const string UseLatestVersion = "avro.serializer.use.latest.version"; /// /// The subject name strategy to use for schema registration / lookup. - /// Possible values: + /// Possible values: + /// + /// default: /// public const string SubjectNameStrategy = "avro.serializer.subject.name.strategy"; } @@ -92,15 +94,7 @@ public AvroSerializerConfig() { } public AvroSerializerConfig(IEnumerable> config) : base(config.ToDictionary(v => v.Key, v => v.Value)) { } - /// - /// Specifies the initial size (in bytes) of the buffer used for Avro message - /// serialization. Use a value high enough to avoid resizing the buffer, but - /// small enough to avoid excessive memory use. Inspect the size of the byte - /// array returned by the Serialize method to estimate an appropriate value. - /// Note: each call to serialize creates a new buffer. - /// - /// default: 1024 - /// + /// public int? BufferBytes { get { return GetInt(PropertyNames.BufferBytes); } @@ -108,25 +102,15 @@ public int? BufferBytes } - /// - /// Specifies whether or not the Avro serializer should attempt to auto-register - /// unrecognized schemas with Confluent Schema Registry. - /// - /// default: true - /// + /// public bool? AutoRegisterSchemas { get { return GetBool(PropertyNames.AutoRegisterSchemas); } set { SetObject(PropertyNames.AutoRegisterSchemas, value); } } - - - /// - /// Specifies whether to normalize schemas, which will transform schemas - /// to have a consistent format, including ordering properties and references. - /// - /// default: false - /// + + + /// public bool? NormalizeSchemas { get { return GetBool(PropertyNames.NormalizeSchemas); } @@ -134,14 +118,7 @@ public bool? NormalizeSchemas } - /// - /// Specifies whether or not the Avro serializer should use the latest subject - /// version for serialization. - /// WARNING: There is no check that the latest schema is backwards compatible - /// with the schema of the object being serialized. - /// - /// default: false - /// + /// public bool? UseLatestVersion { get { return GetBool(PropertyNames.UseLatestVersion); } @@ -149,11 +126,7 @@ public bool? UseLatestVersion } - /// - /// Subject name strategy. - /// - /// default: SubjectNameStrategy.Topic - /// + /// public SubjectNameStrategy? SubjectNameStrategy { get @@ -162,8 +135,7 @@ public SubjectNameStrategy? SubjectNameStrategy if (r == null) { return null; } else { - SubjectNameStrategy result; - if (!Enum.TryParse(r, out result)) + if (!Enum.TryParse(r, out SubjectNameStrategy result)) throw new ArgumentException( $"Unknown ${PropertyNames.SubjectNameStrategy} value: {r}."); else @@ -172,10 +144,9 @@ public SubjectNameStrategy? SubjectNameStrategy } set { - if (value == null) { this.properties.Remove(PropertyNames.SubjectNameStrategy); } - else { this.properties[PropertyNames.SubjectNameStrategy] = value.ToString(); } + if (value == null) { properties.Remove(PropertyNames.SubjectNameStrategy); } + else { properties[PropertyNames.SubjectNameStrategy] = value.ToString(); } } } - } } diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj b/src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj index d908fb990..9dbd68eba 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj @@ -26,7 +26,6 @@ - diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs index 66c4d4364..d48a8173f 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs @@ -32,12 +32,12 @@ internal class GenericDeserializerImpl : IAvroDeserializerImpl /// A datum reader cache (one corresponding to each write schema that's been seen) /// is maintained so that they only need to be constructed once. /// - private readonly Dictionary> datumReaderBySchemaId + private readonly Dictionary> datumReaderBySchemaId = new Dictionary>(); - - private SemaphoreSlim deserializeMutex = new SemaphoreSlim(1); - private ISchemaRegistryClient schemaRegistryClient; + private readonly SemaphoreSlim deserializeMutex = new SemaphoreSlim(1); + + private readonly ISchemaRegistryClient schemaRegistryClient; public GenericDeserializerImpl(ISchemaRegistryClient schemaRegistryClient) { @@ -48,7 +48,7 @@ public async Task Deserialize(string topic, byte[] array) { try { - // Note: topic is not necessary for deserialization (or knowing if it's a key + // Note: topic is not necessary for deserialization (or knowing if it's a key // or value) only the schema id is needed. if (array.Length < 5) @@ -74,8 +74,8 @@ public async Task Deserialize(string topic, byte[] array) if (datumReader == null) { // TODO: If any of this cache fills up, this is probably an - // indication of misuse of the deserializer. Ideally we would do - // something more sophisticated than the below + not allow + // indication of misuse of the deserializer. Ideally we would do + // something more sophisticated than the below + not allow // the misuse to keep happening without warning. if (datumReaderBySchemaId.Count > schemaRegistryClient.MaxCachedSchemas) { @@ -87,7 +87,7 @@ public async Task Deserialize(string topic, byte[] array) { throw new InvalidOperationException("Expecting writer schema to have type Avro, not {writerSchemaResult.SchemaType}"); } - var writerSchema = global::Avro.Schema.Parse(writerSchemaResult.SchemaString); + var writerSchema = Avro.Schema.Parse(writerSchemaResult.SchemaString); datumReader = new GenericReader(writerSchema, writerSchema); datumReaderBySchemaId[writerId] = datumReader; @@ -97,7 +97,7 @@ public async Task Deserialize(string topic, byte[] array) { deserializeMutex.Release(); } - + return datumReader.Read(default(GenericRecord), new BinaryDecoder(stream)); } } @@ -106,6 +106,5 @@ public async Task Deserialize(string topic, byte[] array) throw e.InnerException; } } - } } diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs index 85c23388c..f70e3f2a8 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs @@ -17,33 +17,33 @@ // Disable obsolete warnings. ConstructValueSubjectName is still used a an internal implementation detail. #pragma warning disable CS0618 +using Avro.Generic; +using Avro.IO; +using Confluent.Kafka; using System; using System.Collections.Generic; using System.IO; using System.Net; using System.Threading; using System.Threading.Tasks; -using Confluent.Kafka; -using Avro.Generic; -using Avro.IO; namespace Confluent.SchemaRegistry.Serdes { internal class GenericSerializerImpl : IAvroSerializerImpl { - private ISchemaRegistryClient schemaRegistryClient; - private bool autoRegisterSchema; - private bool normalizeSchemas; - private bool useLatestVersion; - private int initialBufferSize; - private SubjectNameStrategyDelegate subjectNameStrategy; + private readonly ISchemaRegistryClient schemaRegistryClient; + private readonly bool autoRegisterSchema; + private readonly bool normalizeSchemas; + private readonly bool useLatestVersion; + private readonly int initialBufferSize; + private readonly SubjectNameStrategyDelegate subjectNameStrategy; - private Dictionary knownSchemas = new Dictionary(); - private HashSet> registeredSchemas = new HashSet>(); - private Dictionary schemaIds = new Dictionary(); + private readonly Dictionary knownSchemas = new Dictionary(); + private readonly HashSet> registeredSchemas = new HashSet>(); + private readonly Dictionary schemaIds = new Dictionary(); - private SemaphoreSlim serializeMutex = new SemaphoreSlim(1); + private readonly SemaphoreSlim serializeMutex = new SemaphoreSlim(1); public GenericSerializerImpl( ISchemaRegistryClient schemaRegistryClient, @@ -62,9 +62,9 @@ public GenericSerializerImpl( } /// - /// Serialize GenericRecord instance to a byte array in Avro format. The serialized + /// Serialize instance to a byte array in Avro format. The serialized /// data is preceded by a "magic byte" (1 byte) and the id of the schema as registered - /// in Confluent's Schema Registry (4 bytes, network byte order). This call may block or throw + /// in Confluent's Schema Registry (4 bytes, network byte order). This call may block or throw /// on first use for a particular topic during schema registration. /// /// @@ -84,13 +84,13 @@ public async Task Serialize(string topic, GenericRecord data, bool isKey try { int schemaId; - global::Avro.RecordSchema writerSchema; + Avro.RecordSchema writerSchema; await serializeMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false); try { // TODO: If any of these caches fills up, this is probably an - // indication of misuse of the serializer. Ideally we would do - // something more sophisticated than the below + not allow + // indication of misuse of the serializer. Ideally we would do + // something more sophisticated than the below + not allow // the misuse to keep happening without warning. if (knownSchemas.Count > schemaRegistryClient.MaxCachedSchemas || registeredSchemas.Count > schemaRegistryClient.MaxCachedSchemas || @@ -103,7 +103,7 @@ public async Task Serialize(string topic, GenericRecord data, bool isKey // Determine a schema string corresponding to the schema object. // TODO: It would be more efficient to use a hash function based - // on the instance reference, not the implementation provided by + // on the instance reference, not the implementation provided by // Schema. writerSchema = data.Schema; string writerSchemaString = null; @@ -117,12 +117,12 @@ public async Task Serialize(string topic, GenericRecord data, bool isKey knownSchemas.Add(writerSchema, writerSchemaString); } - // Verify schema compatibility (& register as required) + get the + // Verify schema compatibility (& register as required) + get the // id corresponding to the schema. - - // TODO: Again, the hash functions in use below are potentially + + // TODO: Again, the hash functions in use below are potentially // slow since writerSchemaString is potentially long. It would be - // better to use hash functions based on the writerSchemaString + // better to use hash functions based on the writerSchemaString // object reference, not value. string subject = this.subjectNameStrategy != null diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/SerdeType.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/SerdeType.cs index 087986fcb..5312cdb68 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/SerdeType.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/SerdeType.cs @@ -18,7 +18,7 @@ namespace Confluent.SchemaRegistry.Serdes { /// - /// Enumeration of serde types recognized by the + /// Enumeration of serde types recognized by the /// Avro ProduceAsync and Produce methods. /// public enum SerdeType diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs index 949c04059..fd1b4efbf 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs @@ -14,17 +14,16 @@ // // Refer to LICENSE for more information. +using Avro.Generic; +using Avro.IO; +using Avro.Specific; +using Confluent.Kafka; using System; using System.Collections.Generic; using System.IO; using System.Net; -using System.Reflection; using System.Threading; using System.Threading.Tasks; -using Avro.Specific; -using Avro.IO; -using Avro.Generic; -using Confluent.Kafka; namespace Confluent.SchemaRegistry.Serdes @@ -32,20 +31,20 @@ namespace Confluent.SchemaRegistry.Serdes internal class SpecificDeserializerImpl : IAvroDeserializerImpl { /// - /// A datum reader cache (one corresponding to each write schema that's been seen) + /// A datum reader cache (one corresponding to each write schema that's been seen) /// is maintained so that they only need to be constructed once. /// - private readonly Dictionary> datumReaderBySchemaId + private readonly Dictionary> datumReaderBySchemaId = new Dictionary>(); - private SemaphoreSlim deserializeMutex = new SemaphoreSlim(1); + private readonly SemaphoreSlim deserializeMutex = new SemaphoreSlim(1); /// /// The Avro schema used to read values of type /// - public global::Avro.Schema ReaderSchema { get; private set; } + public Avro.Schema ReaderSchema { get; private set; } - private ISchemaRegistryClient schemaRegistryClient; + private readonly ISchemaRegistryClient schemaRegistryClient; public SpecificDeserializerImpl(ISchemaRegistryClient schemaRegistryClient) { @@ -57,35 +56,35 @@ public SpecificDeserializerImpl(ISchemaRegistryClient schemaRegistryClient) } else if (typeof(T).Equals(typeof(int))) { - ReaderSchema = global::Avro.Schema.Parse("int"); + ReaderSchema = Avro.Schema.Parse("int"); } else if (typeof(T).Equals(typeof(bool))) { - ReaderSchema = global::Avro.Schema.Parse("boolean"); + ReaderSchema = Avro.Schema.Parse("boolean"); } else if (typeof(T).Equals(typeof(double))) { - ReaderSchema = global::Avro.Schema.Parse("double"); + ReaderSchema = Avro.Schema.Parse("double"); } else if (typeof(T).Equals(typeof(string))) { - ReaderSchema = global::Avro.Schema.Parse("string"); + ReaderSchema = Avro.Schema.Parse("string"); } else if (typeof(T).Equals(typeof(float))) { - ReaderSchema = global::Avro.Schema.Parse("float"); + ReaderSchema = Avro.Schema.Parse("float"); } else if (typeof(T).Equals(typeof(long))) { - ReaderSchema = global::Avro.Schema.Parse("long"); + ReaderSchema = Avro.Schema.Parse("long"); } else if (typeof(T).Equals(typeof(byte[]))) { - ReaderSchema = global::Avro.Schema.Parse("bytes"); + ReaderSchema = Avro.Schema.Parse("bytes"); } else if (typeof(T).Equals(typeof(Null))) { - ReaderSchema = global::Avro.Schema.Parse("null"); + ReaderSchema = Avro.Schema.Parse("null"); } else { @@ -101,7 +100,7 @@ public async Task Deserialize(string topic, byte[] array) { try { - // Note: topic is not necessary for deserialization (or knowing if it's a key + // Note: topic is not necessary for deserialization (or knowing if it's a key // or value) only the schema id is needed. if (array.Length < 5) @@ -132,7 +131,7 @@ public async Task Deserialize(string topic, byte[] array) } var writerSchemaJson = await schemaRegistryClient.GetSchemaAsync(writerId).ConfigureAwait(continueOnCapturedContext: false); - var writerSchema = global::Avro.Schema.Parse(writerSchemaJson.SchemaString); + var writerSchema = Avro.Schema.Parse(writerSchemaJson.SchemaString); datumReader = new SpecificReader(writerSchema, ReaderSchema); datumReaderBySchemaId[writerId] = datumReader; @@ -145,7 +144,7 @@ public async Task Deserialize(string topic, byte[] array) if (typeof(ISpecificRecord).IsAssignableFrom(typeof(T))) { - // This is a generic deserializer and it knows the type that needs to be serialized into. + // This is a generic deserializer and it knows the type that needs to be serialized into. // Passing default(T) will result in null value and that will force the datumRead to // use the schema namespace and name provided in the schema, which may not match (T). var reuse = Activator.CreateInstance(); @@ -160,6 +159,5 @@ public async Task Deserialize(string topic, byte[] array) throw e.InnerException; } } - } } diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs index e3faa3cb6..4c92c43fc 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs @@ -17,17 +17,15 @@ // Disable obsolete warnings. ConstructValueSubjectName is still used a an internal implementation detail. #pragma warning disable CS0618 +using Avro.IO; +using Avro.Specific; +using Confluent.Kafka; using System; using System.Collections.Generic; using System.IO; -using System.Linq; using System.Net; -using System.Reflection; using System.Threading; using System.Threading.Tasks; -using Avro.IO; -using Avro.Specific; -using Confluent.Kafka; namespace Confluent.SchemaRegistry.Serdes @@ -37,7 +35,7 @@ internal class SpecificSerializerImpl : IAvroSerializerImpl internal class SerializerSchemaData { private string writerSchemaString; - private global::Avro.Schema writerSchema; + private Avro.Schema writerSchema; /// /// A given schema is uniquely identified by a schema id, even when @@ -80,21 +78,21 @@ public SpecificWriter AvroWriter } } - private ISchemaRegistryClient schemaRegistryClient; - private bool autoRegisterSchema; - private bool normalizeSchemas; - private bool useLatestVersion; - private int initialBufferSize; - private SubjectNameStrategyDelegate subjectNameStrategy; + private readonly ISchemaRegistryClient schemaRegistryClient; + private readonly bool autoRegisterSchema; + private readonly bool normalizeSchemas; + private readonly bool useLatestVersion; + private readonly int initialBufferSize; + private readonly SubjectNameStrategyDelegate subjectNameStrategy; - private Dictionary multiSchemaData = + private readonly Dictionary multiSchemaData = new Dictionary(); - private SerializerSchemaData singleSchemaData = null; + private readonly SerializerSchemaData singleSchemaData; - private SemaphoreSlim serializeMutex = new SemaphoreSlim(1); + private readonly SemaphoreSlim serializeMutex = new SemaphoreSlim(1); public SpecificSerializerImpl( ISchemaRegistryClient schemaRegistryClient, @@ -127,41 +125,41 @@ private static SerializerSchemaData ExtractSchemaData(Type writerType) } else if (writerType.Equals(typeof(int))) { - serializerSchemaData.WriterSchema = global::Avro.Schema.Parse("int"); + serializerSchemaData.WriterSchema = Avro.Schema.Parse("int"); } else if (writerType.Equals(typeof(bool))) { - serializerSchemaData.WriterSchema = global::Avro.Schema.Parse("boolean"); + serializerSchemaData.WriterSchema = Avro.Schema.Parse("boolean"); } else if (writerType.Equals(typeof(double))) { - serializerSchemaData.WriterSchema = global::Avro.Schema.Parse("double"); + serializerSchemaData.WriterSchema = Avro.Schema.Parse("double"); } else if (writerType.Equals(typeof(string))) { // Note: It would arguably be better to make this a union with null, to // exactly match the .NET string type, however we don't for consistency // with the Java Avro serializer. - serializerSchemaData.WriterSchema = global::Avro.Schema.Parse("string"); + serializerSchemaData.WriterSchema = Avro.Schema.Parse("string"); } else if (writerType.Equals(typeof(float))) { - serializerSchemaData.WriterSchema = global::Avro.Schema.Parse("float"); + serializerSchemaData.WriterSchema = Avro.Schema.Parse("float"); } else if (writerType.Equals(typeof(long))) { - serializerSchemaData.WriterSchema = global::Avro.Schema.Parse("long"); + serializerSchemaData.WriterSchema = Avro.Schema.Parse("long"); } else if (writerType.Equals(typeof(byte[]))) { // Note: It would arguably be better to make this a union with null, to // exactly match the .NET byte[] type, however we don't for consistency // with the Java Avro serializer. - serializerSchemaData.WriterSchema = global::Avro.Schema.Parse("bytes"); + serializerSchemaData.WriterSchema = Avro.Schema.Parse("bytes"); } else if (writerType.Equals(typeof(Null))) { - serializerSchemaData.WriterSchema = global::Avro.Schema.Parse("null"); + serializerSchemaData.WriterSchema = Avro.Schema.Parse("null"); } else { @@ -199,9 +197,9 @@ public async Task Serialize(string topic, T data, bool isKey) } string fullname = null; - if (data is ISpecificRecord && ((ISpecificRecord)data).Schema is Avro.RecordSchema) + if (data is ISpecificRecord record && record.Schema is Avro.RecordSchema schema) { - fullname = ((Avro.RecordSchema)((ISpecificRecord)data).Schema).Fullname; + fullname = schema.Fullname; } string subject = this.subjectNameStrategy != null diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/Confluent.SchemaRegistry.Serdes.Json.csproj b/src/Confluent.SchemaRegistry.Serdes.Json/Confluent.SchemaRegistry.Serdes.Json.csproj index e035c30c7..d1acb3360 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/Confluent.SchemaRegistry.Serdes.Json.csproj +++ b/src/Confluent.SchemaRegistry.Serdes.Json/Confluent.SchemaRegistry.Serdes.Json.csproj @@ -26,7 +26,6 @@ - diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs index 593c8334e..fb022d9f1 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs @@ -53,11 +53,11 @@ public class JsonDeserializer : IAsyncDeserializer where T : class private readonly int headerSize = sizeof(int) + sizeof(byte); private readonly JsonSchemaGeneratorSettings jsonSchemaGeneratorSettings; - private JsonSchema schema = null; + private JsonSchema schema; private ISchemaRegistryClient schemaRegistryClient; - + /// - /// Initialize a new JsonDeserializer instance + /// Initialize a new instance /// with a given Schema. /// /// @@ -65,7 +65,7 @@ public class JsonDeserializer : IAsyncDeserializer where T : class /// /// /// Schema to use for validation, used when external - /// schema references are present in the schema. + /// schema references are present in the schema. /// Populate the References list of the schema for /// the same. Assuming the referenced schemas have /// already been registered in the registry. @@ -97,7 +97,7 @@ public JsonDeserializer(ISchemaRegistryClient schemaRegistryClient, Schema schem } /// - /// Initialize a new JsonDeserializer instance. + /// Initialize a new instance. /// /// /// Deserializer configuration properties (refer to @@ -126,13 +126,13 @@ public JsonDeserializer(IEnumerable> config = null, /// The raw byte data to deserialize. /// /// - /// True if this is a null value. + /// True if this is a value. /// /// /// Context relevant to the deserialize operation. /// /// - /// A that completes + /// A that completes /// with the deserialized value. /// public Task DeserializeAsync(ReadOnlyMemory data, bool isNull, SerializationContext context) @@ -150,15 +150,15 @@ public Task DeserializeAsync(ReadOnlyMemory data, bool isNull, Serializ if (array[0] != Constants.MagicByte) { - throw new InvalidDataException($"Expecting message {context.Component.ToString()} with Confluent Schema Registry framing. Magic byte was {array[0]}, expecting {Constants.MagicByte}"); + throw new InvalidDataException($"Expecting message {context.Component} with Confluent Schema Registry framing. Magic byte was {array[0]}, expecting {Constants.MagicByte}"); } // A schema is not required to deserialize json messages. using (var stream = new MemoryStream(array, headerSize, array.Length - headerSize)) - using (var sr = new System.IO.StreamReader(stream, Encoding.UTF8)) + using (var sr = new StreamReader(stream, Encoding.UTF8)) { string serializedString = sr.ReadToEnd(); - + if (this.schema != null) { JsonSchemaValidator validator = new JsonSchemaValidator(); diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializerConfig.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializerConfig.cs index d495e7e5e..ed04c843d 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializerConfig.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializerConfig.cs @@ -20,7 +20,7 @@ namespace Confluent.SchemaRegistry.Serdes { /// - /// + /// /// configuration properties. /// public class JsonDeserializerConfig : Config {} diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSchemaResolver.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSchemaResolver.cs index 9f205ea5a..870847dbf 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSchemaResolver.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSchemaResolver.cs @@ -100,7 +100,7 @@ public JsonSchema GetResolvedSchema(){ } /// - /// Initialize a new instance of the JsonSerDesSchemaUtils class. + /// Initialize a new instance of the class. /// /// /// Confluent Schema Registry client instance that would be used to fetch @@ -108,14 +108,15 @@ public JsonSchema GetResolvedSchema(){ /// /// /// Schema to use for validation, used when external - /// schema references are present in the schema. + /// schema references are present in the schema. /// Populate the References list of the schema for /// the same. /// /// /// Schema generator setting to use. /// - public JsonSchemaResolver(ISchemaRegistryClient schemaRegistryClient, Schema schema, JsonSchemaGeneratorSettings jsonSchemaGeneratorSettings = null){ + public JsonSchemaResolver(ISchemaRegistryClient schemaRegistryClient, Schema schema, JsonSchemaGeneratorSettings jsonSchemaGeneratorSettings = null) + { this.schemaRegistryClient = schemaRegistryClient; this.root = schema; this.jsonSchemaGeneratorSettings = jsonSchemaGeneratorSettings; diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs index 3f4e03e95..9b8b459f6 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs @@ -14,7 +14,7 @@ // // Refer to LICENSE for more information. -// Disable obsolete warnings. ConstructValueSubjectName is still used a an internal implementation detail. +// Disable obsolete warnings. ConstructValueSubjectName is still used as an internal implementation detail. #pragma warning disable CS0618 using System; @@ -46,7 +46,7 @@ namespace Confluent.SchemaRegistry.Serdes /// /// Internally, the serializer uses Newtonsoft.Json for /// serialization and NJsonSchema for schema creation and - /// validation. You can use any property annotations recognised + /// validation. You can use any property annotations recognized /// by these libraries. /// /// Note: Off-the-shelf libraries do not yet exist to enable @@ -62,13 +62,13 @@ public class JsonSerializer : IAsyncSerializer where T : class private bool useLatestVersion = false; private bool latestCompatibilityStrict = false; private int initialBufferSize = DefaultInitialBufferSize; - private SubjectNameStrategyDelegate subjectNameStrategy = null; + private SubjectNameStrategyDelegate subjectNameStrategy; private ISchemaRegistryClient schemaRegistryClient; private readonly JsonSchemaGeneratorSettings jsonSchemaGeneratorSettings; - private HashSet subjectsRegistered = new HashSet(); - private SemaphoreSlim serializeMutex = new SemaphoreSlim(1); + private readonly HashSet subjectsRegistered = new HashSet(); + private readonly SemaphoreSlim serializeMutex = new SemaphoreSlim(1); private readonly List ReferenceList = new List(); - private JsonSchemaValidator validator = new JsonSchemaValidator(); + private readonly JsonSchemaValidator validator = new JsonSchemaValidator(); /// /// A given schema is uniquely identified by a schema id, even when @@ -76,9 +76,9 @@ public class JsonSerializer : IAsyncSerializer where T : class /// private int? schemaId; - private JsonSchema schema; - private string schemaText; - private string schemaFullname; + private readonly JsonSchema schema; + private readonly string schemaText; + private readonly string schemaFullname; private void SetConfigUtil(JsonSerializerConfig config) { @@ -97,14 +97,14 @@ private void SetConfigUtil(JsonSerializerConfig config) if (config.LatestCompatibilityStrict != null) { this.latestCompatibilityStrict = config.LatestCompatibilityStrict.Value; } if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); } - if (this.useLatestVersion && this.autoRegisterSchema) + if (useLatestVersion && autoRegisterSchema) { throw new ArgumentException($"JsonSerializer: cannot enable both use.latest.version and auto.register.schemas"); } } /// - /// Initialize a new instance of the JsonSerializer class. + /// Initialize a new instance of the class. /// /// /// Confluent Schema Registry client instance. @@ -130,15 +130,15 @@ public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, JsonSerializer } /// - /// Initialize a new instance of the JsonSerializer class - /// with a given Schema. + /// Initialize a new instance of the class + /// with a given . /// /// /// Confluent Schema Registry client instance. /// /// /// Schema to use for validation, used when external - /// schema references are present in the schema. + /// schema references are present in the schema. /// Populate the References list of the schema for /// the same. /// @@ -169,8 +169,8 @@ public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, Schema schema, } /// - /// Serialize an instance of type to a UTF8 encoded JSON - /// represenation. The serialized data is preceeded by: + /// Serialize an instance of type to a UTF8 encoded JSON + /// representation. The serialized data is preceded by: /// 1. A "magic byte" (1 byte) that identifies this as a message with /// Confluent Platform framing. /// 2. The id of the schema as registered in Confluent's Schema Registry @@ -185,7 +185,7 @@ public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, Schema schema, /// Context relevant to the serialize operation. /// /// - /// A that completes with + /// A that completes with /// serialized as a byte array. /// public async Task SerializeAsync(T value, SerializationContext context) diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializerConfig.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializerConfig.cs index 0180aff35..5948ece5c 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializerConfig.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializerConfig.cs @@ -23,14 +23,14 @@ namespace Confluent.SchemaRegistry.Serdes { /// - /// + /// /// configuration properties. /// public class JsonSerializerConfig : Config { /// - /// Configuration property names specific to - /// . + /// Configuration property names specific to + /// . /// public static class PropertyNames { @@ -49,7 +49,7 @@ public static class PropertyNames /// Specifies whether or not the JSON serializer should attempt to auto-register /// unrecognized schemas with Confluent Schema Registry. /// - /// default: true + /// default: /// public const string AutoRegisterSchemas = "json.serializer.auto.register.schemas"; @@ -57,7 +57,7 @@ public static class PropertyNames /// Specifies whether to normalize schemas, which will transform schemas /// to have a consistent format, including ordering properties and references. /// - /// default: false + /// default: /// public const string NormalizeSchemas = "json.serializer.normalize.schemas"; @@ -66,23 +66,25 @@ public static class PropertyNames /// version for serialization. /// WARNING: There is no check that the latest schema is compatible /// with the schema of the object being serialized by default. - /// Use the LatestCompatibilityStrict config property to enable this. + /// Use the config property to enable this. /// - /// default: false + /// default: /// public const string UseLatestVersion = "json.serializer.use.latest.version"; /// - /// Specifies whether or not the JSON serializer should check the compatibility - /// with the latest schema of the subject if use.latest.version is set to true. + /// Specifies whether or not the JSON serializer should check the compatibility + /// with the latest schema of the subject if use.latest.version is set to . /// - /// default: false + /// default: /// public const string LatestCompatibilityStrict = "json.serializer.latest.compatibility.strict"; /// /// The subject name strategy to use for schema registration / lookup. - /// Possible values: + /// Possible values: + /// + /// default: /// public const string SubjectNameStrategy = "json.serializer.subject.name.strategy"; } @@ -101,15 +103,7 @@ public JsonSerializerConfig() { } public JsonSerializerConfig(IEnumerable> config) : base(config.ToDictionary(v => v.Key, v => v.Value)) { } - /// - /// Specifies the initial size (in bytes) of the buffer used for JSON message - /// serialization. Use a value high enough to avoid resizing the buffer, but - /// small enough to avoid excessive memory use. Inspect the size of the byte - /// array returned by the Serialize method to estimate an appropriate value. - /// Note: each call to serialize creates a new buffer. - /// - /// default: 1024 - /// + /// public int? BufferBytes { get { return GetInt(PropertyNames.BufferBytes); } @@ -117,25 +111,15 @@ public int? BufferBytes } - /// - /// Specifies whether or not the JSON serializer should attempt to auto-register - /// unrecognized schemas with Confluent Schema Registry. - /// - /// default: true - /// + /// public bool? AutoRegisterSchemas { get { return GetBool(PropertyNames.AutoRegisterSchemas); } set { SetObject(PropertyNames.AutoRegisterSchemas, value); } } - - /// - /// Specifies whether to normalize schemas, which will transform schemas - /// to have a consistent format, including ordering properties and references. - /// - /// default: false - /// + + /// public bool? NormalizeSchemas { get { return GetBool(PropertyNames.NormalizeSchemas); } @@ -143,15 +127,7 @@ public bool? NormalizeSchemas } - /// - /// Specifies whether or not the JSON serializer should use the latest subject - /// version for serialization. - /// WARNING: There is no check that the latest schema is compatible - /// with the schema of the object being serialized by default. - /// Use the LatestCompatibilityStrict config property to enable this. - /// - /// default: false - /// + /// public bool? UseLatestVersion { get { return GetBool(PropertyNames.UseLatestVersion); } @@ -159,24 +135,15 @@ public bool? UseLatestVersion } - /// - /// Specifies whether or not the JSON serializer should check the backwards compatibility - /// with the latest schema of the subject. - /// - /// default: false - /// + /// public bool? LatestCompatibilityStrict { get { return GetBool(PropertyNames.LatestCompatibilityStrict); } set { SetObject(PropertyNames.LatestCompatibilityStrict, value); } } - - /// - /// Subject name strategy. - /// - /// default: SubjectNameStrategy.Topic - /// + + /// public SubjectNameStrategy? SubjectNameStrategy { get @@ -185,8 +152,7 @@ public SubjectNameStrategy? SubjectNameStrategy if (r == null) { return null; } else { - SubjectNameStrategy result; - if (!Enum.TryParse(r, out result)) + if (!Enum.TryParse(r, out SubjectNameStrategy result)) throw new ArgumentException( $"Unknown ${PropertyNames.SubjectNameStrategy} value: {r}."); else @@ -195,10 +161,9 @@ public SubjectNameStrategy? SubjectNameStrategy } set { - if (value == null) { this.properties.Remove(PropertyNames.SubjectNameStrategy); } - else { this.properties[PropertyNames.SubjectNameStrategy] = value.ToString(); } + if (value == null) { properties.Remove(PropertyNames.SubjectNameStrategy); } + else { properties[PropertyNames.SubjectNameStrategy] = value.ToString(); } } } - } } diff --git a/src/Confluent.SchemaRegistry.Serdes.Protobuf/Confluent.SchemaRegistry.Serdes.Protobuf.csproj b/src/Confluent.SchemaRegistry.Serdes.Protobuf/Confluent.SchemaRegistry.Serdes.Protobuf.csproj index 476cda61f..220a2a922 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Protobuf/Confluent.SchemaRegistry.Serdes.Protobuf.csproj +++ b/src/Confluent.SchemaRegistry.Serdes.Protobuf/Confluent.SchemaRegistry.Serdes.Protobuf.csproj @@ -26,7 +26,6 @@ - diff --git a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializer.cs b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializer.cs index 3060de0e4..642c38778 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializer.cs @@ -47,15 +47,15 @@ namespace Confluent.SchemaRegistry.Serdes /// public class ProtobufDeserializer : IAsyncDeserializer where T : class, IMessage, new() { - private bool useDeprecatedFormat = false; - - private MessageParser parser; + private readonly bool useDeprecatedFormat = false; + + private readonly MessageParser parser; /// - /// Initialize a new ProtobufDeserializer instance. + /// Initialize a new instance. /// /// - /// Deserializer configuration properties (refer to + /// Deserializer configuration properties (refer to /// ). /// public ProtobufDeserializer(IEnumerable> config = null) @@ -85,13 +85,13 @@ public ProtobufDeserializer(IEnumerable> config = n /// The raw byte data to deserialize. /// /// - /// True if this is a null value. + /// True if this is a value. /// /// /// Context relevant to the deserialize operation. /// /// - /// A that completes + /// A that completes /// with the deserialized value. /// public Task DeserializeAsync(ReadOnlyMemory data, bool isNull, SerializationContext context) @@ -112,7 +112,7 @@ public Task DeserializeAsync(ReadOnlyMemory data, bool isNull, Serializ var magicByte = reader.ReadByte(); if (magicByte != Constants.MagicByte) { - throw new InvalidDataException($"Expecting message {context.Component.ToString()} with Confluent Schema Registry framing. Magic byte was {array[0]}, expecting {Constants.MagicByte}"); + throw new InvalidDataException($"Expecting message {context.Component} with Confluent Schema Registry framing. Magic byte was {array[0]}, expecting {Constants.MagicByte}"); } // A schema is not required to deserialize protobuf messages since the diff --git a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializerConfig.cs b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializerConfig.cs index 3b45ce47d..6e347e3aa 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializerConfig.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufDeserializerConfig.cs @@ -22,14 +22,14 @@ namespace Confluent.SchemaRegistry.Serdes { /// - /// + /// /// configuration properties. /// public class ProtobufDeserializerConfig : Config { /// - /// Configuration property names specific to - /// . + /// Configuration property names specific to + /// . /// public static class PropertyNames { @@ -37,11 +37,11 @@ public static class PropertyNames /// Specifies whether the Protobuf deserializer should deserialize message indexes /// without zig-zag encoding. /// - /// default: false + /// default: /// public const string UseDeprecatedFormat = "protobuf.deserializer.use.deprecated.format"; } - + /// /// Initialize a new . /// @@ -55,12 +55,7 @@ public ProtobufDeserializerConfig() { } public ProtobufDeserializerConfig(IEnumerable> config) : base(config.ToDictionary(v => v.Key, v => v.Value)) { } - /// - /// Specifies whether the Protobuf deserializer should deserialize message indexes - /// without zig-zag encoding. - /// - /// default: false - /// + /// public bool? UseDeprecatedFormat { get { return GetBool(PropertyNames.UseDeprecatedFormat); } diff --git a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs index 24e0dbd37..5cea1c920 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs @@ -54,26 +54,26 @@ namespace Confluent.SchemaRegistry.Serdes { private const int DefaultInitialBufferSize = 1024; - private bool autoRegisterSchema = true; - private bool normalizeSchemas = false; - private bool useLatestVersion = false; - private bool skipKnownTypes = false; - private bool useDeprecatedFormat = false; - private int initialBufferSize = DefaultInitialBufferSize; - private SubjectNameStrategyDelegate subjectNameStrategy = null; - private ReferenceSubjectNameStrategyDelegate referenceSubjectNameStrategy = null; - private ISchemaRegistryClient schemaRegistryClient; - - private HashSet subjectsRegistered = new HashSet(); - private SemaphoreSlim serializeMutex = new SemaphoreSlim(1); + private readonly bool autoRegisterSchema = true; + private readonly bool normalizeSchemas = false; + private readonly bool useLatestVersion = false; + private readonly bool skipKnownTypes = false; + private readonly bool useDeprecatedFormat = false; + private readonly int initialBufferSize = DefaultInitialBufferSize; + private readonly SubjectNameStrategyDelegate subjectNameStrategy; + private readonly ReferenceSubjectNameStrategyDelegate referenceSubjectNameStrategy; + private readonly ISchemaRegistryClient schemaRegistryClient; + + private readonly HashSet subjectsRegistered = new HashSet(); + private readonly SemaphoreSlim serializeMutex = new SemaphoreSlim(1); /// /// A given schema is uniquely identified by a schema id, even when /// registered against multiple subjects. /// - private int? schemaId = null; + private int? schemaId; - private byte[] indexArray = null; + private byte[] indexArray; /// @@ -84,8 +84,8 @@ public ProtobufSerializer(ISchemaRegistryClient schemaRegistryClient, ProtobufSe this.schemaRegistryClient = schemaRegistryClient; if (config == null) - { - this.referenceSubjectNameStrategy = ReferenceSubjectNameStrategy.ReferenceName.ToDelegate(); + { + referenceSubjectNameStrategy = ReferenceSubjectNameStrategy.ReferenceName.ToDelegate(); return; } @@ -102,18 +102,18 @@ public ProtobufSerializer(ISchemaRegistryClient schemaRegistryClient, ProtobufSe if (config.SkipKnownTypes != null) { this.skipKnownTypes = config.SkipKnownTypes.Value; } if (config.UseDeprecatedFormat != null) { this.useDeprecatedFormat = config.UseDeprecatedFormat.Value; } if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); } - this.referenceSubjectNameStrategy = config.ReferenceSubjectNameStrategy == null + referenceSubjectNameStrategy = config.ReferenceSubjectNameStrategy == null ? ReferenceSubjectNameStrategy.ReferenceName.ToDelegate() : config.ReferenceSubjectNameStrategy.Value.ToDelegate(); - if (this.useLatestVersion && this.autoRegisterSchema) + if (useLatestVersion && autoRegisterSchema) { throw new ArgumentException($"ProtobufSerializer: cannot enable both use.latest.version and auto.register.schemas"); } } - private static byte[] createIndexArray(MessageDescriptor md, bool useDeprecatedFormat) + private static byte[] CreateIndexArray(MessageDescriptor md, bool useDeprecatedFormat) { var indices = new List(); @@ -203,8 +203,9 @@ private async Task> RegisterOrGetReferences(FileDescriptor { continue; } - - Func> t = async (FileDescriptor dependency) => { + + Func> t = async (FileDescriptor dependency) => + { var dependencyReferences = await RegisterOrGetReferences(dependency, context, autoRegisterSchema, skipKnownTypes).ConfigureAwait(continueOnCapturedContext: false); var subject = referenceSubjectNameStrategy(context, dependency.Name); var schema = new Schema(dependency.SerializedData.ToBase64(), dependencyReferences, SchemaType.Protobuf); @@ -224,7 +225,7 @@ private async Task> RegisterOrGetReferences(FileDescriptor /// /// Serialize an instance of type to a byte array - /// in Protobuf format. The serialized data is preceeded by: + /// in Protobuf format. The serialized data is preceded by: /// 1. A "magic byte" (1 byte) that identifies this as a message with /// Confluent Platform framing. /// 2. The id of the schema as registered in Confluent's Schema Registry @@ -245,7 +246,7 @@ private async Task> RegisterOrGetReferences(FileDescriptor /// Context relevant to the serialize operation. /// /// - /// A that completes with + /// A that completes with /// serialized as a byte array. /// public async Task SerializeAsync(T value, SerializationContext context) @@ -256,7 +257,7 @@ public async Task SerializeAsync(T value, SerializationContext context) { if (this.indexArray == null) { - this.indexArray = createIndexArray(value.Descriptor, useDeprecatedFormat); + this.indexArray = CreateIndexArray(value.Descriptor, useDeprecatedFormat); } string fullname = value.Descriptor.FullName; @@ -314,7 +315,7 @@ await RegisterOrGetReferences(value.Descriptor.File, context, autoRegisterSchema { stream.WriteByte(Constants.MagicByte); writer.Write(IPAddress.HostToNetworkOrder(schemaId.Value)); - writer.Write(this.indexArray); + writer.Write(indexArray); value.WriteTo(stream); return stream.ToArray(); } diff --git a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializerConfig.cs b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializerConfig.cs index 97cd5aa43..8c2ca2dda 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializerConfig.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializerConfig.cs @@ -23,14 +23,14 @@ namespace Confluent.SchemaRegistry.Serdes { /// - /// + /// /// configuration properties. /// public class ProtobufSerializerConfig : Config { /// - /// Configuration property names specific to - /// . + /// Configuration property names specific to + /// . /// public static class PropertyNames { @@ -49,7 +49,7 @@ public static class PropertyNames /// Specifies whether or not the Protobuf serializer should attempt to auto-register /// unrecognized schemas with Confluent Schema Registry. /// - /// default: true + /// default: /// public const string AutoRegisterSchemas = "protobuf.serializer.auto.register.schemas"; @@ -57,7 +57,7 @@ public static class PropertyNames /// Specifies whether to normalize schemas, which will transform schemas /// to have a consistent format, including ordering properties and references. /// - /// default: false + /// default: /// public const string NormalizeSchemas = "protobuf.serializer.normalize.schemas"; @@ -67,7 +67,7 @@ public static class PropertyNames /// WARNING: There is no check that the latest schema is backwards compatible /// with the schema of the object being serialized. /// - /// default: false + /// default: /// public const string UseLatestVersion = "protobuf.serializer.use.latest.version"; @@ -75,7 +75,7 @@ public static class PropertyNames /// Specifies whether or not the Protobuf serializer should skip known types /// when resolving dependencies. /// - /// default: false + /// default: /// public const string SkipKnownTypes = "protobuf.serializer.skip.known.types"; @@ -83,19 +83,23 @@ public static class PropertyNames /// Specifies whether the Protobuf serializer should serialize message indexes /// without zig-zag encoding. /// - /// default: false + /// default: /// public const string UseDeprecatedFormat = "protobuf.serializer.use.deprecated.format"; /// /// The subject name strategy to use for schema registration / lookup. - /// Possible values: + /// Possible values: + /// + /// default: /// public const string SubjectNameStrategy = "protobuf.serializer.subject.name.strategy"; /// /// The subject name strategy to use for registration / lookup of referenced schemas - /// Possible values: + /// Possible values: + /// + /// default: /// public const string ReferenceSubjectNameStrategy = "protobuf.serializer.reference.subject.name.strategy"; } @@ -113,15 +117,7 @@ public ProtobufSerializerConfig() { } public ProtobufSerializerConfig(IEnumerable> config) : base(config.ToDictionary(v => v.Key, v => v.Value)) { } - /// - /// Specifies the initial size (in bytes) of the buffer used for Protobuf message - /// serialization. Use a value high enough to avoid resizing the buffer, but - /// small enough to avoid excessive memory use. Inspect the size of the byte - /// array returned by the Serialize method to estimate an appropriate value. - /// Note: each call to serialize creates a new buffer. - /// - /// default: 1024 - /// + /// public int? BufferBytes { get { return GetInt(PropertyNames.BufferBytes); } @@ -129,25 +125,15 @@ public int? BufferBytes } - /// - /// Specifies whether or not the Protobuf serializer should attempt to auto-register - /// unrecognized schemas with Confluent Schema Registry. - /// - /// default: true - /// + /// public bool? AutoRegisterSchemas { get { return GetBool(PropertyNames.AutoRegisterSchemas); } set { SetObject(PropertyNames.AutoRegisterSchemas, value); } } - - - /// - /// Specifies whether to normalize schemas, which will transform schemas - /// to have a consistent format, including ordering properties and references. - /// - /// default: false - /// + + + /// public bool? NormalizeSchemas { get { return GetBool(PropertyNames.NormalizeSchemas); } @@ -155,52 +141,30 @@ public bool? NormalizeSchemas } - /// - /// Specifies whether or not the Protobuf serializer should use the latest subject - /// version for serialization. - /// WARNING: There is no check that the latest schema is backwards compatible - /// with the schema of the object being serialized. - /// - /// default: false - /// + /// public bool? UseLatestVersion { get { return GetBool(PropertyNames.UseLatestVersion); } set { SetObject(PropertyNames.UseLatestVersion, value); } } - - /// - /// Specifies whether or not the Protobuf serializer should skip known types - /// when resolving dependencies. - /// - /// default: false - /// + /// public bool? SkipKnownTypes { get { return GetBool(PropertyNames.SkipKnownTypes); } set { SetObject(PropertyNames.SkipKnownTypes, value); } } - - /// - /// Specifies whether the Protobuf serializer should serialize message indexes - /// without zig-zag encoding. - /// - /// default: false - /// + + /// public bool? UseDeprecatedFormat { get { return GetBool(PropertyNames.UseDeprecatedFormat); } set { SetObject(PropertyNames.UseDeprecatedFormat, value); } } - - /// - /// Subject name strategy. - /// - /// default: SubjectNameStrategy.Topic - /// + + /// public SubjectNameStrategy? SubjectNameStrategy { get @@ -209,8 +173,7 @@ public SubjectNameStrategy? SubjectNameStrategy if (r == null) { return null; } else { - SubjectNameStrategy result; - if (!Enum.TryParse(r, out result)) + if (!Enum.TryParse(r, out SubjectNameStrategy result)) throw new ArgumentException( $"Unknown ${PropertyNames.SubjectNameStrategy} value: {r}."); else @@ -219,17 +182,13 @@ public SubjectNameStrategy? SubjectNameStrategy } set { - if (value == null) { this.properties.Remove(PropertyNames.SubjectNameStrategy); } - else { this.properties[PropertyNames.SubjectNameStrategy] = value.ToString(); } + if (value == null) { properties.Remove(PropertyNames.SubjectNameStrategy); } + else { properties[PropertyNames.SubjectNameStrategy] = value.ToString(); } } } - /// - /// Reference subject name strategy. - /// - /// default: ReferenceSubjectNameStrategy.ReferenceName - /// + /// public ReferenceSubjectNameStrategy? ReferenceSubjectNameStrategy { get @@ -238,8 +197,7 @@ public ReferenceSubjectNameStrategy? ReferenceSubjectNameStrategy if (r == null) { return null; } else { - ReferenceSubjectNameStrategy result; - if (!Enum.TryParse(r, out result)) + if (!Enum.TryParse(r, out ReferenceSubjectNameStrategy result)) throw new ArgumentException( $"Unknown ${PropertyNames.ReferenceSubjectNameStrategy} value: {r}."); else @@ -248,10 +206,9 @@ public ReferenceSubjectNameStrategy? ReferenceSubjectNameStrategy } set { - if (value == null) { this.properties.Remove(PropertyNames.ReferenceSubjectNameStrategy); } - else { this.properties[PropertyNames.ReferenceSubjectNameStrategy] = value.ToString(); } + if (value == null) { properties.Remove(PropertyNames.ReferenceSubjectNameStrategy); } + else { properties[PropertyNames.ReferenceSubjectNameStrategy] = value.ToString(); } } } - } } diff --git a/src/Confluent.SchemaRegistry.Serdes.Protobuf/Utils.cs b/src/Confluent.SchemaRegistry.Serdes.Protobuf/Utils.cs index 843e7f105..484c155f8 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Protobuf/Utils.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Protobuf/Utils.cs @@ -25,7 +25,7 @@ internal static class Utils public static void WriteVarint(this Stream stream, uint value) { WriteUnsignedVarint(stream, (value << 1) ^ (value >> 31)); } - + /// /// Inspired by: https://github.com/apache/kafka/blob/2.5/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java#L284 /// @@ -42,7 +42,7 @@ public static int ReadVarint(this Stream stream) { var value = ReadUnsignedVarint(stream); return (int)((value >> 1) ^ -(value & 1)); } - + /// /// Inspired by: https://github.com/apache/kafka/blob/2.5/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java#L142 /// diff --git a/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs b/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs index f757e113e..30955b2b5 100644 --- a/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs +++ b/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs @@ -33,22 +33,22 @@ namespace Confluent.SchemaRegistry /// A caching Schema Registry client. /// /// The following method calls cache results: - /// - - /// - - /// - - /// - - /// - - /// - + /// - + /// - + /// - + /// - + /// - + /// - /// /// The following method calls do NOT cache results: - /// - - /// - - /// - - /// - - /// - - /// - - /// - - /// - + /// - + /// - + /// - + /// - + /// - + /// - + /// - + /// - /// public class CachedSchemaRegistryClient : ISchemaRegistryClient, IDisposable { @@ -103,7 +103,7 @@ private static SubjectNameStrategyDelegate GetKeySubjectNameStrategy(IEnumerable var keySubjectNameStrategyString = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryKeySubjectNameStrategy).Value ?? ""; SubjectNameStrategy keySubjectNameStrategy = SubjectNameStrategy.Topic; if (keySubjectNameStrategyString != "" && - !Enum.TryParse(keySubjectNameStrategyString, out keySubjectNameStrategy)) + !Enum.TryParse(keySubjectNameStrategyString, out keySubjectNameStrategy)) { throw new ArgumentException($"Unknown KeySubjectNameStrategy: {keySubjectNameStrategyString}"); } @@ -117,7 +117,7 @@ private static SubjectNameStrategyDelegate GetValueSubjectNameStrategy(IEnumerab var valueSubjectNameStrategyString = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryValueSubjectNameStrategy).Value ?? ""; SubjectNameStrategy valueSubjectNameStrategy = SubjectNameStrategy.Topic; if (valueSubjectNameStrategyString != "" && - !Enum.TryParse(valueSubjectNameStrategyString, out valueSubjectNameStrategy)) + !Enum.TryParse(valueSubjectNameStrategyString, out valueSubjectNameStrategy)) { throw new ArgumentException($"Unknown ValueSubjectNameStrategy: {valueSubjectNameStrategyString}"); } @@ -145,7 +145,7 @@ public CachedSchemaRegistryClient(IEnumerable> conf var schemaRegistryUrisMaybe = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryUrl); if (schemaRegistryUrisMaybe.Value == null) { throw new ArgumentException($"{SchemaRegistryConfig.PropertyNames.SchemaRegistryUrl} configuration property must be specified."); } - var schemaRegistryUris = (string)schemaRegistryUrisMaybe.Value; + var schemaRegistryUris = schemaRegistryUrisMaybe.Value; var timeoutMsMaybe = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs); int timeoutMs; @@ -250,7 +250,7 @@ public CachedSchemaRegistryClient(IEnumerable> conf try { sslVerify = sslVerificationMaybe.Value == null ? DefaultEnableSslCertificateVerification : bool.Parse(sslVerificationMaybe.Value); } catch (FormatException) { throw new ArgumentException($"Configured value for {SchemaRegistryConfig.PropertyNames.EnableSslCertificateVerification} must be a bool."); } - this.restService = new RestService(schemaRegistryUris, timeoutMs, authenticationHeaderValueProvider, SetSslConfig(config), sslVerify); + restService = new RestService(schemaRegistryUris, timeoutMs, authenticationHeaderValueProvider, SetSslConfig(config), sslVerify); } /// @@ -269,10 +269,10 @@ public CachedSchemaRegistryClient(IEnumerable> conf /// /// This is to make sure memory doesn't explode in the case of incorrect usage. - /// - /// It's behavior is pretty extreme - remove everything and start again if the + /// + /// It's behavior is pretty extreme - remove everything and start again if the /// cache gets full. However, in practical situations this is not expected. - /// + /// /// TODO: Implement an LRU Cache here or something instead. /// private bool CleanCacheIfFull() @@ -280,9 +280,9 @@ private bool CleanCacheIfFull() if (schemaById.Count >= identityMapCapacity) { // TODO: maybe log something somehow if this happens. Maybe throwing an exception (fail fast) is better. - this.schemaById.Clear(); - this.idBySchemaBySubject.Clear(); - this.schemaByVersionBySubject.Clear(); + schemaById.Clear(); + idBySchemaBySubject.Clear(); + schemaByVersionBySubject.Clear(); return true; } @@ -301,13 +301,13 @@ private bool CleanCacheIfFull()             var certificateLocation = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SslKeystoreLocation).Value ?? "";             var certificatePassword = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SslKeystorePassword).Value ?? ""; -            if (!String.IsNullOrEmpty(certificateLocation)) +            if (!string.IsNullOrEmpty(certificateLocation))             {                 certificates.Add(new X509Certificate2(certificateLocation, certificatePassword));             }             var caLocation = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SslCaLocation).Value ?? ""; -            if (!String.IsNullOrEmpty(caLocation)) +            if (!string.IsNullOrEmpty(caLocation))             {                 certificates.Add(new X509Certificate2(caLocation));             } @@ -319,7 +319,7 @@ private bool CleanCacheIfFull() public Task GetSchemaIdAsync(string subject, string avroSchema, bool normalize = false) => GetSchemaIdAsync(subject, new Schema(avroSchema, EmptyReferencesList, SchemaType.Avro), normalize); - + /// public async Task GetSchemaIdAsync(string subject, Schema schema, bool normalize = false) { @@ -392,12 +392,12 @@ public async Task RegisterSchemaAsync(string subject, Schema schema, bool n /// public Task RegisterSchemaAsync(string subject, string avroSchema, bool normalize = false) => RegisterSchemaAsync(subject, new Schema(avroSchema, EmptyReferencesList, SchemaType.Avro), normalize); - + /// /// Check if the given schema string matches a given format name. /// - private bool checkSchemaMatchesFormat(string format, string schemaString) + private bool CheckSchemaMatchesFormat(string format, string schemaString) { // if a format isn't specified, then assume text is desired. if (format == null) @@ -438,7 +438,7 @@ public async Task GetSchemaAsync(int id, string format = null) await cacheMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false); try { - if (!this.schemaById.TryGetValue(id, out Schema schema) || !checkSchemaMatchesFormat(format, schema.SchemaString)) + if (!this.schemaById.TryGetValue(id, out Schema schema) || !CheckSchemaMatchesFormat(format, schema.SchemaString)) { CleanCacheIfFull(); schema = (await restService.GetSchemaAsync(id, format).ConfigureAwait(continueOnCapturedContext: false)); @@ -538,7 +538,7 @@ public async Task UpdateCompatibilityAsync(Compatibility compatib /// - /// Releases unmanaged resources owned by this CachedSchemaRegistryClient instance. + /// Releases unmanaged resources owned by this instance. /// public void Dispose() { @@ -561,6 +561,5 @@ protected virtual void Dispose(bool disposing) restService.Dispose(); } } - } } diff --git a/src/Confluent.SchemaRegistry/Confluent.SchemaRegistry.csproj b/src/Confluent.SchemaRegistry/Confluent.SchemaRegistry.csproj index 924ffaa7b..f34821408 100644 --- a/src/Confluent.SchemaRegistry/Confluent.SchemaRegistry.csproj +++ b/src/Confluent.SchemaRegistry/Confluent.SchemaRegistry.csproj @@ -19,6 +19,7 @@ true true Confluent.SchemaRegistry.snk + 8.0 diff --git a/src/Confluent.SchemaRegistry/ISchemaRegistryClient.cs b/src/Confluent.SchemaRegistry/ISchemaRegistryClient.cs index 952ea61b4..9b2fb1654 100644 --- a/src/Confluent.SchemaRegistry/ISchemaRegistryClient.cs +++ b/src/Confluent.SchemaRegistry/ISchemaRegistryClient.cs @@ -33,7 +33,7 @@ public interface ISchemaRegistryClient : IDisposable /// - /// Register an Avro schema or get the schema id if it's already + /// Register an Avro schema or get the schema id if it's already /// registered. /// /// @@ -53,7 +53,7 @@ public interface ISchemaRegistryClient : IDisposable /// - /// Register a schema or get the schema id if it's already + /// Register a schema or get the schema id if it's already /// registered. /// /// @@ -71,7 +71,7 @@ public interface ISchemaRegistryClient : IDisposable Task RegisterSchemaAsync(string subject, Schema schema, bool normalize = false); /// - /// Get the unique id of the specified avro schema registered against + /// Get the unique id of the specified avro schema registered against /// the specified subject. /// /// @@ -94,7 +94,7 @@ public interface ISchemaRegistryClient : IDisposable /// - /// Get the unique id of the specified schema registered against + /// Get the unique id of the specified schema registered against /// the specified subject. /// /// @@ -223,7 +223,7 @@ public interface ISchemaRegistryClient : IDisposable /// - /// Check if an avro schema is compatible with latest version registered against a + /// Check if an avro schema is compatible with latest version registered against a /// specified subject. /// /// @@ -233,7 +233,7 @@ public interface ISchemaRegistryClient : IDisposable /// The schema to check. /// /// - /// true if is compatible with the latest version + /// true if is compatible with the latest version /// registered against a specified subject, false otherwise. /// [Obsolete("Superseded by IsCompatibleAsync(string, Schema)")] @@ -241,7 +241,7 @@ public interface ISchemaRegistryClient : IDisposable /// - /// Check if a schema is compatible with latest version registered against a + /// Check if a schema is compatible with latest version registered against a /// specified subject. /// /// @@ -251,7 +251,7 @@ public interface ISchemaRegistryClient : IDisposable /// The schema to check. /// /// - /// true if is compatible with the latest version + /// true if is compatible with the latest version /// registered against a specified subject, false otherwise. /// Task IsCompatibleAsync(string subject, Schema schema); diff --git a/src/Confluent.SchemaRegistry/ReferenceSubjectNameStrategy.cs b/src/Confluent.SchemaRegistry/ReferenceSubjectNameStrategy.cs index 492f1c11d..5fa1ff82e 100644 --- a/src/Confluent.SchemaRegistry/ReferenceSubjectNameStrategy.cs +++ b/src/Confluent.SchemaRegistry/ReferenceSubjectNameStrategy.cs @@ -42,7 +42,7 @@ public enum ReferenceSubjectNameStrategy /// ReferenceName } - + /// /// Extension methods for the ReferenceSubjectNameStrategy type. diff --git a/src/Confluent.SchemaRegistry/Rest/DataContracts/Compatibility.cs b/src/Confluent.SchemaRegistry/Rest/DataContracts/Compatibility.cs index 89d062e76..375fe0516 100644 --- a/src/Confluent.SchemaRegistry/Rest/DataContracts/Compatibility.cs +++ b/src/Confluent.SchemaRegistry/Rest/DataContracts/Compatibility.cs @@ -51,7 +51,7 @@ public enum Compatibility /// [EnumMember(Value = "FULL")] Full, - + /// /// Forward transitive schema compatibility. /// diff --git a/src/Confluent.SchemaRegistry/Rest/DataContracts/Config.cs b/src/Confluent.SchemaRegistry/Rest/DataContracts/Config.cs index 0ab4f2d2b..749a86242 100644 --- a/src/Confluent.SchemaRegistry/Rest/DataContracts/Config.cs +++ b/src/Confluent.SchemaRegistry/Rest/DataContracts/Config.cs @@ -24,15 +24,15 @@ internal class Config { [DataMember(Name = "compatibility")] public Compatibility CompatibilityLevel { get; } - + public Config(Compatibility compatibilityLevel) { CompatibilityLevel = compatibilityLevel; } - public override string ToString() + public override string ToString() => $"{{compatibility={CompatibilityLevel}}}"; - + public override bool Equals(object obj) { if (obj == null || GetType() != obj.GetType()) @@ -42,7 +42,7 @@ public override bool Equals(object obj) return CompatibilityLevel == ((Config)obj).CompatibilityLevel; } - + public override int GetHashCode() => 31 * CompatibilityLevel.GetHashCode(); } diff --git a/src/Confluent.SchemaRegistry/Rest/DataContracts/ErrorMessage.cs b/src/Confluent.SchemaRegistry/Rest/DataContracts/ErrorMessage.cs index 6836bf267..4dd00fbfa 100644 --- a/src/Confluent.SchemaRegistry/Rest/DataContracts/ErrorMessage.cs +++ b/src/Confluent.SchemaRegistry/Rest/DataContracts/ErrorMessage.cs @@ -36,7 +36,7 @@ public ErrorMessage(int errorCode, string message) ErrorCode = errorCode; Message = message; } - + public override string ToString() => $"{{error_code={ErrorCode}, message={Message}}}"; } diff --git a/src/Confluent.SchemaRegistry/Rest/DataContracts/RegisteredSchema.cs b/src/Confluent.SchemaRegistry/Rest/DataContracts/RegisteredSchema.cs index 97c34a496..6412f9abe 100644 --- a/src/Confluent.SchemaRegistry/Rest/DataContracts/RegisteredSchema.cs +++ b/src/Confluent.SchemaRegistry/Rest/DataContracts/RegisteredSchema.cs @@ -145,7 +145,7 @@ public override int GetHashCode() } /// - /// Compares this instance with a specified RegisteredSchema object and indicates whether this + /// Compares this instance with a specified RegisteredSchema object and indicates whether this /// instance precedes, follows, or appears in the same position in the sort order as /// the specified schema. /// @@ -154,9 +154,9 @@ public override int GetHashCode() /// /// /// A 32-bit signed integer that indicates whether this instance precedes, follows, or - /// appears in the same position in the sort order as the other parameter. Less than + /// appears in the same position in the sort order as the other parameter. Less than /// zero: this instance precedes other. Zero: this instance has the same position in - /// the sort order as other. Greater than zero: This instance follows other OR other + /// the sort order as other. Greater than zero: This instance follows other OR other /// is null. /// public int CompareTo(RegisteredSchema other) @@ -170,18 +170,18 @@ public int CompareTo(RegisteredSchema other) return result; // If the schema strings + version are equal and any of the other properties are not, - // then this is a logical error. Assume that this prevented/handled elsewhere. + // then this is a logical error. Assume that this prevented/handled elsewhere. } /// - /// Determines whether this instance and a specified object, which must also be a Schema + /// Determines whether this instance and a specified object, which must also be a Schema /// object, have the same value (Overrides Object.Equals(Object)) /// /// /// The Schema to compare to this instance. /// /// - /// true if obj is a Schema and its value is the same as this instance; otherwise, false. + /// true if obj is a Schema and its value is the same as this instance; otherwise, false. /// If obj is null, the method returns false. /// public override bool Equals(object obj) @@ -202,7 +202,7 @@ public override bool Equals(object obj) /// The schema to compare to this instance. /// /// - /// true if the value of the other parameter is the same as the value of this instance; + /// true if the value of the other parameter is the same as the value of this instance; /// otherwise, false. If other is null, the method returns false. /// public bool Equals(RegisteredSchema other) diff --git a/src/Confluent.SchemaRegistry/Rest/DataContracts/Schema.cs b/src/Confluent.SchemaRegistry/Rest/DataContracts/Schema.cs index 9d284ef32..ac905029f 100644 --- a/src/Confluent.SchemaRegistry/Rest/DataContracts/Schema.cs +++ b/src/Confluent.SchemaRegistry/Rest/DataContracts/Schema.cs @@ -119,23 +119,23 @@ public SchemaType SchemaType { get { - switch (SchemaType_String) + return SchemaType_String switch { - case "AVRO": return SchemaType.Avro; - case "PROTOBUF": return SchemaType.Protobuf; - case "JSON": return SchemaType.Json; - } - throw new InvalidOperationException($"Invalid program state: Unknown schema type {SchemaType_String}"); + "AVRO" => SchemaType.Avro, + "PROTOBUF" => SchemaType.Protobuf, + "JSON" => SchemaType.Json, + _ => throw new InvalidOperationException($"Invalid program state: Unknown schema type {SchemaType_String}") + }; } set { - switch (value) + SchemaType_String = value switch { - case SchemaType.Avro: SchemaType_String = "AVRO"; break; - case SchemaType.Protobuf: SchemaType_String = "PROTOBUF"; break; - case SchemaType.Json: SchemaType_String = "JSON"; break; - default: throw new InvalidOperationException($"Invalid program state: Unknown schema type {SchemaType_String}"); - } + SchemaType.Avro => "AVRO", + SchemaType.Protobuf => "PROTOBUF", + SchemaType.Json => "JSON", + _ => throw new InvalidOperationException($"Invalid program state: Unknown schema type {SchemaType_String}"), + }; } } @@ -209,7 +209,7 @@ public override bool Equals(object obj) /// The instance to compare to this instance. /// /// - /// true if the value of the other parameter is the same as the value of this instance; + /// true if the value of the other parameter is the same as the value of this instance; /// otherwise, false. If other is null, the method returns false. /// public bool Equals(Schema other) @@ -241,9 +241,9 @@ public override int GetHashCode() /// /// /// A 32-bit signed integer that indicates whether this instance precedes, follows, or - /// appears in the same position in the sort order as the other parameter. Less than + /// appears in the same position in the sort order as the other parameter. Less than /// zero: this instance precedes other. Zero: this instance has the same position in - /// the sort order as other. Greater than zero: This instance follows other OR other + /// the sort order as other. Greater than zero: This instance follows other OR other /// is null. /// /// diff --git a/src/Confluent.SchemaRegistry/Rest/DataContracts/SchemaReference.cs b/src/Confluent.SchemaRegistry/Rest/DataContracts/SchemaReference.cs index a256a9567..e946a32da 100644 --- a/src/Confluent.SchemaRegistry/Rest/DataContracts/SchemaReference.cs +++ b/src/Confluent.SchemaRegistry/Rest/DataContracts/SchemaReference.cs @@ -111,9 +111,9 @@ public override int GetHashCode() /// /// /// A 32-bit signed integer that indicates whether this instance precedes, follows, or - /// appears in the same position in the sort order as the other parameter. Less than + /// appears in the same position in the sort order as the other parameter. Less than /// zero: this instance precedes other. Zero: this instance has the same position in - /// the sort order as other. Greater than zero: This instance follows other OR other + /// the sort order as other. Greater than zero: This instance follows other OR other /// is null. /// public int CompareTo(SchemaReference other) @@ -163,7 +163,7 @@ public override bool Equals(object obj) /// The schema reference to compare to this instance. /// /// - /// true if the value of the other parameter is the same as the value of this instance; + /// true if the value of the other parameter is the same as the value of this instance; /// otherwise, false. If other is null, the method returns false. /// public bool Equals(SchemaReference other) diff --git a/src/Confluent.SchemaRegistry/Rest/IRestService.cs b/src/Confluent.SchemaRegistry/Rest/IRestService.cs index 7df234b2c..617a390e0 100644 --- a/src/Confluent.SchemaRegistry/Rest/IRestService.cs +++ b/src/Confluent.SchemaRegistry/Rest/IRestService.cs @@ -23,7 +23,7 @@ namespace Confluent.SchemaRegistry { /// /// It may be useful to expose this publicly, but this is not - /// required by the serializers, so we will keep this internal + /// required by the serializers, so we will keep this internal /// for now to minimize documentation / risk of API change etc. /// internal interface IRestService : IDisposable diff --git a/src/Confluent.SchemaRegistry/Rest/RestService.cs b/src/Confluent.SchemaRegistry/Rest/RestService.cs index 9f7ec21ae..5fdd8314a 100644 --- a/src/Confluent.SchemaRegistry/Rest/RestService.cs +++ b/src/Confluent.SchemaRegistry/Rest/RestService.cs @@ -21,9 +21,8 @@ using System.Linq; using System.Net; using System.Net.Http; -using System.Text; +using System.Security.Cryptography.X509Certificates; using System.Threading.Tasks; -using System.Security.Cryptography.X509Certificates; namespace Confluent.SchemaRegistry { @@ -61,9 +60,9 @@ internal class RestService : IRestService { this.authenticationHeaderValueProvider = authenticationHeaderValueProvider; - this.clients = schemaRegistryUrl + clients = schemaRegistryUrl .Split(',') - .Select(SanitizeUri)// need http or https - use http if not present. + .Select(SanitizeUri) // need http or https - use http if not present. .Select(uri => { HttpClient client; @@ -102,30 +101,24 @@ private static string SanitizeUri(string uri) private RegisteredSchema SanitizeRegisteredSchema(RegisteredSchema schema) { - if (schema.References == null) - { - // The JSON responses from Schema Registry does not include - // a references list if there are no references, which means - // schema.References will be null here in that case. It's - // semantically better if this is an empty list however, so - // expose that. - schema.References = EmptyReferencesList; - } + // The JSON responses from Schema Registry does not include + // a references list if there are no references, which means + // schema.References will be null here in that case. It's + // semantically better if this is an empty list however, so + // expose that. + schema.References ??= EmptyReferencesList; return schema; } private Schema SanitizeSchema(Schema schema) { - if (schema.References == null) - { - // The JSON response from Schema Registry does not include - // a references list if there are no references, which means - // schema.References will be null here in that case. It's - // semantically better if this is an empty list however, so - // expose that. - schema.References = EmptyReferencesList; - } + // The JSON response from Schema Registry does not include + // a references list if there are no references, which means + // schema.References will be null here in that case. It's + // semantically better if this is an empty list however, so + // expose that. + schema.References ??= EmptyReferencesList; if (schema.SchemaType_String == null) { @@ -144,10 +137,10 @@ private async Task ExecuteOnOneInstanceAsync(Func ExecuteOnOneInstanceAsync(Func ExecuteOnOneInstanceAsync(Func> RequestListOfAsync(string endPoint, HttpMethod me private HttpRequestMessage CreateRequest(string endPoint, HttpMethod method, params object[] jsonBody) { - HttpRequestMessage request = new HttpRequestMessage(method, endPoint); + var request = new HttpRequestMessage(method, endPoint); request.Headers.Add("Accept", acceptHeader); if (jsonBody.Length != 0) { @@ -319,7 +312,7 @@ public async Task GetLatestSchemaAsync(string subject) public async Task RegisterSchemaAsync(string subject, Schema schema, bool normalize) => schema.SchemaType == SchemaType.Avro - // In the avro case, just send the schema string to maintain backards compatibility. + // In the avro case, just send the schema string to maintain backwards compatibility. ? (await RequestAsync($"subjects/{WebUtility.UrlEncode(subject)}/versions?normalize={normalize}", HttpMethod.Post, new SchemaString(schema.SchemaString)) .ConfigureAwait(continueOnCapturedContext: false)).Id : (await RequestAsync($"subjects/{WebUtility.UrlEncode(subject)}/versions?normalize={normalize}", HttpMethod.Post, schema) @@ -328,7 +321,7 @@ public async Task RegisterSchemaAsync(string subject, Schema schema, bool n // Checks whether a schema has been registered under a given subject. public async Task LookupSchemaAsync(string subject, Schema schema, bool ignoreDeletedSchemas, bool normalize) => SanitizeRegisteredSchema(schema.SchemaType == SchemaType.Avro - // In the avro case, just send the schema string to maintain backards compatibility. + // In the avro case, just send the schema string to maintain backwards compatibility. ? await RequestAsync($"subjects/{WebUtility.UrlEncode(subject)}?normalize={normalize}&deleted={!ignoreDeletedSchemas}", HttpMethod.Post, new SchemaString(schema.SchemaString)) .ConfigureAwait(continueOnCapturedContext: false) : await RequestAsync($"subjects/{WebUtility.UrlEncode(subject)}?normalize={normalize}&deleted={!ignoreDeletedSchemas}", HttpMethod.Post, schema) @@ -340,7 +333,7 @@ public async Task LookupSchemaAsync(string subject, Schema sch public async Task TestCompatibilityAsync(string subject, int versionId, Schema schema) => schema.SchemaType == SchemaType.Avro - // In the avro case, just send the schema string to maintain backards compatibility. + // In the avro case, just send the schema string to maintain backwards compatibility. ? (await RequestAsync($"compatibility/subjects/{WebUtility.UrlEncode(subject)}/versions/{versionId}", HttpMethod.Post, new SchemaString(schema.SchemaString)) .ConfigureAwait(continueOnCapturedContext: false)).IsCompatible : (await RequestAsync($"compatibility/subjects/{WebUtility.UrlEncode(subject)}/versions/{versionId}", HttpMethod.Post, schema) @@ -349,13 +342,13 @@ public async Task TestCompatibilityAsync(string subject, int versionId, Sc public async Task TestLatestCompatibilityAsync(string subject, Schema schema) => schema.SchemaType == SchemaType.Avro - // In the avro case, just send the schema string to maintain backards compatibility. + // In the avro case, just send the schema string to maintain backwards compatibility. ? (await RequestAsync($"compatibility/subjects/{WebUtility.UrlEncode(subject)}/versions/latest", HttpMethod.Post, new SchemaString(schema.SchemaString)) .ConfigureAwait(continueOnCapturedContext: false)).IsCompatible : (await RequestAsync($"compatibility/subjects/{WebUtility.UrlEncode(subject)}/versions/latest", HttpMethod.Post, schema) .ConfigureAwait(continueOnCapturedContext: false)).IsCompatible; - #endregion Compatibility + #endregion Compatibility #region Config @@ -383,7 +376,7 @@ protected virtual void Dispose(bool disposing) { if (disposing) { - foreach (var client in this.clients) + foreach (var client in clients) { client.Dispose(); } diff --git a/src/Confluent.SchemaRegistry/Rest/Versions.cs b/src/Confluent.SchemaRegistry/Rest/Versions.cs index 930cbcec7..65ef206fe 100644 --- a/src/Confluent.SchemaRegistry/Rest/Versions.cs +++ b/src/Confluent.SchemaRegistry/Rest/Versions.cs @@ -24,20 +24,12 @@ internal static class Versions public const string SchemaRegistry_V1_JSON = "application/vnd.schemaregistry.v1+json"; public const string SchemaRegistry_Default_JSON = "application/vnd.schemaregistry+json"; public const string JSON = "application/json"; - - public static readonly IReadOnlyList PreferredResponseTypes = new List - { - SchemaRegistry_V1_JSON, - SchemaRegistry_Default_JSON, - JSON - }; - /// - /// This type is completely generic and carries no actual information about the type of data, but - /// it is the default for request entities if no content type is specified. Well behaving users - /// of the API will always specify the content type, but ad hoc use may omit it. We treat this as - /// JSON since that's all we currently support. - /// - public const string GenericRequest = "application/octet-stream"; + public static readonly IReadOnlyList PreferredResponseTypes = new List + { + SchemaRegistry_V1_JSON, + SchemaRegistry_Default_JSON, + JSON + }; } } diff --git a/src/Confluent.SchemaRegistry/SchemaRegistryConfig.cs b/src/Confluent.SchemaRegistry/SchemaRegistryConfig.cs index 52370d77d..8456c66d6 100644 --- a/src/Confluent.SchemaRegistry/SchemaRegistryConfig.cs +++ b/src/Confluent.SchemaRegistry/SchemaRegistryConfig.cs @@ -57,14 +57,14 @@ public static class PropertyNames /// USER_INFO: Credentials are specified via the `schema.registry.basic.auth.user.info` config property in the form username:password. /// If `schema.registry.basic.auth.user.info` is not set, authentication is disabled. /// SASL_INHERIT: Credentials are specified via the `sasl.username` and `sasl.password` configuration properties. - /// + /// /// default: USER_INFO /// public const string SchemaRegistryBasicAuthCredentialsSource = "schema.registry.basic.auth.credentials.source"; /// /// Basic auth credentials in the form {username}:{password}. - /// + /// /// default: "" (no authentication). /// public const string SchemaRegistryBasicAuthUserInfo = "schema.registry.basic.auth.user.info"; @@ -134,20 +134,20 @@ public AuthCredentialsSource? BasicAuthCredentialsSource /// public string Url { - get { return Get(SchemaRegistryConfig.PropertyNames.SchemaRegistryUrl); } - set { SetObject(SchemaRegistryConfig.PropertyNames.SchemaRegistryUrl, value); } + get { return Get(PropertyNames.SchemaRegistryUrl); } + set { SetObject(PropertyNames.SchemaRegistryUrl, value); } } /// /// Specifies the timeout for requests to Confluent Schema Registry. - /// + /// /// default: 30000 /// public int? RequestTimeoutMs { - get { return GetInt(SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs); } - set { SetObject(SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs, value.ToString()); } + get { return GetInt(PropertyNames.SchemaRegistryRequestTimeoutMs); } + set { SetObject(PropertyNames.SchemaRegistryRequestTimeoutMs, value.ToString()); } } ///  @@ -158,8 +158,8 @@ public int? RequestTimeoutMs         ///          public string SslCaLocation         { -            get { return Get(SchemaRegistryConfig.PropertyNames.SslCaLocation); } -            set { SetObject(SchemaRegistryConfig.PropertyNames.SslCaLocation, value.ToString()); } +            get { return Get(PropertyNames.SslCaLocation); } +            set { SetObject(PropertyNames.SslCaLocation, value.ToString()); }         }         ///  @@ -170,8 +170,8 @@ public int? RequestTimeoutMs         ///          public string SslKeystoreLocation         { -            get { return Get(SchemaRegistryConfig.PropertyNames.SslKeystoreLocation); } -            set { SetObject(SchemaRegistryConfig.PropertyNames.SslKeystoreLocation, value.ToString()); } +            get { return Get(PropertyNames.SslKeystoreLocation); } +            set { SetObject(PropertyNames.SslKeystoreLocation, value.ToString()); }         }         ///  @@ -182,8 +182,8 @@ public int? RequestTimeoutMs         ///          public string SslKeystorePassword         { -            get { return Get(SchemaRegistryConfig.PropertyNames.SslKeystorePassword); } -            set { SetObject(SchemaRegistryConfig.PropertyNames.SslKeystorePassword, value.ToString()); } +            get { return Get(PropertyNames.SslKeystorePassword); } +            set { SetObject(PropertyNames.SslKeystorePassword, value.ToString()); }         }         ///  @@ -194,20 +194,20 @@ public int? RequestTimeoutMs         ///          public bool? EnableSslCertificateVerification         { -            get { return GetBool(SchemaRegistryConfig.PropertyNames.EnableSslCertificateVerification); } -            set { SetObject(SchemaRegistryConfig.PropertyNames.EnableSslCertificateVerification, value); } +            get { return GetBool(PropertyNames.EnableSslCertificateVerification); } +            set { SetObject(PropertyNames.EnableSslCertificateVerification, value); }         } /// /// Specifies the maximum number of schemas CachedSchemaRegistryClient /// should cache locally. - /// + /// /// default: 1000 /// public int? MaxCachedSchemas { - get { return GetInt(SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas); } - set { SetObject(SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas, value.ToString()); } + get { return GetInt(PropertyNames.SchemaRegistryMaxCachedSchemas); } + set { SetObject(PropertyNames.SchemaRegistryMaxCachedSchemas, value.ToString()); } } @@ -216,14 +216,14 @@ public int? MaxCachedSchemas /// public string BasicAuthUserInfo { - get { return Get(SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthUserInfo); } - set { SetObject(SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthUserInfo, value); } + get { return Get(PropertyNames.SchemaRegistryBasicAuthUserInfo); } + set { SetObject(PropertyNames.SchemaRegistryBasicAuthUserInfo, value); } } /// /// Key subject name strategy. - /// + /// /// default: SubjectNameStrategy.Topic /// [Obsolete("Subject name strategies should now be configured using the serializer's configuration. In the future, this configuration property will be removed from SchemaRegistryConfig")] @@ -235,8 +235,7 @@ public SubjectNameStrategy? KeySubjectNameStrategy if (r == null) { return null; } else { - SubjectNameStrategy result; - if (!Enum.TryParse(r, out result)) + if (!Enum.TryParse(r, out SubjectNameStrategy result)) throw new ArgumentException( $"Unknown ${PropertyNames.SchemaRegistryKeySubjectNameStrategy} value: {r}."); else @@ -265,8 +264,7 @@ public SubjectNameStrategy? ValueSubjectNameStrategy if (r == null) { return null; } else { - SubjectNameStrategy result; - if (!Enum.TryParse(r, out result)) + if (!Enum.TryParse(r, out SubjectNameStrategy result)) throw new ArgumentException( $"Unknown ${PropertyNames.SchemaRegistryValueSubjectNameStrategy} value: {r}."); else @@ -308,7 +306,7 @@ protected void SetObject(string name, object val) } /// - /// Gets a configuration property value given a key. Returns null if + /// Gets a configuration property value given a key. Returns null if /// the property has not been set. /// /// @@ -325,7 +323,7 @@ public string Get(string key) } return null; } - + /// /// Gets a configuration property int? value given a key. /// @@ -357,7 +355,7 @@ public string Get(string key) if (result == null) { return null; } return bool.Parse(result); } - + /// /// The configuration properties. /// diff --git a/src/Confluent.SchemaRegistry/SchemaRegistryException.cs b/src/Confluent.SchemaRegistry/SchemaRegistryException.cs index cd0d4239d..308af8f2a 100644 --- a/src/Confluent.SchemaRegistry/SchemaRegistryException.cs +++ b/src/Confluent.SchemaRegistry/SchemaRegistryException.cs @@ -14,7 +14,6 @@ // // Refer to LICENSE for more information. -using System; using System.Net; using System.Net.Http; @@ -28,7 +27,7 @@ public class SchemaRegistryException : HttpRequestException { /// /// An error code specific to Schema Registry of the form XXX or XXXYY. - /// where XXX is standard http error status (400-500) and YY specific to schema registry + /// where XXX is standard HTTP error status (400-500) and YY specific to schema registry /// Example: 40403 = Schema not found /// public int ErrorCode { get; } @@ -39,7 +38,7 @@ public class SchemaRegistryException : HttpRequestException public HttpStatusCode Status { get; } /// - /// Initialize a new instance of SchemaRegistryException. + /// Initialize a new instance of . /// /// /// Additional information about the error. diff --git a/src/Confluent.SchemaRegistry/SubjectNameStrategy.cs b/src/Confluent.SchemaRegistry/SubjectNameStrategy.cs index 755f7b1b5..08daeb777 100644 --- a/src/Confluent.SchemaRegistry/SubjectNameStrategy.cs +++ b/src/Confluent.SchemaRegistry/SubjectNameStrategy.cs @@ -62,7 +62,7 @@ public enum SubjectNameStrategy /// - /// Extension methods for the SubjectNameStrategy type. + /// Extension methods for the type. /// public static class SubjectNameStrategyExtensions { diff --git a/test/Confluent.Kafka.SyncOverAsync/Program.cs b/test/Confluent.Kafka.SyncOverAsync/Program.cs index 7898afe31..e16c2c444 100644 --- a/test/Confluent.Kafka.SyncOverAsync/Program.cs +++ b/test/Confluent.Kafka.SyncOverAsync/Program.cs @@ -18,7 +18,6 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; -using Confluent.Kafka; // This program is included as an educational tool to allow you to @@ -58,7 +57,7 @@ class Program { static void Main(string[] args) { - ThreadPool.GetMinThreads(out int workerThreads, out int completionPortThreads); + ThreadPool.GetMinThreads(out int workerThreads, out int completionPortThreads); ThreadPool.SetMaxThreads(workerThreads, completionPortThreads); ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads); Console.WriteLine($"ThreadPool workerThreads: {workerThreads}, completionPortThreads: {completionPortThreads}"); @@ -67,7 +66,7 @@ static void Main(string[] args) { BootstrapServers = args[0] }; - + using (var producer = new ProducerBuilder(pConfig) .SetValueSerializer(new SimpleAsyncSerializer().SyncOverAsync()) // may deadlock due to thread pool exhaustion. // .SetValueSerializer(new SimpleSyncSerializer()) // will never deadlock. @@ -88,7 +87,7 @@ static void Main(string[] args) Console.WriteLine($"running task {taskNumber}"); object waitObj = new object(); - Action> handler = dr => + Action> handler = dr => { // in a deadlock scenario, the delivery handler will // never execute since execution of the Produce