Details
Type: Improvement
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 3.5.1
Fix Version/s: None
Component/s: None
Description
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.driver.memory", "2g").config("spark.another.property", "hi").getOrCreate()
spark.conf.get("spark.driver.memory")     # prints '2g'
spark.conf.get("spark.another.property")  # prints 'hi'

# check for JVM process before and after stop
spark.stop()

# try to recreate Spark session with different driver memory, and no "spark.another.property" at all
spark = SparkSession.builder.config("spark.driver.memory", "3g").getOrCreate()
spark.conf.get("spark.driver.memory")     # prints '3g'
spark.conf.get("spark.another.property")  # prints 'hi'

# check for JVM process
After the first call to .getOrCreate(), a JVM process with the following options has been started:
maxim 76625 10.9 2.7 7975292 447068 pts/8 Sl+ 16:59 0:11 /home/maxim/.sdkman/candidates/java/current/bin/java -cp /home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/pyspark/conf:/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/pyspark/jars/* -Xmx2g -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false org.apache.spark.deploy.SparkSubmit --conf spark.driver.memory=2g --conf spark.another.property=hi pyspark-shell
But it has not been stopped by the .stop() method call. The new Spark session, which reports 3g of driver memory, actually uses the same 2g, and still carries config options from the old session, which should have been closed at this point.
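The surviving JVM can also be observed directly from Python. A minimal sketch, assuming local/client mode where PySpark launches the JVM itself and stores the child process handle in the private _gateway.proc attribute:

from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.driver.memory", "2g").getOrCreate()

# private API: launch_gateway() keeps the Popen handle of the JVM it started
jvm_proc = spark.sparkContext._gateway.proc

spark.stop()

# poll() returns None while the child process is still running
print(jvm_proc.poll())  # None -> the JVM survived spark.stop()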
Summarizing:
- spark.stop() stops only the SparkContext, but not the JVM
- spark.stop() does not clean up the session config
This behavior has been observed for a long time, at least since 2.2.0, and is still present in 3.5.1.
This could be solved by stopping the JVM after the Spark session is stopped.
But just calling spark._jvm.System.exit(0) will fail, because the py4j socket will be closed and it will be impossible to start a new Spark session:
spark._jvm.System.exit(0)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling z:java.lang.System.exit
Here is an article describing the same issue and also providing a safe way to stop the JVM without breaking py4j (in Russian):
https://habr.com/ru/companies/sberbank/articles/805285/
from threading import RLock

from py4j.protocol import CALL_COMMAND_NAME, END_COMMAND_PART
from pyspark import SparkContext
from pyspark.sql import SparkSession


def stop_spark_session(spark_session: SparkSession):
    # keep a reference to the py4j gateway before stopping the session
    gateway = spark_session.sparkContext._gateway

    # stop Spark session as usual
    spark_session.stop()

    # build py4j command calling System.exit(0) on the JVM side
    sys_exit = gateway.jvm.System.exit
    args = sys_exit._build_args(0)[0]
    command = CALL_COMMAND_NAME + sys_exit.command_header + args + END_COMMAND_PART

    # send it directly to the gateway socket, without waiting for an answer
    # (the JVM exits immediately, so there is nothing to read back)
    gateway_connection = sys_exit.gateway_client._get_connection()
    gateway_socket = gateway_connection.socket
    gateway_socket.sendall(command.encode("utf-8"))

    # make sure the JVM child process is terminated
    gateway.proc.terminate()
    gateway.proc.poll()

    # close gateway sockets on the Python side
    gateway.shutdown()

    # reset PySpark globals to their initial state
    SparkContext._next_accum_id = 0
    SparkContext._active_spark_context = None
    SparkContext._lock = RLock()
    SparkContext._jvm = None
    SparkContext._gateway = None
    SparkContext._python_includes = None
Calling stop_spark_session(spark) stops the JVM and allows users to create a fresh new Spark session using PySpark. No config pollution, no weird semi-closed global contexts.
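A quick usage sketch, assuming the same local/client mode setup as in the reproduction above (the exact exception raised for the missing key is not the point; what matters is that the key is gone in the new session):

spark = SparkSession.builder.config("spark.driver.memory", "2g").config("spark.another.property", "hi").getOrCreate()
stop_spark_session(spark)

# a brand new JVM is launched, so the new driver memory setting actually takes effect
spark = SparkSession.builder.config("spark.driver.memory", "3g").getOrCreate()
spark.conf.get("spark.driver.memory")     # '3g'
spark.conf.get("spark.another.property")  # fails: the key is no longer set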
Can something like this be added to the SparkSession.stop() implementation? Of course, only for deploy mode client and master local|yarn|k8s, where py4j is used.
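For illustration only, a hedged sketch of how that guard could look on the Python side; the helper name and the guard logic are hypothetical, only the config keys (spark.master, spark.submit.deployMode) are standard Spark ones:

def can_shutdown_jvm(spark: SparkSession) -> bool:
    # True only when PySpark launched the JVM itself via py4j,
    # i.e. client deploy mode with a local/yarn/k8s master
    conf = spark.sparkContext.getConf()
    master = conf.get("spark.master", "")
    deploy_mode = conf.get("spark.submit.deployMode", "client")
    launched_locally = master.startswith(("local", "yarn", "k8s"))
    return deploy_mode == "client" and launched_locally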