Spark / SPARK-29503

MapObjects doesn't copy Unsafe data when nested under Safe data


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.1.1, 2.2.3, 2.3.4, 2.4.5, 3.0.0
    • Fix Version/s: 3.0.0
    • Component/s: SQL

    Description

      In order for MapObjects to operate safely, it checks whether the result of the mapping function is an Unsafe type (UnsafeRow, UnsafeArrayData, UnsafeMapData) and, if so, copies it before writing it into MapObjects' output array. This protects against expressions that reuse the same native memory buffer to hold their result across evaluations; without the copy, every element of the output would point to the same native buffer and would show the last result written to it. However, MapObjects misses this copy when the Unsafe data is nested below a safe structure, for instance a GenericArrayData whose elements are all UnsafeRows. In this scenario, all elements of the GenericArrayData point to the same native UnsafeRow buffer, which holds the last value written to it.
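      To illustrate the aliasing without Spark, here is a minimal plain-Scala sketch. Every name in it (`UnsafeLike`, `SafeLike`, `evalDirect`, `evalNested`, `copyIfTopLevelUnsafe`, `deepCopy`, `mapWithCopy`) is a hypothetical stand-in, not a Spark API: `UnsafeLike` plays the role of an UnsafeRow backed by a reused buffer, and `SafeLike` plays the role of GenericArrayData. It assumes, as the description states, that the copy check inspects only the top-level result; it does not reproduce Spark's actual code or its fix.

```scala
// Plain-Scala sketch of the buffer-reuse hazard; no Spark classes are used.
object BufferReuseDemo {
  // Hypothetical stand-in for an Unsafe value (e.g. UnsafeRow): a mutable
  // holder that the producing expression overwrites on every evaluation.
  final class UnsafeLike(var value: Int) {
    def copy(): UnsafeLike = new UnsafeLike(value)
    override def toString: String = s"[$value]"
  }

  // Hypothetical stand-in for safe data (e.g. GenericArrayData): a wrapper
  // that only holds references to its elements and never copies them.
  final case class SafeLike(elems: Seq[Any]) {
    override def toString: String = elems.mkString("WrappedArray(", ", ", ")")
  }

  // A single buffer reused across evaluations.
  private val shared = new UnsafeLike(0)

  // Mapping function that returns the reused buffer directly.
  def evalDirect(item: Int): Any = { shared.value = item; shared }

  // Mapping function that nests the reused buffer inside a safe wrapper.
  def evalNested(item: Int): Any = { shared.value = item; SafeLike(Seq(shared)) }

  // Copy only when the top-level result is "unsafe"; safe wrappers pass
  // through untouched, so unsafe values inside them stay aliased.
  def copyIfTopLevelUnsafe(v: Any): Any = v match {
    case u: UnsafeLike => u.copy()
    case other         => other
  }

  // Also copy unsafe values nested inside safe wrappers.
  def deepCopy(v: Any): Any = v match {
    case u: UnsafeLike => u.copy()
    case SafeLike(es)  => SafeLike(es.map(deepCopy))
    case other         => other
  }

  // Evaluate each item, then apply the copy policy before storing the result,
  // loosely mimicking how the output array is filled element by element.
  def mapWithCopy(items: Seq[Int], eval: Int => Any, copy: Any => Any): Seq[Any] =
    items.map(i => copy(eval(i)))

  def main(args: Array[String]): Unit = {
    val items = Seq(1, 2, 3)
    println(mapWithCopy(items, evalDirect, copyIfTopLevelUnsafe).mkString(", "))
    println(mapWithCopy(items, evalNested, copyIfTopLevelUnsafe).mkString(", "))
    println(mapWithCopy(items, evalNested, deepCopy).mkString(", "))
  }
}
```

      Running `main` prints `[1], [2], [3]` for the direct case, which the top-level check already handles; `WrappedArray([3]), WrappedArray([3]), WrappedArray([3])` for the nested case with only a top-level copy; and `WrappedArray([1]), WrappedArray([2]), WrappedArray([3])` once nested values are copied too, mirroring the Actual and Expected outputs in this report.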

       

      So far, this bug appears to occur only when a `ProjectExec` goes down the `execute` path, as opposed to WholeStageCodegen's `produce` and `consume` path.

       

      Example Reproduction Code:

      // Run in spark-shell, which provides the `spark` session
      import org.apache.spark.sql.catalyst.expressions.objects.MapObjects
      import org.apache.spark.sql.catalyst.expressions.Expression
      import org.apache.spark.sql.functions.{array, struct}
      import org.apache.spark.sql.Column
      import org.apache.spark.sql.types.ArrayType
      import spark.implicits._ // toDF and the $"..." column syntax
      
      // For demonstration purposes, disable whole-stage codegen so that ProjectExec takes the `execute` path
      spark.conf.set("spark.sql.codegen.wholeStage", "false")
      
      val exampleDS = spark.sparkContext.parallelize(Seq(Seq(1, 2, 3))).toDF("items")
      
      // Trivial example: nest an unsafe struct inside a safe array
      // items: Seq[Int] => items.map { item => Seq(struct(item)) }
      val result = exampleDS.select(
          new Column(MapObjects(
              {item: Expression => array(struct(new Column(item))).expr},
              $"items".expr,
              exampleDS.schema("items").dataType.asInstanceOf[ArrayType].elementType
          )) as "items"
      )
      
      result.show(10, false)
      

       

      Actual Output:

      +---------------------------------------------------------+
      |items                                                    |
      +---------------------------------------------------------+
      |[WrappedArray([3]), WrappedArray([3]), WrappedArray([3])]|
      +---------------------------------------------------------+
      

       

      Expected Output:

      +---------------------------------------------------------+
      |items                                                    |
      +---------------------------------------------------------+
      |[WrappedArray([1]), WrappedArray([2]), WrappedArray([3])]|
      +---------------------------------------------------------+
      

       

      We've confirmed that the bug exists on version 2.1.1 as well as on master (which I assume corresponds to version 3.0.0).

       

People

    • Assignee: Jungtaek Lim (kabhwan)
    • Reporter: Aaron Lewis (aaronlewism)
    • Votes: 0
    • Watchers: 5
