[SPARK-10122] AttributeError: 'RDD' object has no attribute 'offsetRanges'


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.5.0
    • Component/s: DStreams, PySpark

    Description

      SPARK-8389 added the offsetRanges interface to Kafka direct streams. However, it appears to break when further operations are chained after a transform(). The example code below results in an error (stack trace follows). Note that if the count() operation is removed from the example, the error no longer occurs and the Kafka data is printed.

      kafka_test.py
      from pyspark import SparkContext
      from pyspark.streaming import StreamingContext
      from pyspark.streaming.kafka import KafkaUtils

      # TOPIC and BROKERS are placeholders for the Kafka topic name and broker list.

      def attach_kafka_metadata(kafka_rdd):
          # Fails with AttributeError when another operation is chained after
          # transform(): the RDD passed in is no longer a KafkaRDD.
          offset_ranges = kafka_rdd.offsetRanges()

          return kafka_rdd


      if __name__ == "__main__":
          sc = SparkContext(appName='kafka-test')
          ssc = StreamingContext(sc, 10)

          kafka_stream = KafkaUtils.createDirectStream(
              ssc,
              [TOPIC],
              kafkaParams={
                  'metadata.broker.list': BROKERS,
              },
          )
          # Removing count() from this chain makes the error go away and the
          # Kafka data is printed.
          kafka_stream.transform(attach_kafka_metadata).count().pprint()

          ssc.start()
          ssc.awaitTermination()
      
      Stack trace
      Traceback (most recent call last):
        File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call
          r = self.func(t, *rdds)
        File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
          self.func = lambda t, rdd: func(t, prev_func(t, rdd))
        File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
          self.func = lambda t, rdd: func(t, prev_func(t, rdd))
        File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
          self.func = lambda t, rdd: func(t, prev_func(t, rdd))
        File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
          self.func = lambda t, rdd: func(t, prev_func(t, rdd))
        File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 332, in <lambda>
          func = lambda t, rdd: oldfunc(rdd)
        File "/home/spark/ad_realtime/batch/kafka_test.py", line 7, in attach_kafka_metadata
          offset_ranges = kafka_rdd.offsetRanges()
      AttributeError: 'RDD' object has no attribute 'offsetRanges'
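
      For comparison, the pattern shown in the Spark Streaming + Kafka integration guide for reading offsets from Python is to capture them in a transform() applied directly to the direct stream, store them in a driver-side variable, and continue with foreachRDD() rather than chaining further DStream transformations. The sketch below illustrates that pattern; the topic, broker list, app name, and function names are placeholders and not part of this report, and on builds affected by this bug chaining additional transformations after the transform() can still raise the error above.

      offsets_sketch.py
      from pyspark import SparkContext
      from pyspark.streaming import StreamingContext
      from pyspark.streaming.kafka import KafkaUtils

      TOPIC = 'test'                  # placeholder topic name
      BROKERS = 'localhost:9092'      # placeholder broker list

      # Driver-side holder for the most recent batch's offset ranges.
      offset_ranges = []


      def store_offset_ranges(rdd):
          # The RDD is still a KafkaRDD here because this transform is the
          # first operation applied to the direct stream.
          global offset_ranges
          offset_ranges = rdd.offsetRanges()
          return rdd


      def print_offset_ranges(rdd):
          # Runs on the driver; prints the ranges captured for the same batch.
          for o in offset_ranges:
              print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))


      if __name__ == "__main__":
          sc = SparkContext(appName='kafka-offsets-sketch')
          ssc = StreamingContext(sc, 10)

          kafka_stream = KafkaUtils.createDirectStream(
              ssc,
              [TOPIC],
              kafkaParams={'metadata.broker.list': BROKERS},
          )

          kafka_stream \
              .transform(store_offset_ranges) \
              .foreachRDD(print_offset_ranges)

          ssc.start()
          ssc.awaitTermination()

      Keeping the offsetRanges() call in the first function applied to the direct stream matters because, as the stack trace above shows, once transformations are chained the user function receives a plain RDD rather than a KafkaRDD.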
      


    People

      Assignee: Saisai Shao (jerryshao)
      Reporter: Amit Ramesh (aramesh)
      Votes: 0
      Watchers: 4
