  1. HBase
  2. HBASE-25608

Support locality-sensitive HFileOutputFormat even when the destination cluster is different from the source cluster

Details

    • Improvement
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 3.0.0-alpha-1, 1.7.0, 2.4.1, 1.8.0
    • 3.0.0-alpha-1
    • None
    • None
    • Reviewed
    • Release Note:

      Added configurations to specify the ZK cluster key for a remote cluster in HFileOutputFormat2.
      By default, both input and output use the cluster specified in the Job configuration.
      Use HFileOutputFormat2#configureRemoteCluster to have output go to a remote cluster.
      HFileOutputFormat2#configureIncrementalLoad(Job, Table, RegionLocator) sets these configurations from the Table's configuration.
      You can also set them by calling HFileOutputFormat2#configureRemoteCluster explicitly.
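
      The note above describes carrying a remote cluster's ZK cluster key inside the job configuration, next to the settings for the job's own cluster. The following is a minimal stdlib-only sketch of that idea, not HBase code: a Map stands in for the Hadoop Configuration, and the class name, the storeRemoteCluster helper and the "sketch.remote.cluster." prefix are all hypothetical. It assumes the classic three-part cluster key form quorum:clientPort:znodeParent.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified model (not HBase code) of storing a remote cluster key in a job configuration. */
final class RemoteClusterKeySketch {
  // Hypothetical prefix for this sketch; the real HFileOutputFormat2 keys may be named differently.
  static final String REMOTE_PREFIX = "sketch.remote.cluster.";

  /** Splits "quorum:clientPort:znodeParent" into its three parts and validates the port. */
  static String[] parseClusterKey(String clusterKey) {
    String[] parts = clusterKey.split(":");
    if (parts.length != 3) {
      throw new IllegalArgumentException("Expected quorum:port:znode but got: " + clusterKey);
    }
    Integer.parseInt(parts[1]); // throws NumberFormatException on a non-numeric port
    return parts;
  }

  /** Stores the remote cluster's ZK settings under prefixed keys, leaving the job's own keys alone. */
  static void storeRemoteCluster(Map<String, String> jobConf, String clusterKey) {
    String[] parts = parseClusterKey(clusterKey);
    jobConf.put(REMOTE_PREFIX + "hbase.zookeeper.quorum", parts[0]);
    jobConf.put(REMOTE_PREFIX + "hbase.zookeeper.property.clientPort", parts[1]);
    jobConf.put(REMOTE_PREFIX + "zookeeper.znode.parent", parts[2]);
  }

  public static void main(String[] args) {
    Map<String, String> jobConf = new LinkedHashMap<>();
    jobConf.put("hbase.zookeeper.quorum", "src-zk1,src-zk2"); // source cluster (job default)
    storeRemoteCluster(jobConf, "dst-zk1,dst-zk2:2181:/hbase"); // destination cluster
    jobConf.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
```

      Keeping the remote settings under their own prefix means the job's input side keeps reading from the source cluster, while the output side can look the destination up separately.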

    Description

      Sometimes we want to run an MR job whose source cluster and destination cluster are different, for example for data migration or batch jobs, like the following:

       

              Configuration conf = HBaseConfiguration.createClusterConf(HBaseConfiguration.create(), sourceClusterKey);
      
              final Job job = Job.getInstance(conf, jobName);
              // ...
              FileOutputFormat.setOutputPath(job, new Path(outputPath));
      
              Scan scan = createScanner();
      
              TableMapReduceUtil.initTableMapperJob(
                      sourceTableName, scan,
                      Mapper.class,
                      ImmutableBytesWritable.class, Put.class, job);
      
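              // createConnection takes a Configuration, so build one from the destination cluster key.
              Configuration destConf = HBaseConfiguration.createClusterConf(HBaseConfiguration.create(), destinationClusterKey);
              try (Connection con = ConnectionFactory.createConnection(destConf);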
                   Table table = con.getTable(destinationTableName); 
                   RegionLocator regionLocator = con.getRegionLocator(destinationTableName)) {
                  HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
              }
              return job.waitForCompletion(true) ? 0 : 1;
      

      With this setup, HFileOutputFormat2 doesn't create locality-sensitive HFiles.

      We got the following exception:

      2021-02-24 19:55:48,298 WARN [main] org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2: there's something wrong when locating rowkey: xxxxxxxxxxxx
      org.apache.hadoop.hbase.TableNotFoundException: Table 'table' was not found, got: XXXX.
              at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1302)
              at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1181)
              at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1165)
              at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1122)
              at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getRegionLocation(ConnectionManager.java:957)
              at org.apache.hadoop.hbase.client.HRegionLocator.getRegionLocation(HRegionLocator.java:74)
              at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:216)
              at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:167)
              at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
              at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
              at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
              at org.apache.hadoop.hbase.mapreduce.PutSortReducer.reduce(PutSortReducer.java:78)
              at org.apache.hadoop.hbase.mapreduce.PutSortReducer.reduce(PutSortReducer.java:43)
              at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
              at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
              at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
              at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
              at java.security.AccessController.doPrivileged(Native Method)
              at javax.security.auth.Subject.doAs(Subject.java:422)
              at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
              at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
      

      This happens because HFileOutputFormat2 creates its connection from the task configuration, which is configured for the source cluster.
      As a result, it connects to the source cluster and tries to get region locations for a table that should exist in the destination cluster.

                InetSocketAddress[] favoredNodes = null;
                if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
                  HRegionLocation loc = null;
                  String tableName = Bytes.toString(tableNameBytes);
                  if (tableName != null) {
                    try (Connection connection = ConnectionFactory.createConnection(conf);
                        RegionLocator locator =
                            connection.getRegionLocator(TableName.valueOf(tableName))) {
                      loc = locator.getRegionLocation(rowKey);
                    } catch (Throwable e) {
                      LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey),
                        tableName, e);
                      loc = null;
                    }
                  }
                  if (null == loc) {
                    LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
                  } else {
                    LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
                    InetSocketAddress initialIsa =
                        new InetSocketAddress(loc.getHostname(), loc.getPort());
                    if (initialIsa.isUnresolved()) {
                      LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
                    } else {
                      LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
                      favoredNodes = new InetSocketAddress[] { initialIsa };
                    }
                  }
                }
                wl = getNewWriter(tableNameBytes, family, conf, favoredNodes);
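
      The fallback at the end of the snippet above (use a favored-nodes writer only when the region server address resolves, otherwise use the default writer) can be isolated as a small helper. This is a stdlib-only sketch, not HBase code; the class and method names are hypothetical, and a null return stands for "use the default writer", mirroring favoredNodes staying null in the snippet.

```java
import java.net.InetSocketAddress;

/** Simplified model (not HBase code) of the favored-node decision. */
final class FavoredNodeSketch {
  /** Returns a one-element favored-node array, or null to signal "use the default writer". */
  static InetSocketAddress[] favoredNodesFor(InetSocketAddress regionServer) {
    if (regionServer == null || regionServer.isUnresolved()) {
      return null; // region lookup failed or hostname did not resolve
    }
    return new InetSocketAddress[] { regionServer };
  }

  public static void main(String[] args) {
    // A literal IP address needs no DNS lookup, so this address is always resolved.
    InetSocketAddress resolved = new InetSocketAddress("127.0.0.1", 16020);
    // createUnresolved never attempts a lookup, so this address is always unresolved.
    InetSocketAddress unresolved = InetSocketAddress.createUnresolved("rs1.example.invalid", 16020);
    System.out.println("resolved -> favored nodes: " + (favoredNodesFor(resolved) != null));
    System.out.println("unresolved -> favored nodes: " + (favoredNodesFor(unresolved) != null));
  }
}
```

      In the failure reported here, the lookup against the wrong cluster throws, loc stays null, and the writer falls back to the default path, which is why the job still succeeds but loses locality.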
      

      HFileOutputFormat2 should correctly use the destination cluster when the source and destination clusters are different, so that it generates locality-sensitive HFiles properly.
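
      One way to read this requirement: when looking up region locations, the task should connect with the destination cluster's ZooKeeper settings if they were provided, and with the job's own settings otherwise. Below is a minimal stdlib-only sketch of that selection, not the actual patch. A Map stands in for the Hadoop Configuration, and the class name, the locatorConf helper and the "sketch.remote.cluster." prefix are hypothetical; only the three unprefixed ZooKeeper key names are standard HBase settings.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model (not HBase code) of choosing the configuration used for region lookups. */
final class LocatorConfSketch {
  // Hypothetical prefix for this sketch; the real HFileOutputFormat2 keys may be named differently.
  static final String REMOTE_PREFIX = "sketch.remote.cluster.";

  // Standard HBase ZooKeeper settings that identify a cluster.
  static final String[] ZK_KEYS = {
    "hbase.zookeeper.quorum",
    "hbase.zookeeper.property.clientPort",
    "zookeeper.znode.parent"
  };

  /**
   * Returns a copy of the task configuration in which every ZooKeeper setting that has a
   * remote-cluster override is replaced by that override. Without overrides, the copy is
   * identical to the task configuration, which matches the default single-cluster behavior.
   */
  static Map<String, String> locatorConf(Map<String, String> taskConf) {
    Map<String, String> result = new HashMap<>(taskConf);
    for (String key : ZK_KEYS) {
      String remoteValue = taskConf.get(REMOTE_PREFIX + key);
      if (remoteValue != null) {
        result.put(key, remoteValue);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> taskConf = new HashMap<>();
    taskConf.put("hbase.zookeeper.quorum", "src-zk1,src-zk2");
    taskConf.put(REMOTE_PREFIX + "hbase.zookeeper.quorum", "dst-zk1,dst-zk2");
    System.out.println("lookup quorum: " + locatorConf(taskConf).get("hbase.zookeeper.quorum"));
  }
}
```

      Building a separate copy keeps the task configuration itself unchanged, so anything else in the task that relies on the source cluster settings is unaffected.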

            People

              lineyshinya Shinya Yoshida
              lineyshinya Shinya Yoshida
              Votes: 0
              Watchers: 5