SPARK-30952: Grouped pandas_udf crashed when a group returned an empty DataFrame


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Cannot Reproduce
    • Affects Version/s: 2.4.0
    • Fix Version/s: None
    • Component/s: PySpark
    • Labels: None

    Description

      We are trying to apply the three-sigma rule to grouped data to detect anomalous records. We found that the job always crashes when a group returns an empty DataFrame (empty meaning no anomalies were found for that group).

      Sample Code:

      from pyspark.sql.functions import pandas_udf, PandasUDFType
      from pyspark.sql.types import StructField, StructType, StringType, LongType
      import pandas as pd
      from pyspark.sql import SparkSession
      import numpy as np
      
      
      def check_pdf():
          schema = StructType([
              StructField("customer_id", StringType(), True),
              StructField("count", LongType(), True)
          ])
      
          @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
          def handler(pdf):
              # Three-sigma rule: keep rows whose "count" exceeds mean + 3 * sigma.
              mean = float(np.mean(pdf["count"]))
              sigma = float(np.std(pdf["count"], ddof=1))
              # For a single-row group, sigma is NaN (ddof=1), so the filter
              # returns an empty DataFrame.
              return pdf[pdf["count"] > mean + 3 * sigma]
      
          return handler
      
      
      def main():
          spark = SparkSession.builder \
              .appName("AppTest") \
              .master("local[4]") \
              .config("spark.driver.host", "localhost") \
              .config("spark.sql.shuffle.partitions", 2) \
              .getOrCreate()
      
          df = spark.createDataFrame([
              {
                  "count": 15,
                  "customer_id": "c1"
              },
              {
                  "count": 11,
                  "customer_id": "c1"
              },
              {
                  "count": 11,
                  "customer_id": "c2"
              }
          ])
      
          result = df.groupby("customer_id").apply(check_pdf()).collect()
          print(result)
      
          spark.stop()
      
      
      if __name__ == '__main__':
          main()
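
      Note: with this sample data every group ends up empty. Group "c1" has no value above mean + 3 * sigma, and for the single-row group "c2" the sample standard deviation (ddof=1) is NaN. The empty per-group result can be checked with plain pandas/numpy outside Spark, for example:

      import numpy as np
      import pandas as pd

      # Single-row group, as for customer "c2" above.
      pdf = pd.DataFrame({"customer_id": ["c2"], "count": [11]})
      mean = float(np.mean(pdf["count"]))
      sigma = float(np.std(pdf["count"], ddof=1))  # NaN: ddof=1 with a single value
      print(pdf[pdf["count"] > mean + 3 * sigma])  # prints an empty DataFrame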
      
      

      Exception:

      2020-02-26 10:56:45 ERROR Executor:91 - Exception in task 0.0 in stage 1.0 (TID 4)
      org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
          at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:486)
          at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:475)
          at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
          at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:178)
          at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
          at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
          at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
          at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
          at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
          at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
          at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
          at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
          at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
          at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
          at org.apache.spark.scheduler.Task.run(Task.scala:121)
          at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
          at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)
      Caused by: java.io.EOFException
          at java.io.DataInputStream.readInt(DataInputStream.java:392)
          at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:159)
          ... 20 more
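
      A defensive variant of the handler (only a sketch, reusing the imports and schema from the sample code above; not verified to avoid the crash) would be to return an explicitly empty slice of the input with exactly the declared columns, so an empty result still carries the expected column names and dtypes:

      @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
      def handler(pdf):
          mean = float(np.mean(pdf["count"]))
          sigma = float(np.std(pdf["count"], ddof=1))
          result = pdf[pdf["count"] > mean + 3 * sigma]
          if result.empty:
              # Empty slice of the input: keeps the "customer_id"/"count"
              # columns and their original dtypes.
              return pdf[["customer_id", "count"]].iloc[0:0]
          return result[["customer_id", "count"]]
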
    People

    • Assignee: Unassigned
    • Reporter: bruce_zhao
    • Votes: 0
    • Watchers: 2