hbase-commits mailing list archives

From: ndimi...@apache.org
Subject: svn commit: r1594984 - in /hbase/branches/0.98: hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/ hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ hbase-server/src/test/jav...
Date: Thu, 15 May 2014 17:11:32 GMT
Author: ndimiduk
Date: Thu May 15 17:11:31 2014
New Revision: 1594984

URL: http://svn.apache.org/r1594984
Log:
HBASE-11137 Add mapred.TableSnapshotInputFormat

Added:
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
    hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java
    hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
Modified:
    hbase/branches/0.98/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
    hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java

Modified: hbase/branches/0.98/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java?rev=1594984&r1=1594983&r2=1594984&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java (original)
+++ hbase/branches/0.98/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java Thu May 15 17:11:31 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.Integrati
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
 import org.apache.hadoop.hbase.IntegrationTests;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.After;
 import org.junit.Before;
@@ -51,15 +52,18 @@ import org.junit.experimental.categories
  *
  * Then the test creates a snapshot from this table, and overrides the values in the original
  * table with values 'after_snapshot_value'. The test, then runs a mapreduce job over the snapshot
- * with a scan start row 'bbb' and stop row 'yyy'. The data is saved in a single reduce output file, and
- * inspected later to verify that the MR job has seen all the values from the snapshot.
+ * with a scan start row 'bbb' and stop row 'yyy'. The data is saved in a single reduce output
+ * file, and inspected later to verify that the MR job has seen all the values from the snapshot.
  *
  * <p> These parameters can be used to configure the job:
  * <br>"IntegrationTestTableSnapshotInputFormat.table" =&gt; the name of the table
  * <br>"IntegrationTestTableSnapshotInputFormat.snapshot" =&gt; the name of the snapshot
- * <br>"IntegrationTestTableSnapshotInputFormat.numRegions" =&gt; number of regions in the table to be created
- * <br>"IntegrationTestTableSnapshotInputFormat.tableDir" =&gt; temporary directory to restore the snapshot files
- *
+ * <br>"IntegrationTestTableSnapshotInputFormat.numRegions" =&gt; number of regions in the table
+ * to be created (default, 32).
+ * <br>"IntegrationTestTableSnapshotInputFormat.tableDir" =&gt; temporary directory to restore the
+ * snapshot files
+ * <br>"IntegrationTestTableSnapshotInputFormat.tableDir" =&gt; temporary directory to restore the
+ * snapshot files
  */
 @Category(IntegrationTests.class)
 // Not runnable as a unit test. See TestTableSnapshotInputFormat
@@ -72,12 +76,24 @@ public class IntegrationTestTableSnapsho
 
   private static final String SNAPSHOT_NAME_KEY = "IntegrationTestTableSnapshotInputFormat.snapshot";
 
+  private static final String MR_IMPLEMENTATION_KEY =
+    "IntegrationTestTableSnapshotInputFormat.API";
+  private static final String MAPRED_IMPLEMENTATION = "mapred";
+  private static final String MAPREDUCE_IMPLEMENTATION = "mapreduce";
 
-  private static final String NUM_REGIONS_KEY = "IntegrationTestTableSnapshotInputFormat.numRegions";
+  private static final String NUM_REGIONS_KEY =
+    "IntegrationTestTableSnapshotInputFormat.numRegions";
   private static final int DEFAULT_NUM_REGIONS = 32;
-
   private static final String TABLE_DIR_KEY = "IntegrationTestTableSnapshotInputFormat.tableDir";
 
+  private static final byte[] START_ROW = Bytes.toBytes("bbb");
+  private static final byte[] END_ROW = Bytes.toBytes("yyy");
+
+  // mapred API missing feature parity with mapreduce. See comments in
+  // mapred.TestTableSnapshotInputFormat
+  private static final byte[] MAPRED_START_ROW = Bytes.toBytes("aaa");
+  private static final byte[] MAPRED_END_ROW = Bytes.toBytes("zz{"); // 'z' + 1 => '{'
+
   private IntegrationTestingUtility util;
 
   @Override
@@ -120,17 +136,39 @@ public class IntegrationTestTableSnapsho
       tableDir = new Path(tableDirStr);
     }
 
-    /* We create the table using HBaseAdmin#createTable(), which will create the table
-     * with desired number of regions. We pass bbb as startKey and yyy as endKey, so if
-     * desiredNumRegions is > 2, we create regions empty - bbb and yyy - empty, and we
-     * create numRegions - 2 regions between bbb - yyy. The test uses a Scan with startRow
-     * bbb and endRow yyy, so, we expect the first and last region to be filtered out in
-     * the input format, and we expect numRegions - 2 splits between bbb and yyy.
-     */
-    int expectedNumSplits = numRegions > 2 ? numRegions - 2 : numRegions;
-
-    TestTableSnapshotInputFormat.doTestWithMapReduce(util, tableName, snapshotName, tableDir,
-      numRegions, expectedNumSplits, false);
+    final String mr = conf.get(MR_IMPLEMENTATION_KEY, MAPREDUCE_IMPLEMENTATION);
+    if (mr.equalsIgnoreCase(MAPREDUCE_IMPLEMENTATION)) {
+      /*
+       * We create the table using HBaseAdmin#createTable(), which will create the table
+       * with desired number of regions. We pass bbb as startKey and yyy as endKey, so if
+       * desiredNumRegions is > 2, we create regions empty - bbb and yyy - empty, and we
+       * create numRegions - 2 regions between bbb - yyy. The test uses a Scan with startRow
+       * bbb and endRow yyy, so, we expect the first and last region to be filtered out in
+       * the input format, and we expect numRegions - 2 splits between bbb and yyy.
+       */
+      LOG.debug("Running job with mapreduce API.");
+      int expectedNumSplits = numRegions > 2 ? numRegions - 2 : numRegions;
+
+      org.apache.hadoop.hbase.mapreduce.TestTableSnapshotInputFormat.doTestWithMapReduce(util,
+        tableName, snapshotName, START_ROW, END_ROW, tableDir, numRegions,
+        expectedNumSplits, false);
+    } else if (mr.equalsIgnoreCase(MAPRED_IMPLEMENTATION)) {
+      /*
+       * Similar considerations to above. The difference is that mapred API does not support
+       * specifying start/end rows (or a scan object at all). Thus the omission of first and
+       * last regions are not performed. See comments in mapred.TestTableSnapshotInputFormat
+       * for details of how that test works around the problem. This feature should be added
+       * in follow-on work.
+       */
+      LOG.debug("Running job with mapred API.");
+      int expectedNumSplits = numRegions;
+
+      org.apache.hadoop.hbase.mapred.TestTableSnapshotInputFormat.doTestWithMapReduce(util,
+        tableName, snapshotName, MAPRED_START_ROW, MAPRED_END_ROW, tableDir, numRegions,
+        expectedNumSplits, false);
+    } else {
+      throw new IllegalArgumentException("Unrecognized mapreduce implementation: " + mr + ".");
+    }
 
     return 0;
   }

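For reference, a minimal launcher sketch for the integration test above, wiring up the configuration keys listed in its class javadoc, including the new "IntegrationTestTableSnapshotInputFormat.API" selector introduced by this patch. The table and snapshot names are hypothetical, and the assumption that the test is driven through ToolRunner (as its imports suggest) is an illustration, not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.IntegrationTestTableSnapshotInputFormat;
    import org.apache.hadoop.util.ToolRunner;

    public class RunSnapshotInputFormatIT {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Keys documented in the class javadoc above; values are hypothetical.
        conf.set("IntegrationTestTableSnapshotInputFormat.table", "it_table");
        conf.set("IntegrationTestTableSnapshotInputFormat.snapshot", "it_snapshot");
        conf.setInt("IntegrationTestTableSnapshotInputFormat.numRegions", 32);
        // New in this patch: "mapred" exercises the old-style API,
        // "mapreduce" (the default) exercises the existing code path.
        conf.set("IntegrationTestTableSnapshotInputFormat.API", "mapred");
        int ret = ToolRunner.run(conf, new IntegrationTestTableSnapshotInputFormat(), args);
        System.exit(ret);
      }
    }
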
Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java?rev=1594984&r1=1594983&r2=1594984&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java Thu May 15 17:11:31 2014
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.client.Put;
@@ -44,6 +45,7 @@ import org.apache.hadoop.mapred.TextInpu
 import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.security.token.Token;
 import org.apache.zookeeper.KeeperException;
+import org.cliffc.high_scale_lib.Counter;
 
 /**
  * Utility for {@link TableMap} and {@link TableReduce}
@@ -69,7 +71,16 @@ public class TableMapReduceUtil {
     Class<? extends TableMap> mapper,
     Class<?> outputKeyClass,
     Class<?> outputValueClass, JobConf job) {
-    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job, true);
+    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
+      true, TableInputFormat.class);
+  }
+
+  public static void initTableMapJob(String table, String columns,
+    Class<? extends TableMap> mapper,
+    Class<?> outputKeyClass,
+    Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
+    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
+      addDependencyJars, TableInputFormat.class);
   }
 
   /**
@@ -88,9 +99,10 @@ public class TableMapReduceUtil {
   public static void initTableMapJob(String table, String columns,
     Class<? extends TableMap> mapper,
     Class<?> outputKeyClass,
-    Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
+    Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
+    Class<? extends InputFormat> inputFormat) {
 
-    job.setInputFormat(TableInputFormat.class);
+    job.setInputFormat(inputFormat);
     job.setMapOutputValueClass(outputValueClass);
     job.setMapOutputKeyClass(outputKeyClass);
     job.setMapperClass(mapper);
@@ -114,6 +126,39 @@ public class TableMapReduceUtil {
   }
 
   /**
+   * Sets up the job for reading from a table snapshot. It bypasses HBase servers
+   * and reads directly from snapshot files.
+   *
+   * @param snapshotName The name of the snapshot (of a table) to read from.
+   * @param columns  The columns to scan.
+   * @param mapper  The mapper class to use.
+   * @param outputKeyClass  The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job  The current job to adjust.  Make sure the passed job is
+   * carrying all necessary HBase configuration.
+   * @param addDependencyJars upload HBase jars and jars for any of the configured
+   *           job classes via the distributed cache (tmpjars).
+   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current user
+   * should have write permissions to this directory, and this should not be a subdirectory of
+   * rootdir. After the job is finished, the restore directory can be deleted.
+   * @throws IOException When setting up the details fails.
+   * @see TableSnapshotInputFormat
+   */
+  public static void initTableSnapshotMapJob(String snapshotName, String columns,
+      Class<? extends TableMap> mapper,
+      Class<?> outputKeyClass,
+      Class<?> outputValueClass, JobConf job,
+      boolean addDependencyJars, Path tmpRestoreDir)
+  throws IOException {
+    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
+    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
+      addDependencyJars, TableSnapshotInputFormat.class);
+    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
+    // We would need even more libraries that hbase-server depends on
+    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(job, Counter.class);
+  }
+
+  /**
    * Use this before submitting a TableReduce job. It will
    * appropriately set up the JobConf.
    *

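The new mapred flavor of TableMapReduceUtil.initTableSnapshotMapJob added above mirrors the existing mapreduce helper. A hedged usage sketch follows; the snapshot name, restore path, and column family list ("f1 f2") are hypothetical, and the mapper is a trivial one modeled on the test mapper later in this commit:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapred.TableMap;
    import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.lib.NullOutputFormat;

    import java.io.IOException;

    public class SnapshotMapredDriver {

      // Trivial mapper that emits each row key of the snapshot.
      static class RowEmittingMapper extends MapReduceBase
          implements TableMap<ImmutableBytesWritable, NullWritable> {
        @Override
        public void map(ImmutableBytesWritable key, Result value,
            OutputCollector<ImmutableBytesWritable, NullWritable> out, Reporter reporter)
            throws IOException {
          out.collect(key, NullWritable.get());
        }
      }

      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(HBaseConfiguration.create());
        job.setJarByClass(SnapshotMapredDriver.class);
        // Columns are the space-separated family list, as in the old mapred API.
        TableMapReduceUtil.initTableSnapshotMapJob("mySnapshot", "f1 f2",
            RowEmittingMapper.class, ImmutableBytesWritable.class, NullWritable.class,
            job, true, new Path("/tmp/snapshot-restore"));
        job.setNumReduceTasks(0);
        job.setOutputFormat(NullOutputFormat.class);
        JobClient.runJob(job);
      }
    }
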
Added: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java?rev=1594984&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java (added)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java Thu May 15 17:11:31 2014
@@ -0,0 +1,134 @@
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWritable, Result> {
+
+  static class TableSnapshotRegionSplit implements InputSplit {
+    private TableSnapshotInputFormatImpl.InputSplit delegate;
+
+    // constructor for mapreduce framework / Writable
+    public TableSnapshotRegionSplit() {
+      this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
+    }
+
+    public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
+      this.delegate = delegate;
+    }
+
+    public TableSnapshotRegionSplit(String regionName, List<String> locations) {
+      this.delegate = new TableSnapshotInputFormatImpl.InputSplit(regionName, locations);
+    }
+
+    @Override
+    public long getLength() throws IOException {
+      return delegate.getLength();
+    }
+
+    @Override
+    public String[] getLocations() throws IOException {
+      return delegate.getLocations();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      delegate.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      delegate.readFields(in);
+    }
+  }
+
+  static class TableSnapshotRecordReader
+      implements RecordReader<ImmutableBytesWritable, Result> {
+
+    private TableSnapshotInputFormatImpl.RecordReader delegate;
+
+    public TableSnapshotRecordReader(TableSnapshotRegionSplit split, JobConf job)
+        throws IOException {
+      delegate = new TableSnapshotInputFormatImpl.RecordReader();
+      delegate.initialize(split.delegate, job);
+    }
+
+    @Override
+    public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
+      if (!delegate.nextKeyValue()) {
+        return false;
+      }
+      ImmutableBytesWritable currentKey = delegate.getCurrentKey();
+      key.set(currentKey.get(), currentKey.getOffset(), currentKey.getLength());
+      value.copyFrom(delegate.getCurrentValue());
+      return true;
+    }
+
+    @Override
+    public ImmutableBytesWritable createKey() {
+      return new ImmutableBytesWritable();
+    }
+
+    @Override
+    public Result createValue() {
+      return new Result();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return delegate.getPos();
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return delegate.getProgress();
+    }
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    List<TableSnapshotInputFormatImpl.InputSplit> splits =
+      TableSnapshotInputFormatImpl.getSplits(job);
+    InputSplit[] results = new InputSplit[splits.size()];
+    for (int i = 0; i < splits.size(); i++) {
+      results[i] = new TableSnapshotRegionSplit(splits.get(i));
+    }
+    return results;
+  }
+
+  @Override
+  public RecordReader<ImmutableBytesWritable, Result>
+  getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
+  }
+
+  /**
+   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
+   * @param job the job to configure
+   * @param snapshotName the name of the snapshot to read from
+   * @param restoreDir a temporary directory to restore the snapshot into. The current user
+   * should have write permissions to this directory, and this should not be a subdirectory of
+   * rootdir. After the job is finished, restoreDir can be deleted.
+   * @throws IOException if an error occurs
+   */
+  public static void setInput(JobConf job, String snapshotName, Path restoreDir) throws IOException {
+    TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir);
+  }
+}

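The setInput javadoc above describes the restore-directory contract. Because the mapred API carries no serialized Scan, the families to read come from the old COLUMN_LIST key instead. A small, self-contained sketch of driving the new input format directly, outside TableMapReduceUtil; the snapshot name, restore path, and family are illustrative, not taken from this commit:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapred.TableInputFormat;
    import org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;

    public class SnapshotScanDemo {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(HBaseConfiguration.create());
        // mapred has no Scan in the job conf; families are passed via the column list key.
        job.set(TableInputFormat.COLUMN_LIST, "f1");
        TableSnapshotInputFormat.setInput(job, "mySnapshot", new Path("/tmp/snapshot-restore"));

        TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
        for (InputSplit split : tsif.getSplits(job, 0)) {
          RecordReader<ImmutableBytesWritable, Result> rr =
              tsif.getRecordReader(split, job, Reporter.NULL);
          ImmutableBytesWritable key = rr.createKey();
          Result value = rr.createValue();
          while (rr.next(key, value)) {
            // each (key, value) is one row read straight from the restored snapshot files
          }
          rr.close();
        }
      }
    }
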
Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1594984&r1=1594983&r2=1594984&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Thu May 15 17:11:31 2014
@@ -273,6 +273,19 @@ public class TableMapReduceUtil {
   }
 
   /**
+   * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on
+   * direct memory will likely cause the map tasks to OOM when opening the region. This
+   * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user
+   * wants to override this behavior in their job.
+   */
+  public static void resetCacheConfig(Configuration conf) {
+    conf.setFloat(
+      HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
+    conf.setFloat("hbase.offheapcache.percentage", 0f);
+    conf.setFloat("hbase.bucketcache.size", 0f);
+  }
+
+  /**
    * Sets up the job for reading from a table snapshot. It bypasses hbase servers
    * and read directly from snapshot files.
    *
@@ -302,17 +315,7 @@ public class TableMapReduceUtil {
     initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
         outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
 
-    /*
-     * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on
-     * direct memory will likely cause the map tasks to OOM when opening the region. This
-     * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user
-     * wants to override this behavior in their job.
-     */
-    job.getConfiguration().setFloat(
-      HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
-    job.getConfiguration().setFloat("hbase.offheapcache.percentage", 0f);
-    job.getConfiguration().setFloat("hbase.bucketcache.size", 0f);
-
+    resetCacheConfig(job.getConfiguration());
     // We would need even more libraries that hbase-server depends on
     TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Counter.class);
   }

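resetCacheConfig is extracted and made public above precisely so the defaults can be adjusted again after job setup. A hedged sketch of what such an override might look like for the mapreduce API; the 0.4f fraction and the use of IdentityTableMapper are illustrative tuning choices, not part of this commit:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;

    public class SnapshotJobWithCustomCache {
      public static Job configure(String snapshotName, Path restoreDir) throws Exception {
        Job job = Job.getInstance(HBaseConfiguration.create());
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, new Scan(),
            IdentityTableMapper.class, ImmutableBytesWritable.class, Result.class,
            job, true, restoreDir);
        // initTableSnapshotMapperJob resets the cache config to a plain on-heap LruBlockCache;
        // a job whose map tasks are known to have ample heap could raise the fraction again.
        job.getConfiguration().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.4f);
        return job;
      }
    }
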
Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java?rev=1594984&r1=1594983&r2=1594984&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java Thu May 15 17:11:31 2014
@@ -18,49 +18,24 @@
 
 package org.apache.hadoop.hbase.mapreduce;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
-import java.util.UUID;
 
-import com.google.protobuf.HBaseZeroCopyByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HDFSBlocksDistribution;
-import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
-import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableSnapshotScanner;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
-import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
-import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
-import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -111,268 +86,112 @@ import com.google.common.annotations.Vis
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
-  // TODO: Snapshots files are owned in fs by the hbase user. There is no
-  // easy way to delegate access.
 
   private static final Log LOG = LogFactory.getLog(TableSnapshotInputFormat.class);
 
-  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
-  private static final String LOCALITY_CUTOFF_MULTIPLIER = "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
-  private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
-
-  private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
-  private static final String TABLE_DIR_KEY = "hbase.TableSnapshotInputFormat.table.dir";
-
-  public static class TableSnapshotRegionSplit extends InputSplit implements Writable {
-    private String regionName;
-    private String[] locations;
-
-    // constructor for mapreduce framework / Writable
-    public TableSnapshotRegionSplit() { }
-
-    TableSnapshotRegionSplit(String regionName, List<String> locations) {
-      this.regionName = regionName;
-      if (locations == null || locations.isEmpty()) {
-        this.locations = new String[0];
-      } else {
-        this.locations = locations.toArray(new String[locations.size()]);
-      }
+  @VisibleForTesting
+  static class TableSnapshotRegionSplit extends InputSplit implements Writable {
+    private TableSnapshotInputFormatImpl.InputSplit delegate;
+
+    public TableSnapshotRegionSplit() {
+      this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
+    }
+
+    public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
+      this.delegate = delegate;
+    }
+
+    public TableSnapshotRegionSplit(String regionName, List<String> locations) {
+      this.delegate = new TableSnapshotInputFormatImpl.InputSplit(regionName, locations);
     }
+
     @Override
     public long getLength() throws IOException, InterruptedException {
-      //TODO: We can obtain the file sizes of the snapshot here.
-      return 0;
+      return delegate.getLength();
     }
 
     @Override
     public String[] getLocations() throws IOException, InterruptedException {
-      return locations;
+      return delegate.getLocations();
     }
 
-    // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
-    // doing this wrapping with Writables.
     @Override
     public void write(DataOutput out) throws IOException {
-    MapReduceProtos.TableSnapshotRegionSplit.Builder builder =
-      MapReduceProtos.TableSnapshotRegionSplit.newBuilder()
-        .setRegion(RegionSpecifier.newBuilder()
-          .setType(RegionSpecifierType.ENCODED_REGION_NAME)
-          .setValue(HBaseZeroCopyByteString.wrap(Bytes.toBytes(regionName))).build());
-
-      for (String location : locations) {
-        builder.addLocations(location);
-      }
-
-      MapReduceProtos.TableSnapshotRegionSplit split = builder.build();
-
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      split.writeTo(baos);
-      baos.close();
-      byte[] buf = baos.toByteArray();
-      out.writeInt(buf.length);
-      out.write(buf);
+      delegate.write(out);
     }
+
     @Override
     public void readFields(DataInput in) throws IOException {
-      int len = in.readInt();
-      byte[] buf = new byte[len];
-      in.readFully(buf);
-      MapReduceProtos.TableSnapshotRegionSplit split = MapReduceProtos.TableSnapshotRegionSplit.PARSER.parseFrom(buf);
-      this.regionName = Bytes.toString(split.getRegion().getValue().toByteArray());
-      List<String> locationsList = split.getLocationsList();
-      this.locations = locationsList.toArray(new String[locationsList.size()]);
+      delegate.readFields(in);
     }
   }
 
   @VisibleForTesting
   static class TableSnapshotRegionRecordReader extends RecordReader<ImmutableBytesWritable, Result> {
-    private TableSnapshotRegionSplit split;
-    private Scan scan;
-    private Result result = null;
-    private ImmutableBytesWritable row = null;
-    private ClientSideRegionScanner scanner;
+    private TableSnapshotInputFormatImpl.RecordReader delegate =
+      new TableSnapshotInputFormatImpl.RecordReader();
     private TaskAttemptContext context;
     private Method getCounter;
 
     @Override
     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
-        InterruptedException {
+      InterruptedException {
 
-      Configuration conf = context.getConfiguration();
-      this.split = (TableSnapshotRegionSplit) split;
-      String regionName = this.split.regionName;
-      String snapshotName = getSnapshotName(conf);
-      Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
-      FileSystem fs = rootDir.getFileSystem(conf);
-
-      Path tmpRootDir = new Path(conf.get(TABLE_DIR_KEY)); // This is the user specified root
-      // directory where snapshot was restored
-
-      Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
-
-      //load table descriptor
-      HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir);
-
-      //load region descriptor
-      Path regionDir = new Path(snapshotDir, regionName);
-      HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
-
-      // create scan
-      String scanStr = conf.get(TableInputFormat.SCAN);
-      if (scanStr == null) {
-        throw new IllegalArgumentException("A Scan is not configured for this job");
-      }
-      scan = TableMapReduceUtil.convertStringToScan(scanStr);
-      // region is immutable, this should be fine,
-      // otherwise we have to set the thread read point
-      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
-      // disable caching of data blocks
-      scan.setCacheBlocks(false);
-
-      scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
-      if (context != null) {
-        this.context = context;
-        getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
-      }
+      this.context = context;
+      getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
+      delegate.initialize(
+        ((TableSnapshotRegionSplit) split).delegate,
+        context.getConfiguration());
     }
 
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
-      result = scanner.next();
-      if (result == null) {
-        //we are done
-        return false;
-      }
-
-      if (this.row == null) {
-        this.row = new ImmutableBytesWritable();
-      }
-      this.row.set(result.getRow());
-
-      ScanMetrics scanMetrics = scanner.getScanMetrics();
-      if (scanMetrics != null && context != null) {
-        TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context);
+      boolean result = delegate.nextKeyValue();
+      if (result) {
+        ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
+        if (scanMetrics != null && context != null) {
+          TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context);
+        }
       }
 
-      return true;
+      return result;
     }
 
     @Override
     public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
-      return row;
+      return delegate.getCurrentKey();
     }
 
     @Override
     public Result getCurrentValue() throws IOException, InterruptedException {
-      return result;
+      return delegate.getCurrentValue();
     }
 
     @Override
     public float getProgress() throws IOException, InterruptedException {
-      return 0; // TODO: use total bytes to estimate
+      return delegate.getProgress();
     }
 
     @Override
     public void close() throws IOException {
-      if (this.scanner != null) {
-        this.scanner.close();
-      }
+      delegate.close();
     }
   }
 
   @Override
   public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
-      InputSplit split, TaskAttemptContext context) throws IOException {
+    InputSplit split, TaskAttemptContext context) throws IOException {
     return new TableSnapshotRegionRecordReader();
   }
 
   @Override
   public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
-    Configuration conf = job.getConfiguration();
-    String snapshotName = getSnapshotName(conf);
-
-    Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
-    FileSystem fs = rootDir.getFileSystem(conf);
-
-    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
-    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
-
-    Set<String> snapshotRegionNames
-      = SnapshotReferenceUtil.getSnapshotRegionNames(fs, snapshotDir);
-    if (snapshotRegionNames == null) {
-      throw new IllegalArgumentException("Snapshot seems empty");
-    }
-
-    // load table descriptor
-    HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs,
-        snapshotDir);
-
-    Scan scan = TableMapReduceUtil.convertStringToScan(conf
-      .get(TableInputFormat.SCAN));
-    Path tableDir = new Path(conf.get(TABLE_DIR_KEY));
-
-    List<InputSplit> splits = new ArrayList<InputSplit>();
-    for (String regionName : snapshotRegionNames) {
-      // load region descriptor
-      Path regionDir = new Path(snapshotDir, regionName);
-      HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs,
-          regionDir);
-
-      if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
-          hri.getStartKey(), hri.getEndKey())) {
-        // compute HDFS locations from snapshot files (which will get the locations for
-        // referred hfiles)
-        List<String> hosts = getBestLocations(conf,
-          HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
-
-        int len = Math.min(3, hosts.size());
-        hosts = hosts.subList(0, len);
-        splits.add(new TableSnapshotRegionSplit(regionName, hosts));
-      }
-    }
-
-    return splits;
-  }
-
-  /**
-   * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers does not take
-   * weights into account, thus will treat every location passed from the input split as equal. We
-   * do not want to blindly pass all the locations, since we are creating one split per region, and
-   * the region's blocks are all distributed throughout the cluster unless favorite node assignment
-   * is used. On the expected stable case, only one location will contain most of the blocks as local.
-   * On the other hand, in favored node assignment, 3 nodes will contain highly local blocks. Here
-   * we are doing a simple heuristic, where we will pass all hosts which have at least 80%
-   * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top
-   * host with the best locality.
-   */
-  @VisibleForTesting
-  List<String> getBestLocations(Configuration conf, HDFSBlocksDistribution blockDistribution) {
-    List<String> locations = new ArrayList<String>(3);
-
-    HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();
-
-    if (hostAndWeights.length == 0) {
-      return locations;
-    }
-
-    HostAndWeight topHost = hostAndWeights[0];
-    locations.add(topHost.getHost());
-
-    // Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
-    double cutoffMultiplier
-      = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);
-
-    double filterWeight = topHost.getWeight() * cutoffMultiplier;
-
-    for (int i = 1; i < hostAndWeights.length; i++) {
-      if (hostAndWeights[i].getWeight() >= filterWeight) {
-        locations.add(hostAndWeights[i].getHost());
-      } else {
-        break;
-      }
+    List<InputSplit> results = new ArrayList<InputSplit>();
+    for (TableSnapshotInputFormatImpl.InputSplit split :
+      TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) {
+      results.add(new TableSnapshotRegionSplit(split));
     }
-
-    return locations;
+    return results;
   }
 
   /**
@@ -385,25 +204,6 @@ public class TableSnapshotInputFormat ex
    * @throws IOException if an error occurs
    */
   public static void setInput(Job job, String snapshotName, Path restoreDir) throws IOException {
-    Configuration conf = job.getConfiguration();
-    conf.set(SNAPSHOT_NAME_KEY, snapshotName);
-
-    Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
-    FileSystem fs = rootDir.getFileSystem(conf);
-
-    restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
-
-    // TODO: restore from record readers to parallelize.
-    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
-
-    conf.set(TABLE_DIR_KEY, restoreDir.toString());
-  }
-
-  private static String getSnapshotName(Configuration conf) {
-    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
-    if (snapshotName == null) {
-      throw new IllegalArgumentException("Snapshot name must be provided");
-    }
-    return snapshotName;
+    TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir);
   }
-}
+}
\ No newline at end of file

Added: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java?rev=1594984&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java (added)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java Thu May 15 17:11:31 2014
@@ -0,0 +1,346 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import com.google.protobuf.HBaseZeroCopyByteString;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * API-agnostic implementation for mapreduce over table snapshots.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class TableSnapshotInputFormatImpl {
+  // TODO: Snapshot files are owned in fs by the hbase user. There is no
+  // easy way to delegate access.
+
+  private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
+  private static final String TABLE_DIR_KEY = "hbase.TableSnapshotInputFormat.table.dir";
+
+  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
+  private static final String LOCALITY_CUTOFF_MULTIPLIER = "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
+  private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
+
+  /**
+   * Implementation class for InputSplit logic common between mapred and mapreduce.
+   */
+  public static class InputSplit implements Writable {
+    private String regionName;
+    private String[] locations;
+
+    // constructor for mapreduce framework / Writable
+    public InputSplit() { }
+
+    public InputSplit(String regionName, List<String> locations) {
+      this.regionName = regionName;
+      if (locations == null || locations.isEmpty()) {
+        this.locations = new String[0];
+      } else {
+        this.locations = locations.toArray(new String[locations.size()]);
+      }
+    }
+
+    public long getLength() {
+      //TODO: We can obtain the file sizes of the snapshot here.
+      return 0;
+    }
+
+    public String[] getLocations() {
+      return locations;
+    }
+
+    // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
+    // doing this wrapping with Writables.
+    @Override
+    public void write(DataOutput out) throws IOException {
+      MapReduceProtos.TableSnapshotRegionSplit.Builder builder =
+        MapReduceProtos.TableSnapshotRegionSplit.newBuilder()
+          .setRegion(HBaseProtos.RegionSpecifier.newBuilder()
+            .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.ENCODED_REGION_NAME)
+            .setValue(HBaseZeroCopyByteString.wrap(Bytes.toBytes(regionName))).build());
+
+      for (String location : locations) {
+        builder.addLocations(location);
+      }
+
+      MapReduceProtos.TableSnapshotRegionSplit split = builder.build();
+
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      split.writeTo(baos);
+      baos.close();
+      byte[] buf = baos.toByteArray();
+      out.writeInt(buf.length);
+      out.write(buf);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      int len = in.readInt();
+      byte[] buf = new byte[len];
+      in.readFully(buf);
+      MapReduceProtos.TableSnapshotRegionSplit split = MapReduceProtos.TableSnapshotRegionSplit.PARSER.parseFrom(buf);
+      this.regionName = Bytes.toString(split.getRegion().getValue().toByteArray());
+      List<String> locationsList = split.getLocationsList();
+      this.locations = locationsList.toArray(new String[locationsList.size()]);
+    }
+  }
+
+  /**
+   * Implementation class for RecordReader logic common between mapred and mapreduce.
+   */
+  public static class RecordReader {
+    InputSplit split;
+    private Scan scan;
+    private Result result = null;
+    private ImmutableBytesWritable row = null;
+    private ClientSideRegionScanner scanner;
+
+    public ClientSideRegionScanner getScanner() {
+      return scanner;
+    }
+
+    public void initialize(InputSplit split, Configuration conf) throws IOException {
+      this.split = split;
+      String regionName = this.split.regionName;
+      String snapshotName = getSnapshotName(conf);
+      Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
+      FileSystem fs = rootDir.getFileSystem(conf);
+
+      Path tmpRootDir = new Path(conf.get(TABLE_DIR_KEY)); // This is the user specified root
+      // directory where snapshot was restored
+
+      Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+
+      //load table descriptor
+      HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir);
+
+      //load region descriptor
+      Path regionDir = new Path(snapshotDir, regionName);
+      HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+
+      // create scan
+      // TODO: mapred does not support scan as input API. Work around for now.
+      if (conf.get(TableInputFormat.SCAN) != null) {
+        scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
+      } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
+        String[] columns =
+          conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
+        scan = new Scan();
+        for (String col : columns) {
+          scan.addFamily(Bytes.toBytes(col));
+        }
+      } else {
+        throw new IllegalArgumentException("A Scan is not configured for this job");
+      }
+
+      // region is immutable, this should be fine,
+      // otherwise we have to set the thread read point
+      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+      // disable caching of data blocks
+      scan.setCacheBlocks(false);
+
+      scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
+    }
+
+    public boolean nextKeyValue() throws IOException {
+      result = scanner.next();
+      if (result == null) {
+        //we are done
+        return false;
+      }
+
+      if (this.row == null) {
+        this.row = new ImmutableBytesWritable();
+      }
+      this.row.set(result.getRow());
+      return true;
+    }
+
+    public ImmutableBytesWritable getCurrentKey() {
+      return row;
+    }
+
+    public Result getCurrentValue() {
+      return result;
+    }
+
+    public long getPos() {
+      return 0;
+    }
+
+    public float getProgress() {
+      return 0; // TODO: use total bytes to estimate
+    }
+
+    public void close() {
+      if (this.scanner != null) {
+        this.scanner.close();
+      }
+    }
+  }
+
+  public static List<InputSplit> getSplits(Configuration conf) throws IOException {
+    String snapshotName = getSnapshotName(conf);
+
+    Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
+    FileSystem fs = rootDir.getFileSystem(conf);
+
+    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+    HBaseProtos.SnapshotDescription snapshotDesc =
+      SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+
+    Set<String> snapshotRegionNames =
+      SnapshotReferenceUtil.getSnapshotRegionNames(fs, snapshotDir);
+    if (snapshotRegionNames == null) {
+      throw new IllegalArgumentException("Snapshot seems empty");
+    }
+
+    // load table descriptor
+    HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs,
+      snapshotDir);
+
+    // TODO: mapred does not support scan as input API. Work around for now.
+    Scan scan = null;
+    if (conf.get(TableInputFormat.SCAN) != null) {
+      scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
+    } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
+      String[] columns =
+        conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
+      scan = new Scan();
+      for (String col : columns) {
+        scan.addFamily(Bytes.toBytes(col));
+      }
+    } else {
+      throw new IllegalArgumentException("Unable to create scan");
+    }
+    Path tableDir = new Path(conf.get(TABLE_DIR_KEY));
+
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    for (String regionName : snapshotRegionNames) {
+      // load region descriptor
+      Path regionDir = new Path(snapshotDir, regionName);
+      HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs,
+        regionDir);
+
+      if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
+        hri.getStartKey(), hri.getEndKey())) {
+        // compute HDFS locations from snapshot files (which will get the locations for
+        // referred hfiles)
+        List<String> hosts = getBestLocations(conf,
+          HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
+
+        int len = Math.min(3, hosts.size());
+        hosts = hosts.subList(0, len);
+        splits.add(new InputSplit(regionName, hosts));
+      }
+    }
+
+    return splits;
+  }
+
+  /**
+   * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers do not take
+   * weights into account, thus will treat every location passed from the input split as equal. We
+   * do not want to blindly pass all the locations, since we are creating one split per region, and
+   * the region's blocks are all distributed throughout the cluster unless favored node assignment
+   * is used. In the expected stable case, only one location will contain most of the blocks as local.
+   * On the other hand, in favored node assignment, 3 nodes will contain highly local blocks. Here
+   * we are doing a simple heuristic, where we will pass all hosts which have at least 80%
+   * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top
+   * host with the best locality.
+   */
+  public static List<String> getBestLocations(
+      Configuration conf, HDFSBlocksDistribution blockDistribution) {
+    List<String> locations = new ArrayList<String>(3);
+
+    HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();
+
+    if (hostAndWeights.length == 0) {
+      return locations;
+    }
+
+    HostAndWeight topHost = hostAndWeights[0];
+    locations.add(topHost.getHost());
+
+    // Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
+    double cutoffMultiplier
+      = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);
+
+    double filterWeight = topHost.getWeight() * cutoffMultiplier;
+
+    for (int i = 1; i < hostAndWeights.length; i++) {
+      if (hostAndWeights[i].getWeight() >= filterWeight) {
+        locations.add(hostAndWeights[i].getHost());
+      } else {
+        break;
+      }
+    }
+
+    return locations;
+  }
+
+  private static String getSnapshotName(Configuration conf) {
+    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
+    if (snapshotName == null) {
+      throw new IllegalArgumentException("Snapshot name must be provided");
+    }
+    return snapshotName;
+  }
+
+  /**
+   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
+   * @param conf the job to configure
+   * @param snapshotName the name of the snapshot to read from
+   * @param restoreDir a temporary directory to restore the snapshot into. The current user
+   * should have write permissions to this directory, and this should not be a subdirectory of
+   * rootdir. After the job is finished, restoreDir can be deleted.
+   * @throws IOException if an error occurs
+   */
+  public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
+      throws IOException {
+    conf.set(SNAPSHOT_NAME_KEY, snapshotName);
+
+    Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
+    FileSystem fs = rootDir.getFileSystem(conf);
+
+    restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
+
+    // TODO: restore from record readers to parallelize.
+    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
+
+    conf.set(TABLE_DIR_KEY, restoreDir.toString());
+  }
+}

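The getBestLocations heuristic added above is easiest to see with concrete numbers. The standalone illustration below uses plain Java and no HBase types; the host names and block weights are made up, listed in descending weight order as the top-hosts array is ordered, and the 0.8 cutoff matches the default multiplier:

    import java.util.ArrayList;
    import java.util.List;

    public class LocalityCutoffDemo {
      // Keep the top host, then every following host whose weight is at least cutoff * topWeight.
      static List<String> bestLocations(String[] hosts, long[] weights, double cutoff) {
        List<String> locations = new ArrayList<String>(3);
        if (hosts.length == 0) {
          return locations;
        }
        locations.add(hosts[0]);
        double filterWeight = weights[0] * cutoff;
        for (int i = 1; i < hosts.length; i++) {
          if (weights[i] >= filterWeight) {
            locations.add(hosts[i]);
          } else {
            break;  // weights are sorted descending, so nothing later can pass
          }
        }
        return locations;
      }

      public static void main(String[] args) {
        // With the default cutoff of 0.8: host2 passes (90 >= 80), host3 is dropped (50 < 80).
        String[] hosts = { "host1", "host2", "host3" };
        long[] weights = { 100L, 90L, 50L };
        System.out.println(bestLocations(hosts, weights, 0.8));  // prints [host1, host2]
      }
    }
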
Added: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java?rev=1594984&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java (added)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java Thu May 15 17:11:31 2014
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapred;
+
+import static org.mockito.Mockito.mock;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatTestBase;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+@Category(LargeTests.class)
+public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {
+
+  private static final byte[] aaa = Bytes.toBytes("aaa");
+  private static final byte[] after_zzz = Bytes.toBytes("zz{"); // 'z' + 1 => '{'
+  private static final String COLUMNS =
+    Bytes.toString(FAMILIES[0]) + " " + Bytes.toString(FAMILIES[1]);
+
+  @Override
+  protected byte[] getStartRow() {
+    return aaa;
+  }
+
+  @Override
+  protected byte[] getEndRow() {
+    return after_zzz;
+  }
+
+  static class TestTableSnapshotMapper extends MapReduceBase
+      implements  TableMap<ImmutableBytesWritable, NullWritable> {
+    @Override
+    public void map(ImmutableBytesWritable key, Result value,
+        OutputCollector<ImmutableBytesWritable, NullWritable> collector, Reporter reporter)
+        throws IOException {
+      verifyRowFromMap(key, value);
+      collector.collect(key, NullWritable.get());
+    }
+  }
+
+  public static class TestTableSnapshotReducer extends MapReduceBase
+      implements Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
+    HBaseTestingUtility.SeenRowTracker rowTracker =
+      new HBaseTestingUtility.SeenRowTracker(aaa, after_zzz);
+
+    @Override
+    public void reduce(ImmutableBytesWritable key, Iterator<NullWritable> values,
+        OutputCollector<NullWritable, NullWritable> collector, Reporter reporter)
+        throws IOException {
+      rowTracker.addRow(key.get());
+    }
+
+    @Override
+    public void close() {
+      rowTracker.validate();
+    }
+  }
+
+  @Test
+  public void testInitTableSnapshotMapperJobConfig() throws Exception {
+    setupCluster();
+    TableName tableName = TableName.valueOf("testInitTableSnapshotMapperJobConfig");
+    String snapshotName = "foo";
+
+    try {
+      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
+      JobConf job = new JobConf(UTIL.getConfiguration());
+      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+
+      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
+        COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+        NullWritable.class, job, false, tmpTableDir);
+
+      // TODO: would be better to examine directly the cache instance that results from this
+      // config. Currently this is not possible because BlockCache initialization is static.
+      Assert.assertEquals(
+        "Snapshot job should be configured for default LruBlockCache.",
+        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
+        job.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
+      Assert.assertEquals(
+        "Snapshot job should not use SlabCache.",
+        0, job.getFloat("hbase.offheapcache.percentage", -1), 0.01);
+      Assert.assertEquals(
+        "Snapshot job should not use BucketCache.",
+        0, job.getFloat("hbase.bucketcache.size", -1), 0.01);
+    } finally {
+      UTIL.getHBaseAdmin().deleteSnapshot(snapshotName);
+      UTIL.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
+
+  // TODO: mapred does not support limiting the input range by start row and end row.
+  // Thus the following tests must override parameter verification.
+
+  @Test
+  @Override
+  public void testWithMockedMapReduceMultiRegion() throws Exception {
+    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 10);
+  }
+
+  @Test
+  @Override
+  public void testWithMapReduceMultiRegion() throws Exception {
+    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 10, false);
+  }
+
+  @Test
+  @Override
+  // run the MR job while HBase is offline
+  public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
+    testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 10, true);
+  }
+
+  @Override
+  protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
+      int numRegions, int expectedNumSplits) throws Exception {
+    setupCluster();
+    TableName tableName = TableName.valueOf("testWithMockedMapReduce");
+    try {
+      createTableAndSnapshot(
+        util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);
+
+      JobConf job = new JobConf(util.getConfiguration());
+      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
+
+      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
+        COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+        NullWritable.class, job, false, tmpTableDir);
+
+      // mapred does not support limiting the scan by start and end keys, so the full range is verified
+      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
+
+    } finally {
+      util.getHBaseAdmin().deleteSnapshot(snapshotName);
+      util.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
+
+  private void verifyWithMockedMapReduce(JobConf job, int numRegions, int expectedNumSplits,
+      byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
+    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
+    InputSplit[] splits = tsif.getSplits(job, 0);
+
+    Assert.assertEquals(expectedNumSplits, splits.length);
+
+    HBaseTestingUtility.SeenRowTracker rowTracker =
+      new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
+
+    for (int i = 0; i < splits.length; i++) {
+      // validate input split
+      InputSplit split = splits[i];
+      Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit);
+
+      // validate record reader
+      OutputCollector collector = mock(OutputCollector.class);
+      Reporter reporter = mock(Reporter.class);
+      RecordReader<ImmutableBytesWritable, Result> rr = tsif.getRecordReader(split, job, reporter);
+
+      // validate we can read all the data back
+      ImmutableBytesWritable key = rr.createKey();
+      Result value = rr.createValue();
+      while (rr.next(key, value)) {
+        verifyRowFromMap(key, value);
+        rowTracker.addRow(key.copyBytes());
+      }
+
+      rr.close();
+    }
+
+    // validate all rows are seen
+    rowTracker.validate();
+  }
+
+  @Override
+  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
+      String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
+      boolean shutdownCluster) throws Exception {
+    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
+      numRegions, expectedNumSplits, shutdownCluster);
+  }
+
+  // this is also called by the IntegrationTestTableSnapshotInputFormat
+  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
+      String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
+      int expectedNumSplits, boolean shutdownCluster) throws Exception {
+
+    // create the table and snapshot
+    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);
+
+    if (shutdownCluster) {
+      util.shutdownMiniHBaseCluster();
+    }
+
+    try {
+      // create the job
+      JobConf jobConf = new JobConf(util.getConfiguration());
+
+      jobConf.setJarByClass(util.getClass());
+      org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(jobConf,
+        TestTableSnapshotInputFormat.class);
+
+      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
+        TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+        NullWritable.class, jobConf, true, tableDir);
+
+      jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
+      jobConf.setNumReduceTasks(1);
+      jobConf.setOutputFormat(NullOutputFormat.class);
+
+      RunningJob job = JobClient.runJob(jobConf);
+      Assert.assertTrue(job.isSuccessful());
+    } finally {
+      if (!shutdownCluster) {
+        util.getHBaseAdmin().deleteSnapshot(snapshotName);
+        util.deleteTable(tableName);
+      }
+    }
+  }
+}
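
For reference, a minimal usage sketch of the new mapred entry point exercised by the test above. Only the initTableSnapshotMapJob call pattern is taken from the patch; the snapshot name, column list, restore directory, and mapper class are hypothetical placeholders, not part of this change.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;

public class SnapshotMapredJobSketch {

  // Placeholder mapper: emits each row key, mirroring TestTableSnapshotMapper above.
  public static class SketchMapper extends MapReduceBase
      implements TableMap<ImmutableBytesWritable, NullWritable> {
    @Override
    public void map(ImmutableBytesWritable key, Result value,
        OutputCollector<ImmutableBytesWritable, NullWritable> collector, Reporter reporter)
        throws IOException {
      collector.collect(key, NullWritable.get());
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf jobConf = new JobConf(HBaseConfiguration.create());
    jobConf.setJarByClass(SnapshotMapredJobSketch.class);

    // Restore the snapshot into a scratch directory and configure the mapper.
    // The mapred variant takes a space-separated column list instead of a Scan.
    TableMapReduceUtil.initTableSnapshotMapJob("my_snapshot", "f1 f2",
        SketchMapper.class, ImmutableBytesWritable.class, NullWritable.class,
        jobConf, true, new Path("/tmp/snapshot_restore"));

    jobConf.setNumReduceTasks(0);                 // map-only sketch
    jobConf.setOutputFormat(NullOutputFormat.class);
    JobClient.runJob(jobConf);
  }
}

Because the column-list form carries no Scan, the job reads the whole snapshot key range; range limiting stays a mapreduce-package feature.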

Added: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java?rev=1594984&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java (added)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java Thu May 15 17:11:31 2014
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public abstract class TableSnapshotInputFormatTestBase {
+
+  protected final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  protected static final int NUM_REGION_SERVERS = 2;
+  protected static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
+
+  protected FileSystem fs;
+  protected Path rootDir;
+
+  public void setupCluster() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(NUM_REGION_SERVERS);
+    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+    fs = rootDir.getFileSystem(UTIL.getConfiguration());
+  }
+
+  public void tearDownCluster() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  private static void setupConf(Configuration conf) {
+    // Enable snapshot
+    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+  }
+
+  protected abstract void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
+    int numRegions, int expectedNumSplits) throws Exception;
+
+  protected abstract void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
+    String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
+    boolean shutdownCluster) throws Exception;
+
+  protected abstract byte[] getStartRow();
+
+  protected abstract byte[] getEndRow();
+
+  @Test
+  public void testWithMockedMapReduceSingleRegion() throws Exception {
+    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1);
+  }
+
+  @Test
+  public void testWithMockedMapReduceMultiRegion() throws Exception {
+    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 8);
+  }
+
+  @Test
+  public void testWithMapReduceSingleRegion() throws Exception {
+    testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, false);
+  }
+
+  @Test
+  public void testWithMapReduceMultiRegion() throws Exception {
+    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 8, false);
+  }
+
+  @Test
+  // run the MR job while HBase is offline
+  public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
+    testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 8, true);
+  }
+
+  protected void testWithMapReduce(HBaseTestingUtility util, String snapshotName,
+      int numRegions, int expectedNumSplits, boolean shutdownCluster) throws Exception {
+    setupCluster();
+    util.startMiniMapReduceCluster();
+    try {
+      Path tableDir = util.getDataTestDirOnTestFS(snapshotName);
+      TableName tableName = TableName.valueOf("testWithMapReduce");
+      testWithMapReduceImpl(util, tableName, snapshotName, tableDir, numRegions,
+        expectedNumSplits, shutdownCluster);
+    } finally {
+      util.shutdownMiniMapReduceCluster();
+      tearDownCluster();
+    }
+  }
+
+  protected static void verifyRowFromMap(ImmutableBytesWritable key, Result result)
+    throws IOException {
+    byte[] row = key.get();
+    CellScanner scanner = result.cellScanner();
+    while (scanner.advance()) {
+      Cell cell = scanner.current();
+
+      // assert that all Cells in the Result have the same key
+      Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
+        cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+    }
+
+    for (int j = 0; j < FAMILIES.length; j++) {
+      byte[] actual = result.getValue(FAMILIES[j], null);
+      Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+        + " ,actual:" + Bytes.toString(actual), row, actual);
+    }
+  }
+
+  protected static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
+    String snapshotName, byte[] startRow, byte[] endRow, int numRegions)
+    throws Exception {
+    try {
+      util.deleteTable(tableName);
+    } catch(Exception ex) {
+      // ignore
+    }
+
+    if (numRegions > 1) {
+      util.createTable(tableName, FAMILIES, 1, startRow, endRow, numRegions);
+    } else {
+      util.createTable(tableName, FAMILIES);
+    }
+    HBaseAdmin admin = util.getHBaseAdmin();
+
+    // put some stuff in the table
+    HTable table = new HTable(util.getConfiguration(), tableName);
+    util.loadTable(table, FAMILIES);
+
+    Path rootDir = FSUtils.getRootDir(util.getConfiguration());
+    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());
+
+    SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
+      Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);
+
+    // load different values
+    byte[] value = Bytes.toBytes("after_snapshot_value");
+    util.loadTable(table, FAMILIES, value);
+
+    // cause flush to create new files in the region
+    admin.flush(tableName.toString());
+    table.close();
+  }
+
+}
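
The base class above owns the mini-cluster lifecycle, the shared @Test entry points, and the table/snapshot fixtures; a concrete test supplies only the row boundaries and the two verification hooks. A hypothetical skeleton illustrating that contract (class name, row boundaries, and method bodies are placeholders, not part of this change):

package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;

public class ExampleSnapshotInputFormatTest extends TableSnapshotInputFormatTestBase {

  @Override
  protected byte[] getStartRow() {
    return Bytes.toBytes("bbb");   // placeholder scan boundary
  }

  @Override
  protected byte[] getEndRow() {
    return Bytes.toBytes("yyy");   // placeholder scan boundary
  }

  @Override
  protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
      int numRegions, int expectedNumSplits) throws Exception {
    // Create the table and snapshot via createTableAndSnapshot(...), configure a job
    // for the input format under test, then verify the splits and record readers.
  }

  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
      String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
      boolean shutdownCluster) throws Exception {
    // Run a real mini-cluster MR job over the snapshot and assert it succeeds.
  }
}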

Modified: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java?rev=1594984&r1=1594983&r2=1594984&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java (original)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java Thu May 15 17:11:31 2014
@@ -22,33 +22,19 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.LargeTests;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
-import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
-import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -57,52 +43,31 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import com.google.common.collect.Lists;
 
 @Category(LargeTests.class)
-public class TestTableSnapshotInputFormat {
+public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {
 
-  private static final Log LOG = LogFactory.getLog(TestTableSnapshotInputFormat.class);
-  private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-  private static final int NUM_REGION_SERVERS = 2;
-  private static final String TABLE_NAME_STR = "TestTableSnapshotInputFormat";
-  private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
-  private static final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR);
   public static byte[] bbb = Bytes.toBytes("bbb");
   public static byte[] yyy = Bytes.toBytes("yyy");
 
-  private FileSystem fs;
-  private Path rootDir;
-
-  public void setupCluster() throws Exception {
-    setupConf(UTIL.getConfiguration());
-    UTIL.startMiniCluster(NUM_REGION_SERVERS);
-    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
-    fs = rootDir.getFileSystem(UTIL.getConfiguration());
-  }
-
-  public void tearDownCluster() throws Exception {
-    UTIL.shutdownMiniCluster();
+  @Override
+  protected byte[] getStartRow() {
+    return bbb;
   }
 
-  private static void setupConf(Configuration conf) {
-    // Enable snapshot
-    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
-  }
-
-  @After
-  public void tearDown() throws Exception {
+  @Override
+  protected byte[] getEndRow() {
+    return yyy;
   }
 
   @Test
   public void testGetBestLocations() throws IOException {
-    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
+    TableSnapshotInputFormatImpl tsif = new TableSnapshotInputFormatImpl();
     Configuration conf = UTIL.getConfiguration();
 
     HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
@@ -167,41 +132,6 @@ public class TestTableSnapshotInputForma
     }
   }
 
-  public static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
-      String snapshotName, int numRegions)
-      throws Exception {
-    try {
-      util.deleteTable(tableName);
-    } catch(Exception ex) {
-      // ignore
-    }
-
-    if (numRegions > 1) {
-      util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
-    } else {
-      util.createTable(tableName, FAMILIES);
-    }
-    HBaseAdmin admin = util.getHBaseAdmin();
-
-    // put some stuff in the table
-    HTable table = new HTable(util.getConfiguration(), tableName);
-    util.loadTable(table, FAMILIES);
-
-    Path rootDir = new Path(util.getConfiguration().get(HConstants.HBASE_DIR));
-    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());
-
-    SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
-        Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);
-
-    // load different values
-    byte[] value = Bytes.toBytes("after_snapshot_value");
-    util.loadTable(table, FAMILIES, value);
-
-    // cause flush to create new files in the region
-    admin.flush(tableName.toString());
-    table.close();
-  }
-
   @Test
   public void testInitTableSnapshotMapperJobConfig() throws Exception {
     setupCluster();
@@ -209,7 +139,7 @@ public class TestTableSnapshotInputForma
     String snapshotName = "foo";
 
     try {
-      createTableAndSnapshot(UTIL, tableName, snapshotName, 1);
+      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
       Job job = new Job(UTIL.getConfiguration());
       Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
 
@@ -236,32 +166,23 @@ public class TestTableSnapshotInputForma
     }
   }
 
-  @Test
-  public void testWithMockedMapReduceSingleRegion() throws Exception {
-    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1);
-  }
-
-  @Test
-  public void testWithMockedMapReduceMultiRegion() throws Exception {
-    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 8);
-  }
-
-  public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, int numRegions, int expectedNumSplits)
-      throws Exception {
+  public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
+      int numRegions, int expectedNumSplits) throws Exception {
     setupCluster();
     TableName tableName = TableName.valueOf("testWithMockedMapReduce");
     try {
-      createTableAndSnapshot(util, tableName, snapshotName, numRegions);
+      createTableAndSnapshot(
+        util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);
 
       Job job = new Job(util.getConfiguration());
       Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
-      Scan scan = new Scan(bbb, yyy); // limit the scan
+      Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan
 
       TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
           scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
           NullWritable.class, job, false, tmpTableDir);
 
-      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, bbb, yyy);
+      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
 
     } finally {
       util.getHBaseAdmin().deleteSnapshot(snapshotName);
@@ -305,62 +226,21 @@ public class TestTableSnapshotInputForma
     rowTracker.validate();
   }
 
-  public static void verifyRowFromMap(ImmutableBytesWritable key, Result result) throws IOException {
-    byte[] row = key.get();
-    CellScanner scanner = result.cellScanner();
-    while (scanner.advance()) {
-      Cell cell = scanner.current();
-
-      //assert that all Cells in the Result have the same key
-     Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
-         cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
-    }
-
-    for (int j = 0; j < FAMILIES.length; j++) {
-      byte[] actual = result.getValue(FAMILIES[j], null);
-      Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
-          + " ,actual:" + Bytes.toString(actual), row, actual);
-    }
-  }
-
-  @Test
-  public void testWithMapReduceSingleRegion() throws Exception {
-    testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, false);
-  }
-
-  @Test
-  public void testWithMapReduceMultiRegion() throws Exception {
-    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 8, false);
-  }
-
-  @Test
-  // run the MR job while HBase is offline
-  public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
-    testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 8, true);
-  }
-
-  private void testWithMapReduce(HBaseTestingUtility util, String snapshotName,
-      int numRegions, int expectedNumSplits, boolean shutdownCluster) throws Exception {
-    setupCluster();
-    util.startMiniMapReduceCluster();
-    try {
-      Path tableDir = util.getDataTestDirOnTestFS(snapshotName);
-      TableName tableName = TableName.valueOf("testWithMapReduce");
-      doTestWithMapReduce(util, tableName, snapshotName, tableDir, numRegions,
-        expectedNumSplits, shutdownCluster);
-    } finally {
-      util.shutdownMiniMapReduceCluster();
-      tearDownCluster();
-    }
+  @Override
+  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
+      String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
+      boolean shutdownCluster) throws Exception {
+    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
+      numRegions, expectedNumSplits, shutdownCluster);
   }
 
   // this is also called by the IntegrationTestTableSnapshotInputFormat
   public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
-      String snapshotName, Path tableDir, int numRegions, int expectedNumSplits, boolean shutdownCluster)
-          throws Exception {
+      String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
+      int expectedNumSplits, boolean shutdownCluster) throws Exception {
 
     //create the table and snapshot
-    createTableAndSnapshot(util, tableName, snapshotName, numRegions);
+    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);
 
     if (shutdownCluster) {
       util.shutdownMiniHBaseCluster();
@@ -369,7 +249,7 @@ public class TestTableSnapshotInputForma
     try {
       // create the job
       Job job = new Job(util.getConfiguration());
-      Scan scan = new Scan(bbb, yyy); // limit the scan
+      Scan scan = new Scan(startRow, endRow); // limit the scan
 
       job.setJarByClass(util.getClass());
       TableMapReduceUtil.addDependencyJars(job.getConfiguration(), TestTableSnapshotInputFormat.class);
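
For comparison with the mapred sketch earlier, a minimal sketch of driving the existing mapreduce-package entry point, which accepts a Scan and can therefore limit the input by start/stop row. The snapshot name, row boundaries, restore directory, and mapper are hypothetical placeholders; only the initTableSnapshotMapperJob call pattern follows the code in this patch.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class SnapshotMapreduceJobSketch {

  // Placeholder mapper: emits each row key.
  public static class SketchMapper extends TableMapper<ImmutableBytesWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      context.write(key, NullWritable.get());
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(HBaseConfiguration.create());
    job.setJarByClass(SnapshotMapreduceJobSketch.class);

    // Unlike the mapred variant, the Scan limits the input range by row key.
    Scan scan = new Scan(Bytes.toBytes("bbb"), Bytes.toBytes("yyy"));

    TableMapReduceUtil.initTableSnapshotMapperJob("my_snapshot", scan,
        SketchMapper.class, ImmutableBytesWritable.class, NullWritable.class,
        job, true, new Path("/tmp/snapshot_restore"));

    job.setNumReduceTasks(0);                      // map-only sketch
    job.setOutputFormatClass(NullOutputFormat.class);
    job.waitForCompletion(true);
  }
}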


