Spark / SPARK-44756

Executor hangs when RetryingBlockTransferor fails to initiate retry



    Description

      We have observed this issue several times in our production environment, where some executors get stuck at BlockTransferService#fetchBlockSync().

      After some investigation, the issue seems to be caused by an unhandled edge case in RetryingBlockTransferor.

      1. A shuffle transfer fails for some reason (in our case, writing the fetched data to the download file fails):

      java.io.IOException: Cannot allocate memory
      	at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
      	at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
      	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
      	at sun.nio.ch.IOUtil.write(IOUtil.java:51)
      	at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
      	at org.apache.spark.network.shuffle.SimpleDownloadFile$SimpleDownloadWritableChannel.write(SimpleDownloadFile.java:78)
      	at org.apache.spark.network.shuffle.OneForOneBlockFetcher$DownloadCallback.onData(OneForOneBlockFetcher.java:340)
      	at org.apache.spark.network.client.StreamInterceptor.handle(StreamInterceptor.java:79)
      	at org.apache.spark.network.util.TransportFrameDecoder.feedInterceptor(TransportFrameDecoder.java:263)
      	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:87)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      

      2. The above exception is caught by AbstractChannelHandlerContext#invokeChannelRead() and propagated to the channel's exception handlers.

      3. The exception eventually reaches RetryingBlockTransferor, which tries to initiate a retry via initiateRetry():

      23/08/09 16:58:37 shuffle-client-4-2 INFO RetryingBlockTransferor: Retrying fetch (1/3) for 1 outstanding blocks after 5000 ms
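
      For context, the retry path hands the re-fetch off to a thread pool after a wait, roughly as in the minimal sketch below (a simplified, standalone illustration with made-up names such as RetrySketch and refetch, not Spark's actual RetryingBlockTransferor code). The point is that a failure to start a worker thread is thrown from the submit() call itself, on the calling Netty event-loop thread, before any retry work has been queued.

      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.TimeUnit;

      // Simplified sketch of the retry-scheduling step (hypothetical names, not
      // Spark's actual RetryingBlockTransferor code).
      public class RetrySketch {
          private static final ExecutorService executorService = Executors.newCachedThreadPool();

          // Called from the block-transfer failure callback on the Netty event-loop thread.
          static void initiateRetry(Runnable refetch, long retryWaitMs) {
              // If the JVM cannot spawn a worker thread, submit() throws
              // java.lang.OutOfMemoryError: unable to create new native thread
              // right here, inside the caller's exception-handling path, and no
              // retry task is ever queued.
              executorService.submit(() -> {
                  try {
                      TimeUnit.MILLISECONDS.sleep(retryWaitMs); // wait before retrying
                  } catch (InterruptedException e) {
                      Thread.currentThread().interrupt();
                  }
                  refetch.run(); // re-issue the fetch for the outstanding blocks
              });
          }

          public static void main(String[] args) throws InterruptedException {
              initiateRetry(() -> System.out.println("retrying fetch (1/3)"), 100);
              executorService.shutdown();
              executorService.awaitTermination(5, TimeUnit.SECONDS);
          }
      }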
      

      4. Retry initiation itself fails (in our case, the retry thread pool cannot create a new thread).

      5. The OutOfMemoryError from the failed retry initiation is caught by AbstractChannelHandlerContext#invokeExceptionCaught() and is not processed any further; it is only logged at DEBUG level:

      23/08/09 16:58:53 shuffle-client-4-2 DEBUG AbstractChannelHandlerContext: An exception java.lang.OutOfMemoryError: unable to create new native thread
      	at java.lang.Thread.start0(Native Method)
      	at java.lang.Thread.start(Thread.java:719)
      	at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
      	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1378)
      	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
      	at org.apache.spark.network.shuffle.RetryingBlockTransferor.initiateRetry(RetryingBlockTransferor.java:182)
      	at org.apache.spark.network.shuffle.RetryingBlockTransferor.access$500(RetryingBlockTransferor.java:43)
      	at org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.handleBlockTransferFailure(RetryingBlockTransferor.java:230)
      	at org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.onBlockFetchFailure(RetryingBlockTransferor.java:260)
      	at org.apache.spark.network.shuffle.OneForOneBlockFetcher.failRemainingBlocks(OneForOneBlockFetcher.java:318)
      	at org.apache.spark.network.shuffle.OneForOneBlockFetcher.access$300(OneForOneBlockFetcher.java:55)
      	at org.apache.spark.network.shuffle.OneForOneBlockFetcher$DownloadCallback.onFailure(OneForOneBlockFetcher.java:357)
      	at org.apache.spark.network.client.StreamInterceptor.exceptionCaught(StreamInterceptor.java:56)
      	at org.apache.spark.network.util.TransportFrameDecoder.exceptionCaught(TransportFrameDecoder.java:231)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
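
      In other words, the OutOfMemoryError escapes from the handler's own exceptionCaught() path, and the framework's only recourse at that point is to log it. The following is a minimal, standalone illustration of that swallowing pattern (Handler and invokeExceptionCaught here are simplified stand-ins, not Netty's actual source): once the error from the failed retry submission is caught and logged, no success or failure callback is ever delivered for the fetch.

      import java.io.IOException;

      // Minimal illustration of the swallowing pattern (hypothetical stand-ins,
      // not Netty's actual AbstractChannelHandlerContext code).
      public class ExceptionCaughtSketch {
          interface Handler {
              void exceptionCaught(Throwable cause) throws Exception;
          }

          static void invokeExceptionCaught(Handler handler, Throwable originalCause) {
              try {
                  // In the real stack, this is where OneForOneBlockFetcher fails the
                  // remaining blocks and RetryingBlockTransferor calls initiateRetry().
                  handler.exceptionCaught(originalCause);
              } catch (Throwable t) {
                  // The OutOfMemoryError thrown by the failed retry submission lands
                  // here and is only logged; nothing upstream is notified, so the
                  // fetch is never marked as failed.
                  System.err.println("An exception was thrown by a user handler's "
                      + "exceptionCaught() method while handling another exception: " + t);
              }
          }

          public static void main(String[] args) {
              invokeExceptionCaught(cause -> {
                  throw new OutOfMemoryError("unable to create new native thread");
              }, new IOException("Cannot allocate memory"));
              System.out.println("event loop keeps running; the waiting fetch is never completed");
          }
      }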
      

      6. As a result, the retry never happens, and the executor thread ends up stuck at BlockTransferService#fetchBlockSync(), waiting indefinitely for the transfer to either complete or fail:

      sun.misc.Unsafe.park(Native Method)
      java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
      java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
      java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
      scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
      scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
      scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
      org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:293)
      org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:103)
      org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(BlockManager.scala:1154)
      org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(BlockManager.scala:1098)
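
      This matches the blocking wait inside BlockTransferService#fetchBlockSync, which only returns once a listener callback completes the pending result. The sketch below is a simplified Java rendering of that pattern (hypothetical names; the real fetchBlockSync is Scala and awaits a Promise via ThreadUtils.awaitResult): if neither onBlockFetchSuccess nor onBlockFetchFailure ever fires, the wait never ends and the executor task appears hung.

      import java.util.concurrent.CompletableFuture;
      import java.util.function.BiConsumer;

      // Simplified Java rendering of the synchronous-fetch pattern (hypothetical
      // names; the real BlockTransferService#fetchBlockSync is Scala).
      public class FetchBlockSyncSketch {
          interface BlockFetchingListener {
              void onBlockFetchSuccess(String blockId, byte[] data);
              void onBlockFetchFailure(String blockId, Throwable cause);
          }

          static byte[] fetchBlockSync(String blockId,
                                       BiConsumer<String, BlockFetchingListener> fetchBlocks) throws Exception {
              CompletableFuture<byte[]> result = new CompletableFuture<>();
              fetchBlocks.accept(blockId, new BlockFetchingListener() {
                  @Override
                  public void onBlockFetchSuccess(String id, byte[] data) {
                      result.complete(data);
                  }
                  @Override
                  public void onBlockFetchFailure(String id, Throwable cause) {
                      result.completeExceptionally(cause);
                  }
              });
              // No timeout, mirroring the indefinite await in the stack trace above:
              // if no callback is ever invoked, this blocks forever.
              return result.get();
          }

          public static void main(String[] args) throws Exception {
              // Happy path: the (stubbed) fetch completes the listener immediately.
              byte[] data = fetchBlockSync("shuffle_0_0_0",
                  (id, listener) -> listener.onBlockFetchSuccess(id, new byte[] {1, 2, 3}));
              System.out.println("fetched " + data.length + " bytes");
          }
      }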
      

       

          People

            Assignee: Harunobu Daikoku (hdaikoku)
            Reporter: Harunobu Daikoku (hdaikoku)
            Votes: 0
            Watchers: 3
