Hadoop Map/Reduce / MAPREDUCE-7431

ShuffleHandler is not working correctly in SSL mode after the Netty 4 upgrade


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.4.0
    • Fix Version/s: 3.4.0
    • Component/s: mrv2
    • Hadoop Flags: Reviewed

    Description

      HADOOP-15327 introduced some regressions in the ShuffleHandler.

      1. a memory leak

      ERROR io.netty.util.ResourceLeakDetector: LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
      

       
      The Shuffle handler's channelRead did not release the message properly; the fix would be:

            try {
              // .... handle the request as before
            } finally {
              // always release the reference-counted request (io.netty.util.ReferenceCountUtil)
              ReferenceCountUtil.release(msg);
            }
      

      Or even simpler:

      extends SimpleChannelInboundHandler<FullHttpRequest>
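
      A minimal sketch of that variant (not the actual ShuffleHandler code): SimpleChannelInboundHandler releases the inbound message automatically after channelRead0 returns, so no explicit ReferenceCountUtil call is needed.

      class Shuffle extends SimpleChannelInboundHandler<FullHttpRequest> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
          // handle the /mapOutput request here; the FullHttpRequest is
          // released automatically once this method returns
        }
      }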
      

      2. a bug in SSL mode with more than one reducer

      It manifested as multiple errors:

      ERROR org.apache.hadoop.mapred.ShuffleHandler: Future is unsuccessful. Cause:
      java.io.IOException: Broken pipe
      
      ERROR org.apache.hadoop.mapred.ShuffleHandler: Future is unsuccessful. Cause:
      java.nio.channels.ClosedChannelException
      
      // if the reducer memory was not enough, then even this:
      Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#2
      	at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:136)
      	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:377)
      	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
      	at java.security.AccessController.doPrivileged(Native Method)
      	at javax.security.auth.Subject.doAs(Subject.java:422)
      	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
      	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)
      Caused by: java.lang.OutOfMemoryError: Java heap space
      	at org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:123)
      	at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:98)
      	at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
      	at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:210)
      	at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.doShuffle(InMemoryMapOutput.java:91)
      

      Configuration - mapred-site.xml

      mapreduce.shuffle.ssl.enabled=true
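
      For reference, the same setting in the standard mapred-site.xml XML form:

      <property>
        <name>mapreduce.shuffle.ssl.enabled</name>
        <value>true</value>
      </property>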
      

      A workaround is to build a custom jar where FadvisedFileRegion is replaced with FadvisedChunkedFile in sendMapOutput (see the sketch below).
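
      A minimal sketch of the idea behind that workaround, using plain Netty types (io.netty.channel.DefaultFileRegion / io.netty.handler.stream.ChunkedFile) instead of the Hadoop Fadvised wrappers; the method name and parameters are illustrative:

      // zero-copy FileRegion bypasses the SslHandler, so with SSL the file
      // has to be streamed as ByteBuf chunks that the SslHandler can encrypt
      ChannelFuture writeMapOutput(ChannelHandlerContext ctx, RandomAccessFile raf,
          long offset, long length) throws IOException {
        if (ctx.pipeline().get(SslHandler.class) == null) {
          // plain HTTP: zero-copy transfer straight from the file channel
          return ctx.write(new DefaultFileRegion(raf.getChannel(), offset, length));
        }
        // HTTPS: the ChunkedWriteHandler turns the ChunkedFile into ByteBuf chunks
        return ctx.write(new ChunkedFile(raf, offset, length, 64 * 1024));
      }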

      Reproduction

      hdfs dfs -rm -r -skipTrash /tmp/sort_input
      hdfs dfs -rm -r -skipTrash /tmp/sort_output
      yarn jar hadoop-3.4.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0-SNAPSHOT.jar randomwriter "-Dmapreduce.randomwriter.totalbytes=10000000000" /tmp/sort_input
      yarn jar hadoop-3.4.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0-SNAPSHOT.jar sort -Dmapreduce.job.reduce.slowstart.completedmaps=1 -r 40 /tmp/sort_input /tmp/sort_output | tee sort_app_output.txt
      

      ShuffleHandler's protocol

      // HTTP Request
      GET /mapOutput?job=job_1672901779104_0001&reduce=0&map=attempt_1672901779104_0001_m_000003_0,attempt_1672901779104_0001_m_000002_0,attempt_1672901779104_0001_m_000001_0,attempt_1672901779104_0001_m_000000_0,attempt_1672901779104_0001_m_000005_0,attempt_1672901779104_0001_m_000012_0,attempt_1672901779104_0001_m_000009_0,attempt_1672901779104_0001_m_000010_0,attempt_1672901779104_0001_m_000007_0,attempt_1672901779104_0001_m_000011_0,attempt_1672901779104_0001_m_000008_0,attempt_1672901779104_0001_m_000013_0,attempt_1672901779104_0001_m_000014_0,attempt_1672901779104_0001_m_000015_0,attempt_1672901779104_0001_m_000019_0,attempt_1672901779104_0001_m_000018_0,attempt_1672901779104_0001_m_000016_0,attempt_1672901779104_0001_m_000017_0,attempt_1672901779104_0001_m_000020_0,attempt_1672901779104_0001_m_000023_0 HTTP/1.1
      + keep alive headers
      
      // HTTP Response Headers
      content-length=sum(serialised ShuffleHeader in bytes + MapOutput size)
      + keep alive headers
      
      // Response Data (transfer-encoding=chunked)
      serialised ShuffleHeader
      content of the MapOutput file (start offset - length)
      serialised ShuffleHeader
      content of the MapOutput file (start offset - length)
      serialised ShuffleHeader
      content of the MapOutput file (start offset - length)
      serialised ShuffleHeader
      content of the MapOutput file (start offset - length)
      ...
      LastHttpContent
      // close socket if no keep-alive
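
      A sketch of the corresponding Netty write sequence (the local variables and helpers such as contentLength, keepAlive, mapOutputs, shuffleHeaderBuf and mapOutputBody are made up for illustration):

      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
      response.headers()
          .set(HttpHeaderNames.CONTENT_LENGTH, contentLength)          // sum of headers + map outputs
          .set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
      ch.write(response);
      for (MapOutputInfo info : mapOutputs) {
        ch.write(shuffleHeaderBuf(info));   // serialised ShuffleHeader
        ch.write(mapOutputBody(info));      // FileRegion or ChunkedFile (offset, length)
      }
      ChannelFuture lastContentFuture = ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
      if (!keepAlive) {
        lastContentFuture.addListener(ChannelFutureListener.CLOSE);
      }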
      

      Issues

      • setResponseHeaders: did not always set the content-length, and the transfer-encoding=chunked header was missing.
      • ReduceMapFileCount.operationComplete: messed up the futures on the LastHttpContent.
      • ChannelGroup accepted: is only used to close the channels, no need for that magic 5. See the details here.
      • bossGroup: should have only 1 thread for accepting connections (see the sketch after this list).
      • Shuffle: is unnecessarily Sharable; the 3 async sendMap calls per channel (see below) caused future errors when using FadvisedChunkedFile.
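
      A minimal sketch of the boss/worker setup the bossGroup point refers to (variable names are illustrative):

      // one acceptor thread is enough; the worker group serves the accepted channels
      EventLoopGroup bossGroup = new NioEventLoopGroup(1);
      EventLoopGroup workerGroup = new NioEventLoopGroup(maxShuffleThreads);
      ServerBootstrap bootstrap = new ServerBootstrap()
          .group(bossGroup, workerGroup)
          .channel(NioServerSocketChannel.class);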

      Max session open files is not an optimisation; it actually wastes resources

          // by default maxSessionOpenFiles = 3
          for (int i = 0; i < Math.min(handlerCtx.maxSessionOpenFiles, mapIds.size()); i++) {
            ChannelFuture nextMap = sendMap(reduceContext);
            if(nextMap == null) {
              return;
            }
          }
      

      At the end of the day we create an HTTP chunked stream, so there is no need to run 3 sendMap calls asynchronously: the futures finish one by one, sequentially. The osCache magic from the Fadvised classes does not happen either, because the first readChunk is only called later.

      So this can be simplified a lot:

      sendMap(reduceContext);
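
      A simplified sketch of how the writes then chain (class and method names approximate the ShuffleHandler code; hasMoreMapsToSend and writeLastHttpContent are illustrative helpers): the listener on each write future sends the next map output only after the previous one completed, so one initial sendMap call is enough.

      class ReduceMapFileCount implements ChannelFutureListener {
        private final ReduceContext reduceContext;

        ReduceMapFileCount(ReduceContext reduceContext) {
          this.reduceContext = reduceContext;
        }

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          if (!future.isSuccess()) {
            future.channel().close();
            return;
          }
          if (reduceContext.hasMoreMapsToSend()) {
            sendMap(reduceContext);                  // queue the next map output
          } else {
            writeLastHttpContent(future.channel());  // finish the chunked response
          }
        }
      }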
      

      My proposal

      Some refactoring: ShuffleHandler is split into multiple classes to make it possible to remove the @Sharable annotation (see the sketch after the list below).

      • ShuffleChannel
      • ShuffleChannelInitializer
      • ShuffleChannelHandlerContext
      • ShuffleChannelHandler
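
      A rough sketch of how the split could look (the class names follow the list above; the pipeline contents are assumptions): the initializer creates a new, non-shared ShuffleChannelHandler per accepted channel, so @Sharable is no longer needed.

      class ShuffleChannelInitializer extends ChannelInitializer<SocketChannel> {
        private final ShuffleChannelHandlerContext handlerContext;

        ShuffleChannelInitializer(ShuffleChannelHandlerContext handlerContext) {
          this.handlerContext = handlerContext;
        }

        @Override
        protected void initChannel(SocketChannel ch) {
          ChannelPipeline pipeline = ch.pipeline();
          // an SslHandler would be added here first when shuffle SSL is enabled
          pipeline.addLast("http", new HttpServerCodec());
          pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16));
          pipeline.addLast("chunking", new ChunkedWriteHandler());
          // a fresh, non-shared handler instance for every accepted channel
          pipeline.addLast("shuffle", new ShuffleChannelHandler(handlerContext));
        }
      }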

      TODO:

      • fix/drop/refactor the existing unit tests
      • add a proper unit test that covers both SSL and non-SSL mode and verifies the response data
      • documentation about the protocol

      WIP: github.com/tomicooler/hadoop


      Attachments

        1. chunked.txt
          9 kB
          Tamas Domok
        2. chunked-fileregion.txt
          8 kB
          Tamas Domok
        3. normal.txt
          5 kB
          Tamas Domok
        4. normal-fileregion.txt
          3 kB
          Tamas Domok
        5. sendMapPipeline.png
          26 kB
          Tamas Domok

            People

              Assignee: tdomok (Tamas Domok)
              Reporter: tdomok (Tamas Domok)
              Votes: 0
              Watchers: 3
