FLINK-28513

Flink Table API CSV streaming sink throws SerializedThrowable exception


Details

    Description

      The Table API S3 streaming sink (CSV format) throws the following exception:

      Caused by: org.apache.flink.util.SerializedThrowable: S3RecoverableFsDataOutputStream cannot sync state to S3. Use persist() to create a persistent recoverable intermediate point.
      at org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.sync(RefCountedBufferingFileStream.java:111) ~[flink-s3-fs-hadoop-1.15.1.jar:1.15.1]
      at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.sync(S3RecoverableFsDataOutputStream.java:129) ~[flink-s3-fs-hadoop-1.15.1.jar:1.15.1]
      at org.apache.flink.formats.csv.CsvBulkWriter.finish(CsvBulkWriter.java:110) ~[flink-csv-1.15.1.jar:1.15.1]
      at org.apache.flink.connector.file.table.FileSystemTableSink$ProjectionBulkFactory$1.finish(FileSystemTableSink.java:642) ~[flink-connector-files-1.15.1.jar:1.15.1]
      at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:64) ~[flink-file-sink-common-1.15.1.jar:1.15.1]
      at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:263) ~[flink-streaming-java-1.15.1.jar:1.15.1]
      at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.prepareBucketForCheckpointing(Bucket.java:305) ~[flink-streaming-java-1.15.1.jar:1.15.1]
      at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onReceptionOfCheckpoint(Bucket.java:277) ~[flink-streaming-java-1.15.1.jar:1.15.1]
      at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotActiveBuckets(Buckets.java:270) ~[flink-streaming-java-1.15.1.jar:1.15.1]
      at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotState(Buckets.java:261) ~[flink-streaming-java-1.15.1.jar:1.15.1]
      at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(StreamingFileSinkHelper.java:87) ~[flink-streaming-java-1.15.1.jar:1.15.1]
      at org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.snapshotState(AbstractStreamingWriter.java:129) ~[flink-connector-files-1.15.1.jar:1.15.1]
      

      In my table configuration, I read from Kafka and write to S3 (s3a) using the Table API, with checkpointing configured on s3p (Presto). I also tried a simple datagen source instead of Kafka, with the local file system for checkpointing (`file:///` instead of `s3p://`), and I get the same issue. It fails exactly when a checkpoint is triggered.
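
The stack trace suggests the following chain: at checkpoint time the in-progress part file is closed for commit, the CSV bulk writer's `finish()` calls `sync()` on the output stream, and the S3 recoverable stream rejects `sync()`. The sketch below models that chain without any Flink dependency, as an illustration only. All class and method names in it (`SyncableStream`, `LocalLikeStream`, `S3LikeStream`, `CsvLikeBulkWriter`, `closeForCommit`) are hypothetical stand-ins, not Flink APIs, and the `flush()` call before `sync()` is an assumption.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class SyncOnFinishSketch {

    /** Hypothetical stand-in for an output stream that may or may not support sync(). */
    interface SyncableStream {
        void write(byte[] bytes) throws IOException;
        void flush() throws IOException;
        void sync() throws IOException;
    }

    /** Models a stream for which sync() is a supported operation. */
    static class LocalLikeStream implements SyncableStream {
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        boolean synced = false;

        @Override
        public void write(byte[] bytes) {
            buffer.write(bytes, 0, bytes.length);
        }

        @Override
        public void flush() {
            // Nothing to do for an in-memory buffer.
        }

        @Override
        public void sync() {
            synced = true;
        }
    }

    /** Models a recoverable object-store stream that refuses sync(). */
    static class S3LikeStream extends LocalLikeStream {
        @Override
        public void sync() {
            throw new UnsupportedOperationException(
                    "sync() is not supported by this stream; use persist() instead");
        }
    }

    /** Models a bulk writer whose finish() unconditionally calls sync(). */
    static class CsvLikeBulkWriter {
        private final SyncableStream out;

        CsvLikeBulkWriter(SyncableStream out) {
            this.out = out;
        }

        void addElement(String row) throws IOException {
            out.write((row + "\n").getBytes(StandardCharsets.UTF_8));
        }

        void finish() throws IOException {
            out.flush();
            out.sync(); // This is the call that fails on the S3-like stream.
        }
    }

    /** Models closing a part file when a checkpoint is taken. */
    static String closeForCommit(SyncableStream stream) {
        CsvLikeBulkWriter writer = new CsvLikeBulkWriter(stream);
        try {
            writer.addElement("1,alice");
            writer.finish();
            return "committed";
        } catch (UnsupportedOperationException | IOException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("local-like: " + closeForCommit(new LocalLikeStream()));
        System.out.println("s3-like: " + closeForCommit(new S3LikeStream()));
    }
}
```

In this model the failure depends only on the sink's output stream, not on where checkpoints are stored. That would be consistent with the same error appearing when checkpointing to `file:///`.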

      Some related Slack and Stack Overflow conversations are here, here and here.

      Since there is no workaround for the S3 Table API streaming sink, I am marking this as critical. If this is not the appropriate severity, feel free to reduce the priority.

            People

              samrat007 Samrat Deb
              jaya.ananthram Jaya Ananthram
              Votes: 1
              Watchers: 7
