  Spark / SPARK-44805

Data lost after union using spark.sql.parquet.enableNestedColumnVectorizedReader=true


Details

    Description

      When unioning two DataFrames read from Parquet that contain a nested structure (a struct with two array fields, where one is double and the other integer), the data from the second field seems to be lost (zeros are returned instead).

      This seems to happen only when the nested vectorized reader is enabled (spark.sql.parquet.enableNestedColumnVectorizedReader=true).
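      A quick way to confirm the reader is the trigger (a sketch, not a confirmed fix; the flag name is taken from this report, the surrounding session/submit invocation is assumed) is to disable it before reading and re-run the union:

      ```
      # In the session, before reading the Parquet files:
      spark.conf.set("spark.sql.parquet.enableNestedColumnVectorizedReader", "false")

      # Or at submit time:
      spark-submit --conf spark.sql.parquet.enableNestedColumnVectorizedReader=false ...
      ```

      With the flag off, Spark falls back to the non-vectorized Parquet path for nested columns, so correct f2 values there would point squarely at the nested vectorized reader.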

      The following Python code reproduces the problem: 

      from pyspark.sql import SparkSession
      from pyspark.sql.types import *
      
      # PREPARING DATA
      data1 = []
      data2 = []
      
      for i in range(2):
          data1.append((([1, 2, 3], [1, 1, 2]), i))
          data2.append((([1.0, 2.0, 3.0], [1, 1]), i + 10))
      
      schema1 = StructType([
              StructField('value', StructType([
                   StructField('f1', ArrayType(IntegerType()), True),
                   StructField('f2', ArrayType(IntegerType()), True)             
                   ])),
               StructField('id', IntegerType(), True)
      ])
      
      schema2 = StructType([
              StructField('value', StructType([
                   StructField('f1', ArrayType(DoubleType()), True),
                   StructField('f2', ArrayType(IntegerType()), True)             
                   ])),
               StructField('id', IntegerType(), True)
      ])
      
      spark = SparkSession.builder.getOrCreate()
      data_dir = "/user/<user>/"
      
      df1 = spark.createDataFrame(data1, schema1)
      df1.write.mode('overwrite').parquet(data_dir + "data1") 
      df2 = spark.createDataFrame(data2, schema2)
      df2.write.mode('overwrite').parquet(data_dir + "data2") 
      
      
      # READING DATA
      parquet1 = spark.read.parquet(data_dir + "data1")
      parquet2 = spark.read.parquet(data_dir + "data2")
      
      
      # UNION
      out = parquet1.union(parquet2)
      
      
      parquet1.select("value.f2").distinct().show()
      out.select("value.f2").distinct().show()
      print(parquet1.collect())
      print(out.collect()) 

      Output: 

      +---------+
      |       f2|
      +---------+
      |[1, 1, 2]|
      +---------+
      
      +---------+
      |       f2|
      +---------+
      |[0, 0, 0]|
      |   [1, 1]|
      +---------+
      
      
      [
      Row(value=Row(f1=[1, 2, 3], f2=[1, 1, 2]), id=0), 
      Row(value=Row(f1=[1, 2, 3], f2=[1, 1, 2]), id=1)
      ]
      
      [
      Row(value=Row(f1=[1.0, 2.0, 3.0], f2=[0, 0, 0]), id=0), 
      Row(value=Row(f1=[1.0, 2.0, 3.0], f2=[0, 0, 0]), id=1), 
      Row(value=Row(f1=[1.0, 2.0, 3.0], f2=[1, 1]), id=10), 
      Row(value=Row(f1=[1.0, 2.0, 3.0], f2=[1, 1]), id=11)
      ] 

      Please notice that the values of the f2 field from the first DataFrame are lost after the union. This happens only when the data is read from Parquet files.

      Could you please look into this? 

      Best regards,

      Jakub


          People

            Assignee: bersprockets (Bruce Robbins)
            Reporter: jwozniak (Jakub Wozniak)