hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [16/41] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.
Date Sat, 26 Aug 2017 08:56:02 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
deleted file mode 100644
index e18b3aa..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.RegionSizeCalculator;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Iterator;
-/**
- * A base for {@link MultiTableInputFormat}s. Receives a list of
- * {@link Scan} instances that define the input tables and
- * filters etc. Subclasses may use other TableRecordReader implementations.
- */
-@InterfaceAudience.Public
-public abstract class MultiTableInputFormatBase extends
-    InputFormat<ImmutableBytesWritable, Result> {
-
-  private static final Log LOG = LogFactory.getLog(MultiTableInputFormatBase.class);
-
-  /** Holds the set of scans used to define the input. */
-  private List<Scan> scans;
-
-  /** The reader scanning the table, can be a custom one. */
-  private TableRecordReader tableRecordReader = null;
-
-  /**
-   * Builds a record reader for the given split, using the TableRecordReader set
-   * through {@link #setTableRecordReader(TableRecordReader)} or a default one.
-   * The connection opened here is closed together with the returned reader.
-   * @param split The split to work with; it is cast to {@link TableSplit}.
-   * @param context The current context, which supplies the configuration.
-   * @return The newly created record reader.
-   * @throws IOException When the split has no table or creating the reader fails.
-   * @throws InterruptedException when record reader initialization fails
-   */
-  @Override
-  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
-      InputSplit split, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    TableSplit tSplit = (TableSplit) split;
-    LOG.info(MessageFormat.format("Input split length: {0} bytes.", tSplit.getLength()));
-    if (tSplit.getTable() == null) {
-      throw new IOException("Cannot create a record reader because of a"
-          + " previous error. Please look at the previous logs lines from"
-          + " the task's full log for more details.");
-    }
-    if (this.tableRecordReader == null) {
-      this.tableRecordReader = new TableRecordReader();
-    }
-    final TableRecordReader trr = this.tableRecordReader;
-    final Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
-    try {
-      Table table = connection.getTable(tSplit.getTable());
-      Scan sc = tSplit.getScan();
-      sc.setStartRow(tSplit.getStartRow());
-      sc.setStopRow(tSplit.getEndRow());
-      trr.setScan(sc);
-      trr.setTable(table);
-      return new RecordReader<ImmutableBytesWritable, Result>() {
-        @Override
-        public void close() throws IOException {
-          try {
-            trr.close();
-          } finally {
-            connection.close();
-          }
-        }
-
-        @Override
-        public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
-          return trr.getCurrentKey();
-        }
-
-        @Override
-        public Result getCurrentValue() throws IOException, InterruptedException {
-          return trr.getCurrentValue();
-        }
-
-        @Override
-        public float getProgress() throws IOException, InterruptedException {
-          return trr.getProgress();
-        }
-
-        @Override
-        public void initialize(InputSplit inputsplit, TaskAttemptContext context)
-            throws IOException, InterruptedException {
-          trr.initialize(inputsplit, context);
-        }
-
-        @Override
-        public boolean nextKeyValue() throws IOException, InterruptedException {
-          return trr.nextKeyValue();
-        }
-      };
-    } catch (IOException | RuntimeException e) {
-      // Covers getTable() too: release reader and connection, keeping e as primary.
-      try {
-        trr.close();
-      } catch (RuntimeException closeFailure) {
-        e.addSuppressed(closeFailure);
-      } finally {
-        connection.close();
-      }
-      throw e;
-    }
-  }
-
-  /**
-   * Calculates the splits that will serve as input for the map tasks. Scans are
-   * grouped by the table named in their {@link Scan#SCAN_ATTRIBUTES_TABLE_NAME}
-   * attribute; one split is created for every (scan, region) pair whose key
-   * ranges overlap and that passes {@link #includeRegionInSplit(byte[], byte[])}.
-   *
-   * @param context The current job context.
-   * @return The list of input splits.
-   * @throws IOException When no scans were provided, a scan has no table name, a
-   *           table has no regions, or creating the list of splits fails.
-   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
-   */
-  @Override
-  public List<InputSplit> getSplits(JobContext context) throws IOException {
-    if (scans == null || scans.isEmpty()) {
-      throw new IOException("No scans were provided.");
-    }
-
-    // Group the scans by target table so that each table is opened only once.
-    Map<TableName, List<Scan>> tableMaps = new HashMap<>();
-    for (Scan scan : scans) {
-      byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
-      if (tableNameBytes == null) {
-        throw new IOException("A scan object did not have a table name");
-      }
-      TableName tableName = TableName.valueOf(tableNameBytes);
-      List<Scan> scanList = tableMaps.get(tableName);
-      if (scanList == null) {
-        scanList = new ArrayList<>();
-        tableMaps.put(tableName, scanList);
-      }
-      scanList.add(scan);
-    }
-
-    List<InputSplit> splits = new ArrayList<>();
-    for (Map.Entry<TableName, List<Scan>> entry : tableMaps.entrySet()) {
-      TableName tableName = entry.getKey();
-      List<Scan> scanList = entry.getValue();
-      try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration());
-        Table table = conn.getTable(tableName);
-        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
-        // NOTE(review): the Admin returned by conn.getAdmin() is not closed here - confirm.
-        RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(
-                regionLocator, conn.getAdmin());
-        Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
-        // Region boundaries do not depend on the scan, so validate them only once.
-        if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
-          throw new IOException("Expecting at least one region for table : "
-                  + tableName.getNameAsString());
-        }
-        byte[][] startKeys = keys.getFirst();
-        byte[][] endKeys = keys.getSecond();
-
-        for (Scan scan : scanList) {
-          int count = 0;
-          byte[] startRow = scan.getStartRow();
-          byte[] stopRow = scan.getStopRow();
-
-          for (int i = 0; i < startKeys.length; i++) {
-            if (!includeRegionInSplit(startKeys[i], endKeys[i])) {
-              continue;
-            }
-            // Keep only regions overlapping the scan range; a zero-length row or
-            // key is treated as unbounded on that side.
-            if ((startRow.length == 0 || endKeys[i].length == 0 ||
-                    Bytes.compareTo(startRow, endKeys[i]) < 0) &&
-                    (stopRow.length == 0 || Bytes.compareTo(stopRow, startKeys[i]) > 0)) {
-              // Clip the split to the intersection of the scan range and the region.
-              byte[] splitStart = startRow.length == 0 ||
-                      Bytes.compareTo(startKeys[i], startRow) >= 0 ? startKeys[i] : startRow;
-              byte[] splitStop = (stopRow.length == 0 ||
-                      Bytes.compareTo(endKeys[i], stopRow) <= 0) &&
-                      endKeys[i].length > 0 ? endKeys[i] : stopRow;
-
-              HRegionLocation location = regionLocator.getRegionLocation(startKeys[i], false);
-              String regionHostname = location.getHostname();
-              HRegionInfo regionInfo = location.getRegionInfo();
-              String encodedRegionName = regionInfo.getEncodedName();
-              long regionSize = sizeCalculator.getRegionSize(regionInfo.getRegionName());
-
-              TableSplit split = new TableSplit(table.getName(), scan, splitStart, splitStop,
-                      regionHostname, encodedRegionName, regionSize);
-              splits.add(split);
-
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
-              }
-            }
-          }
-        }
-      }
-    }
-
-    return splits;
-  }
-
-  /**
-   * Test if the given region is to be included in the InputSplit while
-   * splitting the regions of a table.
-   * <p>
-   * This hook is useful when there is a specific reason to exclude an entire
-   * region from the M-R job (so that it contributes no InputSplit), given only
-   * the start and end keys of that region. <br>
-   * For example, a job can remember the last-processed top record and revisit
-   * only the [last, current) interval on each run. Besides reducing the number
-   * of InputSplits, this also reduces the load on the region server, due to the
-   * ordering of the keys. <br>
-   * <br>
-   * Note: It is possible that <code>endKey.length == 0</code>, for the last
-   * (most recent) region. <br>
-   * Override this method if you want to bulk exclude regions altogether from
-   * M-R. By default, no region is excluded (i.e. all regions are included).
-   *
-   * @param startKey Start key of the region
-   * @param endKey End key of the region
-   * @return true, if this region needs to be included as part of the input
-   *         (default).
-   */
-  protected boolean includeRegionInSplit(final byte[] startKey,
-      final byte[] endKey) {
-    return true;
-  }
-
-  /**
-   * Allows subclasses to get the list of {@link Scan} objects; null until set.
-   */
-  protected List<Scan> getScans() {
-    return this.scans;
-  }
-
-  /**
-   * Allows subclasses to set the list of {@link Scan} objects.
-   *
-   * @param scans The list of {@link Scan} used to define the input (kept by reference)
-   */
-  protected void setScans(List<Scan> scans) {
-    this.scans = scans;
-  }
-
-  /**
-   * Allows subclasses to set the {@link TableRecordReader}.
-   *
-   * @param tableRecordReader A different {@link TableRecordReader}
-   *          implementation; if left unset, a default one is created on demand.
-   */
-  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
-    this.tableRecordReader = tableRecordReader;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
deleted file mode 100644
index 4cc784f..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * <p>
- * Hadoop output format that writes to one or more HBase tables. The key is
- * taken to be the table name while the output value <em>must</em> be either a
- * {@link Put} or a {@link Delete} instance. All tables must already exist, and
- * all Puts and Deletes must reference only valid column families.
- * </p>
- *
- * <p>
- * Write-ahead logging (WAL) for Puts can be disabled by setting
- * {@link #WAL_PROPERTY} to {@link #WAL_OFF}. Default value is {@link #WAL_ON}.
- * Note that disabling write-ahead logging is only appropriate for jobs where
- * loss of data due to region server failure can be tolerated (for example,
- * because it is easy to rerun a bulk import).
- * </p>
- */
-@InterfaceAudience.Public
-public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> {
-  /** Set this to {@link #WAL_OFF} to turn off write-ahead logging (WAL) */
-  public static final String WAL_PROPERTY = "hbase.mapreduce.multitableoutputformat.wal";
-  /** Property value to use write-ahead logging */
-  public static final boolean WAL_ON = true;
-  /** Property value to disable write-ahead logging */
-  public static final boolean WAL_OFF = false;
-  /**
-   * Record writer for outputting to multiple HTables. Mutators are opened lazily,
-   * one per table name, over a single connection shared by all of them.
-   */
-  protected static class MultiTableRecordWriter extends
-      RecordWriter<ImmutableBytesWritable, Mutation> {
-    private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class);
-    Connection connection;
-    Map<ImmutableBytesWritable, BufferedMutator> mutatorMap = new HashMap<>();
-    Configuration conf;
-    boolean useWriteAheadLogging;
-
-    /**
-     * @param conf
-     *          HBaseConfiguration to use
-     * @param useWriteAheadLogging
-     *          whether to use write ahead logging. This can be turned off (
-     *          <tt>false</tt>) to improve performance when bulk loading data.
-     */
-    public MultiTableRecordWriter(Configuration conf,
-        boolean useWriteAheadLogging) throws IOException {
-      LOG.debug("Created new MultiTableRecordWriter with WAL "
-          + (useWriteAheadLogging ? "on" : "off"));
-      this.conf = conf;
-      this.useWriteAheadLogging = useWriteAheadLogging;
-    }
-
-    /**
-     * Returns the mutator for the table, opening connection and mutator on first use.
-     * @param tableName the name of the table, as bytes
-     * @return the named mutator
-     * @throws IOException if there is a problem opening a table
-     */
-    BufferedMutator getBufferedMutator(ImmutableBytesWritable tableName) throws IOException {
-      if (this.connection == null) {
-        this.connection = ConnectionFactory.createConnection(conf);
-      }
-      BufferedMutator mutator = mutatorMap.get(tableName);
-      if (mutator == null) {
-        LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get())+ "\" for writing");
-        mutator = connection.getBufferedMutator(TableName.valueOf(tableName.get()));
-        mutatorMap.put(tableName, mutator);
-      }
-      return mutator;
-    }
-
-    @Override
-    public void close(TaskAttemptContext context) throws IOException {
-      try {
-        for (BufferedMutator mutator : mutatorMap.values()) {
-          mutator.close();
-        }
-      } finally {
-        // Always release the connection, even when closing a mutator failed.
-        if (connection != null) {
-          connection.close();
-        }
-      }
-    }
-
-    /**
-     * Writes a copy of the action to the specified table. Only Puts have their
-     * durability overridden by the WAL setting; Deletes keep their own.
-     * @param tableName the table being updated.
-     * @param action the update, either a put or a delete.
-     * @throws IllegalArgumentException if the action is not a put or a delete.
-     */
-    @Override
-    public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException {
-      BufferedMutator mutator = getBufferedMutator(tableName);
-      // The actions are not immutable, so we defensively copy them
-      if (action instanceof Put) {
-        Put put = new Put((Put) action);
-        put.setDurability(useWriteAheadLogging ? Durability.SYNC_WAL
-            : Durability.SKIP_WAL);
-        mutator.mutate(put);
-      } else if (action instanceof Delete) {
-        Delete delete = new Delete((Delete) action);
-        mutator.mutate(delete);
-      } else {
-        throw new IllegalArgumentException("action must be either Delete or Put");
-      }
-    }
-  }
-
-  @Override
-  public void checkOutputSpecs(JobContext context) throws IOException,
-      InterruptedException {
-    // Intentionally a no-op: table names arrive with each record at write time,
-    // so a table that does not exist cannot be detected ahead of the job.
-  }
-
-  @Override
-  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    return new TableOutputCommitter();
-  }
-
-  /** Creates a writer whose WAL usage is read from {@link #WAL_PROPERTY}. */
-  @Override
-  public RecordWriter<ImmutableBytesWritable, Mutation> getRecordWriter(TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    Configuration conf = context.getConfiguration();
-    return new MultiTableRecordWriter(HBaseConfiguration.create(conf),
-        conf.getBoolean(WAL_PROPERTY, WAL_ON));
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
deleted file mode 100644
index 0f07a58..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-/**
- * MultiTableSnapshotInputFormat generalizes
- * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat},
- * allowing a MapReduce job to run over one or more table snapshots, with one or more
- * scans configured for each.
- * Internally, the input format delegates to
- * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
- * and thus has the same performance advantages; see that class for more details.
- * <p>
- * Usage is similar to TableSnapshotInputFormat, with the following exception:
- * initMultiTableSnapshotMapperJob takes in a map from snapshot name to a collection of
- * scans. For each snapshot in the map, each corresponding scan will be applied; the
- * overall dataset for the job is defined by the concatenation of the regions and tables
- * included in each snapshot/scan pair.
- * {@link org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#initMultiTableSnapshotMapperJob(
- * java.util.Map, Class, Class, Class, org.apache.hadoop.mapreduce.Job, boolean,
- * org.apache.hadoop.fs.Path)}
- * can be used to configure the job.
- * <pre>{@code
- * Job job = new Job(conf);
- * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
- *    "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
- *    "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
- * );
- * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
- * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
- *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
- *      MyMapOutputValueWritable.class, job, true, restoreDir);
- * }
- * </pre>
- * Internally, this input format restores each snapshot into a subdirectory of the given
- * tmp directory. Input splits and record readers are created as described in
- * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} (one per region).
- * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes
- * on permissioning; the same caveats apply here.
- *
- * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
- * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
- */
-@InterfaceAudience.Public
-public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat {
-
-  private final MultiTableSnapshotInputFormatImpl delegate;
-
-  public MultiTableSnapshotInputFormat() {
-    this.delegate = new MultiTableSnapshotInputFormatImpl();
-  }
-
-  /**
-   * Wraps every split computed by the delegate in a TableSnapshotRegionSplit.
-   */
-  @Override
-  public List<InputSplit> getSplits(JobContext jobContext)
-      throws IOException, InterruptedException {
-    Configuration conf = jobContext.getConfiguration();
-    List<TableSnapshotInputFormatImpl.InputSplit> implSplits = delegate.getSplits(conf);
-
-    List<InputSplit> wrapped = Lists.newArrayListWithCapacity(implSplits.size());
-    for (TableSnapshotInputFormatImpl.InputSplit implSplit : implSplits) {
-      wrapped.add(new TableSnapshotInputFormat.TableSnapshotRegionSplit(implSplit));
-    }
-    return wrapped;
-  }
-
-  /**
-   * Delegates to {@link MultiTableSnapshotInputFormatImpl#setInput} on a fresh instance.
-   */
-  public static void setInput(Configuration configuration,
-      Map<String, Collection<Scan>> snapshotScans, Path tmpRestoreDir) throws IOException {
-    MultiTableSnapshotInputFormatImpl impl = new MultiTableSnapshotInputFormatImpl();
-    impl.setInput(configuration, snapshotScans, tmpRestoreDir);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
deleted file mode 100644
index 4331c0f..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
-import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
-import org.apache.hadoop.hbase.util.ConfigurationUtil;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-import java.io.IOException;
-import java.util.AbstractMap;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-/**
- * Shared implementation of mapreduce code over multiple table snapshots.
- * Utilized by both the mapreduce
- * ({@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormat}) and the mapred
- * ({@link org.apache.hadoop.hbase.mapred.MultiTableSnapshotInputFormat}) implementations.
- */
-@InterfaceAudience.LimitedPrivate({ "HBase" })
-@InterfaceStability.Evolving
-public class MultiTableSnapshotInputFormatImpl {
-
-  private static final Log LOG = LogFactory.getLog(MultiTableSnapshotInputFormatImpl.class);
-
-  public static final String RESTORE_DIRS_KEY =
-      "hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping";
-  public static final String SNAPSHOT_TO_SCANS_KEY =
-      "hbase.MultiTableSnapshotInputFormat.snapshotsToScans";
-
-  /**
-   * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
-   * restoreDir.
-   * Sets: {@link #RESTORE_DIRS_KEY}, {@link #SNAPSHOT_TO_SCANS_KEY}
-   *
-   * @param conf configuration that receives both keys
-   * @param snapshotScans scans to run, keyed by snapshot name
-   * @param restoreDir parent directory for the per-snapshot restore directories
-   * @throws IOException if any of the steps this method delegates to fails
-   */
-  public void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
-      Path restoreDir) throws IOException {
-    Path hbaseRoot = FSUtils.getRootDir(conf);
-    FileSystem rootFs = hbaseRoot.getFileSystem(conf);
-
-    setSnapshotToScans(conf, snapshotScans);
-    Map<String, Path> dirsBySnapshot =
-        generateSnapshotToRestoreDirMapping(snapshotScans.keySet(), restoreDir);
-    setSnapshotDirs(conf, dirsBySnapshot);
-    restoreSnapshots(conf, dirsBySnapshot, rootFs);
-  }
-
-  /**
-   * Computes the splits for every snapshot/scan pair that was pushed to conf by
-   * {@link
-   * #setInput(org.apache.hadoop.conf.Configuration, java.util.Map, org.apache.hadoop.fs.Path)}
-   *
-   * @param conf Configuration to determine splits from
-   * @return the splits of all scans over all snapshots, concatenated into one list
-   * @throws IOException if any of the lookups this method delegates to fails
-   */
-  public List<TableSnapshotInputFormatImpl.InputSplit> getSplits(Configuration conf)
-      throws IOException {
-    Path rootDir = FSUtils.getRootDir(conf);
-    FileSystem fs = rootDir.getFileSystem(conf);
-
-    Map<String, Collection<Scan>> scansBySnapshot = getSnapshotsToScans(conf);
-    Map<String, Path> restoreDirsBySnapshot = getSnapshotDirs(conf);
-
-    List<TableSnapshotInputFormatImpl.InputSplit> allSplits = Lists.newArrayList();
-    for (Map.Entry<String, Collection<Scan>> snapshotEntry : scansBySnapshot.entrySet()) {
-      String snapshot = snapshotEntry.getKey();
-      Path restoreDir = restoreDirsBySnapshot.get(snapshot);
-
-      // The manifest and its regions are per snapshot, so load them once for all scans.
-      SnapshotManifest manifest =
-          TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshot, rootDir, fs);
-      List<HRegionInfo> regions =
-          TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest);
-
-      for (Scan scan : snapshotEntry.getValue()) {
-        allSplits.addAll(
-            TableSnapshotInputFormatImpl.getSplits(scan, manifest, regions, restoreDir, conf));
-      }
-    }
-
-    return allSplits;
-  }
-
-  /**
-   * Retrieve the snapshot name -&gt; list&lt;scan&gt; mapping pushed to configuration by
-   * {@link #setSnapshotToScans(org.apache.hadoop.conf.Configuration, java.util.Map)}
-   *
-   * @param conf Configuration to extract name -&gt; list&lt;scan&gt; mappings from.
-   * @return the snapshot name -&gt; list&lt;scan&gt; mapping pushed to configuration
-   * @throws IOException if a serialized scan cannot be converted back into a Scan
-   */
-  public Map<String, Collection<Scan>> getSnapshotsToScans(Configuration conf) throws IOException {
-    List<Map.Entry<String, String>> serialized =
-        ConfigurationUtil.getKeyValues(conf, SNAPSHOT_TO_SCANS_KEY);
-    Map<String, Collection<Scan>> scansBySnapshot = Maps.newHashMap();
-
-    for (Map.Entry<String, String> pair : serialized) {
-      String snapshotName = pair.getKey();
-      Scan scan = TableMapReduceUtil.convertStringToScan(pair.getValue());
-
-      // Lazily create the bucket for a snapshot the first time it is seen.
-      Collection<Scan> scansForSnapshot = scansBySnapshot.get(snapshotName);
-      if (scansForSnapshot == null) {
-        scansForSnapshot = Lists.newArrayList();
-        scansBySnapshot.put(snapshotName, scansForSnapshot);
-      }
-      scansForSnapshot.add(scan);
-    }
-
-    return scansBySnapshot;
-  }
-
-  /**
-   * Push snapshotScans to conf (under the key {@link #SNAPSHOT_TO_SCANS_KEY})
-   *
-   * @param conf
-   * @param snapshotScans
-   * @throws IOException
-   */
-  public void setSnapshotToScans(Configuration conf, Map<String, Collection<Scan>> snapshotScans)
-      throws IOException {
-    // The job conf stores flat (key, value) string pairs, so emit one
-    // (snapshot name, serialized scan) pair per scan.
-    List<Map.Entry<String, String>> flattened = Lists.newArrayList();
-
-    for (Map.Entry<String, Collection<Scan>> perSnapshot : snapshotScans.entrySet()) {
-      for (Scan scan : perSnapshot.getValue()) {
-        String encodedScan = TableMapReduceUtil.convertScanToString(scan);
-        flattened.add(new AbstractMap.SimpleImmutableEntry<>(perSnapshot.getKey(), encodedScan));
-      }
-    }
-
-    ConfigurationUtil.setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, flattened);
-  }
-
-  /**
-   * Retrieve the directories into which snapshots have been restored, as stored in the
-   * configuration under {@link #RESTORE_DIRS_KEY}
-   *
-   * @param conf Configuration to extract restore directories from
-   * @return a mapping from snapshot name to the directory into which that snapshot has
-   *         been restored
-   * @throws IOException
-   */
-  public Map<String, Path> getSnapshotDirs(Configuration conf) throws IOException {
-    List<Map.Entry<String, String>> stored =
-        ConfigurationUtil.getKeyValues(conf, RESTORE_DIRS_KEY);
-
-    // One map entry is written per stored pair, so size the map up front.
-    Map<String, Path> dirsBySnapshot = Maps.newHashMapWithExpectedSize(stored.size());
-    for (Map.Entry<String, String> pair : stored) {
-      String snapshotName = pair.getKey();
-      Path restoreDir = new Path(pair.getValue());
-      dirsBySnapshot.put(snapshotName, restoreDir);
-    }
-
-    return dirsBySnapshot;
-  }
-
-  /**
-   * Push snapshotDirs to conf (under the key {@link #RESTORE_DIRS_KEY}), storing each
-   * restore directory in its string form.
-   *
-   * @param conf         Configuration to write the snapshot name -&gt; restore directory pairs to
-   * @param snapshotDirs mapping from snapshot name to restore directory
-   */
-  public void setSnapshotDirs(Configuration conf, Map<String, Path> snapshotDirs) {
-    Map<String, String> dirStringsBySnapshot = Maps.newHashMap();
-
-    for (Map.Entry<String, Path> dir : snapshotDirs.entrySet()) {
-      String snapshotName = dir.getKey();
-      String restoreDir = dir.getValue().toString();
-      dirStringsBySnapshot.put(snapshotName, restoreDir);
-    }
-
-    ConfigurationUtil.setKeyValues(conf, RESTORE_DIRS_KEY, dirStringsBySnapshot.entrySet());
-  }
-
-  /**
-   * Generate a random path underneath baseRestoreDir for each snapshot in snapshots and
-   * return a map from the snapshot to the restore directory.
-   *
-   * @param snapshots      collection of snapshot names to restore
-   * @param baseRestoreDir base directory under which all snapshots in snapshots will be restored
-   * @return a mapping from snapshot name to the directory in which that snapshot has been restored
-   */
-  private Map<String, Path> generateSnapshotToRestoreDirMapping(Collection<String> snapshots,
-      Path baseRestoreDir) {
-    Map<String, Path> restoreDirs = Maps.newHashMap();
-
-    for (String snapshot : snapshots) {
-      // The directory name is the snapshot name plus a random UUID suffix.
-      String dirName = snapshot + "__" + UUID.randomUUID().toString();
-      restoreDirs.put(snapshot, new Path(baseRestoreDir, dirName));
-    }
-
-    return restoreDirs;
-  }
-
-  /**
-   * Restore each (snapshot name, restore directory) pair in snapshotToDir
-   *
-   * @param conf          configuration to restore with
-   * @param snapshotToDir mapping from snapshot names to restore directories
-   * @param fs            filesystem to do snapshot restoration on
-   * @throws IOException
-   */
-  public void restoreSnapshots(Configuration conf, Map<String, Path> snapshotToDir, FileSystem fs)
-      throws IOException {
-    // TODO: restore from record readers to parallelize.
-    Path rootDir = FSUtils.getRootDir(conf);
-
-    // Snapshots are restored one after another, each into its own mapped directory.
-    for (String snapshotName : snapshotToDir.keySet()) {
-      Path restoreDir = snapshotToDir.get(snapshotName);
-      String message = "Restoring snapshot " + snapshotName + " into " + restoreDir
-          + " for MultiTableSnapshotInputFormat";
-      LOG.info(message);
-      restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs);
-    }
-  }
-
-  /**
-   * Restore a single snapshot by delegating to
-   * {@link RestoreSnapshotHelper#copySnapshotForScanner}.
-   *
-   * @param conf         configuration to restore with
-   * @param snapshotName name of the snapshot to restore
-   * @param rootDir      root directory passed through to the restore helper
-   * @param restoreDir   directory the snapshot is restored into
-   * @param fs           filesystem passed through to the restore helper
-   * @throws IOException if the restore helper fails
-   */
-  void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir,
-      FileSystem fs) throws IOException {
-    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java
deleted file mode 100644
index d1dba1d..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.MapContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.StatusReporter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.util.ReflectionUtils;
-
-
-/**
- * Multithreaded implementation for {@link TableMapper}.
- * <p>
- * It can be used instead when the Map operation is not CPU
- * bound in order to improve throughput.
- * <p>
- * Mapper implementations using this MapRunnable must be thread-safe.
- * <p>
- * The Map-Reduce job has to be configured with the mapper to use via
- * {@link #setMapperClass} and the number of threads the thread-pool can use with the
- * {@link #setNumberOfThreads} method. The default value is 10 threads.
- * <p>
- * A failure in any worker thread is logged and then rethrown from {@link #run} once all
- * workers have stopped, so that a failing mapper fails the task instead of being lost.
- */
-
-public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
-  private static final Log LOG = LogFactory.getLog(MultithreadedTableMapper.class);
-  private Class<? extends Mapper<ImmutableBytesWritable, Result,K2,V2>> mapClass;
-  private Context outer;
-  private ExecutorService executor;
-  /** First failure raised by a worker thread; access only via the synchronized helpers. */
-  private Throwable failure;
-  public static final String NUMBER_OF_THREADS = "hbase.mapreduce.multithreadedmapper.threads";
-  public static final String MAPPER_CLASS = "hbase.mapreduce.multithreadedmapper.mapclass";
-
-  /**
-   * The number of threads in the thread pool that will run the map function.
-   * @param job the job
-   * @return the number of threads
-   */
-  public static int getNumberOfThreads(JobContext job) {
-    return job.getConfiguration().
-        getInt(NUMBER_OF_THREADS, 10);
-  }
-
-  /**
-   * Set the number of threads in the pool for running maps.
-   * @param job the job to modify
-   * @param threads the new number of threads
-   */
-  public static void setNumberOfThreads(Job job, int threads) {
-    job.getConfiguration().setInt(NUMBER_OF_THREADS,
-        threads);
-  }
-
-  /**
-   * Get the application's mapper class.
-   * @param <K2> the map's output key type
-   * @param <V2> the map's output value type
-   * @param job the job
-   * @return the mapper class to run
-   */
-  @SuppressWarnings("unchecked")
-  public static <K2,V2>
-  Class<Mapper<ImmutableBytesWritable, Result,K2,V2>> getMapperClass(JobContext job) {
-    return (Class<Mapper<ImmutableBytesWritable, Result,K2,V2>>)
-        job.getConfiguration().getClass( MAPPER_CLASS,
-            Mapper.class);
-  }
-
-  /**
-   * Set the application's mapper class.
-   * @param <K2> the map output key type
-   * @param <V2> the map output value type
-   * @param job the job to modify
-   * @param cls the class to use as the mapper
-   * @throws IllegalArgumentException if cls is itself a MultithreadedTableMapper
-   */
-  public static <K2,V2>
-  void setMapperClass(Job job,
-      Class<? extends Mapper<ImmutableBytesWritable, Result,K2,V2>> cls) {
-    if (MultithreadedTableMapper.class.isAssignableFrom(cls)) {
-      throw new IllegalArgumentException("Can't have recursive " +
-          "MultithreadedTableMapper instances.");
-    }
-    job.getConfiguration().setClass(MAPPER_CLASS,
-        cls, Mapper.class);
-  }
-
-  /**
-   * Run the application's maps using a thread pool.
-   * <p>
-   * Blocks until every worker has finished. The pool is always shut down, including when
-   * creating a worker fails or the calling thread is interrupted while waiting.
-   *
-   * @param context the task context shared by all workers
-   * @throws IOException if a worker cannot be created, or a worker failed with an
-   *           IOException or another checked exception
-   * @throws InterruptedException if the wait is interrupted or a worker failed with one
-   */
-  @Override
-  public void run(Context context) throws IOException, InterruptedException {
-    outer = context;
-    int numberOfThreads = getNumberOfThreads(context);
-    mapClass = getMapperClass(context);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Configuring multithread runner to use " + numberOfThreads +
-          " threads");
-    }
-    clearFailure();
-    executor = Executors.newFixedThreadPool(numberOfThreads);
-    try {
-      for(int i=0; i < numberOfThreads; ++i) {
-        MapRunner thread = new MapRunner(context);
-        executor.execute(thread);
-      }
-      executor.shutdown();
-      // wait till all the threads are done
-      while (!executor.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS)) {
-        // not terminated yet; keep waiting
-      }
-    } finally {
-      // Has nothing left to stop after a normal termination; on an exception it stops the
-      // workers already started so their threads do not outlive this call.
-      executor.shutdownNow();
-    }
-    rethrowFailure();
-  }
-
-  /** Forget any failure recorded by a previous invocation of {@link #run}. */
-  private synchronized void clearFailure() {
-    failure = null;
-  }
-
-  /** Remember the first failure raised by a worker thread; later ones are only logged. */
-  private synchronized void recordFailure(Throwable th) {
-    if (failure == null) {
-      failure = th;
-    }
-  }
-
-  /** Rethrow the recorded worker failure, if any, keeping its type where run() allows it. */
-  private synchronized void rethrowFailure() throws IOException, InterruptedException {
-    Throwable th = failure;
-    if (th == null) {
-      return;
-    }
-    if (th instanceof IOException) {
-      throw (IOException) th;
-    }
-    if (th instanceof InterruptedException) {
-      throw (InterruptedException) th;
-    }
-    if (th instanceof RuntimeException) {
-      throw (RuntimeException) th;
-    }
-    if (th instanceof Error) {
-      throw (Error) th;
-    }
-    throw new IOException(th);
-  }
-
-  /**
-   * Record reader handed to each worker. It pulls the next record from the shared outer
-   * context under a lock and hands the worker its own copy of the key and value.
-   */
-  private class SubMapRecordReader
-  extends RecordReader<ImmutableBytesWritable, Result> {
-    private ImmutableBytesWritable key;
-    private Result value;
-    private Configuration conf;
-
-    @Override
-    public void close() throws IOException {
-    }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-      return 0;
-    }
-
-    @Override
-    public void initialize(InputSplit split,
-        TaskAttemptContext context
-        ) throws IOException, InterruptedException {
-      conf = context.getConfiguration();
-    }
-
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-      // Advancing the outer context and copying its current record must happen atomically,
-      // otherwise another worker could advance it in between.
-      synchronized (outer) {
-        if (!outer.nextKeyValue()) {
-          return false;
-        }
-        key = ReflectionUtils.copy(outer.getConfiguration(),
-            outer.getCurrentKey(), key);
-        value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value);
-        return true;
-      }
-    }
-
-    @Override
-    public ImmutableBytesWritable getCurrentKey() {
-      return key;
-    }
-
-    @Override
-    public Result getCurrentValue() {
-      return value;
-    }
-  }
-
-  /** Record writer handed to each worker; writes go to the outer context under a lock. */
-  private class SubMapRecordWriter extends RecordWriter<K2,V2> {
-
-    @Override
-    public void close(TaskAttemptContext context) throws IOException,
-    InterruptedException {
-    }
-
-    @Override
-    public void write(K2 key, V2 value) throws IOException,
-    InterruptedException {
-      synchronized (outer) {
-        outer.write(key, value);
-      }
-    }
-  }
-
-  /** Status reporter handed to each worker; every call is forwarded to the outer context. */
-  private class SubMapStatusReporter extends StatusReporter {
-
-    @Override
-    public Counter getCounter(Enum<?> name) {
-      return outer.getCounter(name);
-    }
-
-    @Override
-    public Counter getCounter(String group, String name) {
-      return outer.getCounter(group, name);
-    }
-
-    @Override
-    public void progress() {
-      outer.progress();
-    }
-
-    @Override
-    public void setStatus(String status) {
-      outer.setStatus(status);
-    }
-
-    // NOTE(review): deliberately left without @Override as in the original; presumably
-    // StatusReporter lacks this method in some supported Hadoop versions - confirm.
-    public float getProgress() {
-      return 0;
-    }
-  }
-
-  /**
-   * One worker: owns a mapper instance and a private context built over the sub reader,
-   * writer and reporter, and runs the mapper against that context.
-   */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
-      justification="Don't understand why FB is complaining about this one. We do throw exception")
-  private class MapRunner implements Runnable {
-    private Mapper<ImmutableBytesWritable, Result, K2,V2> mapper;
-    private Context subcontext;
-
-    /**
-     * Build the mapper and its private context reflectively. The constructor of the
-     * runtime context class is tried first; if that fails, a MapContextImpl wrapped by
-     * WrappedMapper is tried instead.
-     *
-     * @param context the outer task context
-     * @throws IOException if neither way of building the context works
-     */
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    MapRunner(Context context) throws IOException, InterruptedException {
-      mapper = ReflectionUtils.newInstance(mapClass,
-          context.getConfiguration());
-      try {
-        Constructor c = context.getClass().getConstructor(
-          Mapper.class,
-          Configuration.class,
-          TaskAttemptID.class,
-          RecordReader.class,
-          RecordWriter.class,
-          OutputCommitter.class,
-          StatusReporter.class,
-          InputSplit.class);
-        c.setAccessible(true);
-        subcontext = (Context) c.newInstance(
-          mapper,
-          outer.getConfiguration(),
-          outer.getTaskAttemptID(),
-          new SubMapRecordReader(),
-          new SubMapRecordWriter(),
-          context.getOutputCommitter(),
-          new SubMapStatusReporter(),
-          outer.getInputSplit());
-      } catch (Exception e) {
-        try {
-          Constructor c = Class.forName("org.apache.hadoop.mapreduce.task.MapContextImpl").getConstructor(
-            Configuration.class,
-            TaskAttemptID.class,
-            RecordReader.class,
-            RecordWriter.class,
-            OutputCommitter.class,
-            StatusReporter.class,
-            InputSplit.class);
-          c.setAccessible(true);
-          MapContext mc = (MapContext) c.newInstance(
-            outer.getConfiguration(),
-            outer.getTaskAttemptID(),
-            new SubMapRecordReader(),
-            new SubMapRecordWriter(),
-            context.getOutputCommitter(),
-            new SubMapStatusReporter(),
-            outer.getInputSplit());
-          Class<?> wrappedMapperClass = Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper");
-          Method getMapContext = wrappedMapperClass.getMethod("getMapContext", MapContext.class);
-          subcontext = (Context) getMapContext.invoke(wrappedMapperClass.newInstance(), mc);
-        } catch (Exception ee) { // FindBugs: REC_CATCH_EXCEPTION
-          // rethrow as IOE; the first failure stays the cause and the fallback failure is
-          // attached as suppressed so neither diagnosis is lost
-          IOException ioe = new IOException(e);
-          ioe.addSuppressed(ee);
-          throw ioe;
-        }
-      }
-    }
-
-    @Override
-    public void run() {
-      try {
-        mapper.run(subcontext);
-      } catch (Throwable ie) {
-        LOG.error("Problem in running map.", ie);
-        // hand the failure to the enclosing mapper so run(Context) can rethrow it
-        recordFailure(ie);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java
deleted file mode 100644
index 8997da9..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.Serializer;
-
-@InterfaceAudience.Public
-public class MutationSerialization implements Serialization<Mutation> {
-  @Override
-  public boolean accept(Class<?> c) {
-    return Mutation.class.isAssignableFrom(c);
-  }
-
-  @Override
-  public Deserializer<Mutation> getDeserializer(Class<Mutation> c) {
-    return new MutationDeserializer();
-  }
-
-  @Override
-  public Serializer<Mutation> getSerializer(Class<Mutation> c) {
-    return new MutationSerializer();
-  }
-
-  private static class MutationDeserializer implements Deserializer<Mutation> {
-    private InputStream in;
-
-    @Override
-    public void close() throws IOException {
-      in.close();
-    }
-
-    @Override
-    public Mutation deserialize(Mutation mutation) throws IOException {
-      MutationProto proto = MutationProto.parseDelimitedFrom(in);
-      return ProtobufUtil.toMutation(proto);
-    }
-
-    @Override
-    public void open(InputStream in) throws IOException {
-      this.in = in;
-    }
-    
-  }
-  private static class MutationSerializer implements Serializer<Mutation> {
-    private OutputStream out;
-
-    @Override
-    public void close() throws IOException {
-      out.close();
-    }
-
-    @Override
-    public void open(OutputStream out) throws IOException {
-      this.out = out;
-    }
-
-    @Override
-    public void serialize(Mutation mutation) throws IOException {
-      MutationType type;
-      if (mutation instanceof Put) {
-        type = MutationType.PUT;
-      } else if (mutation instanceof Delete) {
-        type = MutationType.DELETE;
-      } else {
-        throw new IllegalArgumentException("Only Put and Delete are supported");
-      }
-      ProtobufUtil.toMutation(type, mutation).writeDelimitedTo(out);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java
deleted file mode 100644
index f01e84f..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.mapreduce.Reducer;
-
-/**
- * Combiner for Puts: merges the Put instances grouped under the same key <code>K</code>
- * into a single instance, emitting early whenever the merged cells grow past a
- * configurable size threshold.
- * @see TableMapReduceUtil
- */
-@InterfaceAudience.Public
-public class PutCombiner<K> extends Reducer<K, Put, K, Put> {
-  private static final Log LOG = LogFactory.getLog(PutCombiner.class);
-
-  @Override
-  protected void reduce(K row, Iterable<Put> vals, Context context)
-      throws IOException, InterruptedException {
-    // The heap size of the merged cells is tracked against a threshold and the
-    // accumulated Put is flushed when it is exceeded. A row may therefore be
-    // emitted as several Puts, which is fine: a combiner is only an
-    // optimization and perfect grouping is not required.
-    long threshold = context.getConfiguration().getLong(
-        "putcombiner.row.threshold", 1L << 30);
-    int cnt = 0;
-    long curSize = 0;
-    Put put = null;
-    Map<byte[], List<Cell>> familyMap = null;
-    for (Put p : vals) {
-      cnt++;
-      if (put == null) {
-        // first Put of a batch becomes the accumulator the following ones are merged into
-        put = p;
-        familyMap = put.getFamilyCellMap();
-        continue;
-      }
-      curSize += mergeCells(familyMap, p);
-      if (cnt % 10 == 0) {
-        context.setStatus("Combine " + cnt);
-      }
-      if (curSize > threshold) {
-        emit(row, put, cnt, context);
-        put = null;
-        curSize = 0;
-        cnt = 0;
-      }
-    }
-    if (put != null) {
-      emit(row, put, cnt, context);
-    }
-  }
-
-  /**
-   * Merge the cells of source into target, family by family, and return the summed heap
-   * size of the cells visited.
-   */
-  private static long mergeCells(Map<byte[], List<Cell>> target, Put source) {
-    long added = 0;
-    for (Entry<byte[], List<Cell>> family : source.getFamilyCellMap().entrySet()) {
-      List<Cell> existing = target.get(family.getKey());
-      for (Cell cell : family.getValue()) {
-        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-        added += kv.heapSize();
-        if (existing != null) {
-          existing.add(kv);
-        }
-      }
-      if (existing == null) {
-        // family not present in the accumulator yet: adopt the source's cell list as is
-        target.put(family.getKey(), family.getValue());
-      }
-    }
-    return added;
-  }
-
-  /** Write the accumulated Put for row, logging how many inputs went into it. */
-  private void emit(K row, Put put, int cnt, Context context)
-      throws IOException, InterruptedException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1));
-    }
-    context.write(row, put);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
deleted file mode 100644
index 17ab9cb..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.TreeSet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ArrayBackedTag;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.TagType;
-import org.apache.hadoop.hbase.TagUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.security.visibility.CellVisibility;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * Emits sorted Puts.
- * Reads in all Puts from passed Iterator, sorts them, then emits
- * Puts in sorted order.  If lots of columns per row, it will use lots of
- * memory sorting.
- * @see HFileOutputFormat2
- * @see KeyValueSortReducer
- */
-@InterfaceAudience.Public
-public class PutSortReducer extends
-    Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
-  // the cell creator
-  private CellCreator kvCreator;
-
-  /** Creates the cell creator from the job configuration. */
-  @Override
-  protected void
-      setup(Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
-          throws IOException, InterruptedException {
-    Configuration conf = context.getConfiguration();
-    this.kvCreator = new CellCreator(conf);
-  }
-
-  /**
-   * Sorts the cells of all Puts for a row and writes them out in sorted order. When the
-   * cells read so far reach "putsortreducer.row.threshold" bytes of heap, the sorted batch
-   * is written, a (null, null) record is written, and a new batch is started.
-   *
-   * @param row     the row key the Puts are grouped under
-   * @param puts    the Puts for that row
-   * @param context the reducer context written to
-   * @throws java.io.IOException if a Put carries an invalid visibility expression or a
-   *           write fails
-   */
-  @Override
-  protected void reduce(
-      ImmutableBytesWritable row,
-      java.lang.Iterable<Put> puts,
-      Reducer<ImmutableBytesWritable, Put,
-              ImmutableBytesWritable, KeyValue>.Context context)
-      throws java.io.IOException, InterruptedException
-  {
-    // although reduce() is called per-row, handle pathological case
-    long threshold = context.getConfiguration().getLong(
-        "putsortreducer.row.threshold", 1L * (1<<30));
-    Iterator<Put> iter = puts.iterator();
-    while (iter.hasNext()) {
-      TreeSet<KeyValue> map = new TreeSet<>(CellComparator.COMPARATOR);
-      long curSize = 0;
-      // stop at the end or the RAM threshold
-      List<Tag> tags = new ArrayList<>();
-      while (iter.hasNext() && curSize < threshold) {
-        // clear the tags
-        tags.clear();
-        Put p = iter.next();
-        long t = p.getTTL();
-        if (t != Long.MAX_VALUE) {
-          // add TTL tag if found
-          tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(t)));
-        }
-        byte[] acl = p.getACL();
-        if (acl != null) {
-          // add ACL tag if found
-          tags.add(new ArrayBackedTag(TagType.ACL_TAG_TYPE, acl));
-        }
-        try {
-          CellVisibility cellVisibility = p.getCellVisibility();
-          if (cellVisibility != null) {
-            // add the visibility labels if any
-            tags.addAll(kvCreator.getVisibilityExpressionResolver()
-                .createVisibilityExpTags(cellVisibility.getExpression()));
-          }
-        } catch (DeserializationException e) {
-          // We just throw exception here. Should we allow other mutations to proceed by
-          // just ignoring the bad one?
-          throw new IOException("Invalid visibility expression found in mutation " + p, e);
-        }
-        for (List<Cell> cells: p.getFamilyCellMap().values()) {
-          for (Cell cell: cells) {
-            // Creating the KV which needs to be directly written to HFiles. Using the Facade
-            // KVCreator for creation of kvs.
-            KeyValue kv = null;
-            // Start from the Put-level tags and carry this cell's own tags forward into a
-            // list private to the cell, so that tags of one cell are not applied to the
-            // cells processed after it in the same Put.
-            List<Tag> cellTags = new ArrayList<>(tags);
-            TagUtil.carryForwardTags(cellTags, cell);
-            if (!cellTags.isEmpty()) {
-              kv = (KeyValue) kvCreator.create(cell.getRowArray(), cell.getRowOffset(),
-                cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(),
-                cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(),
-                cell.getQualifierLength(), cell.getTimestamp(), cell.getValueArray(),
-                cell.getValueOffset(), cell.getValueLength(), cellTags);
-            } else {
-              kv = KeyValueUtil.ensureKeyValue(cell);
-            }
-            if (map.add(kv)) {// don't count duplicated kv into size
-              curSize += kv.heapSize();
-            }
-          }
-        }
-      }
-      context.setStatus("Read " + map.size() + " entries of " + map.getClass()
-          + "(" + StringUtils.humanReadableInt(curSize) + ")");
-      int index = 0;
-      for (KeyValue kv : map) {
-        context.write(row, kv);
-        if (++index % 100 == 0) {
-          context.setStatus("Wrote " + index);
-        }
-      }
-
-      // if we have more entries to process
-      if (iter.hasNext()) {
-        // force flush because we cannot guarantee intra-row sorted order
-        context.write(null, null);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java
deleted file mode 100644
index dff04b6..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.Serializer;
-
-@InterfaceAudience.Public
-public class ResultSerialization extends Configured implements Serialization<Result> {
-  private static final Log LOG = LogFactory.getLog(ResultSerialization.class);
-  // The following configuration property indicates import file format version.
-  public static final String IMPORT_FORMAT_VER = "hbase.import.version";
-
-  @Override
-  public boolean accept(Class<?> c) {
-    return Result.class.isAssignableFrom(c);
-  }
-
-  @Override
-  public Deserializer<Result> getDeserializer(Class<Result> c) {
-    // check input format version
-    Configuration conf = getConf();
-    if (conf != null) {
-      String inputVersion = conf.get(IMPORT_FORMAT_VER);
-      if (inputVersion != null && inputVersion.equals("0.94")) {
-        LOG.info("Load exported file using deserializer for HBase 0.94 format");
-        return new Result94Deserializer();
-      }
-    }
-
-    return new ResultDeserializer();
-  }
-
-  @Override
-  public Serializer<Result> getSerializer(Class<Result> c) {
-    return new ResultSerializer();
-  }
-
-  /**
-   * The following deserializer class is used to load exported file of 0.94
-   */
-  private static class Result94Deserializer implements Deserializer<Result> {
-    private DataInputStream in;
-
-    @Override
-    public void close() throws IOException {
-      in.close();
-    }
-
-    @Override
-    public Result deserialize(Result mutation) throws IOException {
-      int totalBuffer = in.readInt();
-      if (totalBuffer == 0) {
-        return Result.EMPTY_RESULT;
-      }
-      byte[] buf = new byte[totalBuffer];
-      readChunked(in, buf, 0, totalBuffer);
-      List<Cell> kvs = new ArrayList<>();
-      int offset = 0;
-      while (offset < totalBuffer) {
-        int keyLength = Bytes.toInt(buf, offset);
-        offset += Bytes.SIZEOF_INT;
-        kvs.add(new KeyValue(buf, offset, keyLength));
-        offset += keyLength;
-      }
-      return Result.create(kvs);
-    }
-
-    @Override
-    public void open(InputStream in) throws IOException {
-      if (!(in instanceof DataInputStream)) {
-        throw new IOException("Wrong input stream instance passed in");
-      }
-      this.in = (DataInputStream) in;
-    }
-
-    private void readChunked(final DataInput in, byte[] dest, int ofs, int len) throws IOException {
-      int maxRead = 8192;
-
-      for (; ofs < len; ofs += maxRead)
-        in.readFully(dest, ofs, Math.min(len - ofs, maxRead));
-    }
-  }
-
-  private static class ResultDeserializer implements Deserializer<Result> {
-    private InputStream in;
-
-    @Override
-    public void close() throws IOException {
-      in.close();
-    }
-
-    @Override
-    public Result deserialize(Result mutation) throws IOException {
-      ClientProtos.Result proto = ClientProtos.Result.parseDelimitedFrom(in);
-      return ProtobufUtil.toResult(proto);
-    }
-
-    @Override
-    public void open(InputStream in) throws IOException {
-      this.in = in;
-    }
-  }
-
-  private static class ResultSerializer implements Serializer<Result> {
-    private OutputStream out;
-
-    @Override
-    public void close() throws IOException {
-      out.close();
-    }
-
-    @Override
-    public void open(OutputStream out) throws IOException {
-      this.out = out;
-    }
-
-    @Override
-    public void serialize(Result result) throws IOException {
-      ProtobufUtil.toResult(result).writeDelimitedTo(out);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
deleted file mode 100644
index 2e0591e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.ArrayList;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * A job with a just a map phase to count rows. Map outputs table rows IF the
- * input row has columns that have content.
- */
-@InterfaceAudience.Public
-public class RowCounter extends Configured implements Tool {
-
-  private static final Log LOG = LogFactory.getLog(RowCounter.class);
-
-  /** Name of this 'program'. */
-  static final String NAME = "rowcounter";
-
-  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
-  private final static String EXPECTED_COUNT_KEY = RowCounter.class.getName() + ".expected_count";
-
-  /**
-   * Mapper that runs the count.
-   */
-  static class RowCounterMapper
-  extends TableMapper<ImmutableBytesWritable, Result> {
-
-    /** Counter enumeration to count the actual rows. */
-    public static enum Counters {ROWS}
-
-    /**
-     * Maps the data.
-     *
-     * @param row  The current table row key.
-     * @param values  The columns.
-     * @param context  The current context.
-     * @throws IOException When something is broken with the data.
-     * @see org.apache.hadoop.mapreduce.Mapper#map(Object, Object, Context)
-     */
-    @Override
-    public void map(ImmutableBytesWritable row, Result values,
-      Context context)
-    throws IOException {
-      // Count every row containing data, whether it's in qualifiers or values
-      context.getCounter(Counters.ROWS).increment(1);
-    }
-  }
-
-  /**
-   * Sets up the actual job.
-   *
-   * @param conf  The current configuration.
-   * @param args  The command line parameters.
-   * @return The newly created job.
-   * @throws IOException When setting up the job fails.
-   */
-  public static Job createSubmittableJob(Configuration conf, String[] args)
-  throws IOException {
-    String tableName = args[0];
-    List<MultiRowRangeFilter.RowRange> rowRangeList = null;
-    long startTime = 0;
-    long endTime = 0;
-
-    StringBuilder sb = new StringBuilder();
-
-    final String rangeSwitch = "--range=";
-    final String startTimeArgKey = "--starttime=";
-    final String endTimeArgKey = "--endtime=";
-    final String expectedCountArg = "--expected-count=";
-
-    // First argument is table name, starting from second
-    for (int i = 1; i < args.length; i++) {
-      if (args[i].startsWith(rangeSwitch)) {
-        try {
-          rowRangeList = parseRowRangeParameter(args[i], rangeSwitch);
-        } catch (IllegalArgumentException e) {
-          return null;
-        }
-        continue;
-      }
-      if (args[i].startsWith(startTimeArgKey)) {
-        startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
-        continue;
-      }
-      if (args[i].startsWith(endTimeArgKey)) {
-        endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
-        continue;
-      }
-      if (args[i].startsWith(expectedCountArg)) {
-        conf.setLong(EXPECTED_COUNT_KEY,
-            Long.parseLong(args[i].substring(expectedCountArg.length())));
-        continue;
-      }
-      // if no switch, assume column names
-      sb.append(args[i]);
-      sb.append(" ");
-    }
-    if (endTime < startTime) {
-      printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime);
-      return null;
-    }
-
-    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
-    job.setJarByClass(RowCounter.class);
-    Scan scan = new Scan();
-    scan.setCacheBlocks(false);
-    setScanFilter(scan, rowRangeList);
-    if (sb.length() > 0) {
-      for (String columnName : sb.toString().trim().split(" ")) {
-        String family = StringUtils.substringBefore(columnName, ":");
-        String qualifier = StringUtils.substringAfter(columnName, ":");
-
-        if (StringUtils.isBlank(qualifier)) {
-          scan.addFamily(Bytes.toBytes(family));
-        }
-        else {
-          scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
-        }
-      }
-    }
-    scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
-    job.setOutputFormatClass(NullOutputFormat.class);
-    TableMapReduceUtil.initTableMapperJob(tableName, scan,
-      RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
-    job.setNumReduceTasks(0);
-    return job;
-  }
-
-  private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter(
-    String arg, String rangeSwitch) {
-    final String[] ranges = arg.substring(rangeSwitch.length()).split(";");
-    final List<MultiRowRangeFilter.RowRange> rangeList = new ArrayList<>();
-    for (String range : ranges) {
-      String[] startEnd = range.split(",", 2);
-      if (startEnd.length != 2 || startEnd[1].contains(",")) {
-        printUsage("Please specify range in such format as \"--range=a,b\" " +
-            "or, with only one boundary, \"--range=,b\" or \"--range=a,\"");
-        throw new IllegalArgumentException("Wrong range specification: " + range);
-      }
-      String startKey = startEnd[0];
-      String endKey = startEnd[1];
-      rangeList.add(new MultiRowRangeFilter.RowRange(
-        Bytes.toBytesBinary(startKey), true,
-        Bytes.toBytesBinary(endKey), false));
-    }
-    return rangeList;
-  }
-
-  /**
-   * Sets filter {@link FilterBase} to the {@link Scan} instance.
-   * If provided rowRangeList contains more than one element,
-   * method sets filter which is instance of {@link MultiRowRangeFilter}.
-   * Otherwise, method sets filter which is instance of {@link FirstKeyOnlyFilter}.
-   * If rowRangeList contains exactly one element, startRow and stopRow are set to the scan.
-   * @param scan
-   * @param rowRangeList
-   */
-  private static void setScanFilter(Scan scan, List<MultiRowRangeFilter.RowRange> rowRangeList) {
-    final int size = rowRangeList == null ? 0 : rowRangeList.size();
-    if (size <= 1) {
-      scan.setFilter(new FirstKeyOnlyFilter());
-    }
-    if (size == 1) {
-      MultiRowRangeFilter.RowRange range = rowRangeList.get(0);
-      scan.setStartRow(range.getStartRow()); //inclusive
-      scan.setStopRow(range.getStopRow());   //exclusive
-    } else if (size > 1) {
-      scan.setFilter(new MultiRowRangeFilter(rowRangeList));
-    }
-  }
-
-  /*
-   * @param errorMessage Can attach a message when error occurs.
-   */
-  private static void printUsage(String errorMessage) {
-    System.err.println("ERROR: " + errorMessage);
-    printUsage();
-  }
-
-  /**
-   * Prints usage without error message.
-   * Note that we don't document --expected-count, because it's intended for test.
-   */
-  private static void printUsage() {
-    System.err.println("Usage: RowCounter [options] <tablename> " +
-        "[--starttime=[start] --endtime=[end] " +
-        "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [<column1> <column2>...]");
-    System.err.println("For performance consider the following options:\n"
-        + "-Dhbase.client.scanner.caching=100\n"
-        + "-Dmapreduce.map.speculative=false");
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length < 1) {
-      printUsage("Wrong number of parameters: " + args.length);
-      return -1;
-    }
-    Job job = createSubmittableJob(getConf(), args);
-    if (job == null) {
-      return -1;
-    }
-    boolean success = job.waitForCompletion(true);
-    final long expectedCount = getConf().getLong(EXPECTED_COUNT_KEY, -1);
-    if (success && expectedCount != -1) {
-      final Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS);
-      success = expectedCount == counter.getValue();
-      if (!success) {
-        LOG.error("Failing job because count of '" + counter.getValue() +
-            "' does not match expected count of '" + expectedCount + "'");
-      }
-    }
-    return (success ? 0 : 1);
-  }
-
-  /**
-   * Main entry point.
-   * @param args The command line parameters.
-   * @throws Exception When running the job fails.
-   */
-  public static void main(String[] args) throws Exception {
-    int errCode = ToolRunner.run(HBaseConfiguration.create(), new RowCounter(), args);
-    System.exit(errCode);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
deleted file mode 100644
index 4ba1088..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Base64;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Partitioner;
-
-/**
- * A partitioner that takes start and end keys and uses bigdecimal to figure
- * which reduce a key belongs to.  Pass the start and end
- * keys in the Configuration using <code>hbase.simpletotalorder.start</code>
- * and <code>hbase.simpletotalorder.end</code>.  The end key needs to be
- * exclusive; i.e. one larger than the biggest key in your key space.
- * You may be surprised at how this class partitions the space; it may not
- * align with preconceptions; e.g. a start key of zero and an end key of 100
- * divided in ten will not make regions whose range is 0-10, 10-20, and so on.
- * Make your own partitioner if you need the region spacing to come out a
- * particular way.
- * @param <VALUE>
- * @see #START
- * @see #END
- */
-@InterfaceAudience.Public
-public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
-implements Configurable {
-  private final static Log LOG = LogFactory.getLog(SimpleTotalOrderPartitioner.class);
-
-  @Deprecated
-  public static final String START = "hbase.simpletotalorder.start";
-  @Deprecated
-  public static final String END = "hbase.simpletotalorder.end";
-  
-  static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
-  static final String END_BASE64 = "hbase.simpletotalorder.end.base64";
-  
-  private Configuration c;
-  private byte [] startkey;
-  private byte [] endkey;
-  private byte [][] splits;
-  private int lastReduces = -1;
-
-  public static void setStartKey(Configuration conf, byte[] startKey) {
-    conf.set(START_BASE64, Base64.encodeBytes(startKey));
-  }
-  
-  public static void setEndKey(Configuration conf, byte[] endKey) {
-    conf.set(END_BASE64, Base64.encodeBytes(endKey));
-  }
-  
-  @SuppressWarnings("deprecation")
-  static byte[] getStartKey(Configuration conf) {
-    return getKeyFromConf(conf, START_BASE64, START);
-  }
-  
-  @SuppressWarnings("deprecation")
-  static byte[] getEndKey(Configuration conf) {
-    return getKeyFromConf(conf, END_BASE64, END);
-  }
-  
-  private static byte[] getKeyFromConf(Configuration conf,
-      String base64Key, String deprecatedKey) {
-    String encoded = conf.get(base64Key);
-    if (encoded != null) {
-      return Base64.decode(encoded);
-    }
-    String oldStyleVal = conf.get(deprecatedKey);
-    if (oldStyleVal == null) {
-      return null;
-    }
-    LOG.warn("Using deprecated configuration " + deprecatedKey +
-        " - please use static accessor methods instead.");
-    return Bytes.toBytesBinary(oldStyleVal);
-  }
-  
-  @Override
-  public int getPartition(final ImmutableBytesWritable key, final VALUE value,
-      final int reduces) {
-    if (reduces == 1) return 0;
-    if (this.lastReduces != reduces) {
-      this.splits = Bytes.split(this.startkey, this.endkey, reduces - 1);
-      for (int i = 0; i < splits.length; i++) {
-        LOG.info(Bytes.toStringBinary(splits[i]));
-      }
-      this.lastReduces = reduces;
-    }
-    int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(),
-      key.getLength());
-    // Below code is from hfile index search.
-    if (pos < 0) {
-      pos++;
-      pos *= -1;
-      if (pos == 0) {
-        // falls before the beginning of the file.
-        throw new RuntimeException("Key outside start/stop range: " +
-          key.toString());
-      }
-      pos--;
-    }
-    return pos;
-  }
-
-  @Override
-  public Configuration getConf() {
-    return this.c;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.c = conf;
-    this.startkey = getStartKey(conf);
-    this.endkey = getEndKey(conf);
-    if (startkey == null || endkey == null) {
-      throw new RuntimeException(this.getClass() + " not configured");
-    }
-    LOG.info("startkey=" + Bytes.toStringBinary(startkey) +
-        ", endkey=" + Bytes.toStringBinary(endkey));
-    // Reset last reduces count on change of Start / End key
-    this.lastReduces = -1;
-  }
-}


Mime
View raw message