Beam / BEAM-4280

DirectStreamObserver for outbound channel can block indefinitely if invoked from the inbound channel thread, causing a deadlock

Details

    Description

      gRPC docs say that: 
              // Note: the onReadyHandler's invocation is serialized on the same thread pool as the incoming StreamObserver's
              // onNext(), onError(), and onComplete() handlers. Blocking the onReadyHandler will prevent additional messages
              // from being processed by the incoming StreamObserver. The onReadyHandler must return in a timely manner or else
              // message processing throughput will suffer.

      Looking at the stack, the deadlock occurs because one of the gRPC threads is blocked in DirectStreamObserver.onNext() waiting for the outbound channel to become ready, which prevents that same thread from running the handler that would mark it as ready:
      "grpc-default-executor-0" #12 daemon prio=5 os_prio=0 tid=0x00007fcea88ee800 nid=0x3cc8a waiting on condition [0x00007fce4b9f8000]
         java.lang.Thread.State: WAITING (parking)
              at (C/C++) 0x00007fcead7519f2 (Unknown Source)
              at (C/C++) 0x00007fceac8b8f11 (Unknown Source)
              at sun.misc.Unsafe.park(Native Method)
              - parking to wait for  <0x0000000740e39c48> (a java.util.concurrent.Phaser$QNode)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
              at java.util.concurrent.Phaser$QNode.block(Phaser.java:1140)
              at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
              at java.util.concurrent.Phaser.internalAwaitAdvance(Phaser.java:1067)
              at java.util.concurrent.Phaser.awaitAdvance(Phaser.java:730)
              at org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:51)
              at org.apache.beam.fn.harness.data.BeamFnDataBufferingOutboundObserver.accept(BeamFnDataBufferingOutboundObserver.java:117)
              at org.apache.beam.fn.harness.data.BeamFnDataBufferingOutboundObserver.accept(BeamFnDataBufferingOutboundObserver.java:53)
              at org.apache.beam.fn.harness.BeamFnDataWriteRunner.consume(BeamFnDataWriteRunner.java:161)
              at org.apache.beam.fn.harness.BeamFnDataWriteRunner$Factory$$Lambda$41/127245540.accept(Unknown Source)
              at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:461)
              at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1200(FnApiDoFnRunner.java:113)
              at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:612)
              at com.google.cloud.dataflow.integration.synthetic.SyntheticStep.processElement(SyntheticStep.java:93)
              at com.google.cloud.dataflow.integration.synthetic.SyntheticStep$DoFnInvoker.invokeProcessElement(Unknown Source)
              at org.apache.beam.fn.harness.FnApiDoFnRunner.processElement(FnApiDoFnRunner.java:408)
              at org.apache.beam.fn.harness.FnApiDoFnRunner$$Lambda$51/1126257907.accept(Unknown Source)
              at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:461)
              at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1200(FnApiDoFnRunner.java:113)
              at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:622)
              at org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.JavaReadViaImpulse$ReadFromBoundedSourceFn.readSoruce(JavaReadViaImpulse.java:139)
              at org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.JavaReadViaImpulse$ReadFromBoundedSourceFn$DoFnInvoker.invokeProcessElement(Unknown Source)
              at org.apache.beam.fn.harness.FnApiDoFnRunner.processElement(FnApiDoFnRunner.java:408)
              at org.apache.beam.fn.harness.FnApiDoFnRunner$$Lambda$51/1126257907.accept(Unknown Source)
              at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:461)
              at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1200(FnApiDoFnRunner.java:113)
              at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:612)
              at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:129)
              at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
              at org.apache.beam.fn.harness.FnApiDoFnRunner.processElement(FnApiDoFnRunner.java:408)
              at org.apache.beam.fn.harness.FnApiDoFnRunner$$Lambda$51/1126257907.accept(Unknown Source)
              at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:461)
              at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1200(FnApiDoFnRunner.java:113)
              at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:612)
              at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement(ReshuffleOverrideFactory.java:86)
              at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement(Unknown Source)
              at org.apache.beam.fn.harness.FnApiDoFnRunner.processElement(FnApiDoFnRunner.java:408)
              at org.apache.beam.fn.harness.FnApiDoFnRunner$$Lambda$51/1126257907.accept(Unknown Source)
              at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:80)
              at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32)
              at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:135)
              at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:123)
              at org.apache.beam.sdk.fn.stream.ForwardingClientResponseObserver.onNext(ForwardingClientResponseObserver.java:51)
              at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:379)
              at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessageRead.runInContext(ClientCallImpl.java:491)
              at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:52)
              at io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:152)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              at java.lang.Thread.run(Thread.java:748)
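
      The pattern in the stack above can be reproduced without gRPC. What follows is a minimal sketch, not the Beam or gRPC code: the class and method names are hypothetical, a plain fixed-size thread pool stands in for the gRPC serializing executor, and a timed wait replaces the unbounded Phaser.awaitAdvance() so that the demonstration terminates. A task running on the pool queues the "ready" signal on that same pool and then waits for it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; none of these names exist in Beam or gRPC.
final class SerializedReadyDeadlockSketch {

  /**
   * Runs a stand-in "inbound onNext" task that waits for a ready signal which
   * is queued on the same executor. Returns true if the wait timed out, which
   * is where the real code would block forever.
   */
  static boolean waitTimesOut(int threads, long timeoutMillis) {
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    Phaser ready = new Phaser(1); // one party: the "onReadyHandler" stand-in
    try {
      return executor.submit(() -> {
        int phase = ready.getPhase();
        // Stand-in for the onReadyHandler: queued on the same executor.
        executor.execute(ready::arrive);
        try {
          // Stand-in for the blocking wait in the outbound onNext().
          ready.awaitAdvanceInterruptibly(phase, timeoutMillis, TimeUnit.MILLISECONDS);
          return false; // the ready signal ran on another thread
        } catch (TimeoutException e) {
          return true; // the signal is stuck behind this very task
        }
      }).get();
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("1 thread, wait timed out: " + waitTimesOut(1, 200));
    System.out.println("2 threads, wait timed out: " + waitTimesOut(2, 2000));
  }
}
```

      With a single thread the wait can only end by timing out, because the signal is queued behind the task that is waiting for it. With a second thread available the signal runs immediately and the wait returns.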

            People

              lcwik Luke Cwik

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 1h 20m