Feature/350 fix tests (#351)
* Works for Spark 3.5.0

* Update GH action to run for latest Spark versions

* Let Avro dependency be managed by Spark

* Exclude examples from code coverage

* Explicitly set Avro and Jackson versions

* Fix version
kevinwallimann authored Jan 27, 2024
1 parent ec6772b commit fda9b4f
Showing 6 changed files with 100 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-and-verify.yml
@@ -12,7 +12,7 @@ jobs:

strategy:
matrix:
spark: [ 3.2 ]
spark: [ 3.2, 3.3, 3.4, 3.5 ]
scala: [ 2.12, 2.13 ]

name: Spark ${{ matrix.spark }}, Scala ${{ matrix.scala }}
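With this change the test matrix covers four Spark lines (3.2 through 3.5) against both Scala 2.12 and 2.13, i.e. eight jobs per run. Each spark value presumably activates the matching spark-3.x Maven profile introduced in the pom.xml changes below.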
72 changes: 52 additions & 20 deletions pom.xml
@@ -48,7 +48,9 @@
<maven.shade.plugin.version>3.2.1</maven.shade.plugin.version>

<!-- Spark -->
<spark-32.version>3.2.1</spark-32.version>
<spark.default.version>3.2.4</spark.default.version>
<avro.default.version>1.10.2</avro.default.version>
<jackson.core.default.version>2.12.3</jackson.core.default.version>

<!-- Cross build properties -->

@@ -63,13 +65,15 @@
<scala.compat.version>2.12</scala.compat.version>

<!--Platforms-->
<spark.version>${spark-32.version}</spark.version>
<spark.version>${spark.default.version}</spark.version>
<avro.version>${avro.default.version}</avro.version>
<jackson.core.version>${jackson.core.default.version}</jackson.core.version>

<spark.avro.version>${spark.version}</spark.avro.version>
<kafka.spark.version>0-10</kafka.spark.version>
<confluent.version>6.2.1</confluent.version>

<!--Libs-->
<avro.version>1.10.2</avro.version>
<jacoco.version>0.8.10</jacoco.version>

<!--Tests-->
@@ -119,6 +123,20 @@
</developer>
</developers>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.core.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- Tests -->
<dependency>
@@ -159,14 +177,6 @@
<version>${spark.avro.version}</version>
</dependency>

<!-- Avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
<scope>provided</scope>
</dependency>

<!-- Spark Kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
@@ -362,12 +372,36 @@
<profile>
<id>spark-3.2</id>
<properties>
<spark.version>${spark-32.version}</spark.version>
<avro.version>1.10.2</avro.version>
<confluent.version>6.2.1</confluent.version>
<spark.version>${spark.default.version}</spark.version>
<avro.version>${avro.default.version}</avro.version>
<jackson.core.version>${jackson.core.default.version}</jackson.core.version>
</properties>
</profile>

<profile>
<id>spark-3.3</id>
<properties>
<spark.version>3.3.4</spark.version>
<avro.version>1.11.0</avro.version>
<jackson.core.version>2.13.4</jackson.core.version>
</properties>
</profile>
<profile>
<id>spark-3.4</id>
<properties>
<spark.version>3.4.2</spark.version>
<avro.version>1.11.1</avro.version>
<jackson.core.version>2.14.2</jackson.core.version>
</properties>
</profile>
<profile>
<id>spark-3.5</id>
<properties>
<spark.version>3.5.0</spark.version>
<avro.version>1.11.2</avro.version>
<jackson.core.version>2.15.2</jackson.core.version>
</properties>
</profile>
<profile>
<id>uber</id>
<build>
@@ -563,11 +597,9 @@
<artifactId>jacoco-maven-plugin</artifactId>
<version>${jacoco.version}</version>
<configuration>
<!-- examples of class excluding -->
<!-- excludes>
<exclude>za/co/absa/enceladus/migrations/continuous/EntityVersionMap.class</exclude>
<exclude>za/co/absa/enceladus/rest_api/exceptions/ValidationException.class</exclude>
</excludes -->
<excludes>
<exclude>za/co/absa/abris/examples/**</exclude>
</excludes>
</configuration>
<executions>
<execution>
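Taken together, the pom.xml changes replace the hard-coded spark-32.version property with a spark.default.version (3.2.4) plus per-profile overrides for Spark 3.3, 3.4 and 3.5, and pin avro and jackson-core via dependencyManagement so the transitive versions track what each Spark release ships; the direct provided Avro dependency is dropped accordingly. Building against a specific Spark line would then presumably be a matter of activating the profile, e.g. mvn clean verify -Pspark-3.5, with the Spark 3.2 defaults applying when no profile is given.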
ConfluentKafkaAvroWriter.scala
@@ -16,13 +16,13 @@

package za.co.absa.abris.examples

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.{DataFrame, Encoder, Row, SparkSession}
import za.co.absa.abris.avro.format.SparkAvroConversions
import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
import za.co.absa.abris.config.AbrisConfig
import za.co.absa.abris.examples.data.generation.ComplexRecordsGenerator
import za.co.absa.abris.examples.utils.CompatibleRowEncoder


object ConfluentKafkaAvroWriter {
@@ -85,6 +85,6 @@ object ConfluentKafkaAvroWriter {
private def getEncoder: Encoder[Row] = {
val avroSchema = AvroSchemaUtils.parse(ComplexRecordsGenerator.usedAvroSchema)
val sparkSchema = SparkAvroConversions.toSqlType(avroSchema)
RowEncoder.apply(sparkSchema)
CompatibleRowEncoder.apply(sparkSchema)
}
}
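The import swap above is the heart of the compatibility fix: RowEncoder.apply(StructType) lives in org.apache.spark.sql.catalyst.encoders up to Spark 3.4 but was removed in Spark 3.5, where the public Encoders.row(schema) factory is used instead. The examples therefore go through the reflection-based CompatibleRowEncoder introduced in the next file.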
CompatibleRowEncoder.scala
@@ -0,0 +1,42 @@
/*
* Copyright 2019 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.abris.examples.utils

import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.types.StructType

import scala.util.Try

object CompatibleRowEncoder {
  def apply(schema: StructType): Encoder[Row] = {
    // Spark < 3.5.0: resolve RowEncoder.apply(StructType) reflectively.
    val rowEncoderTry = Try {
      val rowEncoderClass = Class.forName("org.apache.spark.sql.catalyst.encoders.RowEncoder")
      val applyMethod = rowEncoderClass.getMethod("apply", classOf[StructType])
      applyMethod.invoke(null, schema).asInstanceOf[Encoder[Row]]
    }

    // Spark >= 3.5.0: fall back to the public Encoders.row(StructType) factory.
    rowEncoderTry.orElse(Try {
      val encodersClass = Class.forName("org.apache.spark.sql.Encoders")
      val rowMethod = encodersClass.getMethod("row", classOf[StructType])
      rowMethod.invoke(null, schema).asInstanceOf[Encoder[Row]]
    }).getOrElse {
      throw new IllegalStateException("Neither RowEncoder.apply nor Encoders.row is available in this Spark version.")
    }
  }
}
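A minimal usage sketch of the helper above; the object name CompatibleRowEncoderExample, the local master and the toy schema are illustrative, not part of the commit. It resolves whichever encoder factory the running Spark version provides and uses it to build a Dataset[Row]:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import za.co.absa.abris.examples.utils.CompatibleRowEncoder

object CompatibleRowEncoderExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("compatible-row-encoder").getOrCreate()

    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = true)
    ))

    // Resolves RowEncoder.apply on Spark < 3.5 and Encoders.row on Spark >= 3.5.
    val encoder = CompatibleRowEncoder(schema)
    val ds: Dataset[Row] = spark.createDataset(Seq(Row(1, "a"), Row(2, "b")))(encoder)
    ds.show()

    spark.stop()
  }
}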
CatalystAvroConversionSpec.scala
@@ -60,7 +60,7 @@ class CatalystAvroConversionSpec extends AnyFlatSpec with Matchers with BeforeAn
SchemaManagerFactory.addSRClientInstance(schemaRegistryConfig, mockedSchemaRegistryClient)
}

val bareByteSchema = """{"type": "bytes"}""""
val bareByteSchema = """{"type": "bytes"}"""

it should "convert one type with bare schema to avro an back" in {

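The fix above removes a stray fourth quote: in Scala, quote characters immediately before the closing delimiter of a triple-quoted literal become part of the string, so the old value was actually {"type": "bytes"}" with a trailing quote. Older Avro versions apparently tolerated the trailing character when parsing the schema, while the newer Avro and Jackson versions pulled in for Spark 3.4/3.5 reject it. A tiny self-contained sketch of the quoting rule (the object name QuoteDemo is illustrative):

object QuoteDemo extends App {
  // Four closing quotes leave one stray '"' inside the string.
  val broken = """{"type": "bytes"}""""
  val fixed = """{"type": "bytes"}"""
  println(broken) // {"type": "bytes"}"  -- not clean JSON
  println(fixed)  // {"type": "bytes"}
}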
AvroSchemaEncoder.scala
@@ -17,17 +17,17 @@
package za.co.absa.abris.avro.utils

import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import za.co.absa.abris.avro.format.SparkAvroConversions
import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
import za.co.absa.abris.examples.data.generation.ComplexRecordsGenerator
import za.co.absa.abris.examples.utils.CompatibleRowEncoder

class AvroSchemaEncoder {

def getEncoder: Encoder[Row] = {
val avroSchema = AvroSchemaUtils.parse(ComplexRecordsGenerator.usedAvroSchema)
val sparkSchema = SparkAvroConversions.toSqlType(avroSchema)
RowEncoder.apply(sparkSchema)
CompatibleRowEncoder.apply(sparkSchema)
}

}
