Spark / SPARK-12002

offsetRanges attribute missing in Kafka RDD when resuming from checkpoint


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.6.0
    • Component/s: DStreams, PySpark
    • Labels: None

    Description

      SPARK-8389 added offsetRanges to Kafka direct streams, and SPARK-10122 fixed the issue of ending up with plain (non-Kafka) RDDs when chaining transformations on Kafka RDDs. That issue still appears to remain, however, when a streaming application using Kafka direct streams is initialized from the checkpoint directory. The following is a representative example: everything works as expected during the first run, but exceptions are thrown on a subsequent run when the context is initialized from the checkpoint directory.

      test_checkpoint.py
      from pyspark import SparkContext
      from pyspark.streaming import StreamingContext
      from pyspark.streaming.kafka import KafkaUtils


      def attach_kafka_metadata(kafka_rdd):
          offset_ranges = kafka_rdd.offsetRanges()

          return kafka_rdd


      def create_context():
          sc = SparkContext(appName='kafka-test')
          ssc = StreamingContext(sc, 10)
          ssc.checkpoint(CHECKPOINT_URI)

          kafka_stream = KafkaUtils.createDirectStream(
              ssc,
              [TOPIC],
              kafkaParams={
                  'metadata.broker.list': BROKERS,
              },
          )
          kafka_stream.transform(attach_kafka_metadata).count().pprint()

          return ssc


      if __name__ == "__main__":
          ssc = StreamingContext.getOrCreate(CHECKPOINT_URI, create_context)
          ssc.start()
          ssc.awaitTermination()
      
      Exception on resuming from checkpoint
      Traceback (most recent call last):
        File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call
          r = self.func(t, *rdds)
        File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 344, in <lambda>
        File "/home/spark/batch/test_checkpoint.py", line 12, in attach_kafka_metadata
          offset_ranges = kafka_rdd.offsetRanges()
      AttributeError: 'RDD' object has no attribute 'offsetRanges'
      
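      Until the underlying bug is fixed, a defensive variant of attach_kafka_metadata (a sketch, not the upstream fix) avoids the crash on resume. It only guards against the missing attribute; the offsets themselves are genuinely unavailable on the recovered plain RDD.

      ```python
      def attach_kafka_metadata(kafka_rdd):
          # On a fresh run the transform receives the KafkaRDD wrapper, but
          # after recovery from a checkpoint it may receive a plain RDD
          # without the offsetRanges attribute (this bug), so guard the call
          # instead of assuming the wrapper is present.
          if hasattr(kafka_rdd, 'offsetRanges'):
              offset_ranges = kafka_rdd.offsetRanges()
              # ... record offset_ranges as needed ...
          return kafka_rdd
      ```

      Note that this only papers over the AttributeError: offset metadata for batches recovered from the checkpoint is still lost.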


            People

              Assignee: Saisai Shao (jerryshao)
              Reporter: Amit Ramesh (aramesh)
              Votes: 0
              Watchers: 5
