hbase-commits mailing list archives

From a...@apache.org
Subject [20/41] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.
Date Sat, 26 Aug 2017 08:56:06 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/RowCounter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/RowCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/RowCounter.java
deleted file mode 100644
index 43560fd..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/RowCounter.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * A job with a map to count rows.
- * Map outputs table rows IF the input row has columns that have content.
- * Uses an org.apache.hadoop.mapred.lib.IdentityReducer.
- */
-@InterfaceAudience.Public
-public class RowCounter extends Configured implements Tool {
-  // Name of this 'program'
-  static final String NAME = "rowcounter";
-
-  /**
-   * Mapper that runs the count.
-   */
-  static class RowCounterMapper
-  implements TableMap<ImmutableBytesWritable, Result> {
-    private static enum Counters {ROWS}
-
-    public void map(ImmutableBytesWritable row, Result values,
-        OutputCollector<ImmutableBytesWritable, Result> output,
-        Reporter reporter)
-    throws IOException {
-        // Count every row containing data, whether it's in qualifiers or values
-        reporter.incrCounter(Counters.ROWS, 1);
-    }
-
-    public void configure(JobConf jc) {
-      // Nothing to do.
-    }
-
-    public void close() throws IOException {
-      // Nothing to do.
-    }
-  }
-
-  /**
-   * @param args
-   * @return the JobConf
-   * @throws IOException
-   */
-  public JobConf createSubmittableJob(String[] args) throws IOException {
-    JobConf c = new JobConf(getConf(), getClass());
-    c.setJobName(NAME);
-    // Columns are space delimited
-    StringBuilder sb = new StringBuilder();
-    final int columnoffset = 2;
-    for (int i = columnoffset; i < args.length; i++) {
-      if (i > columnoffset) {
-        sb.append(" ");
-      }
-      sb.append(args[i]);
-    }
-    // Second argument is the table name.
-    TableMapReduceUtil.initTableMapJob(args[1], sb.toString(),
-      RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, c);
-    c.setNumReduceTasks(0);
-    // First arg is the output directory.
-    FileOutputFormat.setOutputPath(c, new Path(args[0]));
-    return c;
-  }
-
-  static int printUsage() {
-    System.out.println(NAME +
-      " <outputdir> <tablename> <column1> [<column2>...]");
-    return -1;
-  }
-
-  public int run(final String[] args) throws Exception {
-    // Make sure there are at least 3 parameters
-    if (args.length < 3) {
-      System.err.println("ERROR: Wrong number of parameters: " + args.length);
-      return printUsage();
-    }
-    JobClient.runJob(createSubmittableJob(args));
-    return 0;
-  }
-
-  /**
-   * @param args
-   * @throws Exception
-   */
-  public static void main(String[] args) throws Exception {
-    int errCode = ToolRunner.run(HBaseConfiguration.create(), new RowCounter(), args);
-    System.exit(errCode);
-  }
-}
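
For reference, the RowCounter removed above is a Tool driven through ToolRunner, as its main() shows. A minimal driver sketch that invokes it programmatically (the output directory, table name, and column are placeholders) could look like:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapred.RowCounter;
import org.apache.hadoop.util.ToolRunner;

public class RowCounterDriver {
  public static void main(String[] args) throws Exception {
    // Arguments mirror printUsage(): <outputdir> <tablename> <column1> [<column2>...]
    int errCode = ToolRunner.run(HBaseConfiguration.create(), new RowCounter(),
        new String[] { "/tmp/rowcounter-out", "exampleTable", "info:qual" });
    System.exit(errCode);
  }
}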

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
deleted file mode 100644
index 208849a..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapred;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * Convert HBase tabular data into a format that is consumable by Map/Reduce.
- */
-@InterfaceAudience.Public
-public class TableInputFormat extends TableInputFormatBase implements
-    JobConfigurable {
-  private static final Log LOG = LogFactory.getLog(TableInputFormat.class);
-
-  /**
-   * space delimited list of columns
-   */
-  public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
-
-  public void configure(JobConf job) {
-    try {
-      initialize(job);
-    } catch (Exception e) {
-      LOG.error(StringUtils.stringifyException(e));
-    }
-  }
-
-  @Override
-  protected void initialize(JobConf job) throws IOException {
-    Path[] tableNames = FileInputFormat.getInputPaths(job);
-    String colArg = job.get(COLUMN_LIST);
-    String[] colNames = colArg.split(" ");
-    byte [][] m_cols = new byte[colNames.length][];
-    for (int i = 0; i < m_cols.length; i++) {
-      m_cols[i] = Bytes.toBytes(colNames[i]);
-    }
-    setInputColumns(m_cols);
-    Connection connection = ConnectionFactory.createConnection(job);
-    initializeTable(connection, TableName.valueOf(tableNames[0].getName()));
-  }
-
-  public void validateInput(JobConf job) throws IOException {
-    // expecting exactly one path
-    Path [] tableNames = FileInputFormat.getInputPaths(job);
-    if (tableNames == null || tableNames.length > 1) {
-      throw new IOException("expecting one table name");
-    }
-
-    // connected to table?
-    if (getTable() == null) {
-      throw new IOException("could not connect to table '" +
-        tableNames[0].getName() + "'");
-    }
-
-    // expecting at least one column
-    String colArg = job.get(COLUMN_LIST);
-    if (colArg == null || colArg.length() == 0) {
-      throw new IOException("expecting at least one column");
-    }
-  }
-}
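
As its initialize and validateInput methods show, the TableInputFormat removed above expects the table name as the job's single input path and a space-delimited column list under hbase.mapred.tablecolumns. A minimal configuration sketch (table and column names are placeholders) could look like:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapred.TableInputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;

public class TableInputFormatConfigSketch {
  public static JobConf configure() {
    JobConf job = new JobConf(HBaseConfiguration.create());
    job.setInputFormat(TableInputFormat.class);
    // The table name travels as the (single) input path.
    FileInputFormat.addInputPaths(job, "exampleTable");
    // Space-delimited column list read back in initialize(JobConf).
    job.set(TableInputFormat.COLUMN_LIST, "info:a info:b");
    return job;
  }
}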

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
deleted file mode 100644
index c65810f..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapred;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * A Base for {@link TableInputFormat}s. Receives a {@link Table}, a
- * byte[] of input columns and optionally a {@link Filter}.
- * Subclasses may use other TableRecordReader implementations.
- *
- * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
- * function properly. Each of the entry points to this class used by the MapReduce framework,
- * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
- * will call {@link #initialize(JobConf)} as a convenient centralized location to handle
- * retrieving the necessary configuration information. If your subclass overrides either of these
- * methods, either call the parent version or call initialize yourself.
- *
- * <p>
- * An example of a subclass:
- * <pre>
- *   class ExampleTIF extends TableInputFormatBase {
- *
- *     {@literal @}Override
- *     protected void initialize(JobConf context) throws IOException {
- *       // We are responsible for the lifecycle of this connection until we hand it over in
- *       // initializeTable.
- *       Connection connection =
- *          ConnectionFactory.createConnection(HBaseConfiguration.create(job));
- *       TableName tableName = TableName.valueOf("exampleTable");
- *       // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
- *       initializeTable(connection, tableName);
- *       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
- *         Bytes.toBytes("columnB") };
- *       // mandatory
- *       setInputColumns(inputColumns);
- *       // optional, by default we'll get everything for the given columns.
- *       Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
- *       setRowFilter(exampleFilter);
- *     }
- *   }
- * </pre>
- */
-
-@InterfaceAudience.Public
-public abstract class TableInputFormatBase
-implements InputFormat<ImmutableBytesWritable, Result> {
-  private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
-  private byte [][] inputColumns;
-  private Table table;
-  private RegionLocator regionLocator;
-  private Connection connection;
-  private TableRecordReader tableRecordReader;
-  private Filter rowFilter;
-
-  private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
-      "initialized. Ensure you call initializeTable either in your constructor or initialize " +
-      "method";
-  private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
-            " previous error. Please look at the previous logs lines from" +
-            " the task's full log for more details.";
-
-  /**
-   * Builds a TableRecordReader. If no TableRecordReader was provided, uses
-   * the default.
-   *
-   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
-   *      JobConf, Reporter)
-   */
-  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
-      InputSplit split, JobConf job, Reporter reporter)
-  throws IOException {
-    // In case a subclass uses the deprecated approach or calls initializeTable directly
-    if (table == null) {
-      initialize(job);
-    }
-    // null check in case our child overrides getTable to not throw.
-    try {
-      if (getTable() == null) {
-        // initialize() must not have been implemented in the subclass.
-        throw new IOException(INITIALIZATION_ERROR);
-      }
-    } catch (IllegalStateException exception) {
-      throw new IOException(INITIALIZATION_ERROR, exception);
-    }
-
-    TableSplit tSplit = (TableSplit) split;
-    // if no table record reader was provided use default
-    final TableRecordReader trr = this.tableRecordReader == null ? new TableRecordReader() :
-        this.tableRecordReader;
-    trr.setStartRow(tSplit.getStartRow());
-    trr.setEndRow(tSplit.getEndRow());
-    trr.setHTable(this.table);
-    trr.setInputColumns(this.inputColumns);
-    trr.setRowFilter(this.rowFilter);
-    trr.init();
-    return new RecordReader<ImmutableBytesWritable, Result>() {
-
-      @Override
-      public void close() throws IOException {
-        trr.close();
-        closeTable();
-      }
-
-      @Override
-      public ImmutableBytesWritable createKey() {
-        return trr.createKey();
-      }
-
-      @Override
-      public Result createValue() {
-        return trr.createValue();
-      }
-
-      @Override
-      public long getPos() throws IOException {
-        return trr.getPos();
-      }
-
-      @Override
-      public float getProgress() throws IOException {
-        return trr.getProgress();
-      }
-
-      @Override
-      public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
-        return trr.next(key, value);
-      }
-    };
-  }
-
-  /**
-   * Calculates the splits that will serve as input for the map tasks.
-   *
-   * Splits are created in a number equal to the smaller of numSplits and
-   * the number of {@link org.apache.hadoop.hbase.regionserver.HRegion}s in the table.
-   * If the number of splits is smaller than the number of
-   * {@link org.apache.hadoop.hbase.regionserver.HRegion}s, then splits span
-   * multiple {@link org.apache.hadoop.hbase.regionserver.HRegion}s and the regions are
-   * grouped as evenly as possible. When the splits are uneven, the bigger splits are
-   * placed first in the {@link InputSplit} array.
-   *
-   * @param job the map task {@link JobConf}
-   * @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
-   *
-   * @return the input splits
-   *
-   * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
-   */
-  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    if (this.table == null) {
-      initialize(job);
-    }
-    // null check in case our child overrides getTable to not throw.
-    try {
-      if (getTable() == null) {
-        // initialize() must not have been implemented in the subclass.
-        throw new IOException(INITIALIZATION_ERROR);
-      }
-    } catch (IllegalStateException exception) {
-      throw new IOException(INITIALIZATION_ERROR, exception);
-    }
-
-    byte [][] startKeys = this.regionLocator.getStartKeys();
-    if (startKeys == null || startKeys.length == 0) {
-      throw new IOException("Expecting at least one region");
-    }
-    if (this.inputColumns == null || this.inputColumns.length == 0) {
-      throw new IOException("Expecting at least one column");
-    }
-    int realNumSplits = numSplits > startKeys.length? startKeys.length:
-      numSplits;
-    InputSplit[] splits = new InputSplit[realNumSplits];
-    int middle = startKeys.length / realNumSplits;
-    int startPos = 0;
-    for (int i = 0; i < realNumSplits; i++) {
-      int lastPos = startPos + middle;
-      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
-      String regionLocation = regionLocator.getRegionLocation(startKeys[startPos]).
-        getHostname();
-      splits[i] = new TableSplit(this.table.getName(),
-        startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
-          HConstants.EMPTY_START_ROW, regionLocation);
-      LOG.info("split: " + i + "->" + splits[i]);
-      startPos = lastPos;
-    }
-    return splits;
-  }
-
-  /**
-   * Allows subclasses to initialize the table information.
-   *
-   * @param connection  The Connection to the HBase cluster. MUST be unmanaged. We will close.
-   * @param tableName  The {@link TableName} of the table to process.
-   * @throws IOException
-   */
-  protected void initializeTable(Connection connection, TableName tableName) throws IOException {
-    if (this.table != null || this.connection != null) {
-      LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
-          "reference; TableInputFormatBase will not close these old references when done.");
-    }
-    this.table = connection.getTable(tableName);
-    this.regionLocator = connection.getRegionLocator(tableName);
-    this.connection = connection;
-  }
-
-  /**
-   * @param inputColumns to be passed in {@link Result} to the map task.
-   */
-  protected void setInputColumns(byte [][] inputColumns) {
-    this.inputColumns = inputColumns;
-  }
-
-  /**
-   * Allows subclasses to get the {@link Table}.
-   */
-  protected Table getTable() {
-    if (table == null) {
-      throw new IllegalStateException(NOT_INITIALIZED);
-    }
-    return this.table;
-  }
-
-  /**
-   * Allows subclasses to set the {@link TableRecordReader}.
-   *
-   * @param tableRecordReader
-   *                to provide other {@link TableRecordReader} implementations.
-   */
-  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
-    this.tableRecordReader = tableRecordReader;
-  }
-
-  /**
-   * Allows subclasses to set the {@link Filter} to be used.
-   *
-   * @param rowFilter
-   */
-  protected void setRowFilter(Filter rowFilter) {
-    this.rowFilter = rowFilter;
-  }
-
-  /**
-   * Handle subclass specific set up.
-   * Each of the entry points used by the MapReduce framework,
-   * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
-   * will call {@link #initialize(JobConf)} as a convenient centralized location to handle
-   * retrieving the necessary configuration information and calling
-   * {@link #initializeTable(Connection, TableName)}.
-   *
-   * Subclasses should implement their initialize call such that it is safe to call multiple times.
-   * The current TableInputFormatBase implementation relies on a non-null table reference to decide
-   * if an initialize call is needed, but this behavior may change in the future. In particular,
-   * it is critical that initializeTable not be called multiple times since this will leak
-   * Connection instances.
-   *
-   */
-  protected void initialize(JobConf job) throws IOException {
-  }
-
-  /**
-   * Close the Table and related objects that were initialized via
-   * {@link #initializeTable(Connection, TableName)}.
-   *
-   * @throws IOException
-   */
-  protected void closeTable() throws IOException {
-    close(table, connection);
-    table = null;
-    connection = null;
-  }
-
-  private void close(Closeable... closables) throws IOException {
-    for (Closeable c : closables) {
-      if(c != null) { c.close(); }
-    }
-  }
-}
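
The getSplits implementation above caps the number of splits at the number of regions and then spreads the regions over the splits as evenly as possible, giving the extra regions to the earliest splits. A standalone sketch of that arithmetic (plain Java, no HBase calls; the region and split counts are illustrative) could look like:

public class SplitGroupingSketch {
  public static void main(String[] args) {
    int regions = 10;    // number of region start keys returned by the RegionLocator
    int numSplits = 3;   // hint passed in by the framework (mapred.map.tasks)
    int realNumSplits = Math.min(numSplits, regions);
    int middle = regions / realNumSplits;
    int startPos = 0;
    for (int i = 0; i < realNumSplits; i++) {
      int lastPos = startPos + middle;
      lastPos = regions % realNumSplits > i ? lastPos + 1 : lastPos;
      int end = (i + 1) < realNumSplits ? lastPos : regions;
      // Prints: split 0 -> 4 regions, split 1 -> 3 regions, split 2 -> 3 regions
      System.out.println("split " + i + " covers regions [" + startPos + ", " + end + ")");
      startPos = lastPos;
    }
  }
}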

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java
deleted file mode 100644
index a9f1e61..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapred;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.Mapper;
-
-/**
- * Scan an HBase table to sort by a specified sort column.
- * If the column does not exist, the record is not passed to Reduce.
- *
- * @param <K> WritableComparable key class
- * @param <V> Writable value class
- */
-@InterfaceAudience.Public
-public interface TableMap<K extends WritableComparable<? super K>, V>
-extends Mapper<ImmutableBytesWritable, Result, K, V> {
-
-}
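
The RowCounterMapper earlier in this commit is one TableMap implementation; another minimal sketch (the class name is illustrative) that simply forwards each row key and Result unchanged could look like:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class PassThroughTableMap
    implements TableMap<ImmutableBytesWritable, Result> {

  public void map(ImmutableBytesWritable row, Result values,
      OutputCollector<ImmutableBytesWritable, Result> output, Reporter reporter)
      throws IOException {
    // Emit the row key and the full Result unchanged.
    output.collect(row, values);
  }

  public void configure(JobConf job) {
    // No per-task setup needed for this sketch.
  }

  public void close() throws IOException {
    // Nothing to release.
  }
}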

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
deleted file mode 100644
index 63ec418..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
+++ /dev/null
@@ -1,376 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapred;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
-import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.token.TokenUtil;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * Utility for {@link TableMap} and {@link TableReduce}
- */
-@InterfaceAudience.Public
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class TableMapReduceUtil {
-
-  /**
-   * Use this before submitting a TableMap job. It will
-   * appropriately set up the JobConf.
-   *
-   * @param table  The table name to read from.
-   * @param columns  The columns to scan.
-   * @param mapper  The mapper class to use.
-   * @param outputKeyClass  The class of the output key.
-   * @param outputValueClass  The class of the output value.
-   * @param job  The current job configuration to adjust.
-   */
-  public static void initTableMapJob(String table, String columns,
-    Class<? extends TableMap> mapper,
-    Class<?> outputKeyClass,
-    Class<?> outputValueClass, JobConf job) {
-    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
-      true, TableInputFormat.class);
-  }
-
-  public static void initTableMapJob(String table, String columns,
-    Class<? extends TableMap> mapper,
-    Class<?> outputKeyClass,
-    Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
-    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
-      addDependencyJars, TableInputFormat.class);
-  }
-
-  /**
-   * Use this before submitting a TableMap job. It will
-   * appropriately set up the JobConf.
-   *
-   * @param table  The table name to read from.
-   * @param columns  The columns to scan.
-   * @param mapper  The mapper class to use.
-   * @param outputKeyClass  The class of the output key.
-   * @param outputValueClass  The class of the output value.
-   * @param job  The current job configuration to adjust.
-   * @param addDependencyJars upload HBase jars and jars for any of the configured
-   *           job classes via the distributed cache (tmpjars).
-   */
-  public static void initTableMapJob(String table, String columns,
-    Class<? extends TableMap> mapper,
-    Class<?> outputKeyClass,
-    Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
-    Class<? extends InputFormat> inputFormat) {
-
-    job.setInputFormat(inputFormat);
-    job.setMapOutputValueClass(outputValueClass);
-    job.setMapOutputKeyClass(outputKeyClass);
-    job.setMapperClass(mapper);
-    job.setStrings("io.serializations", job.get("io.serializations"),
-        MutationSerialization.class.getName(), ResultSerialization.class.getName());
-    FileInputFormat.addInputPaths(job, table);
-    job.set(TableInputFormat.COLUMN_LIST, columns);
-    if (addDependencyJars) {
-      try {
-        addDependencyJars(job);
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-    try {
-      initCredentials(job);
-    } catch (IOException ioe) {
-      // just spit out the stack trace?  really?
-      ioe.printStackTrace();
-    }
-  }
-
-  /**
-   * Sets up the job for reading from one or more table snapshots, with one or more scans
-   * per snapshot.
-   * It bypasses the HBase servers and reads directly from the snapshot files.
-   *
-   * @param snapshotScans     map of snapshot name to scans on that snapshot.
-   * @param mapper            The mapper class to use.
-   * @param outputKeyClass    The class of the output key.
-   * @param outputValueClass  The class of the output value.
-   * @param job               The current job to adjust.  Make sure the passed job is
-   *                          carrying all necessary HBase configuration.
-   * @param addDependencyJars upload HBase jars and jars for any of the configured
-   *                          job classes via the distributed cache (tmpjars).
-   */
-  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
-      Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
-      JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
-    MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir);
-
-    job.setInputFormat(MultiTableSnapshotInputFormat.class);
-    if (outputValueClass != null) {
-      job.setMapOutputValueClass(outputValueClass);
-    }
-    if (outputKeyClass != null) {
-      job.setMapOutputKeyClass(outputKeyClass);
-    }
-    job.setMapperClass(mapper);
-    if (addDependencyJars) {
-      addDependencyJars(job);
-    }
-
-    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
-  }
-
-  /**
-   * Sets up the job for reading from a table snapshot. It bypasses the HBase servers
-   * and reads directly from the snapshot files.
-   *
-   * @param snapshotName The name of the snapshot (of a table) to read from.
-   * @param columns  The columns to scan.
-   * @param mapper  The mapper class to use.
-   * @param outputKeyClass  The class of the output key.
-   * @param outputValueClass  The class of the output value.
-   * @param job  The current job to adjust.  Make sure the passed job is
-   * carrying all necessary HBase configuration.
-   * @param addDependencyJars upload HBase jars and jars for any of the configured
-   *           job classes via the distributed cache (tmpjars).
-   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current user
-   * should have write permissions to this directory, and it should not be a subdirectory of
-   * rootdir. After the job is finished, the restore directory can be deleted.
-   * @throws IOException When setting up the details fails.
-   * @see TableSnapshotInputFormat
-   */
-  public static void initTableSnapshotMapJob(String snapshotName, String columns,
-      Class<? extends TableMap> mapper,
-      Class<?> outputKeyClass,
-      Class<?> outputValueClass, JobConf job,
-      boolean addDependencyJars, Path tmpRestoreDir)
-  throws IOException {
-    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
-    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
-      addDependencyJars, TableSnapshotInputFormat.class);
-    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
-  }
-
-  /**
-   * Use this before submitting a TableReduce job. It will
-   * appropriately set up the JobConf.
-   *
-   * @param table  The output table.
-   * @param reducer  The reducer class to use.
-   * @param job  The current job configuration to adjust.
-   * @throws IOException When determining the region count fails.
-   */
-  public static void initTableReduceJob(String table,
-    Class<? extends TableReduce> reducer, JobConf job)
-  throws IOException {
-    initTableReduceJob(table, reducer, job, null);
-  }
-
-  /**
-   * Use this before submitting a TableReduce job. It will
-   * appropriately set up the JobConf.
-   *
-   * @param table  The output table.
-   * @param reducer  The reducer class to use.
-   * @param job  The current job configuration to adjust.
-   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
-   * default partitioner.
-   * @throws IOException When determining the region count fails.
-   */
-  public static void initTableReduceJob(String table,
-    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
-  throws IOException {
-    initTableReduceJob(table, reducer, job, partitioner, true);
-  }
-
-  /**
-   * Use this before submitting a TableReduce job. It will
-   * appropriately set up the JobConf.
-   *
-   * @param table  The output table.
-   * @param reducer  The reducer class to use.
-   * @param job  The current job configuration to adjust.
-   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
-   * default partitioner.
-   * @param addDependencyJars upload HBase jars and jars for any of the configured
-   *           job classes via the distributed cache (tmpjars).
-   * @throws IOException When determining the region count fails.
-   */
-  public static void initTableReduceJob(String table,
-    Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
-    boolean addDependencyJars) throws IOException {
-    job.setOutputFormat(TableOutputFormat.class);
-    job.setReducerClass(reducer);
-    job.set(TableOutputFormat.OUTPUT_TABLE, table);
-    job.setOutputKeyClass(ImmutableBytesWritable.class);
-    job.setOutputValueClass(Put.class);
-    job.setStrings("io.serializations", job.get("io.serializations"),
-        MutationSerialization.class.getName(), ResultSerialization.class.getName());
-    if (partitioner == HRegionPartitioner.class) {
-      job.setPartitionerClass(HRegionPartitioner.class);
-      int regions =
-        MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
-      if (job.getNumReduceTasks() > regions) {
-        job.setNumReduceTasks(regions);
-      }
-    } else if (partitioner != null) {
-      job.setPartitionerClass(partitioner);
-    }
-    if (addDependencyJars) {
-      addDependencyJars(job);
-    }
-    initCredentials(job);
-  }
-
-  public static void initCredentials(JobConf job) throws IOException {
-    UserProvider userProvider = UserProvider.instantiate(job);
-    if (userProvider.isHadoopSecurityEnabled()) {
-      // propagate delegation related props from launcher job to MR job
-      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
-        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
-      }
-    }
-
-    if (userProvider.isHBaseSecurityEnabled()) {
-      Connection conn = ConnectionFactory.createConnection(job);
-      try {
-        // login the server principal (if using secure Hadoop)
-        User user = userProvider.getCurrent();
-        TokenUtil.addTokenForJob(conn, job, user);
-      } catch (InterruptedException ie) {
-        ie.printStackTrace();
-        Thread.currentThread().interrupt();
-      } finally {
-        conn.close();
-      }
-    }
-  }
-
-  /**
-   * Ensures that the given number of reduce tasks for the given job
-   * configuration does not exceed the number of regions for the given table.
-   *
-   * @param table  The table to get the region count for.
-   * @param job  The current job configuration to adjust.
-   * @throws IOException When retrieving the table details fails.
-   */
-  // Used by tests.
-  public static void limitNumReduceTasks(String table, JobConf job)
-  throws IOException {
-    int regions =
-      MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
-    if (job.getNumReduceTasks() > regions)
-      job.setNumReduceTasks(regions);
-  }
-
-  /**
-   * Ensures that the given number of map tasks for the given job
-   * configuration does not exceed the number of regions for the given table.
-   *
-   * @param table  The table to get the region count for.
-   * @param job  The current job configuration to adjust.
-   * @throws IOException When retrieving the table details fails.
-   */
-  // Used by tests.
-  public static void limitNumMapTasks(String table, JobConf job)
-  throws IOException {
-    int regions =
-      MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
-    if (job.getNumMapTasks() > regions)
-      job.setNumMapTasks(regions);
-  }
-
-  /**
-   * Sets the number of reduce tasks for the given job configuration to the
-   * number of regions the given table has.
-   *
-   * @param table  The table to get the region count for.
-   * @param job  The current job configuration to adjust.
-   * @throws IOException When retrieving the table details fails.
-   */
-  public static void setNumReduceTasks(String table, JobConf job)
-  throws IOException {
-    job.setNumReduceTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
-      TableName.valueOf(table)));
-  }
-
-  /**
-   * Sets the number of map tasks for the given job configuration to the
-   * number of regions the given table has.
-   *
-   * @param table  The table to get the region count for.
-   * @param job  The current job configuration to adjust.
-   * @throws IOException When retrieving the table details fails.
-   */
-  public static void setNumMapTasks(String table, JobConf job)
-  throws IOException {
-    job.setNumMapTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
-      TableName.valueOf(table)));
-  }
-
-  /**
-   * Sets the number of rows to return and cache with each scanner iteration.
-   * Higher caching values will enable faster mapreduce jobs at the expense of
-   * requiring more heap to contain the cached rows.
-   *
-   * @param job The current job configuration to adjust.
-   * @param batchSize The number of rows to return in batch with each scanner
-   * iteration.
-   */
-  public static void setScannerCaching(JobConf job, int batchSize) {
-    job.setInt("hbase.client.scanner.caching", batchSize);
-  }
-
-  /**
-   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
-   */
-  public static void addDependencyJars(JobConf job) throws IOException {
-    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
-    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(
-      job,
-      // when making changes here, consider also mapreduce.TableMapReduceUtil
-      // pull job classes
-      job.getMapOutputKeyClass(),
-      job.getMapOutputValueClass(),
-      job.getOutputKeyClass(),
-      job.getOutputValueClass(),
-      job.getPartitionerClass(),
-      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
-      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
-      job.getCombinerClass());
-  }
-}
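
Taken together, initTableMapJob and initTableReduceJob above wire a classic mapred job from an HBase source table to an HBase sink table. A minimal driver sketch (table names, the column list, and the marker column are placeholders, and PassThroughTableMap is the mapper sketched after the TableMap diff above) could look like:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class ExampleTableCopyJob {

  /** Writes one marker Put per mapped row key into the target table. */
  static class MarkerReduce implements TableReduce<ImmutableBytesWritable, Result> {
    public void reduce(ImmutableBytesWritable key, Iterator<Result> values,
        OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
        throws IOException {
      Put put = new Put(key.get());
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("seen"), Bytes.toBytes(true));
      output.collect(key, put);
    }
    public void configure(JobConf job) { }
    public void close() throws IOException { }
  }

  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(HBaseConfiguration.create(), ExampleTableCopyJob.class);
    job.setJobName("example-table-copy");
    // Scan two space-delimited columns from the source table with the pass-through mapper.
    TableMapReduceUtil.initTableMapJob("sourceTable", "info:a info:b",
        PassThroughTableMap.class, ImmutableBytesWritable.class, Result.class, job);
    // Write the reducer's Puts to the target table via TableOutputFormat.
    TableMapReduceUtil.initTableReduceJob("targetTable", MarkerReduce.class, job);
    JobClient.runJob(job);
  }
}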

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
deleted file mode 100644
index 8878eee..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.InvalidJobConfException;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.Progressable;
-
-/**
- * Convert Map/Reduce output and write it to an HBase table
- */
-@InterfaceAudience.Public
-public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> {
-
-  /** JobConf parameter that specifies the output table */
-  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
-
-  /**
-   * Writes the reduce output (an ImmutableBytesWritable key and a Put value)
-   * to an HBase table.
-   */
-  protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
-    private BufferedMutator m_mutator;
-    private Connection conn;
-
-
-    /**
-     * Instantiate a TableRecordWriter with the HBase HClient for writing.
-     *
-     * @deprecated Please use {@code #TableRecordWriter(JobConf)}  This version does not clean up
-     * connections and will leak connections (removed in 2.0)
-     */
-    @Deprecated
-    public TableRecordWriter(final BufferedMutator mutator) throws IOException {
-      this.m_mutator = mutator;
-      this.conn = null;
-    }
-
-    /**
-     * Instantiate a TableRecordWriter with a BufferedMutator for batch writing.
-     */
-    public TableRecordWriter(JobConf job) throws IOException {
-      // expecting exactly one path
-      TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE));
-      try {
-        this.conn = ConnectionFactory.createConnection(job);
-        this.m_mutator = conn.getBufferedMutator(tableName);
-      } finally {
-        if (this.m_mutator == null) {
-          conn.close();
-          conn = null;
-        }
-      }
-    }
-
-    public void close(Reporter reporter) throws IOException {
-      try {
-        if (this.m_mutator != null) {
-          this.m_mutator.close();
-        }
-      } finally {
-        if (conn != null) {
-          this.conn.close();
-        }
-      }
-    }
-
-    public void write(ImmutableBytesWritable key, Put value) throws IOException {
-      m_mutator.mutate(new Put(value));
-    }
-  }
-
-  /**
-   * Creates a new record writer.
-   * 
-   * Be aware that the baseline javadoc gives the impression that there is a single
-   * {@link RecordWriter} per job but in HBase, it is more natural if we give you a new
-   * RecordWriter per call of this method. You must close the returned RecordWriter when done.
-   * Failure to do so will drop writes.
-   *
-   * @param ignored Ignored filesystem
-   * @param job Current JobConf
-   * @param name Name of the job
-   * @param progress
-   * @return The newly created writer instance.
-   * @throws IOException When creating the writer fails.
-   */
-  @Override
-  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
-      Progressable progress)
-  throws IOException {
-    // Clear write buffer on fail is true by default so no need to reset it.
-    return new TableRecordWriter(job);
-  }
-
-  @Override
-  public void checkOutputSpecs(FileSystem ignored, JobConf job)
-  throws FileAlreadyExistsException, InvalidJobConfException, IOException {
-    String tableName = job.get(OUTPUT_TABLE);
-    if (tableName == null) {
-      throw new IOException("Must specify table name");
-    }
-  }
-}
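
When not going through TableMapReduceUtil, the TableOutputFormat removed above only needs the output table name and the usual key/value classes on the JobConf, as initTableReduceJob earlier in this commit shows. A minimal configuration sketch (the table name is a placeholder) could look like:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.mapred.JobConf;

public class TableOutputConfigSketch {
  public static JobConf configure() {
    JobConf job = new JobConf(HBaseConfiguration.create());
    job.setOutputFormat(TableOutputFormat.class);
    // Checked by checkOutputSpecs above; missing table name fails the job early.
    job.set(TableOutputFormat.OUTPUT_TABLE, "targetTable");
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    return job;
  }
}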

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
deleted file mode 100644
index cecef7d..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapred.RecordReader;
-
-
-/**
- * Iterate over HBase table data, returning (ImmutableBytesWritable, Result) pairs.
- */
-@InterfaceAudience.Public
-public class TableRecordReader
-implements RecordReader<ImmutableBytesWritable, Result> {
-
-  private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl();
-
-  /**
-   * Restart from survivable exceptions by creating a new scanner.
-   *
-   * @param firstRow
-   * @throws IOException
-   */
-  public void restart(byte[] firstRow) throws IOException {
-    this.recordReaderImpl.restart(firstRow);
-  }
-
-  /**
-   * Build the scanner. Not done in constructor to allow for extension.
-   *
-   * @throws IOException
-   */
-  public void init() throws IOException {
-    this.recordReaderImpl.restart(this.recordReaderImpl.getStartRow());
-  }
-
-  /**
-   * @param htable the {@link org.apache.hadoop.hbase.client.Table} to scan.
-   */
-  public void setHTable(Table htable) {
-    this.recordReaderImpl.setHTable(htable);
-  }
-
-  /**
-   * @param inputColumns the columns to be placed in {@link Result}.
-   */
-  public void setInputColumns(final byte [][] inputColumns) {
-    this.recordReaderImpl.setInputColumns(inputColumns);
-  }
-
-  /**
-   * @param startRow the first row in the split
-   */
-  public void setStartRow(final byte [] startRow) {
-    this.recordReaderImpl.setStartRow(startRow);
-  }
-
-  /**
-   *
-   * @param endRow the last row in the split
-   */
-  public void setEndRow(final byte [] endRow) {
-    this.recordReaderImpl.setEndRow(endRow);
-  }
-
-  /**
-   * @param rowFilter the {@link Filter} to be used.
-   */
-  public void setRowFilter(Filter rowFilter) {
-    this.recordReaderImpl.setRowFilter(rowFilter);
-  }
-
-  public void close() {
-    this.recordReaderImpl.close();
-  }
-
-  /**
-   * @return ImmutableBytesWritable
-   *
-   * @see org.apache.hadoop.mapred.RecordReader#createKey()
-   */
-  public ImmutableBytesWritable createKey() {
-    return this.recordReaderImpl.createKey();
-  }
-
-  /**
-   * @return RowResult
-   *
-   * @see org.apache.hadoop.mapred.RecordReader#createValue()
-   */
-  public Result createValue() {
-    return this.recordReaderImpl.createValue();
-  }
-
-  public long getPos() {
-
-    // This should be the ordinal tuple in the range;
-    // not clear how to calculate...
-    return this.recordReaderImpl.getPos();
-  }
-
-  public float getProgress() {
-    // Depends on the total number of tuples and getPos
-    return this.recordReaderImpl.getPos();
-  }
-
-  /**
-   * @param key HStoreKey as input key.
-   * @param value MapWritable as input value
-   * @return true if there was more data
-   * @throws IOException
-   */
-  public boolean next(ImmutableBytesWritable key, Result value)
-  throws IOException {
-    return this.recordReaderImpl.next(key, value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
deleted file mode 100644
index f6b79c3..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapred;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.ScannerCallable;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.util.StringUtils;
-
-import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
-
-/**
- * Iterate over HBase table data, returning (ImmutableBytesWritable, Result) pairs.
- */
-@InterfaceAudience.Public
-public class TableRecordReaderImpl {
-  private static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
-
-  private byte [] startRow;
-  private byte [] endRow;
-  private byte [] lastSuccessfulRow;
-  private Filter trrRowFilter;
-  private ResultScanner scanner;
-  private Table htable;
-  private byte [][] trrInputColumns;
-  private long timestamp;
-  private int rowcount;
-  private boolean logScannerActivity = false;
-  private int logPerRowCount = 100;
-
-  /**
-   * Restart from survivable exceptions by creating a new scanner.
-   *
-   * @param firstRow
-   * @throws IOException
-   */
-  public void restart(byte[] firstRow) throws IOException {
-    Scan currentScan;
-    if ((endRow != null) && (endRow.length > 0)) {
-      if (trrRowFilter != null) {
-        Scan scan = new Scan(firstRow, endRow);
-        TableInputFormat.addColumns(scan, trrInputColumns);
-        scan.setFilter(trrRowFilter);
-        scan.setCacheBlocks(false);
-        this.scanner = this.htable.getScanner(scan);
-        currentScan = scan;
-      } else {
-        LOG.debug("TIFB.restart, firstRow: " +
-            Bytes.toStringBinary(firstRow) + ", endRow: " +
-            Bytes.toStringBinary(endRow));
-        Scan scan = new Scan(firstRow, endRow);
-        TableInputFormat.addColumns(scan, trrInputColumns);
-        this.scanner = this.htable.getScanner(scan);
-        currentScan = scan;
-      }
-    } else {
-      LOG.debug("TIFB.restart, firstRow: " +
-          Bytes.toStringBinary(firstRow) + ", no endRow");
-
-      Scan scan = new Scan(firstRow);
-      TableInputFormat.addColumns(scan, trrInputColumns);
-      scan.setFilter(trrRowFilter);
-      this.scanner = this.htable.getScanner(scan);
-      currentScan = scan;
-    }
-    if (logScannerActivity) {
-      LOG.info("Current scan=" + currentScan.toString());
-      timestamp = System.currentTimeMillis();
-      rowcount = 0;
-    }
-  }
-
-  /**
-   * Build the scanner. Not done in constructor to allow for extension.
-   *
-   * @throws IOException
-   */
-  public void init() throws IOException {
-    restart(startRow);
-  }
-
-  byte[] getStartRow() {
-    return this.startRow;
-  }
-  /**
-   * @param htable the {@link org.apache.hadoop.hbase.client.Table} to scan.
-   */
-  public void setHTable(Table htable) {
-    Configuration conf = htable.getConfiguration();
-    logScannerActivity = conf.getBoolean(
-      ScannerCallable.LOG_SCANNER_ACTIVITY, false);
-    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
-    this.htable = htable;
-  }
-
-  /**
-   * @param inputColumns the columns to be placed in {@link Result}.
-   */
-  public void setInputColumns(final byte [][] inputColumns) {
-    this.trrInputColumns = inputColumns;
-  }
-
-  /**
-   * @param startRow the first row in the split
-   */
-  public void setStartRow(final byte [] startRow) {
-    this.startRow = startRow;
-  }
-
-  /**
-   *
-   * @param endRow the last row in the split
-   */
-  public void setEndRow(final byte [] endRow) {
-    this.endRow = endRow;
-  }
-
-  /**
-   * @param rowFilter the {@link Filter} to be used.
-   */
-  public void setRowFilter(Filter rowFilter) {
-    this.trrRowFilter = rowFilter;
-  }
-
-  public void close() {
-    if (this.scanner != null) {
-      this.scanner.close();
-    }
-    try {
-      this.htable.close();
-    } catch (IOException ioe) {
-      LOG.warn("Error closing table", ioe);
-    }
-  }
-
-  /**
-   * @return ImmutableBytesWritable
-   *
-   * @see org.apache.hadoop.mapred.RecordReader#createKey()
-   */
-  public ImmutableBytesWritable createKey() {
-    return new ImmutableBytesWritable();
-  }
-
-  /**
-   * @return RowResult
-   *
-   * @see org.apache.hadoop.mapred.RecordReader#createValue()
-   */
-  public Result createValue() {
-    return new Result();
-  }
-
-  public long getPos() {
-    // This should be the ordinal tuple in the range;
-    // not clear how to calculate...
-    return 0;
-  }
-
-  public float getProgress() {
-    // Depends on the total number of tuples and getPos
-    return 0;
-  }
-
-  /**
-   * @param key HStoreKey as input key.
-   * @param value MapWritable as input value
-   * @return true if there was more data
-   * @throws IOException
-   */
-  public boolean next(ImmutableBytesWritable key, Result value)
-  throws IOException {
-    Result result;
-    try {
-      try {
-        result = this.scanner.next();
-        if (logScannerActivity) {
-          rowcount ++;
-          if (rowcount >= logPerRowCount) {
-            long now = System.currentTimeMillis();
-            LOG.info("Mapper took " + (now-timestamp)
-              + "ms to process " + rowcount + " rows");
-            timestamp = now;
-            rowcount = 0;
-          }
-        }
-      } catch (IOException e) {
-        // do not retry if the exception tells us not to do so
-        if (e instanceof DoNotRetryIOException) {
-          throw e;
-        }
-        // try to handle all other IOExceptions by restarting
-        // the scanner, if the second call fails, it will be rethrown
-        LOG.debug("recovered from " + StringUtils.stringifyException(e));
-        if (lastSuccessfulRow == null) {
-          LOG.warn("We are restarting the first next() invocation," +
-              " if your mapper has restarted a few other times like this" +
-              " then you should consider killing this job and investigate" +
-              " why it's taking so long.");
-        }
-        if (lastSuccessfulRow == null) {
-          restart(startRow);
-        } else {
-          restart(lastSuccessfulRow);
-          this.scanner.next();    // skip presumed already mapped row
-        }
-        result = this.scanner.next();
-      }
-
-      if (result != null && result.size() > 0) {
-        key.set(result.getRow());
-        lastSuccessfulRow = key.get();
-        value.copyFrom(result);
-        return true;
-      }
-      return false;
-    } catch (IOException ioe) {
-      if (logScannerActivity) {
-        long now = System.currentTimeMillis();
-        LOG.info("Mapper took " + (now-timestamp)
-          + "ms to process " + rowcount + " rows");
-        LOG.info(ioe);
-        String lastRow = lastSuccessfulRow == null ?
-          "null" : Bytes.toStringBinary(lastSuccessfulRow);
-        LOG.info("lastSuccessfulRow=" + lastRow);
-      }
-      throw ioe;
-    }
-  }
-}
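
For reference, the restart-and-skip recovery pattern implemented by the record reader above can be sketched with the plain HBase client API. This is a minimal illustration rather than the removed class itself; the table name, connection setup, and class name are assumptions made up for the example.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class ScannerRestartSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(TableName.valueOf("example_table"))) {
      byte[] lastSuccessfulRow = null;
      ResultScanner scanner = table.getScanner(new Scan());
      try {
        for (;;) {
          Result r;
          try {
            r = scanner.next();
          } catch (DoNotRetryIOException e) {
            throw e;                      // fatal: do not restart the scan
          } catch (IOException e) {
            // Recoverable failure: reopen the scanner at the last row that was
            // handed out, then skip that presumed-already-processed row.
            scanner.close();
            Scan scan = lastSuccessfulRow == null ? new Scan() : new Scan(lastSuccessfulRow);
            scanner = table.getScanner(scan);
            if (lastSuccessfulRow != null) {
              scanner.next();             // skip the row we already processed
            }
            r = scanner.next();
          }
          if (r == null || r.isEmpty()) {
            break;
          }
          lastSuccessfulRow = r.getRow();
          // ... process the row here ...
        }
      } finally {
        scanner.close();
      }
    }
  }
}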

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java
deleted file mode 100644
index 91fb4a1..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapred;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.Reducer;
-
-/**
- * Write a table, sorting by the input key
- *
- * @param <K> key class
- * @param <V> value class
- */
-@InterfaceAudience.Public
-@SuppressWarnings("unchecked")
-public interface TableReduce<K extends WritableComparable, V>
-extends Reducer<K, V, ImmutableBytesWritable, Put> {
-
-}
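
As a reference point, a minimal implementation of this interface might look like the sketch below; the class name, key type, and the pass-through behaviour are illustrative assumptions, not part of the removed file. In a job set up with the mapred API such a reducer would typically be registered through TableMapReduceUtil.initTableReduceJob from the same package (not shown here).

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/** Writes one Put per input value, keyed by the bytes of the reduce key. */
public class ExamplePutReducer implements TableReduce<Text, Put> {

  @Override
  public void reduce(Text key, Iterator<Put> values,
      OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
      throws IOException {
    ImmutableBytesWritable outKey =
        new ImmutableBytesWritable(Bytes.toBytes(key.toString()));
    while (values.hasNext()) {
      output.collect(outKey, values.next());
    }
  }

  @Override
  public void configure(JobConf job) {
    // No per-job setup needed for this sketch.
  }

  @Override
  public void close() throws IOException {
    // Nothing to clean up.
  }
}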

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
deleted file mode 100644
index d7b49ff..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.mapred;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. Further
- * documentation available on {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}.
- *
- * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
- */
-@InterfaceAudience.Public
-public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWritable, Result> {
-
-  public static class TableSnapshotRegionSplit implements InputSplit {
-    private TableSnapshotInputFormatImpl.InputSplit delegate;
-
-    // constructor for mapreduce framework / Writable
-    public TableSnapshotRegionSplit() {
-      this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
-    }
-
-    public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
-      this.delegate = delegate;
-    }
-
-    public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
-        List<String> locations, Scan scan, Path restoreDir) {
-      this.delegate =
-          new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
-    }
-
-    @Override
-    public long getLength() throws IOException {
-      return delegate.getLength();
-    }
-
-    @Override
-    public String[] getLocations() throws IOException {
-      return delegate.getLocations();
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      delegate.write(out);
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      delegate.readFields(in);
-    }
-  }
-
-  static class TableSnapshotRecordReader
-    implements RecordReader<ImmutableBytesWritable, Result> {
-
-    private TableSnapshotInputFormatImpl.RecordReader delegate;
-
-    public TableSnapshotRecordReader(TableSnapshotRegionSplit split, JobConf job)
-        throws IOException {
-      delegate = new TableSnapshotInputFormatImpl.RecordReader();
-      delegate.initialize(split.delegate, job);
-    }
-
-    @Override
-    public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
-      if (!delegate.nextKeyValue()) {
-        return false;
-      }
-      ImmutableBytesWritable currentKey = delegate.getCurrentKey();
-      key.set(currentKey.get(), currentKey.getOffset(), currentKey.getLength());
-      value.copyFrom(delegate.getCurrentValue());
-      return true;
-    }
-
-    @Override
-    public ImmutableBytesWritable createKey() {
-      return new ImmutableBytesWritable();
-    }
-
-    @Override
-    public Result createValue() {
-      return new Result();
-    }
-
-    @Override
-    public long getPos() throws IOException {
-      return delegate.getPos();
-    }
-
-    @Override
-    public void close() throws IOException {
-      delegate.close();
-    }
-
-    @Override
-    public float getProgress() throws IOException {
-      return delegate.getProgress();
-    }
-  }
-
-  @Override
-  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    List<TableSnapshotInputFormatImpl.InputSplit> splits =
-      TableSnapshotInputFormatImpl.getSplits(job);
-    InputSplit[] results = new InputSplit[splits.size()];
-    for (int i = 0; i < splits.size(); i++) {
-      results[i] = new TableSnapshotRegionSplit(splits.get(i));
-    }
-    return results;
-  }
-
-  @Override
-  public RecordReader<ImmutableBytesWritable, Result>
-  getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
-    return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
-  }
-
-  /**
-   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
-   * @param job the job to configure
-   * @param snapshotName the name of the snapshot to read from
-   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
-   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
-   * After the job is finished, restoreDir can be deleted.
-   * @throws IOException if an error occurs
-   */
-  public static void setInput(JobConf job, String snapshotName, Path restoreDir)
-      throws IOException {
-    TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir);
-  }
-}
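
For orientation, configuring a mapred job against this input format amounts to calling setInput and pointing the JobConf at the class, roughly as in the sketch below. The snapshot name, restore directory, and mapper are placeholders invented for the example; in a real deployment the TableMapReduceUtil helpers in the same package are normally used as well so that dependency jars and credentials get added to the job, which is omitted here.

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;

public class SnapshotScanJobSketch {

  /** Emits each row key once; purely illustrative. */
  public static class RowKeyMapper
      implements TableMap<ImmutableBytesWritable, ImmutableBytesWritable> {
    public void map(ImmutableBytesWritable row, Result value,
        OutputCollector<ImmutableBytesWritable, ImmutableBytesWritable> output,
        Reporter reporter) throws IOException {
      output.collect(row, row);
    }
    public void configure(JobConf job) { }
    public void close() { }
  }

  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(HBaseConfiguration.create(), SnapshotScanJobSketch.class);
    job.setJobName("snapshot-scan");
    // The restore directory must be writable by the submitting user and must
    // not live under the HBase root directory; it can be deleted afterwards.
    TableSnapshotInputFormat.setInput(job, "example_snapshot", new Path("/tmp/snapshot-restore"));
    job.setInputFormat(TableSnapshotInputFormat.class);
    job.setMapperClass(RowKeyMapper.class);
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(ImmutableBytesWritable.class);
    job.setOutputFormat(NullOutputFormat.class);
    JobClient.runJob(job);
  }
}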

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java
deleted file mode 100644
index 0784e5e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapred;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.InputSplit;
-
-/**
- * A table split corresponds to a key range [low, high)
- */
-@InterfaceAudience.Public
-public class TableSplit implements InputSplit, Comparable<TableSplit> {
-  private TableName m_tableName;
-  private byte [] m_startRow;
-  private byte [] m_endRow;
-  private String m_regionLocation;
-
-  /** default constructor */
-  public TableSplit() {
-    this((TableName)null, HConstants.EMPTY_BYTE_ARRAY,
-      HConstants.EMPTY_BYTE_ARRAY, "");
-  }
-
-  /**
-   * Constructor
-   * @param tableName name of the table being split
-   * @param startRow first row key of the split (inclusive)
-   * @param endRow last row key of the split (exclusive)
-   * @param location hostname of the region hosting the split
-   */
-  public TableSplit(TableName tableName, byte [] startRow, byte [] endRow,
-      final String location) {
-    this.m_tableName = tableName;
-    this.m_startRow = startRow;
-    this.m_endRow = endRow;
-    this.m_regionLocation = location;
-  }
-
-  public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow,
-      final String location) {
-    this(TableName.valueOf(tableName), startRow, endRow,
-      location);
-  }
-
-  /** @return table name */
-  public TableName getTable() {
-    return this.m_tableName;
-  }
-
-  /** @return table name */
-  public byte [] getTableName() {
-    return this.m_tableName.getName();
-  }
-
-  /** @return starting row key */
-  public byte [] getStartRow() {
-    return this.m_startRow;
-  }
-
-  /** @return end row key */
-  public byte [] getEndRow() {
-    return this.m_endRow;
-  }
-
-  /** @return the region's hostname */
-  public String getRegionLocation() {
-    return this.m_regionLocation;
-  }
-
-  public String[] getLocations() {
-    return new String[] {this.m_regionLocation};
-  }
-
-  public long getLength() {
-    // Not clear how to obtain this... seems to be used only for sorting splits
-    return 0;
-  }
-
-  public void readFields(DataInput in) throws IOException {
-    this.m_tableName = TableName.valueOf(Bytes.readByteArray(in));
-    this.m_startRow = Bytes.readByteArray(in);
-    this.m_endRow = Bytes.readByteArray(in);
-    this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
-  }
-
-  public void write(DataOutput out) throws IOException {
-    Bytes.writeByteArray(out, this.m_tableName.getName());
-    Bytes.writeByteArray(out, this.m_startRow);
-    Bytes.writeByteArray(out, this.m_endRow);
-    Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
-  }
-
-  @Override
-  public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append("HBase table split(");
-      sb.append("table name: ").append(m_tableName);
-      sb.append(", start row: ").append(Bytes.toStringBinary(m_startRow));
-      sb.append(", end row: ").append(Bytes.toStringBinary(m_endRow));
-      sb.append(", region location: ").append(m_regionLocation);
-      sb.append(")");
-      return sb.toString();
-  }
-
-  @Override
-  public int compareTo(TableSplit o) {
-    return Bytes.compareTo(getStartRow(), o.getStartRow());
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o == null || !(o instanceof TableSplit)) {
-      return false;
-    }
-    TableSplit other = (TableSplit)o;
-    return m_tableName.equals(other.m_tableName) &&
-      Bytes.equals(m_startRow, other.m_startRow) &&
-      Bytes.equals(m_endRow, other.m_endRow) &&
-      m_regionLocation.equals(other.m_regionLocation);
-  }
-
-  @Override
-  public int hashCode() {
-    int result = m_tableName != null ? m_tableName.hashCode() : 0;
-    result = 31 * result + Arrays.hashCode(m_startRow);
-    result = 31 * result + Arrays.hashCode(m_endRow);
-    result = 31 * result + (m_regionLocation != null ? m_regionLocation.hashCode() : 0);
-    return result;
-  }
-}
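
A quick way to see the Writable contract above in action is a serialize/deserialize round trip; the table name, row keys, and hostname below are made-up values for illustration.

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapred.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class TableSplitRoundTrip {
  public static void main(String[] args) throws Exception {
    TableSplit original = new TableSplit(TableName.valueOf("example_table"),
        Bytes.toBytes("row-aaa"), Bytes.toBytes("row-mmm"), "rs1.example.com");

    // Serialize with write(), then rebuild a fresh split with readFields().
    DataOutputBuffer out = new DataOutputBuffer();
    original.write(out);

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    TableSplit copy = new TableSplit();
    copy.readFields(in);

    System.out.println(copy);                   // uses the toString() above
    System.out.println(original.equals(copy));  // expected: true
  }
}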

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/package-info.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/package-info.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/package-info.java
deleted file mode 100644
index 8a2a363..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/package-info.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
-Provides HBase <a href="http://wiki.apache.org/hadoop/HadoopMapReduce">MapReduce</a>
-Input/OutputFormats, a table indexing MapReduce job, and utility methods.
-
-<p>See <a href="http://hbase.apache.org/book.html#mapreduce">HBase and MapReduce</a>
-in the HBase Reference Guide for documentation on running MapReduce jobs over HBase.
-*/
-package org.apache.hadoop.hbase.mapred;

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
deleted file mode 100644
index 078033e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.filter.RegexStringComparator;
-import org.apache.hadoop.hbase.filter.RowFilter;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-
-
-/**
- * A job with a map and reduce phase to count cells in a table.
- * The counter lists the following stats for a given table:
- * <pre>
- * 1. Total number of rows in the table
- * 2. Total number of CFs across all rows
- * 3. Total qualifiers across all rows
- * 4. Total occurrence of each CF
- * 5. Total occurrence of each qualifier
- * 6. Total number of versions of each qualifier.
- * </pre>
- *
- * The cellcounter can take optional parameters: a user-supplied separator
- * string for the row/family/qualifier names used in the report, a regex-
- * or prefix-based row filter to restrict the count to a subset of the
- * table's rows, and a start time and/or end time to limit the count to
- * a time range.
- */
-@InterfaceAudience.Public
-public class CellCounter extends Configured implements Tool {
-  private static final Log LOG =
-    LogFactory.getLog(CellCounter.class.getName());
-
-
-  /**
-   * Name of this 'program'.
-   */
-  static final String NAME = "CellCounter";
-
-  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
-
-  /**
-   * Mapper that runs the count.
-   */
-  static class CellCounterMapper
-  extends TableMapper<Text, IntWritable> {
-    /**
-     * Counter enumeration to count the actual rows.
-     */
-    public static enum Counters {
-      ROWS,
-      CELLS
-    }
-
-    private Configuration conf;
-    private String separator;
-
-    // state of current row, family, column needs to persist across map() invocations
-    // in order to properly handle scanner batching, where a single qualifier may have too
-    // many versions for a single map() call
-    private byte[] lastRow;
-    private String currentRowKey;
-    byte[] currentFamily = null;
-    String currentFamilyName = null;
-    byte[] currentQualifier = null;
-    // family + qualifier
-    String currentQualifierName = null;
-    // rowkey + family + qualifier
-    String currentRowQualifierName = null;
-
-    @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
-      conf = context.getConfiguration();
-      separator = conf.get("ReportSeparator",":");
-    }
-
-    /**
-     * Maps the data.
-     *
-     * @param row     The current table row key.
-     * @param values  The columns.
-     * @param context The current context.
-     * @throws IOException When something is broken with the data.
-     * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
-     *      org.apache.hadoop.mapreduce.Mapper.Context)
-     */
-
-    @Override
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
-      justification="Findbugs is blind to the Precondition null check")
-    public void map(ImmutableBytesWritable row, Result values,
-                    Context context)
-        throws IOException {
-      Preconditions.checkState(values != null,
-          "values passed to the map is null");
-
-      try {
-        byte[] currentRow = values.getRow();
-        if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
-          lastRow = currentRow;
-          currentRowKey = Bytes.toStringBinary(currentRow);
-          currentFamily = null;
-          currentQualifier = null;
-          context.getCounter(Counters.ROWS).increment(1);
-          context.write(new Text("Total ROWS"), new IntWritable(1));
-        }
-        if (!values.isEmpty()) {
-          int cellCount = 0;
-          for (Cell value : values.listCells()) {
-            cellCount++;
-            if (currentFamily == null || !CellUtil.matchingFamily(value, currentFamily)) {
-              currentFamily = CellUtil.cloneFamily(value);
-              currentFamilyName = Bytes.toStringBinary(currentFamily);
-              currentQualifier = null;
-              context.getCounter("CF", currentFamilyName).increment(1);
-              if (1 == context.getCounter("CF", currentFamilyName).getValue()) {
-                context.write(new Text("Total Families Across all Rows"), new IntWritable(1));
-                context.write(new Text(currentFamily), new IntWritable(1));
-              }
-            }
-            if (currentQualifier == null || !CellUtil.matchingQualifier(value, currentQualifier)) {
-              currentQualifier = CellUtil.cloneQualifier(value);
-              currentQualifierName = currentFamilyName + separator +
-                  Bytes.toStringBinary(currentQualifier);
-              currentRowQualifierName = currentRowKey + separator + currentQualifierName;
-
-              context.write(new Text("Total Qualifiers across all Rows"),
-                  new IntWritable(1));
-              context.write(new Text(currentQualifierName), new IntWritable(1));
-            }
-            // Increment versions
-            context.write(new Text(currentRowQualifierName + "_Versions"), new IntWritable(1));
-          }
-          context.getCounter(Counters.CELLS).increment(cellCount);
-        }
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-  }
-
-  static class IntSumReducer<Key> extends Reducer<Key, IntWritable,
-      Key, IntWritable> {
-
-    private IntWritable result = new IntWritable();
-    public void reduce(Key key, Iterable<IntWritable> values,
-      Context context)
-    throws IOException, InterruptedException {
-      int sum = 0;
-      for (IntWritable val : values) {
-        sum += val.get();
-      }
-      result.set(sum);
-      context.write(key, result);
-    }
-  }
-
-  /**
-   * Sets up the actual job.
-   *
-   * @param conf The current configuration.
-   * @param args The command line parameters.
-   * @return The newly created job.
-   * @throws IOException When setting up the job fails.
-   */
-  public static Job createSubmittableJob(Configuration conf, String[] args)
-      throws IOException {
-    String tableName = args[0];
-    Path outputDir = new Path(args[1]);
-    String reportSeparatorString = (args.length > 2) ? args[2]: ":";
-    conf.set("ReportSeparator", reportSeparatorString);
-    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
-    job.setJarByClass(CellCounter.class);
-    Scan scan = getConfiguredScanForJob(conf, args);
-    TableMapReduceUtil.initTableMapperJob(tableName, scan,
-        CellCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
-    job.setNumReduceTasks(1);
-    job.setMapOutputKeyClass(Text.class);
-    job.setMapOutputValueClass(IntWritable.class);
-    job.setOutputFormatClass(TextOutputFormat.class);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(IntWritable.class);
-    FileOutputFormat.setOutputPath(job, outputDir);
-    job.setReducerClass(IntSumReducer.class);
-    return job;
-  }
-
-  private static Scan getConfiguredScanForJob(Configuration conf, String[] args)
-      throws IOException {
-    // create scan with any properties set from TableInputFormat
-    Scan s = TableInputFormat.createScanFromConfiguration(conf);
-    // Set Scan Versions
-    if (conf.get(TableInputFormat.SCAN_MAXVERSIONS) == null) {
-      // default to all versions unless explicitly set
-      s.setMaxVersions(Integer.MAX_VALUE);
-    }
-    s.setCacheBlocks(false);
-    // Set RowFilter or Prefix Filter if applicable.
-    Filter rowFilter = getRowFilter(args);
-    if (rowFilter!= null) {
-      LOG.info("Setting Row Filter for counter.");
-      s.setFilter(rowFilter);
-    }
-    // Set TimeRange if defined
-    long[] timeRange = getTimeRange(args);
-    if (timeRange != null) {
-      LOG.info("Setting TimeRange for counter.");
-      s.setTimeRange(timeRange[0], timeRange[1]);
-    }
-    return s;
-  }
-
-
-  private static Filter getRowFilter(String[] args) {
-    Filter rowFilter = null;
-    String filterCriteria = (args.length > 3) ? args[3]: null;
-    if (filterCriteria == null) return null;
-    if (filterCriteria.startsWith("^")) {
-      String regexPattern = filterCriteria.substring(1, filterCriteria.length());
-      rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regexPattern));
-    } else {
-      rowFilter = new PrefixFilter(Bytes.toBytesBinary(filterCriteria));
-    }
-    return rowFilter;
-  }
-
-  private static long[] getTimeRange(String[] args) throws IOException {
-    final String startTimeArgKey = "--starttime=";
-    final String endTimeArgKey = "--endtime=";
-    long startTime = 0L;
-    long endTime = 0L;
-
-    for (int i = 1; i < args.length; i++) {
-      System.out.println("i:" + i + "arg[i]" + args[i]);
-      if (args[i].startsWith(startTimeArgKey)) {
-        startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
-      }
-      if (args[i].startsWith(endTimeArgKey)) {
-        endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
-      }
-    }
-
-    if (startTime == 0 && endTime == 0)
-      return null;
-
-    endTime = endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime;
-    return new long [] {startTime, endTime};
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length < 2) {
-      System.err.println("ERROR: Wrong number of parameters: " + args.length);
-      System.err.println("Usage: CellCounter ");
-      System.err.println("       <tablename> <outputDir> <reportSeparator> [^[regex pattern] or " +
-        "[Prefix] for row filter]] --starttime=[starttime] --endtime=[endtime]");
-      System.err.println("  Note: -D properties will be applied to the conf used. ");
-      System.err.println("  Additionally, all of the SCAN properties from TableInputFormat");
-      System.err.println("  can be specified to get fine grained control on what is counted..");
-      System.err.println("   -D " + TableInputFormat.SCAN_ROW_START + "=<rowkey>");
-      System.err.println("   -D " + TableInputFormat.SCAN_ROW_STOP + "=<rowkey>");
-      System.err.println("   -D " + TableInputFormat.SCAN_COLUMNS + "=\"<col1> <col2>...\"");
-      System.err.println("   -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<family1>,<family2>, ...");
-      System.err.println("   -D " + TableInputFormat.SCAN_TIMESTAMP + "=<timestamp>");
-      System.err.println("   -D " + TableInputFormat.SCAN_TIMERANGE_START + "=<timestamp>");
-      System.err.println("   -D " + TableInputFormat.SCAN_TIMERANGE_END + "=<timestamp>");
-      System.err.println("   -D " + TableInputFormat.SCAN_MAXVERSIONS + "=<count>");
-      System.err.println("   -D " + TableInputFormat.SCAN_CACHEDROWS + "=<count>");
-      System.err.println("   -D " + TableInputFormat.SCAN_BATCHSIZE + "=<count>");
-      System.err.println(" <reportSeparator> parameter can be used to override the default report separator " +
-          "string : used to separate the rowId/column family name and qualifier name.");
-      System.err.println(" [^[regex pattern] or [Prefix] parameter can be used to limit the cell counter count " +
-          "operation to a limited subset of rows from the table based on regex or prefix pattern.");
-      return -1;
-    }
-    Job job = createSubmittableJob(getConf(), args);
-    return (job.waitForCompletion(true) ? 0 : 1);
-  }
-
-  /**
-   * Main entry point.
-   * @param args The command line parameters.
-   * @throws Exception When running the job fails.
-   */
-  public static void main(String[] args) throws Exception {
-    int errCode = ToolRunner.run(HBaseConfiguration.create(), new CellCounter(), args);
-    System.exit(errCode);
-  }
-
-}
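
Putting the usage text above together, a programmatic launch with a prefix row filter and a time range might look like the sketch below; the table name, output directory, prefix, and timestamps are placeholder values chosen for illustration.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.CellCounter;
import org.apache.hadoop.util.ToolRunner;

public class CellCounterLauncher {
  public static void main(String[] args) throws Exception {
    // <tablename> <outputDir> <reportSeparator> [row prefix] --starttime/--endtime
    String[] jobArgs = {
        "example_table",
        "/tmp/cellcounter-out",
        ":",
        "user|",                       // prefix-based row filter
        "--starttime=1500000000000",
        "--endtime=1503000000000"
    };
    int exit = ToolRunner.run(HBaseConfiguration.create(), new CellCounter(), jobArgs);
    System.exit(exit);
  }
}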

