Flink / FLINK-24151

KafkaSink fails with setMaxConcurrentCheckpoints being enabled

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Done
    • Affects Version/s: 1.14.0
    • Fix Version/s: 1.14.0
    • Component/s: Connectors / Kafka
    • Labels: None

    Description

      We encountered a RuntimeException during a test run for FLINK-23850:

      java.lang.RuntimeException: Failed to send data to Kafka: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
              at org.apache.flink.connector.kafka.sink.KafkaWriter.checkErroneous(KafkaWriter.java:263) ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:178) ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:161) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at StreamExecCalc$6.processElement(Unknown Source) ~[?:?]
              at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:36) ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:27) ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:141) ~[flink-table_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:341) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:490) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:789) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:741) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
      Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
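
      The trace above wraps the broker error (UnknownProducerIdException, under a shaded package name) in a RuntimeException. When checking for this failure programmatically, one option is to walk the cause chain and match on the class-name suffix, so the shaded prefix does not matter. The following is only a minimal sketch: the class CauseChain and the method findCause are hypothetical names, not part of Flink or Kafka, and main uses a stand-in IllegalStateException because the Kafka client classes are assumed not to be on the classpath here.

```java
import java.util.Optional;

public final class CauseChain {
    private CauseChain() {}

    /**
     * Returns the first throwable in the cause chain (starting with the
     * error itself) whose fully qualified class name ends with the suffix.
     * Matching on a suffix tolerates relocated (shaded) package prefixes.
     */
    public static Optional<Throwable> findCause(Throwable error, String classNameSuffix) {
        Throwable current = error;
        int depth = 0;
        // The depth cap guards against pathological, cyclic cause chains.
        while (current != null && depth < 64) {
            if (current.getClass().getName().endsWith(classNameSuffix)) {
                return Optional.of(current);
            }
            current = current.getCause();
            depth++;
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Stand-in for the broker error; a real job would look for
        // "UnknownProducerIdException" instead.
        Throwable brokerError = new IllegalStateException("producer metadata not found");
        Throwable wrapped = new RuntimeException("Failed to send data to Kafka", brokerError);

        boolean found = findCause(wrapped, "IllegalStateException").isPresent();
        System.out.println("broker error found in cause chain: " + found);
    }
}
```

      In a test harness, the same call with the suffix "UnknownProducerIdException" would distinguish this failure from unrelated RuntimeExceptions raised by the writer.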
      

      Test job executed:

              Configuration config = new Configuration();
              config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
      
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
              env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
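              env.setRestartStrategy(RestartStrategies.fixedDelayRestart(20, 2000));
              // Exactly-once checkpoints every 10 seconds.
              env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
              // Allow two checkpoints in flight at once; the failure was observed with this setting.
              env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
              env.setParallelism(6);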
      
              final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
      
              tableEnv.createTable("T1",
                      TableDescriptor.forConnector("kafka")
                              .schema(Schema.newBuilder()
                                      .column("pk", DataTypes.STRING().notNull())
                                      .column("x", DataTypes.STRING().notNull())
                                      .build())
                              .option("topic", "flink-23850-in1")
                              .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
                              .option("value.format", "csv")
                              .option("scan.startup.mode", "earliest-offset")
                              .build());
      
              final Table resultTable =
                      tableEnv.sqlQuery(
                              "SELECT "
                                      + "T1.pk, "
                                      + "'asd', "
                                      + "'foo', "
                                      + "'bar' "
                                      + "FROM T1");
      
              tableEnv.createTable("T4",
                      TableDescriptor.forConnector("kafka")
                              .schema(Schema.newBuilder()
                                      .column("pk", DataTypes.STRING().notNull())
                                      .column("some_calculated_value", DataTypes.STRING())
                                      .column("pk1", DataTypes.STRING())
                                      .column("pk2", DataTypes.STRING())
                                      .build())
                              .option("topic", "flink-23850-out")
                              .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
                              .option("value.format", "csv")
                              .option("sink.delivery-guarantee", "exactly-once")
                              .option("sink.transactional-id-prefix", "flink-23850")
                              .option("scan.startup.mode", "earliest-offset")
                              .build());
      
              resultTable.executeInsert("T4");
      

      Attachments

        1. release-testing-run-6.tar.gz
          113 kB
          Matthias Pohl

            People

              Assignee: Arvid Heise (arvid)
              Reporter: Matthias Pohl (mapohl)
              Votes: 0
              Watchers: 1
