Spark / SPARK-39218

Python foreachBatch streaming query cannot be stopped gracefully after pin thread mode is enabled


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.3, 3.1.2, 3.2.1, 3.3.0
    • Fix Version/s: 3.3.0
    • None

    Description

      For example,

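      # Run in the PySpark shell, where `spark` is already defined.
      import time
      
      def func(batch_df, batch_id):
          time.sleep(10)
          print(batch_df.count())
      
      # Start a rate-source stream whose foreachBatch callback sleeps 10 s per batch.
      q = spark.readStream.format("rate").load().writeStream.foreachBatch(func).start()
      time.sleep(5)  # stop while func is most likely still sleeping in the first batch
      q.stop()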
      

      works fine when pinned thread mode is disabled. When pinned thread mode is enabled, the query fails instead of stopping:

      22/05/18 15:23:24 ERROR MicroBatchExecution: Query [id = 2538f8a2-c6e4-44c9-bf38-e6dab555267e, runId = 1d500478-1d77-46aa-b35a-585264a809b9] terminated with error
      py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
        File "/.../spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 617, in _call_proxy
          return_value = getattr(self.pool[obj_id], method)(*params)
        File "/.../spark/python/pyspark/sql/utils.py", line 272, in call
          raise e
        File "/.../spark/python/pyspark/sql/utils.py", line 269, in call
          self.func(DataFrame(jdf, self.session), batch_id)
        File "<stdin>", line 3, in func
        File "/.../spark/python/pyspark/sql/dataframe.py", line 804, in count
          return int(self._jdf.count())
        File "/.../spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
          return_value = get_return_value(
        File "/.../spark/python/pyspark/sql/utils.py", line 190, in deco
          return f(*a, **kw)
        File "/.../spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
          raise Py4JJavaError(
      py4j.protocol.Py4JJavaError: An error occurred while calling o44.count.
      : java.lang.InterruptedException
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
      	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
      	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
      	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:187)
      	at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:334)
      	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:943)
      	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2227)
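
The failure mode can be modeled in plain Python. This is a sketch under loud assumptions: ToyStreamingQuery, run_job, ProxyError, SimulatedInterruptedException and the treat_stop_interrupt_as_graceful switch are all hypothetical names, a threading.Event stands in for JVM thread interruption, and nothing here is Spark's or PySpark's actual code or fix. It mirrors the trace above: stop() interrupts the stream thread while func is still sleeping, the trace suggests the later count() then runs on that interrupted thread and fails with InterruptedException, and because the error crosses the Python proxy only as traceback text, one option for a runner that wants a graceful stop is to recognize that text while a stop is in progress.

```python
import threading
import time
import traceback


class SimulatedInterruptedException(Exception):
    """Hypothetical stand-in for java.lang.InterruptedException on the JVM side."""


class ProxyError(Exception):
    """Hypothetical stand-in for py4j.Py4JException: only traceback text crosses over."""


class ToyStreamingQuery:
    """Hypothetical pure-Python model of a micro-batch loop (not Spark's code)."""

    def __init__(self, batch_func, treat_stop_interrupt_as_graceful):
        self._batch_func = batch_func
        self._graceful = treat_stop_interrupt_as_graceful
        self._interrupted = threading.Event()  # models Thread.interrupt() on the stream thread
        self.batch_started = threading.Event()
        self.exception = None  # set only when the query "terminated with error"
        self._thread = threading.Thread(target=self._run)

    def start(self):
        self._thread.start()
        return self

    def run_job(self):
        # Models an interruptible wait (like awaitReady in the trace): once the
        # thread's interrupt flag is set, the blocking call fails immediately.
        if self._interrupted.is_set():
            raise SimulatedInterruptedException("java.lang.InterruptedException")
        return 42

    def _call_python_proxy(self, batch_id):
        # The original exception type is lost at the proxy boundary; only its
        # formatted traceback survives as a string, as in the log above.
        try:
            self._batch_func(self, batch_id)
        except Exception:
            raise ProxyError(
                "An exception was raised by the Python Proxy. Return Message: "
                + traceback.format_exc()
            )

    def _run(self):
        batch_id = 0
        while not self._interrupted.is_set():
            self.batch_started.set()
            try:
                self._call_python_proxy(batch_id)
            except ProxyError as e:
                stopping = self._interrupted.is_set()
                if self._graceful and stopping and "java.lang.InterruptedException" in str(e):
                    return  # the interruption came from stop(): end quietly
                self.exception = e  # "Query ... terminated with error"
                return
            batch_id += 1

    def await_termination(self):
        self._thread.join()

    def stop(self):
        self._interrupted.set()
        self.await_termination()


def func(query, batch_id):
    time.sleep(0.2)  # scaled-down analogue of time.sleep(10) in the repro
    print(query.run_job())  # analogue of batch_df.count()


for graceful in (False, True):
    q = ToyStreamingQuery(func, treat_stop_interrupt_as_graceful=graceful).start()
    q.batch_started.wait()  # analogue of time.sleep(5): stop while func is running
    q.stop()
    print(graceful, "error" if q.exception else "stopped cleanly")
```

With the switch off, the toy query records an error, matching the log; with it on, the stop-induced interruption ends the query quietly, while an unrelated exception raised in func (with no stop in progress) is still reported as a failure.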
      

          People

            Assignee: Hyukjin Kwon (gurwls223)
            Reporter: Hyukjin Kwon (gurwls223)
