Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-32122

Exception while writing dataframe with enum fields

    XMLWordPrintableJSON

Details

    • Question
    • Status: Resolved
    • Minor
    • Resolution: Cannot Reproduce
    • 2.4.3
    • None
    • SQL

    Description

      I have an Avro schema with one field that is an enum, and I am trying to enforce this schema when writing my DataFrame. The code looks something like this:

      case class Name1(id:String,count:Int,val_type:String)
      
      val schema = """{
                       |  "type" : "record",
                       |  "name" : "name1",
                       |  "namespace" : "com.data",
                       |  "fields" : [
                       |  {
                       |    "name" : "id",
                       |    "type" : "string"
                       |  },
                       |  {
                       |    "name" : "count",
                       |    "type" : "int"
                       |  },
                       |  {
                       |    "name" : "val_type",
                       |    "type" : {
                       |      "type" : "enum",
                       |      "name" : "ValType",
                       |      "symbols" : [ "s1", "s2" ]
                       |    }
                       |  }
                       |  ]
                       |}""".stripMargin
      
      val df = Seq(
                  Name1("1",2,"s1"),
                  Name1("1",3,"s2"),
                  Name1("1",4,"s2"),
                  Name1("11",2,"s1")).toDF()
      
      df.write.format("avro").option("avroSchema",schema).save("data/tes2/")
      

      This code fails with the following exception:

       

      2020-06-28 23:28:10 ERROR Utils:91 - Aborting task
      org.apache.avro.AvroRuntimeException: Not a union: "string"
      	at org.apache.avro.Schema.getTypes(Schema.java:299)
      	at org.apache.spark.sql.avro.AvroSerializer.org$apache$spark$sql$avro$AvroSerializer$$resolveNullableType(AvroSerializer.scala:229)
      	at org.apache.spark.sql.avro.AvroSerializer$$anonfun$3.apply(AvroSerializer.scala:209)
      	at org.apache.spark.sql.avro.AvroSerializer$$anonfun$3.apply(AvroSerializer.scala:208)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.immutable.List.foreach(List.scala:392)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      	at scala.collection.immutable.List.map(List.scala:296)
      	at org.apache.spark.sql.avro.AvroSerializer.newStructConverter(AvroSerializer.scala:208)
      	at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:51)
      	at org.apache.spark.sql.avro.AvroOutputWriter.serializer$lzycompute(AvroOutputWriter.scala:42)
      	at org.apache.spark.sql.avro.AvroOutputWriter.serializer(AvroOutputWriter.scala:42)
      	at org.apache.spark.sql.avro.AvroOutputWriter.write(AvroOutputWriter.scala:64)
      	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
      	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
      	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
      	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
      	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
      	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
      	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
      	at org.apache.spark.scheduler.Task.run(Task.scala:121)
      	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
      	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      2020-06-28 23:28:10 ERROR Utils:91 - Aborting task

       

      I understand this is because the type of val_type is `String` in the case class. Can you please advise how I can solve this problem without having to change the underlying Avro schema?

      Thanks!

      Attachments

        Activity

          People

            Unassigned Unassigned
            kiran002 Sai kiran Krishna murthy
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: