Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Versions: 3.3.1, 3.4.1
Environment: pySpark, linux, hadoop, parquet.
Description
When unioning two DataFrames read from Parquet that contain a nested struct with two array fields, f1 and f2 (f1 is an array of integers in one DataFrame and an array of doubles in the other, while f2 is an array of integers in both), the data in f2 coming from the first DataFrame is lost: zeros are returned instead.
This seems to happen only when the nested vectorised reader is enabled (spark.sql.parquet.enableNestedColumnVectorizedReader=true).
The following Python code reproduces the problem:
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, DoubleType, IntegerType, StructField, StructType

# PREPARING DATA
data1 = []
data2 = []
for i in range(2):
    data1.append((([1, 2, 3], [1, 1, 2]), i))
    data2.append((([1.0, 2.0, 3.0], [1, 1]), i + 10))

schema1 = StructType([
    StructField('value', StructType([
        StructField('f1', ArrayType(IntegerType()), True),
        StructField('f2', ArrayType(IntegerType()), True)
    ])),
    StructField('id', IntegerType(), True)
])
schema2 = StructType([
    StructField('value', StructType([
        StructField('f1', ArrayType(DoubleType()), True),
        StructField('f2', ArrayType(IntegerType()), True)
    ])),
    StructField('id', IntegerType(), True)
])

# The problem only shows up with the nested vectorised Parquet reader enabled
spark = (SparkSession.builder
         .config("spark.sql.parquet.enableNestedColumnVectorizedReader", "true")
         .getOrCreate())
data_dir = "/user/<user>/"

df1 = spark.createDataFrame(data1, schema1)
df1.write.mode('overwrite').parquet(data_dir + "data1")
df2 = spark.createDataFrame(data2, schema2)
df2.write.mode('overwrite').parquet(data_dir + "data2")

# READING DATA
parquet1 = spark.read.parquet(data_dir + "data1")
parquet2 = spark.read.parquet(data_dir + "data2")

# UNION
out = parquet1.union(parquet2)

parquet1.select("value.f2").distinct().show()
out.select("value.f2").distinct().show()
print(parquet1.collect())
print(out.collect())
Output:
+---------+
|       f2|
+---------+
|[1, 1, 2]|
+---------+

+---------+
|       f2|
+---------+
|[0, 0, 0]|
|   [1, 1]|
+---------+

[
  Row(value=Row(f1=[1, 2, 3], f2=[1, 1, 2]), id=0),
  Row(value=Row(f1=[1, 2, 3], f2=[1, 1, 2]), id=1)
]
[
  Row(value=Row(f1=[1.0, 2.0, 3.0], f2=[0, 0, 0]), id=0),
  Row(value=Row(f1=[1.0, 2.0, 3.0], f2=[0, 0, 0]), id=1),
  Row(value=Row(f1=[1.0, 2.0, 3.0], f2=[1, 1]), id=10),
  Row(value=Row(f1=[1.0, 2.0, 3.0], f2=[1, 1]), id=11)
]
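Please notice that the values of field f2 coming from the first DataFrame (ids 0 and 1) are replaced by zeros after the union, while those from the second DataFrame (ids 10 and 11) are kept. This only happens when the data is read from Parquet files.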
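To make the expected result explicit: the output shows that union widens value.f1 to an array of doubles, so a correct result should differ from the inputs only in f1's element type, with f2 passed through unchanged. The plain-Python sketch below (no Spark involved; `expected_union` and `corrupted_ids` are hypothetical helper names) builds the expected rows from the same input data and compares them with the rows printed by out.collect() above:

```python
# Hypothetical helpers, not part of Spark. Assumption: Union widens value.f1
# from array<int> to array<double> (as the reported output shows) and leaves
# value.f2 (array<int> on both sides) unchanged.

def expected_union(rows1, rows2):
    """Return (f1, f2, id) tuples in union order with f1 widened to float."""
    result = []
    for (f1, f2), row_id in rows1 + rows2:
        result.append(([float(x) for x in f1], list(f2), row_id))
    return result


def corrupted_ids(expected, observed):
    """Return the ids whose f2 in `observed` differs from the expected f2."""
    expected_f2 = {row_id: f2 for _, f2, row_id in expected}
    return sorted(row_id for _, f2, row_id in observed if f2 != expected_f2[row_id])


# Input data, built the same way as in the reproduction script.
data1 = [(([1, 2, 3], [1, 1, 2]), i) for i in range(2)]
data2 = [(([1.0, 2.0, 3.0], [1, 1]), i + 10) for i in range(2)]

expected = expected_union(data1, data2)

# Rows from the reported out.collect(), flattened to (f1, f2, id).
observed = [
    ([1.0, 2.0, 3.0], [0, 0, 0], 0),
    ([1.0, 2.0, 3.0], [0, 0, 0], 1),
    ([1.0, 2.0, 3.0], [1, 1], 10),
    ([1.0, 2.0, 3.0], [1, 1], 11),
]

print(corrupted_ids(expected, observed))  # [0, 1]
```

Only the rows from the first DataFrame, whose f1 had to be cast, come back with a wrong f2.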
Could you please look into this?
Best regards,
Jakub