  Spark / SPARK-25649

CatalystTypeConverters throws exception for ScalaEsRow type when converting in ArrayConverter


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Invalid
    • Affects Version/s: 2.3.2
    • Fix Version/s: None
    • Component/s: Examples
    • Labels: None
    • Environment:
      Scala version: 2.2.1
      Elasticsearch-hadoop version: 6.4.1
      Java version: 1.8.0_181

    Description

      Hi,

      I am running PySpark to read and convert Elasticsearch data using `elasticsearch-hadoop:6.4.2`.

      Here, I have a few array fields that look like:

      "outer_array": [
        { "inner_array": ["1", "2", "3"], "other_value": 4 },
        { "inner_array": ["5", "6", "7"], "other_value": 9 }
      ]

      outer_array is an array of objects, each of which has a property that is itself an array. The issue is that the ArrayConverter is not able to convert the ScalaEsRow type, as shown in the stack trace below.
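      As a quick check, printing the inferred schema shows how the connector maps these fields; the schema is resolved from the index mapping at load time, so it can be inspected even when materializing rows fails. A minimal sketch, assuming the same local index (df_check is just an illustrative name):

      df_check = (spark.read
          .format("org.elasticsearch.spark.sql")
          .option("es.nodes", "localhost")
          .option("es.port", "9200")
          .load("temp-index/customer"))
      # Shows the types the connector inferred for outer_array and its
      # nested fields, without triggering the failing row conversion.
      df_check.printSchema()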

      For the conversion, I am passing the following options:

      df2 = (spark.read
          .option("es.nodes", "localhost")
          .option("es.port", "9200")
          .option("es.read.field.as.array.include",
                  "outer_array,'outer_array.inner_array', 'outer_array.other_value'")
          .format("org.elasticsearch.spark.sql")
          .load("temp-index/customer"))
      df2.show(1)

      This fails with the following error:

      scala.MatchError: [Buffer(1, 2, 3),4] (of class org.elasticsearch.spark.sql.ScalaEsRow)
      at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:160)
      at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:154)
      at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
      at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$2.apply(CatalystTypeConverters.scala:164)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      at scala.collection.AbstractTraversable.map(Traversable.scala:104)
      at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:164)
      at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:154)
      at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
      at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:379)
      at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:61)
      at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:58)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:108)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
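
      Note: since the issue was resolved as Invalid, the failure likely comes from the option value rather than from Spark itself. es.read.field.as.array.include expects a plain comma-separated list of field names (no embedded quotes), and only fields that are truly arrays should be listed (other_value is a scalar, so marking it as an array would mis-type the column). A minimal sketch of the read under those assumptions:

      df2 = (spark.read
          .option("es.nodes", "localhost")
          .option("es.port", "9200")
          # Plain comma-separated field names; other_value is omitted
          # because it is a scalar, not an array.
          .option("es.read.field.as.array.include",
                  "outer_array,outer_array.inner_array")
          .format("org.elasticsearch.spark.sql")
          .load("temp-index/customer"))
      df2.show(1)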


          People

            Assignee: Unassigned
            Reporter: Maulik Soneji (mauliksoneji)
            Votes: 0
            Watchers: 2

            Dates

              Created:
              Updated:
              Resolved: