  1. Spark
  2. SPARK-44679

java.lang.OutOfMemoryError: Requested array size exceeds VM limit

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.2.1
    • Fix Version/s: None
    • Component/s: EC2, PySpark
    • Labels: None
    • Environment: We use Amazon EMR to run PySpark jobs.
      Amazon EMR version: emr-6.7.0
      Installed applications:
      Tez 0.9.2, Spark 3.2.1, Hive 3.1.3, Sqoop 1.4.7, Hadoop 3.2.1, Zookeeper 3.5.7, HCatalog 3.1.3, Livy 0.7.1

    • Flags: Important

    Description

      We get the following error from our PySpark application in the production environment:

      java.lang.OutOfMemoryError: Requested array size exceeds VM limit

      I simplified the code we used and shared it below so you can easily investigate the issue.

      We use PySpark to read a 900 MB text file that contains a single record. We then call foreach to iterate over the DataFrame and apply a higher-order function to each row. The error occurs as soon as the foreach action is triggered. I think the issue is related to the byte array that holds the serialized DataFrame row being indexed by an integer: because the record is so large, its serialized size appears to exceed the maximum integer value (2^31 - 1, about 2 GB), which triggers the error.
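
The hypothesis above can be sanity-checked with simple arithmetic. The sketch below is only an illustration, not a trace of what Spark actually does: the doubling growth policy, the initial capacity of 32, and the "twice the record size" case are all assumptions made for the example. The only firm fact is that a JVM array length is a signed 32-bit int, so it cannot exceed 2^31 - 1.

```python
# JVM arrays are indexed by a signed 32-bit int, so no single array can
# hold more than Integer.MAX_VALUE elements.
JVM_MAX_ARRAY_LENGTH = 2**31 - 1


def next_doubled_capacity(required: int, initial: int = 32) -> int:
    """Capacity a hypothetical doubling buffer would request to hold `required` bytes.

    The doubling policy and the initial size are assumptions for illustration.
    """
    capacity = initial
    while capacity < required:
        capacity *= 2
    return capacity


def exceeds_jvm_limit(num_bytes: int) -> bool:
    """True if a byte array of this length could not be allocated on the JVM."""
    return num_bytes > JVM_MAX_ARRAY_LENGTH


record_bytes = 900 * 1024 * 1024   # the 900 MB single-record file
two_copies = 2 * record_bytes      # assumption: some step needs twice the raw size

for label, size in [("raw record", record_bytes), ("twice the record", two_copies)]:
    requested = next_doubled_capacity(size)
    print(f"{label}: needs {size:,} bytes, "
          f"doubling buffer requests {requested:,} bytes, "
          f"exceeds JVM limit: {exceeds_jvm_limit(requested)}")
```

Under these assumptions, the raw 900 MB record still fits in one array, but any step that needs roughly twice that size in a doubling buffer would request 2^31 bytes, one more than the JVM allows.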

      Note that the same error happens when using foreachBatch function with writeStream. 

      Our production data has many records larger than 100 MB. We would appreciate a fix or a workaround for this issue.

       

      The code snippet:

      from pyspark.sql import SparkSession, functions as f

      def check_file_name(row):
          # Simplified stand-in for the real per-row logic.
          print("check_file_name called")

      def main():
          spark = SparkSession.builder.enableHiveSupport().getOrCreate()
          inputPath = "s3://bucket-name/common/source/"
          # wholetext=True loads each file as a single row (one ~900 MB record here).
          inputDF = spark.read.text(inputPath, wholetext=True)
          inputDF = inputDF.select(
              f.date_format(f.current_timestamp(), 'yyyyMMddHH').astype('string').alias('insert_hr'),
              f.col("value").alias("raw_data"),
              f.input_file_name().alias("input_file_name"))
          # The OutOfMemoryError is raised when this action runs.
          inputDF.foreach(check_file_name)

      if __name__ == "__main__":
          main()

      The spark-submit command used:

      spark-submit --master yarn --conf spark.serializer=org.apache.spark.serializer.KryoSerializer  --num-executors 15 --executor-cores 4 --executor-memory 20g --driver-memory 20g --name haitham_job --deploy-mode cluster big_file_process.py

      Attachments

        1. code_sample.txt
          0.6 kB
          Haitham Eltaweel

          People

            Assignee: Unassigned
            Reporter: Haitham Eltaweel (haitham)
            Votes: 0
            Watchers: 2
