SPARK-27805: toPandas does not propagate SparkExceptions with arrow enabled


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.4.3
    • Fix Version/s: 3.0.0
    • Component/s: PySpark, SQL
    • Labels: None

    Description

When calling toPandas with Arrow enabled, errors encountered during the collect are not propagated to the Python process.
      Only a very general EOFError is raised.
      Example of the behavior with Arrow enabled vs. Arrow disabled:

import traceback
      from pyspark.sql.functions import udf
      from pyspark.sql.types import IntegerType

      def raise_exception():
          raise Exception("My error")

      error_udf = udf(raise_exception, IntegerType())
      df = spark.range(3).toDF("i").withColumn("x", error_udf())

      # Without Arrow: the SparkException reaches the Python caller.
      try:
          df.toPandas()
      except Exception:
          no_arrow_exception = traceback.format_exc()

      # With Arrow: the same failure surfaces only as an EOFError.
      spark.conf.set("spark.sql.execution.arrow.enabled", "true")
      try:
          df.toPandas()
      except Exception:
          arrow_exception = traceback.format_exc()

      print(no_arrow_exception)
      print(arrow_exception)
      
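      (The snippet assumes an active SparkSession bound to the name spark, as in the PySpark shell; when run as a
      standalone script, a standard local session can be created first:)

      from pyspark.sql import SparkSession

      # Any existing session works; this is just a standard local setup.
      spark = SparkSession.builder.master("local[*]").getOrCreate()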

With Arrow enabled, arrow_exception gives the following output:

>>> print(arrow_exception)
      Traceback (most recent call last):
        File "<stdin>", line 2, in <module>
        File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 2143, in toPandas
          batches = self._collectAsArrow()
        File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 2205, in _collectAsArrow
          results = list(_load_from_socket(sock_info, ArrowCollectSerializer()))
        File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 210, in load_stream
          num = read_int(stream)
        File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 810, in read_int
          raise EOFError
      EOFError
      
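      The EOFError comes from the stream framing itself rather than from Spark: load_stream keeps calling read_int for
      the next frame header, and when the JVM side dies mid-serve the socket simply closes, leaving no channel for the
      actual error. A minimal sketch of that pattern (mirroring the behavior of read_int in pyspark/serializers.py, not
      the exact source):

      import io
      import struct

      def read_int(stream):
          # An empty read means the peer closed the connection; any
          # context about *why* it closed is already gone at this point.
          length = stream.read(4)
          if not length:
              raise EOFError
          return struct.unpack("!i", length)[0]

      # Simulate the JVM aborting before writing the next frame header:
      read_int(io.BytesIO(b""))  # raises a bare EOFError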

no_arrow_exception, by contrast, contains the full SparkException:

      Traceback (most recent call last):
        File "<stdin>", line 2, in <module>
        File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 2166, in toPandas
          pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
        File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 516, in collect
          sock_info = self._jdf.collectToPython()
        File "/Users/dvogelbacher/git/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
          answer, self.gateway_client, self.target_id, self.name)
        File "/Users/dvogelbacher/git/spark/python/pyspark/sql/utils.py", line 89, in deco
          return f(*a, **kw)
        File "/Users/dvogelbacher/git/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
          format(target_id, ".", name), value)
      Py4JJavaError: An error occurred while calling o38.collectToPython.
      : org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 1 times, most recent failure: Lost task 7.0 in stage 0.0 (TID 7, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
        File "/Users/dvogelbacher/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 428, in main
          process()
        File "/Users/dvogelbacher/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 423, in process
          serializer.dump_stream(func(split_index, iterator), outfile)
        File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 438, in dump_stream
          self.serializer.dump_stream(self._batched(iterator), stream)
        File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 141, in dump_stream
          for obj in iterator:
        File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 427, in _batched
          for item in iterator:
        File "<string>", line 1, in <lambda>
        File "/Users/dvogelbacher/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 86, in <lambda>
          return lambda *a: f(*a)
        File "/Users/dvogelbacher/git/spark/python/pyspark/util.py", line 99, in wrapper
          return f(*args, **kwargs)
        File "<stdin>", line 2, in raise_exception
      Exception: My error
      ...
      
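      Fixing this means forwarding the failure to the Python side instead of just closing the stream. As a schematic
      illustration only (not the actual patch that resolved this issue), a length-prefixed protocol can reserve sentinel
      frame headers so the reader can re-raise a meaningful error; END_OF_STREAM, ERROR_MARKER, write_frames, and
      read_frames below are hypothetical names:

      import io
      import struct

      END_OF_STREAM = 0   # hypothetical sentinel: normal termination
      ERROR_MARKER = -1   # hypothetical sentinel: an error message follows
      # (Empty payloads are assumed not to occur in this sketch.)

      def write_frames(stream, payloads):
          try:
              for payload in payloads:
                  stream.write(struct.pack("!i", len(payload)))
                  stream.write(payload)
              stream.write(struct.pack("!i", END_OF_STREAM))
          except Exception as e:
              # Instead of closing the socket abruptly, ship the error
              # text so the reader can re-raise something meaningful.
              msg = str(e).encode("utf-8")
              stream.write(struct.pack("!i", ERROR_MARKER))
              stream.write(struct.pack("!i", len(msg)))
              stream.write(msg)

      def read_frames(stream):
          while True:
              (length,) = struct.unpack("!i", stream.read(4))
              if length == END_OF_STREAM:
                  return
              if length == ERROR_MARKER:
                  (msg_len,) = struct.unpack("!i", stream.read(4))
                  raise RuntimeError(stream.read(msg_len).decode("utf-8"))
              yield stream.read(length)

      # Demo: a source that fails mid-stream still yields a useful error.
      def failing_batches():
          yield b"batch-1"
          raise Exception("My error")

      buf = io.BytesIO()
      write_frames(buf, failing_batches())
      buf.seek(0)
      list(read_frames(buf))  # raises RuntimeError: My error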

    People

      Assignee: David Vogelbacher (dvogelbacher)
      Reporter: David Vogelbacher (dvogelbacher)