Details
-
Bug
-
Status: In Progress
-
Major
-
Resolution: Unresolved
-
2.4.0
-
None
-
None
Description
I have a very strange and hard to reproduce issue when using kafka direct streaming, version 2.4.0
From time to time, maybe once a day - once a week I get following error
java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative at scala.Predef$.require(Predef.scala:224) at org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:250) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334) at scala.Option.orElse(Option.scala:289) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247) at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183) at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89) at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) 19/01/29 13:10:00 ERROR apps.BusinessRuleEngine: Job failed. Stopping JVM java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative at scala.Predef$.require(Predef.scala:224) at org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:250) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334) at scala.Option.orElse(Option.scala:289) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247) at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183) at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89) at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative at scala.Predef$.require(Predef.scala:224) at org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:250) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334) at scala.Option.orElse(Option.scala:289) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247) at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183) at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89) at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
I have 10+ spark jobs consuming 100+ partitions in total, and this happens really seldom. Adding some logging code to DirectKafkaInputDStream revealed that method latestOffsets returns offsets that are lower than currentOffsets. Inspecting kafka broker logs didn't show any suspicious events - no topic leader change, retention etc.
I also changed the way latest offsets are retrieved - used consumer#endOffsets. It turned out that this change fixed the issue, it returned correct end offsets, issue does not reproduce any more.
The problem is that I have no idea how to reproduce this manually. The code change seems reasonable, I created a corresponding PR.
Please take a look at PR - https://github.com/apache/spark/pull/24218
Attachments
Issue Links
- is related to
-
KAFKA-7703 KafkaConsumer.position may return a wrong offset after "seekToEnd" is called
- Resolved
- links to