Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-32680

Job vertex names get messed up once there is a source vertex chained with a MultipleInput vertex in job graph

    XMLWordPrintableJSON

Details

    Description

      Take the following test(put it to MultipleInputITCase) as example:

          @Test
          public void testMultipleInputDoesNotChainedWithSource() throws Exception {
              testJobVertexName(false);
          }
          
          @Test
          public void testMultipleInputChainedWithSource() throws Exception {
              testJobVertexName(true);
          }
      
          public void testJobVertexName(boolean chain) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setParallelism(1);
      
              TestListResultSink<Long> resultSink = new TestListResultSink<>();
      
              DataStream<Long> source1 = env.fromSequence(0L, 3L).name("source1");
              DataStream<Long> source2 = env.fromElements(4L, 6L).name("source2");
              DataStream<Long> source3 = env.fromElements(7L, 9L).name("source3");
      
              KeyedMultipleInputTransformation<Long> transform =
                      new KeyedMultipleInputTransformation<>(
                              "MultipleInput",
                              new KeyedSumMultipleInputOperatorFactory(),
                              BasicTypeInfo.LONG_TYPE_INFO,
                              1,
                              BasicTypeInfo.LONG_TYPE_INFO);
              if (chain) {
                  transform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
              }
              KeySelector<Long, Long> keySelector = (KeySelector<Long, Long>) value -> value % 3;
      
              env.addOperator(
                      transform
                              .addInput(source1.getTransformation(), keySelector)
                              .addInput(source2.getTransformation(), keySelector)
                              .addInput(source3.getTransformation(), keySelector));
      
              new MultipleConnectedStreams(env).transform(transform).rebalance().addSink(resultSink).name("sink");
      
              env.execute();
          }

       

      When we run testMultipleInputDoesNotChainedWithSource , all job vertex names are normal:

      When we run testMultipleInputChainedWithSource (the MultipleInput chained with source1), job vertex names get messed up (all job vertex names contain Source: source1):

       

      I think it's a bug.

      Attachments

        1. image-2023-07-26-15-24-24-077.png
          197 kB
          Lijie Wang
        2. image-2023-07-26-15-23-29-551.png
          209 kB
          Lijie Wang

        Issue Links

          Activity

            People

              JunRuiLi Junrui Li
              wanglijie Lijie Wang
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: