SPARK-47740: Stop JVM by calling SparkSession.stop from PySpark


Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.5.1
    • Fix Version/s: None
    • Component/s: PySpark
    • Labels: None

    Description

      from pyspark.sql import SparkSession
      
      spark = SparkSession.builder.config("spark.driver.memory", "2g").config("spark.another.property", "hi").getOrCreate()
      spark.conf.get("spark.driver.memory")
      # prints '2g'
      spark.conf.get("spark.another.property")
      # prints 'hi'
      
      # check for JVM process before and after stop
      spark.stop()
      
      # try to recreate Spark session with different driver memory, and no "spark.another.property" at all
      spark = SparkSession.builder.config("spark.driver.memory", "3g").getOrCreate()
      spark.conf.get("spark.driver.memory")
      # prints '3g'
      spark.conf.get("spark.another.property")
      # still prints 'hi': the old session's config leaked into the new one
      
      # check for JVM process
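      # a minimal sketch of that check (assumption: psutil is installed and
      # the driver JVM was launched by py4j as a child of this Python process)
      import os
      import psutil
      driver = psutil.Process(os.getpid())
      jvms = [p for p in driver.children(recursive=True) if "java" in p.name().lower()]
      print(jvms)  # the java process from the first session is still alive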
      

      After the first call to .getOrCreate(), a JVM process with the following options has been started:

      maxim      76625 10.9  2.7 7975292 447068 pts/8  Sl+  16:59   0:11 /home/maxim/.sdkman/candidates/java/current/bin/java -cp /home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/pyspark/conf:/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/pyspark/jars/* -Xmx2g -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false org.apache.spark.deploy.SparkSubmit --conf spark.driver.memory=2g --conf spark.another.property=hi pyspark-shell
      

      But it has not been stopped by the .stop() call. The new Spark session reports 3g of driver memory, but it actually runs in the same JVM started with 2g, and it still sees config options from the old session, which should have been closed at this point.
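
      A quick way to observe this reuse from Python (this relies on PySpark internals, so treat it as a sketch rather than a stable API):

      gw_before = spark._sc._gateway  # py4j gateway of the current session
      spark.stop()
      spark = SparkSession.builder.getOrCreate()
      # the same gateway object is reused, so it is the same JVM process
      assert spark._sc._gateway is gw_before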

      Summarizing:

      • spark.stop() stops only the SparkContext, not the JVM
      • spark.stop() does not clean up the session config

      This behavior has been observed for a long time, at least since 2.2.0, and it is still present in 3.5.1.

      This could be solved by stopping the JVM after the Spark session is stopped.
      But simply calling spark._jvm.System.exit(0) will fail: the JVM dies before py4j can send a reply, so the py4j socket gets closed abruptly and it becomes impossible to start a new Spark session:

      spark._jvm.System.exit(0)
      
      ERROR:root:Exception while sending command.
      Traceback (most recent call last):
        File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/clientserver.py", line 516, in send_command
          raise Py4JNetworkError("Answer from Java side is empty")
      py4j.protocol.Py4JNetworkError: Answer from Java side is empty
      
      During handling of the above exception, another exception occurred:
      
      Traceback (most recent call last):
        File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
          response = connection.send_command(command)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/clientserver.py", line 539, in send_command
          raise Py4JNetworkError(
      py4j.protocol.Py4JNetworkError: Error while sending or receiving
      Traceback (most recent call last):
        File "<stdin>", line 1, in <module>
        File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/java_gateway.py", line 1322, in __call__
          return_value = get_return_value(
                         ^^^^^^^^^^^^^^^^^
        File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
          return f(*a, **kw)
                 ^^^^^^^^^^^
        File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/protocol.py", line 334, in get_return_value
          raise Py4JError(
      py4j.protocol.Py4JError: An error occurred while calling z:java.lang.System.exit
      

      Here is an article (in Russian) describing the same issue and also providing a safe way to stop the JVM without breaking py4j:
      https://habr.com/ru/companies/sberbank/articles/805285/

      from pyspark.sql import SparkSession
      from pyspark import SparkContext
      from threading import RLock
      from py4j.protocol import CALL_COMMAND_NAME, END_COMMAND_PART
      
      def stop_spark_session(spark_session: SparkSession):
          # stop the Spark session as usual
          spark_session.stop()
          # the py4j gateway is still alive at this point: build the
          # System.exit(0) command by hand
          gateway = spark_session.sparkContext._gateway
          sys_exit = gateway.jvm.System.exit
          args = sys_exit._build_args(0)[0]
          command = (
              CALL_COMMAND_NAME
              + sys_exit.command_header
              + args
              + END_COMMAND_PART
          )
          # send it over the raw socket: the JVM exits before py4j starts
          # waiting for an answer, so no exception is raised on the Python side
          gateway_connection = sys_exit.gateway_client._get_connection()
          gateway_connection.socket.sendall(command.encode("utf-8"))
          # make sure the child process is gone
          gateway.proc.terminate()
          gateway.proc.poll()
          # close gateway sockets on the Python side
          gateway.shutdown()

          # reset PySpark globals to their initial state
          SparkContext._next_accum_id = 0
          SparkContext._active_spark_context = None
          SparkContext._lock = RLock()
          SparkContext._jvm = None
          SparkContext._gateway = None
          SparkContext._python_includes = None
      

      Calling stop_spark_session(spark) stops the JVM and allows users to create a fresh new Spark session from PySpark. No config pollution, no weird semi-closed global contexts.
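
      For illustration, the original reproduction behaves as expected after this teardown (assuming the stop_spark_session helper above is in scope):

      stop_spark_session(spark)
      # a brand new JVM is launched with the new settings
      spark = SparkSession.builder.config("spark.driver.memory", "3g").getOrCreate()
      spark.conf.get("spark.driver.memory")
      # prints '3g', and the driver JVM is really started with -Xmx3g
      spark.conf.get("spark.another.property")
      # raises an error: the old session's config is gone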

      Can something like this be added to the SparkSession.stop() implementation? Of course, only for deploy mode = client and master = local|yarn|k8s, where py4j is used.
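
      A rough sketch of the guard such a change could use (the function name is hypothetical, not an existing Spark internal):

      def _pyspark_owns_jvm(sc: SparkContext) -> bool:
          # hypothetical check: stop the JVM only when PySpark launched it
          # itself via py4j, i.e. client deploy mode on local/yarn/k8s masters
          master = sc.getConf().get("spark.master", "")
          deploy_mode = sc.getConf().get("spark.submit.deployMode", "client")
          return deploy_mode == "client" and master.startswith(("local", "yarn", "k8s://"))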

          People

            Assignee: Unassigned
            Reporter: Maxim Martynov (dolfinus)
            Votes: 0
            Watchers: 1
