  1. Flink
  2. FLINK-34378

Minibatch join disrupted the original order of input records

Details

    • Type: Technical Debt
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 1.19.0
    • Fix Version/s: 1.19.0
    • Component/s: Table SQL / Runtime
    • Labels: None

    Description

      I'm not sure whether this is a bug. The following case reproduces the situation: with minibatch join enabled, the join output no longer follows the order of the input records.

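      // Add this test to CalcITCase. Besides the helpers already available there, it needs
      // java.time.Duration and org.apache.flink.table.api.config.ExecutionConfigOptions.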
      @Test
      def test(): Unit = {
        env.setParallelism(1)
        val rows = Seq(
          row(1, "1"),
          row(2, "2"),
          row(3, "3"),
          row(4, "4"),
          row(5, "5"),
          row(6, "6"),
          row(7, "7"),
          row(8, "8"))
        val dataId = TestValuesTableFactory.registerData(rows)
      
        val ddl =
          s"""
             |CREATE TABLE t1 (
             |  a int,
             |  b string
             |) WITH (
             |  'connector' = 'values',
             |  'data-id' = '$dataId',
             |  'bounded' = 'false'
             |)
           """.stripMargin
        tEnv.executeSql(ddl)
      
        val ddl2 =
          s"""
             |CREATE TABLE t2 (
             |  a int,
             |  b string
             |) WITH (
             |  'connector' = 'values',
             |  'data-id' = '$dataId',
             |  'bounded' = 'false'
             |)
           """.stripMargin
        tEnv.executeSql(ddl2)
      
        tEnv.getConfig.getConfiguration
          .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
        tEnv.getConfig.getConfiguration
          .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5))
        tEnv.getConfig.getConfiguration
          .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
      
        println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
      
        tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
      }

      Result with minibatch join enabled:

      +----+---+---+----+----+
      | op | a | b | a0 | b0 |
      +----+---+---+----+----+
      | +I | 3 | 3 |  3 |  3 |
      | +I | 7 | 7 |  7 |  7 |
      | +I | 2 | 2 |  2 |  2 |
      | +I | 5 | 5 |  5 |  5 |
      | +I | 1 | 1 |  1 |  1 |
      | +I | 6 | 6 |  6 |  6 |
      | +I | 4 | 4 |  4 |  4 |
      | +I | 8 | 8 |  8 |  8 |
      +----+---+---+----+----+
      
      

      Without minibatch join, the result is:

      +----+---+---+----+----+
      | op | a | b | a0 | b0 |
      +----+---+---+----+----+
      | +I | 1 | 1 |  1 |  1 |
      | +I | 2 | 2 |  2 |  2 |
      | +I | 3 | 3 |  3 |  3 |
      | +I | 4 | 4 |  4 |  4 |
      | +I | 5 | 5 |  5 |  5 |
      | +I | 6 | 6 |  6 |  6 |
      | +I | 7 | 7 |  7 |  7 |
      | +I | 8 | 8 |  8 |  8 |
      +----+---+---+----+----+
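
      A minimal sketch of one way such a reordering can arise, and of an order-insensitive check for a test like the one above. It assumes that a minibatch operator buffers records in a hash map keyed by the join key and emits them only when the bundle is flushed; this issue does not state how the operator is implemented, so that is an assumption. Rec, Bundle and MiniBatchOrderSketch are hypothetical names, not Flink classes.

```scala
import scala.collection.mutable

object MiniBatchOrderSketch {
  // Hypothetical record: a join key plus a payload column.
  final case class Rec(key: Int, payload: String)

  // Hypothetical bundle: buffers records per join key and emits them on flush.
  // ASSUMPTION: keyed, hash-based buffering; not taken from the Flink sources.
  final class Bundle {
    private val buffer = mutable.HashMap.empty[Int, mutable.ArrayBuffer[Rec]]

    def add(r: Rec): Unit =
      buffer.getOrElseUpdate(r.key, mutable.ArrayBuffer.empty[Rec]) += r

    // Emits in the map's iteration order, which need not be arrival order.
    def flush(): Vector[Rec] = {
      val out = buffer.values.flatten.toVector
      buffer.clear()
      out
    }
  }

  def main(args: Array[String]): Unit = {
    val input = (1 to 8).map(i => Rec(i, i.toString))
    val bundle = new Bundle
    input.foreach(bundle.add)
    val output = bundle.flush()

    // The printed order depends on the hash map and may or may not match the input order.
    println(output.map(_.key).mkString(", "))

    // Order-insensitive check: the same records are present, whatever the order.
    assert(output.sortBy(_.key) == input)
  }
}
```

      In this sketch, records that share a key keep their relative order, while the order across different keys depends on the map's iteration order. A test that compares sorted results does not depend on either.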
       

       

            People

              Assignee: Unassigned
              Reporter: xuyang
              Votes: 0
              Watchers: 5
