Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-11971

Direct Runner State is null while active timers exist

Details

    • Bug
    • Status: Open
    • P3
    • Resolution: Unresolved
    • None
    • None
    • runner-direct
    • None

    Description

      State is set to null while active timer is present, this issue does not show in other runners.

      The following example will reach the IllegalStateException within 10-20 times of it being run. LOOP_COUNT does not seem to be a factor as it reproduces with 100 or 100000 LOOP_COUNT. The number of keys is a factor as it did not reproduce with only one key, have not tried with more than 3 keys to see if it's easier to reproduce. 
       

      package test;
      
      import org.apache.beam.sdk.Pipeline;
      import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
      import org.apache.beam.sdk.coders.KvCoder;
      import org.apache.beam.sdk.state.StateSpec;
      import org.apache.beam.sdk.state.StateSpecs;
      import org.apache.beam.sdk.state.TimeDomain;
      import org.apache.beam.sdk.state.Timer;
      import org.apache.beam.sdk.state.TimerSpec;
      import org.apache.beam.sdk.state.TimerSpecs;
      import org.apache.beam.sdk.state.ValueState;
      import org.apache.beam.sdk.testing.TestStream;
      import org.apache.beam.sdk.transforms.DoFn;
      import org.apache.beam.sdk.transforms.PTransform;
      import org.apache.beam.sdk.transforms.ParDo;
      import org.apache.beam.sdk.transforms.WithKeys;
      import org.apache.beam.sdk.values.KV;
      import org.apache.beam.sdk.values.PCollection;
      import org.joda.time.Duration;
      import org.joda.time.Instant;
      
      import java.util.Optional;
       
      
      public class Test {
      
         public static void main (String [] args) throws Exception{
             Test.testToFailure();
      
         }
      
         public static void testToFailure() throws Exception {
             int count = 0;
      
             while (true) {
                 failingTest();
                 System.out.println(
                         String.format("Got to Count %s", String.valueOf(count++)));
             }
         }
      
         public static void failingTest() throws Exception {
             Pipeline p = Pipeline.create();
      
             Instant now = Instant.now();
             TestStream<Integer> stream =
                     TestStream.create(BigEndianIntegerCoder.of())
                             .addElements(1)
                             .advanceWatermarkTo(now.plus(Duration.standardSeconds(1)))
                             .addElements(2)
                             .advanceWatermarkTo(now.plus(Duration.standardSeconds(1)))
                             .addElements(3)
                             .advanceWatermarkToInfinity();
      
             p.apply(stream)
                     .apply(WithKeys.of(x -> x))
                     .setCoder(KvCoder.of(BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of()))
                     .apply(new TestToFail());
             p.run();
         }
      
         public static class TestToFail
                 extends PTransform<PCollection<KV<Integer, Integer>>, PCollection<Integer>> {
      
             @Override
             public PCollection<Integer> expand(PCollection<KV<Integer, Integer>> input) {
                 return input.apply(ParDo.of(new LoopingRead()));
             }
         }
      
         public static class LoopingRead extends DoFn<KV<Integer, Integer>, Integer> {
      
             static int LOOP_COUNT = 100;
      
             @StateId("value")
             private final StateSpec<ValueState<Integer>> value =
                     StateSpecs.value(BigEndianIntegerCoder.of());
      
             @StateId("count")
             private final StateSpec<ValueState<Integer>> count =
                     StateSpecs.value(BigEndianIntegerCoder.of());
      
             @TimerId("actionTimers")
             private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
      
             @ProcessElement
             public void processElement(
                     ProcessContext c,
                     @StateId("value") ValueState<Integer> value,
                     @TimerId("actionTimers") Timer timers) {
      
                 value.write(c.element().getValue());
                 timers.set(c.timestamp().plus(Duration.millis(1000)));
             }
      
             /** */
             @OnTimer("actionTimers")
             public void onTimer(
                     OnTimerContext c,
                     @StateId("value") ValueState<Integer> value,
                     @StateId("count") ValueState<Integer> count,
                     @TimerId("actionTimers") Timer timers) {
      
                 if (value.read() == null) {
                     throw new IllegalStateException("BINGO!");
                 }
                 Integer counter = Optional.ofNullable(count.read()).orElse(0) + 1;
                 count.write(counter);
                 value.write(value.read() + counter);
      
                 if (counter < LOOP_COUNT) {
                     timers.set(c.timestamp().plus(Duration.standardSeconds(1)));
                 }
             }
         }
      }
      

      Attachments

        Activity

          People

            Unassigned Unassigned
            rarokni@gmail.com Reza ardeshir rokni
            Votes:
            0 Vote for this issue
            Watchers:
            6 Start watching this issue

            Dates

              Created:
              Updated:

              Time Tracking

                Estimated:
                Original Estimate - Not Specified
                Not Specified
                Remaining:
                Remaining Estimate - 0h
                0h
                Logged:
                Time Spent - 8h 50m
                8h 50m