Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-44772

Reading blocks from remote executors causes timeout issue

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Cannot Reproduce
    • 3.1.2
    • None
    • EC2, PySpark, Shuffle, Spark Core
    • None

    Description

      I'm using EMR 6.5 with Spark 3.1.2

      I'm shuffling 1.5 TiB of data with 3000 executors with 4 cores 23 gig memory for executors

      Also speculative mode is on.

      // df.repartition(6000) 

      I see lots of failures with 

      2023-08-11 01:01:09,846 ERROR org.apache.spark.network.server.ChunkFetchRequestHandler (shuffle-server-4-95): Error sending result ChunkFetchSuccess[streamChunkId=StreamChunkId[streamId=779084003612,chunkIndex=323],buffer=FileSegmentManagedBuffer[file=/mnt3/yarn/usercache/zeppelin/appcache/application_1691438567823_0012/blockmgr-0d82ca05-9429-4ff2-9f61-e779e8e60648/07/shuffle_5_114492_0.data,offset=1836997,length=618]] to /172.31.20.110:36654; closing connection
      java.nio.channels.ClosedChannelException
      	at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
      	at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
      	at org.sparkproject.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
      	at org.sparkproject.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:110)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
      	at org.sparkproject.io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:808)
      	at org.sparkproject.io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1025)
      	at org.sparkproject.io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:294)
      	at org.apache.spark.network.server.ChunkFetchRequestHandler.respond(ChunkFetchRequestHandler.java:142)
      	at org.apache.spark.network.server.ChunkFetchRequestHandler.processFetchRequest(ChunkFetchRequestHandler.java:116)
      	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:107)
      	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
      	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
      	at org.sparkproject.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      	at org.sparkproject.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      	at org.sparkproject.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      	at org.sparkproject.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at org.sparkproject.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
      	at org.sparkproject.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
      	at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
      	at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
      	at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
      	at org.sparkproject.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
      	at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
      	at org.sparkproject.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
      	at org.sparkproject.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
      	at java.lang.Thread.run(Thread.java:750)
      Caused by: java.io.IOException: Broken pipe
      	at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
      	at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
      	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
      	at sun.nio.ch.IOUtil.write(IOUtil.java:51)
      	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
      	at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:148)
      	at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:111)
      	at org.sparkproject.io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:362)
      	at org.sparkproject.io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:235)
      	at org.sparkproject.io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:209)
      	at org.sparkproject.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:400)
      	at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:930)
      	at org.sparkproject.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
      	at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:897)
      	at org.sparkproject.io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
      	at org.sparkproject.io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
      	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
      	... 39 more 

       

      I tried to set this for kernel

      ```

      sudo ethtool -K eth0 tso off
      sudo ethtool -K eth0 sg off

      ```

      Didn't work. I guess external shuffle service is not able to send to data to other executors due to some reason.

       

       

       

      Attachments

        Activity

          People

            Unassigned Unassigned
            nebiaydin nebi mert aydin
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: