Skip to content

Commit

Permalink
Add support for Apache Spark 4.0.0-SNAPSHOT with fewer exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
nimrodo committed Apr 8, 2024
1 parent 416c4a3 commit 6ffbca6
Showing 1 changed file with 31 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.apache.avro.Schema
import org.apache.spark.sql.types.DataType
import za.co.absa.commons.annotation.DeveloperApi

import scala.collection.mutable
import scala.util.Try

/**
Expand All @@ -31,18 +32,37 @@ class AbrisAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {

private val deserializer = {
val clazz = classOf[AvroDeserializer]
Try {
clazz.getConstructor(classOf[Schema], classOf[DataType])
.newInstance(rootAvroType, rootCatalystType) // Spark 2.4 -
}.recover { case _: NoSuchMethodException =>
clazz.getConstructor(classOf[Schema], classOf[DataType], classOf[String])
.newInstance(rootAvroType, rootCatalystType, "LEGACY") // Spark 3.0 - Spark 3.5.0 (including)
}.recover { case _: NoSuchMethodException =>
clazz.getConstructor(classOf[Schema], classOf[DataType], classOf[String], classOf[Boolean])
.newInstance(rootAvroType, rootCatalystType, "LEGACY", false: java.lang.Boolean) // Spark 3.5.x +
val schemaClz = classOf[Schema]
val dataTypeClz = classOf[DataType]
val stringClz = classOf[String]
val booleanClz = classOf[Boolean]

clazz.getConstructors.collectFirst {
case currCtor if currCtor.getParameterTypes sameElements
Array(schemaClz, dataTypeClz) =>
// Spark 2.4
currCtor.newInstance(rootAvroType, rootCatalystType)
case currCtor if currCtor.getParameterTypes sameElements
Array(schemaClz, dataTypeClz, stringClz) =>
// Spark 3.0 - Spark 3.5.0 (including)
currCtor.newInstance(rootAvroType, rootCatalystType, "LEGACY")
case currCtor if currCtor.getParameterTypes sameElements
Array(schemaClz, dataTypeClz, stringClz, booleanClz) =>
// Spark 3.5.1 - 3.5.2
currCtor.newInstance(rootAvroType, rootCatalystType, "LEGACY", false: java.lang.Boolean)
case currCtor if currCtor.getParameterTypes.toSeq sameElements
Array(schemaClz, dataTypeClz, stringClz, booleanClz, stringClz) =>
// Spark 4.0.0-SNAPSHOT+
currCtor.newInstance(rootAvroType, rootCatalystType, "LEGACY", false: java.lang.Boolean, "")
} match {
case Some(value: AvroDeserializer) =>
value
case _ =>
throw new NoSuchMethodException(
s"""Supported constructors for AvroDeserializer are:
|${clazz.getConstructors.toSeq.mkString(System.lineSeparator())}""".stripMargin)
}
.get
.asInstanceOf[AvroDeserializer]

}

private val ru = scala.reflect.runtime.universe
Expand Down

0 comments on commit 6ffbca6

Please sign in to comment.