From 36a09f09c909ef18283a356013b05b9a3be094b3 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 24 Jan 2024 10:26:03 +0100 Subject: [PATCH 1/6] Works for Spark 3.5.0 --- .../examples/ConfluentKafkaAvroWriter.scala | 4 +- .../examples/utils/CompatibleRowEncoder.scala | 42 +++++++++++++++++++ .../avro/sql/CatalystAvroConversionSpec.scala | 2 +- .../abris/avro/utils/AvroSchemaEncoder.scala | 4 +- 4 files changed, 47 insertions(+), 5 deletions(-) create mode 100644 src/main/scala/za/co/absa/abris/examples/utils/CompatibleRowEncoder.scala diff --git a/src/main/scala/za/co/absa/abris/examples/ConfluentKafkaAvroWriter.scala b/src/main/scala/za/co/absa/abris/examples/ConfluentKafkaAvroWriter.scala index 7f3387d9..2ed75118 100644 --- a/src/main/scala/za/co/absa/abris/examples/ConfluentKafkaAvroWriter.scala +++ b/src/main/scala/za/co/absa/abris/examples/ConfluentKafkaAvroWriter.scala @@ -16,13 +16,13 @@ package za.co.absa.abris.examples -import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.functions.{col, struct} import org.apache.spark.sql.{DataFrame, Encoder, Row, SparkSession} import za.co.absa.abris.avro.format.SparkAvroConversions import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils import za.co.absa.abris.config.AbrisConfig import za.co.absa.abris.examples.data.generation.ComplexRecordsGenerator +import za.co.absa.abris.examples.utils.CompatibleRowEncoder object ConfluentKafkaAvroWriter { @@ -85,6 +85,6 @@ object ConfluentKafkaAvroWriter { private def getEncoder: Encoder[Row] = { val avroSchema = AvroSchemaUtils.parse(ComplexRecordsGenerator.usedAvroSchema) val sparkSchema = SparkAvroConversions.toSqlType(avroSchema) - RowEncoder.apply(sparkSchema) + CompatibleRowEncoder.apply(sparkSchema) } } diff --git a/src/main/scala/za/co/absa/abris/examples/utils/CompatibleRowEncoder.scala b/src/main/scala/za/co/absa/abris/examples/utils/CompatibleRowEncoder.scala new file mode 100644 index 00000000..f28a14eb --- /dev/null +++ b/src/main/scala/za/co/absa/abris/examples/utils/CompatibleRowEncoder.scala @@ -0,0 +1,42 @@ +/* + * Copyright 2019 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.abris.examples.utils + +import org.apache.spark.sql.{Encoder, Row} +import org.apache.spark.sql.types.StructType + +import scala.util.Try + +object CompatibleRowEncoder { + def apply(schema: StructType): Encoder[Row] = { + // Spark < 3.5.0 + val rowEncoderTry = Try { + val rowEncoderClass = Class.forName("org.apache.spark.sql.catalyst.encoders.RowEncoder") + val applyMethod = rowEncoderClass.getMethod("apply", classOf[StructType]) + applyMethod.invoke(null, schema).asInstanceOf[Encoder[Row]] + } + + // Spark >= 3.5.0 + rowEncoderTry.orElse(Try { + val encodersClass = Class.forName("org.apache.spark.sql.Encoders") + val rowMethod = encodersClass.getMethod("row", classOf[StructType]) + rowMethod.invoke(null, schema).asInstanceOf[Encoder[Row]] + }).getOrElse { + throw new IllegalStateException("Neither RowEncoder.apply nor Encoders.row is available in the Spark version.") + } + } +} diff --git a/src/test/scala/za/co/absa/abris/avro/sql/CatalystAvroConversionSpec.scala b/src/test/scala/za/co/absa/abris/avro/sql/CatalystAvroConversionSpec.scala index 7fa8aa7b..1f32af7a 100644 --- a/src/test/scala/za/co/absa/abris/avro/sql/CatalystAvroConversionSpec.scala +++ b/src/test/scala/za/co/absa/abris/avro/sql/CatalystAvroConversionSpec.scala @@ -60,7 +60,7 @@ class CatalystAvroConversionSpec extends AnyFlatSpec with Matchers with BeforeAn SchemaManagerFactory.addSRClientInstance(schemaRegistryConfig, mockedSchemaRegistryClient) } - val bareByteSchema = """{"type": "bytes"}"""" + val bareByteSchema = """{"type": "bytes"}""" it should "convert one type with bare schema to avro an back" in { diff --git a/src/test/scala/za/co/absa/abris/avro/utils/AvroSchemaEncoder.scala b/src/test/scala/za/co/absa/abris/avro/utils/AvroSchemaEncoder.scala index 5c5e3afc..92cf427f 100644 --- a/src/test/scala/za/co/absa/abris/avro/utils/AvroSchemaEncoder.scala +++ b/src/test/scala/za/co/absa/abris/avro/utils/AvroSchemaEncoder.scala @@ -17,17 +17,17 @@ package za.co.absa.abris.avro.utils import org.apache.spark.sql.{Encoder, Row} -import org.apache.spark.sql.catalyst.encoders.RowEncoder import za.co.absa.abris.avro.format.SparkAvroConversions import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils import za.co.absa.abris.examples.data.generation.ComplexRecordsGenerator +import za.co.absa.abris.examples.utils.CompatibleRowEncoder class AvroSchemaEncoder { def getEncoder: Encoder[Row] = { val avroSchema = AvroSchemaUtils.parse(ComplexRecordsGenerator.usedAvroSchema) val sparkSchema = SparkAvroConversions.toSqlType(avroSchema) - RowEncoder.apply(sparkSchema) + CompatibleRowEncoder.apply(sparkSchema) } } From b9bd2ae3a180a622adcf393c4b204c9921cd136f Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 24 Jan 2024 11:05:49 +0100 Subject: [PATCH 2/6] Update GH action to run for latest spark versions --- .github/workflows/test-and-verify.yml | 2 +- pom.xml | 30 +++++++++++++++++++++++++-- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test-and-verify.yml b/.github/workflows/test-and-verify.yml index 2388b73e..190368b4 100644 --- a/.github/workflows/test-and-verify.yml +++ b/.github/workflows/test-and-verify.yml @@ -12,7 +12,7 @@ jobs: strategy: matrix: - spark: [ 3.2 ] + spark: [ 3.2, 3.3, 3.4, 3.5 ] scala: [ 2.12, 2.13 ] name: Spark ${{ matrix.spark }}, Scala ${{ matrix.scala }} diff --git a/pom.xml b/pom.xml index 71ffb5bf..4bc5f703 100644 --- a/pom.xml +++ b/pom.xml @@ -48,7 +48,10 @@ 3.2.1 - 3.2.1 + 3.2.4 + 3.3.4 + 3.4.2 + 3.5.0 @@ -367,7 +370,30 @@ 6.2.1 - + + spark-3.3 + + ${spark-33.version} + 1.11.0 + 6.2.1 + + + + spark-3.4 + + ${spark-34.version} + 1.11.1 + 6.2.1 + + + + spark-3.5 + + ${spark-35.version} + 1.11.2 + 6.2.1 + + uber From 693e39640571b95528137dcbabfaddb6cd6e9dcf Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 24 Jan 2024 11:37:22 +0100 Subject: [PATCH 3/6] Let Avro dependency be managed by Spark --- pom.xml | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/pom.xml b/pom.xml index 4bc5f703..bc13fab1 100644 --- a/pom.xml +++ b/pom.xml @@ -72,7 +72,6 @@ 6.2.1 - 1.10.2 0.8.10 @@ -162,14 +161,6 @@ ${spark.avro.version} - - - org.apache.avro - avro - ${avro.version} - provided - - org.apache.spark @@ -366,7 +357,6 @@ spark-3.2 ${spark-32.version} - 1.10.2 6.2.1 @@ -374,7 +364,6 @@ spark-3.3 ${spark-33.version} - 1.11.0 6.2.1 @@ -382,7 +371,6 @@ spark-3.4 ${spark-34.version} - 1.11.1 6.2.1 @@ -390,7 +378,6 @@ spark-3.5 ${spark-35.version} - 1.11.2 6.2.1 From d98ff5354bbeb5b19ebd445055cfba6753a3c468 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 25 Jan 2024 15:27:10 +0100 Subject: [PATCH 4/6] Exclude examples from code coverage --- pom.xml | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/pom.xml b/pom.xml index bc13fab1..bd9dfacc 100644 --- a/pom.xml +++ b/pom.xml @@ -576,11 +576,9 @@ jacoco-maven-plugin ${jacoco.version} - - + + za/co/absa/abris/examples/** + From 0803dc2da36157b124080c2969cb02b720d9af2b Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 25 Jan 2024 17:35:50 +0100 Subject: [PATCH 5/6] Explicitly set avro and jackson versions --- pom.xml | 47 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/pom.xml b/pom.xml index bd9dfacc..8b8a8c7c 100644 --- a/pom.xml +++ b/pom.xml @@ -48,10 +48,9 @@ 3.2.1 - 3.2.4 - 3.3.4 - 3.4.2 - 3.5.0 + 3.2.4 + 1.10.2 + 2.12.3 @@ -66,7 +65,10 @@ 2.12 - ${spark-32.version} + ${spark.default.version} + ${avro.default.version} + ${jackson.core.default.version} + ${spark.version} 0-10 6.2.1 @@ -121,6 +123,20 @@ + + + + org.apache.avro + avro + ${avro.version} + + + com.fasterxml.jackson.core + jackson-core + ${jackson.core.version} + + + @@ -356,29 +372,34 @@ spark-3.2 - ${spark-32.version} - 6.2.1 + ${spark.default.version} + ${avro.default.version} + ${jackson.core.default.version} + spark-3.3 - ${spark-33.version} - 6.2.1 + 3.3.4 + 1.11.0 + 2.13.4.2 spark-3.4 - ${spark-34.version} - 6.2.1 + 3.4.2 + 1.11.1 + 2.14.2 spark-3.5 - ${spark-35.version} - 6.2.1 + 3.5.0 + 1.11.2 + 2.15.2 From 5a383d1987f0d29fe32093ffa9b64b341905c248 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Sat, 27 Jan 2024 12:46:32 +0100 Subject: [PATCH 6/6] Fix version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 8b8a8c7c..f89d0d6c 100644 --- a/pom.xml +++ b/pom.xml @@ -383,7 +383,7 @@ 3.3.4 1.11.0 - 2.13.4.2 + 2.13.4