Flink / FLINK-22113

UniqueKey constraint is lost with multiple sources join in SQL


Details

    Description

      Hi team,
       
      We have a use case that joins multiple data sources to produce a continuously updated view. We defined a primary key constraint on every input source, and the keys are subsets of the columns in the join conditions. All joins are left joins.
       
      In our case, joining the first two inputs produces a JoinKeyContainsUniqueKey input spec, which is good and performant. However, the third input source is joined with the intermediate result of the first two inputs, and that intermediate result does not carry any key constraint information (although the third source table does), so the join gets a NoUniqueKey input spec. Given that NoUniqueKey inputs have dramatic performance implications, as discussed in the "Force Join Unique Key" email thread, we would like to know whether there is any mitigation for this.
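      To make the input specs concrete, here is a simplified Python sketch of how a join input could be classified from its join key and its unique keys. The function `input_spec` and the `STATE_LAYOUT` descriptions are illustrative assumptions, loosely based on our reading of Flink's streaming join operator (one row per join key for JoinKeyContainsUniqueKey, rows indexed by unique key for HasUniqueKey, rows indexed by the full row plus a count for NoUniqueKey); it is not the planner's actual code. Applied to the first join in the plan, it reproduces the HasUniqueKey / JoinKeyContainsUniqueKey pair reported there.

```python
from typing import Iterable

# Simplified model (assumption): the three input-side specs printed in the
# Flink plan, and the state layout each one roughly implies for a regular join.
STATE_LAYOUT = {
    "JoinKeyContainsUniqueKey": "one row per join key",
    "HasUniqueKey": "rows per join key, indexed by the unique key",
    "NoUniqueKey": "rows per join key, indexed by the full row plus a count",
}


def input_spec(join_key: Iterable[str], unique_keys: Iterable[Iterable[str]]) -> str:
    """Classify one join input the way the plan labels it (simplified sketch)."""
    jk = frozenset(join_key)
    keys = [frozenset(k) for k in unique_keys]
    # Some unique key is fully covered by the join key: at most one row per join key.
    if any(k <= jk for k in keys):
        return "JoinKeyContainsUniqueKey"
    # A unique key exists, but the join key does not cover it.
    if keys:
        return "HasUniqueKey"
    # Nothing is known to be unique about this input.
    return "NoUniqueKey"


# First join in the plan: train_activities_1 LEFT JOIN booking_channels
left = input_spec(
    ["booking_channel_key"],
    [["booking_channel_key", "origin_station_key", "destination_station_key"]],
)
right = input_spec(["booking_channel_key"], [["booking_channel_key"]])
print(left, right)  # HasUniqueKey JoinKeyContainsUniqueKey

# Later joins: the intermediate result carries no unique key
print(input_spec(["passenger_key"], []))  # NoUniqueKey
print(STATE_LAYOUT["NoUniqueKey"])
```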

       

      Example:

      Take the example from https://github.com/ververica/flink-sql-cookbook/blob/master/joins/05/05_star_schema.md

      CREATE TEMPORARY TABLE passengers (
        passenger_key STRING,
        first_name STRING,
        last_name STRING,
        update_time TIMESTAMP(3),
        PRIMARY KEY (passenger_key) NOT ENFORCED
      ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'passengers',
        'properties.bootstrap.servers' = 'localhost:9092',
        'key.format' = 'raw',
        'value.format' = 'json'
      );
      
      
      CREATE TEMPORARY TABLE stations (
        station_key STRING,
        update_time TIMESTAMP(3),
        city STRING,
        PRIMARY KEY (station_key) NOT ENFORCED
      ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'stations',
        'properties.bootstrap.servers' = 'localhost:9092',
        'key.format' = 'raw',
        'value.format' = 'json'
      );
      
      CREATE TEMPORARY TABLE booking_channels (
        booking_channel_key STRING,
        update_time TIMESTAMP(3),
        channel STRING,
        PRIMARY KEY (booking_channel_key) NOT ENFORCED
      ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'booking_channels',
        'properties.bootstrap.servers' = 'localhost:9092',
        'key.format' = 'raw',
        'value.format' = 'json'
      );
      
      CREATE TEMPORARY TABLE train_activities_1 (
        scheduled_departure_time TIMESTAMP(3),
        actual_departure_date TIMESTAMP(3),
        passenger_key STRING,
        origin_station_key STRING,
        destination_station_key STRING,
        booking_channel_key STRING,
        PRIMARY KEY (booking_channel_key, origin_station_key, destination_station_key) NOT ENFORCED
      ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'train_activities',
        'properties.bootstrap.servers' = 'localhost:9092',
        'key.format' = 'json',
        'value.format' = 'json'
      );
      
      SELECT 
        t.actual_departure_date, 
        p.first_name,
        p.last_name,
        b.channel, 
        os.city AS origin_station,
        ds.city AS destination_station
      FROM train_activities_1 t
      LEFT JOIN booking_channels b 
      ON t.booking_channel_key = b.booking_channel_key
      LEFT JOIN passengers p
      ON t.passenger_key = p.passenger_key
      LEFT JOIN stations os
      ON t.origin_station_key = os.station_key
      LEFT JOIN stations ds
      ON t.destination_station_key = ds.station_key;
      
      

       

       The query generates the following execution plan:

       

      Flink SQL> explain
      >  SELECT
      >    t.actual_departure_date,
      >    p.first_name,
      >    p.last_name,
      >    b.channel,
      >    os.city AS origin_station,
      >    ds.city AS destination_station
      >  FROM train_activities_1 t
      >  LEFT JOIN booking_channels b
      >  ON t.booking_channel_key = b.booking_channel_key
      >  LEFT JOIN passengers p
      >  ON t.passenger_key = p.passenger_key
      >  LEFT JOIN stations os
      >  ON t.origin_station_key = os.station_key
      >  LEFT JOIN stations ds
      >  ON t.destination_station_key = ds.station_key;
      == Abstract Syntax Tree ==
      LogicalProject(actual_departure_date=[$1], first_name=[$10], last_name=[$11], channel=[$8], origin_station=[$15], destination_station=[$18])
      +- LogicalJoin(condition=[=($4, $16)], joinType=[left])
         :- LogicalJoin(condition=[=($3, $13)], joinType=[left])
         :  :- LogicalJoin(condition=[=($2, $9)], joinType=[left])
         :  :  :- LogicalJoin(condition=[=($5, $6)], joinType=[left])
         :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, train_activities_1]])
         :  :  :  +- LogicalWatermarkAssigner(rowtime=[update_time], watermark=[-($1, 10000:INTERVAL SECOND)])
         :  :  :     +- LogicalTableScan(table=[[default_catalog, default_database, booking_channels]])
         :  :  +- LogicalTableScan(table=[[default_catalog, default_database, passengers]])
         :  +- LogicalTableScan(table=[[default_catalog, default_database, stations]])
         +- LogicalTableScan(table=[[default_catalog, default_database, stations]])
      
      
      
      == Optimized Physical Plan ==
      Calc(select=[actual_departure_date, first_name, last_name, channel, city AS origin_station, city0 AS destination_station])
      +- Join(joinType=[LeftOuterJoin], where=[=(destination_station_key, station_key)], select=[actual_departure_date, destination_station_key, channel, first_name, last_name, city, station_key, city0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
         :- Exchange(distribution=[hash[destination_station_key]])
         :  +- Calc(select=[actual_departure_date, destination_station_key, channel, first_name, last_name, city])
         :     +- Join(joinType=[LeftOuterJoin], where=[=(origin_station_key, station_key)], select=[actual_departure_date, origin_station_key, destination_station_key, channel, first_name, last_name, station_key, city], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
         :        :- Exchange(distribution=[hash[origin_station_key]])
         :        :  +- Calc(select=[actual_departure_date, origin_station_key, destination_station_key, channel, first_name, last_name])
         :        :     +- Join(joinType=[LeftOuterJoin], where=[=(passenger_key, passenger_key0)], select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, channel, passenger_key0, first_name, last_name], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
         :        :        :- Exchange(distribution=[hash[passenger_key]])
         :        :        :  +- Calc(select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, channel])
         :        :        :     +- Join(joinType=[LeftOuterJoin], where=[=(booking_channel_key, booking_channel_key0)], select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, booking_channel_key, booking_channel_key0, channel], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
         :        :        :        :- Exchange(distribution=[hash[booking_channel_key]])
         :        :        :        :  +- Calc(select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, booking_channel_key])
         :        :        :        :     +- ChangelogNormalize(key=[booking_channel_key, origin_station_key, destination_station_key])
         :        :        :        :        +- Exchange(distribution=[hash[booking_channel_key, origin_station_key, destination_station_key]])
         :        :        :        :           +- TableSourceScan(table=[[default_catalog, default_database, train_activities_1]], fields=[scheduled_departure_time, actual_departure_date, passenger_key, origin_station_key, destination_station_key, booking_channel_key])
         :        :        :        +- Exchange(distribution=[hash[booking_channel_key]])
         :        :        :           +- Calc(select=[booking_channel_key, channel])
         :        :        :              +- ChangelogNormalize(key=[booking_channel_key])
         :        :        :                 +- Exchange(distribution=[hash[booking_channel_key]])
         :        :        :                    +- TableSourceScan(table=[[default_catalog, default_database, booking_channels, watermark=[-($1, 10000:INTERVAL SECOND)]]], fields=[booking_channel_key, update_time, channel])
         :        :        +- Exchange(distribution=[hash[passenger_key]])
         :        :           +- Calc(select=[passenger_key, first_name, last_name])
         :        :              +- ChangelogNormalize(key=[passenger_key])
         :        :                 +- Exchange(distribution=[hash[passenger_key]])
         :        :                    +- TableSourceScan(table=[[default_catalog, default_database, passengers]], fields=[passenger_key, first_name, last_name, update_time])
         :        +- Exchange(distribution=[hash[station_key]])
         :           +- Calc(select=[station_key, city])
         :              +- ChangelogNormalize(key=[station_key])
         :                 +- Exchange(distribution=[hash[station_key]])
         :                    +- TableSourceScan(table=[[default_catalog, default_database, stations]], fields=[station_key, update_time, city])
         +- Exchange(distribution=[hash[station_key]])
            +- Calc(select=[station_key, city])
               +- ChangelogNormalize(key=[station_key])
                  +- Exchange(distribution=[hash[station_key]])
                     +- TableSourceScan(table=[[default_catalog, default_database, stations]], fields=[station_key, update_time, city])
      
      == Optimized Execution Plan ==
      Calc(select=[actual_departure_date, first_name, last_name, channel, city AS origin_station, city0 AS destination_station])
      +- Join(joinType=[LeftOuterJoin], where=[(destination_station_key = station_key)], select=[actual_departure_date, destination_station_key, channel, first_name, last_name, city, station_key, city0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
         :- Exchange(distribution=[hash[destination_station_key]])
         :  +- Calc(select=[actual_departure_date, destination_station_key, channel, first_name, last_name, city])
         :     +- Join(joinType=[LeftOuterJoin], where=[(origin_station_key = station_key)], select=[actual_departure_date, origin_station_key, destination_station_key, channel, first_name, last_name, station_key, city], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
         :        :- Exchange(distribution=[hash[origin_station_key]])
         :        :  +- Calc(select=[actual_departure_date, origin_station_key, destination_station_key, channel, first_name, last_name])
         :        :     +- Join(joinType=[LeftOuterJoin], where=[(passenger_key = passenger_key0)], select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, channel, passenger_key0, first_name, last_name], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
         :        :        :- Exchange(distribution=[hash[passenger_key]])
         :        :        :  +- Calc(select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, channel])
         :        :        :     +- Join(joinType=[LeftOuterJoin], where=[(booking_channel_key = booking_channel_key0)], select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, booking_channel_key, booking_channel_key0, channel], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
         :        :        :        :- Exchange(distribution=[hash[booking_channel_key]])
         :        :        :        :  +- Calc(select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, booking_channel_key])
         :        :        :        :     +- ChangelogNormalize(key=[booking_channel_key, origin_station_key, destination_station_key])
         :        :        :        :        +- Exchange(distribution=[hash[booking_channel_key, origin_station_key, destination_station_key]])
         :        :        :        :           +- TableSourceScan(table=[[default_catalog, default_database, train_activities_1]], fields=[scheduled_departure_time, actual_departure_date, passenger_key, origin_station_key, destination_station_key, booking_channel_key])
         :        :        :        +- Exchange(distribution=[hash[booking_channel_key]])
         :        :        :           +- Calc(select=[booking_channel_key, channel])
         :        :        :              +- ChangelogNormalize(key=[booking_channel_key])
         :        :        :                 +- Exchange(distribution=[hash[booking_channel_key]])
         :        :        :                    +- TableSourceScan(table=[[default_catalog, default_database, booking_channels, watermark=[-($1, 10000:INTERVAL SECOND)]]], fields=[booking_channel_key, update_time, channel])
         :        :        +- Exchange(distribution=[hash[passenger_key]])
         :        :           +- Calc(select=[passenger_key, first_name, last_name])
         :        :              +- ChangelogNormalize(key=[passenger_key])
         :        :                 +- Exchange(distribution=[hash[passenger_key]])
         :        :                    +- TableSourceScan(table=[[default_catalog, default_database, passengers]], fields=[passenger_key, first_name, last_name, update_time])
         :        +- Exchange(distribution=[hash[station_key]])(reuse_id=[1])
         :           +- Calc(select=[station_key, city])
         :              +- ChangelogNormalize(key=[station_key])
         :                 +- Exchange(distribution=[hash[station_key]])
         :                    +- TableSourceScan(table=[[default_catalog, default_database, stations]], fields=[station_key, update_time, city])
         +- Reused(reference_id=[1])
      
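      The following simplified Python model sketches one way to trace the unique key through the plan. It assumes two propagation rules that are our own simplification, not Flink's metadata handlers: a left join keeps the left input's unique keys when the right join key contains a right-side unique key (each left row then matches at most one right row), and a projection keeps only the unique keys whose columns all survive. The helpers `left_join_keys` and `project_keys` are hypothetical. Under these assumptions, the key (booking_channel_key, origin_station_key, destination_station_key) survives the first join but is dropped by the Calc above it, whose select list does not include booking_channel_key, which leaves the next join with no unique key.

```python
from typing import FrozenSet, List, Set

Key = FrozenSet[str]


def left_join_keys(left_keys: List[Key], right_keys: List[Key], right_join_key: Set[str]) -> List[Key]:
    """Unique keys of `left LEFT JOIN right` in a simplified model (assumption).

    If the right join key contains a unique key of the right input, each left
    row matches at most one right row, so the left unique keys stay unique.
    Otherwise this sketch conservatively reports no unique key.
    """
    if any(k <= right_join_key for k in right_keys):
        return list(left_keys)
    return []


def project_keys(keys: List[Key], kept_columns: Set[str]) -> List[Key]:
    """A projection keeps only the unique keys whose columns all survive."""
    return [k for k in keys if k <= kept_columns]


train_key = frozenset({"booking_channel_key", "origin_station_key", "destination_station_key"})
channel_key = frozenset({"booking_channel_key"})

# First join: train_activities_1 LEFT JOIN booking_channels ON booking_channel_key
after_join = left_join_keys([train_key], [channel_key], {"booking_channel_key"})
print(after_join == [train_key])  # True

# Columns kept by the Calc above the first join; booking_channel_key is not among them
calc_columns = {"actual_departure_date", "passenger_key", "origin_station_key",
                "destination_station_key", "channel"}
after_calc = project_keys(after_join, calc_columns)
print(after_calc)  # []
```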

       

       

          People

            Marios Trivyzas (matriv)
            Fu Kai (okowr)
            Votes: 2
            Watchers: 16
