Spark / SPARK-43816

Spark Corrupts Data in Transit at High Data Volumes (> 20 TB/hr)


Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 3.3.1
    • Fix Version/s: None
    • Component/s: PySpark

    Description

      Bug Context

      Hello! I would like to report a bug that my team noticed while we were using Spark (please see the Environment section for our exact setup).

      The application we built converts a large number of JSON files (JSON Lines format) and writes them to a Delta table. The JSON files are located in an Azure Data Lake Storage Gen2 account without a hierarchical namespace. The Delta table is in an Azure Data Lake Storage Gen2 account with a hierarchical namespace enabled.

      We have a PySpark notebook in our Synapse Analytics workspace which reads the JSON files into a DataFrame and then writes them to the Delta table. It uses batch processing.

      The JSON files contain no corrupt records (we checked them thoroughly), and there are no code flaws in our PySpark notebook (we checked that as well).

      Our code reads 15 TB of JSON files (each file is about 400 MB) into a PySpark DataFrame as follows:

      originalDF = (
          spark.read
              .schema(originDataSchema)
              .option("pathGlobFilter", DESIRED_FILE_PATTERN)
              .option("mode", "PERMISSIVE")
              .option("columnNameOfCorruptRecord", "DiscoveredCorruptRecords")
              .option("badRecordsPath", BAD_RECORDS_PATH)
              .json(ORIGIN_FILES_PATH)
      )

      To read this data and then write it to a Delta table takes about 37 minutes.
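
      For context, the write side is a plain batch Delta write along these lines (simplified; DELTA_TABLE_PATH is a placeholder standing in for our real path):

      (
          originalDF.write
              .format("delta")
              .mode("append")
              .save(DELTA_TABLE_PATH)
      )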

      The problem we noticed is that, as the data is read into the PySpark DataFrame, a small fraction of it (roughly 1 in 10 million records) becomes corrupted. The following is a made-up example to illustrate the point:

      // The original JSON record looks like this
      { "Name": "Robert", "Email": "bob@gmail.com", "Nickname": "Bob" }
      
      // When we look in the PySpark DataFrame we see this (for a small percent of records)
      { "Name": "Robertbob@", "Email": "gmail.com", "Nickname": "Bob" }

       

      Essentially, spark.read() has some deserialization problem that only emerges at high data throughput (> 20 TB/hr).

      When we tried a smaller dataset (1/4 the size), we saw no signs of corruption.

      When we run the exact same code against just a single JSON file containing the record mentioned above, everything works fine.

      The spark.read() corruption is also not deterministic: if we re-run the 20 TB/hr test, we still see corruption, but in different records.
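
      One illustrative way to surface such shifted rows (using the made-up schema from the example above, so this is a sketch rather than our actual validation code):

      from pyspark.sql import functions as F

      # Rows whose Email value no longer looks like an email address are likely
      # records whose field boundaries were shifted during the read.
      suspectDF = originalDF.filter(~F.col("Email").rlike(r"^[^@\s]+@[^@\s]+\.[^@\s]+$"))
      print(suspectDF.count())          # number of suspect rows
      suspectDF.show(truncate=False)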

       

      Our Temporary Solution

      We noticed that "spark.sql.files.maxPartitionBytes" was set to its default of 128 MB. This meant that for an average JSON file we were reading (about 400 MB), Spark was making four calls to the Azure Data Lake, each fetching a byte range (i.e. the 1st call got bytes 0-128 MB, the 2nd call got bytes 128-256 MB, etc.).

      We increased "spark.sql.files.maxPartitionBytes" to a large value (1 GB) and that made the data corruption problem go away.
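
      For reference, the change amounts to a single configuration setting (it can also be set at the Spark pool or session level):

      # Raise the maximum partition size so each ~400 MB JSON file is read in a
      # single byte-range request instead of being split into 128 MB chunks.
      spark.conf.set("spark.sql.files.maxPartitionBytes", "1g")  # default is 128 MB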

       

      How We Think You Can Fix This

      From my understanding, when Spark makes a call for a byte range, the cut-off point will often fall in the middle of a JSON record. Our JSON files are in the JSON Lines format and contain thousands of lines, each holding one JSON record, so a byte range from 0 to 128 MB will almost certainly end mid-record.

      Spark seems to have logic that handles this by only processing the "full lines" each byte range receives, but that logic appears to fail a small percentage of the time. Specifically, we have about 50,000 JSON files, which means ~200,000 byte-range calls are being made, and spark.read() is producing about 150 corrupt records.
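
      To make that arithmetic explicit (using the approximate numbers quoted above):

      import math

      files = 50_000                    # approximate number of JSON files
      file_mb = 400                     # average file size in MB
      partition_mb = 128                # default spark.sql.files.maxPartitionBytes

      splits = files * math.ceil(file_mb / partition_mb)
      print(splits)                     # ~200,000 byte-range reads
      print(150 / splits)               # ~0.00075, i.e. roughly 1 in 1,300 splits affected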

      So we think you should look at the Spark code that does this "cut-off" handling for byte ranges and see whether something is missing there, or in the deserialization logic behind spark.read().
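
      For what it is worth, the convention we understand this handling to follow is sketched below in plain Python (illustrative only, not Spark's actual implementation): a split that does not start at byte 0 discards the partial line it lands in, and each split also reads the line that straddles its upper boundary, so every record should be produced by exactly one split. If that invariant is ever violated under heavy load, fragments of one record could end up attached to a neighbouring one, which is exactly what the shifted fields above look like.

      def read_split(data, start, end):
          """Return the newline-delimited records owned by the byte range [start, end)."""
          pos = start
          if start != 0:
              # Skip the (possibly partial) line this split lands in;
              # the previous split is responsible for it.
              nl = data.find(b"\n", pos)
              if nl == -1:
                  return []
              pos = nl + 1
          lines = []
          # Keep reading while the next line *starts* at or before `end`;
          # the final line read may extend past `end`, which is intentional.
          while pos <= end and pos < len(data):
              nl = data.find(b"\n", pos)
              if nl == -1:
                  lines.append(data[pos:])  # last line of the file, no trailing newline
                  break
              lines.append(data[pos:nl])
              pos = nl + 1
          return lines

      blob = (b'{ "Name": "Robert", "Email": "bob@gmail.com", "Nickname": "Bob" }\n'
              b'{ "Name": "Alice", "Email": "alice@gmail.com", "Nickname": "Al" }\n')
      cut = len(blob) // 2                  # a boundary in the middle of the data
      recovered = read_split(blob, 0, cut) + read_split(blob, cut, len(blob))
      assert recovered == blob.splitlines()  # every record recovered exactly once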

      Again, this bug only emerges at high data-transfer volumes (> 20 TB/hr), so it could be a race condition or some other kind of performance-related bug.

          People

            Assignee: Unassigned
            Reporter: Sai Allu (saiallu2020)
