Details
- Type: Bug
- Status: In Progress
- Priority: Major
- Resolution: Unresolved
- Affects Version/s: 3.0.0, 3.0.1, 3.0.2, 3.0.3, 3.1.0, 3.1.1, 3.1.2, 3.2.0, 3.2.1
- Fix Version/s: None
Description
AvroSerializer's implementation, at least in newConverter, is not based purely on the InternalRow and SpecializedGetters interfaces. It assumes many implementation details of those interfaces.
For example, in
  case (TimestampType, LONG) => avroType.getLogicalType match {
    // For backward compatibility, if the Avro type is Long and it is not
    // logical type (the `null` case), output the timestamp value as with
    // millisecond precision.
    case null | _: TimestampMillis => (getter, ordinal) =>
      DateTimeUtils.microsToMillis(timestampRebaseFunc(getter.getLong(ordinal)))
    case _: TimestampMicros => (getter, ordinal) =>
      timestampRebaseFunc(getter.getLong(ordinal))
    case other => throw new IncompatibleSchemaException(errorPrefix +
      s"SQL type ${TimestampType.sql} cannot be converted to Avro logical type $other")
  }
it assumes the InternalRow instance encodes TimestampType as java.lang.Long. That is true for UnsafeRow but not for GenericInternalRow.
Hence the code above ends up with runtime exceptions when used on an instance of GenericInternalRow, which is the case for Python UDFs.
I didn't get time to dig deeper than that. My impression is that Spark's optimizer(s) turn each row into an UnsafeRow, while the Python UDF path does not go through the optimizer(s), so each row stays a GenericInternalRow.
It would be great if someone can correct me or offer a better explanation.
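The failure mode above can be sketched in a few lines. This is a minimal Java illustration, not Spark's actual code: Getters and GenericRow are hypothetical stand-ins for SpecializedGetters and GenericInternalRow, assuming the latter stores boxed Objects and casts inside its typed getters (which is what the stack trace below suggests), while UnsafeRow reads primitives from its binary layout.

```java
import java.math.BigDecimal;

public class GenericRowCastDemo {
    // Stand-in for SpecializedGetters: typed accessors over an untyped row.
    interface Getters {
        long getLong(int ordinal);
    }

    // Stand-in for GenericInternalRow: values are arbitrary Objects and the
    // typed getter blindly casts, so it only succeeds if the row was
    // populated with exactly the representation the converter expects.
    static final class GenericRow implements Getters {
        private final Object[] values;
        GenericRow(Object... values) { this.values = values; }
        public long getLong(int ordinal) { return (Long) values[ordinal]; }
    }

    public static void main(String[] args) {
        // The field holds a BigDecimal (as in the stack trace below) instead
        // of the boxed Long the converter's getter.getLong call assumes.
        Getters row = new GenericRow(new BigDecimal("1.23"));
        try {
            row.getLong(0); // the converter's hard-coded assumption
        } catch (ClassCastException e) {
            System.out.println("ClassCastException as in the report");
        }
    }
}
```

An UnsafeRow-like implementation would never hit this cast, because writing into its binary format already forces the value to a long; the mismatch only surfaces on rows that keep boxed objects.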
To reproduce the issue,
git checkout master and git cherry-pick --no-commit this-commit
and run the test org.apache.spark.sql.avro.AvroSerdeSuite.
You will see runtime exceptions like the following one:
  Serialize DecimalType to Avro BYTES with logical type decimal *** FAILED ***
    java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class org.apache.spark.sql.types.Decimal (java.math.BigDecimal is in module java.base of loader 'bootstrap'; org.apache.spark.sql.types.Decimal is in unnamed module of loader 'app')
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getDecimal(rows.scala:45)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getDecimal$(rows.scala:45)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getDecimal(rows.scala:195)
    at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newConverter$10(AvroSerializer.scala:136)
    at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newConverter$10$adapted(AvroSerializer.scala:135)
    at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newStructConverter$2(AvroSerializer.scala:283)
    at org.apache.spark.sql.avro.AvroSerializer.serialize(AvroSerializer.scala:60)
    at org.apache.spark.sql.avro.AvroSerdeSuite.$anonfun$new$5(AvroSerdeSuite.scala:82)
    at org.apache.spark.sql.avro.AvroSerdeSuite.$anonfun$new$5$adapted(AvroSerdeSuite.scala:67)
    at org.apache.spark.sql.avro.AvroSerdeSuite.$anonfun$withFieldMatchType$2(AvroSerdeSuite.scala:217)