  1. Spark
  2. SPARK-26947

Pyspark KMeans Clustering job fails on large values of k

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Invalid
    • Affects Version/s: 2.4.0
    • Fix Version/s: None
    • Component/s: ML, MLlib, PySpark
    • Labels:
      None

      Description

      We recently had a case where a user's PySpark job running KMeans clustering failed for large values of k. I was able to reproduce the issue with a dummy dataset. I have attached the code as well as the data to this JIRA. The Java stack trace is printed below:

       

      Exception in thread "Thread-10" java.lang.OutOfMemoryError: Java heap space
      	at java.util.Arrays.copyOf(Arrays.java:3332)
      	at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
      	at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:649)
      	at java.lang.StringBuilder.append(StringBuilder.java:202)
      	at py4j.Protocol.getOutputCommand(Protocol.java:328)
      	at py4j.commands.CallCommand.execute(CallCommand.java:81)
      	at py4j.GatewayConnection.run(GatewayConnection.java:238)
      	at java.lang.Thread.run(Thread.java:748)
      

      Python:

      Traceback (most recent call last):
        File "/grid/2/tmp/yarn-local/usercache/user/appcache/xxx/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
          raise Py4JNetworkError("Answer from Java side is empty")
      py4j.protocol.Py4JNetworkError: Answer from Java side is empty
      
      During handling of the above exception, another exception occurred:
      
      Traceback (most recent call last):
        File "/grid/2/tmp/yarn-local/usercache/user/appcache/xxx/container_xxx/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
          response = connection.send_command(command)
        File "/grid/2/tmp/yarn-local/usercache/user/appcache/application_xxx/container_xxx/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
          "Error while receiving", e, proto.ERROR_ON_RECEIVE)
      py4j.protocol.Py4JNetworkError: Error while receiving
      Traceback (most recent call last):
        File "clustering_app.py", line 154, in <module>
          main(args)
        File "clustering_app.py", line 145, in main
          run_clustering(sc, args.input_path, args.output_path, args.num_clusters_list)
        File "clustering_app.py", line 136, in run_clustering
          clustersTable, cluster_Centers = clustering(sc, documents, output_path, k, max_iter)
        File "clustering_app.py", line 68, in clustering
          cluster_Centers = km_model.clusterCenters()
        File "/grid/2/tmp/yarn-local/usercache/user/appcache/application_xxx/container_xxx/pyspark.zip/pyspark/ml/clustering.py", line 337, in clusterCenters
        File "/grid/2/tmp/yarn-local/usercache/user/appcache/application_xxx/container_xxx/pyspark.zip/pyspark/ml/wrapper.py", line 55, in _call_java
        File "/grid/2/tmp/yarn-local/usercache/user/appcache/application_xxx/container_xxx/pyspark.zip/pyspark/ml/common.py", line 109, in _java2py
        File "/grid/2/tmp/yarn-local/usercache/user/appcache/application_xxx/container_xxx/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
        File "/grid/2/tmp/yarn-local/usercache/user/appcache/application_xxx/container_xxx/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
        File "/grid/2/tmp/yarn-local/usercache/user/appcache/application_xxx/container_xxx/py4j-0.10.7-src.zip/py4j/protocol.py", line 336, in get_return_value
      py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.ml.python.MLSerDe.dumps
      

      The command with which the application was launched is given below:

      $SPARK_HOME/bin/spark-submit \
        --master yarn \
        --deploy-mode cluster \
        --conf spark.executor.memory=20g \
        --conf spark.driver.memory=20g \
        --conf spark.executor.memoryOverhead=4g \
        --conf spark.driver.memoryOverhead=4g \
        --conf spark.kryoserializer.buffer.max=2000m \
        --conf spark.driver.maxResultSize=12g \
        ~/clustering_app.py \
        --input_path hdfs:///user/username/part-v001x \
        --output_path hdfs:///user/username \
        --num_clusters_list 10000
      

      The input dataset is approximately 90 MB in size, and the heap memory assigned to both the driver and the executor is close to 20 GB. The failure only occurs for large values of k, such as the 10000 used in the command above.
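
The Java trace shows the heap running out while py4j appends the return value of clusterCenters() to a StringBuilder. The sketch below is a rough, back-of-the-envelope estimate of how large that string could get. It rests on several assumptions that are not stated in this report: that the centers are dense vectors of 8-byte doubles, that py4j returns the serialized bytes as a base64 string, and that the JVM stores each character in 2 bytes (as Java 8 does). The function name and the `dim` parameter (number of features per center) are hypothetical; the report does not give the feature dimension. Pickle overhead is ignored, so the result is a lower bound.

```python
import math


def estimate_py4j_return_bytes(k, dim, bytes_per_value=8, bytes_per_char=2):
    """Rough lower bound on JVM heap needed to return k centers of dim doubles.

    Assumes dense double vectors, base64 transport, and 2-byte JVM chars.
    """
    # Raw payload: k centers, each with dim double-precision values.
    raw = k * dim * bytes_per_value
    # Base64 encodes every 3 bytes (rounded up) as 4 characters.
    base64_chars = 4 * math.ceil(raw / 3)
    # Heap held by the character buffer once fully built.
    builder_bytes = base64_chars * bytes_per_char
    return {
        "raw": raw,
        "base64_chars": base64_chars,
        "builder_bytes": builder_bytes,
    }


if __name__ == "__main__":
    # k matches the command above; dim=20_000 is a made-up value for illustration.
    estimate = estimate_py4j_return_bytes(k=10_000, dim=20_000)
    for name, value in estimate.items():
        print(f"{name}: {value / 1e9:.2f} x 10^9")
```

Under these assumptions the buffer is already about 2.7 times the raw payload, and each time the StringBuilder grows (the Arrays.copyOf frame in the trace) the old and new arrays are both live for a moment. That may explain why a 90 MB input can still exhaust a 20 GB driver heap when k is large, although the actual feature dimension would be needed to confirm it.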

        Attachments

        1. clustering_app.py
          4 kB
          Parth Gandhi

            People

            • Assignee:
              Unassigned
            • Reporter:
              pgandhi Parth Gandhi