  1. Spark
  2. SPARK-22386 Data Source V2 improvements
  3. SPARK-23325

DataSourceV2 readers should always produce InternalRow.

Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 2.4.0
    • Component/s: SQL
    • Labels: None

    Description

      DataSourceV2 row-oriented implementations are limited to producing either Row instances or, by implementing SupportsScanUnsafeRow, UnsafeRow instances. Instead, I think that implementations should always produce InternalRow.

      The problem with the choice between Row and UnsafeRow is that neither one is appropriate for implementers.

      File formats don't produce Row instances or the data values used by Row, like java.sql.Timestamp and java.sql.Date. An implementation that uses Row instances must convert the data it just read into a representation that Spark immediately translates back. In my experience, it made little sense to translate a timestamp in microseconds to a (milliseconds, nanoseconds) pair, create a Timestamp instance, and pass that instance to Spark for immediate translation back.
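
      The round trip described above can be sketched in plain Java. This is a minimal illustration, not Spark's conversion code: the class and method names are hypothetical, and it assumes the stored value is microseconds since the epoch.

```java
import java.sql.Timestamp;

// Hypothetical helper, for illustration only; not part of Spark.
public class TimestampRoundTrip {

    // What a Row-based reader has to do: turn microseconds since the epoch
    // into a java.sql.Timestamp (whole seconds as millis, plus nanoseconds).
    public static Timestamp microsToTimestamp(long micros) {
        long seconds = Math.floorDiv(micros, 1_000_000L);
        int nanos = (int) Math.floorMod(micros, 1_000_000L) * 1_000;
        Timestamp ts = new Timestamp(seconds * 1_000L);
        ts.setNanos(nanos);
        return ts;
    }

    // What the engine then has to do: translate the Timestamp straight back
    // into microseconds since the epoch.
    public static long timestampToMicros(Timestamp ts) {
        long seconds = Math.floorDiv(ts.getTime(), 1_000L);
        return seconds * 1_000_000L + ts.getNanos() / 1_000;
    }
}
```

      Calling timestampToMicros(microsToTimestamp(v)) returns v again, so the intermediate Timestamp object adds allocation and arithmetic without changing the value.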

      On the other hand, UnsafeRow is very difficult to produce unless data is already held in memory. Even the Parquet support built into Spark deserializes to InternalRow and then uses UnsafeProjection to produce unsafe rows. When I went to build an implementation that deserializes Parquet or Avro directly to UnsafeRow (I tried both), I found that it couldn't be done without first deserializing into memory because the size of an array must be known before any values are written.
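
      The array-size constraint can be illustrated with a simplified length-prefixed encoding. This sketch assumes a hypothetical layout of one 8-byte element count followed by 8-byte values; it is not UnsafeRow's actual binary format, and the class name is made up. Because the count is written first, a streaming source has to be buffered in memory before anything can be emitted.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified layout: [count: 8 bytes][value: 8 bytes]...
public class LengthPrefixedArray {

    public static ByteBuffer encode(Iterator<Long> values) {
        // The element count goes first, so the whole stream must be
        // buffered before the first byte of output can be written.
        List<Long> buffered = new ArrayList<>();
        while (values.hasNext()) {
            buffered.add(values.next());
        }

        ByteBuffer out = ByteBuffer.allocate(8 + 8 * buffered.size());
        out.putLong(buffered.size());
        for (long v : buffered) {
            out.putLong(v);
        }
        out.flip(); // make the buffer readable from the start
        return out;
    }
}
```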

      I ended up deserializing to InternalRow and using UnsafeProjection to convert to unsafe rows. There are two problems with this: first, UnsafeProjection is Scala code that was difficult to call from Java (it required reflection); second, it causes double projection in the physical plan (an unsafe-to-unsafe copy) if there is a projection that wasn't fully pushed to the data source.

      I think the solution is to have a single interface for readers that expects InternalRow. Spark should then add one projection to the plan to convert to unsafe, which avoids projecting both in the plan and in the data source. If the data source already produces unsafe rows by deserializing directly, this still minimizes the number of copies because the unsafe projection will check whether the incoming data is already UnsafeRow.
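
      A minimal sketch of that pass-through check, using stand-in types rather than Spark's classes: InternalRowLike, GenericRowLike, UnsafeRowLike, and toUnsafe are all hypothetical names, and the "unsafe" row here simply wraps an array instead of using a binary layout. It only shows the shape of the idea: one conversion step in the plan that copies generic rows and returns already-unsafe rows unchanged.

```java
// Hypothetical stand-ins for illustration; these are not Spark's classes.
public class ProjectionSketch {

    public interface InternalRowLike {
        int numFields();
        Object get(int ordinal);
    }

    // A row as a reader might produce it after deserializing a record.
    public static final class GenericRowLike implements InternalRowLike {
        private final Object[] values;
        public GenericRowLike(Object... values) { this.values = values; }
        public int numFields() { return values.length; }
        public Object get(int ordinal) { return values[ordinal]; }
    }

    // Stand-in for the engine's compact row format.
    public static final class UnsafeRowLike implements InternalRowLike {
        private final Object[] packed;
        UnsafeRowLike(Object[] packed) { this.packed = packed; }
        public int numFields() { return packed.length; }
        public Object get(int ordinal) { return packed[ordinal]; }
    }

    // Single conversion point: copy only when the row is not already unsafe.
    public static UnsafeRowLike toUnsafe(InternalRowLike row) {
        if (row instanceof UnsafeRowLike) {
            return (UnsafeRowLike) row; // already in the target format, no copy
        }
        Object[] copy = new Object[row.numFields()];
        for (int i = 0; i < copy.length; i++) {
            copy[i] = row.get(i);
        }
        return new UnsafeRowLike(copy);
    }
}
```

      With this shape, a reader that emits generic rows pays for one copy, and a reader that already emits the unsafe form pays for none.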

      Using InternalRow would also match the interface on the write side.

            People

              Assignee: rdblue Ryan Blue
              Reporter: rdblue Ryan Blue