Details
Type: Bug
Status: Open
Priority: P3
Resolution: Unresolved
2.10.0
None
None
Description
The runner attempts to combine the shard number computed for a window's on-time pane with the shard numbers computed for later panes of late events, even when the window's accumulation mode is set to DISCARDING_FIRED_PANES. Because the shard number is exposed as a singleton view, the second value makes SingletonCombineFn throw an exception.
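To illustrate why a second value is fatal, here is a hypothetical plain-Scala model of a singleton view's combine step. It was written for this report and is not Beam source code. It only mirrors the behaviour visible in the stack trace: the first value for a window is accepted, and any further value in the same window raises IllegalArgumentException.

```scala
// Hypothetical model of a singleton view's combine step (not Beam source code).
object SingletonModel {
  // Accepts the first value for a window and rejects any further value.
  def combine(acc: Option[Int], next: Int): Option[Int] = acc match {
    case None => Some(next)
    case Some(_) =>
      throw new IllegalArgumentException(
        "PCollection with more than one element accessed as a singleton view.")
  }

  // Folds every value that reached the view for a single window.
  def singleton(values: Seq[Int]): Option[Int] =
    values.foldLeft(Option.empty[Int])(combine)
}
```

In this model `SingletonModel.singleton(Seq(3))` returns `Some(3)`, while `SingletonModel.singleton(Seq(3, 1))`, an on-time shard number followed by a late one, throws.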
Steps to reproduce:
- Create a dynamic writer with the `withSharding()` option.
- Send a stream of messages to a Dataflow job via PubSub.
- Hold back some of the messages.
- Let the remaining messages flow to the job until the watermark passes the end of the window.
- Release the held-back messages.
If all PubSub traffic is halted and the messages are released only after the window's end, Beam does not try to merge the results. The problem occurs only when some, but not all, of the messages arrive as late events.
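The condition can be sketched with a simplified, hypothetical simulation of one window. It assumes a single late firing and treats a record as late when the watermark had already passed the window end on arrival; `PaneSimulation`, `Arrival` and `valuesReachingView` are illustrative names, not Beam API. With DISCARDING_FIRED_PANES each pane counts only its own records, and an empty pane emits nothing because the count is computed with `withoutDefaults()`.

```scala
// Simplified, hypothetical model of one fixed window [windowStart, windowEnd).
object PaneSimulation {
  // eventTime: the record's timestamp; watermarkOnArrival: the watermark when the record arrived.
  final case class Arrival(eventTime: Long, watermarkOnArrival: Long)

  // Returns the record count of every non-empty pane, i.e. one value per pane that would
  // reach the singleton view. More than one value means the view holds more than one element.
  def valuesReachingView(arrivals: Seq[Arrival], windowStart: Long, windowEnd: Long): Seq[Int] = {
    val inWindow = arrivals.filter(a => a.eventTime >= windowStart && a.eventTime < windowEnd)
    // Records that arrive before the watermark passes the window end land in the on-time pane.
    val (onTime, late) = inWindow.partition(_.watermarkOnArrival < windowEnd)
    // DISCARDING_FIRED_PANES: each pane counts only its own records; empty panes emit nothing.
    Seq(onTime.size, late.size).filter(_ > 0)
  }
}
```

For a window [0, 100), two records arriving at watermarks 50 and 60 plus a third arriving at watermark 120 yield `Seq(2, 1)`: two values for one singleton view. If all three arrive after the watermark passes 100, the result is `Seq(3)`, a single value, which matches the observation that fully delayed traffic does not trigger the error.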
Stack trace:
java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. Consider using Combine.globally().asSingleton() to combine the PCollection into a single value
    at org.apache.beam.sdk.transforms.View$SingletonCombineFn.apply(View.java:358)
    at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:448)
    at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:429)
    at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:925)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:115)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:608)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
    at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
    at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1233)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:144)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:972)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Sharding implementation:
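import org.apache.beam.sdk.transforms.{Combine, Contextful, Count, MapElements, PTransform, View}
import org.apache.beam.sdk.values.{PCollection, PCollectionView, TypeDescriptors}

// Derives the number of output shards for each window from the number of records in that window.
class RecordCountSharding[T](recordsPerShard: Int)
    extends PTransform[PCollection[T], PCollectionView[java.lang.Integer]] {
  import RecordCountSharding._

  override def expand(input: PCollection[T]): PCollectionView[java.lang.Integer] = {
    // Count the records per window; empty windows produce no output.
    val count = input.apply(
      Combine.globally(Count.combineFn[T]()).withoutDefaults()
    )
    // Convert each per-window count into a shard count.
    val shardsNum = count.apply(
      MapElements
        .into(TypeDescriptors.integers())
        .via(Contextful.fn[java.lang.Long, java.lang.Integer] { n: java.lang.Long =>
          java.lang.Integer.valueOf(getShardsNum(n, recordsPerShard))
        })
    )
    // Expose the shard count as a singleton side input; empty windows get the default value.
    shardsNum.apply(View.asSingleton[java.lang.Integer]().withDefaultValue(ShardsNumForEmptyWindows))
  }
}

object RecordCountSharding {
  val ShardsNumForEmptyWindows = 0

  def apply[T](recordsPerShard: Int): RecordCountSharding[T] = {
    if (recordsPerShard <= 0) {
      throw new IllegalArgumentException(s"recordsPerShard must be greater than 0! Got $recordsPerShard")
    }
    new RecordCountSharding[T](recordsPerShard)
  }

  // ceil(count / recordsPerShard)
  def getShardsNum(count: Long, recordsPerShard: Int): Int = {
    (count.toFloat / recordsPerShard.toFloat).ceil.toInt
  }
}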
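Separately from the reported exception, `getShardsNum` divides in `Float`, which cannot represent every `Long` above 2^24, so for very large counts the computed shard count can be off by one. The hypothetical `ShardMath.getShardsNumExact` below is a sketch of the same ceiling division done in `Long` arithmetic, assuming `count >= 0` and `recordsPerShard > 0` (the latter is already enforced by `RecordCountSharding.apply`). The Float variant is copied from the report for comparison.

```scala
// Hypothetical helper comparing the reported Float-based shard count with an exact integer version.
object ShardMath {
  // Same arithmetic as RecordCountSharding.getShardsNum in the report.
  def getShardsNumFloat(count: Long, recordsPerShard: Int): Int =
    (count.toFloat / recordsPerShard.toFloat).ceil.toInt

  // ceil(count / recordsPerShard) in Long arithmetic; assumes count >= 0 and recordsPerShard > 0.
  def getShardsNumExact(count: Long, recordsPerShard: Int): Int =
    ((count + recordsPerShard - 1) / recordsPerShard).toInt
}
```

For 16,777,217 records at 16,777,216 records per shard, the Float variant returns 1 because 16,777,217 rounds to 16,777,216 as a `Float`, while the exact variant returns 2.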