Description
A trivially reproducible bug in the code for `FrequentItems`. The schema for the resulting arrays of frequent items is hard-coded to have non-nullable array elements:
```scala
val outputCols = colInfo.map { v =>
  StructField(v._1 + "_freqItems", ArrayType(v._2, false))
}
val schema = StructType(outputCols).toAttributes
Dataset.ofRows(df.sparkSession, LocalRelation.fromExternalRows(schema, Seq(resultRow)))
```
However, if a column contains frequent nulls, those nulls are included in the frequent-items array. This causes various errors; for example, any attempt to `collect()` throws a null pointer exception:
```python
from pyspark.sql import SparkSession

spark = SparkSession.Builder().getOrCreate()
df = spark.createDataFrame([
    (1, 'a'),
    (2, None),
    (3, 'b'),
], schema="id INTEGER, val STRING")
rows = df.freqItems(df.columns).collect()
```
Results in:
```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/bin/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/dataframe.py", line 533, in collect
    sock_info = self._jdf.collectToPython()
  File "/usr/local/bin/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/local/bin/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/bin/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o40.collectToPython.
: java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:109)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:44)
    at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:44)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:296)
    at org.apache.spark.sql.execution.LocalTableScanExec.unsafeRows$lzycompute(LocalTableScanExec.scala:44)
    at org.apache.spark.sql.execution.LocalTableScanExec.unsafeRows(LocalTableScanExec.scala:39)
    at org.apache.spark.sql.execution.LocalTableScanExec.executeCollect(LocalTableScanExec.scala:70)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3257)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3254)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3254)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
```
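In the meantime, a possible workaround (just a sketch on my side, assuming it is acceptable to exclude null-containing rows from the analysis) is to drop nulls before calling `freqItems`, so no null can land in the result arrays. Shown in Scala here; `dropna()` should do the same in PySpark:

```scala
// Workaround sketch (assumption: rows with nulls can simply be excluded).
// Dropping null-containing rows keeps nulls out of the frequent-items
// arrays, at the cost of changing the counts of the remaining values.
val noNulls = df.na.drop()
val freq = noNulls.stat.freqItems(noNulls.columns)
freq.collect() // no longer hits the NullPointerException
```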
It is unclear whether the hard-coding is at fault, or whether the algorithm is actually designed not to return nulls even when they are frequent, in which case the hard-coding would be appropriate. I'll put in a PR that assumes the hard-coding is the bug, unless people know otherwise.
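For reference, the change I have in mind is the one-line schema fix sketched below (whether the final PR looks exactly like this is still open): declare the array elements nullable so the schema matches what the aggregation can actually produce.

```scala
// Sketch of the proposed fix in FrequentItems: containsNull = true instead
// of the hard-coded false, so frequent nulls survive the unsafe projection.
val outputCols = colInfo.map { v =>
  StructField(v._1 + "_freqItems", ArrayType(v._2, true))
}
```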