Spark / SPARK-30921

Error using VectorAssembler after Pandas GROUPED_AGG UDF


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.4.4
    • Fix Version/s: 3.0.0
    • Component/s: ML, PySpark
    • Labels: None
    • Environment:
      numpy==1.16.4
      pandas==0.23.4
      py4j==0.10.7
      pyarrow==0.8.0
      pyspark==2.4.4
      scikit-learn==0.19.1
      scipy==1.1.0

    Description

      Using VectorAssembler after a Pandas GROUPED_AGG UDF and a join causes an opaque error:

      Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: apply_impl(input[1, struct<t:bigint,val:bigint>, true].val)

      However, inserting a .cache() between the VectorAssembler and the join seems to prevent the VectorAssembler and the Pandas UDF from interacting to cause this error.
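
      The attached reproducer is test_dyn_pandas_function.py; a minimal sketch of the reported shape might look like the following. Column names and the type-hint UDF style are illustrative only (on 2.4.x the UDF would be declared with PandasUDFType.GROUPED_AGG), not the actual attachment:

      ```python
      import pandas as pd
      from pyspark.sql import SparkSession
      from pyspark.sql.functions import pandas_udf
      from pyspark.ml.feature import VectorAssembler

      spark = SparkSession.builder.master("local[1]").appName("spark-30921").getOrCreate()

      df = spark.createDataFrame([(0, 1.0), (0, 2.0), (1, 3.0)], ["id", "val"])

      # Grouped-aggregate pandas UDF: collapses each group's 'val' column to a scalar.
      @pandas_udf("double")
      def mean_val(v: pd.Series) -> float:
          return float(v.mean())

      agg = df.groupBy("id").agg(mean_val(df["val"]).alias("mean_val"))

      # Join the aggregate back onto the original rows, then assemble features.
      # On 2.4.4 this pattern reportedly raised "Cannot evaluate expression: ...";
      # calling agg.cache() before the join was the reported workaround.
      joined = df.join(agg, on="id")
      assembled = VectorAssembler(
          inputCols=["val", "mean_val"], outputCol="features"
      ).transform(joined)
      rows = assembled.collect()
      ```

      The AtLeastNNulls filter in the plan below comes from VectorAssembler's null handling, which is consistent with the assembler's input column being an unevaluated Python UDF expression at codegen time.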

       

      E py4j.protocol.Py4JJavaError: An error occurred while calling o259.collectToPython.
      E : org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
      E Exchange hashpartitioning(foo_id_SummaryAggregator_AOG2FHR#34L, 4)
      E +- *(4) Filter AtLeastNNulls(n, apply_impl(foo_explode_SummaryAggregator_AOG2FHR#20.val),apply_impl(foo_explode_SummaryAggregator_AOG2FHR#20.val))
      E +- Generate explode(foo#11), foo_id_SummaryAggregator_AOG2FHR#34L, true, foo_explode_SummaryAggregator_AOG2FHR#20
      E +- *(3) Project [foo#11, monotonically_increasing_id() AS foo_id_SummaryAggregator_AOG2FHR#34L]
      E +- Scan ExistingRDD[foo#11,id#12L]
      E at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
      E at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
      E at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      E at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      E at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
      E at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
      E at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
      E at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
      E at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      E at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      E at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
      E at org.apache.spark.sql.execution.python.AggregateInPandasExec.doExecute(AggregateInPandasExec.scala:80)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
      E at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      E at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      E at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
      E at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
      E at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
      E at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
      E at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      E at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      E at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
      E at org.apache.spark.sql.execution.InputAdapter.doExecute(WholeStageCodegenExec.scala:383)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
      E at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      E at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      E at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
      E at org.apache.spark.sql.execution.joins.SortMergeJoinExec.inputRDDs(SortMergeJoinExec.scala:386)
      E at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
      E at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
      E at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      E at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      E at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
      E at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
      E at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
      E at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3263)
      E at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3260)
      E at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
      E at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
      E at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
      E at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
      E at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
      E at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3260)
      E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      E at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      E at java.lang.reflect.Method.invoke(Method.java:498)
      E at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
      E at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      E at py4j.Gateway.invoke(Gateway.java:282)
      E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      E at py4j.commands.CallCommand.execute(CallCommand.java:79)
      E at py4j.GatewayConnection.run(GatewayConnection.java:238)
      E at java.lang.Thread.run(Thread.java:748)
      E Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: apply_impl(input[1, struct<t:bigint,val:bigint>, true].val)
      E at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:261)
      E at org.apache.spark.sql.catalyst.expressions.PythonUDF.doGenCode(PythonUDF.scala:50)
      E at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
      E at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
      E at scala.Option.getOrElse(Option.scala:121)
      E at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:105)
      E at org.apache.spark.sql.catalyst.expressions.AtLeastNNonNulls$$anonfun$4.apply(nullExpressions.scala:402)
      E at org.apache.spark.sql.catalyst.expressions.AtLeastNNonNulls$$anonfun$4.apply(nullExpressions.scala:401)
      E at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      E at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      E at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      E at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      E at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      E at scala.collection.AbstractTraversable.map(Traversable.scala:104)
      E at org.apache.spark.sql.catalyst.expressions.AtLeastNNonNulls.doGenCode(nullExpressions.scala:401)
      E at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
      E at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
      E at scala.Option.getOrElse(Option.scala:121)
      E at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:105)
      E at org.apache.spark.sql.execution.FilterExec.org$apache$spark$sql$execution$FilterExec$$genPredicate$1(basicPhysicalOperators.scala:139)
      E at org.apache.spark.sql.execution.FilterExec$$anonfun$13.apply(basicPhysicalOperators.scala:179)
      E at org.apache.spark.sql.execution.FilterExec$$anonfun$13.apply(basicPhysicalOperators.scala:163)
      E at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      E at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      E at scala.collection.immutable.List.foreach(List.scala:392)
      E at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      E at scala.collection.immutable.List.map(List.scala:296)
      E at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:163)
      E at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:189)
      E at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:374)
      E at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:403)
      E at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
      E at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
      E at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      E at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      E at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
      E at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:374)
      E at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:125)
      E at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
      E at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
      E at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      E at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      E at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
      E at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:85)
      E at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:544)
      E at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:598)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
      E at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
      E at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      E at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      E at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
      E at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
      E at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
      E at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
      E at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
      E ... 69 more

      Attachments

        1. test_dyn_pandas_function.py (1 kB, Tim Kellogg)

      People

        Assignee: L. C. Hsieh (viirya)
        Reporter: Tim Kellogg (tkellogg)
        Votes: 0
        Watchers: 2
