hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bus...@apache.org
Subject [25/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.
Date Sun, 27 Aug 2017 05:33:25 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
deleted file mode 100644
index 1d4d37b..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.util.ReflectionUtils;
-
-/**
- * Facade to create Cells for HFileOutputFormat. The created Cells are of <code>Put</code> type.
- */
-@InterfaceAudience.Public
-public class CellCreator {
-
-  public static final String VISIBILITY_EXP_RESOLVER_CLASS =
-      "hbase.mapreduce.visibility.expression.resolver.class";
-
-  private VisibilityExpressionResolver visExpResolver;
-
-  public CellCreator(Configuration conf) {
-    Class<? extends VisibilityExpressionResolver> clazz = conf.getClass(
-        VISIBILITY_EXP_RESOLVER_CLASS, DefaultVisibilityExpressionResolver.class,
-        VisibilityExpressionResolver.class);
-    this.visExpResolver = ReflectionUtils.newInstance(clazz, conf);
-    this.visExpResolver.init();
-  }
-
-  /**
-   * @param row row key
-   * @param roffset row offset
-   * @param rlength row length
-   * @param family family name
-   * @param foffset family offset
-   * @param flength family length
-   * @param qualifier column qualifier
-   * @param qoffset qualifier offset
-   * @param qlength qualifier length
-   * @param timestamp version timestamp
-   * @param value column value
-   * @param voffset value offset
-   * @param vlength value length
-   * @return created Cell
-   * @throws IOException
-   */
-  public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
-      byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
-      int vlength) throws IOException {
-    return create(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength,
-        timestamp, value, voffset, vlength, (List<Tag>)null);
-  }
-
-  /**
-   * @param row row key
-   * @param roffset row offset
-   * @param rlength row length
-   * @param family family name
-   * @param foffset family offset
-   * @param flength family length
-   * @param qualifier column qualifier
-   * @param qoffset qualifier offset
-   * @param qlength qualifier length
-   * @param timestamp version timestamp
-   * @param value column value
-   * @param voffset value offset
-   * @param vlength value length
-   * @param visExpression visibility expression to be associated with cell
-   * @return created Cell
-   * @throws IOException
-   */
-  @Deprecated
-  public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
-      byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
-      int vlength, String visExpression) throws IOException {
-    List<Tag> visTags = null;
-    if (visExpression != null) {
-      visTags = this.visExpResolver.createVisibilityExpTags(visExpression);
-    }
-    return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
-        qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, visTags);
-  }
-
-  /**
-   * @param row row key
-   * @param roffset row offset
-   * @param rlength row length
-   * @param family family name
-   * @param foffset family offset
-   * @param flength family length
-   * @param qualifier column qualifier
-   * @param qoffset qualifier offset
-   * @param qlength qualifier length
-   * @param timestamp version timestamp
-   * @param value column value
-   * @param voffset value offset
-   * @param vlength value length
-   * @param tags
-   * @return created Cell
-   * @throws IOException
-   */
-  public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
-      byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
-      int vlength, List<Tag> tags) throws IOException {
-    return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
-        qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, tags);
-  }
-
-  /**
-   * @return Visibility expression resolver
-   */
-  public VisibilityExpressionResolver getVisibilityExpressionResolver() {
-    return this.visExpResolver;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
deleted file mode 100644
index 21b8556..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * Tool used to copy a table to another one which can be on a different setup.
- * It is also configurable with a start and end time as well as a specification
- * of the region server implementation if different from the local cluster.
- */
-@InterfaceAudience.Public
-public class CopyTable extends Configured implements Tool {
-  private static final Log LOG = LogFactory.getLog(CopyTable.class);
-
-  final static String NAME = "copytable";
-  long startTime = 0;
-  long endTime = HConstants.LATEST_TIMESTAMP;
-  int batch = Integer.MAX_VALUE;
-  int cacheRow = -1;
-  int versions = -1;
-  String tableName = null;
-  String startRow = null;
-  String stopRow = null;
-  String dstTableName = null;
-  String peerAddress = null;
-  String families = null;
-  boolean allCells = false;
-  static boolean shuffle = false;
-
-  boolean bulkload = false;
-  Path bulkloadDir = null;
-
-  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
-
-  /**
-   * Sets up the actual job.
-   *
-   * @param args  The command line parameters.
-   * @return The newly created job.
-   * @throws IOException When setting up the job fails.
-   */
-  public Job createSubmittableJob(String[] args)
-  throws IOException {
-    if (!doCommandLine(args)) {
-      return null;
-    }
-    
-    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
-    job.setJarByClass(CopyTable.class);
-    Scan scan = new Scan();
-    
-    scan.setBatch(batch);
-    scan.setCacheBlocks(false);
-    
-    if (cacheRow > 0) {
-      scan.setCaching(cacheRow);
-    } else {
-      scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100));
-    }
-    
-    scan.setTimeRange(startTime, endTime);
-    
-    if (allCells) {
-      scan.setRaw(true);
-    }
-    if (shuffle) {
-      job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
-    }
-    if (versions >= 0) {
-      scan.setMaxVersions(versions);
-    }
-
-    if (startRow != null) {
-      scan.setStartRow(Bytes.toBytesBinary(startRow));
-    }
-
-    if (stopRow != null) {
-      scan.setStopRow(Bytes.toBytesBinary(stopRow));
-    }
-
-    if(families != null) {
-      String[] fams = families.split(",");
-      Map<String,String> cfRenameMap = new HashMap<>();
-      for(String fam : fams) {
-        String sourceCf;
-        if(fam.contains(":")) {
-            // fam looks like "sourceCfName:destCfName"
-            String[] srcAndDest = fam.split(":", 2);
-            sourceCf = srcAndDest[0];
-            String destCf = srcAndDest[1];
-            cfRenameMap.put(sourceCf, destCf);
-        } else {
-            // fam is just "sourceCf"
-            sourceCf = fam;
-        }
-        scan.addFamily(Bytes.toBytes(sourceCf));
-      }
-      Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
-    }
-    job.setNumReduceTasks(0);
-
-    if (bulkload) {
-      TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null,
-        null, job);
-
-      // We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
-      TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
-
-      FileSystem fs = FileSystem.get(getConf());
-      Random rand = new Random();
-      Path root = new Path(fs.getWorkingDirectory(), "copytable");
-      fs.mkdirs(root);
-      while (true) {
-        bulkloadDir = new Path(root, "" + rand.nextLong());
-        if (!fs.exists(bulkloadDir)) {
-          break;
-        }
-      }
-
-      System.out.println("HFiles will be stored at " + this.bulkloadDir);
-      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
-      try (Connection conn = ConnectionFactory.createConnection(getConf());
-          Admin admin = conn.getAdmin()) {
-        HFileOutputFormat2.configureIncrementalLoadMap(job,
-            admin.listTableDescriptor((TableName.valueOf(dstTableName))));
-      }
-    } else {
-      TableMapReduceUtil.initTableMapperJob(tableName, scan,
-        Import.Importer.class, null, null, job);
-
-      TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
-        null);
-    }
-
-    return job;
-  }
-
-  /*
-   * @param errorMsg Error message.  Can be null.
-   */
-  private static void printUsage(final String errorMsg) {
-    if (errorMsg != null && errorMsg.length() > 0) {
-      System.err.println("ERROR: " + errorMsg);
-    }
-    System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
-        "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
-    System.err.println();
-    System.err.println("Options:");
-    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
-    System.err.println("              specify if different from current cluster");
-    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
-    System.err.println(" startrow     the start row");
-    System.err.println(" stoprow      the stop row");
-    System.err.println(" starttime    beginning of the time range (unixtime in millis)");
-    System.err.println("              without endtime means from starttime to forever");
-    System.err.println(" endtime      end of the time range.  Ignored if no starttime specified.");
-    System.err.println(" versions     number of cell versions to copy");
-    System.err.println(" new.name     new table's name");
-    System.err.println(" peer.adr     Address of the peer cluster given in the format");
-    System.err.println("              hbase.zookeeper.quorum:hbase.zookeeper.client"
-        + ".port:zookeeper.znode.parent");
-    System.err.println(" families     comma-separated list of families to copy");
-    System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName. ");
-    System.err.println("              To keep the same name, just give \"cfName\"");
-    System.err.println(" all.cells    also copy delete markers and deleted cells");
-    System.err.println(" bulkload     Write input into HFiles and bulk load to the destination "
-        + "table");
-    System.err.println();
-    System.err.println("Args:");
-    System.err.println(" tablename    Name of the table to copy");
-    System.err.println();
-    System.err.println("Examples:");
-    System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
-    System.err.println(" $ hbase " +
-        "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
-        "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
-    System.err.println("For performance consider the following general option:\n"
-        + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
-        + "  decreases the round trip time to the server and may increase performance.\n"
-        + "    -Dhbase.client.scanner.caching=100\n"
-        + "  The following should always be set to false, to prevent writing data twice, which may produce \n"
-        + "  inaccurate results.\n"
-        + "    -Dmapreduce.map.speculative=false");
-  }
-
-  private boolean doCommandLine(final String[] args) {
-    // Process command-line args. TODO: Better cmd-line processing
-    // (but hopefully something not as painful as cli options).
-    if (args.length < 1) {
-      printUsage(null);
-      return false;
-    }
-    try {
-      for (int i = 0; i < args.length; i++) {
-        String cmd = args[i];
-        if (cmd.equals("-h") || cmd.startsWith("--h")) {
-          printUsage(null);
-          return false;
-        }
-
-        final String startRowArgKey = "--startrow=";
-        if (cmd.startsWith(startRowArgKey)) {
-          startRow = cmd.substring(startRowArgKey.length());
-          continue;
-        }
-
-        final String stopRowArgKey = "--stoprow=";
-        if (cmd.startsWith(stopRowArgKey)) {
-          stopRow = cmd.substring(stopRowArgKey.length());
-          continue;
-        }
-
-        final String startTimeArgKey = "--starttime=";
-        if (cmd.startsWith(startTimeArgKey)) {
-          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
-          continue;
-        }
-
-        final String endTimeArgKey = "--endtime=";
-        if (cmd.startsWith(endTimeArgKey)) {
-          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
-          continue;
-        }
-        
-        final String batchArgKey = "--batch=";
-        if (cmd.startsWith(batchArgKey)) {
-          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
-          continue;
-        }
-        
-        final String cacheRowArgKey = "--cacheRow=";
-        if (cmd.startsWith(cacheRowArgKey)) {
-          cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
-          continue;
-        }
-
-        final String versionsArgKey = "--versions=";
-        if (cmd.startsWith(versionsArgKey)) {
-          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
-          continue;
-        }
-
-        final String newNameArgKey = "--new.name=";
-        if (cmd.startsWith(newNameArgKey)) {
-          dstTableName = cmd.substring(newNameArgKey.length());
-          continue;
-        }
-
-        final String peerAdrArgKey = "--peer.adr=";
-        if (cmd.startsWith(peerAdrArgKey)) {
-          peerAddress = cmd.substring(peerAdrArgKey.length());
-          continue;
-        }
-
-        final String familiesArgKey = "--families=";
-        if (cmd.startsWith(familiesArgKey)) {
-          families = cmd.substring(familiesArgKey.length());
-          continue;
-        }
-
-        if (cmd.startsWith("--all.cells")) {
-          allCells = true;
-          continue;
-        }
-
-        if (cmd.startsWith("--bulkload")) {
-          bulkload = true;
-          continue;
-        }
-
-        if (cmd.startsWith("--shuffle")) {
-          shuffle = true;
-          continue;
-        }
-
-        if (i == args.length-1) {
-          tableName = cmd;
-        } else {
-          printUsage("Invalid argument '" + cmd + "'");
-          return false;
-        }
-      }
-      if (dstTableName == null && peerAddress == null) {
-        printUsage("At least a new table name or a " +
-            "peer address must be specified");
-        return false;
-      }
-      if ((endTime != 0) && (startTime > endTime)) {
-        printUsage("Invalid time range filter: starttime=" + startTime + " >  endtime=" + endTime);
-        return false;
-      }
-
-      if (bulkload && peerAddress != null) {
-        printUsage("Remote bulkload is not supported!");
-        return false;
-      }
-
-      // set dstTableName if necessary
-      if (dstTableName == null) {
-        dstTableName = tableName;
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-      printUsage("Can't start because " + e.getMessage());
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Main entry point.
-   *
-   * @param args  The command line parameters.
-   * @throws Exception When running the job fails.
-   */
-  public static void main(String[] args) throws Exception {
-    int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args);
-    System.exit(ret);
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    Job job = createSubmittableJob(args);
-    if (job == null) return 1;
-    if (!job.waitForCompletion(true)) {
-      LOG.info("Map-reduce job failed!");
-      if (bulkload) {
-        LOG.info("Files are not bulkloaded!");
-      }
-      return 1;
-    }
-    int code = 0;
-    if (bulkload) {
-      code = new LoadIncrementalHFiles(this.getConf()).run(new String[]{this.bulkloadDir.toString(),
-          this.dstTableName});
-      if (code == 0) {
-        // bulkloadDir is deleted only if LoadIncrementalHFiles was successful so that one can rerun
-        // LoadIncrementalHFiles.
-        FileSystem fs = FileSystem.get(this.getConf());
-        if (!fs.delete(this.bulkloadDir, true)) {
-          LOG.error("Deleting folder " + bulkloadDir + " failed!");
-          code = 1;
-        }
-      }
-    }
-    return code;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java
deleted file mode 100644
index 004ee5c..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_FAMILY;
-import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME;
-import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABEL_QUALIFIER;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.security.visibility.Authorizations;
-import org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
-import org.apache.hadoop.hbase.security.visibility.VisibilityLabelOrdinalProvider;
-import org.apache.hadoop.hbase.security.visibility.VisibilityUtils;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * This implementation creates tags by expanding expression using label ordinal. Labels will be
- * serialized in sorted order of their ordinals.
- */
-@InterfaceAudience.Private
-public class DefaultVisibilityExpressionResolver implements VisibilityExpressionResolver {
-  private static final Log LOG = LogFactory.getLog(DefaultVisibilityExpressionResolver.class);
-
-  private Configuration conf;
-  private final Map<String, Integer> labels = new HashMap<>();
-
-  @Override
-  public Configuration getConf() {
-    return this.conf;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public void init() {
-    // Reading all the labels and ordinal.
-    // This scan should be done by a user with global_admin privileges. Ensure that it works
-    Table labelsTable = null;
-    Connection connection = null;
-    try {
-      connection = ConnectionFactory.createConnection(conf);
-      try {
-        labelsTable = connection.getTable(LABELS_TABLE_NAME);
-      } catch (IOException e) {
-        LOG.error("Error opening 'labels' table", e);
-        return;
-      }
-      Scan scan = new Scan();
-      scan.setAuthorizations(new Authorizations(VisibilityUtils.SYSTEM_LABEL));
-      scan.addColumn(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
-      ResultScanner scanner = null;
-      try {
-        scanner = labelsTable.getScanner(scan);
-        Result next = null;
-        while ((next = scanner.next()) != null) {
-          byte[] row = next.getRow();
-          byte[] value = next.getValue(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
-          labels.put(Bytes.toString(value), Bytes.toInt(row));
-        }
-      } catch (TableNotFoundException e) {
-        // Table not found. So just return
-        return;
-      } catch (IOException e) {
-        LOG.error("Error scanning 'labels' table", e);
-      } finally {
-        if (scanner != null) scanner.close();
-      }
-    } catch (IOException ioe) {
-      LOG.error("Failed reading 'labels' tags", ioe);
-      return;
-    } finally {
-      if (labelsTable != null) {
-        try {
-          labelsTable.close();
-        } catch (IOException ioe) {
-          LOG.warn("Error closing 'labels' table", ioe);
-        }
-      }
-      if (connection != null)
-        try {
-          connection.close();
-        } catch (IOException ioe) {
-          LOG.warn("Failed close of temporary connection", ioe);
-        }
-    }
-  }
-
-  @Override
-  public List<Tag> createVisibilityExpTags(String visExpression) throws IOException {
-    VisibilityLabelOrdinalProvider provider = new VisibilityLabelOrdinalProvider() {
-      @Override
-      public int getLabelOrdinal(String label) {
-        Integer ordinal = null;
-        ordinal = labels.get(label);
-        if (ordinal != null) {
-          return ordinal.intValue();
-        }
-        return VisibilityConstants.NON_EXIST_LABEL_ORDINAL;
-      }
-
-      @Override
-      public String getLabel(int ordinal) {
-        // Unused
-        throw new UnsupportedOperationException(
-            "getLabel should not be used in VisibilityExpressionResolver");
-      }
-    };
-    return VisibilityUtils.createVisibilityExpTags(visExpression, true, false, null, provider);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
deleted file mode 100644
index 9737b55..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
-import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
-import org.apache.hadoop.util.ProgramDriver;
-
-/**
- * Driver for hbase mapreduce jobs. Select which to run by passing
- * name of job to this main.
- */
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
-@InterfaceStability.Stable
-public class Driver {
-  /**
-   * @param args
-   * @throws Throwable
-   */
-  public static void main(String[] args) throws Throwable {
-    ProgramDriver pgd = new ProgramDriver();
-
-    pgd.addClass(RowCounter.NAME, RowCounter.class,
-      "Count rows in HBase table.");
-    pgd.addClass(CellCounter.NAME, CellCounter.class,
-      "Count cells in HBase table.");
-    pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS.");
-    pgd.addClass(Import.NAME, Import.class, "Import data written by Export.");
-    pgd.addClass(ImportTsv.NAME, ImportTsv.class, "Import data in TSV format.");
-    pgd.addClass(LoadIncrementalHFiles.NAME, LoadIncrementalHFiles.class,
-                 "Complete a bulk data load.");
-    pgd.addClass(CopyTable.NAME, CopyTable.class,
-        "Export a table from local cluster to peer cluster.");
-    pgd.addClass(VerifyReplication.NAME, VerifyReplication.class, "Compare" +
-        " the data from tables in two different clusters. WARNING: It" +
-        " doesn't work for incrementColumnValues'd cells since the" +
-        " timestamp is changed after being appended to the log.");
-    pgd.addClass(WALPlayer.NAME, WALPlayer.class, "Replay WAL files.");
-    pgd.addClass(ExportSnapshot.NAME, ExportSnapshot.class, "Export" +
-        " the specific snapshot to a given FileSystem.");
-
-    ProgramDriver.class.getMethod("driver", new Class [] {String[].class}).
-      invoke(pgd, new Object[]{args});
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
deleted file mode 100644
index 4c01528..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.filter.RegexStringComparator;
-import org.apache.hadoop.hbase.filter.RowFilter;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
-* Export an HBase table.
-* Writes content to sequence files up in HDFS.  Use {@link Import} to read it
-* back in again.
-*/
-@InterfaceAudience.Public
-public class Export extends Configured implements Tool {
-  private static final Log LOG = LogFactory.getLog(Export.class);
-  final static String NAME = "export";
-  final static String RAW_SCAN = "hbase.mapreduce.include.deleted.rows";
-  final static String EXPORT_BATCHING = "hbase.export.scanner.batch";
-
-  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
-
-  /**
-   * Sets up the actual job.
-   *
-   * @param conf  The current configuration.
-   * @param args  The command line parameters.
-   * @return The newly created job.
-   * @throws IOException When setting up the job fails.
-   */
-  public static Job createSubmittableJob(Configuration conf, String[] args)
-  throws IOException {
-    String tableName = args[0];
-    Path outputDir = new Path(args[1]);
-    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
-    job.setJobName(NAME + "_" + tableName);
-    job.setJarByClass(Export.class);
-    // Set optional scan parameters
-    Scan s = getConfiguredScanForJob(conf, args);
-    IdentityTableMapper.initJob(tableName, s, IdentityTableMapper.class, job);
-    // No reducers.  Just write straight to output files.
-    job.setNumReduceTasks(0);
-    job.setOutputFormatClass(SequenceFileOutputFormat.class);
-    job.setOutputKeyClass(ImmutableBytesWritable.class);
-    job.setOutputValueClass(Result.class);
-    FileOutputFormat.setOutputPath(job, outputDir); // job conf doesn't contain the conf so doesn't have a default fs.
-    return job;
-  }
-
-  private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
-    Scan s = new Scan();
-    // Optional arguments.
-    // Set Scan Versions
-    int versions = args.length > 2? Integer.parseInt(args[2]): 1;
-    s.setMaxVersions(versions);
-    // Set Scan Range
-    long startTime = args.length > 3? Long.parseLong(args[3]): 0L;
-    long endTime = args.length > 4? Long.parseLong(args[4]): Long.MAX_VALUE;
-    s.setTimeRange(startTime, endTime);
-    // Set cache blocks
-    s.setCacheBlocks(false);
-    // set Start and Stop row
-    if (conf.get(TableInputFormat.SCAN_ROW_START) != null) {
-      s.setStartRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_START)));
-    }
-    if (conf.get(TableInputFormat.SCAN_ROW_STOP) != null) {
-      s.setStopRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_STOP)));
-    }
-    // Set Scan Column Family
-    boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN));
-    if (raw) {
-      s.setRaw(raw);
-    }
-    for (String columnFamily : conf.getTrimmedStrings(TableInputFormat.SCAN_COLUMN_FAMILY)) {
-      s.addFamily(Bytes.toBytes(columnFamily));
-    }
-    // Set RowFilter or Prefix Filter if applicable.
-    Filter exportFilter = getExportFilter(args);
-    if (exportFilter!= null) {
-        LOG.info("Setting Scan Filter for Export.");
-      s.setFilter(exportFilter);
-    }
-
-    int batching = conf.getInt(EXPORT_BATCHING, -1);
-    if (batching !=  -1){
-      try {
-        s.setBatch(batching);
-      } catch (IncompatibleFilterException e) {
-        LOG.error("Batching could not be set", e);
-      }
-    }
-    LOG.info("versions=" + versions + ", starttime=" + startTime +
-      ", endtime=" + endTime + ", keepDeletedCells=" + raw);
-    return s;
-  }
-
-  private static Filter getExportFilter(String[] args) {
-    Filter exportFilter = null;
-    String filterCriteria = (args.length > 5) ? args[5]: null;
-    if (filterCriteria == null) return null;
-    if (filterCriteria.startsWith("^")) {
-      String regexPattern = filterCriteria.substring(1, filterCriteria.length());
-      exportFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regexPattern));
-    } else {
-      exportFilter = new PrefixFilter(Bytes.toBytesBinary(filterCriteria));
-    }
-    return exportFilter;
-  }
-
-  /*
-   * @param errorMsg Error message.  Can be null.
-   */
-  private static void usage(final String errorMsg) {
-    if (errorMsg != null && errorMsg.length() > 0) {
-      System.err.println("ERROR: " + errorMsg);
-    }
-    System.err.println("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
-      "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]\n");
-    System.err.println("  Note: -D properties will be applied to the conf used. ");
-    System.err.println("  For example: ");
-    System.err.println("   -D mapreduce.output.fileoutputformat.compress=true");
-    System.err.println("   -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec");
-    System.err.println("   -D mapreduce.output.fileoutputformat.compress.type=BLOCK");
-    System.err.println("  Additionally, the following SCAN properties can be specified");
-    System.err.println("  to control/limit what is exported..");
-    System.err.println("   -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<family1>,<family2>, ...");
-    System.err.println("   -D " + RAW_SCAN + "=true");
-    System.err.println("   -D " + TableInputFormat.SCAN_ROW_START + "=<ROWSTART>");
-    System.err.println("   -D " + TableInputFormat.SCAN_ROW_STOP + "=<ROWSTOP>");
-    System.err.println("   -D " + JOB_NAME_CONF_KEY
-        + "=jobName - use the specified mapreduce job name for the export");
-    System.err.println("For performance consider the following properties:\n"
-        + "   -Dhbase.client.scanner.caching=100\n"
-        + "   -Dmapreduce.map.speculative=false\n"
-        + "   -Dmapreduce.reduce.speculative=false");
-    System.err.println("For tables with very wide rows consider setting the batch size as below:\n"
-        + "   -D" + EXPORT_BATCHING + "=10");
-  }
-
-
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length < 2) {
-      usage("Wrong number of arguments: " + args.length);
-      return -1;
-    }
-    Job job = createSubmittableJob(getConf(), args);
-    return (job.waitForCompletion(true) ? 0 : 1);
-  }
-
-  /**
-   * Main entry point.
-   * @param args The command line parameters.
-   * @throws Exception When running the job fails.
-   */
-  public static void main(String[] args) throws Exception {
-    int errCode = ToolRunner.run(HBaseConfiguration.create(), new Export(), args);
-    System.exit(errCode);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java
deleted file mode 100644
index dc30c6e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Extract grouping columns from input record.
- */
-@InterfaceAudience.Public
-public class GroupingTableMapper
-extends TableMapper<ImmutableBytesWritable,Result> implements Configurable {
-
-  /**
-   * JobConf parameter to specify the columns used to produce the key passed to
-   * collect from the map phase.
-   */
-  public static final String GROUP_COLUMNS =
-    "hbase.mapred.groupingtablemap.columns";
-
-  /** The grouping columns. */
-  protected byte [][] columns;
-  /** The current configuration. */
-  private Configuration conf = null;
-
-  /**
-   * Use this before submitting a TableMap job. It will appropriately set up
-   * the job.
-   *
-   * @param table The table to be processed.
-   * @param scan  The scan with the columns etc.
-   * @param groupColumns  A space separated list of columns used to form the
-   * key used in collect.
-   * @param mapper  The mapper class.
-   * @param job  The current job.
-   * @throws IOException When setting up the job fails.
-   */
-  @SuppressWarnings("unchecked")
-  public static void initJob(String table, Scan scan, String groupColumns,
-    Class<? extends TableMapper> mapper, Job job) throws IOException {
-    TableMapReduceUtil.initTableMapperJob(table, scan, mapper,
-        ImmutableBytesWritable.class, Result.class, job);
-    job.getConfiguration().set(GROUP_COLUMNS, groupColumns);
-  }
-
-  /**
-   * Extract the grouping columns from value to construct a new key. Pass the
-   * new key and value to reduce. If any of the grouping columns are not found
-   * in the value, the record is skipped.
-   *
-   * @param key  The current key.
-   * @param value  The current value.
-   * @param context  The current context.
-   * @throws IOException When writing the record fails.
-   * @throws InterruptedException When the job is aborted.
-   */
-  @Override
-  public void map(ImmutableBytesWritable key, Result value, Context context)
-  throws IOException, InterruptedException {
-    byte[][] keyVals = extractKeyValues(value);
-    if(keyVals != null) {
-      ImmutableBytesWritable tKey = createGroupKey(keyVals);
-      context.write(tKey, value);
-    }
-  }
-
-  /**
-   * Extract columns values from the current record. This method returns
-   * null if any of the columns are not found.
-   * <p>
-   * Override this method if you want to deal with nulls differently.
-   *
-   * @param r  The current values.
-   * @return Array of byte values.
-   */
-  protected byte[][] extractKeyValues(Result r) {
-    byte[][] keyVals = null;
-    ArrayList<byte[]> foundList = new ArrayList<>();
-    int numCols = columns.length;
-    if (numCols > 0) {
-      for (Cell value: r.listCells()) {
-        byte [] column = KeyValue.makeColumn(CellUtil.cloneFamily(value),
-            CellUtil.cloneQualifier(value));
-        for (int i = 0; i < numCols; i++) {
-          if (Bytes.equals(column, columns[i])) {
-            foundList.add(CellUtil.cloneValue(value));
-            break;
-          }
-        }
-      }
-      if(foundList.size() == numCols) {
-        keyVals = foundList.toArray(new byte[numCols][]);
-      }
-    }
-    return keyVals;
-  }
-
-  /**
-   * Create a key by concatenating multiple column values.
-   * <p>
-   * Override this function in order to produce different types of keys.
-   *
-   * @param vals  The current key/values.
-   * @return A key generated by concatenating multiple column values.
-   */
-  protected ImmutableBytesWritable createGroupKey(byte[][] vals) {
-    if(vals == null) {
-      return null;
-    }
-    StringBuilder sb =  new StringBuilder();
-    for(int i = 0; i < vals.length; i++) {
-      if(i > 0) {
-        sb.append(" ");
-      }
-      sb.append(Bytes.toString(vals[i]));
-    }
-    return new ImmutableBytesWritable(Bytes.toBytesBinary(sb.toString()));
-  }
-
-  /**
-   * Returns the current configuration.
-   *
-   * @return The current configuration.
-   * @see org.apache.hadoop.conf.Configurable#getConf()
-   */
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  /**
-   * Sets the configuration. This is used to set up the grouping details.
-   *
-   * @param configuration  The configuration to set.
-   * @see org.apache.hadoop.conf.Configurable#setConf(
-   *   org.apache.hadoop.conf.Configuration)
-   */
-  @Override
-  public void setConf(Configuration configuration) {
-    this.conf = configuration;
-    String[] cols = conf.get(GROUP_COLUMNS, "").split(" ");
-    columns = new byte[cols.length][];
-    for(int i = 0; i < cols.length; i++) {
-      columns[i] = Bytes.toBytes(cols[i]);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
deleted file mode 100644
index e90d5c1..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple MR input format for HFiles.
- * This code was borrowed from Apache Crunch project.
- * Updated to the recent version of HBase.
- */
-public class HFileInputFormat extends FileInputFormat<NullWritable, Cell> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat.class);
-
-  /**
-   * File filter that removes all "hidden" files. This might be something worth removing from
-   * a more general purpose utility; it accounts for the presence of metadata files created
-   * in the way we're doing exports.
-   */
-  static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
-    @Override
-    public boolean accept(Path p) {
-      String name = p.getName();
-      return !name.startsWith("_") && !name.startsWith(".");
-    }
-  };
-
-  /**
-   * Record reader for HFiles.
-   */
-  private static class HFileRecordReader extends RecordReader<NullWritable, Cell> {
-
-    private Reader in;
-    protected Configuration conf;
-    private HFileScanner scanner;
-
-    /**
-     * A private cache of the key value so it doesn't need to be loaded twice from the scanner.
-     */
-    private Cell value = null;
-    private long count;
-    private boolean seeked = false;
-
-    @Override
-    public void initialize(InputSplit split, TaskAttemptContext context)
-        throws IOException, InterruptedException {
-      FileSplit fileSplit = (FileSplit) split;
-      conf = context.getConfiguration();
-      Path path = fileSplit.getPath();
-      FileSystem fs = path.getFileSystem(conf);
-      LOG.info("Initialize HFileRecordReader for {}", path);
-      this.in = HFile.createReader(fs, path, conf);
-
-      // The file info must be loaded before the scanner can be used.
-      // This seems like a bug in HBase, but it's easily worked around.
-      this.in.loadFileInfo();
-      this.scanner = in.getScanner(false, false);
-
-    }
-
-
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-      boolean hasNext;
-      if (!seeked) {
-        LOG.info("Seeking to start");
-        hasNext = scanner.seekTo();
-        seeked = true;
-      } else {
-        hasNext = scanner.next();
-      }
-      if (!hasNext) {
-        return false;
-      }
-      value = scanner.getCell();
-      count++;
-      return true;
-    }
-
-    @Override
-    public NullWritable getCurrentKey() throws IOException, InterruptedException {
-      return NullWritable.get();
-    }
-
-    @Override
-    public Cell getCurrentValue() throws IOException, InterruptedException {
-      return value;
-    }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-      // This would be inaccurate if KVs are not uniformly-sized or we have performed a seek to
-      // the start row, but better than nothing anyway.
-      return 1.0f * count / in.getEntries();
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (in != null) {
-        in.close();
-        in = null;
-      }
-    }
-  }
-
-  @Override
-  protected List<FileStatus> listStatus(JobContext job) throws IOException {
-    List<FileStatus> result = new ArrayList<FileStatus>();
-
-    // Explode out directories that match the original FileInputFormat filters
-    // since HFiles are written to directories where the
-    // directory name is the column name
-    for (FileStatus status : super.listStatus(job)) {
-      if (status.isDirectory()) {
-        FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
-        for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
-          result.add(match);
-        }
-      } else {
-        result.add(status);
-      }
-    }
-    return result;
-  }
-
-  @Override
-  public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    return new HFileRecordReader();
-  }
-
-  @Override
-  protected boolean isSplitable(JobContext context, Path filename) {
-    // This file isn't splittable.
-    return false;
-  }
-}
\ No newline at end of file


Mime
View raw message