Spark / SPARK-29852

Implement parallel preemptive RDD.toLocalIterator and Dataset.toLocalIterator


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.4.4, 3.0.0
    • Fix Version/s: None
    • Component/s: Spark Core, SQL
    • Labels: None
    • Flags: Patch

    Description

      Both the RDD and Dataset APIs have two methods of collecting data from executors to the driver:

      1. .collect() launches a single job that fetches all partitions in parallel and dumps the data from the executors into the driver's memory. This is great if the data needs to be accessible on the driver as soon as possible, but it is less efficient when partitions can only be accessed sequentially, and outright risky if the driver does not have enough memory to hold all of the data.
      • the solution for issue SPARK-25224 partially alleviates this by delaying deserialisation of data in InternalRow format, so that only the much smaller serialised form has to be held entirely in driver memory. It still does not bound memory consumption to O(1), and therefore does not scale to arbitrarily large datasets.
      2. .toLocalIterator() fetches one partition per job, and fetching of the next partition does not start until sequential access to the previous partition has concluded. This keeps driver memory consumption at O(1) and is great when access to the data is sequential and significantly slower than the speed at which partitions can be shipped from a single executor with one thread. It becomes inefficient when the sequential access has to wait a relatively long time for the next partition to be shipped. (A minimal sketch of both existing paths follows this list.)
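
      For reference, a minimal sketch of the two existing paths (not from this issue; the master URL, data size and partition count are illustrative assumptions only):

      import org.apache.spark.sql.SparkSession

      object ExistingCollectionPaths {
        def main(args: Array[String]): Unit = {
          val sc = SparkSession.builder().master("local[*]").getOrCreate().sparkContext
          val rdd = sc.parallelize(1 to 1000000, 100)

          // .collect(): a single job fetches every partition in parallel and the driver
          // must hold the whole result in memory at once.
          val all: Array[Int] = rdd.collect()
          println(all.length)

          // .toLocalIterator: one job per partition; partition i + 1 is only fetched after
          // the iterator has finished consuming partition i, so at most one partition sits
          // on the driver, but the consumer waits for each fetch.
          rdd.toLocalIterator.foreach(_ => ())
        }
      }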

      The proposed solution is a crossover between the two existing implementations: a concurrent subroutine that is bounded in both CPU and memory. It allocates a fixed-size resource pool (by default equal to the number of available CPU cores) that ships partitions concurrently, and blocks sequential access to a partition's data until its shipping has finished (for partition IDs >= 2 this usually completes without blocking, because shipping starts much earlier and preemptively). Tenants of the resource pool can be GC'ed and evicted once sequential access to their data has finished, which allows further partitions to be fetched well before they are accessed. The maximum memory consumption is O(m * n), where m is the predefined concurrency and n is the size of the largest partition.
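
      As a rough illustration of that bound (the numbers are assumptions, not from this issue): with 64 partitions of roughly 512 MB each and a pool of m = 4, the preemptive iterator needs at most about 4 * 512 MB = 2 GB of driver memory, whereas .collect() would need the full 64 * 512 MB = 32 GB at once, and .toLocalIterator() would hold only one 512 MB partition but leave the consumer waiting for each fetch.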

      The following Scala code snippet demonstrates a simple implementation (requires Scala 2.11+ and ScalaTest):

      package org.apache.spark.spike
      
      import java.util.concurrent.ArrayBlockingQueue
      
      import org.apache.spark.rdd.RDD
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.{FutureAction, SparkContext}
      import org.scalatest.FunSpec
      
      import scala.concurrent.Future
      import scala.language.implicitConversions
      import scala.reflect.ClassTag
      import scala.util.{Failure, Success, Try}
      
      class ToLocalIteratorPreemptivelySpike extends FunSpec {
      
        import ToLocalIteratorPreemptivelySpike._
      
        lazy val sc: SparkContext = SparkSession.builder().master("local[*]").getOrCreate().sparkContext
      
        it("can be much faster than toLocalIterator") {
      
          val max = 80
          val delay = 100
      
          val slowRDD = sc.parallelize(1 to max, 8).map { v =>
            Thread.sleep(delay)
            v
          }
      
          val (r1, t1) = timed {
            slowRDD.toLocalIterator.toList
          }
      
          val capacity = 4
          val (r2, t2) = timed {
            slowRDD.toLocalIteratorPreemptively(capacity).toList
          }
      
          assert(r1 == r2)
          println(s"linear: $t1, preemptive: $t2")
          assert(t1 > t2 * 2)
          assert(t2 > max * delay / capacity)
        }
      }
      
      object ToLocalIteratorPreemptivelySpike {
      
        case class PartitionExecution[T: ClassTag](
            @transient self: RDD[T],
            id: Int
        ) {
      
          // Submit the Spark job for this partition right away, without blocking on its result.
          def eager: this.type = {
            AsArray.future
            this
          }
      
          case object AsArray {
      
            @transient lazy val future: FutureAction[Array[T]] = {
              var result: Array[T] = null
      
              val future = self.context.submitJob[T, Array[T], Array[T]](
                self,
                _.toArray,
                Seq(id), { (_, data) =>
                  result = data
                },
                result
              )
      
              future
            }
      
            @transient lazy val now: Array[T] = future.get()
          }
        }
      
        implicit class RDDFunctions[T: ClassTag](self: RDD[T]) {
      
          import scala.concurrent.ExecutionContext.Implicits.global
      
          // Returns an iterator of whole partitions; at most `capacity` fetched partitions
          // are buffered on the driver at any given time.
          def _toLocalIteratorPreemptively(capacity: Int): Iterator[Array[T]] = {
            val executions = self.partitions.indices.map { ii =>
              PartitionExecution(self, ii)
            }
      
            val buffer = new ArrayBlockingQueue[Try[PartitionExecution[T]]](capacity)
      
            Future {
              executions.foreach { exe =>
                buffer.put(Success(exe)) // may be blocking due to capacity
                exe.eager // non-blocking
              }
            }.onFailure {
              case e: Throwable =>
                buffer.put(Failure(e))
            }
      
            // Consume partitions strictly in submission order; blocks only when the next
            // partition has not finished shipping yet.
            self.partitions.indices.toIterator.map { _ =>
              val exe = buffer.take().get
              exe.AsArray.now
            }
          }
      
          def toLocalIteratorPreemptively(capacity: Int): Iterator[T] = {
      
            _toLocalIteratorPreemptively(capacity).flatten
          }
        }
      
        def timed[T](fn: => T): (T, Long) = {
          val startTime = System.currentTimeMillis()
          val result = fn
          val endTime = System.currentTimeMillis()
          (result, endTime - startTime)
        }
      }
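
      For completeness, a hedged sketch of how the extension method above could be used outside of the test (the object name and data below are illustrative assumptions):

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.spike.ToLocalIteratorPreemptivelySpike._

      object PreemptiveIteratorUsage {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().master("local[*]").getOrCreate()
          val rdd = spark.sparkContext.parallelize(1 to 10000, 16)

          // At most 4 fetched partitions are buffered on the driver while the consumer
          // walks the data strictly in partition order.
          rdd.toLocalIteratorPreemptively(4).foreach(println)

          spark.stop()
        }
      }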
      

    People

      Assignee: Unassigned
      Reporter: Peng Cheng (peng)
