SPARK-30536

Sort-merge join operator spilling performance improvements


Details

    • Type: Improvement
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.1.0
    • Fix Version/s: None
    • Component/s: Spark Core, SQL
    • Labels: None

    Description

      Testing with the TPC-DS 100 TB benchmark data set showed that some of the SQL queries (for example, query 14) are not able to run even with extremely large Spark executor memory. The Spark spilling feature has to be enabled in order to process these queries, but processing becomes extremely slow once spilling is enabled. The spilling feature is controlled via two parameters: “spark.sql.sortMergeJoinExec.buffer.in.memory.threshold” and “spark.sql.sortMergeJoinExec.buffer.spill.threshold”.

      “spark.sql.sortMergeJoinExec.buffer.in.memory.threshold” – when this threshold is reached, the data is moved from the ExternalAppendOnlyUnsafeRowArray object into an UnsafeExternalSorter object.

      “spark.sql.sortMergeJoinExec.buffer.spill.threshold” – when this threshold is reached, the data is spilled from the UnsafeExternalSorter object to disk.
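
      For illustration, the two thresholds could be set when building the session, as in the sketch below; the values shown are arbitrary examples for this sketch, not tuned recommendations.

          import org.apache.spark.sql.SparkSession

          // Example threshold values only -- tune per workload. The two settings
          // control when buffered SortMergeJoin matches move into
          // UnsafeExternalSorter and when that sorter spills to disk.
          val spark = SparkSession.builder()
            .appName("smj-spill-thresholds")
            .config("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold", 100000L)
            .config("spark.sql.sortMergeJoinExec.buffer.spill.threshold", 1000000L)
            .getOrCreate()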


      During execution of a sort-merge join (Left Semi Join), the “right matches” for each left join row are found and stored in an ExternalAppendOnlyUnsafeRowArray object. In the case of query 14 there are millions of rows of “right matches”. To run this query, spilling is enabled: the data is moved from ExternalAppendOnlyUnsafeRowArray into UnsafeExternalSorter and then spilled to disk. When millions of rows are processed on the left side of the join, an iterator on top of the spilled “right matches” rows is created each time, i.e. millions of times. The current Spark implementation performs I/O whenever such an iterator is created, which results in millions of I/O operations when millions of left rows are processed.
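
      Schematically, the access pattern looks like the sketch below (a simplified stand-in, not the actual SortMergeJoinExec code; rightMatches is assumed to build a fresh iterator over the buffered, possibly spilled, right-side rows).

          // Simplified sketch of the pattern described above, not actual Spark code.
          // Each rightMatches() call builds a fresh iterator over the buffered
          // right-side rows; once the buffer has spilled, every call re-opens the
          // spill data and performs disk I/O -- once per left row.
          def leftSemiJoin[L, R](leftRows: Iterator[L],
                                 rightMatches: () => Iterator[R],
                                 joinCond: (L, R) => Boolean): Iterator[L] =
            leftRows.filter { left =>
              rightMatches().exists(right => joinCond(left, right))
            }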


      To avoid this performance bottleneck, this JIRA introduces the following solution (illustrative sketches of each point follow the list):

      1. Implement lazy initialization of UnsafeSorterSpillReader, the iterator on top of the spilled rows:
          … During SortMergeJoin (Left Semi Join) execution, the iterator over the spilled data is often created without any iteration over the data being done.
          … Lazy initialization of UnsafeSorterSpillReader enables efficient processing of SortMergeJoin even when data is spilled to disk, because the unnecessary I/O is avoided.

      2. Decrease the initial memory read buffer size in UnsafeSorterSpillReader from 1 MB to 1 KB:
          … The UnsafeSorterSpillReader constructor takes a lot of time because of the default 1 MB memory read buffer.
          … The code already has logic to grow the read buffer when a record does not fit, so decreasing the initial size to 1 KB is safe and has a positive performance impact.

      3. Improve memory utilization when spilling is enabled in ExternalAppendOnlyUnsafeRowArray:

          … In the current implementation, when spilling is enabled, an UnsafeExternalSorter object is created, the data is moved from the ExternalAppendOnlyUnsafeRowArray object into the UnsafeExternalSorter, and then the ExternalAppendOnlyUnsafeRowArray object is emptied. Just before the ExternalAppendOnlyUnsafeRowArray object is emptied, both objects hold the same data in memory. This requires double the memory and duplicates the data, which can be avoided.

          … In the proposed solution, when spark.sql.sortMergeJoinExec.buffer.in.memory.threshold is reached, adding new rows into the ExternalAppendOnlyUnsafeRowArray object stops. An UnsafeExternalSorter object is created and new rows are added into that object instead, while the ExternalAppendOnlyUnsafeRowArray object retains all rows already added to it. This approach enables better memory utilization and avoids unnecessary movement of data from one object into another.
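
      A minimal sketch of the lazy-initialization idea from point 1, assuming a simple spill file of length-prefixed records (not the actual UnsafeSorterSpillReader code):

          import java.io.{BufferedInputStream, DataInputStream, EOFException, FileInputStream}

          // The spill file is opened on first use (lazy val), so an iterator
          // that is created but never consumed causes no disk I/O at all.
          final class LazySpillReader(path: String) extends Iterator[Array[Byte]] {
            private lazy val in = new DataInputStream(
              new BufferedInputStream(new FileInputStream(path)))
            private var pendingLen = -2 // -2 = length not read yet, -1 = exhausted

            // Reads the next record length; this is the first point where I/O occurs.
            private def peekLen(): Unit = if (pendingLen == -2) {
              pendingLen = try in.readInt() catch { case _: EOFException => in.close(); -1 }
            }
            override def hasNext: Boolean = { peekLen(); pendingLen >= 0 }
            override def next(): Array[Byte] = {
              peekLen()
              val record = new Array[Byte](pendingLen)
              in.readFully(record)
              pendingLen = -2
              record
            }
          }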
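
      For point 2, the grow-on-demand logic that makes a small initial buffer safe could look roughly like this (a generic sketch, not the Spark implementation):

          // Start at 1 KB and double the buffer only when a record is larger,
          // so the common case avoids allocating 1 MB up front in the constructor.
          final class GrowableReadBuffer(initialSize: Int = 1024) {
            private var buf = new Array[Byte](initialSize)
            def bufferFor(recordLength: Int): Array[Byte] = {
              if (recordLength > buf.length) {
                var newSize = buf.length
                while (newSize < recordLength) newSize *= 2
                buf = new Array[Byte](newSize)
              }
              buf
            }
          }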
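
      And for point 3, the proposed no-copy handover could be sketched as below (generic element type; the second ArrayBuffer stands in for UnsafeExternalSorter):

          import scala.collection.mutable.ArrayBuffer

          // Once the in-memory threshold is reached, the in-memory rows are
          // frozen in place and new rows go straight to the spillable store;
          // nothing is copied between the two, so memory is never doubled.
          final class HybridRowBuffer[T](inMemoryThreshold: Int) {
            private val inMemory = new ArrayBuffer[T]
            private val spillable = new ArrayBuffer[T] // stand-in for UnsafeExternalSorter

            def add(row: T): Unit =
              if (inMemory.length < inMemoryThreshold) inMemory += row
              else spillable += row

            // Read back the frozen in-memory prefix first, then the spilled suffix.
            def iterator: Iterator[T] = inMemory.iterator ++ spillable.iterator
          }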


      Testing this solution with query 14 and spilling to disk enabled showed a 500x performance improvement, and it did not degrade the performance of the other SQL queries from the TPC-DS benchmark.

      Attachments

        1. spark-30536-explained.pdf (558 kB, shanyu zhao)

            People

              Assignee: Unassigned
              Reporter: Sinisa Knezevic (siknezevic)
              Votes: 1
              Watchers: 7
