  1. Flink
  2. FLINK-34285 [Umbrella] Test Flink Release 1.19
  3. FLINK-34384

Release Testing: Verify FLINK-33735 Improve the exponential-delay restart-strategy

Details

    Description

      Test suggestion:

      1. Prepare a DataStream job in which every task throws an exception immediately.
        1. Set the parallelism to 5 or above.
      2. Prepare the following configuration options:
        • restart-strategy.type : exponential-delay
        • restart-strategy.exponential-delay.attempts-before-reset-backoff : 7
      3. Start the cluster: ./bin/start-cluster.sh
      4. Run the job: ./bin/flink run -c className jarName
      5. Check the result:
        • Check that the job is retried 7 times.
        • Check that the exception history lists 7 exceptions.
        • Check that every retry except the last one shows the 5 subtasks (they are reported as concurrent exceptions).
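The checks in step 5 can be written down as a small predicate. This is only a sketch: `RestartCheck` and `matchesExpectation` are hypothetical names, and the history is modelled as a plain list (assumed to be in chronological order) of the subtask indices shown in each entry. It does not read anything from a running cluster; the values would have to be copied from the exception history by hand.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

/** Hypothetical helper that encodes the step 5 expectations as plain checks. */
final class RestartCheck {

    /**
     * Returns true if the history has the expected number of entries and every
     * entry except the last one shows all subtasks.
     *
     * @param history one set of failed subtask indices per entry, oldest first (assumption)
     */
    static boolean matchesExpectation(
            List<Set<Integer>> history, int expectedEntries, int parallelism) {
        if (history.size() != expectedEntries) {
            return false;
        }
        // The last entry is deliberately skipped, as described in step 5.
        for (int i = 0; i < history.size() - 1; i++) {
            if (history.get(i).size() != parallelism) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Six entries showing all 5 subtasks, then a last entry showing only one.
        List<Set<Integer>> history =
                new ArrayList<>(Collections.nCopies(6, Set.of(0, 1, 2, 3, 4)));
        history.add(Set.of(3));
        System.out.println(matchesExpectation(history, 7, 5));
    }
}
```

With the sample data in `main`, the predicate holds for 7 expected entries and a parallelism of 5.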

       
       

      Note: Set the options from step 2 at each of the following two levels, testing each level separately:

      • Cluster level: set them in config.yaml
      • Job level: set them in the code

       

      Job level demo:

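      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.api.common.functions.RichMapFunction;
      import org.apache.flink.api.common.typeinfo.Types;
      import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.connector.datagen.source.DataGeneratorSource;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

      // Hypothetical class name; pass it as className in step 4.
      public class RestartStrategyTestJob {
          public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration();

              // Same option keys as in step 2, set at job level.
              conf.setString("restart-strategy.type", "exponential-delay");
              // Job-level value used by this demo (step 2 lists 7 as the example value).
              conf.setString("restart-strategy.exponential-delay.attempts-before-reset-backoff", "6");
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
              env.setParallelism(5);

              // Bounded source: 300 records at 10 records per second.
              DataGeneratorSource<Long> generatorSource =
                      new DataGeneratorSource<>(
                              value -> value,
                              300,
                              RateLimiterStrategy.perSecond(10),
                              Types.LONG);

              env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator")
                      .map(new RichMapFunction<Long, Long>() {
                          @Override
                          public Long map(Long value) {
                              // Every subtask fails on its first record.
                              throw new RuntimeException(
                                      "Expected testing exception, subtaskIndex: " + getRuntimeContext().getIndexOfThisSubtask());
                          }
                      })
                      .print();

              env.execute();
          }
      }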
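While the demo job restarts, the wait between attempts is expected to grow after each failure. The rough model below may help judge whether the observed restart timing looks plausible. It is a simplified sketch, not Flink's implementation: it ignores jitter, `BackoffSchedule` and `delaysMillis` are hypothetical names, and the initial backoff, multiplier and max backoff are illustrative values that do not come from this ticket.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of an exponential backoff schedule; not Flink's implementation. */
final class BackoffSchedule {

    /**
     * Returns the modelled delay before each of the first {@code attempts} restarts,
     * in milliseconds: the delay starts at initialMs, is multiplied after every
     * attempt, and is capped at maxMs.
     */
    static List<Long> delaysMillis(long initialMs, double multiplier, long maxMs, int attempts) {
        List<Long> delays = new ArrayList<>();
        double current = initialMs;
        for (int i = 0; i < attempts; i++) {
            delays.add(Math.min(Math.round(current), maxMs));
            // Grow the delay for the next attempt, never exceeding the cap.
            current = Math.min(current * multiplier, maxMs);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Illustrative values only; not asserted to be Flink's defaults.
        System.out.println(delaysMillis(1000, 2.0, 60_000, 7));
    }
}
```

To compare against a real run, replace the illustrative values with the ones configured on the cluster under test, and allow for jitter when reading timestamps from the logs.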

      Attachments

        1. screenshot-1.png
          225 kB
          lincoln lee
        2. image-2024-02-07-13-49-03-024.png
          307 kB
          Caican Cai
        3. image-2024-02-06-15-05-05-331.png
          422 kB
          Rui Fan

            People

              caicancai Caican Cai
              lincoln.86xy lincoln lee