Hive / HIVE-22707

MergeJoinWork should be considered while collecting DAG credentials


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 4.0.0-alpha-1
    • Component/s: Query Planning
    • Labels: None

    Description

      Consider a scenario with two different buckets, where the query output is written to a bucket other than the source. Under specific circumstances, FileSinkOperator is used only in Reducer stages. If a root work in such a stage is a merge join work (MergeJoinWork), it is not scanned for output URIs/paths, so the delegation tokens that are needed, e.g. for the output S3 bucket, are not fetched. The current code only handles MapWork and ReduceWork:

      https://github.com/apache/hive/blob/0df4f6c61010b64246d4790f9ce14e966ef34dcb/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java#L1507-L1514

        public void addCredentials(BaseWork work, DAG dag) throws IOException {
          dag.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
          if (work instanceof MapWork) {
            addCredentials((MapWork) work, dag);
          } else if (work instanceof ReduceWork) {
            addCredentials((ReduceWork) work, dag);
          }
        }
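
The gap described above can be illustrated with a minimal, self-contained sketch. It does not use Hive's real classes: `BaseWork`, `MapWork`, `ReduceWork` and `MergeJoinWork` below are simplified stand-ins, and the `wrapped` list, the `outputUris` field, the `collectOutputUris` helper and the bucket name are all hypothetical. The sketch assumes a merge join work wraps one or more ordinary works, and shows one possible way to descend into it so that output URIs behind a merge join are not skipped. It is not necessarily how the attached patches solve the problem.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class CredentialScanSketch {

  // Hypothetical stand-ins for Hive's work types; NOT the real classes.
  static class BaseWork {
    final String name;
    // Assumption: each work knows the URIs its file sinks write to.
    final List<String> outputUris = new ArrayList<>();

    BaseWork(String name, String... uris) {
      this.name = name;
      this.outputUris.addAll(Arrays.asList(uris));
    }
  }

  static class MapWork extends BaseWork {
    MapWork(String name, String... uris) { super(name, uris); }
  }

  static class ReduceWork extends BaseWork {
    ReduceWork(String name, String... uris) { super(name, uris); }
  }

  // Assumption: a merge join work wraps the works it joins.
  static class MergeJoinWork extends BaseWork {
    final List<BaseWork> wrapped = new ArrayList<>();

    MergeJoinWork(String name, BaseWork... works) {
      super(name);
      wrapped.addAll(Arrays.asList(works));
    }
  }

  // Collects output URIs, descending into merge join works
  // instead of silently ignoring them.
  static Set<String> collectOutputUris(BaseWork work) {
    Set<String> uris = new LinkedHashSet<>();
    collect(work, uris);
    return uris;
  }

  private static void collect(BaseWork work, Set<String> uris) {
    if (work instanceof MergeJoinWork) {
      // The branch that the snippet above lacks.
      for (BaseWork inner : ((MergeJoinWork) work).wrapped) {
        collect(inner, uris);
      }
    } else if (work instanceof MapWork || work instanceof ReduceWork) {
      uris.addAll(work.outputUris);
    }
  }

  public static void main(String[] args) {
    // Hypothetical output location in a second bucket.
    ReduceWork reducer2 =
        new ReduceWork("Reducer 2", "s3a://output-bucket/catalog_sales_out");
    MergeJoinWork root = new MergeJoinWork("Merge Join", reducer2);
    System.out.println(collectOutputUris(root));
  }
}
```

With only the `MapWork`/`ReduceWork` branch, the merge join root in this sketch would contribute no URIs at all, which mirrors the missing delegation tokens for the output bucket.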
      

      Sample plan; note the Merge Join Operator [MERGEJOIN_35] in Reducer 2, below File Output Operator [FS_10]:

      +----------------------------------------------------+
      |                      Explain                       |
      +----------------------------------------------------+
      | Plan optimized by CBO.                             |
      |                                                    |
      | Vertex dependency in root stage                    |
      | Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) |
      | Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)        |
      |                                                    |
      | Stage-3                                            |
      |   Stats Work{}                                     |
      |     Stage-4                                        |
      |       Create Table{"name:":"tpcds_bin_partitioned_orc_1000.catalog_sales_out"} |
      |         Stage-0                                    |
      |           Move Operator                            |
      |             Stage-1                                |
      |               Reducer 3                            |
      |               File Output Operator [FS_20]         |
      |                 Group By Operator [GBY_18] (rows=1 width=440) |
      |                   Output:["_col0"],aggregations:["compute_stats(VALUE._col0)"] |
      |                 <-Reducer 2 [CUSTOM_SIMPLE_EDGE]   |
      |                   File Output Operator [FS_10]     |
      |                     table:{"name:":"tpcds_bin_partitioned_orc_1000.catalog_sales_out"} |
      |                     Select Operator [SEL_9] (rows=8400 width=7) |
      |                       Output:["_col0"]             |
      |                       Merge Join Operator [MERGEJOIN_35] (rows=8400 width=7) |
      |                         Conds:RS_38._col1=RS_41._col0(Inner),Output:["_col1"] |
      |                       <-Map 1 [SIMPLE_EDGE] vectorized |
      |                         SHUFFLE [RS_38]            |
      |                           PartitionCols:_col1      |
      |                           Select Operator [SEL_37] (rows=16799 width=15) |
      |                             Output:["_col1"]       |
      |                             Filter Operator [FIL_36] (rows=16799 width=15) |
      |                               predicate:((cs_sold_time_sk = 74858L) and cs_call_center_sk is not null) |
      |                               TableScan [TS_0] (rows=1439980416 width=15) |
      |                                 tpcds_bin_partitioned_orc_1000@catalog_sales,cs, ACID table,Tbl:COMPLETE,Col:PARTIAL,Output:["cs_sold_time_sk","cs_call_center_sk"] |
      |                       <-Map 4 [SIMPLE_EDGE] vectorized |
      |                         SHUFFLE [RS_41]            |
      |                           PartitionCols:_col0      |
      |                           Select Operator [SEL_40] (rows=21 width=107) |
      |                             Output:["_col0"]       |
      |                             Filter Operator [FIL_39] (rows=21 width=107) |
      |                               predicate:((CAST( cc_county AS STRING) = 'Williamson County') and cc_call_center_sk is not null) |
      |                               TableScan [TS_3] (rows=42 width=107) |
      |                                 tpcds_bin_partitioned_orc_1000@call_center,cc, ACID table,Tbl:COMPLETE,Col:COMPLETE,Output:["cc_call_center_sk","cc_county"] |
      |                   PARTITION_ONLY_SHUFFLE [RS_17]   |
      |                     Group By Operator [GBY_16] (rows=1 width=424) |
      |                       Output:["_col0"],aggregations:["compute_stats(col1, 'hll')"] |
      |                       Select Operator [SEL_15] (rows=8400 width=7) |
      |                         Output:["col1"]            |
      |                          Please refer to the previous Select Operator [SEL_9] |
      |         Stage-2                                    |
      |           Dependency Collection{}                  |
      |              Please refer to the previous Stage-1  |
      |                                                    |
      +----------------------------------------------------+
      

      Attachments

        1. HIVE-22707.01.patch
          3 kB
          László Bodor
        2. HIVE-22707.02.patch
          6 kB
          László Bodor

          People

            Assignee: László Bodor (abstractdog)
            Reporter: László Bodor (abstractdog)