Spark / SPARK-44083

Spark Streaming: Add a max pending microbatches conf to skip scheduling new microbatches


Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Won't Do
    • Affects Version/s: 3.4.0
    • Fix Version/s: None
    • Component/s: DStreams
    • Labels: None

    Description

      With uneven incoming rates and high scheduling delays, Spark Streaming keeps adding microbatches to the event loop and submitting their jobs to the job executor thread pool, even though earlier batches have not finished yet. As a result, when the Kafka lag is below the configured per-partition maximum, each pending microbatch in Spark Streaming Kafka covers only a small offset range.

      We rely on a third-party service to add metadata to incoming records, and its response time remains constant regardless of microbatch size, so many small microbatches increase latency even further. For various reasons, we fetch each RDD's metadata during the transform phase, which runs when the microbatch is scheduled. At a high level, our RDD transform looks like this:

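      val dstreams = ... // input Kafka DStream (details elided)
      dstreams.transform { rdd =>
        // Collect the distinct lookup keys of this batch on the driver
        val uniqueItems = rdd.map(...).distinct().collect()
        // One call to the third-party service per batch; its latency does not depend on batch size
        val metadata = getMetadata(uniqueItems)
        // Attach the fetched metadata to every record
        val rddWithMetadata = rdd.map(...)
        rddWithMetadata
      }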
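To make the latency argument concrete, here is a toy cost model in plain Scala. It assumes each batch makes exactly one call to the metadata service with a fixed latency (as described above), adds a small per-record processing cost, and assumes batches run one after another. `BatchOverheadSketch`, `CostModel`, and the numbers in `main` are hypothetical and chosen only for illustration.

```scala
object BatchOverheadSketch {
  /** Hypothetical cost model: every batch pays one fixed-latency call to the
    * metadata service, plus a small per-record processing cost. */
  final case class CostModel(serviceLatencyMs: Long, perRecordMicros: Long)

  /** Total time to process `records` records split into `batches` batches
    * that run one after another (assumption: no concurrent batch jobs). */
  def totalTimeMs(model: CostModel, records: Long, batches: Int): Long =
    batches * model.serviceLatencyMs + records * model.perRecordMicros / 1000

  def main(args: Array[String]): Unit = {
    // Illustrative numbers only, not measurements
    val model = CostModel(serviceLatencyMs = 2000, perRecordMicros = 100)
    for (batches <- Seq(1, 5, 10))
      println(s"$batches batch(es): ${totalTimeMs(model, 10000L, batches)} ms")
  }
}
```

With these illustrative numbers, the same 10,000 records take 3,000 ms as one batch but 21,000 ms as ten batches, because the fixed service latency is paid once per batch.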
      

       

      Scheduling many small microbatches can be avoided by skipping job generation for a new batch time whenever enough batches are already pending in the queue.

      Proposed changes to JobGenerator.scala, at a high level:

      // Maximum number of batches allowed to be pending; -1 (the default) disables the limit
      private val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)

      private def processEvent(event: JobGeneratorEvent): Unit = {
        logDebug("Got event " + event)
        event match {
          case GenerateJobs(time) =>
            val pendingCount = jobScheduler.getPendingTimes().size
            if (maxPendingJobs == -1 || pendingCount < maxPendingJobs) {
              generateJobs(time)
            } else {
              // TODO: include the pending batch times in the log message
              logWarning(s"Skipping job generation at $time: $pendingCount batches already pending")
            }

          // ... other existing cases unchanged
          case ...
        }
      }
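The effect of the proposed check can be explored with a toy simulation in plain Scala, with no Spark dependency. It is a sketch under simplifying assumptions, not Spark code: one batch interval per tick, a constant arrival rate, a single job thread that needs a fixed number of ticks per batch regardless of its size (mirroring the constant service latency), and no per-partition rate cap. Records from skipped intervals are left for the next generated batch. `PendingBatchGateSketch`, `shouldGenerate`, and `simulate` are hypothetical names; `shouldGenerate` mirrors the condition in the proposed `GenerateJobs` case.

```scala
import scala.collection.mutable

object PendingBatchGateSketch {
  /** Same condition as the proposed GenerateJobs case: -1 disables the limit. */
  def shouldGenerate(pendingBatches: Int, maxPendingBatches: Int): Boolean =
    maxPendingBatches == -1 || pendingBatches < maxPendingBatches

  /** Toy model (assumption, not Spark internals). Returns the record count of
    * every batch that was generated, in order. */
  def simulate(ticks: Int, arrivalsPerTick: Int, ticksPerBatch: Int, maxPending: Int): Vector[Int] = {
    val pending = mutable.Queue.empty[Int] // batches queued or running
    var unconsumed = 0                     // records not yet assigned to any batch
    var progress = 0                       // ticks spent on the oldest pending batch
    var generated = Vector.empty[Int]
    for (_ <- 1 to ticks) {
      // The single job thread works on the oldest pending batch
      if (pending.nonEmpty) {
        progress += 1
        if (progress == ticksPerBatch) { pending.dequeue(); progress = 0 }
      }
      unconsumed += arrivalsPerTick
      // GenerateJobs(time): either create a batch from all unconsumed records or skip
      if (shouldGenerate(pending.size, maxPending)) {
        pending.enqueue(unconsumed)
        generated :+= unconsumed
        unconsumed = 0
      }
    }
    generated
  }
}
```

For example, with 100 records per tick and 3 ticks per batch over 6 ticks, `simulate(6, 100, 3, -1)` generates six batches of 100 records, while `simulate(6, 100, 3, 1)` generates only two batches (100 and 300 records) and leaves 200 records for the next batch. Fewer, larger batches mean fewer fixed-latency service calls.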


      People

            Assignee: Unassigned
            Reporter: Anil Dasari (dasarianil)