Spark / SPARK-24578

Reading remote cache block behavior changes and causes timeout issue



    Description

      After Spark 2.3, we observed lots of errors like the following in some of our production jobs:

      18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, chunkIndex=0}, buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to /172.22.18.7:60865; closing connection
      java.io.IOException: Broken pipe
      at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
      at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
      at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
      at sun.nio.ch.IOUtil.write(IOUtil.java:65)
      at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
      at org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
      at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
      at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
      at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
      at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
      at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
      at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
      at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
      at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
      at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
      at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
      at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
      at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
      at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
      at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
      at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
      at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
      at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
      at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
      at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
      at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
      at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
      at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
      at io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
      at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
      at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
      at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
      at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
      at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
      

       

      Here is a small reproducible example for a small cluster of 2 executors (say host-1 and host-2), each with 8 cores. The memory of the driver and executors is not an important factor here, as long as it is big enough, say 20 GB.

      val n = 100000000
      val df0 = sc.parallelize(1 to n).toDF
      val df = df0
        .withColumn("x0", rand())
        .withColumn("x1", rand())
        .withColumn("x2", rand())
        .withColumn("x3", rand())
        .withColumn("x4", rand())
        .withColumn("x5", rand())
        .withColumn("x6", rand())
        .withColumn("x7", rand())
        .withColumn("x8", rand())
        .withColumn("x9", rand())

      df.cache; df.count

      (1 to 10).toArray.par.map { i => println(i); df.groupBy("x1").agg(count("value")).show() }
      

       

      In the above example, we generate a random DataFrame of around 7 GB, cache it, and then perform parallel DataFrame operations using `array.par.map`. Because of the parallel computation, there is a high probability that some tasks are scheduled on host-2, where they need to read the cached block data from host-1. This follows the code path of https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691 and then tries to transfer a big cached block (~500 MB) from host-1 to host-2. This big transfer often makes the cluster suffer timeout issues (it will retry 3 times, each with a 120s timeout, and then recompute to put the cached block into the local MemoryStore).
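
      As a diagnostic only (not a fix), one way to check whether these failures are purely timeout-driven is to raise the fetch timeout and retry settings before running the repro. The values below are illustrative assumptions; these settings normally need to be supplied at launch time (e.g. via `spark-shell --conf`):

      // Diagnostic sketch only, with illustrative values; not a fix for this issue.
      // spark.network.timeout (default 120s) and spark.shuffle.io.maxRetries (default 3)
      // correspond to the per-attempt timeout and retry count described above.
      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder()
        .appName("SPARK-24578-repro")
        .config("spark.network.timeout", "600s")    // more time per remote block fetch attempt
        .config("spark.shuffle.io.maxRetries", "6") // more fetch retries before giving up and recomputing
        .getOrCreate()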

      We could not reproduce the same issue in Spark 2.2.1. From the Spark 2.2.1 logs, we found that

      18/06/16 17:23:47 DEBUG BlockManager: Getting local block rdd_3_0 
      18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 trying to acquire read lock for rdd_3_0 
      18/06/16 17:23:47 DEBUG BlockManager: Block rdd_3_0 was not found 
      18/06/16 17:23:47 DEBUG BlockManager: Getting remote block rdd_3_0 
      18/06/16 17:23:47 DEBUG BlockManager: Block rdd_3_0 not found 
      18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 trying to put rdd_3_0 
      18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 trying to acquire read lock for rdd_3_0 
      18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 trying to acquire write lock for rdd_3_0 
      18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 acquired write lock for rdd_3_0 
      18/06/16 17:23:58 INFO MemoryStore: Block rdd_3_0 stored as values in memory (estimated size 538.2 MB, free 11.1 GB)
      

      That is, when a task is scheduled to host-2 where it needs to read the cached block rdd_3_0 from host-1, the endpoint of `master.getLocations(..)` (see https://github.com/apache/spark/blob/v2.2.1/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L622) reports that the remote cached block is not found, which triggers the recompute.
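
      To make the contrast explicit, here is a minimal sketch of the two possible behaviors when a cached block lives on another executor; the names here are hypothetical and this is not the actual BlockManager code:

      // Hypothetical sketch, not the actual BlockManager API: the two behaviors
      // contrasted in this report for a cached block that lives on another executor.
      def readCachedBlock[T](blockId: String,
                             fetchRemote: String => Option[Iterator[T]],
                             recompute: () => Iterator[T]): Iterator[T] = {
        fetchRemote(blockId) match {
          // Spark 2.3 path observed above: transfer the ~500 MB block over the network.
          case Some(values) => values
          // Behavior seen in the 2.2.1 log: block reported "not found", so the
          // partition is recomputed and stored in the local MemoryStore.
          case None => recompute()
        }
      }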

      I believe this behavior change was introduced by this change set: https://github.com/apache/spark/commit/e1960c3d6f380b0dfbba6ee5d8ac6da4bc29a698#diff-2b643ea78c1add0381754b1f47eec132

      We have two questions here:

      1. What is the right behavior: should we recompute, or should we transfer the block from the remote host?
      2. If we should transfer from the remote host, why is the performance so bad for cached blocks?

       

          People

            Assignee: Wenbo Zhao
            Reporter: Wenbo Zhao