Spark / SPARK-38091

AvroSerializer can cause java.lang.ClassCastException at run time

Details

    • Type: Bug
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.0.0, 3.0.1, 3.0.2, 3.0.3, 3.1.0, 3.1.1, 3.1.2, 3.2.0, 3.2.1
    • Fix Version/s: None
    • Component/s: SQL

    Description

      AvroSerializer's implementation, at least in newConverter, is not written purely against the InternalRow and SpecializedGetters interfaces; it assumes many implementation details of the concrete row classes.

      For example, in 

            case (TimestampType, LONG) => avroType.getLogicalType match {
                // For backward compatibility, if the Avro type is Long and it is not logical type
                // (the `null` case), output the timestamp value as with millisecond precision.
                case null | _: TimestampMillis => (getter, ordinal) =>
                  DateTimeUtils.microsToMillis(timestampRebaseFunc(getter.getLong(ordinal)))
                case _: TimestampMicros => (getter, ordinal) =>
                  timestampRebaseFunc(getter.getLong(ordinal))
                case other => throw new IncompatibleSchemaException(errorPrefix +
                  s"SQL type ${TimestampType.sql} cannot be converted to Avro logical type $other")
              }
      

      it assumes the InternalRow instance encodes TimestampType as a java.lang.Long. That's true for UnsafeRow but not for GenericInternalRow.

      Hence the above code ends up with runtime exceptions when used on an instance of GenericInternalRow, which is what Python UDFs produce.
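
      To make that concrete, here is a minimal sketch of the cast behind getter.getLong (the timestamp value is made up; GenericInternalRow and getLong are the real APIs):

        import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

        // GenericInternalRow keeps boxed values in an Array[Any];
        // getLong(ordinal) is effectively values(ordinal).asInstanceOf[Long].
        val internalRow = new GenericInternalRow(Array[Any](1640995200000000L))
        internalRow.getLong(0) // OK: the slot really holds a java.lang.Long (micros)

        // A row built with a value that is not Catalyst's internal encoding
        // of TimestampType fails the same cast at run time.
        val externalRow = new GenericInternalRow(
          Array[Any](java.sql.Timestamp.valueOf("2022-01-01 00:00:00")))
        externalRow.getLong(0) // java.lang.ClassCastException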

      I didn't have time to dig deeper than that. My impression is that Spark's optimizer normally turns rows into UnsafeRow instances, while Python UDF evaluation doesn't go through that path, so each row stays a GenericInternalRow.

      It would be great if someone could correct me or offer a better explanation.

       

      To reproduce the issue, run

      git checkout master && git cherry-pick --no-commit this-commit

      and then run the test suite org.apache.spark.sql.avro.AvroSerdeSuite.
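
      The failing pattern that suite exercises boils down to something like the sketch below (the record and field names are made up, and since AvroSerializer is private[sql] this only compiles inside the org.apache.spark.sql.avro package, where AvroSerdeSuite also lives):

        package org.apache.spark.sql.avro

        import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
        import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
        import org.apache.spark.sql.types._

        object DecimalRepro {
          def main(args: Array[String]): Unit = {
            val catalystSchema = StructType(Seq(
              StructField("d", DecimalType(4, 2), nullable = false)))
            val avroSchema = SchemaBuilder.record("top").fields()
              .name("d")
              .`type`(LogicalTypes.decimal(4, 2)
                .addToSchema(Schema.create(Schema.Type.BYTES)))
              .noDefault()
              .endRecord()

            val serializer = new AvroSerializer(catalystSchema, avroSchema, nullable = false)

            // Fine: the row holds Catalyst's internal Decimal, as an UnsafeRow would.
            serializer.serialize(new GenericInternalRow(Array[Any](Decimal("12.34"))))

            // ClassCastException inside BaseGenericInternalRow.getDecimal: the row
            // holds an external java.math.BigDecimal instead of a Decimal.
            serializer.serialize(new GenericInternalRow(
              Array[Any](new java.math.BigDecimal("12.34"))))
          }
        }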

       

      You will see runtime exceptions like the following:


      
      Serialize DecimalType to Avro BYTES with logical type decimal *** FAILED ***
        java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class org.apache.spark.sql.types.Decimal (java.math.BigDecimal is in module java.base of loader 'bootstrap'; org.apache.spark.sql.types.Decimal is in unnamed module of loader 'app')
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getDecimal(rows.scala:45)
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getDecimal$(rows.scala:45)
        at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getDecimal(rows.scala:195)
        at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newConverter$10(AvroSerializer.scala:136)
        at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newConverter$10$adapted(AvroSerializer.scala:135)
        at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newStructConverter$2(AvroSerializer.scala:283)
        at org.apache.spark.sql.avro.AvroSerializer.serialize(AvroSerializer.scala:60)
        at org.apache.spark.sql.avro.AvroSerdeSuite.$anonfun$new$5(AvroSerdeSuite.scala:82)
        at org.apache.spark.sql.avro.AvroSerdeSuite.$anonfun$new$5$adapted(AvroSerdeSuite.scala:67)
        at org.apache.spark.sql.avro.AvroSerdeSuite.$anonfun$withFieldMatchType$2(AvroSerdeSuite.scala:217)
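      The top frame, BaseGenericInternalRow.getDecimal, is an unchecked cast of whatever object sits in the row, so the same exception can be reproduced without Avro at all:

        import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

        val row = new GenericInternalRow(Array[Any](new java.math.BigDecimal("1.23")))
        row.getDecimal(0, 3, 2) // ClassCastException: BigDecimal cannot be cast to Decimal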


People

    Assignee: Unassigned
    Reporter: Zhenhao Li
    Votes: 0
    Watchers: 3