hbase-commits mailing list archives

From t...@apache.org
Subject svn commit: r950321 [1/3] - in /hbase/trunk: ./ src/docs/src/documentation/content/xdocs/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/io/ src/main/java/org/apache/hadoop/hbase/io/hfile/ src/main/java/org/apache/hadoop/h...
Date Wed, 02 Jun 2010 00:40:49 GMT
Author: todd
Date: Wed Jun  2 00:40:48 2010
New Revision: 950321

URL: http://svn.apache.org/viewvc?rev=950321&view=rev
Log:
HBASE-1923. Bulk incremental load into an existing table

Added:
    hbase/trunk/src/docs/src/documentation/content/xdocs/bulk-loads.xml
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TotalOrderPartitioner.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/NMapInputFormat.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/pom.xml
    hbase/trunk/src/docs/src/documentation/content/xdocs/site.xml
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionInfo.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed Jun  2 00:40:48 2010
@@ -694,6 +694,7 @@ Release 0.21.0 - Unreleased
    HBASE-2559  Set hbase.hregion.majorcompaction to 0 to disable
    HBASE-1200  Add bloomfilters (Nicolas Spiegelberg via Stack)
    HBASE-2588  Add easier way to ship HBase dependencies to MR cluster within Job
+   HBASE-1923  Bulk incremental load into an existing table
 
   OPTIMIZATIONS
    HBASE-410   [testing] Speed up the test suite

Modified: hbase/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/pom.xml?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/pom.xml (original)
+++ hbase/trunk/pom.xml Wed Jun  2 00:40:48 2010
@@ -433,6 +433,7 @@
     <jetty.version>6.1.24</jetty.version>
     <jetty.jspapi.version>6.1.14</jetty.jspapi.version>
     <junit.version>4.8.1</junit.version>
+    <mockito-all.version>1.8.4</mockito-all.version>
     <log4j.version>1.2.15</log4j.version>
     <zookeeper.version>3.3.1</zookeeper.version>
 
@@ -530,6 +531,12 @@
         <version>${junit.version}</version>
         <scope>test</scope>
       </dependency>
+      <dependency>
+        <groupId>org.mockito</groupId>
+        <artifactId>mockito-all</artifactId>
+        <version>${mockito-all.version}</version>
+        <scope>test</scope>
+      </dependency>
     </dependencies>
   </dependencyManagement>
   <dependencies>
@@ -594,6 +601,12 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>r03</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-core</artifactId>
     </dependency>
@@ -693,6 +706,10 @@
       <artifactId>junit</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-math</artifactId>
       <version>${commons-math.version}</version>

Added: hbase/trunk/src/docs/src/documentation/content/xdocs/bulk-loads.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/src/docs/src/documentation/content/xdocs/bulk-loads.xml?rev=950321&view=auto
==============================================================================
--- hbase/trunk/src/docs/src/documentation/content/xdocs/bulk-loads.xml (added)
+++ hbase/trunk/src/docs/src/documentation/content/xdocs/bulk-loads.xml Wed Jun  2 00:40:48 2010
@@ -0,0 +1,148 @@
+<?xml version="1.0"?>
+<!--
+  Copyright 2010 The Apache Software Foundation
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN"
+          "http://forrest.apache.org/dtd/document-v20.dtd">
+
+
+<document>
+
+  <header>
+    <title> 
+      Bulk Loads in HBase
+    </title>
+  </header>
+  <body>
+    <section>
+      <title>Overview</title>
+      <p>
+        HBase includes several methods of loading data into tables.
+        The most straightforward method is to use either the TableOutputFormat
+        class from a MapReduce job or the normal client APIs; however,
+        these are not always the most efficient methods.
+      </p>
+      <p>
+        This document describes HBase's bulk load functionality. The bulk load
+        feature uses a MapReduce job to output table data in HBase's internal
+        data format, and then directly loads the data files into a running
+        cluster.
+      </p>
+    </section>
+    <section>
+      <title>Bulk Load Architecture</title>
+      <p>
+        The HBase bulk load process consists of two main steps.
+      </p>
+      <subsection>
+        <title>Preparing data via a MapReduce job</title>
+        <p>
+          The first step of a bulk load is to generate HBase data files from
+          a MapReduce job using HFileOutputFormat. This output format writes
+          out data in HBase's internal storage format so that it can later be
+          loaded very efficiently into the cluster.
+        </p>
+        <p>
+          To function efficiently, HFileOutputFormat must be configured
+          such that each output HFile fits within a single region. To achieve
+          this, jobs use Hadoop's TotalOrderPartitioner class to partition the
+          map output into disjoint ranges of the key space, corresponding to the
+          key ranges of the regions in the table.
+        </p>
+        <p>
+          HFileOutputFormat includes a convenience function, <code>configureIncrementalLoad()</code>,
+          which automatically sets up a TotalOrderPartitioner based on the current
+          region boundaries of a table.
+        </p>
+      </subsection>
+      <subsection>
+        <title>Completing the data load</title>
+        <p>
+          After the data has been prepared using <code>HFileOutputFormat</code>, it
+          is loaded into the cluster using a command line tool. This command line tool
+          iterates through the prepared data files, and for each one determines the
+          region the file belongs to. It then contacts the appropriate Region Server
+          which adopts the HFile, moving it into its storage directory and making
+          the data available to clients.
+        </p>
+        <p>
+          If the region boundaries have changed during the course of bulk load
+          preparation, or between the preparation and completion steps, the bulk
+          load command-line utility will automatically split the data files into
+          pieces corresponding to the new boundaries. This process is not
+          optimally efficient, so users should take care to minimize the delay between
+          preparing a bulk load and importing it into the cluster, especially
+          if other clients are simultaneously loading data through other means.
+        </p>
+      </subsection>
+    </section>
+    <section>
+      <title>Preparing a bulk load using the <code>importtsv</code> tool</title>
+      <p>
+        HBase ships with a command line tool called <code>importtsv</code>. This tool
+        is available by running <code>hadoop jar /path/to/hbase-VERSION.jar importtsv</code>.
+        Running this tool with no arguments prints brief usage information:
+      </p>
+      <code><pre>
+Usage: importtsv -Dimporttsv.columns=a,b,c &lt;tablename&gt; &lt;inputdir&gt;
+
+Imports the given input directory of TSV data into the specified table.
+
+The column names of the TSV data must be specified using the -Dimporttsv.columns
+option. This option takes the form of comma-separated column names, where each
+column name is either a simple column family, or a columnfamily:qualifier. The special
+column name HBASE_ROW_KEY is used to designate that this column should be used
+as the row key for each imported record. You must specify exactly one column
+to be the row key.
+
+In order to prepare data for a bulk data load, pass the option:
+  -Dimporttsv.bulk.output=/path/for/output
+
+Other options that may be specified with -D include:
+  -Dimporttsv.skip.bad.lines=false - fail if encountering an invalid line
+</pre></code>
+    </section>
+    <section>
+      <title>Importing the prepared data using the <code>completebulkload</code> tool</title>
+      <p>
+        After a data import has been prepared using the <code>importtsv</code> tool, the
+        <code>completebulkload</code> tool is used to import the data into the running cluster.
+      </p>
+      <p>
+        The <code>completebulkload</code> tool simply takes the same output path where
+        <code>importtsv</code> put its results, and the table name. For example:
+      </p>
+      <code>$ hadoop jar hbase-VERSION.jar completebulkload /user/todd/myoutput mytable</code>
+      <p>
+        This tool will run quickly, after which point the new data will be visible in
+        the cluster.
+      </p>
+    </section>
+    <section>
+      <title>Advanced Usage</title>
+      <p>
+        Although the <code>importtsv</code> tool is useful in many cases, advanced users may
+        want to generate data programmatically, or import data from other formats. To get
+        started doing so, dig into <code>ImportTsv.java</code> and check the JavaDoc for
+        HFileOutputFormat.
+      </p>
+      <p>
+        The import step of the bulk load can also be done programmatically. See the
+        <code>LoadIncrementalHFiles</code> class for more information.
+      </p>
+    </section>
+  </body>
+</document>
\ No newline at end of file
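
For illustration, the preparation step described in bulk-loads.xml above could be
driven by a job along the lines of the following minimal sketch. The class names,
the column family "f" and qualifier "q", the tab-separated input convention, and
the command-line arguments (input dir, output dir, table name) are assumptions
made for this example; only configureIncrementalLoad() and addDependencyJars()
come from the code added in this commit.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadPrepare {

  // Minimal mapper: each input line is "<rowkey>\t<value>" and is written to
  // the hypothetical column family "f", qualifier "q", as a KeyValue.
  static class LineToKeyValueMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      String[] parts = line.toString().split("\t", 2);
      if (parts.length < 2) return;              // skip malformed lines
      byte[] row = Bytes.toBytes(parts[0]);
      KeyValue kv = new KeyValue(row, Bytes.toBytes("f"),
          Bytes.toBytes("q"), Bytes.toBytes(parts[1]));
      context.write(new ImmutableBytesWritable(row), kv);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "prepare-bulk-load");
    job.setJarByClass(BulkLoadPrepare.class);

    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));

    job.setMapperClass(LineToKeyValueMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    // Inspects the table's current region boundaries and wires up the
    // TotalOrderPartitioner, the sorting reducer, and HFileOutputFormat.
    HTable table = new HTable(conf, args[2]);
    HFileOutputFormat.configureIncrementalLoad(job, table);

    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    TableMapReduceUtil.addDependencyJars(job);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Because the map output value class is KeyValue, configureIncrementalLoad() wires
in KeyValueSortReducer; the resulting output directory (one subdirectory per
column family) is what the completebulkload step consumes.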

Modified: hbase/trunk/src/docs/src/documentation/content/xdocs/site.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/src/docs/src/documentation/content/xdocs/site.xml?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/src/docs/src/documentation/content/xdocs/site.xml (original)
+++ hbase/trunk/src/docs/src/documentation/content/xdocs/site.xml Wed Jun  2 00:40:48 2010
@@ -35,6 +35,7 @@ See http://forrest.apache.org/docs/linki
     <overview  label="Overview"           href="index.html" />
     <started   label="Getting Started"    href="ext:api/started" />
     <api       label="API Docs"           href="ext:api/index" />
+    <api       label="Bulk Loads"         href="bulk-load.html" />
     <api       label="HBase Metrics"      href="metrics.html" />
     <api       label="HBase Semantics"      href="acid-semantics.html" />
     <api       label="HBase  Default Configuration" href="hbase-conf.html" />

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java Wed Jun  2 00:40:48 2010
@@ -357,11 +357,6 @@ public class HRegionInfo extends Version
     return elements;
   }
 
-  /** @return the endKey */
-  public byte [] getEndKey(){
-    return endKey;
-  }
-
   /** @return the regionId */
   public long getRegionId(){
     return regionId;
@@ -402,6 +397,32 @@ public class HRegionInfo extends Version
   public byte [] getStartKey(){
     return startKey;
   }
+  
+  /** @return the endKey */
+  public byte [] getEndKey(){
+    return endKey;
+  }
+
+  /**
+   * Returns true if the given inclusive range of rows is fully contained
+   * by this region. For example, if the region is foo,a,g and this is
+   * passed ["b","c"] or ["a","c"] it will return true, but if this is passed
+   * ["b","z"] it will return false.
+   * @throws IllegalArgumentException if the range passed is invalid (ie end < start)
+   */
+  public boolean containsRange(byte[] rangeStartKey, byte[] rangeEndKey) {
+    if (Bytes.compareTo(rangeStartKey, rangeEndKey) > 0) {
+      throw new IllegalArgumentException(
+      "Invalid range: " + Bytes.toStringBinary(rangeStartKey) +
+      " > " + Bytes.toStringBinary(rangeEndKey));
+    }
+
+    boolean firstKeyInRange = Bytes.compareTo(rangeStartKey, startKey) >= 0;
+    boolean lastKeyInRange =
+      Bytes.compareTo(rangeEndKey, endKey) < 0 ||
+      Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY);
+    return firstKeyInRange && lastKeyInRange;
+  }
 
   /** @return the tableDesc */
   public HTableDescriptor getTableDesc(){
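
A quick JUnit-style sketch of the containsRange() contract documented in the hunk
above; the table name "foo", the region boundaries, and the test class name are
invented for this example.

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

public class ContainsRangeExample {
  @Test
  public void testContainsRange() {
    // Region of a hypothetical table "foo" covering row keys ["a", "g").
    HRegionInfo hri = new HRegionInfo(new HTableDescriptor("foo"),
        Bytes.toBytes("a"), Bytes.toBytes("g"));

    assertTrue(hri.containsRange(Bytes.toBytes("b"), Bytes.toBytes("c")));
    assertTrue(hri.containsRange(Bytes.toBytes("a"), Bytes.toBytes("c")));
    assertFalse(hri.containsRange(Bytes.toBytes("b"), Bytes.toBytes("z")));

    // A range whose end precedes its start is rejected.
    try {
      hri.containsRange(Bytes.toBytes("z"), Bytes.toBytes("a"));
      fail("expected IllegalArgumentException");
    } catch (IllegalArgumentException expected) {
      // expected
    }
  }
}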

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java Wed Jun  2 00:40:48 2010
@@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.io;
 import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.io.BytesWritable;
@@ -258,4 +259,11 @@ implements WritableComparable<ImmutableB
     }
     return results;
   }
+
+  /**
+   * Returns a copy of the bytes referred to by this writable
+   */
+  public byte[] copyBytes() {
+    return Arrays.copyOfRange(bytes, offset, offset+length);
+  }
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Wed Jun  2 00:40:48 2010
@@ -462,7 +462,7 @@ public class HFile {
         throw new NullPointerException("Key nor value may be null");
       }
       if (checkPrefix &&
-          Bytes.toString(k).toLowerCase().startsWith(FileInfo.RESERVED_PREFIX)) {
+          Bytes.startsWith(k, FileInfo.RESERVED_PREFIX_BYTES)) {
         throw new IOException("Keys with a " + FileInfo.RESERVED_PREFIX +
           " are reserved");
       }
@@ -1069,6 +1069,8 @@ public class HFile {
 
     /**
      * @return First key in the file.  May be null if file has no entries.
+     * Note that this is not the first rowkey, but rather the byte form of
+     * the first KeyValue.
      */
     public byte [] getFirstKey() {
       if (blockIndex == null) {
@@ -1076,6 +1078,17 @@ public class HFile {
       }
       return this.blockIndex.isEmpty()? null: this.blockIndex.blockKeys[0];
     }
+    
+    /**
+     * @return the first row key, or null if the file is empty.
+     * TODO move this to StoreFile after Ryan's patch goes in
+     * to eliminate KeyValue here
+     */
+    public byte[] getFirstRowKey() {
+      byte[] firstKey = getFirstKey();
+      if (firstKey == null) return null;
+      return KeyValue.createKeyValueFromKey(firstKey).getRow();
+    }
 
     /**
      * @return number of KV entries in this HFile
@@ -1089,6 +1102,8 @@ public class HFile {
 
     /**
      * @return Last key in the file.  May be null if file has no entries.
+     * Note that this is not the last rowkey, but rather the byte form of
+     * the last KeyValue.
      */
     public byte [] getLastKey() {
       if (!isFileInfoLoaded()) {
@@ -1098,6 +1113,17 @@ public class HFile {
     }
 
     /**
+     * @return the last row key, or null if the file is empty.
+     * TODO move this to StoreFile after Ryan's patch goes in
+     * to eliminate KeyValue here
+     */
+    public byte[] getLastRowKey() {
+      byte[] lastKey = getLastKey();
+      if (lastKey == null) return null;
+      return KeyValue.createKeyValueFromKey(lastKey).getRow();
+    }
+    
+    /**
      * @return number of K entries in this HFile's filter.  Returns KV count if no filter.
      */
     public int getFilterEntries() {
@@ -1664,6 +1690,7 @@ public class HFile {
    */
   static class FileInfo extends HbaseMapWritable<byte [], byte []> {
     static final String RESERVED_PREFIX = "hfile.";
+    static final byte[] RESERVED_PREFIX_BYTES = Bytes.toBytes(RESERVED_PREFIX);
     static final byte [] LASTKEY = Bytes.toBytes(RESERVED_PREFIX + "LASTKEY");
     static final byte [] AVG_KEY_LEN =
       Bytes.toBytes(RESERVED_PREFIX + "AVG_KEY_LEN");
@@ -1679,6 +1706,15 @@ public class HFile {
       super();
     }
   }
+  
+  /**
+   * Return true if the given file info key is reserved for internal
+   * use by HFile.
+   */
+  public static boolean isReservedFileInfoKey(byte[] key) {
+    return Bytes.startsWith(key, FileInfo.RESERVED_PREFIX_BYTES);
+  }
+
 
   /**
    * Get names of supported compression algorithms. The names are acceptable by
@@ -1861,5 +1897,4 @@ public class HFile {
       e.printStackTrace();
     }
   }
-
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Wed Jun  2 00:40:48 2010
@@ -258,5 +258,10 @@ public interface HRegionInterface extend
    * @throws IOException e
    */
   public MultiPutResponse multiPut(MultiPut puts) throws IOException;
-
+  
+  /**
+   * Bulk load an HFile into an open region
+   */
+  public void bulkLoadHFile(String hfilePath,
+      byte[] regionName, byte[] familyName) throws IOException;
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java Wed Jun  2 00:40:48 2010
@@ -36,6 +36,9 @@ public class Driver {
       "Count rows in HBase table");
     pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS.");
     pgd.addClass(Import.NAME, Import.class, "Import data written by Export.");
+    pgd.addClass(ImportTsv.NAME, ImportTsv.class, "Import data in TSV format.");
+    pgd.addClass(LoadIncrementalHFiles.NAME, LoadIncrementalHFiles.class,
+                 "Complete a bulk data load.");
     pgd.addClass(CopyTable.NAME, CopyTable.class,
         "Export a table from local cluster to peer cluster");
     pgd.driver(args);

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Wed Jun  2 00:40:48 2010
@@ -20,24 +20,40 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.TreeSet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.mortbay.log.Log;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.base.Preconditions;
 
 /**
  * Writes HFiles. Passed KeyValues must arrive in order.
@@ -48,7 +64,9 @@ import org.mortbay.log.Log;
  * @see KeyValueSortReducer
  */
 public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
-  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(TaskAttemptContext context)
+  static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
+  
+  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
   throws IOException, InterruptedException {
     // Get the path of the temporary output file
     final Path outputPath = FileOutputFormat.getOutputPath(context);
@@ -86,7 +104,7 @@ public class HFileOutputFormat extends F
             if (!fs.exists(basedir)) fs.mkdirs(basedir);
           }
           wl.writer = getNewWriter(wl.writer, basedir);
-          Log.info("Writer=" + wl.writer.getPath() +
+          LOG.info("Writer=" + wl.writer.getPath() +
             ((wl.written == 0)? "": ", wrote=" + wl.written));
           wl.written = 0;
         }
@@ -112,8 +130,10 @@ public class HFileOutputFormat extends F
 
       private void close(final HFile.Writer w) throws IOException {
         if (w != null) {
-          w.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, 
+          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
               Bytes.toBytes(System.currentTimeMillis()));
+          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
+              Bytes.toBytes(context.getTaskAttemptID().toString()));
           w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, 
               Bytes.toBytes(true));
           w.close();
@@ -136,4 +156,116 @@ public class HFileOutputFormat extends F
     long written = 0;
     HFile.Writer writer = null;
   }
+
+  /**
+   * Return the start keys of all of the regions in this table,
+   * as a list of ImmutableBytesWritable.
+   */
+  private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
+  throws IOException {
+    byte[][] byteKeys = table.getStartKeys();
+    ArrayList<ImmutableBytesWritable> ret =
+      new ArrayList<ImmutableBytesWritable>(byteKeys.length);
+    for (byte[] byteKey : byteKeys) {
+      ret.add(new ImmutableBytesWritable(byteKey));
+    }
+    return ret;
+  }
+
+  /**
+   * Write out a SequenceFile that can be read by TotalOrderPartitioner
+   * that contains the split points in startKeys.
+   * @param partitionsPath output path for SequenceFile
+   * @param startKeys the region start keys
+   */
+  private static void writePartitions(Configuration conf, Path partitionsPath,
+      List<ImmutableBytesWritable> startKeys) throws IOException {
+    Preconditions.checkArgument(!startKeys.isEmpty(), "No regions passed");
+
+    // We're generating a list of split points, and we don't ever
+    // have keys < the first region (which has an empty start key)
+    // so we need to remove it. Otherwise we would end up with an
+    // empty reducer with index 0
+    TreeSet<ImmutableBytesWritable> sorted =
+      new TreeSet<ImmutableBytesWritable>(startKeys);
+
+    ImmutableBytesWritable first = sorted.first();
+    Preconditions.checkArgument(
+        first.equals(HConstants.EMPTY_BYTE_ARRAY),
+        "First region of table should have empty start key. Instead has: %s",
+        Bytes.toStringBinary(first.get()));
+    sorted.remove(first);
+    
+    // Write the actual file
+    FileSystem fs = partitionsPath.getFileSystem(conf);
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, 
+        conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class);
+    
+    try {
+      for (ImmutableBytesWritable startKey : sorted) {
+        writer.append(startKey, NullWritable.get());
+      }
+    } finally {
+      writer.close();
+    }
+  }
+  
+  /**
+   * Configure a MapReduce Job to perform an incremental load into the given
+   * table. This
+   * <ul>
+   *   <li>Inspects the table to configure a total order partitioner</li>
+   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
+   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
+   *   <li>Sets the output key/value class to match HFileOutputFormat's requirements</li>
+   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
+   *     PutSortReducer)</li>
+   * </ul> 
+   * The user should be sure to set the map output value class to either KeyValue or Put before
+   * running this function.
+   */
+  public static void configureIncrementalLoad(Job job, HTable table) throws IOException {
+    Configuration conf = job.getConfiguration();
+    job.setPartitionerClass(TotalOrderPartitioner.class);
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(KeyValue.class);
+    job.setOutputFormatClass(HFileOutputFormat.class);
+    
+    // Based on the configured map output class, set the correct reducer to properly
+    // sort the incoming values.
+    // TODO it would be nice to pick one or the other of these formats.
+    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
+      job.setReducerClass(KeyValueSortReducer.class);
+    } else if (Put.class.equals(job.getMapOutputValueClass())) {
+      job.setReducerClass(PutSortReducer.class);
+    } else {
+      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
+    }
+    
+    LOG.info("Looking up current regions for table " + table);
+    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
+    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
+        "to match current region count");
+    job.setNumReduceTasks(startKeys.size());
+    
+    Path partitionsPath = new Path(job.getWorkingDirectory(),
+        "partitions_" + System.currentTimeMillis());
+    LOG.info("Writing partition information to " + partitionsPath);
+
+    FileSystem fs = partitionsPath.getFileSystem(conf);
+    writePartitions(conf, partitionsPath, startKeys);
+    partitionsPath.makeQualified(fs);
+    URI cacheUri;
+    try {
+      cacheUri = new URI(partitionsPath.toString() + "#" +
+          TotalOrderPartitioner.DEFAULT_PATH);
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+    DistributedCache.addCacheFile(cacheUri, conf);
+    DistributedCache.createSymlink(conf);
+    
+    LOG.info("Incremental table output configured.");
+  }
+  
 }
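
configureIncrementalLoad() above chooses the sorting reducer from the job's map
output value class: KeyValueSortReducer when the mapper emits KeyValue, and
PutSortReducer when it emits Put. A mapper on the Put path might look like the
rough sketch below; the class name and the column family/qualifier ("f"/"q") are
invented for the example. The driver would call
job.setMapOutputValueClass(Put.class) before configureIncrementalLoad() so that
PutSortReducer is selected.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PutEmittingMapper
    extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    // Each input line is "<rowkey>\t<value>"; write it into f:q.
    String[] parts = line.toString().split("\t", 2);
    if (parts.length < 2) return;                // skip malformed lines
    byte[] row = Bytes.toBytes(parts[0]);
    Put put = new Put(row);
    put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes(parts[1]));
    context.write(new ImmutableBytesWritable(row), put);
  }
}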

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java?rev=950321&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java Wed Jun  2 00:40:48 2010
@@ -0,0 +1,345 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+/**
+ * Tool to import data from a TSV file.
+ *
+ * This tool is rather simplistic - it doesn't do any quoting or
+ * escaping, but is useful for many data loads.
+ *
+ * @see ImportTsv#usage(String)
+ */
+public class ImportTsv {
+  final static String NAME = "importtsv";
+
+  final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
+  final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output";
+  final static String COLUMNS_CONF_KEY = "importtsv.columns";
+
+  static class TsvParser {
+    /**
+     * Column families and qualifiers mapped to the TSV columns
+     */
+    private byte[][] families;
+    private byte[][] qualifiers;
+
+    private int rowKeyColumnIndex;
+    
+    public static String ROWKEY_COLUMN_SPEC="HBASE_ROW_KEY";
+
+    /**
+     * @param columnsSpecification the list of columns to parse out, comma separated.
+     * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
+     */
+    public TsvParser(String columnsSpecification) {
+      ArrayList<String> columnStrings = Lists.newArrayList(
+        Splitter.on(',').trimResults().split(columnsSpecification));
+      
+      families = new byte[columnStrings.size()][];
+      qualifiers = new byte[columnStrings.size()][];
+
+      for (int i = 0; i < columnStrings.size(); i++) {
+        String str = columnStrings.get(i);
+        if (ROWKEY_COLUMN_SPEC.equals(str)) {
+          rowKeyColumnIndex = i;
+          continue;
+        }
+        String[] parts = str.split(":", 2);
+        if (parts.length == 1) {
+          families[i] = str.getBytes();
+          qualifiers[i] = HConstants.EMPTY_BYTE_ARRAY;
+        } else {
+          families[i] = parts[0].getBytes();
+          qualifiers[i] = parts[1].getBytes();
+        }
+      }
+    }
+    
+    public int getRowKeyColumnIndex() {
+      return rowKeyColumnIndex;
+    }
+    public byte[] getFamily(int idx) {
+      return families[idx];
+    }
+    public byte[] getQualifier(int idx) {
+      return qualifiers[idx];
+    }
+    
+    public ParsedLine parse(byte[] lineBytes, int length)
+    throws BadTsvLineException {
+      // Enumerate separator offsets
+      ArrayList<Integer> tabOffsets = new ArrayList<Integer>(families.length);
+      for (int i = 0; i < length; i++) {
+        if (lineBytes[i] == '\t') {
+          tabOffsets.add(i);
+        }
+      }
+      tabOffsets.add(length);
+      if (tabOffsets.size() > families.length) {
+        throw new BadTsvLineException("Bad line:\n");
+      }
+
+      return new ParsedLine(tabOffsets, lineBytes);
+    }
+    
+    class ParsedLine {
+      private final ArrayList<Integer> tabOffsets;
+      private byte[] lineBytes;
+      
+      ParsedLine(ArrayList<Integer> tabOffsets, byte[] lineBytes) {
+        this.tabOffsets = tabOffsets;
+        this.lineBytes = lineBytes;
+      }
+      
+      public int getRowKeyOffset() {
+        return getColumnOffset(rowKeyColumnIndex);
+      }
+      public int getRowKeyLength() {
+        return getColumnLength(rowKeyColumnIndex);
+      }
+      public int getColumnOffset(int idx) {
+        if (idx > 0)
+          return tabOffsets.get(idx - 1) + 1;
+        else
+          return 0;
+      }      
+      public int getColumnLength(int idx) {
+        return tabOffsets.get(idx) - getColumnOffset(idx);
+      }
+      public int getColumnCount() {
+        return tabOffsets.size();
+      }
+      public byte[] getLineBytes() {
+        return lineBytes;
+      }
+    }
+    
+    public static class BadTsvLineException extends Exception {
+      public BadTsvLineException(String err) {
+        super(err);
+      }
+      private static final long serialVersionUID = 1L;
+    }
+  }
+  
+  /**
+   * Write table content out to files in hdfs.
+   */
+  static class TsvImporter
+  extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
+  {
+    
+    /** Timestamp for all inserted rows */
+    private long ts;
+
+    /** Should skip bad lines */
+    private boolean skipBadLines;
+    private Counter badLineCount;
+
+    private TsvParser parser;
+
+    @Override
+    protected void setup(Context context) {
+      parser = new TsvParser(context.getConfiguration().get(
+                               COLUMNS_CONF_KEY));
+      if (parser.getRowKeyColumnIndex() == -1) {
+        throw new RuntimeException("No row key column specified");
+      }
+      ts = System.currentTimeMillis();
+
+      skipBadLines = context.getConfiguration().getBoolean(
+        SKIP_LINES_CONF_KEY, true);
+      badLineCount = context.getCounter("ImportTsv", "Bad Lines");
+    }
+
+    /**
+     * Convert a line of TSV text into an HBase table row.
+     */
+    @Override
+    public void map(LongWritable offset, Text value,
+      Context context)
+    throws IOException {
+      byte[] lineBytes = value.getBytes();
+
+      try {
+        TsvParser.ParsedLine parsed = parser.parse(
+            lineBytes, value.getLength());
+        ImmutableBytesWritable rowKey =
+          new ImmutableBytesWritable(lineBytes,
+              parsed.getRowKeyOffset(),
+              parsed.getRowKeyLength());
+
+        Put put = new Put(rowKey.copyBytes());
+        for (int i = 0; i < parsed.getColumnCount(); i++) {
+          if (i == parser.getRowKeyColumnIndex()) continue;
+          KeyValue kv = new KeyValue(
+              lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
+              parser.getFamily(i), 0, parser.getFamily(i).length,
+              parser.getQualifier(i), 0, parser.getQualifier(i).length,
+              ts,
+              KeyValue.Type.Put,
+              lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
+          put.add(kv);
+        }
+        context.write(rowKey, put);
+      } catch (BadTsvLineException badLine) {
+        if (skipBadLines) {
+          System.err.println(
+              "Bad line at offset: " + offset.get() + ":\n" +
+              badLine.getMessage());
+          badLineCount.increment(1);
+          return;
+        } else {
+          throw new IOException(badLine);
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param conf  The current configuration.
+   * @param args  The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+  throws IOException {
+    String tableName = args[0];
+    Path inputDir = new Path(args[1]);
+    Job job = new Job(conf, NAME + "_" + tableName);
+    job.setJarByClass(TsvImporter.class);
+    FileInputFormat.setInputPaths(job, inputDir);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setMapperClass(TsvImporter.class);
+
+    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    if (hfileOutPath != null) {
+      HTable table = new HTable(conf, tableName);
+      job.setReducerClass(PutSortReducer.class);
+      Path outputDir = new Path(hfileOutPath);
+      FileOutputFormat.setOutputPath(job, outputDir);
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+      job.setMapOutputValueClass(Put.class);
+      HFileOutputFormat.configureIncrementalLoad(job, table);
+    } else {
+      // No reducers.  Just write straight to table.  Call initTableReducerJob
+      // to set up the TableOutputFormat.
+      TableMapReduceUtil.initTableReducerJob(tableName, null, job);
+      job.setNumReduceTasks(0);
+    }
+    
+    TableMapReduceUtil.addDependencyJars(job);
+    return job;
+  }
+
+  /*
+   * @param errorMsg Error message.  Can be null.
+   */
+  private static void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    String usage = 
+      "Usage: " + NAME + " -Dimporttsv.columns=a,b,c <tablename> <inputdir>\n" +
+      "\n" +
+      "Imports the given input directory of TSV data into the specified table.\n" +
+      "\n" +
+      "The column names of the TSV data must be specified using the -Dimporttsv.columns\n" +
+      "option. This option takes the form of comma-separated column names, where each\n" +
+      "column name is either a simple column family, or a columnfamily:qualifier. The special\n" +
+      "column name HBASE_ROW_KEY is used to designate that this column should be used\n" +
+      "as the row key for each imported record. You must specify exactly one column\n" +
+      "to be the row key.\n" +
+      "\n" +
+      "In order to prepare data for a bulk data load, pass the option:\n" +
+      "  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output\n" +
+      "\n" +
+      "Other options that may be specified with -D include:\n" +
+      "  -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line";
+    System.err.println(usage);
+  }
+
+  /**
+   * Main entry point.
+   *
+   * @param args  The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    if (otherArgs.length < 2) {
+      usage("Wrong number of arguments: " + otherArgs.length);
+      System.exit(-1);
+    }
+
+    // Make sure columns are specified
+    String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
+    if (columns == null) {
+      usage("No columns specified. Please specify with -D" +
+            COLUMNS_CONF_KEY+"=...");
+      System.exit(-1);
+    }
+
+    // Make sure they specify exactly one column as the row key
+    int rowkeysFound=0;
+    for (String col : columns) {
+      if (col.equals(TsvParser.ROWKEY_COLUMN_SPEC)) rowkeysFound++;
+    }
+    if (rowkeysFound != 1) {
+      usage("Must specify exactly one column as " + TsvParser.ROWKEY_COLUMN_SPEC);
+      System.exit(-1);
+    }
+
+    Job job = createSubmittableJob(conf, otherArgs);
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
+  }
+
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=950321&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Wed Jun  2 00:40:48 2010
@@ -0,0 +1,321 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.io.HalfStoreFileReader;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.Reference.Range;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Tool to load the output of HFileOutputFormat into an existing table.
+ * @see usage()
+ */
+public class LoadIncrementalHFiles extends Configured implements Tool {
+
+  static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
+
+  public static String NAME = "completebulkload";
+  
+  public LoadIncrementalHFiles(Configuration conf) {
+    super(conf);
+  }
+  
+  public LoadIncrementalHFiles() {
+    super();
+  }
+
+
+  private void usage() {
+    System.err.println("usage: " + NAME +
+        " /path/to/hfileoutputformat-output " + 
+        "tablename");
+  }
+
+  /**
+   * Represents an HFile waiting to be loaded. A queue is used
+   * in this class in order to support the case where a region has
+   * split during the process of the load. When this happens,
+   * the HFile is split into two physical parts across the new
+   * region boundary, and each part is added back into the queue.
+   * The import process finishes when the queue is empty.
+   */
+  private static class LoadQueueItem {
+    final byte[] family;
+    final Path hfilePath;
+    
+    public LoadQueueItem(byte[] family, Path hfilePath) {
+      this.family = family;
+      this.hfilePath = hfilePath;
+    }
+  }
+
+  /**
+   * Walk the given directory for all HFiles, and return a Queue
+   * containing all such files.
+   */
+  private Deque<LoadQueueItem> discoverLoadQueue(Path hfofDir)
+  throws IOException {
+    FileSystem fs = hfofDir.getFileSystem(getConf());
+    
+    if (!fs.exists(hfofDir)) {
+      throw new FileNotFoundException("HFileOutputFormat dir " +
+          hfofDir + " not found"); 
+    }
+    
+    FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
+    if (familyDirStatuses == null) {
+      throw new FileNotFoundException("No families found in " + hfofDir);
+    }
+    
+    Deque<LoadQueueItem> ret = new LinkedList<LoadQueueItem>();
+    for (FileStatus stat : familyDirStatuses) {
+      if (!stat.isDir()) {
+        LOG.warn("Skipping non-directory " + stat.getPath());
+        continue;
+      }
+      Path familyDir = stat.getPath();
+      // Skip _logs, etc
+      if (familyDir.getName().startsWith("_")) continue;
+      byte[] family = familyDir.getName().getBytes();
+      Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
+      for (Path hfile : hfiles) {
+        if (hfile.getName().startsWith("_")) continue;
+        ret.add(new LoadQueueItem(family, hfile));
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Perform a bulk load of the given directory into the given
+   * pre-existing table.
+   * @param hfofDir the directory that was provided as the output path
+   * of a job using HFileOutputFormat
+   * @param table the table to load into
+   * @throws TableNotFoundException if table does not yet exist
+   */
+  public void doBulkLoad(Path hfofDir, HTable table)
+    throws TableNotFoundException, IOException
+  {
+    HConnection conn = table.getConnection();
+    
+    if (!conn.isTableAvailable(table.getTableName())) {
+      throw new TableNotFoundException("Table " + 
+          Bytes.toStringBinary(table.getTableName()) +
+          "is not currently available.");
+    }
+    
+    Deque<LoadQueueItem> queue = null;
+    try {
+      queue = discoverLoadQueue(hfofDir);
+      while (!queue.isEmpty()) {
+        LoadQueueItem item = queue.remove();
+        tryLoad(item, conn, table.getTableName(), queue);
+      }
+    } finally {
+      if (queue != null && !queue.isEmpty()) {
+        StringBuilder err = new StringBuilder();
+        err.append("-------------------------------------------------\n");
+        err.append("Bulk load aborted with some files not yet loaded:\n");
+        err.append("-------------------------------------------------\n");
+        for (LoadQueueItem q : queue) {
+          err.append("  ").append(q.hfilePath).append('\n');
+        }
+        LOG.error(err);
+      }
+    }
+  }
+
+  /**
+   * Attempt to load the given load queue item into its target region server.
+   * If the hfile boundary no longer fits into a region, physically splits
+   * the hfile such that the new bottom half will fit, and adds the two
+   * resultant hfiles back into the load queue.
+   */
+  private void tryLoad(final LoadQueueItem item,
+      HConnection conn, final byte[] table,
+      final Deque<LoadQueueItem> queue)
+  throws IOException {
+    final Path hfilePath = item.hfilePath;
+    final FileSystem fs = hfilePath.getFileSystem(getConf());
+    HFile.Reader hfr = new HFile.Reader(fs, hfilePath, null, false);
+    final byte[] first, last;
+    try {
+      hfr.loadFileInfo();
+      first = hfr.getFirstRowKey();
+      last = hfr.getLastRowKey();
+    }  finally {
+      hfr.close();
+    }
+    
+    LOG.info("Trying to load hfile=" + hfilePath +
+        " first=" + Bytes.toStringBinary(first) +
+        " last="  + Bytes.toStringBinary(last));
+    if (first == null || last == null) {
+      assert first == null && last == null;
+      LOG.info("hfile " + hfilePath + " has no entries, skipping");
+      return;
+    }
+    
+    // We use a '_' prefix which is ignored when walking directory trees
+    // above.
+    final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
+
+    conn.getRegionServerWithRetries(
+      new ServerCallable<Void>(conn, table, first) {
+        @Override
+        public Void call() throws Exception {
+          LOG.debug("Going to connect to server " + location +
+              "for row " + Bytes.toStringBinary(row));
+          HRegionInfo hri = location.getRegionInfo();
+          if (!hri.containsRange(first, last)) {
+            LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
+                "region. Splitting...");
+            
+            HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family); 
+            Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom");
+            Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top");
+            splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(),
+                botOut, topOut);
+
+            // Add these back at the *front* of the queue, so there's a lower
+            // chance that the region will just split again before we get there.
+            queue.addFirst(new LoadQueueItem(item.family, botOut));
+            queue.addFirst(new LoadQueueItem(item.family, topOut));
+            LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
+            return null;
+          }
+          
+          byte[] regionName = location.getRegionInfo().getRegionName();
+          server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
+          return null;
+        }
+      });
+  }
+  
+  /**
+   * Split a storefile into a top and bottom half, maintaining
+   * the metadata, recreating bloom filters, etc.
+   */
+  static void splitStoreFile(
+      Configuration conf, Path inFile,
+      HColumnDescriptor familyDesc, byte[] splitKey,
+      Path bottomOut, Path topOut) throws IOException
+  {
+    // Open reader with no block cache, and not in-memory
+    Reference topReference = new Reference(splitKey, Range.top);
+    Reference bottomReference = new Reference(splitKey, Range.bottom);
+    
+    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
+    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
+  }
+  
+  /**
+   * Copy half of an HFile into a new HFile.
+   */
+  private static void copyHFileHalf(
+      Configuration conf, Path inFile, Path outFile, Reference reference,
+      HColumnDescriptor familyDescriptor)
+  throws IOException {
+    FileSystem fs = inFile.getFileSystem(conf);
+    HalfStoreFileReader halfReader = null;
+    HFile.Writer halfWriter = null;
+    try {
+      halfReader = new HalfStoreFileReader(fs, inFile, null, reference);
+      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
+      
+      int blocksize = familyDescriptor.getBlocksize();
+      Algorithm compression = familyDescriptor.getCompression();
+      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
+      
+      halfWriter = new StoreFile.Writer(
+          fs, outFile, blocksize, compression, conf, KeyValue.COMPARATOR,
+          bloomFilterType, 0);
+      HFileScanner scanner = halfReader.getScanner(false, false);
+      scanner.seekTo();
+      do {
+        KeyValue kv = scanner.getKeyValue();
+        halfWriter.append(kv);
+      } while (scanner.next());
+      
+      for (Map.Entry<byte[],byte[]> entry : fileInfo.entrySet()) {
+        if (shouldCopyHFileMetaKey(entry.getKey())) {
+          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
+        }
+      }
+    } finally {
+      if (halfWriter != null) halfWriter.close();
+      if (halfReader != null) halfReader.close();
+    }    
+  }
+  
+  private static boolean shouldCopyHFileMetaKey(byte[] key) {
+    return !HFile.isReservedFileInfoKey(key);
+  }
+
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length != 2) {
+      usage();
+      return -1;
+    }
+    
+    Path hfofDir = new Path(args[0]);
+    HTable table = new HTable(args[1]);
+    
+    doBulkLoad(hfofDir, table);
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new LoadIncrementalHFiles(), args);
+  }
+  
+}
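
As the "Advanced Usage" section of bulk-loads.xml above notes, the completion
step can also be driven from code instead of the command line. A minimal sketch,
reusing the output path and table name from the documentation example; error
handling is omitted.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class CompleteBulkLoadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();

    // Directory previously produced by HFileOutputFormat / importtsv.
    Path hfofDir = new Path("/user/todd/myoutput");
    HTable table = new HTable(conf, "mytable");

    // Walks the family directories under hfofDir and asks each owning
    // region server to adopt the HFiles, splitting them if region
    // boundaries have moved since the files were written.
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.doBulkLoad(hfofDir, table);
  }
}

The same class backs the completebulkload entry registered in Driver.java, so the
command-line and programmatic paths share the split-and-retry logic in tryLoad().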

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java?rev=950321&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java Wed Jun  2 00:40:48 2010
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Emits sorted KeyValues.
+ * Reads in all Puts for a row from the passed Iterable, sorts their
+ * KeyValues, then emits the KeyValues in sorted order.  Rows with many
+ * columns will use a correspondingly large amount of memory for the sort.
+ * @see HFileOutputFormat
+ * @see KeyValueSortReducer
+ */
+public class PutSortReducer extends
+    Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
+  
+  @Override
+  protected void reduce(
+      ImmutableBytesWritable row,
+      java.lang.Iterable<Put> puts,
+      Reducer<ImmutableBytesWritable, Put,
+              ImmutableBytesWritable, KeyValue>.Context context)
+      throws java.io.IOException, InterruptedException
+  {
+    TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+  
+    for (Put p : puts) {
+      for (List<KeyValue> kvs : p.getFamilyMap().values()) {
+        for (KeyValue kv : kvs) {
+          map.add(kv.clone());
+        }
+      }
+    }
+    context.setStatus("Read " + map.getClass());
+    int index = 0;
+    for (KeyValue kv : map) {
+      context.write(row, kv);
+      if (index > 0 && index % 100 == 0)
+        context.setStatus("Wrote " + index);
+    }
+  }
+}
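A hedged sketch of wiring this reducer into a job that prepares HFiles for bulk load; MyPutMapper is a hypothetical mapper emitting <ImmutableBytesWritable row, Put> pairs, and the paths are placeholders. This is a single-reducer sketch; a real incremental load also needs a total-order partition over the target table's region boundaries (see the TotalOrderPartitioner backport below):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class PrepareHFilesJob {
      static Job createJob(Configuration conf) throws IOException {
        Job job = new Job(conf, "prepare-hfiles");
        job.setJarByClass(PrepareHFilesJob.class);
        job.setMapperClass(MyPutMapper.class);               // hypothetical mapper emitting <row, Put>
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setReducerClass(PutSortReducer.class);           // sorts each row's KeyValues
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(KeyValue.class);
        job.setOutputFormatClass(HFileOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/user/todd/input"));           // placeholder
        FileOutputFormat.setOutputPath(job, new Path("/user/todd/hfile-output"));  // placeholder
        return job;
      }
    }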

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java Wed Jun  2 00:40:48 2010
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Partitioner;
 
@@ -44,15 +45,55 @@ import org.apache.hadoop.mapreduce.Parti
  */
 public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
 implements Configurable {
-  private final Log LOG = LogFactory.getLog(this.getClass());
+  private final static Log LOG = LogFactory.getLog(SimpleTotalOrderPartitioner.class);
+
+  @Deprecated
   public static final String START = "hbase.simpletotalorder.start";
+  @Deprecated
   public static final String END = "hbase.simpletotalorder.end";
+  
+  static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
+  static final String END_BASE64 = "hbase.simpletotalorder.end.base64";
+  
   private Configuration c;
   private byte [] startkey;
   private byte [] endkey;
   private byte [][] splits;
   private int lastReduces = -1;
 
+  public static void setStartKey(Configuration conf, byte[] startKey) {
+    conf.set(START_BASE64, Base64.encodeBytes(startKey));
+  }
+  
+  public static void setEndKey(Configuration conf, byte[] endKey) {
+    conf.set(END_BASE64, Base64.encodeBytes(endKey));
+  }
+  
+  @SuppressWarnings("deprecation")
+  static byte[] getStartKey(Configuration conf) {
+    return getKeyFromConf(conf, START_BASE64, START);
+  }
+  
+  @SuppressWarnings("deprecation")
+  static byte[] getEndKey(Configuration conf) {
+    return getKeyFromConf(conf, END_BASE64, END);
+  }
+  
+  private static byte[] getKeyFromConf(Configuration conf,
+      String base64Key, String deprecatedKey) {
+    String encoded = conf.get(base64Key);
+    if (encoded != null) {
+      return Base64.decode(encoded);
+    }
+    String oldStyleVal = conf.get(deprecatedKey);
+    if (oldStyleVal == null) {
+      return null;
+    }
+    LOG.warn("Using deprecated configuration " + deprecatedKey +
+        " - please use static accessor methods instead.");
+    return Bytes.toBytes(oldStyleVal);
+  }
+  
   @Override
   public int getPartition(final ImmutableBytesWritable key, final VALUE value,
       final int reduces) {
@@ -87,10 +128,12 @@ implements Configurable {
   @Override
   public void setConf(Configuration conf) {
     this.c = conf;
-    String startStr = this.c.get(START);
-    String endStr = this.c.get(END);
-    LOG.info("startkey=" + startStr + ", endkey=" + endStr);
-    this.startkey = Bytes.toBytes(startStr);
-    this.endkey = Bytes.toBytes(endStr);
+    this.startkey = getStartKey(conf);
+    this.endkey = getEndKey(conf);
+    if (startkey == null || endkey == null) {
+      throw new RuntimeException(this.getClass() + " not configured");
+    }
+    LOG.info("startkey=" + Bytes.toStringBinary(startkey) +
+        ", endkey=" + Bytes.toStringBinary(endkey));
   }
 }
\ No newline at end of file
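A brief sketch of configuring the partitioner with the new binary-safe accessors instead of the deprecated string properties; the keys below are arbitrary placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.Job;

    public class PartitionerSetup {
      static void configure(Job job) {
        Configuration conf = job.getConfiguration();
        // New style: keys are Base64-encoded, so arbitrary binary row keys survive.
        SimpleTotalOrderPartitioner.setStartKey(conf, Bytes.toBytes("aaa"));
        SimpleTotalOrderPartitioner.setEndKey(conf, Bytes.toBytes("zzz"));
        job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
        // Deprecated style, still honored but logs a warning:
        //   conf.set(SimpleTotalOrderPartitioner.START, "aaa");
        //   conf.set(SimpleTotalOrderPartitioner.END, "zzz");
      }
    }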

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Wed Jun  2 00:40:48 2010
@@ -49,6 +49,8 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.conf.Configuration;
 import org.apache.zookeeper.ZooKeeper;
 
+import com.google.common.base.Function;
+
 /**
  * Utility for {@link TableMapper} and {@link TableReducer}
  */
@@ -245,6 +247,7 @@ public class TableMapReduceUtil {
     try {
       addDependencyJars(job.getConfiguration(),
           ZooKeeper.class,
+          Function.class, // Guava collections
           job.getMapOutputKeyClass(),
           job.getMapOutputValueClass(),
           job.getOutputKeyClass(),
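For context, a hedged sketch of the call site this hunk affects, assuming the enclosing convenience method is TableMapReduceUtil.addDependencyJars(Job) (its signature is outside this hunk); job setup then ships ZooKeeper, Guava (Function.class) and the job's key/value classes to the cluster via the distributed cache:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;

    public class ShipDependencyJars {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "bulk-load-prepare");  // placeholder job
        // Packs the jars containing the classes listed in the hunk above
        // into the distributed cache so map and reduce tasks can load them.
        TableMapReduceUtil.addDependencyJars(job);
      }
    }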

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java?rev=950321&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java Wed Jun  2 00:40:48 2010
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce.hadoopbackport;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Utility for collecting samples and writing a partition file for
+ * {@link TotalOrderPartitioner}.
+ *
+ * This is an identical copy of o.a.h.mapreduce.lib.partition.InputSampler
+ * from Hadoop trunk at r910774, with the exception of replacing
+ * TaskAttemptContextImpl with TaskAttemptContext.
+ */
+public class InputSampler<K,V> extends Configured implements Tool  {
+
+  private static final Log LOG = LogFactory.getLog(InputSampler.class);
+
+  static int printUsage() {
+    System.out.println("sampler -r <reduces>\n" +
+      "      [-inFormat <input format class>]\n" +
+      "      [-keyClass <map input & output key class>]\n" +
+      "      [-splitRandom <double pcnt> <numSamples> <maxsplits> | " +
+      "// Sample from random splits at random (general)\n" +
+      "       -splitSample <numSamples> <maxsplits> | " +
+      "             // Sample from first records in splits (random data)\n"+
+      "       -splitInterval <double pcnt> <maxsplits>]" +
+      "             // Sample from splits at intervals (sorted data)");
+    System.out.println("Default sampler: -splitRandom 0.1 10000 10");
+    ToolRunner.printGenericCommandUsage(System.out);
+    return -1;
+  }
+
+  public InputSampler(Configuration conf) {
+    setConf(conf);
+  }
+
+  /**
+   * Interface to sample using an 
+   * {@link org.apache.hadoop.mapreduce.InputFormat}.
+   */
+  public interface Sampler<K,V> {
+    /**
+     * For a given job, collect and return a subset of the keys from the
+     * input data.
+     */
+    K[] getSample(InputFormat<K,V> inf, Job job) 
+    throws IOException, InterruptedException;
+  }
+
+  /**
+   * Samples the first n records from s splits.
+   * Inexpensive way to sample random data.
+   */
+  public static class SplitSampler<K,V> implements Sampler<K,V> {
+
+    private final int numSamples;
+    private final int maxSplitsSampled;
+
+    /**
+     * Create a SplitSampler sampling <em>all</em> splits.
+     * Takes the first numSamples / numSplits records from each split.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     */
+    public SplitSampler(int numSamples) {
+      this(numSamples, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new SplitSampler.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     */
+    public SplitSampler(int numSamples, int maxSplitsSampled) {
+      this.numSamples = numSamples;
+      this.maxSplitsSampled = maxSplitsSampled;
+    }
+
+    /**
+     * From each split sampled, take the first numSamples / numSplits records.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, Job job) 
+        throws IOException, InterruptedException {
+      List<InputSplit> splits = inf.getSplits(job);
+      ArrayList<K> samples = new ArrayList<K>(numSamples);
+      int splitsToSample = Math.min(maxSplitsSampled, splits.size());
+      int splitStep = splits.size() / splitsToSample;
+      int samplesPerSplit = numSamples / splitsToSample;
+      long records = 0;
+      for (int i = 0; i < splitsToSample; ++i) {
+        RecordReader<K,V> reader = inf.createRecordReader(
+          splits.get(i * splitStep), 
+          new TaskAttemptContext(job.getConfiguration(), 
+                                 new TaskAttemptID()));
+        while (reader.nextKeyValue()) {
+          samples.add(reader.getCurrentKey());
+          ++records;
+          if ((i+1) * samplesPerSplit <= records) {
+            break;
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Sample from random points in the input.
+   * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
+   * each split.
+   */
+  public static class RandomSampler<K,V> implements Sampler<K,V> {
+    private double freq;
+    private final int numSamples;
+    private final int maxSplitsSampled;
+
+    /**
+     * Create a new RandomSampler sampling <em>all</em> splits.
+     * This will read every split at the client, which is very expensive.
+     * @param freq Probability with which a key will be chosen.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     */
+    public RandomSampler(double freq, int numSamples) {
+      this(freq, numSamples, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new RandomSampler.
+     * @param freq Probability with which a key will be chosen.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     */
+    public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
+      this.freq = freq;
+      this.numSamples = numSamples;
+      this.maxSplitsSampled = maxSplitsSampled;
+    }
+
+    /**
+     * Randomize the split order, then take the specified number of keys from
+     * each split sampled, where each key is selected with the specified
+     * probability and possibly replaced by a subsequently selected key when
+     * the quota of keys from that split is satisfied.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, Job job) 
+        throws IOException, InterruptedException {
+      List<InputSplit> splits = inf.getSplits(job);
+      ArrayList<K> samples = new ArrayList<K>(numSamples);
+      int splitsToSample = Math.min(maxSplitsSampled, splits.size());
+
+      Random r = new Random();
+      long seed = r.nextLong();
+      r.setSeed(seed);
+      LOG.debug("seed: " + seed);
+      // shuffle splits
+      for (int i = 0; i < splits.size(); ++i) {
+        InputSplit tmp = splits.get(i);
+        int j = r.nextInt(splits.size());
+        splits.set(i, splits.get(j));
+        splits.set(j, tmp);
+      }
+      // our target rate is in terms of the maximum number of sample splits,
+      // but we accept the possibility of sampling additional splits to hit
+      // the target sample keyset
+      for (int i = 0; i < splitsToSample ||
+                     (i < splits.size() && samples.size() < numSamples); ++i) {
+        RecordReader<K,V> reader = inf.createRecordReader(splits.get(i), 
+          new TaskAttemptContext(job.getConfiguration(), 
+                                 new TaskAttemptID()));
+        while (reader.nextKeyValue()) {
+          if (r.nextDouble() <= freq) {
+            if (samples.size() < numSamples) {
+              samples.add(reader.getCurrentKey());
+            } else {
+              // When exceeding the maximum number of samples, replace a
+              // random element with this one, then adjust the frequency
+              // to reflect the possibility of existing elements being
+              // pushed out
+              int ind = r.nextInt(numSamples);
+              if (ind != numSamples) {
+                samples.set(ind, reader.getCurrentKey());
+              }
+              freq *= (numSamples - 1) / (double) numSamples;
+            }
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Sample from s splits at regular intervals.
+   * Useful for sorted data.
+   */
+  public static class IntervalSampler<K,V> implements Sampler<K,V> {
+    private final double freq;
+    private final int maxSplitsSampled;
+
+    /**
+     * Create a new IntervalSampler sampling <em>all</em> splits.
+     * @param freq The frequency with which records will be emitted.
+     */
+    public IntervalSampler(double freq) {
+      this(freq, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new IntervalSampler.
+     * @param freq The frequency with which records will be emitted.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     * @see #getSample
+     */
+    public IntervalSampler(double freq, int maxSplitsSampled) {
+      this.freq = freq;
+      this.maxSplitsSampled = maxSplitsSampled;
+    }
+
+    /**
+     * For each split sampled, emit when the ratio of the number of records
+     * retained to the total record count is less than the specified
+     * frequency.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, Job job) 
+        throws IOException, InterruptedException {
+      List<InputSplit> splits = inf.getSplits(job);
+      ArrayList<K> samples = new ArrayList<K>();
+      int splitsToSample = Math.min(maxSplitsSampled, splits.size());
+      int splitStep = splits.size() / splitsToSample;
+      long records = 0;
+      long kept = 0;
+      for (int i = 0; i < splitsToSample; ++i) {
+        RecordReader<K,V> reader = inf.createRecordReader(
+          splits.get(i * splitStep),
+          new TaskAttemptContext(job.getConfiguration(), 
+                                 new TaskAttemptID()));
+        while (reader.nextKeyValue()) {
+          ++records;
+          if ((double) kept / records < freq) {
+            ++kept;
+            samples.add(reader.getCurrentKey());
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Write a partition file for the given job, using the Sampler provided.
+   * Queries the sampler for a sample keyset, sorts by the output key
+   * comparator, selects the keys for each rank, and writes to the destination
+   * returned from {@link TotalOrderPartitioner#getPartitionFile}.
+   */
+  @SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
+  public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler) 
+      throws IOException, ClassNotFoundException, InterruptedException {
+    Configuration conf = job.getConfiguration();
+    final InputFormat inf = 
+        ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
+    int numPartitions = job.getNumReduceTasks();
+    K[] samples = sampler.getSample(inf, job);
+    LOG.info("Using " + samples.length + " samples");
+    RawComparator<K> comparator =
+      (RawComparator<K>) job.getSortComparator();
+    Arrays.sort(samples, comparator);
+    Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
+    FileSystem fs = dst.getFileSystem(conf);
+    if (fs.exists(dst)) {
+      fs.delete(dst, false);
+    }
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, 
+      conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
+    NullWritable nullValue = NullWritable.get();
+    float stepSize = samples.length / (float) numPartitions;
+    int last = -1;
+    for(int i = 1; i < numPartitions; ++i) {
+      int k = Math.round(stepSize * i);
+      while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
+        ++k;
+      }
+      writer.append(samples[k], nullValue);
+      last = k;
+    }
+    writer.close();
+  }
+
+  /**
+   * Driver for InputSampler from the command line.
+   * Configures a JobConf instance and calls {@link #writePartitionFile}.
+   */
+  public int run(String[] args) throws Exception {
+    Job job = new Job(getConf());
+    ArrayList<String> otherArgs = new ArrayList<String>();
+    Sampler<K,V> sampler = null;
+    for(int i=0; i < args.length; ++i) {
+      try {
+        if ("-r".equals(args[i])) {
+          job.setNumReduceTasks(Integer.parseInt(args[++i]));
+        } else if ("-inFormat".equals(args[i])) {
+          job.setInputFormatClass(
+              Class.forName(args[++i]).asSubclass(InputFormat.class));
+        } else if ("-keyClass".equals(args[i])) {
+          job.setMapOutputKeyClass(
+              Class.forName(args[++i]).asSubclass(WritableComparable.class));
+        } else if ("-splitSample".equals(args[i])) {
+          int numSamples = Integer.parseInt(args[++i]);
+          int maxSplits = Integer.parseInt(args[++i]);
+          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
+          sampler = new SplitSampler<K,V>(numSamples, maxSplits);
+        } else if ("-splitRandom".equals(args[i])) {
+          double pcnt = Double.parseDouble(args[++i]);
+          int numSamples = Integer.parseInt(args[++i]);
+          int maxSplits = Integer.parseInt(args[++i]);
+          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
+          sampler = new RandomSampler<K,V>(pcnt, numSamples, maxSplits);
+        } else if ("-splitInterval".equals(args[i])) {
+          double pcnt = Double.parseDouble(args[++i]);
+          int maxSplits = Integer.parseInt(args[++i]);
+          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
+          sampler = new IntervalSampler<K,V>(pcnt, maxSplits);
+        } else {
+          otherArgs.add(args[i]);
+        }
+      } catch (NumberFormatException except) {
+        System.out.println("ERROR: Integer expected instead of " + args[i]);
+        return printUsage();
+      } catch (ArrayIndexOutOfBoundsException except) {
+        System.out.println("ERROR: Required parameter missing from " +
+            args[i-1]);
+        return printUsage();
+      }
+    }
+    if (job.getNumReduceTasks() <= 1) {
+      System.err.println("Sampler requires more than one reducer");
+      return printUsage();
+    }
+    if (otherArgs.size() < 2) {
+      System.out.println("ERROR: Wrong number of parameters: ");
+      return printUsage();
+    }
+    if (null == sampler) {
+      sampler = new RandomSampler<K,V>(0.1, 10000, 10);
+    }
+
+    Path outf = new Path(otherArgs.remove(otherArgs.size() - 1));
+    TotalOrderPartitioner.setPartitionFile(getConf(), outf);
+    for (String s : otherArgs) {
+      FileInputFormat.addInputPath(job, new Path(s));
+    }
+    InputSampler.<K,V>writePartitionFile(job, sampler);
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    InputSampler<?,?> sampler = new InputSampler(new Configuration());
+    int res = ToolRunner.run(sampler, args);
+    System.exit(res);
+  }
+}
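A hedged sketch of the backported classes working together outside the command-line driver: configure TotalOrderPartitioner, sample the input, and write the partition file before submitting the job. The key/value types, path, and sampler parameters are placeholders (the sampler settings mirror the defaults run() falls back to above); SplitSampler or IntervalSampler can be substituted when the input is already roughly sorted, per the javadoc above.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.mapreduce.hadoopbackport.InputSampler;
    import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    public class TotalOrderSetup {
      // Assumes 'job' already has its input format, map output key class
      // (Text here) and number of reduce tasks configured.
      static void configureTotalOrder(Job job) throws Exception {
        job.setPartitionerClass(TotalOrderPartitioner.class);
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
            new Path("/user/todd/partitions"));                // placeholder path
        // 10% sampling probability, at most 10000 samples from at most 10 splits.
        InputSampler.Sampler<Text, NullWritable> sampler =
            new InputSampler.RandomSampler<Text, NullWritable>(0.1, 10000, 10);
        InputSampler.writePartitionFile(job, sampler);
      }
    }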


