Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-32642

The upsert mode doesn't work for the compound keys

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 1.17.0, 1.17.1
    • None
    • Table SQL / Client
    • None

    Description

       
      Hi, the issue can be produced by following below steps:
       
      1. Create two tables in sqlserver, one is the sink table, the other one is cdc source table, the sink table has a compound key(date_str,time_str).
      2. Create the corresponding flink tables in sql-client.
       

      --create sink table in sqlserver
      CREATE TABLE cumulative_cnt (
        date_str VARCHAR(50),
        time_str VARCHAR(50),
        cnt INTEGER
        CONSTRAINT PK_cumulative_cnt PRIMARY KEY (date_str,time_str)
      );
      
      --create source cdc table in sqlserver
      CREATE TABLE user_behavior (
        id INTEGER NOT NULL IDENTITY(101,1) PRIMARY KEY,
        create_date datetime NOT NULL,
        click_event VARCHAR(255) NOT NULL
      );
      
      EXEC sys.sp_cdc_enable_table  
      @source_schema = N'dbo',  
      @source_name   = N'user_behavior',  
      @role_name     = NULL,  
      @supports_net_changes = 1  
      GO
      
      --create flink tables through sql-client
      CREATE TABLE cumulative_cnt (
          date_str STRING,
          time_str STRING,
          cnt BIGINT,
          PRIMARY KEY (date_str, time_str)NOT ENFORCED
        )  WITH (
          'connector' = 'jdbc',
          'url' = 'jdbc:sqlserver://xxxx:1433;databaseName=xxxx',
          'username' = 'xxxx',
          'password' = 'xxxx',
          'table-name' = 'cumulative_cnt'
      );
      
      --create flink cdc table through sql-client
      CREATE TABLE user_behavior (
         id int,
         create_date TIMESTAMP(0),
         click_event STRING
       ) WITH (
          'connector' = 'sqlserver-cdc',
          'hostname' = 'xxxx',
          'port' = '1433',
          'username' = 'xxxx',
          'password' = 'xxxx',
          'database-name' = 'xxxx',
          'schema-name' = 'dbo',
          'table-name' = 'user_behavior'
       ); 

      3. Run below sql through sql-client to start the job for capturing the cdc data and do the aggregation and finally insert the result into target table.

      insert into cumulative_cnt
      select
      date_str,
      max(time_str) as time_str,
      count(*) as cnt
      from
      (
        select
        DATE_FORMAT(create_date, 'yyyy-MM-dd') as date_str,
        SUBSTR(DATE_FORMAT(create_date, 'HH:mm'),1,4) || '0' as time_str
        from user_behavior
      ) group by date_str; 
      
      
      

      4. Insert two records for testing.

      INSERT INTO user_behavior(create_date, click_event)VALUES ('2023-06-01 
      01:01:00','click1');
      INSERT INTO user_behavior(create_date, click_event)VALUES ('2023-06-01 02:20:00','click1');

      5. Checked the result in db( pls see the screen 1) and found that the target table only have one record, but it is not the expectation cause the two source records have different time, thus the compound key(date_str,  time_str) shoud be different( pls see the screen 2 )
       
      There should be two records in the target table:
      2023-06-01    01:00    1
      2023-06-01    02:20    2
       
      screen 1

       
      screen 2

       
       
       
       

      Attachments

        1. image-2023-07-21-23-55-47-399.png
          11 kB
          jasonliangyc
        2. image-2023-07-21-23-56-56-543.png
          16 kB
          jasonliangyc
        3. image-2023-07-21-23-57-22-186.png
          11 kB
          jasonliangyc
        4. image-2023-07-22-00-11-56-363.png
          16 kB
          jasonliangyc

        Activity

          People

            Unassigned Unassigned
            jasonliangyc jasonliangyc
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated: