Spark / SPARK-27281

Wrong latest offsets returned by DirectKafkaInputDStream#latestOffsets


Details

    • Type: Bug
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.4.0
    • Fix Version/s: None
    • Component/s: DStreams
    • Labels: None

    Description

      I have a very strange and hard-to-reproduce issue when using Kafka direct streaming, version 2.4.0.
      From time to time, anywhere from once a day to once a week, I get the following error:

      java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative
      at scala.Predef$.require(Predef.scala:224)
      at org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38)
      at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:250)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
      at scala.Option.orElse(Option.scala:289)
      at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
      at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
      at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
      at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
      at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
      at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
      at scala.util.Try$.apply(Try.scala:192)
      at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
      at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
      19/01/29 13:10:00 ERROR apps.BusinessRuleEngine: Job failed. Stopping JVM
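
      The requirement that fails is the non-negativity check in StreamInputInfo, reached from DirectKafkaInputDStream.compute (see the trace above). A simplified sketch of the relevant pieces, condensed from the classes named in the trace rather than the verbatim Spark sources:

      case class StreamInputInfo(inputStreamId: Int, numRecords: Long) {
        // The require() at InputInfoTracker.scala:38 in the trace above.
        require(numRecords >= 0, "numRecords must not be negative")
      }

      // DirectKafkaInputDStream.compute derives the record count from the
      // planned offset ranges, roughly (untilOffset - fromOffset) summed over
      // partitions. If latestOffsets() returns an offset behind currentOffsets,
      // that difference goes negative and the require() above throws.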


      I have 10+ Spark jobs consuming 100+ partitions in total, and this happens very rarely. Adding logging code to DirectKafkaInputDStream revealed that the latestOffsets method sometimes returns offsets that are lower than currentOffsets. Inspecting the Kafka broker logs showed nothing suspicious: no topic leader change, no retention-based deletion, etc.
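
      Roughly, the logging amounted to a guard of this shape (a sketch only, assuming it runs inside DirectKafkaInputDStream where currentOffsets and latestOffsets() are in scope; the message wording is illustrative):

      // Compare the freshly fetched "latest" offsets against the offsets we
      // have already consumed up to; a negative gap is the anomaly described above.
      val latest: Map[TopicPartition, Long] = latestOffsets()
      latest.foreach { case (tp, until) =>
        currentOffsets.get(tp).foreach { from =>
          if (until < from) {
            logWarning(s"latestOffsets went backwards for $tp: current=$from, latest=$until")
          }
        }
      }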

      I also changed the way the latest offsets are retrieved, using consumer#endOffsets instead. This change fixed the issue: the correct end offsets are returned, and the problem no longer reproduces.
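
      The idea of the change, sketched (not the exact patch; the helper name fetchLatestOffsets is mine, see the PR below for the real diff): instead of deriving the latest offsets from the consumer's position after a seekToEnd, as the 2.4.0 code does, ask the broker for the log-end offsets directly.

      import scala.collection.JavaConverters._
      import org.apache.kafka.clients.consumer.Consumer
      import org.apache.kafka.common.TopicPartition

      // KafkaConsumer#endOffsets queries the broker for the current log-end
      // offset of every requested partition in a single call.
      def fetchLatestOffsets(consumer: Consumer[_, _],
                             partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
        consumer.endOffsets(partitions.asJava).asScala
          .map { case (tp, offset) => tp -> offset.longValue() }
          .toMap
      }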

      The problem is that I have no idea how to reproduce this manually. The code change seems reasonable, so I created a corresponding PR.

      Please take a look at the PR: https://github.com/apache/spark/pull/24218


            People

              Assignee: Unassigned
              Reporter: Viacheslav Krot (vkrot)
              Votes: 0
              Watchers: 4
