Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-45134

Data duplication may occur when fallback to origin shuffle block

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Critical
    • Resolution: Unresolved
    • 3.2.0, 3.3.0, 3.4.0, 3.5.0
    • None
    • Shuffle

    Description

      One possible situation that has been found is that, during the process of requesting mergedBlockMeta, when the channel is closed, it may trigger two callback callbacks and result in duplicate data for the original shuffle blocks.

      1. The first time is when the channel is inactivated, the responseHandler will execute the callback for all outstandingRpcs.
      2. The second time is when the listener corresponding to shuffleClient.writeAndFlush executes the callback after the channel is closed.

      Some Error Logs:

      23/09/08 09:22:21 ERROR shuffle-client-7-1 TransportResponseHandler: Still have 1 requests outstanding when connection from host/ip:prot is closed
      23/09/08 09:22:21 ERROR shuffle-client-7-1 PushBasedFetchHelper: Failed to get the meta of push-merged block for (3, 54) from host:port
      java.io.IOException: Connection from host:port closed
              at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147)
              at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
              at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
              at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
              at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
              at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
              at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
              at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
              at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
              at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
              at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
              at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:225)
              at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
              at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
              at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
              at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
              at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
              at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
              at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
              at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
              at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
              at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
              at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
              at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
              at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
              at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
              at java.lang.Thread.run(Thread.java:745)
       
      23/09/08 09:22:21 ERROR shuffle-client-7-1 PushBasedFetchHelper: Failed to get the meta of push-merged block for (3, 54) from host:port
      java.io.IOException: Failed to send RPC RPC 8079698359363123411 to host/ip:port: java.nio.channels.ClosedChannelException
              at org.apache.spark.network.client.TransportClient$RpcChannelListener.handleFailure(TransportClient.java:433)
              at org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:409)
              at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
              at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
              at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
              at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
              at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
              at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
              at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993)
              at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
              at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
              at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
              at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
              at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
              at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
              at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
              at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
              at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
              at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
              at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
              at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
              at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
              at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
              at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
              at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
              at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
              at java.lang.Thread.run(Thread.java:745)
      Caused by: java.nio.channels.ClosedChannelException
              at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
              ... 18 more 

      Attachments

        Activity

          People

            Unassigned Unassigned
            gaoyajun02 gaoyajun02
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated: