Issue: BEAM-9566 (project: Beam)

Performance regression of FlinkRunner stream mode due to watermark holds update

Details

    Description

      Nexmark tests show that the throughput of the FlinkRunner with the RocksDB state backend dropped sharply in Queries 4, 5, 6, 9 and 11, by roughly 30% (Query 4) to 87% (Query 11) in events/sec. Some other queries also slowed down, but less severely. The affected queries use keyed state.

       

       

      Nexmark test results. Both runs were executed on the same machine, and the results are reproducible.

       - Before the regression:

      Performance:
        Conf  Runtime(sec)  Events(/sec)  Results
        0000           3.8       26171.2   100000
        0001           3.9       25967.3    92000
        0002           1.9       53447.4      351
        0003           2.8       35791.0      580
        0004           2.5        4045.3       40
        0005           9.6       10448.2       12
        0006           1.2        8532.4      401
        0007           4.0       25018.8        1
        0008           2.9       34928.4     6000
        0009           1.1        9066.2      298
        0010           9.5       10564.1        2
        0011          11.1        9005.0     1919
        0012           4.5       22075.1     1919
        0013           4.4       22547.9    92000
        0014           9.7       10261.7    92000
      

       - After the regression:

      Performance:
        Conf  Runtime(sec)  Events(/sec)  Results
        0000           4.5       22036.1   100000
        0001           3.9       25839.8    92000
        0002           2.3       43763.7      351
        0003           3.5       28669.7      580
        0004           3.6        2801.1       40
        0005          22.6        4429.1       12
        0006           2.5        3993.6      401
        0007           7.5       13320.9        1
        0008           2.7       36737.7     6000
        0009           2.5        3930.8      298
        0010          16.2        6178.6        2
        0011          82.9        1206.3     1919
        0012           5.9       16874.8     1919
        0013           4.2       23889.2    92000
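      As a quick cross-check of the size of the regression, the following Java sketch recomputes the throughput drop of Queries 4, 5, 6, 9 and 11 from the Events(/sec) columns of the two tables. The class name NexmarkThroughputDrop is hypothetical, and the values are copied by hand from the tables above.

```java
/** Hypothetical helper: recomputes throughput drops from the Nexmark tables. */
public class NexmarkThroughputDrop {

  /** Percentage drop from {@code before} to {@code after} events/sec. */
  static double dropPercent(double before, double after) {
    return (before - after) / before * 100.0;
  }

  public static void main(String[] args) {
    // Query numbers and Events(/sec) values copied from the before/after tables.
    int[] queries = {4, 5, 6, 9, 11};
    double[] before = {4045.3, 10448.2, 8532.4, 9066.2, 9005.0};
    double[] after = {2801.1, 4429.1, 3993.6, 3930.8, 1206.3};
    for (int i = 0; i < queries.length; i++) {
      System.out.printf("Query %d: %.1f%% drop%n", queries[i], dropPercent(before[i], after[i]));
    }
  }
}
```

      Running it reports drops of about 30.8%, 57.6%, 53.2%, 56.6% and 86.6% for Queries 4, 5, 6, 9 and 11 respectively.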
      

       

      The regression comes from the "updateWatermarkHold()" function recently added to the Flink runner's DoFnOperator in [PR#10534|https://github.com/apache/beam/pull/10534]:

      https://github.com/apache/beam/blob/bdd1726fd6b1103791f597f5e746ea2d205cf648/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L1140

      This function allows the FlinkRunner to set watermark holds. However, its implementation is inefficient:

      1. "pendingTimersById" is backed by RocksDB key/value map state, so iterating over all of its values via "pendingTimersById.values()" is expensive. The CPU profile of Query 4 below, captured with [async-profiler|https://github.com/jvm-profiling-tools/async-profiler], shows that most of the CPU time is spent in "RocksIterator.next()", called from "pendingTimersById.values()".
      2. The function is also called multiple times within each setTimer and deleteTimer call, which multiplies this cost.
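      To illustrate why rescanning all pending timers on every timer update is costly, here is a simplified, hypothetical Java model. It is not Beam's DoFnOperator code and not the fix that was eventually merged: a plain HashMap stands in for the RocksDB-backed "pendingTimersById" map state, each timer is assumed to hold the watermark at its output timestamp, and all class and method names are invented for illustration. The scanning variant mirrors the pattern described above (a full pass over all values per update), while the incremental variant keeps a count per timestamp in a TreeMap so the current hold is read in O(log n).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of per-key watermark-hold bookkeeping (not Beam code). */
public class WatermarkHoldSketch {

  /** Rescans every pending timer on each update: O(n) per setTimer/deleteTimer. */
  static final class ScanningHolds {
    // Stand-in for the RocksDB-backed "pendingTimersById" map state.
    private final Map<String, Long> pendingTimersById = new HashMap<>();
    long hold = Long.MAX_VALUE;
    long valuesVisited = 0; // iterator steps, analogous to RocksIterator.next() calls

    void setTimer(String id, long outputTimestamp) {
      pendingTimersById.put(id, outputTimestamp);
      updateWatermarkHold();
    }

    void deleteTimer(String id) {
      pendingTimersById.remove(id);
      updateWatermarkHold();
    }

    private void updateWatermarkHold() {
      long min = Long.MAX_VALUE;
      for (long ts : pendingTimersById.values()) { // full scan on every call
        valuesVisited++;
        min = Math.min(min, ts);
      }
      hold = min;
    }
  }

  /** Keeps a count per timestamp so the minimum (the hold) is O(log n) to maintain. */
  static final class IncrementalHolds {
    private final Map<String, Long> pendingTimersById = new HashMap<>();
    private final TreeMap<Long, Integer> countsByTimestamp = new TreeMap<>();

    void setTimer(String id, long outputTimestamp) {
      Long previous = pendingTimersById.put(id, outputTimestamp);
      if (previous != null) {
        decrement(previous); // re-setting a timer replaces its old hold
      }
      countsByTimestamp.merge(outputTimestamp, 1, Integer::sum);
    }

    void deleteTimer(String id) {
      Long previous = pendingTimersById.remove(id);
      if (previous != null) {
        decrement(previous);
      }
    }

    long hold() {
      return countsByTimestamp.isEmpty() ? Long.MAX_VALUE : countsByTimestamp.firstKey();
    }

    private void decrement(long ts) {
      // Returning null from the remapping function removes the entry once its count hits zero.
      countsByTimestamp.computeIfPresent(ts, (k, c) -> c == 1 ? null : c - 1);
    }
  }

  public static void main(String[] args) {
    ScanningHolds scanning = new ScanningHolds();
    IncrementalHolds incremental = new IncrementalHolds();
    for (int i = 0; i < 1_000; i++) {
      scanning.setTimer("t" + i, 1_000L + i);
      incremental.setTimer("t" + i, 1_000L + i);
    }
    scanning.deleteTimer("t0");
    incremental.deleteTimer("t0");
    System.out.println(scanning.hold + " " + incremental.hold());
    System.out.println("values visited by full scans: " + scanning.valuesVisited);
  }
}
```

      Both variants end with a hold of 1001, but the scanning variant visits 501,499 values to get there (1 + 2 + ... + 1000, plus 999 for the delete), i.e. quadratic work in the number of timers; calling the update several times per setTimer or deleteTimer multiplies it further. In a real runner the TreeMap would itself have to live in keyed state, so this only sketches the shape of the trade-off.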

       

      CPU profile (flame graph) of Nexmark Query 4:

      https://drive.google.com/open?id=1muVQipv-JidxVceQkOze5PZozPgPB_bh


      People: Maximilian Michels (mxm), Bingfeng Xia
      Time Tracking: Original Estimate not specified; Remaining Estimate 0h; Time Spent 1.5h