diff --git a/.github/workflows/test-and-verify.yml b/.github/workflows/test-and-verify.yml
index 2388b73e..190368b4 100644
--- a/.github/workflows/test-and-verify.yml
+++ b/.github/workflows/test-and-verify.yml
@@ -12,7 +12,7 @@ jobs:
     strategy:
       matrix:
-        spark: [ 3.2 ]
+        spark: [ 3.2, 3.3, 3.4, 3.5 ]
         scala: [ 2.12, 2.13 ]
     name: Spark ${{ matrix.spark }}, Scala ${{ matrix.scala }}
diff --git a/pom.xml b/pom.xml
index 71ffb5bf..f89d0d6c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,7 +48,9 @@
         <maven.shade.plugin.version>3.2.1</maven.shade.plugin.version>
 
-        <spark-32.version>3.2.1</spark-32.version>
+        <spark.default.version>3.2.4</spark.default.version>
+        <avro.default.version>1.10.2</avro.default.version>
+        <jackson.core.default.version>2.12.3</jackson.core.default.version>
@@ -63,13 +65,15 @@
         <scala.compat.version>2.12</scala.compat.version>
 
-        <spark.version>${spark-32.version}</spark.version>
+        <spark.version>${spark.default.version}</spark.version>
+        <avro.version>${avro.default.version}</avro.version>
+        <jackson.core.version>${jackson.core.default.version}</jackson.core.version>
+        <spark.avro.version>${spark.version}</spark.avro.version>
         <kafka.spark.version>0-10</kafka.spark.version>
         <confluent.version>6.2.1</confluent.version>
-        <avro.version>1.10.2</avro.version>
         <jacoco.version>0.8.10</jacoco.version>
@@ -119,6 +123,20 @@
     </properties>
 
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.avro</groupId>
+                <artifactId>avro</artifactId>
+                <version>${avro.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-core</artifactId>
+                <version>${jackson.core.version}</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
 
     <dependencies>
@@ -159,14 +177,6 @@
             <version>${spark.avro.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro</artifactId>
-            <version>${avro.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.spark</groupId>
@@ -362,12 +372,36 @@
         <profile>
             <id>spark-3.2</id>
             <properties>
-                <spark.version>${spark-32.version}</spark.version>
-                <avro.version>1.10.2</avro.version>
-                <confluent.version>6.2.1</confluent.version>
+                <spark.version>${spark.default.version}</spark.version>
+                <avro.version>${avro.default.version}</avro.version>
+                <jackson.core.version>${jackson.core.default.version}</jackson.core.version>
             </properties>
         </profile>
-
+        <profile>
+            <id>spark-3.3</id>
+            <properties>
+                <spark.version>3.3.4</spark.version>
+                <avro.version>1.11.0</avro.version>
+                <jackson.core.version>2.13.4</jackson.core.version>
+            </properties>
+        </profile>
+        <profile>
+            <id>spark-3.4</id>
+            <properties>
+                <spark.version>3.4.2</spark.version>
+                <avro.version>1.11.1</avro.version>
+                <jackson.core.version>2.14.2</jackson.core.version>
+            </properties>
+        </profile>
+        <profile>
+            <id>spark-3.5</id>
+            <properties>
+                <spark.version>3.5.0</spark.version>
+                <avro.version>1.11.2</avro.version>
+                <jackson.core.version>2.15.2</jackson.core.version>
+            </properties>
+        </profile>
         <profile>
             <id>uber</id>
@@ -563,11 +597,9 @@
                 <artifactId>jacoco-maven-plugin</artifactId>
                 <version>${jacoco.version}</version>
                 <configuration>
-                    <excludes>
-                    </excludes>
+                    <excludes>
+                        <exclude>za/co/absa/abris/examples/**</exclude>
+                    </excludes>
                 </configuration>
diff --git a/src/main/scala/za/co/absa/abris/examples/ConfluentKafkaAvroWriter.scala b/src/main/scala/za/co/absa/abris/examples/ConfluentKafkaAvroWriter.scala
index 7f3387d9..2ed75118 100644
--- a/src/main/scala/za/co/absa/abris/examples/ConfluentKafkaAvroWriter.scala
+++ b/src/main/scala/za/co/absa/abris/examples/ConfluentKafkaAvroWriter.scala
@@ -16,13 +16,13 @@
 
 package za.co.absa.abris.examples
 
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.functions.{col, struct}
 import org.apache.spark.sql.{DataFrame, Encoder, Row, SparkSession}
 import za.co.absa.abris.avro.format.SparkAvroConversions
 import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
 import za.co.absa.abris.config.AbrisConfig
 import za.co.absa.abris.examples.data.generation.ComplexRecordsGenerator
+import za.co.absa.abris.examples.utils.CompatibleRowEncoder
 
 object ConfluentKafkaAvroWriter {
@@ -85,6 +85,6 @@ object ConfluentKafkaAvroWriter {
   private def getEncoder: Encoder[Row] = {
     val avroSchema = AvroSchemaUtils.parse(ComplexRecordsGenerator.usedAvroSchema)
     val sparkSchema = SparkAvroConversions.toSqlType(avroSchema)
-    RowEncoder.apply(sparkSchema)
+    CompatibleRowEncoder.apply(sparkSchema)
   }
 }
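The swap above is needed because `RowEncoder.apply(StructType)` is no longer available from Spark 3.5.0 onwards, where the public `Encoders.row(StructType)` replaces it (hence the reflection in the new file below). A minimal sketch of the two calls the shim arbitrates between; the one-field schema is hypothetical, only to make the calls concrete:

import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object RowEncoderApiSketch {
  // Hypothetical schema for illustration only.
  val schema: StructType = StructType(Seq(StructField("name", StringType)))

  // Spark >= 3.5.0: public API, compiles only against 3.5+.
  val encoder: Encoder[Row] = Encoders.row(schema)

  // Spark < 3.5.0 equivalent; this line no longer compiles on 3.5+:
  // val encoder: Encoder[Row] =
  //   org.apache.spark.sql.catalyst.encoders.RowEncoder.apply(schema)
}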
diff --git a/src/main/scala/za/co/absa/abris/examples/utils/CompatibleRowEncoder.scala b/src/main/scala/za/co/absa/abris/examples/utils/CompatibleRowEncoder.scala
new file mode 100644
index 00000000..f28a14eb
--- /dev/null
+++ b/src/main/scala/za/co/absa/abris/examples/utils/CompatibleRowEncoder.scala
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2019 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.abris.examples.utils
+
+import org.apache.spark.sql.{Encoder, Row}
+import org.apache.spark.sql.types.StructType
+
+import scala.util.Try
+
+object CompatibleRowEncoder {
+  def apply(schema: StructType): Encoder[Row] = {
+    // Spark < 3.5.0
+    val rowEncoderTry = Try {
+      val rowEncoderClass = Class.forName("org.apache.spark.sql.catalyst.encoders.RowEncoder")
+      val applyMethod = rowEncoderClass.getMethod("apply", classOf[StructType])
+      applyMethod.invoke(null, schema).asInstanceOf[Encoder[Row]]
+    }
+
+    // Spark >= 3.5.0
+    rowEncoderTry.orElse(Try {
+      val encodersClass = Class.forName("org.apache.spark.sql.Encoders")
+      val rowMethod = encodersClass.getMethod("row", classOf[StructType])
+      rowMethod.invoke(null, schema).asInstanceOf[Encoder[Row]]
+    }).getOrElse {
+      throw new IllegalStateException("Neither RowEncoder.apply nor Encoders.row is available in the Spark version.")
+    }
+  }
+}
diff --git a/src/test/scala/za/co/absa/abris/avro/sql/CatalystAvroConversionSpec.scala b/src/test/scala/za/co/absa/abris/avro/sql/CatalystAvroConversionSpec.scala
index 7fa8aa7b..1f32af7a 100644
--- a/src/test/scala/za/co/absa/abris/avro/sql/CatalystAvroConversionSpec.scala
+++ b/src/test/scala/za/co/absa/abris/avro/sql/CatalystAvroConversionSpec.scala
@@ -60,7 +60,7 @@ class CatalystAvroConversionSpec extends AnyFlatSpec with Matchers with BeforeAndAfterEach
     SchemaManagerFactory.addSRClientInstance(schemaRegistryConfig, mockedSchemaRegistryClient)
   }
 
-  val bareByteSchema = """{"type": "bytes"}""""
+  val bareByteSchema = """{"type": "bytes"}"""
 
   it should "convert one type with bare schema to avro an back" in {
diff --git a/src/test/scala/za/co/absa/abris/avro/utils/AvroSchemaEncoder.scala b/src/test/scala/za/co/absa/abris/avro/utils/AvroSchemaEncoder.scala
index 5c5e3afc..92cf427f 100644
--- a/src/test/scala/za/co/absa/abris/avro/utils/AvroSchemaEncoder.scala
+++ b/src/test/scala/za/co/absa/abris/avro/utils/AvroSchemaEncoder.scala
@@ -17,17 +17,17 @@
 package za.co.absa.abris.avro.utils
 
 import org.apache.spark.sql.{Encoder, Row}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import za.co.absa.abris.avro.format.SparkAvroConversions
 import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
 import za.co.absa.abris.examples.data.generation.ComplexRecordsGenerator
+import za.co.absa.abris.examples.utils.CompatibleRowEncoder
 
 class AvroSchemaEncoder {
 
   def getEncoder: Encoder[Row] = {
     val avroSchema = AvroSchemaUtils.parse(ComplexRecordsGenerator.usedAvroSchema)
     val sparkSchema = SparkAvroConversions.toSqlType(avroSchema)
-    RowEncoder.apply(sparkSchema)
+    CompatibleRowEncoder.apply(sparkSchema)
  }
}
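A usage sketch for the new shim, assuming a local SparkSession and hypothetical data (the PR itself only wires the shim into the examples and tests above). The returned Encoder[Row] works anywhere Spark expects one, e.g. Dataset.map, and the same source compiles against Spark 3.2 through 3.5:

import org.apache.spark.sql.{Encoder, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import za.co.absa.abris.examples.utils.CompatibleRowEncoder

object CompatibleRowEncoderSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val schema = StructType(Seq(StructField("name", StringType)))

    // Resolved reflectively: RowEncoder.apply below Spark 3.5, Encoders.row from 3.5 on.
    implicit val rowEncoder: Encoder[Row] = CompatibleRowEncoder.apply(schema)

    // The map over Rows picks up the implicit Encoder[Row] defined above.
    val upper = Seq("alice", "bob").toDF("name")
      .map(row => Row(row.getString(0).toUpperCase))

    upper.show()
    spark.stop()
  }
}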