Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-32139

Data accidentally deleted and not deleted when upsert sink to hbase

    XMLWordPrintableJSON

Details

    Description

      Problem background

      We meet data accidental deletion and non deletion issues when synchronizing MySQL cdc data to HBase using HBase connectors.

      Reproduction steps

      1、The Flink job with 1 parallelism synchronize a MySQL table into HBase. SinkUpsertMaterializer is tunned off by setting table.exec.sink.upsert-materialize = 'NONE'

      MySQL table schema is as follows。

      CREATE TABLE `source_sample_1001` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `name` varchar(200) DEFAULT NULL,
      `age` int(11) DEFAULT NULL,
      `weight` float DEFAULT NULL,
      PRIMARY KEY (`id`)
      );

      The source table definition in Flink is as follows.

      CREATE TABLE `source_sample_1001` (
          `id` bigint,
          `name` String,
          `age` bigint,
          `weight` float,
        PRIMARY KEY (`id`) NOT ENFORCED
      ) WITH (
      'connector' = 'mysql-cdc' ,
      'hostname' = '${ip}',
      'port' = '3306',
      'username' = '${user}',
      'password' = '${password}',
      'database-name' = 'testdb_0010',
      'table-name' = 'source_sample_1001'
      );

      HBase sink table are created in testdb_0011 namespace.

      CREATE 'testdb_0011:source_sample_1001', 'data'
      ​
      describe 'testdb_0011:source_sample_1001'
       
      # describe output
      Table testdb_0011:source_sample_1001 is ENABLED                                                                                                                                                         
      testdb_0011:source_sample_1001                                                                                                                                                                          
      COLUMN FAMILIES DESCRIPTION                                                                                                                                                                             {NAME => 'data', BLOOMFILTER => 'ROW', IN_MEMORY => 'false', VERSIONS => '1', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', COMPRESSION => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0' , BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
      

       

                                                                                                                                     
      The sink table definition in Flink.

      CREATE TABLE `hbase_sink1` (
          `id` STRING COMMENT 'unique id',
          `data` ROW<
              `name` string,
              `age` string,
              `weight` string
          >,
          primary key(`id`) not enforced
      ) WITH (
        'connector' = 'hbase-2.2',
        'table-name' = 'testdb_0011:source_sample_1001',
        'zookeeper.quorum' = '${hbase.zookeeper.quorum}'
      );

      DML in flink to synchronize data.

      INSERT INTO `hbase_sink1` SELECT
          `id`, row(`name`, `age`, `weight`)
      FROM (
          SELECT
              REVERSE(CONCAT_WS('', CAST(id AS VARCHAR ))) as id,
              `name`, cast(`age` as varchar) as `age`, cast(`weight` as varchar) as `weight`
          FROM `source_sample_1001`
      ) t;

      2、Another flink job sinks datagen data to the MySQL table source_sample_1001 。id range from 1 to 10_000, that means source_sample_1001 will have at most 10_000 records。

      CREATE TABLE datagen_source (
          `id` int,
          `name` String,
          `age` int,
          `weight` int
      ) WITH (
         'connector' = 'datagen',
        'fields.id.kind' = 'random',
        'fields.id.min' = '1',
        'fields.id.max' = '10000',
        'fields.name.length' = '20',
        'fields.age.min' = '1',
        'fields.age.max' = '150',
        'fields.weight.min' = '5',
        'fields.weight.max' = '300',
        'rows-per-second' = '5000'
      );
      ​
      CREATE TABLE `source_sample_1001` (
          `id` bigint,
          `name` String,
          `age` bigint,
          `weight` float,
        PRIMARY KEY (`id`) NOT ENFORCED
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://${ip}:3306/testdb_0010?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
        'table-name' = 'source_sample_1001',
        'username' = '${user}',
        'password' = '${password}',
        'sink.buffer-flush.max-rows' = '500',
        'sink.buffer-flush.interval' = '1s'
      );
      ​
      -- dml
      INSERT INTO `source_sample_1001` SELECT `id`, `name`, `age`, cast(`weight` as float) FROM `datagen_source`;

      3、A bash script deletes the MySQL table source_sample_1001 with batch 10.

      #!/bin/bash
      ​
      mysql1="mysql -h${ip} -u${user} -p${password}"
      batch=10
      ​
      for ((i=1; ;i++)); do
      echo "iteration $i start"
      for ((j=1; j<=10000; j+=10)); do
        $mysql1 -e "delete from testdb_0010.source_sample_1001 where id >= $j and id < $((j+10))"
      done
      echo "iteration $i end"
      sleep 10
      done

      4、Start the above two flink jobs and the bash script. Wait for several minutes, usually 5 minutes is enough. Please note that deleting data bash script is necessary for reproduce the problem.

      5、Stop the bash script, and waiting for MySQL table to fill up with 10_000 data by the datagen flink job。And then stop datagen flink job. Waiting for the sink hbase job to read all the binlog of MySQL table source_sample_1001.

      6、Check the hbase table and reproduce the issue of data loss. As shown below, 67 records were lost in a test.

      hbase(main):006:0> count 'testdb_0011:source_sample_1001'                                                   
      9933 row(s)
      Took 0.8724 seconds                                                                                                                                                                                     
      => 9933

      Find out a missing record and check the raw data in HBase.

      hbase(main):008:0> get 'testdb_0011:source_sample_1001', '24'
      COLUMN                                             CELL                                                                                                                                                
      0 row(s)
      Took 0.0029 seconds                                                                                                                                                                                     
      hbase(main):009:0> scan 'testdb_0011:source_sample_1001', {RAW => true, VERSIONS => 1000, STARTROW => '24', STOPROW => '24'}
      ROW                                                 COLUMN+CELL                                                                                                                                         
      24                                                 column=data:name, timestamp=2023-05-20T21:17:44.884, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:44.884, value=3a8f571c25a9d9040ef3                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:43.769, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:43.769, value=5aada98281ee0a961841                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:42.902, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:42.902, value=599790a9a641e6121ab3                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:41.614, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:41.614, value=4ece6410d32959457f80                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:40.885, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:40.885, value=9edcfcf1c958a7e4ae2a                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:40.841, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:40.841, value=3d82dcf982d5bcd5b6b7                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:39.788, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:39.788, value=2888a338b65caaf15b30                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:35.799, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:35.799, value=a8d7549e18ef0c0e8674                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:35.688, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:35.688, value=ada7237e52d030dcef7a                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:35.650, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:35.650, value=482feed26918dcdc911e                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:34.885, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:34.885, value=36d6bdd585dbb65dedb7                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:33.905, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:33.905, value=6e15c4462f8435040700                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:33.803, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:33.803, value=d122df5afd4eac32da72                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:33.693, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:33.693, value=ed603d47fedb3852b520                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:31.784, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:31.784, value=1ebdd5fe6310850b8098                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:30.684, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:30.684, value=cc628ba45d1ad07fce2f                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:29.812, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:29.812, value=c1d4df6e987bdb3cd0a3                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:29.590, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:29.590, value=535557700ca01c6b6b1e                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:28.876, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:28.876, value=a63c2ebfefc82eab4bcf                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:28.565, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:28.565, value=dd2b24ff0dfa672c49ba                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:27.879, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:27.879, value=69dbe1287c2bc54781ab                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:27.699, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:27.699, value=775d06dcbf1148e665ee                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:24.209, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:24.209, value=e23c010ab06125c88870                                                                     
      24                                                 column=data:name, timestamp=2023-05-20T21:17:22.480, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:20.716, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:18.678, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:17.720, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:16.858, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:16.682, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:15.753, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:14.571, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:11.572, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:09.681, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:08.792, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:05.888, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:05.754, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:03.626, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:02.652, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:01.790, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:17:00.986, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:16:59.797, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:16:58.982, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:16:58.781, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:16:58.626, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:16:58.149, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:16:56.610, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:16:51.655, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:16:51.458, type=Delete                                                                                    
      24                                                 column=data:name, timestamp=2023-05-20T21:16:44.860, type=Delete                                                                                    
      1 row(s)
      Took 0.1466 seconds                                                                                  

      7、Start the bash script to delete all data of the MySQL table. Waiting for the sink hbase job to read all the binlog of MySQL table source_sample_1001.

      6、Check the hbase table and reproduce the issue of data no deletion. As shown below, 6 records were not deleted in the test.

      hbase(main):012:0> count 'testdb_0011:source_sample_1001'
      6 row(s)
      Took 0.5121 seconds                                                                                                                                                                                     
      => 6

      Check the raw data of a record in HBase.

      hbase(main):013:0> get 'testdb_0011:source_sample_1001', '3668'
      COLUMN                                             CELL                                                                                                                                                
      data:name                                         timestamp=2023-05-20T21:17:26.714, value=ebb15f905622340d0351                                                                                       
      1 row(s)
      Took 0.0037 seconds                                                                                                                                                                                     
      hbase(main):014:0> scan 'testdb_0011:source_sample_1001', {RAW => true, VERSIONS => 1000, STARTROW => '3668', STOPROW => '3668'}
      ROW                                                 COLUMN+CELL                                                                                                                                         
      3668                                               column=data:name, timestamp=2023-05-20T21:17:45.728, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:17:45.728, value=c675a12c7cbed27599c3                                                                     
      3668                                               column=data:name, timestamp=2023-05-20T21:17:44.693, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:17:44.693, value=413921aa1ac44f545954                                                                     
      3668                                               column=data:name, timestamp=2023-05-20T21:17:43.854, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:17:43.854, value=7d44b0efc0923e4035b7                                                                     
      3668                                               column=data:name, timestamp=2023-05-20T21:17:41.721, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:17:41.721, value=60bfaef81bf8efdf781a                                                                     
      3668                                               column=data:name, timestamp=2023-05-20T21:17:40.763, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:17:40.763, value=2c371f9cd3909dd3b3f8                                                                     
      3668                                               column=data:name, timestamp=2023-05-20T21:17:37.872, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:17:37.872, value=9e32087cb39065976e50                                                                     
      3668                                               column=data:name, timestamp=2023-05-20T21:17:32.573, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:17:32.573, value=708364bf84dad4a04170                                                                     
      3668                                               column=data:name, timestamp=2023-05-20T21:17:26.811, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:17:26.811, value=c0e8e11eed3f8410dea9                                                                     
      3668                                               column=data:name, timestamp=2023-05-20T21:17:26.714, value=ebb15f905622340d0351                                                                     
      3668                                               column=data:name, timestamp=2023-05-20T21:17:24.310, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:17:24.310, value=21681a161ed2ccbe884e                                                                     
      3668                                               column=data:name, timestamp=2023-05-20T21:17:23.508, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:17:23.508, value=a1ef547a9efd57a7a0e2                                                                     
      3668                                               column=data:name, timestamp=2023-05-20T21:17:22.788, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:17:22.788, value=34e688060e6c40f4f83b                                                                     
      3668                                               column=data:name, timestamp=2023-05-20T21:17:21.746, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:17:17.761, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:17:12.610, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:17:11.909, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:17:07.846, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:17:06.901, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:17:06.758, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:17:06.569, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:17:02.689, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:17:00.344, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:16:59.961, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:16:59.415, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:16:58.916, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:16:58.781, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:16:58.718, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:16:58.339, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:16:56.340, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:16:55.883, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:16:55.683, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:16:55.056, type=Delete                                                                                    
      3668                                               column=data:name, timestamp=2023-05-20T21:16:46.845, type=Delete                                                                                    
      1 row(s)
      Took 0.0457 seconds                                                                                          

      Reason for the problem

      The HBase connector use the Delete key type without timestamp to delete the latest version of the specified column. This is an expensive call in that on the server-side, it first does a get to find the latest versions timestamp. Then it adds a delete using the fetched cells timestamp. Causing the following issues:

      Problem 1: When writing update data, the timestamp of -U and +U added by the hbase server to the update message may be the same, and -U deleted the latest version of +U data, resulting in accidental deletion of the data. The problem was also reported by https://issues.apache.org/jira/browse/FLINK-28910

      Problem 2: When there are multiple versions of HBase data, deleting the data will exposes earlier versions of the data, and resulting in the issue of data no deletion.

      Solution proposal 

      Use the DeleteColumn key type and set strongly increasing timestamp for put and delete mutation. The delete mutation will delete all versions of the specified column with a timestamp less than or equal to the specified.

      I have test the proposed solution for several days, and neither the data accidental deletion nor no deletion issues happen.

      Attachments

        1. image-2023-05-24-23-16-59-508.png
          733 kB
          LiuZeshan
        2. aa.log
          50 kB
          LiuZeshan
        3. image-2023-05-24-23-07-23-978.png
          28 kB
          LiuZeshan

        Issue Links

          Activity

            People

              LiuZeshan LiuZeshan
              LiuZeshan LiuZeshan
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - 24h
                  24h
                  Remaining:
                  Remaining Estimate - 24h
                  24h
                  Logged:
                  Time Spent - Not Specified
                  Not Specified