Flink / FLINK-33932

Support Retry Mechanism in RocksDBStateDataTransfer

Details

    Description

      Currently, there is no retry mechanism for downloading or uploading RocksDB state files, so any transient instability (jitter) in the remote filesystem can cause a checkpoint to fail. Supporting a retry mechanism in RocksDBStateDataTransfer would significantly reduce the checkpoint failure rate during the asynchronous phase.
      An example of the resulting exception:

       
      2023-12-19 08:46:00,197 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline checkpoint 2 by task 5b008c2c048fa8534d648455e69f9497_fbcce2f96483f8f15e87dc6c9afd372f_183_0 of job ffffffffa025f19e0000000000000000 at application-6f1c6e3d-1702480803995-5093022-taskmanager-1-1 @ fdbd:dc61:1a:101:0:0:0:36 (dataPort=38789).
      org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
          at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
          at java.lang.Thread.run(Thread.java:829) [?:?]
      Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not materialize checkpoint 2 for operator GlobalGroupAggregate[132] -> Calc[133] (184/500)#0.
          at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          ... 4 more
      Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush to file and close the file system output stream to hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the stream state handle
          at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
          at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
          at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          ... 3 more
      Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Could not flush to file and close the file system output stream to hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the stream state handle
          at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:516) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:157) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:113) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]
          ... 3 more
      Caused by: org.apache.flink.util.SerializedThrowable: java.net.ConnectException: Connection timed out
          at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:?]
          at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777) ~[?:?]
          at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206) ~[hadoop-common-2.6.0-cdh5.4.4.jar:?]
          at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532) ~[hadoop-common-2.6.0-cdh5.4.4.jar:?]
          at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1835) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
          at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:1268) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
          at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1257) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
          at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1414) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
          at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1149) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
          at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:652) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
       

      Proposal: add a retry mechanism to the RocksDB state uploader so that a transient connection failure like the one above does not fail the checkpoint in the asynchronous phase.
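
To illustrate the idea, here is a minimal sketch of a bounded retry with exponential backoff around an I/O action. It is not the actual Flink change: the class `RetryingTransfer`, the method `callWithRetry`, and the parameters `maxAttempts` and `initialBackoffMillis` are all hypothetical names chosen for this example. Only `IOException` (which includes `java.net.ConnectException`, as seen in the trace above) is treated as retryable here; that choice is an assumption.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical helper for illustration only; not part of Flink. */
public final class RetryingTransfer {

    private RetryingTransfer() {}

    /**
     * Runs the action, retrying on IOException up to maxAttempts times in total.
     * The wait between attempts starts at initialBackoffMillis and doubles each time.
     * Any other exception, including InterruptedException from the sleep, propagates
     * immediately.
     */
    public static <T> T callWithRetry(
            Callable<T> action, int maxAttempts, long initialBackoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long backoff = initialBackoffMillis;
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (IOException e) {
                last = e;
                if (attempt == maxAttempts) {
                    break; // attempts exhausted, rethrow below
                }
                Thread.sleep(backoff);
                backoff *= 2;
            }
        }
        throw last;
    }
}
```

Under this sketch, the upload step that fails in the trace (`RocksDBStateUploader.uploadLocalFileToCheckpointFs`) would be the kind of call passed in as the action. A real implementation would presumably also need to discard any partially written remote file before retrying, and to make the attempt count and backoff configurable; neither is shown here.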

            People

              xiangyu0xf xiangyu feng
              dianer17 Guojun Li