Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
None
None
Description
Consider a scenario with two different buckets, where the output is written to a different bucket than the source. Under specific circumstances, the FileSinkOperator is used only in Reducer stages. If the root work in such a stage is a merge join work, it is not scanned for output URIs/paths, so the delegation tokens it needs (e.g. for the output S3 bucket) are never fetched. The addCredentials method dispatches only on MapWork and ReduceWork:
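public void addCredentials(BaseWork work, DAG dag) throws IOException {
  // Merge the current user's credentials into the DAG's credentials.
  dag.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
  if (work instanceof MapWork) {
    addCredentials((MapWork) work, dag);
  } else if (work instanceof ReduceWork) {
    addCredentials((ReduceWork) work, dag);
  }
  // A MergeJoinWork is neither a MapWork nor a ReduceWork, so it falls through here
  // and none of its paths are scanned for delegation tokens.
}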
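One way to close the gap is to give the dispatcher a third branch for merge join works. The sketch below is a self-contained model, not Hive's code: `Work`, `MapWorkModel`, `ReduceWorkModel` and `MergeJoinWorkModel` are hypothetical stand-ins for `BaseWork`, `MapWork`, `ReduceWork` and `MergeJoinWork`, and it assumes a merge join work simply wraps the works it joins. Instead of fetching tokens, it collects the bucket (URI host) of every path a work reads or writes, i.e. the set a token fetcher would need to cover. The bucket name `output-bucket` is illustrative.

```java
import java.net.URI;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class CredentialScanSketch {
    // Hypothetical stand-ins for Hive's BaseWork hierarchy (NOT the real classes).
    interface Work {}
    record MapWorkModel(List<String> inputPaths) implements Work {}
    record ReduceWorkModel(List<String> outputPaths) implements Work {}
    // Assumption: a merge join work is modeled as a plain wrapper of the works it joins.
    record MergeJoinWorkModel(List<Work> joinedWorks) implements Work {}

    // Mirrors the original dispatch: a merge join work matches neither branch.
    static void addCredentialsBuggy(Work work, Set<String> buckets) {
        if (work instanceof MapWorkModel m) {
            m.inputPaths().forEach(p -> buckets.add(URI.create(p).getHost()));
        } else if (work instanceof ReduceWorkModel r) {
            r.outputPaths().forEach(p -> buckets.add(URI.create(p).getHost()));
        }
    }

    // Adds a branch that recurses into every work the merge join wraps.
    static void addCredentialsFixed(Work work, Set<String> buckets) {
        if (work instanceof MergeJoinWorkModel mj) {
            for (Work inner : mj.joinedWorks()) {
                addCredentialsFixed(inner, buckets);
            }
        } else {
            addCredentialsBuggy(work, buckets);
        }
    }

    public static void main(String[] args) {
        // Reducer root is a merge join wrapping the reduce work that writes the output.
        Work reducerRoot = new MergeJoinWorkModel(List.of(
                new ReduceWorkModel(List.of("s3a://output-bucket/warehouse/catalog_sales_out"))));

        Set<String> buggy = new TreeSet<>();
        addCredentialsBuggy(reducerRoot, buggy);
        Set<String> fixed = new TreeSet<>();
        addCredentialsFixed(reducerRoot, fixed);

        System.out.println("buggy: " + buggy); // buggy: []
        System.out.println("fixed: " + fixed); // fixed: [output-bucket]
    }
}
```

With the original two-branch dispatch the output bucket never reaches the collected set, which matches the missing-token symptom described above; recursing into the wrapped works is one possible remedy under the stated modeling assumption.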
Sample plan; note the Merge Join Operator [MERGEJOIN_35] in Reducer 2:
+----------------------------------------------------+
| Explain |
+----------------------------------------------------+
| Plan optimized by CBO. |
| |
| Vertex dependency in root stage |
| Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) |
| Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) |
| |
| Stage-3 |
| Stats Work{} |
| Stage-4 |
| Create Table{"name:":"tpcds_bin_partitioned_orc_1000.catalog_sales_out"} |
| Stage-0 |
| Move Operator |
| Stage-1 |
| Reducer 3 |
| File Output Operator [FS_20] |
| Group By Operator [GBY_18] (rows=1 width=440) |
| Output:["_col0"],aggregations:["compute_stats(VALUE._col0)"] |
| <-Reducer 2 [CUSTOM_SIMPLE_EDGE] |
| File Output Operator [FS_10] |
| table:{"name:":"tpcds_bin_partitioned_orc_1000.catalog_sales_out"} |
| Select Operator [SEL_9] (rows=8400 width=7) |
| Output:["_col0"] |
| Merge Join Operator [MERGEJOIN_35] (rows=8400 width=7) |
| Conds:RS_38._col1=RS_41._col0(Inner),Output:["_col1"] |
| <-Map 1 [SIMPLE_EDGE] vectorized |
| SHUFFLE [RS_38] |
| PartitionCols:_col1 |
| Select Operator [SEL_37] (rows=16799 width=15) |
| Output:["_col1"] |
| Filter Operator [FIL_36] (rows=16799 width=15) |
| predicate:((cs_sold_time_sk = 74858L) and cs_call_center_sk is not null) |
| TableScan [TS_0] (rows=1439980416 width=15) |
| tpcds_bin_partitioned_orc_1000@catalog_sales,cs, ACID table,Tbl:COMPLETE,Col:PARTIAL,Output:["cs_sold_time_sk","cs_call_center_sk"] |
| <-Map 4 [SIMPLE_EDGE] vectorized |
| SHUFFLE [RS_41] |
| PartitionCols:_col0 |
| Select Operator [SEL_40] (rows=21 width=107) |
| Output:["_col0"] |
| Filter Operator [FIL_39] (rows=21 width=107) |
| predicate:((CAST( cc_county AS STRING) = 'Williamson County') and cc_call_center_sk is not null) |
| TableScan [TS_3] (rows=42 width=107) |
| tpcds_bin_partitioned_orc_1000@call_center,cc, ACID table,Tbl:COMPLETE,Col:COMPLETE,Output:["cc_call_center_sk","cc_county"] |
| PARTITION_ONLY_SHUFFLE [RS_17] |
| Group By Operator [GBY_16] (rows=1 width=424) |
| Output:["_col0"],aggregations:["compute_stats(col1, 'hll')"] |
| Select Operator [SEL_15] (rows=8400 width=7) |
| Output:["col1"] |
| Please refer to the previous Select Operator [SEL_9] |
| Stage-2 |
| Dependency Collection{} |
| Please refer to the previous Stage-1 |
| |
+----------------------------------------------------+
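To see why the missing branch matters for this particular plan, the sketch below encodes Reducer 2's operator graph as read from the explain output (MERGEJOIN_35 feeds SEL_9, which feeds both FS_10 and SEL_15 -> GBY_16 -> RS_17) and walks it from the merge join root, collecting the output table of every file sink it reaches. The `Op` class and the traversal are hypothetical illustrations, not Hive's Operator API, and the output is identified by table name because the plan shows no storage path.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PlanWalkSketch {
    // Hypothetical operator node (NOT Hive's Operator class).
    static final class Op {
        final String id;
        final String outputTable; // non-null only for File Output Operators
        final List<Op> children = new ArrayList<>();

        Op(String id, String outputTable) {
            this.id = id;
            this.outputTable = outputTable;
        }

        // Attaches a child and returns it, so chains read in dataflow order.
        Op to(Op child) {
            children.add(child);
            return child;
        }
    }

    // Reducer 2's operator graph from the sample plan, rooted at the merge join.
    static Op reducer2() {
        Op mergeJoin = new Op("MERGEJOIN_35", null);
        Op sel9 = mergeJoin.to(new Op("SEL_9", null));
        sel9.to(new Op("FS_10", "tpcds_bin_partitioned_orc_1000.catalog_sales_out"));
        sel9.to(new Op("SEL_15", null)).to(new Op("GBY_16", null)).to(new Op("RS_17", null));
        return mergeJoin;
    }

    // Depth-first walk collecting the output table of every file sink reachable from root.
    static List<String> fileSinkOutputs(Op root) {
        List<String> outputs = new ArrayList<>();
        Set<Op> seen = new HashSet<>();
        Deque<Op> stack = new ArrayDeque<>();
        stack.push(root);
        while (!stack.isEmpty()) {
            Op op = stack.pop();
            if (!seen.add(op)) {
                continue; // already visited via another parent
            }
            if (op.outputTable != null) {
                outputs.add(op.outputTable);
            }
            op.children.forEach(stack::push);
        }
        return outputs;
    }

    public static void main(String[] args) {
        // Prints: [tpcds_bin_partitioned_orc_1000.catalog_sales_out]
        System.out.println(fileSinkOutputs(reducer2()));
    }
}
```

In this model FS_10 is reachable only through MERGEJOIN_35, so a credential scan that never descends into the merge join work has no way to learn where catalog_sales_out is written.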