Details
-
Bug
-
Status: Open
-
P3
-
Resolution: Unresolved
-
None
-
None
-
None
Description
State is set to null while an active timer is present; this issue does not show up in other runners.
The following example reaches the IllegalStateException within 10-20 runs. LOOP_COUNT does not seem to be a factor, as the failure reproduces with LOOP_COUNT set to either 100 or 100000. The number of keys is a factor: the failure did not reproduce with only one key. I have not tried more than 3 keys to see whether that makes it easier to reproduce.
package test; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.state.Timer; import org.apache.beam.sdk.state.TimerSpec; import org.apache.beam.sdk.state.TimerSpecs; import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; import org.joda.time.Instant; import java.util.Optional; public class Test { public static void main (String [] args) throws Exception{ Test.testToFailure(); } public static void testToFailure() throws Exception { int count = 0; while (true) { failingTest(); System.out.println( String.format("Got to Count %s", String.valueOf(count++))); } } public static void failingTest() throws Exception { Pipeline p = Pipeline.create(); Instant now = Instant.now(); TestStream<Integer> stream = TestStream.create(BigEndianIntegerCoder.of()) .addElements(1) .advanceWatermarkTo(now.plus(Duration.standardSeconds(1))) .addElements(2) .advanceWatermarkTo(now.plus(Duration.standardSeconds(1))) .addElements(3) .advanceWatermarkToInfinity(); p.apply(stream) .apply(WithKeys.of(x -> x)) .setCoder(KvCoder.of(BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of())) .apply(new TestToFail()); p.run(); } public static class TestToFail extends PTransform<PCollection<KV<Integer, Integer>>, PCollection<Integer>> { @Override public PCollection<Integer> expand(PCollection<KV<Integer, Integer>> input) { return input.apply(ParDo.of(new LoopingRead())); } } public static class LoopingRead extends 
DoFn<KV<Integer, Integer>, Integer> { static int LOOP_COUNT = 100; @StateId("value") private final StateSpec<ValueState<Integer>> value = StateSpecs.value(BigEndianIntegerCoder.of()); @StateId("count") private final StateSpec<ValueState<Integer>> count = StateSpecs.value(BigEndianIntegerCoder.of()); @TimerId("actionTimers") private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); @ProcessElement public void processElement( ProcessContext c, @StateId("value") ValueState<Integer> value, @TimerId("actionTimers") Timer timers) { value.write(c.element().getValue()); timers.set(c.timestamp().plus(Duration.millis(1000))); } /** */ @OnTimer("actionTimers") public void onTimer( OnTimerContext c, @StateId("value") ValueState<Integer> value, @StateId("count") ValueState<Integer> count, @TimerId("actionTimers") Timer timers) { if (value.read() == null) { throw new IllegalStateException("BINGO!"); } Integer counter = Optional.ofNullable(count.read()).orElse(0) + 1; count.write(counter); value.write(value.read() + counter); if (counter < LOOP_COUNT) { timers.set(c.timestamp().plus(Duration.standardSeconds(1))); } } } }