Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-5909

QueryablePipeline fails to lookup UserState with Timers

Details

    • Bug
    • Status: Resolved
    • P3
    • Resolution: Fixed
    • None
    • 2.9.0
    • sdk-java-core

    Description

      QueryablePipeline fails when given a ParDo with both state and timers, similar to the example in https://beam.apache.org/blog/2017/08/28/timely-processing.html#event-time-timers:

      java.lang.IllegalArgumentException: expected one element but was: <org.apache.beam.sdk.values.PCollection.<init>:402#3d93cb799b3970be, expiry>
              at org.apache.beam.repackaged.beam_runners_core_construction_java.com.google.common.collect.Iterators.getOnlyElement(Iterators.java:322)
              at org.apache.beam.repackaged.beam_runners_core_construction_java.com.google.common.collect.Iterables.getOnlyElement(Iterables.java:294)
              at org.apache.beam.runners.core.construction.graph.QueryablePipeline.lambda$getUserStates$10(QueryablePipeline.java:362)
              at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
              at java.util.Iterator.forEachRemaining(Iterator.java:116)
              at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
              at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
              at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
              at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
              at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
              at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
              at org.apache.beam.runners.core.construction.graph.QueryablePipeline.getUserStates(QueryablePipeline.java:372)
              at org.apache.beam.runners.core.construction.graph.GreedyStageFuser.forGrpcPortRead(GreedyStageFuser.java:92)
              at org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.fuseSiblings(GreedyPipelineFuser.java:340)
              at org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.fusePipeline(GreedyPipelineFuser.java:142)
              at org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.<init>(GreedyPipelineFuser.java:75)
              at org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.fuse(GreedyPipelineFuser.java:88)
              at org.apache.beam.runners.flink.FlinkJobInvocation.runPipeline(FlinkJobInvocation.java:96)
              at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
              at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
              at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              at java.lang.Thread.run(Thread.java:745)
      

      Attachments

        Issue Links

          Activity

            People

              mxm Maximilian Michels
              mxm Maximilian Michels
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 50m
                  50m