hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1575645 [2/2] - in /hbase/branches/0.96: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-common/src/test/java/org/apache/hadoop/hbase/ hbase-it/src/test/java/org/apache/h...
Date Sun, 09 Mar 2014 01:58:44 GMT
Added: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java?rev=1575645&view=auto
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java (added)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java Sun Mar  9 01:58:43 2014
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableSnapshotScanner;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+
+/**
+ * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job
+ * bypasses HBase servers, and directly accesses the underlying files (hfile, recovered edits,
+ * hlogs, etc) directly to provide maximum performance. The snapshot is not required to be
+ * restored to the live cluster or cloned. This also allows to run the mapreduce job from an
+ * online or offline hbase cluster. The snapshot files can be exported by using the
+ * {@link ExportSnapshot} tool, to a pure-hdfs cluster, and this InputFormat can be used to
+ * run the mapreduce job directly over the snapshot files. The snapshot should not be deleted
+ * while there are jobs reading from snapshot files.
+ * <p>
+ * Usage is similar to TableInputFormat, and
+ * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job, boolean, Path)}
+ * can be used to configure the job.
+ * <pre>{@code
+ * Job job = new Job(conf);
+ * Scan scan = new Scan();
+ * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+ *      scan, MyTableMapper.class, MyMapKeyOutput.class,
+ *      MyMapOutputValueWritable.class, job, true);
+ * }
+ * </pre>
+ * <p>
+ * Internally, this input format restores the snapshot into the given tmp directory. Similar to
+ * {@link TableInputFormat} an InputSplit is created per region. The region is opened for reading
+ * from each RecordReader. An internal RegionScanner is used to execute the {@link Scan} obtained
+ * from the user.
+ * <p>
+ * HBase owns all the data and snapshot files on the filesystem. Only the HBase user can read from
+ * snapshot files and data files. HBase also enforces security because all the requests are handled
+ * by the server layer, and the user cannot read from the data files directly. To read from snapshot
+ * files directly from the file system, the user who is running the MR job must have sufficient
+ * permissions to access snapshot and reference files. This means that to run mapreduce over
+ * snapshot files, the MR job has to be run as the HBase user or the user must have group or other
+ * priviledges in the filesystem (See HBASE-8369). Note that, given other users access to read from
+ * snapshot/data files will completely circumvent the access control enforced by HBase.
+ * @see TableSnapshotScanner
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
+  // TODO: Snapshots files are owned in fs by the hbase user. There is no
+  // easy way to delegate access.
+
+  private static final Log LOG = LogFactory.getLog(TableSnapshotInputFormat.class);
+
+  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
+  private static final String LOCALITY_CUTOFF_MULTIPLIER = "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
+  private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
+
+  private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
+  private static final String TABLE_DIR_KEY = "hbase.TableSnapshotInputFormat.table.dir";
+
+  public static class TableSnapshotRegionSplit extends InputSplit implements Writable {
+    private String regionName;
+    private String[] locations;
+
+    // constructor for mapreduce framework / Writable
+    public TableSnapshotRegionSplit() { }
+
+    TableSnapshotRegionSplit(String regionName, List<String> locations) {
+      this.regionName = regionName;
+      if (locations == null || locations.isEmpty()) {
+        this.locations = new String[0];
+      } else {
+        this.locations = locations.toArray(new String[locations.size()]);
+      }
+    }
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      //TODO: We can obtain the file sizes of the snapshot here.
+      return 0;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      return locations;
+    }
+
+    // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
+    // doing this wrapping with Writables.
+    @Override
+    public void write(DataOutput out) throws IOException {
+    MapReduceProtos.TableSnapshotRegionSplit.Builder builder =
+      MapReduceProtos.TableSnapshotRegionSplit.newBuilder()
+        .setRegion(RegionSpecifier.newBuilder()
+          .setType(RegionSpecifierType.ENCODED_REGION_NAME)
+          .setValue(ByteString.copyFrom(Bytes.toBytes(regionName))).build());
+
+      for (String location : locations) {
+        builder.addLocations(location);
+      }
+
+      MapReduceProtos.TableSnapshotRegionSplit split = builder.build();
+
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      split.writeTo(baos);
+      baos.close();
+      byte[] buf = baos.toByteArray();
+      out.writeInt(buf.length);
+      out.write(buf);
+    }
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      int len = in.readInt();
+      byte[] buf = new byte[len];
+      in.readFully(buf);
+      MapReduceProtos.TableSnapshotRegionSplit split = MapReduceProtos.TableSnapshotRegionSplit.PARSER.parseFrom(buf);
+      this.regionName = Bytes.toString(split.getRegion().getValue().toByteArray());
+      List<String> locationsList = split.getLocationsList();
+      this.locations = locationsList.toArray(new String[locationsList.size()]);
+    }
+  }
+
+  @VisibleForTesting
+  class TableSnapshotRegionRecordReader extends RecordReader<ImmutableBytesWritable, Result> {
+    private TableSnapshotRegionSplit split;
+    private Scan scan;
+    private Result result = null;
+    private ImmutableBytesWritable row = null;
+    private ClientSideRegionScanner scanner;
+    private TaskAttemptContext context;
+    private Method getCounter;
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
+        InterruptedException {
+
+      Configuration conf = context.getConfiguration();
+      this.split = (TableSnapshotRegionSplit) split;
+      String regionName = this.split.regionName;
+      String snapshotName = getSnapshotName(conf);
+      Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
+      FileSystem fs = rootDir.getFileSystem(conf);
+
+      Path tmpRootDir = new Path(conf.get(TABLE_DIR_KEY)); // This is the user specified root
+      // directory where snapshot was restored
+
+      Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+
+      //load table descriptor
+      HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir);
+
+      //load region descriptor
+      Path regionDir = new Path(snapshotDir, regionName);
+      HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+
+      // create scan
+      String scanStr = conf.get(TableInputFormat.SCAN);
+      if (scanStr == null) {
+        throw new IllegalArgumentException("A Scan is not configured for this job");
+      }
+      scan = TableMapReduceUtil.convertStringToScan(scanStr);
+      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); // region is immutable, this should be fine,
+                                                               // otherwise we have to set the thread read point
+
+      scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
+      if (context != null) {
+        this.context = context;
+        getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
+      }
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      result = scanner.next();
+      if (result == null) {
+        //we are done
+        return false;
+      }
+
+      if (this.row == null) {
+        this.row = new ImmutableBytesWritable();
+      }
+      this.row.set(result.getRow());
+
+      ScanMetrics scanMetrics = scanner.getScanMetrics();
+      if (scanMetrics != null && context != null) {
+        TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context);
+      }
+
+      return true;
+    }
+
+    @Override
+    public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
+      return row;
+    }
+
+    @Override
+    public Result getCurrentValue() throws IOException, InterruptedException {
+      return result;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return 0; // TODO: use total bytes to estimate
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (this.scanner != null) {
+        this.scanner.close();
+      }
+    }
+  }
+
+  @Override
+  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new TableSnapshotRegionRecordReader();
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
+    Configuration conf = job.getConfiguration();
+    String snapshotName = getSnapshotName(conf);
+
+    Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
+    FileSystem fs = rootDir.getFileSystem(conf);
+
+    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+
+    Set<String> snapshotRegionNames
+      = SnapshotReferenceUtil.getSnapshotRegionNames(fs, snapshotDir);
+    if (snapshotRegionNames == null) {
+      throw new IllegalArgumentException("Snapshot seems empty");
+    }
+
+    // load table descriptor
+    HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs,
+        snapshotDir);
+
+    Scan scan = TableMapReduceUtil.convertStringToScan(conf
+      .get(TableInputFormat.SCAN));
+    Path tableDir = new Path(conf.get(TABLE_DIR_KEY));
+
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    for (String regionName : snapshotRegionNames) {
+      // load region descriptor
+      Path regionDir = new Path(snapshotDir, regionName);
+      HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs,
+          regionDir);
+
+      if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
+          hri.getStartKey(), hri.getEndKey())) {
+        // compute HDFS locations from snapshot files (which will get the locations for
+        // referred hfiles)
+        List<String> hosts = getBestLocations(conf,
+          HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
+
+        int len = Math.min(3, hosts.size());
+        hosts = hosts.subList(0, len);
+        splits.add(new TableSnapshotRegionSplit(regionName, hosts));
+      }
+    }
+
+    return splits;
+  }
+
+  /**
+   * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers does not take
+   * weights into account, thus will treat every location passed from the input split as equal. We
+   * do not want to blindly pass all the locations, since we are creating one split per region, and
+   * the region's blocks are all distributed throughout the cluster unless favorite node assignment
+   * is used. On the expected stable case, only one location will contain most of the blocks as local.
+   * On the other hand, in favored node assignment, 3 nodes will contain highly local blocks. Here
+   * we are doing a simple heuristic, where we will pass all hosts which have at least 80%
+   * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top
+   * host with the best locality.
+   */
+  @VisibleForTesting
+  List<String> getBestLocations(Configuration conf, HDFSBlocksDistribution blockDistribution) {
+    List<String> locations = new ArrayList<String>(3);
+
+    HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();
+
+    if (hostAndWeights.length == 0) {
+      return locations;
+    }
+
+    HostAndWeight topHost = hostAndWeights[0];
+    locations.add(topHost.getHost());
+
+    // Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
+    double cutoffMultiplier
+      = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);
+
+    double filterWeight = topHost.getWeight() * cutoffMultiplier;
+
+    for (int i = 1; i < hostAndWeights.length; i++) {
+      if (hostAndWeights[i].getWeight() >= filterWeight) {
+        locations.add(hostAndWeights[i].getHost());
+      } else {
+        break;
+      }
+    }
+
+    return locations;
+  }
+
+  /**
+   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
+   * @param job the job to configure
+   * @param snapshotName the name of the snapshot to read from
+   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
+   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
+   * After the job is finished, restoreDir can be deleted.
+   * @throws IOException if an error occurs
+   */
+  public static void setInput(Job job, String snapshotName, Path restoreDir) throws IOException {
+    Configuration conf = job.getConfiguration();
+    conf.set(SNAPSHOT_NAME_KEY, snapshotName);
+
+    Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
+    FileSystem fs = rootDir.getFileSystem(conf);
+
+    restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
+
+    // TODO: restore from record readers to parallelize.
+    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
+
+    conf.set(TABLE_DIR_KEY, restoreDir.toString());
+  }
+
+  private static String getSnapshotName(Configuration conf) {
+    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
+    if (snapshotName == null) {
+      throw new IllegalArgumentException("Snapshot name must be provided");
+    }
+    return snapshotName;
+  }
+}

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1575645&r1=1575644&r2=1575645&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sun Mar  9 01:58:43 2014
@@ -773,8 +773,23 @@ public class HRegion implements HeapSize
    */
   public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
       final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
-    HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
     Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
+    return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
+  }
+
+  /**
+   * This is a helper function to compute HDFS block distribution on demand
+   * @param conf configuration
+   * @param tableDescriptor HTableDescriptor of the table
+   * @param regionInfo encoded name of the region
+   * @param tablePath the table directory
+   * @return The HDFS blocks distribution for the given region.
+   * @throws IOException
+   */
+  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
+      final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo,  Path tablePath)
+      throws IOException {
+    HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
     FileSystem fs = tablePath.getFileSystem(conf);
 
     HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
@@ -3969,11 +3984,36 @@ public class HRegion implements HeapSize
                                       final HLog hlog,
                                       final boolean initialize, final boolean ignoreHLog)
       throws IOException {
+      Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
+      return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, hlog, initialize, ignoreHLog);
+  }
+
+  /**
+   * Convenience method creating new HRegions. Used by createTable.
+   * The {@link HLog} for the created region needs to be closed
+   * explicitly, if it is not null.
+   * Use {@link HRegion#getLog()} to get access.
+   *
+   * @param info Info for region to create.
+   * @param rootDir Root directory for HBase instance
+   * @param tableDir table directory
+   * @param conf
+   * @param hTableDescriptor
+   * @param hlog shared HLog
+   * @param initialize - true to initialize the region
+   * @param ignoreHLog - true to skip generate new hlog if it is null, mostly for createTable
+   * @return new HRegion
+   * @throws IOException
+   */
+  public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
+                                      final Configuration conf,
+                                      final HTableDescriptor hTableDescriptor,
+                                      final HLog hlog,
+                                      final boolean initialize, final boolean ignoreHLog)
+      throws IOException {
     LOG.info("creating HRegion " + info.getTable().getNameAsString()
         + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
         " Table name == " + info.getTable().getNameAsString());
-
-    Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
     FileSystem fs = FileSystem.get(conf);
     HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
     HLog effectiveHLog = hlog;
@@ -4129,15 +4169,39 @@ public class HRegion implements HeapSize
       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
       final RegionServerServices rsServices, final CancelableProgressable reporter)
       throws IOException {
+    Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
+    return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
+  }
+
+  /**
+   * Open a Region.
+   * @param conf The Configuration object to use.
+   * @param fs Filesystem to use
+   * @param rootDir Root directory for HBase instance
+   * @param info Info for region to be opened.
+   * @param htd the table descriptor
+   * @param wal HLog for region to use. This method will call
+   * HLog#setSequenceNumber(long) passing the result of the call to
+   * HRegion#getMinSequenceId() to ensure the log id is properly kept
+   * up.  HRegionStore does this every time it opens a new region.
+   * @param rsServices An interface we can request flushes against.
+   * @param reporter An interface we can report progress against.
+   * @return new HRegion
+   * @throws IOException
+   */
+  public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
+      final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
+      final RegionServerServices rsServices, final CancelableProgressable reporter)
+      throws IOException {
     if (info == null) throw new NullPointerException("Passed region info is null");
     if (LOG.isDebugEnabled()) {
       LOG.debug("Opening region: " + info);
     }
-    Path dir = FSUtils.getTableDir(rootDir, info.getTable());
-    HRegion r = HRegion.newHRegion(dir, wal, fs, conf, info, htd, rsServices);
+    HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
     return r.openHRegion(reporter);
   }
 
+
   /**
    * Useful when reopening a closed region (normally for unit tests)
    * @param other original object

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java?rev=1575645&r1=1575644&r2=1575645&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java Sun Mar  9 01:58:43 2014
@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.hbase.snapshot;
 
-import java.io.InputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -37,23 +37,24 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaEditor;
-import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.FSVisitor;
 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
@@ -471,8 +472,9 @@ public class RestoreSnapshotHelper {
     }
 
     // create the regions on disk
-    ModifyRegionUtils.createRegions(conf, rootDir,
+    ModifyRegionUtils.createRegions(conf, rootDir, tableDir,
       tableDesc, clonedRegionsInfo, new ModifyRegionUtils.RegionFillTask() {
+        @Override
         public void fillRegion(final HRegion region) throws IOException {
           cloneRegion(region, snapshotRegions.get(region.getRegionInfo().getEncodedName()));
         }
@@ -499,6 +501,7 @@ public class RestoreSnapshotHelper {
     final String tableName = tableDesc.getTableName().getNameAsString();
     SnapshotReferenceUtil.visitRegionStoreFiles(fs, snapshotRegionDir,
       new FSVisitor.StoreFileVisitor() {
+        @Override
         public void storeFile (final String region, final String family, final String hfile)
             throws IOException {
           LOG.info("Adding HFileLink " + hfile + " to table=" + tableName);
@@ -627,10 +630,13 @@ public class RestoreSnapshotHelper {
   private void restoreWALs() throws IOException {
     final SnapshotLogSplitter logSplitter = new SnapshotLogSplitter(conf, fs, tableDir,
         snapshotTable, regionsMap);
+    // TODO: use executors to parallelize splitting
+    // TODO: once split, we do not need to split again for other restores
     try {
       // Recover.Edits
       SnapshotReferenceUtil.visitRecoveredEdits(fs, snapshotDir,
           new FSVisitor.RecoveredEditsVisitor() {
+        @Override
         public void recoveredEdits (final String region, final String logfile) throws IOException {
           Path path = SnapshotReferenceUtil.getRecoveredEdits(snapshotDir, region, logfile);
           logSplitter.splitRecoveredEdit(path);
@@ -639,6 +645,7 @@ public class RestoreSnapshotHelper {
 
       // Region Server Logs
       SnapshotReferenceUtil.visitLogFiles(fs, snapshotDir, new FSVisitor.LogFileVisitor() {
+        @Override
         public void logFile (final String server, final String logfile) throws IOException {
           logSplitter.splitLog(server, logfile);
         }
@@ -689,4 +696,45 @@ public class RestoreSnapshotHelper {
     }
     return htd;
   }
+
+  /**
+   * Copy the snapshot files for a snapshot scanner, discards meta changes.
+   * @param conf
+   * @param fs
+   * @param rootDir
+   * @param restoreDir
+   * @param snapshotName
+   * @throws IOException
+   */
+  public static void copySnapshotForScanner(Configuration conf, FileSystem fs, Path rootDir,
+      Path restoreDir, String snapshotName) throws IOException {
+    // ensure that restore dir is not under root dir
+    if (!restoreDir.getFileSystem(conf).getUri().equals(rootDir.getFileSystem(conf).getUri())) {
+      throw new IllegalArgumentException("Filesystems for restore directory and HBase root directory " +
+          "should be the same");
+    }
+    if (restoreDir.toUri().getPath().startsWith(rootDir.toUri().getPath())) {
+      throw new IllegalArgumentException("Restore directory cannot be a sub directory of HBase " +
+          "root directory. RootDir: " + rootDir + ", restoreDir: " + restoreDir);
+    }
+
+    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+
+    //load table descriptor
+    HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir);
+
+    MonitoredTask status = TaskMonitor.get().createStatus(
+        "Restoring  snapshot '" + snapshotName + "' to directory " + restoreDir);
+    ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher();
+
+    RestoreSnapshotHelper helper = new RestoreSnapshotHelper(conf, fs, snapshotDesc,
+        snapshotDir, htd, restoreDir, monitor, status);
+    helper.restoreHdfsRegions(); // TODO: parallelize.
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Restored table dir:" + restoreDir);
+      FSUtils.logFileSystemState(fs, restoreDir, LOG);
+    }
+  }
 }

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java?rev=1575645&r1=1575644&r2=1575645&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java Sun Mar  9 01:58:43 2014
@@ -54,7 +54,7 @@ public abstract class AbstractHBaseTool 
   protected Configuration conf = null;
 
   private static final Set<String> requiredOptions = new TreeSet<String>();
-  
+
   protected String[] cmdLineArgs = null;
 
   /**
@@ -151,6 +151,11 @@ public abstract class AbstractHBaseTool 
     addOptWithArg(opt, description);
   }
 
+  protected void addRequiredOptWithArg(String shortOpt, String longOpt, String description) {
+    requiredOptions.add(longOpt);
+    addOptWithArg(shortOpt, longOpt, description);
+  }
+
   protected void addOptNoArg(String opt, String description) {
     options.addOption(opt, false, description);
   }

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java?rev=1575645&r1=1575644&r2=1575645&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java Sun Mar  9 01:58:43 2014
@@ -84,6 +84,26 @@ public abstract class ModifyRegionUtils 
   public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
       final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
       final RegionFillTask task) throws IOException {
+
+      Path tableDir = FSUtils.getTableDir(rootDir, hTableDescriptor.getTableName());
+      return createRegions(conf, rootDir, tableDir, hTableDescriptor, newRegions, task);
+  }
+
+  /**
+   * Create new set of regions on the specified file-system.
+   * NOTE: that you should add the regions to hbase:meta after this operation.
+   *
+   * @param conf {@link Configuration}
+   * @param rootDir Root directory for HBase instance
+   * @param tableDir table directory
+   * @param hTableDescriptor description of the table
+   * @param newRegions {@link HRegionInfo} that describes the regions to create
+   * @param task {@link RegionFillTask} custom code to populate region after creation
+   * @throws IOException
+   */
+  public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
+      final Path tableDir, final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
+      final RegionFillTask task) throws IOException {
     if (newRegions == null) return null;
     int regionNumber = newRegions.length;
     ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(conf,
@@ -93,26 +113,14 @@ public abstract class ModifyRegionUtils 
     List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
     for (final HRegionInfo newRegion : newRegions) {
       completionService.submit(new Callable<HRegionInfo>() {
+        @Override
         public HRegionInfo call() throws IOException {
-          // 1. Create HRegion
-          HRegion region = HRegion.createHRegion(newRegion,
-              rootDir, conf, hTableDescriptor, null,
-              false, true);
-          try {
-            // 2. Custom user code to interact with the created region
-            if (task != null) {
-              task.fillRegion(region);
-            }
-          } finally {
-            // 3. Close the new region to flush to disk. Close log file too.
-            region.close();
-          }
-          return region.getRegionInfo();
+          return createRegion(conf, rootDir, tableDir, hTableDescriptor, newRegion, task);
         }
       });
     }
     try {
-      // 4. wait for all regions to finish creation
+      // wait for all regions to finish creation
       for (int i = 0; i < regionNumber; i++) {
         Future<HRegionInfo> future = completionService.take();
         HRegionInfo regionInfo = future.get();
@@ -129,6 +137,35 @@ public abstract class ModifyRegionUtils 
     return regionInfos;
   }
 
+  /**
+   * Create new set of regions on the specified file-system.
+   * @param conf {@link Configuration}
+   * @param rootDir Root directory for HBase instance
+   * @param tableDir table directory
+   * @param hTableDescriptor description of the table
+   * @param newRegion {@link HRegionInfo} that describes the region to create
+   * @param task {@link RegionFillTask} custom code to populate region after creation
+   * @throws IOException
+   */
+  public static HRegionInfo createRegion(final Configuration conf, final Path rootDir,
+      final Path tableDir, final HTableDescriptor hTableDescriptor, final HRegionInfo newRegion,
+      final RegionFillTask task) throws IOException {
+    // 1. Create HRegion
+    HRegion region = HRegion.createHRegion(newRegion,
+      rootDir, tableDir, conf, hTableDescriptor, null,
+      false, true);
+    try {
+      // 2. Custom user code to interact with the created region
+      if (task != null) {
+        task.fillRegion(region);
+      }
+    } finally {
+      // 3. Close the new region to flush to disk. Close log file too.
+      region.close();
+    }
+    return region.getRegionInfo();
+  }
+
   /*
    * used by createRegions() to get the thread pool executor based on the
    * "hbase.hregion.open.and.init.threads.max" property.
@@ -142,6 +179,7 @@ public abstract class ModifyRegionUtils 
         new ThreadFactory() {
           private int count = 1;
 
+          @Override
           public Thread newThread(Runnable r) {
             Thread t = new Thread(r, threadNamePrefix + "-" + count++);
             return t;

Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1575645&r1=1575644&r2=1575645&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Sun Mar  9 01:58:43 2014
@@ -163,6 +163,7 @@ public class HBaseTestingUtility extends
    *  mini dfs.
    *  @deprecated can be used only with mini dfs
    */
+  @Deprecated
   private static final String TEST_DIRECTORY_KEY = "test.build.data";
 
   /** Filesystem URI used for map-reduce mini-cluster setup */
@@ -1612,24 +1613,7 @@ public class HBaseTestingUtility extends
    * @throws IOException
    */
   public int loadTable(final HTable t, final byte[] f) throws IOException {
-    t.setAutoFlush(false, true);
-    byte[] k = new byte[3];
-    int rowCount = 0;
-    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
-      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
-        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
-          k[0] = b1;
-          k[1] = b2;
-          k[2] = b3;
-          Put put = new Put(k);
-          put.add(f, null, k);
-          t.put(put);
-          rowCount++;
-        }
-      }
-    }
-    t.flushCommits();
-    return rowCount;
+    return loadTable(t, new byte[][] {f});
   }
 
   /**
@@ -1640,28 +1624,83 @@ public class HBaseTestingUtility extends
    * @throws IOException
    */
   public int loadTable(final HTable t, final byte[][] f) throws IOException {
-    t.setAutoFlush(false, true);
-    byte[] k = new byte[3];
+    return loadTable(t, f, null);
+  }
+
+  /**
+   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
+   * @param t Table
+   * @param f Array of Families to load
+   * @param value the values of the cells. If null is passed, the row key is used as value
+   * @return Count of rows loaded.
+   * @throws IOException
+   */
+  public int loadTable(final HTable t, final byte[][] f, byte[] value) throws IOException {
+    t.setAutoFlush(false);
     int rowCount = 0;
-    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
-      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
-        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
-          k[0] = b1;
-          k[1] = b2;
-          k[2] = b3;
-          Put put = new Put(k);
-          for (int i = 0; i < f.length; i++) {
-            put.add(f[i], null, k);
-          }
-          t.put(put);
-          rowCount++;
-        }
+    for (byte[] row : HBaseTestingUtility.ROWS) {
+      Put put = new Put(row);
+      for (int i = 0; i < f.length; i++) {
+        put.add(f[i], null, value != null ? value : row);
       }
+      t.put(put);
+      rowCount++;
     }
     t.flushCommits();
     return rowCount;
   }
 
+  /** A tracker for tracking and validating table rows
+   * generated with {@link HBaseTestingUtility#loadTable(HTable, byte[])}
+   */
+  public static class SeenRowTracker {
+    int dim = 'z' - 'a' + 1;
+    int[][][] seenRows = new int[dim][dim][dim]; //count of how many times the row is seen
+    byte[] startRow;
+    byte[] stopRow;
+
+    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
+      this.startRow = startRow;
+      this.stopRow = stopRow;
+    }
+
+    void reset() {
+      for (byte[] row : ROWS) {
+        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
+      }
+    }
+
+    int i(byte b) {
+      return b - 'a';
+    }
+
+    public void addRow(byte[] row) {
+      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
+    }
+
+    /** Validate that all the rows between startRow and stopRow are seen exactly once, and
+     * all other rows none
+     */
+    public void validate() {
+      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
+        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
+          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
+            int count = seenRows[i(b1)][i(b2)][i(b3)];
+            int expectedCount = 0;
+            if (Bytes.compareTo(new byte[] {b1,b2,b3}, startRow) >= 0
+                && Bytes.compareTo(new byte[] {b1,b2,b3}, stopRow) < 0) {
+              expectedCount = 1;
+            }
+            if (count != expectedCount) {
+              String row = new String(new byte[] {b1,b2,b3});
+              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " instead of " + expectedCount);
+            }
+          }
+        }
+      }
+    }
+  }
+
   public int loadRegion(final HRegion r, final byte[] f) throws IOException {
     return loadRegion(r, f, false);
   }
@@ -1773,6 +1812,22 @@ public class HBaseTestingUtility extends
     return createMultiRegions(getConfiguration(), table, columnFamily);
   }
 
+  /** All the row values for the data loaded by {@link #loadTable(HTable, byte[])} */
+  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
+  static {
+    int i = 0;
+    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
+      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
+        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
+          ROWS[i][0] = b1;
+          ROWS[i][1] = b2;
+          ROWS[i][2] = b3;
+          i++;
+        }
+      }
+    }
+  }
+
   public static final byte[][] KEYS = {
     HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
     Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
@@ -3206,4 +3261,15 @@ public class HBaseTestingUtility extends
     };
   }
 
+  /**
+   * Returns a {@link Predicate} for checking that table is enabled
+   */
+  public Waiter.Predicate<Exception> predicateTableEnabled(final TableName tableName) {
+    return new Waiter.Predicate<Exception>() {
+     @Override
+     public boolean evaluate() throws Exception {
+       return getHBaseAdmin().isTableEnabled(tableName);
+      }
+    };
+  }
 }

Added: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java?rev=1575645&view=auto
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java (added)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java Sun Mar  9 01:58:43 2014
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableSnapshotScanner;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.common.base.Stopwatch;
+
+/**
+ * A simple performance evaluation tool for single client and MR scans
+ * and snapshot scans.
+ */
+public class ScanPerformanceEvaluation extends AbstractHBaseTool {
+
+  private static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters";
+
+  private String type;
+  private String file;
+  private String tablename;
+  private String snapshotName;
+  private String restoreDir;
+  private String caching;
+
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    Path rootDir;
+    try {
+      rootDir = FSUtils.getRootDir(conf);
+      rootDir.getFileSystem(conf);
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  @Override
+  protected void addOptions() {
+    this.addRequiredOptWithArg("t", "type", "the type of the test. One of the following: streaming|scan|snapshotscan|scanmapreduce|snapshotscanmapreduce");
+    this.addOptWithArg("f", "file", "the filename to read from");
+    this.addOptWithArg("tn", "table", "the tablename to read from");
+    this.addOptWithArg("sn", "snapshot", "the snapshot name to read from");
+    this.addOptWithArg("rs", "restoredir", "the directory to restore the snapshot");
+    this.addOptWithArg("ch", "caching", "scanner caching value");
+  }
+
+  @Override
+  protected void processOptions(CommandLine cmd) {
+    type = cmd.getOptionValue("type");
+    file = cmd.getOptionValue("file");
+    tablename = cmd.getOptionValue("table");
+    snapshotName = cmd.getOptionValue("snapshot");
+    restoreDir = cmd.getOptionValue("restoredir");
+    caching = cmd.getOptionValue("caching");
+  }
+
+  protected void testHdfsStreaming(Path filename) throws IOException {
+    byte[] buf = new byte[1024];
+    FileSystem fs = filename.getFileSystem(getConf());
+
+    // read the file from start to finish
+    Stopwatch fileOpenTimer = new Stopwatch();
+    Stopwatch streamTimer = new Stopwatch();
+
+    fileOpenTimer.start();
+    FSDataInputStream in = fs.open(filename);
+    fileOpenTimer.stop();
+
+    long totalBytes = 0;
+    streamTimer.start();
+    while (true) {
+      int read = in.read(buf);
+      if (read < 0) {
+        break;
+      }
+      totalBytes += read;
+    }
+    streamTimer.stop();
+
+    double throughput = (double)totalBytes / streamTimer.elapsedTime(TimeUnit.SECONDS);
+
+    System.out.println("HDFS streaming: ");
+    System.out.println("total time to open: " + fileOpenTimer.elapsedMillis() + " ms");
+    System.out.println("total time to read: " + streamTimer.elapsedMillis() + " ms");
+    System.out.println("total bytes: " + totalBytes + " bytes ("
+        + StringUtils.humanReadableInt(totalBytes) + ")");
+    System.out.println("throghput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
+  }
+
+  private Scan getScan() {
+    Scan scan = new Scan(); // default scan settings
+    scan.setCacheBlocks(false);
+    scan.setMaxVersions(1);
+    scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
+    if (caching != null) {
+      scan.setCaching(Integer.parseInt(caching));
+    }
+
+    return scan;
+  }
+
+  public void testScan() throws IOException {
+    Stopwatch tableOpenTimer = new Stopwatch();
+    Stopwatch scanOpenTimer = new Stopwatch();
+    Stopwatch scanTimer = new Stopwatch();
+
+    tableOpenTimer.start();
+    HTable table = new HTable(getConf(), TableName.valueOf(tablename));
+    tableOpenTimer.stop();
+
+    Scan scan = getScan();
+    scanOpenTimer.start();
+    ResultScanner scanner = table.getScanner(scan);
+    scanOpenTimer.stop();
+
+    long numRows = 0;
+    long numCells = 0;
+    scanTimer.start();
+    while (true) {
+      Result result = scanner.next();
+      if (result == null) {
+        break;
+      }
+      numRows++;
+
+      numCells += result.rawCells().length;
+    }
+    scanTimer.stop();
+    scanner.close();
+    table.close();
+
+    ScanMetrics metrics = ProtobufUtil.toScanMetrics(scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA));
+    long totalBytes = metrics.countOfBytesInResults.get();
+    double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
+    double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
+    double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
+
+    System.out.println("HBase scan: ");
+    System.out.println("total time to open table: " + tableOpenTimer.elapsedMillis() + " ms");
+    System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
+    System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
+
+    System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
+
+    System.out.println("total bytes: " + totalBytes + " bytes ("
+        + StringUtils.humanReadableInt(totalBytes) + ")");
+    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
+    System.out.println("total rows  : " + numRows);
+    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
+    System.out.println("total cells : " + numCells);
+    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
+  }
+
+
+  public void testSnapshotScan() throws IOException {
+    Stopwatch snapshotRestoreTimer = new Stopwatch();
+    Stopwatch scanOpenTimer = new Stopwatch();
+    Stopwatch scanTimer = new Stopwatch();
+
+    Path restoreDir = new Path(this.restoreDir);
+
+    snapshotRestoreTimer.start();
+    restoreDir.getFileSystem(conf).delete(restoreDir, true);
+    snapshotRestoreTimer.stop();
+
+    Scan scan = getScan();
+    scanOpenTimer.start();
+    TableSnapshotScanner scanner = new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
+    scanOpenTimer.stop();
+
+    long numRows = 0;
+    long numCells = 0;
+    scanTimer.start();
+    while (true) {
+      Result result = scanner.next();
+      if (result == null) {
+        break;
+      }
+      numRows++;
+
+      numCells += result.rawCells().length;
+    }
+    scanTimer.stop();
+    scanner.close();
+
+    ScanMetrics metrics = scanner.getScanMetrics();
+    long totalBytes = metrics.countOfBytesInResults.get();
+    double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
+    double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
+    double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
+
+    System.out.println("HBase scan snapshot: ");
+    System.out.println("total time to restore snapshot: " + snapshotRestoreTimer.elapsedMillis() + " ms");
+    System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
+    System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
+
+    System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
+
+    System.out.println("total bytes: " + totalBytes + " bytes ("
+        + StringUtils.humanReadableInt(totalBytes) + ")");
+    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
+    System.out.println("total rows  : " + numRows);
+    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
+    System.out.println("total cells : " + numCells);
+    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
+
+  }
+
+  public static enum ScanCounter {
+    NUM_ROWS,
+    NUM_CELLS,
+  }
+
+  public static class MyMapper<KEYOUT, VALUEOUT> extends TableMapper<KEYOUT, VALUEOUT> {
+    @Override
+    protected void map(ImmutableBytesWritable key, Result value,
+        Context context) throws IOException,
+        InterruptedException {
+      context.getCounter(ScanCounter.NUM_ROWS).increment(1);
+      context.getCounter(ScanCounter.NUM_CELLS).increment(value.rawCells().length);
+    }
+  }
+
+  public void testScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
+    Stopwatch scanOpenTimer = new Stopwatch();
+    Stopwatch scanTimer = new Stopwatch();
+
+    Scan scan = getScan();
+
+    String jobName = "testScanMapReduce";
+
+    Job job = new Job(conf);
+    job.setJobName(jobName);
+
+    job.setJarByClass(getClass());
+
+    TableMapReduceUtil.initTableMapperJob(
+        this.tablename,
+        scan,
+        MyMapper.class,
+        NullWritable.class,
+        NullWritable.class,
+        job
+    );
+
+    job.setNumReduceTasks(0);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(NullWritable.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+
+    scanTimer.start();
+    job.waitForCompletion(true);
+    scanTimer.stop();
+
+    Counters counters = job.getCounters();
+    long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
+    long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
+
+    long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
+    double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
+    double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
+    double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
+
+    System.out.println("HBase scan mapreduce: ");
+    System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
+    System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
+
+    System.out.println("total bytes: " + totalBytes + " bytes ("
+        + StringUtils.humanReadableInt(totalBytes) + ")");
+    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
+    System.out.println("total rows  : " + numRows);
+    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
+    System.out.println("total cells : " + numCells);
+    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
+  }
+
+  public void testSnapshotScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
+    Stopwatch scanOpenTimer = new Stopwatch();
+    Stopwatch scanTimer = new Stopwatch();
+
+    Scan scan = getScan();
+
+    String jobName = "testSnapshotScanMapReduce";
+
+    Job job = new Job(conf);
+    job.setJobName(jobName);
+
+    job.setJarByClass(getClass());
+
+    TableMapReduceUtil.initTableSnapshotMapperJob(
+        this.snapshotName,
+        scan,
+        MyMapper.class,
+        NullWritable.class,
+        NullWritable.class,
+        job,
+        true,
+        new Path(restoreDir)
+    );
+
+    job.setNumReduceTasks(0);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(NullWritable.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+
+    scanTimer.start();
+    job.waitForCompletion(true);
+    scanTimer.stop();
+
+    Counters counters = job.getCounters();
+    long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
+    long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
+
+    long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
+    double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
+    double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
+    double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
+
+    System.out.println("HBase scan mapreduce: ");
+    System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
+    System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
+
+    System.out.println("total bytes: " + totalBytes + " bytes ("
+        + StringUtils.humanReadableInt(totalBytes) + ")");
+    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
+    System.out.println("total rows  : " + numRows);
+    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
+    System.out.println("total cells : " + numCells);
+    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
+  }
+
+  @Override
+  protected int doWork() throws Exception {
+    if (type.equals("streaming")) {
+      testHdfsStreaming(new Path(file));
+    } else if (type.equals("scan")){
+      testScan();
+    } else if (type.equals("snapshotscan")) {
+      testSnapshotScan();
+    } else if (type.equals("scanmapreduce")) {
+      testScanMapReduce();
+    } else if (type.equals("snapshotscanmapreduce")) {
+      testSnapshotScanMapReduce();
+    }
+    return 0;
+  }
+
+  public static void main (String[] args) throws Exception {
+    int ret = ToolRunner.run(HBaseConfiguration.create(), new ScanPerformanceEvaluation(), args);
+    System.exit(ret);
+  }
+}

Added: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java?rev=1575645&view=auto
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java (added)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java Sun Mar  9 01:58:43 2014
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.mapreduce.TestTableSnapshotInputFormat;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestTableSnapshotScanner {
+
+  private static final Log LOG = LogFactory.getLog(TestTableSnapshotInputFormat.class);
+  private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final int NUM_REGION_SERVERS = 2;
+  private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
+  public static byte[] bbb = Bytes.toBytes("bbb");
+  public static byte[] yyy = Bytes.toBytes("yyy");
+
+  private FileSystem fs;
+  private Path rootDir;
+
+  public void setupCluster() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(NUM_REGION_SERVERS);
+    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+    fs = rootDir.getFileSystem(UTIL.getConfiguration());
+  }
+
+  public void tearDownCluster() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  private static void setupConf(Configuration conf) {
+    // Enable snapshot
+    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  public static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
+      String snapshotName, int numRegions)
+      throws Exception {
+    try {
+      util.deleteTable(tableName);
+    } catch(Exception ex) {
+      // ignore
+    }
+
+    if (numRegions > 1) {
+      util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
+    } else {
+      util.createTable(tableName, FAMILIES);
+    }
+    HBaseAdmin admin = util.getHBaseAdmin();
+
+    // put some stuff in the table
+    HTable table = new HTable(util.getConfiguration(), tableName);
+    util.loadTable(table, FAMILIES);
+
+    Path rootDir = new Path(util.getConfiguration().get(HConstants.HBASE_DIR));
+    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());
+
+    SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
+        Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);
+
+    // load different values
+    byte[] value = Bytes.toBytes("after_snapshot_value");
+    util.loadTable(table, FAMILIES, value);
+
+    // cause flush to create new files in the region
+    admin.flush(tableName.toString());
+    table.close();
+  }
+
+  @Test
+  public void testWithSingleRegion() throws Exception {
+    testScanner(UTIL, "testWithSingleRegion", 1, false);
+  }
+
+  @Test
+  public void testWithMultiRegion() throws Exception {
+    testScanner(UTIL, "testWithMultiRegion", 10, false);
+  }
+
+  @Test
+  public void testWithOfflineHBaseMultiRegion() throws Exception {
+    testScanner(UTIL, "testWithMultiRegion", 20, true);
+  }
+
+  private void testScanner(HBaseTestingUtility util, String snapshotName, int numRegions, boolean shutdownCluster)
+      throws Exception {
+    setupCluster();
+    TableName tableName = TableName.valueOf("testScanner");
+    try {
+      createTableAndSnapshot(util, tableName, snapshotName, numRegions);
+
+      if (shutdownCluster) {
+        util.shutdownMiniHBaseCluster();
+      }
+
+      Path restoreDir = util.getDataTestDirOnTestFS(snapshotName);
+      Scan scan = new Scan(bbb, yyy); // limit the scan
+
+      TableSnapshotScanner scanner = new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);
+
+      verifyScanner(scanner, bbb, yyy);
+      scanner.close();
+    } finally {
+      if (!shutdownCluster) {
+        util.getHBaseAdmin().deleteSnapshot(snapshotName);
+        util.deleteTable(tableName);
+        tearDownCluster();
+      }
+    }
+  }
+
+  private void verifyScanner(ResultScanner scanner, byte[] startRow, byte[] stopRow)
+      throws IOException, InterruptedException {
+
+    HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
+
+    while (true) {
+      Result result = scanner.next();
+      if (result == null) {
+        break;
+      }
+      verifyRow(result);
+      rowTracker.addRow(result.getRow());
+    }
+
+    // validate all rows are seen
+    rowTracker.validate();
+  }
+
+  private static void verifyRow(Result result) throws IOException {
+    byte[] row = result.getRow();
+    CellScanner scanner = result.cellScanner();
+    while (scanner.advance()) {
+      Cell cell = scanner.current();
+
+      //assert that all Cells in the Result have the same key
+     Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
+         cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+    }
+
+    for (int j = 0; j < FAMILIES.length; j++) {
+      byte[] actual = result.getValue(FAMILIES[j], null);
+      Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+          + " ,actual:" + Bytes.toString(actual), row, actual);
+    }
+  }
+
+}

Added: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java?rev=1575645&view=auto
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java (added)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java Sun Mar  9 01:58:43 2014
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+@Category(LargeTests.class)
+public class TestTableSnapshotInputFormat {
+
+  private static final Log LOG = LogFactory.getLog(TestTableSnapshotInputFormat.class);
+  private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final int NUM_REGION_SERVERS = 2;
+  private static final String TABLE_NAME_STR = "TestTableSnapshotInputFormat";
+  private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
+  private static final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR);
+  public static byte[] bbb = Bytes.toBytes("bbb");
+  public static byte[] yyy = Bytes.toBytes("yyy");
+
+  private FileSystem fs;
+  private Path rootDir;
+
+  public void setupCluster() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(NUM_REGION_SERVERS);
+    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+    fs = rootDir.getFileSystem(UTIL.getConfiguration());
+  }
+
+  public void tearDownCluster() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  private static void setupConf(Configuration conf) {
+    // Enable snapshot
+    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public void testGetBestLocations() throws IOException {
+    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
+    Configuration conf = UTIL.getConfiguration();
+
+    HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
+    Assert.assertEquals(Lists.newArrayList(), tsif.getBestLocations(conf, blockDistribution));
+
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
+    Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));
+
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
+    Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));
+
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 1);
+    Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));
+
+    blockDistribution = new HDFSBlocksDistribution();
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 10);
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 7);
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 5);
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 1);
+    Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));
+
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 2);
+    Assert.assertEquals(Lists.newArrayList("h1", "h2"), tsif.getBestLocations(conf, blockDistribution));
+
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 3);
+    Assert.assertEquals(Lists.newArrayList("h2", "h1"), tsif.getBestLocations(conf, blockDistribution));
+
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 6);
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 9);
+
+    Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4", "h1"), tsif.getBestLocations(conf, blockDistribution));
+  }
+
+  public static enum TestTableSnapshotCounters {
+    VALIDATION_ERROR
+  }
+
+  public static class TestTableSnapshotMapper
+    extends TableMapper<ImmutableBytesWritable, NullWritable> {
+    @Override
+    protected void map(ImmutableBytesWritable key, Result value,
+        Context context) throws IOException, InterruptedException {
+      // Validate a single row coming from the snapshot, and emit the row key
+      verifyRowFromMap(key, value);
+      context.write(key, NullWritable.get());
+    }
+  }
+
+  public static class TestTableSnapshotReducer
+    extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
+    HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(bbb, yyy);
+    @Override
+    protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
+       Context context) throws IOException, InterruptedException {
+      rowTracker.addRow(key.get());
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException,
+        InterruptedException {
+      rowTracker.validate();
+    }
+  }
+
+  public static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
+      String snapshotName, int numRegions)
+      throws Exception {
+    try {
+      util.deleteTable(tableName);
+    } catch(Exception ex) {
+      // ignore
+    }
+
+    if (numRegions > 1) {
+      util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
+    } else {
+      util.createTable(tableName, FAMILIES);
+    }
+    HBaseAdmin admin = util.getHBaseAdmin();
+
+    // put some stuff in the table
+    HTable table = new HTable(util.getConfiguration(), tableName);
+    util.loadTable(table, FAMILIES);
+
+    Path rootDir = new Path(util.getConfiguration().get(HConstants.HBASE_DIR));
+    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());
+
+    SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
+        Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);
+
+    // load different values
+    byte[] value = Bytes.toBytes("after_snapshot_value");
+    util.loadTable(table, FAMILIES, value);
+
+    // cause flush to create new files in the region
+    admin.flush(tableName.toString());
+    table.close();
+  }
+
+  @Test
+  public void testWithMockedMapReduceSingleRegion() throws Exception {
+    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1);
+  }
+
+  @Test
+  public void testWithMockedMapReduceMultiRegion() throws Exception {
+    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 8);
+  }
+
+  public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, int numRegions, int expectedNumSplits)
+      throws Exception {
+    setupCluster();
+    TableName tableName = TableName.valueOf("testWithMockedMapReduce");
+    try {
+      createTableAndSnapshot(util, tableName, snapshotName, numRegions);
+
+      Job job = new Job(util.getConfiguration());
+      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
+      Scan scan = new Scan(bbb, yyy); // limit the scan
+
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+          scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+          NullWritable.class, job, false, tmpTableDir);
+
+      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, bbb, yyy);
+
+    } finally {
+      util.getHBaseAdmin().deleteSnapshot(snapshotName);
+      util.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
+
+  private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
+      byte[] startRow, byte[] stopRow)
+      throws IOException, InterruptedException {
+    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
+    List<InputSplit> splits = tsif.getSplits(job);
+
+    Assert.assertEquals(expectedNumSplits, splits.size());
+
+    HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
+
+    for (int i = 0; i < splits.size(); i++) {
+      // validate input split
+      InputSplit split = splits.get(i);
+      Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
+
+      // validate record reader
+      TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
+      when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
+      RecordReader<ImmutableBytesWritable, Result> rr = tsif.createRecordReader(split, taskAttemptContext);
+      rr.initialize(split, taskAttemptContext);
+
+      // validate we can read all the data back
+      while (rr.nextKeyValue()) {
+        byte[] row = rr.getCurrentKey().get();
+        verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
+        rowTracker.addRow(row);
+      }
+
+      rr.close();
+    }
+
+    // validate all rows are seen
+    rowTracker.validate();
+  }
+
+  public static void verifyRowFromMap(ImmutableBytesWritable key, Result result) throws IOException {
+    byte[] row = key.get();
+    CellScanner scanner = result.cellScanner();
+    while (scanner.advance()) {
+      Cell cell = scanner.current();
+
+      //assert that all Cells in the Result have the same key
+     Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
+         cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+    }
+
+    for (int j = 0; j < FAMILIES.length; j++) {
+      byte[] actual = result.getValue(FAMILIES[j], null);
+      Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+          + " ,actual:" + Bytes.toString(actual), row, actual);
+    }
+  }
+
+  @Test
+  public void testWithMapReduceSingleRegion() throws Exception {
+    testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, false);
+  }
+
+  @Test
+  public void testWithMapReduceMultiRegion() throws Exception {
+    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 8, false);
+  }
+
+  @Test
+  // run the MR job while HBase is offline
+  public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
+    testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 8, true);
+  }
+
+  private void testWithMapReduce(HBaseTestingUtility util, String snapshotName,
+      int numRegions, int expectedNumSplits, boolean shutdownCluster) throws Exception {
+    setupCluster();
+    util.startMiniMapReduceCluster();
+    try {
+      Path tableDir = util.getDataTestDirOnTestFS(snapshotName);
+      TableName tableName = TableName.valueOf("testWithMapReduce");
+      doTestWithMapReduce(util, tableName, snapshotName, tableDir, numRegions,
+        expectedNumSplits, shutdownCluster);
+    } finally {
+      util.shutdownMiniMapReduceCluster();
+      tearDownCluster();
+    }
+  }
+
+  // this is also called by the IntegrationTestTableSnapshotInputFormat
+  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
+      String snapshotName, Path tableDir, int numRegions, int expectedNumSplits, boolean shutdownCluster)
+          throws Exception {
+
+    //create the table and snapshot
+    createTableAndSnapshot(util, tableName, snapshotName, numRegions);
+
+    if (shutdownCluster) {
+      util.shutdownMiniHBaseCluster();
+    }
+
+    try {
+      // create the job
+      Job job = new Job(util.getConfiguration());
+      Scan scan = new Scan(bbb, yyy); // limit the scan
+
+      job.setJarByClass(util.getClass());
+      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), TestTableSnapshotInputFormat.class);
+
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+        scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+        NullWritable.class, job, true, tableDir);
+
+      job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
+      job.setNumReduceTasks(1);
+      job.setOutputFormatClass(NullOutputFormat.class);
+
+      Assert.assertTrue(job.waitForCompletion(true));
+    } finally {
+      if (!shutdownCluster) {
+        util.getHBaseAdmin().deleteSnapshot(snapshotName);
+        util.deleteTable(tableName);
+      }
+    }
+  }
+}

Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java?rev=1575645&r1=1575644&r2=1575645&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java (original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java Sun Mar  9 01:58:43 2014
@@ -664,7 +664,7 @@ public class TestRegionPlacement {
 
   /**
    * Create a table with specified table name and region number.
-   * @param table
+   * @param tablename
    * @param regionNum
    * @return
    * @throws IOException



Mime
View raw message