Details
Type: Improvement
Status: Open
Priority: P3
Resolution: Unresolved
Description
In a pipeline with unbounded input, if a user defines a custom trigger and does not set a non-zero shard count via withNumShards, they may see an IllegalArgumentException at runtime due to incompatible triggers.
For example, consider this compound trigger:
Window.into(new GlobalWindows())
    .triggering(Repeatedly.forever(AfterFirst.of(
        AfterPane.elementCountAtLeast(10000),
        AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(10)))))
    .discardingFiredPanes()
Using that windowing without specifying sharding yields:
Inputs to Flatten had incompatible triggers:Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1), AfterSynchronizedProcessingTime.pastFirstElementInPane()))
Without explicit sharding, WriteFiles creates both a sharded and an unsharded collection; the first goes through one GroupByKey while the other goes through two. The two collections are then flattened together, and their triggers are incompatible because the double-grouped collection uses a continuation trigger.
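To make the mismatch concrete, here is a minimal plain-Java sketch of the situation the error message shows: one Flatten input carries the user's trigger and the other carries its continuation form. ToyTrigger, ToyFlatten and ToyDemo are hypothetical names and are not Beam classes. The continuation rules are only read off the two triggers printed in the error above, and the compatibility check (identical printed form) is an assumption that may be stricter than what Beam actually does.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for a trigger tree. This is NOT Beam's Trigger class.
final class ToyTrigger {
    final String name;                  // leaf text or combinator name
    final List<ToyTrigger> subTriggers; // empty for leaves

    ToyTrigger(String name, ToyTrigger... subTriggers) {
        this.name = name;
        this.subTriggers = List.of(subTriggers);
    }

    // Assumed continuation rules, inferred from the error message in this issue.
    ToyTrigger continuation() {
        if (name.startsWith("AfterPane.elementCountAtLeast")) {
            return new ToyTrigger("AfterPane.elementCountAtLeast(1)");
        }
        if (name.startsWith("AfterProcessingTime")
                || name.startsWith("AfterSynchronizedProcessingTime")) {
            return new ToyTrigger(
                "AfterSynchronizedProcessingTime.pastFirstElementInPane()");
        }
        // Combinators keep their name and continue each sub-trigger.
        ToyTrigger[] continued = subTriggers.stream()
            .map(ToyTrigger::continuation)
            .toArray(ToyTrigger[]::new);
        return new ToyTrigger(name, continued);
    }

    @Override
    public String toString() {
        if (subTriggers.isEmpty()) {
            return name;
        }
        return name + "(" + subTriggers.stream()
            .map(ToyTrigger::toString)
            .collect(Collectors.joining(", ")) + ")";
    }
}

// Hypothetical stand-in for the check Flatten applies to its inputs.
final class ToyFlatten {
    // Simplification: triggers are compatible only if they print identically.
    static void checkCompatible(ToyTrigger a, ToyTrigger b) {
        if (!a.toString().equals(b.toString())) {
            throw new IllegalArgumentException(
                "Inputs to Flatten had incompatible triggers: " + a + ", " + b);
        }
    }
}

final class ToyDemo {
    // The compound trigger from the description, as a toy tree.
    static ToyTrigger userTrigger() {
        return new ToyTrigger("Repeatedly.forever",
            new ToyTrigger("AfterFirst.of",
                new ToyTrigger("AfterPane.elementCountAtLeast(10000)"),
                new ToyTrigger("AfterProcessingTime.pastFirstElementInPane()"
                    + ".plusDelayOf(10 minutes)")));
    }

    public static void main(String[] args) {
        ToyTrigger user = userTrigger();
        ToyTrigger continued = user.continuation();
        try {
            ToyFlatten.checkCompatible(user, continued);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the two inputs can never match once one of them has been rewritten into continuation form, which is why flattening them fails regardless of the element count or delay the user chose.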
If the user instead specifies numShards, then a different code path is followed that avoids this incompatibility.
It looks like WriteFiles may need to be implemented differently to avoid combining collections with potentially incompatible triggers.