Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-10329

Dataflow runner does not reserve timestamp of Create.Timestamped() in batch stateful dofn

Details

    Description

      When run a test pipeline such as

      public void testTimestampedValue() throws Exception {
            final String timerId = "foo";
      
            DoFn<KV<String, Long>, KV<Long, Instant>> statefn =
                new DoFn<KV<String, Long>, KV<Long, Instant>>() {
      
                  @TimerId(timerId)
                  private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
      
                  @ProcessElement
                  public void processElement(
                      @TimerId(timerId) Timer timer,
                      @Timestamp Instant timestamp,
                      OutputReceiver<KV<Long, Instant>> r) {
                    r.output(KV.of(3L, timestamp));
                  }
      
                  @OnTimer(timerId)
                  public void onTimer(@Timestamp Instant timestamp, OutputReceiver<KV<Long, Instant>> r) {
                    // do nothing. Since whether timer is involved doesn’t make difference
                  }
                };
      
            PCollection<KV<Long, Instant>> output =
                pipeline
                    .apply(Create.timestamped(TimestampedValue.of(KV.of("hello", 37L), new Instant(123L))))
                    .apply(ParDo.of(statefn));
      
            PAssert.that(output).containsInAnyOrder(KV.of(3L, new Instant(123L)));
            pipeline.run();
      }
      
      

      On dataflow with fnapi, the timestamp of output in PAssert won't match.

      Attachments

        Issue Links

          Activity

            People

              yichi Yichi Zhang
              yichi Yichi Zhang
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 1h
                  1h