hbase-commits mailing list archives

From nspiegelb...@apache.org
Subject svn commit: r1176171 [2/9] - in /hbase/branches/0.89: ./ bin/ src/ src/assembly/ src/docs/src/documentation/content/xdocs/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/filter...
Date Tue, 27 Sep 2011 02:41:20 GMT
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1176171&r1=1176170&r2=1176171&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Tue Sep 27 02:41:16 2011
@@ -65,9 +65,9 @@ import org.apache.hadoop.io.compress.Dec
  * File format for hbase.
  * A file of sorted key/value pairs. Both keys and values are byte arrays.
  * <p>
- * The memory footprint of a HFile includes the following (below is taken from
+ * The memory footprint of a HFile includes the following (below is taken from the
  * <a
- * href=https://issues.apache.org/jira/browse/HADOOP-3315>Hadoop-3315 tfile</a>
+ * href=https://issues.apache.org/jira/browse/HADOOP-3315>TFile</a> documentation
  * but applies also to HFile):
  * <ul>
  * <li>Some constant overhead of reading or writing a compressed block.
@@ -107,7 +107,7 @@ import org.apache.hadoop.io.compress.Dec
  * </ul>
  *
  * For more on the background behind HFile, see <a
- * href=https://issues.apache.org/jira/browse/HBASE-3315>HBASE-61</a>.
+ * href=https://issues.apache.org/jira/browse/HBASE-61>HBASE-61</a>.
  * <p>
  * File is made of data blocks followed by meta data blocks (if any), a fileinfo
  * block, data block index, meta data block index, and a fixed size trailer
@@ -462,7 +462,7 @@ public class HFile {
         throw new NullPointerException("Key nor value may be null");
       }
       if (checkPrefix &&
-          Bytes.toString(k).toLowerCase().startsWith(FileInfo.RESERVED_PREFIX)) {
+          Bytes.startsWith(k, FileInfo.RESERVED_PREFIX_BYTES)) {
         throw new IOException("Keys with a " + FileInfo.RESERVED_PREFIX +
           " are reserved");
       }
@@ -569,8 +569,8 @@ public class HFile {
             this.lastKeyLength, key, offset, length);
         if (keyComp > 0) {
           throw new IOException("Added a key not lexically larger than" +
-            " previous key=" + Bytes.toString(key, offset, length) +
-            ", lastkey=" + Bytes.toString(this.lastKeyBuffer, this.lastKeyOffset,
+            " previous key=" + Bytes.toStringBinary(key, offset, length) +
+            ", lastkey=" + Bytes.toStringBinary(this.lastKeyBuffer, this.lastKeyOffset,
                 this.lastKeyLength));
         } else if (keyComp == 0) {
           dupKey = true;
@@ -800,7 +800,7 @@ public class HFile {
      * See {@link Writer#appendFileInfo(byte[], byte[])}.
      * @throws IOException
      */
-    public Map<byte [], byte []> loadFileInfo() 
+    public Map<byte [], byte []> loadFileInfo()
     throws IOException {
       this.trailer = readTrailer();
 
@@ -895,7 +895,7 @@ public class HFile {
      * @return Block wrapped in a ByteBuffer
      * @throws IOException
      */
-    public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock) 
+    public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
     throws IOException {
       if (trailer.metaIndexCount == 0) {
         return null; // there are no meta blocks
@@ -903,7 +903,7 @@ public class HFile {
       if (metaIndex == null) {
         throw new IOException("Meta index not loaded");
       }
-      
+
       byte [] mbname = Bytes.toBytes(metaBlockName);
       int block = metaIndex.blockContainingKey(mbname, 0, mbname.length);
       if (block == -1)
@@ -924,34 +924,34 @@ public class HFile {
         if (cache != null) {
           ByteBuffer cachedBuf = cache.getBlock(name + "meta" + block);
           if (cachedBuf != null) {
-            // Return a distinct 'shallow copy' of the block, 
+            // Return a distinct 'shallow copy' of the block,
             // so pos doesnt get messed by the scanner
             cacheHits++;
             return cachedBuf.duplicate();
           }
           // Cache Miss, please load.
         }
-        
+
         ByteBuffer buf = decompress(metaIndex.blockOffsets[block],
           longToInt(blockSize), metaIndex.blockDataSizes[block], true);
         byte [] magic = new byte[METABLOCKMAGIC.length];
         buf.get(magic, 0, magic.length);
-  
+
         if (! Arrays.equals(magic, METABLOCKMAGIC)) {
           throw new IOException("Meta magic is bad in block " + block);
         }
-        
+
         // Create a new ByteBuffer 'shallow copy' to hide the magic header
         buf = buf.slice();
-  
+
         readTime += System.currentTimeMillis() - now;
         readOps++;
-  
+
         // Cache the block
         if(cacheBlock && cache != null) {
           cache.cacheBlock(name + "meta" + block, buf.duplicate(), inMemory);
         }
-  
+
         return buf;
       }
     }
@@ -983,7 +983,7 @@ public class HFile {
         if (cache != null) {
           ByteBuffer cachedBuf = cache.getBlock(name + block);
           if (cachedBuf != null) {
-            // Return a distinct 'shallow copy' of the block, 
+            // Return a distinct 'shallow copy' of the block,
             // so pos doesnt get messed by the scanner
             cacheHits++;
             return cachedBuf.duplicate();
@@ -1015,10 +1015,10 @@ public class HFile {
         }
 
         // 'shallow copy' to hide the header
-        // NOTE: you WILL GET BIT if you call buf.array() but don't start 
+        // NOTE: you WILL GET BIT if you call buf.array() but don't start
         //       reading at buf.arrayOffset()
         buf = buf.slice();
-        
+
         readTime += System.currentTimeMillis() - now;
         readOps++;
 
@@ -1069,6 +1069,8 @@ public class HFile {
 
     /**
      * @return First key in the file.  May be null if file has no entries.
+     * Note that this is not the first rowkey, but rather the byte form of
+     * the first KeyValue.
      */
     public byte [] getFirstKey() {
       if (blockIndex == null) {
@@ -1078,6 +1080,17 @@ public class HFile {
     }
 
     /**
+     * @return the first row key, or null if the file is empty.
+     * TODO move this to StoreFile after Ryan's patch goes in
+     * to eliminate KeyValue here
+     */
+    public byte[] getFirstRowKey() {
+      byte[] firstKey = getFirstKey();
+      if (firstKey == null) return null;
+      return KeyValue.createKeyValueFromKey(firstKey).getRow();
+    }
+
+    /**
      * @return number of KV entries in this HFile
      */
     public int getEntries() {
@@ -1089,6 +1102,8 @@ public class HFile {
 
     /**
      * @return Last key in the file.  May be null if file has no entries.
+     * Note that this is not the last rowkey, but rather the byte form of
+     * the last KeyValue.
      */
     public byte [] getLastKey() {
       if (!isFileInfoLoaded()) {
@@ -1096,6 +1111,17 @@ public class HFile {
       }
       return this.blockIndex.isEmpty()? null: this.lastkey;
     }
+
+    /**
+     * @return the last row key, or null if the file is empty.
+     * TODO move this to StoreFile after Ryan's patch goes in
+     * to eliminate KeyValue here
+     */
+    public byte[] getLastRowKey() {
+      byte[] lastKey = getLastKey();
+      if (lastKey == null) return null;
+      return KeyValue.createKeyValueFromKey(lastKey).getRow();
+    }
     
     /**
      * @return number of K entries in this HFile's filter.  Returns KV count if no filter.
@@ -1222,7 +1248,7 @@ public class HFile {
         return true;
       }
 
-      public boolean shouldSeek(final byte[] row, 
+      public boolean shouldSeek(final byte[] row,
           final SortedSet<byte[]> columns) {
         return true;
       }
@@ -1368,6 +1394,11 @@ public class HFile {
           }
         }
       }
+
+      @Override
+      public String toString() {
+        return "HFileScanner for reader " + String.valueOf(reader);
+      }
     }
 
     public String getTrailerInfo() {
@@ -1620,7 +1651,7 @@ public class HFile {
       sb.append("size=" + count);
       for (int i = 0; i < count ; i++) {
         sb.append(", ");
-        sb.append("key=").append(Bytes.toString(blockKeys[i])).
+        sb.append("key=").append(Bytes.toStringBinary(blockKeys[i])).
           append(", offset=").append(blockOffsets[i]).
           append(", dataSize=" + blockDataSizes[i]);
       }
@@ -1659,6 +1690,7 @@ public class HFile {
    */
   static class FileInfo extends HbaseMapWritable<byte [], byte []> {
     static final String RESERVED_PREFIX = "hfile.";
+    static final byte[] RESERVED_PREFIX_BYTES = Bytes.toBytes(RESERVED_PREFIX);
     static final byte [] LASTKEY = Bytes.toBytes(RESERVED_PREFIX + "LASTKEY");
     static final byte [] AVG_KEY_LEN =
       Bytes.toBytes(RESERVED_PREFIX + "AVG_KEY_LEN");
@@ -1676,6 +1708,15 @@ public class HFile {
   }
 
   /**
+   * Return true if the given file info key is reserved for internal
+   * use by HFile.
+   */
+  public static boolean isReservedFileInfoKey(byte[] key) {
+    return Bytes.startsWith(key, FileInfo.RESERVED_PREFIX_BYTES);
+  }
+
+
+  /**
    * Get names of supported compression algorithms. The names are acceptable by
    * HFile.Writer.
    *
@@ -1768,8 +1809,8 @@ public class HFile {
         byte[][] hri = HRegionInfo.parseRegionName(rn);
         Path rootDir = FSUtils.getRootDir(conf);
         Path tableDir = new Path(rootDir, Bytes.toString(hri[0]));
-        int enc = HRegionInfo.encodeRegionName(rn);
-        Path regionDir = new Path(tableDir, Integer.toString(enc));
+        String enc = HRegionInfo.encodeRegionName(rn);
+        Path regionDir = new Path(tableDir, enc);
         if (verbose) System.out.println("region dir -> " + regionDir);
         List<Path> regionFiles = getStoreFiles(fs, regionDir);
         if (verbose) System.out.println("Number of region files found -> " +
@@ -1856,5 +1897,4 @@ public class HFile {
       e.printStackTrace();
     }
   }
-
 }
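
A minimal sketch, assuming an already written HFile at a placeholder path, of the
new getFirstRowKey()/getLastRowKey() accessors added above. Unlike getFirstKey()/
getLastKey(), which return the raw byte form of the first/last KeyValue key, these
return just the row. The Reader arguments mirror the no-block-cache usage in
LoadIncrementalHFiles further down; the class and path names are illustrative only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.util.Bytes;

    public class HFileBoundsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]);                 // placeholder: path to an existing HFile
        FileSystem fs = path.getFileSystem(conf);
        // No block cache, not in-memory, as in LoadIncrementalHFiles.tryLoad().
        HFile.Reader reader = new HFile.Reader(fs, path, null, false);
        try {
          reader.loadFileInfo();                       // required before the key accessors
          byte[] firstRow = reader.getFirstRowKey();   // row portion only, may be null
          byte[] lastRow = reader.getLastRowKey();
          if (firstRow == null) {
            System.out.println("hfile has no entries");
          } else {
            System.out.println("first row = " + Bytes.toStringBinary(firstRow) +
                ", last row = " + Bytes.toStringBinary(lastRow));
          }
        } finally {
          reader.close();
        }
      }
    }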

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1176171&r1=1176170&r2=1176171&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Tue Sep 27 02:41:16 2011
@@ -152,6 +152,26 @@ public interface HRegionInterface extend
       final Put put)
   throws IOException;
 
+
+  /**
+   * Atomically checks if a row/family/qualifier value matches the expected value.
+   * If it does, it adds the delete. If the passed expected value is null, then the
+   * check is for non-existence of the row/column.
+   *
+   * @param regionName region name
+   * @param row row to check
+   * @param family column family
+   * @param qualifier column qualifier
+   * @param value the expected value
+   * @param delete data to delete if check succeeds
+   * @throws IOException e
+   * @return true if the new delete was executed, false otherwise
+   */
+  public boolean checkAndDelete(final byte[] regionName, final byte [] row,
+      final byte [] family, final byte [] qualifier, final byte [] value,
+      final Delete delete)
+  throws IOException;
+
   /**
    * Atomically increments a column value. If the column value isn't long-like,
    * this could throw an exception. If passed expected value is null, then the
@@ -259,4 +279,9 @@ public interface HRegionInterface extend
    */
   public MultiPutResponse multiPut(MultiPut puts) throws IOException;
 
+  /**
+   * Bulk load an HFile into an open region
+   */
+  public void bulkLoadHFile(String hfilePath,
+      byte[] regionName, byte[] familyName) throws IOException;
 }
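
The new checkAndDelete RPC mirrors the Put-based check-and-mutate method directly
above. A rough sketch of the client-side call path, assuming the matching
HTable.checkAndDelete(row, family, qualifier, value, delete) wrapper in this
commit's client/ changes; the table and column names are placeholders.

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CheckAndDeleteSketch {
      public static void main(String[] args) throws Exception {
        HTable table = new HTable(HBaseConfiguration.create(), "mytable");
        byte[] row = Bytes.toBytes("row1");
        Delete delete = new Delete(row);
        delete.deleteColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"));
        // Apply the delete only if cf:q currently holds "expected"; a null value
        // would instead require that the cell not exist at all.
        boolean applied = table.checkAndDelete(row, Bytes.toBytes("cf"),
            Bytes.toBytes("q"), Bytes.toBytes("expected"), delete);
        System.out.println("delete applied: " + applied);
      }
    }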

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java?rev=1176171&r1=1176170&r2=1176171&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java Tue Sep 27 02:41:16 2011
@@ -36,6 +36,9 @@ public class Driver {
       "Count rows in HBase table");
     pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS.");
     pgd.addClass(Import.NAME, Import.class, "Import data written by Export.");
+    pgd.addClass(ImportTsv.NAME, ImportTsv.class, "Import data in TSV format.");
+    pgd.addClass(LoadIncrementalHFiles.NAME, LoadIncrementalHFiles.class,
+                 "Complete a bulk data load.");
     pgd.addClass(CopyTable.NAME, CopyTable.class,
         "Export a table from local cluster to peer cluster");
     pgd.driver(args);

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1176171&r1=1176170&r2=1176171&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Tue Sep 27 02:41:16 2011
@@ -20,24 +20,40 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.TreeSet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.mortbay.log.Log;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.base.Preconditions;
 
 /**
  * Writes HFiles. Passed KeyValues must arrive in order.
@@ -48,7 +64,9 @@ import org.mortbay.log.Log;
  * @see KeyValueSortReducer
  */
 public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
-  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(TaskAttemptContext context)
+  static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
+
+  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
   throws IOException, InterruptedException {
     // Get the path of the temporary output file
     final Path outputPath = FileOutputFormat.getOutputPath(context);
@@ -67,6 +85,7 @@ public class HFileOutputFormat extends F
       private final Map<byte [], WriterLength> writers =
         new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
       private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
+      private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
 
       public void write(ImmutableBytesWritable row, KeyValue kv)
       throws IOException {
@@ -86,10 +105,11 @@ public class HFileOutputFormat extends F
             if (!fs.exists(basedir)) fs.mkdirs(basedir);
           }
           wl.writer = getNewWriter(wl.writer, basedir);
-          Log.info("Writer=" + wl.writer.getPath() +
+          LOG.info("Writer=" + wl.writer.getPath() +
             ((wl.written == 0)? "": ", wrote=" + wl.written));
           wl.written = 0;
         }
+        kv.updateLatestStamp(this.now);
         wl.writer.append(kv);
         wl.written += length;
         // Copy the row so we know when a row transition.
@@ -112,8 +132,10 @@ public class HFileOutputFormat extends F
 
       private void close(final HFile.Writer w) throws IOException {
         if (w != null) {
-          w.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, 
+          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
               Bytes.toBytes(System.currentTimeMillis()));
+          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
+              Bytes.toBytes(context.getTaskAttemptID().toString()));
           w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, 
               Bytes.toBytes(true));
           w.close();
@@ -136,4 +158,116 @@ public class HFileOutputFormat extends F
     long written = 0;
     HFile.Writer writer = null;
   }
+
+  /**
+   * Return the start keys of all of the regions in this table,
+   * as a list of ImmutableBytesWritable.
+   */
+  private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
+  throws IOException {
+    byte[][] byteKeys = table.getStartKeys();
+    ArrayList<ImmutableBytesWritable> ret =
+      new ArrayList<ImmutableBytesWritable>(byteKeys.length);
+    for (byte[] byteKey : byteKeys) {
+      ret.add(new ImmutableBytesWritable(byteKey));
+    }
+    return ret;
+  }
+
+  /**
+   * Write out a SequenceFile that can be read by TotalOrderPartitioner
+   * that contains the split points in startKeys.
+   * @param partitionsPath output path for SequenceFile
+   * @param startKeys the region start keys
+   */
+  private static void writePartitions(Configuration conf, Path partitionsPath,
+      List<ImmutableBytesWritable> startKeys) throws IOException {
+    Preconditions.checkArgument(!startKeys.isEmpty(), "No regions passed");
+
+    // We're generating a list of split points, and we don't ever
+    // have keys < the first region (which has an empty start key)
+    // so we need to remove it. Otherwise we would end up with an
+    // empty reducer with index 0
+    TreeSet<ImmutableBytesWritable> sorted =
+      new TreeSet<ImmutableBytesWritable>(startKeys);
+
+    ImmutableBytesWritable first = sorted.first();
+    Preconditions.checkArgument(
+        first.equals(HConstants.EMPTY_BYTE_ARRAY),
+        "First region of table should have empty start key. Instead has: %s",
+        Bytes.toStringBinary(first.get()));
+    sorted.remove(first);
+
+    // Write the actual file
+    FileSystem fs = partitionsPath.getFileSystem(conf);
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs,
+        conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class);
+
+    try {
+      for (ImmutableBytesWritable startKey : sorted) {
+        writer.append(startKey, NullWritable.get());
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+  /**
+   * Configure a MapReduce Job to perform an incremental load into the given
+   * table. This
+   * <ul>
+   *   <li>Inspects the table to configure a total order partitioner</li>
+   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
+   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
+   *   <li>Sets the output key/value class to match HFileOutputFormat's requirements</li>
+   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
+   *     PutSortReducer)</li>
+   * </ul>
+   * The user should be sure to set the map output value class to either KeyValue or Put before
+   * running this function.
+   */
+  public static void configureIncrementalLoad(Job job, HTable table) throws IOException {
+    Configuration conf = job.getConfiguration();
+    job.setPartitionerClass(TotalOrderPartitioner.class);
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(KeyValue.class);
+    job.setOutputFormatClass(HFileOutputFormat.class);
+
+    // Based on the configured map output class, set the correct reducer to properly
+    // sort the incoming values.
+    // TODO it would be nice to pick one or the other of these formats.
+    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
+      job.setReducerClass(KeyValueSortReducer.class);
+    } else if (Put.class.equals(job.getMapOutputValueClass())) {
+      job.setReducerClass(PutSortReducer.class);
+    } else {
+      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
+    }
+
+    LOG.info("Looking up current regions for table " + table);
+    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
+    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
+        "to match current region count");
+    job.setNumReduceTasks(startKeys.size());
+
+    Path partitionsPath = new Path(job.getWorkingDirectory(),
+        "partitions_" + System.currentTimeMillis());
+    LOG.info("Writing partition information to " + partitionsPath);
+
+    FileSystem fs = partitionsPath.getFileSystem(conf);
+    writePartitions(conf, partitionsPath, startKeys);
+    partitionsPath.makeQualified(fs);
+    URI cacheUri;
+    try {
+      cacheUri = new URI(partitionsPath.toString() + "#" +
+          TotalOrderPartitioner.DEFAULT_PATH);
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+    DistributedCache.addCacheFile(cacheUri, conf);
+    DistributedCache.createSymlink(conf);
+
+    LOG.info("Incremental table output configured.");
+  }
+
 }
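
A hedged sketch of a complete job that uses the new configureIncrementalLoad()
helper: the mapper emits (ImmutableBytesWritable, Put) pairs, so the helper picks
PutSortReducer, sets the reduce count from the table's regions, and wires up the
TotalOrderPartitioner. The toy mapper, table name and paths are made up; ImportTsv
below is the real in-tree example.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class BulkLoadJobSketch {

      // Toy mapper: uses each input line as the row key and stores it back into cf:q.
      static class LineToPutMapper
          extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
          byte[] row = Bytes.toBytes(line.toString());
          Put put = new Put(row);
          put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), row);
          context.write(new ImmutableBytesWritable(row), put);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "bulkload-sketch");
        job.setJarByClass(BulkLoadJobSketch.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(LineToPutMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);       // Put => PutSortReducer is chosen
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        HTable table = new HTable(conf, args[2]);    // existing, pre-split target table
        HFileOutputFormat.configureIncrementalLoad(job, table);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }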

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java?rev=1176171&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java Tue Sep 27 02:41:16 2011
@@ -0,0 +1,360 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+/**
+ * Tool to import data from a TSV file.
+ *
+ * This tool is rather simplistic - it doesn't do any quoting or
+ * escaping, but is useful for many data loads.
+ *
+ * @see ImportTsv#usage(String)
+ */
+public class ImportTsv {
+  final static String NAME = "importtsv";
+
+  final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
+  final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output";
+  final static String COLUMNS_CONF_KEY = "importtsv.columns";
+  final static String SEPARATOR_CONF_KEY = "importtsv.separator";
+  final static String DEFAULT_SEPARATOR = "\t";
+
+  static class TsvParser {
+    /**
+     * Column families and qualifiers mapped to the TSV columns
+     */
+    private final byte[][] families;
+    private final byte[][] qualifiers;
+
+    private final byte separatorByte;
+
+    private int rowKeyColumnIndex;
+
+    public static String ROWKEY_COLUMN_SPEC="HBASE_ROW_KEY";
+
+    /**
+     * @param columnsSpecification the list of columns to parse out, comma separated.
+     * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
+     */
+    public TsvParser(String columnsSpecification, String separatorStr) {
+      // Configure separator
+      byte[] separator = Bytes.toBytes(separatorStr);
+      Preconditions.checkArgument(separator.length == 1,
+        "TsvParser only supports single-byte separators");
+      separatorByte = separator[0];
+
+      // Configure columns
+      ArrayList<String> columnStrings = Lists.newArrayList(
+        Splitter.on(',').trimResults().split(columnsSpecification));
+
+      families = new byte[columnStrings.size()][];
+      qualifiers = new byte[columnStrings.size()][];
+
+      for (int i = 0; i < columnStrings.size(); i++) {
+        String str = columnStrings.get(i);
+        if (ROWKEY_COLUMN_SPEC.equals(str)) {
+          rowKeyColumnIndex = i;
+          continue;
+        }
+        String[] parts = str.split(":", 2);
+        if (parts.length == 1) {
+          families[i] = str.getBytes();
+          qualifiers[i] = HConstants.EMPTY_BYTE_ARRAY;
+        } else {
+          families[i] = parts[0].getBytes();
+          qualifiers[i] = parts[1].getBytes();
+        }
+      }
+    }
+
+    public int getRowKeyColumnIndex() {
+      return rowKeyColumnIndex;
+    }
+    public byte[] getFamily(int idx) {
+      return families[idx];
+    }
+    public byte[] getQualifier(int idx) {
+      return qualifiers[idx];
+    }
+
+    public ParsedLine parse(byte[] lineBytes, int length)
+    throws BadTsvLineException {
+      // Enumerate separator offsets
+      ArrayList<Integer> tabOffsets = new ArrayList<Integer>(families.length);
+      for (int i = 0; i < length; i++) {
+        if (lineBytes[i] == separatorByte) {
+          tabOffsets.add(i);
+        }
+      }
+      tabOffsets.add(length);
+      if (tabOffsets.size() > families.length) {
+        throw new BadTsvLineException("Bad line:\n");
+      }
+
+      return new ParsedLine(tabOffsets, lineBytes);
+    }
+
+    class ParsedLine {
+      private final ArrayList<Integer> tabOffsets;
+      private byte[] lineBytes;
+
+      ParsedLine(ArrayList<Integer> tabOffsets, byte[] lineBytes) {
+        this.tabOffsets = tabOffsets;
+        this.lineBytes = lineBytes;
+      }
+
+      public int getRowKeyOffset() {
+        return getColumnOffset(rowKeyColumnIndex);
+      }
+      public int getRowKeyLength() {
+        return getColumnLength(rowKeyColumnIndex);
+      }
+      public int getColumnOffset(int idx) {
+        if (idx > 0)
+          return tabOffsets.get(idx - 1) + 1;
+        else
+          return 0;
+      }
+      public int getColumnLength(int idx) {
+        return tabOffsets.get(idx) - getColumnOffset(idx);
+      }
+      public int getColumnCount() {
+        return tabOffsets.size();
+      }
+      public byte[] getLineBytes() {
+        return lineBytes;
+      }
+    }
+
+    public static class BadTsvLineException extends Exception {
+      public BadTsvLineException(String err) {
+        super(err);
+      }
+      private static final long serialVersionUID = 1L;
+    }
+  }
+
+  /**
+   * Write table content out to files in hdfs.
+   */
+  static class TsvImporter
+  extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
+  {
+
+    /** Timestamp for all inserted rows */
+    private long ts;
+
+    /** Should skip bad lines */
+    private boolean skipBadLines;
+    private Counter badLineCount;
+
+    private TsvParser parser;
+
+    @Override
+    protected void setup(Context context) {
+      Configuration conf = context.getConfiguration();
+      parser = new TsvParser(conf.get(COLUMNS_CONF_KEY),
+                             conf.get(SEPARATOR_CONF_KEY, DEFAULT_SEPARATOR));
+      if (parser.getRowKeyColumnIndex() == -1) {
+        throw new RuntimeException("No row key column specified");
+      }
+      ts = System.currentTimeMillis();
+
+      skipBadLines = context.getConfiguration().getBoolean(
+        SKIP_LINES_CONF_KEY, true);
+      badLineCount = context.getCounter("ImportTsv", "Bad Lines");
+    }
+
+    /**
+     * Convert a line of TSV text into an HBase table row.
+     */
+    @Override
+    public void map(LongWritable offset, Text value,
+      Context context)
+    throws IOException {
+      byte[] lineBytes = value.getBytes();
+
+      try {
+        TsvParser.ParsedLine parsed = parser.parse(
+            lineBytes, value.getLength());
+        ImmutableBytesWritable rowKey =
+          new ImmutableBytesWritable(lineBytes,
+              parsed.getRowKeyOffset(),
+              parsed.getRowKeyLength());
+
+        Put put = new Put(rowKey.copyBytes());
+        for (int i = 0; i < parsed.getColumnCount(); i++) {
+          if (i == parser.getRowKeyColumnIndex()) continue;
+          KeyValue kv = new KeyValue(
+              lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
+              parser.getFamily(i), 0, parser.getFamily(i).length,
+              parser.getQualifier(i), 0, parser.getQualifier(i).length,
+              ts,
+              KeyValue.Type.Put,
+              lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
+          put.add(kv);
+        }
+        context.write(rowKey, put);
+      } catch (BadTsvLineException badLine) {
+        if (skipBadLines) {
+          System.err.println(
+              "Bad line at offset: " + offset.get() + ":\n" +
+              badLine.getMessage());
+          badLineCount.increment(1);
+          return;
+        } else {
+          throw new IOException(badLine);
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param conf  The current configuration.
+   * @param args  The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+  throws IOException {
+    String tableName = args[0];
+    Path inputDir = new Path(args[1]);
+    Job job = new Job(conf, NAME + "_" + tableName);
+    job.setJarByClass(TsvImporter.class);
+    FileInputFormat.setInputPaths(job, inputDir);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setMapperClass(TsvImporter.class);
+
+    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    if (hfileOutPath != null) {
+      HTable table = new HTable(conf, tableName);
+      job.setReducerClass(PutSortReducer.class);
+      Path outputDir = new Path(hfileOutPath);
+      FileOutputFormat.setOutputPath(job, outputDir);
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+      job.setMapOutputValueClass(Put.class);
+      HFileOutputFormat.configureIncrementalLoad(job, table);
+    } else {
+      // No reducers.  Just write straight to table.  Call initTableReducerJob
+      // to set up the TableOutputFormat.
+      TableMapReduceUtil.initTableReducerJob(tableName, null, job);
+      job.setNumReduceTasks(0);
+    }
+
+    TableMapReduceUtil.addDependencyJars(job);
+    return job;
+  }
+
+  /*
+   * @param errorMsg Error message.  Can be null.
+   */
+  private static void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    String usage =
+      "Usage: " + NAME + " -Dimporttsv.columns=a,b,c <tablename> <inputdir>\n" +
+      "\n" +
+      "Imports the given input directory of TSV data into the specified table.\n" +
+      "\n" +
+      "The column names of the TSV data must be specified using the -Dimporttsv.columns\n" +
+      "option. This option takes the form of comma-separated column names, where each\n" +
+      "column name is either a simple column family, or a columnfamily:qualifier. The special\n" +
+      "column name HBASE_ROW_KEY is used to designate that this column should be used\n" +
+      "as the row key for each imported record. You must specify exactly one column\n" +
+      "to be the row key.\n" +
+      "\n" +
+      "In order to prepare data for a bulk data load, pass the option:\n" +
+      "  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output\n" +
+      "\n" +
+      "Other options that may be specified with -D include:\n" +
+      "  -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" +
+      "  '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs";
+    System.err.println(usage);
+  }
+
+  /**
+   * Main entry point.
+   *
+   * @param args  The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    if (otherArgs.length < 2) {
+      usage("Wrong number of arguments: " + otherArgs.length);
+      System.exit(-1);
+    }
+
+    // Make sure columns are specified
+    String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
+    if (columns == null) {
+      usage("No columns specified. Please specify with -D" +
+            COLUMNS_CONF_KEY+"=...");
+      System.exit(-1);
+    }
+
+    // Make sure they specify exactly one column as the row key
+    int rowkeysFound=0;
+    for (String col : columns) {
+      if (col.equals(TsvParser.ROWKEY_COLUMN_SPEC)) rowkeysFound++;
+    }
+    if (rowkeysFound != 1) {
+      usage("Must specify exactly one column as " + TsvParser.ROWKEY_COLUMN_SPEC);
+      System.exit(-1);
+    }
+
+    Job job = createSubmittableJob(conf, otherArgs);
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
+  }
+
+}
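
For reference, a small sketch of how the package-private TsvParser above behaves
on a single line. The column spec and input are invented, and the class has to
live in the org.apache.hadoop.hbase.mapreduce package because TsvParser is not
public.

    package org.apache.hadoop.hbase.mapreduce;

    import org.apache.hadoop.hbase.util.Bytes;

    public class TsvParserSketch {
      public static void main(String[] args) throws Exception {
        // Same column spec format as -Dimporttsv.columns, with tab as the separator.
        ImportTsv.TsvParser parser =
            new ImportTsv.TsvParser("HBASE_ROW_KEY,cf:a,cf:b", "\t");
        byte[] line = Bytes.toBytes("row1\tvalueA\tvalueB");
        ImportTsv.TsvParser.ParsedLine parsed = parser.parse(line, line.length);
        System.out.println("row key = " + Bytes.toString(line,
            parsed.getRowKeyOffset(), parsed.getRowKeyLength()));
        System.out.println("column count = " + parsed.getColumnCount());
        System.out.println("family of column 1 = " + Bytes.toString(parser.getFamily(1)));
      }
    }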

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1176171&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Tue Sep 27 02:41:16 2011
@@ -0,0 +1,321 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.io.HalfStoreFileReader;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.Reference.Range;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Tool to load the output of HFileOutputFormat into an existing table.
+ * @see #usage()
+ */
+public class LoadIncrementalHFiles extends Configured implements Tool {
+
+  static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
+
+  public static String NAME = "completebulkload";
+
+  public LoadIncrementalHFiles(Configuration conf) {
+    super(conf);
+  }
+
+  public LoadIncrementalHFiles() {
+    super();
+  }
+
+
+  private void usage() {
+    System.err.println("usage: " + NAME +
+        " /path/to/hfileoutputformat-output " +
+        "tablename");
+  }
+
+  /**
+   * Represents an HFile waiting to be loaded. A queue is used
+   * in this class in order to support the case where a region has
+   * split during the process of the load. When this happens,
+   * the HFile is split into two physical parts across the new
+   * region boundary, and each part is added back into the queue.
+   * The import process finishes when the queue is empty.
+   */
+  private static class LoadQueueItem {
+    final byte[] family;
+    final Path hfilePath;
+
+    public LoadQueueItem(byte[] family, Path hfilePath) {
+      this.family = family;
+      this.hfilePath = hfilePath;
+    }
+  }
+
+  /**
+   * Walk the given directory for all HFiles, and return a Queue
+   * containing all such files.
+   */
+  private Deque<LoadQueueItem> discoverLoadQueue(Path hfofDir)
+  throws IOException {
+    FileSystem fs = hfofDir.getFileSystem(getConf());
+
+    if (!fs.exists(hfofDir)) {
+      throw new FileNotFoundException("HFileOutputFormat dir " +
+          hfofDir + " not found");
+    }
+
+    FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
+    if (familyDirStatuses == null) {
+      throw new FileNotFoundException("No families found in " + hfofDir);
+    }
+
+    Deque<LoadQueueItem> ret = new LinkedList<LoadQueueItem>();
+    for (FileStatus stat : familyDirStatuses) {
+      if (!stat.isDir()) {
+        LOG.warn("Skipping non-directory " + stat.getPath());
+        continue;
+      }
+      Path familyDir = stat.getPath();
+      // Skip _logs, etc
+      if (familyDir.getName().startsWith("_")) continue;
+      byte[] family = familyDir.getName().getBytes();
+      Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
+      for (Path hfile : hfiles) {
+        if (hfile.getName().startsWith("_")) continue;
+        ret.add(new LoadQueueItem(family, hfile));
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Perform a bulk load of the given directory into the given
+   * pre-existing table.
+   * @param hfofDir the directory that was provided as the output path
+   * of a job using HFileOutputFormat
+   * @param table the table to load into
+   * @throws TableNotFoundException if table does not yet exist
+   */
+  public void doBulkLoad(Path hfofDir, HTable table)
+    throws TableNotFoundException, IOException
+  {
+    HConnection conn = table.getConnection();
+
+    if (!conn.isTableAvailable(table.getTableName())) {
+      throw new TableNotFoundException("Table " +
+          Bytes.toStringBinary(table.getTableName()) +
+          "is not currently available.");
+    }
+
+    Deque<LoadQueueItem> queue = null;
+    try {
+      queue = discoverLoadQueue(hfofDir);
+      while (!queue.isEmpty()) {
+        LoadQueueItem item = queue.remove();
+        tryLoad(item, conn, table.getTableName(), queue);
+      }
+    } finally {
+      if (queue != null && !queue.isEmpty()) {
+        StringBuilder err = new StringBuilder();
+        err.append("-------------------------------------------------\n");
+        err.append("Bulk load aborted with some files not yet loaded:\n");
+        err.append("-------------------------------------------------\n");
+        for (LoadQueueItem q : queue) {
+          err.append("  ").append(q.hfilePath).append('\n');
+        }
+        LOG.error(err);
+      }
+    }
+  }
+
+  /**
+   * Attempt to load the given load queue item into its target region server.
+   * If the hfile boundary no longer fits into a region, physically splits
+   * the hfile such that the new bottom half will fit, and adds the two
+   * resultant hfiles back into the load queue.
+   */
+  private void tryLoad(final LoadQueueItem item,
+      HConnection conn, final byte[] table,
+      final Deque<LoadQueueItem> queue)
+  throws IOException {
+    final Path hfilePath = item.hfilePath;
+    final FileSystem fs = hfilePath.getFileSystem(getConf());
+    HFile.Reader hfr = new HFile.Reader(fs, hfilePath, null, false);
+    final byte[] first, last;
+    try {
+      hfr.loadFileInfo();
+      first = hfr.getFirstRowKey();
+      last = hfr.getLastRowKey();
+    }  finally {
+      hfr.close();
+    }
+
+    LOG.info("Trying to load hfile=" + hfilePath +
+        " first=" + Bytes.toStringBinary(first) +
+        " last="  + Bytes.toStringBinary(last));
+    if (first == null || last == null) {
+      assert first == null && last == null;
+      LOG.info("hfile " + hfilePath + " has no entries, skipping");
+      return;
+    }
+
+    // We use a '_' prefix which is ignored when walking directory trees
+    // above.
+    final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
+
+    conn.getRegionServerWithRetries(
+      new ServerCallable<Void>(conn, table, first) {
+        @Override
+        public Void call() throws Exception {
+          LOG.debug("Going to connect to server " + location +
+              "for row " + Bytes.toStringBinary(row));
+          HRegionInfo hri = location.getRegionInfo();
+          if (!hri.containsRange(first, last)) {
+            LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
+                "region. Splitting...");
+
+            HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family);
+            Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom");
+            Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top");
+            splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(),
+                botOut, topOut);
+
+            // Add these back at the *front* of the queue, so there's a lower
+            // chance that the region will just split again before we get there.
+            queue.addFirst(new LoadQueueItem(item.family, botOut));
+            queue.addFirst(new LoadQueueItem(item.family, topOut));
+            LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
+            return null;
+          }
+
+          byte[] regionName = location.getRegionInfo().getRegionName();
+          server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
+          return null;
+        }
+      });
+  }
+
+  /**
+   * Split a storefile into a top and bottom half, maintaining
+   * the metadata, recreating bloom filters, etc.
+   */
+  static void splitStoreFile(
+      Configuration conf, Path inFile,
+      HColumnDescriptor familyDesc, byte[] splitKey,
+      Path bottomOut, Path topOut) throws IOException
+  {
+    // Open reader with no block cache, and not in-memory
+    Reference topReference = new Reference(splitKey, Range.top);
+    Reference bottomReference = new Reference(splitKey, Range.bottom);
+
+    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
+    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
+  }
+
+  /**
+   * Copy half of an HFile into a new HFile.
+   */
+  private static void copyHFileHalf(
+      Configuration conf, Path inFile, Path outFile, Reference reference,
+      HColumnDescriptor familyDescriptor)
+  throws IOException {
+    FileSystem fs = inFile.getFileSystem(conf);
+    HalfStoreFileReader halfReader = null;
+    HFile.Writer halfWriter = null;
+    try {
+      halfReader = new HalfStoreFileReader(fs, inFile, null, reference);
+      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
+
+      int blocksize = familyDescriptor.getBlocksize();
+      Algorithm compression = familyDescriptor.getCompression();
+      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
+
+      halfWriter = new StoreFile.Writer(
+          fs, outFile, blocksize, compression, conf, KeyValue.COMPARATOR,
+          bloomFilterType, 0);
+      HFileScanner scanner = halfReader.getScanner(false, false);
+      scanner.seekTo();
+      do {
+        KeyValue kv = scanner.getKeyValue();
+        halfWriter.append(kv);
+      } while (scanner.next());
+
+      for (Map.Entry<byte[],byte[]> entry : fileInfo.entrySet()) {
+        if (shouldCopyHFileMetaKey(entry.getKey())) {
+          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
+        }
+      }
+    } finally {
+      if (halfWriter != null) halfWriter.close();
+      if (halfReader != null) halfReader.close();
+    }
+  }
+
+  private static boolean shouldCopyHFileMetaKey(byte[] key) {
+    return !HFile.isReservedFileInfoKey(key);
+  }
+
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length != 2) {
+      usage();
+      return -1;
+    }
+
+    Path hfofDir = new Path(args[0]);
+    HTable table = new HTable(args[1]);
+
+    doBulkLoad(hfofDir, table);
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new LoadIncrementalHFiles(), args);
+  }
+
+}
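
Besides the "completebulkload" driver entry, the tool can be driven
programmatically. A minimal sketch, with a placeholder output directory and table
name, assuming the directory was produced by a job configured through
HFileOutputFormat.configureIncrementalLoad():

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public class BulkLoadDriverSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        Path hfofDir = new Path("/hfile-output");    // placeholder HFileOutputFormat output dir
        HTable table = new HTable(conf, "mytable");  // placeholder table name
        loader.doBulkLoad(hfofDir, table);           // splits hfiles as needed if regions have split
      }
    }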

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java?rev=1176171&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java Tue Sep 27 02:41:16 2011
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Emits sorted Puts.
+ * Reads in all Puts from passed Iterator, sorts them, then emits
+ * Puts in sorted order.  If lots of columns per row, it will use lots of
+ * memory sorting.
+ * @see HFileOutputFormat
+ * @see KeyValueSortReducer
+ */
+public class PutSortReducer extends
+    Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
+
+  @Override
+  protected void reduce(
+      ImmutableBytesWritable row,
+      java.lang.Iterable<Put> puts,
+      Reducer<ImmutableBytesWritable, Put,
+              ImmutableBytesWritable, KeyValue>.Context context)
+      throws java.io.IOException, InterruptedException
+  {
+    TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+
+    for (Put p : puts) {
+      for (List<KeyValue> kvs : p.getFamilyMap().values()) {
+        for (KeyValue kv : kvs) {
+          map.add(kv.clone());
+        }
+      }
+    }
+    context.setStatus("Read " + map.getClass());
+    int index = 0;
+    for (KeyValue kv : map) {
+      context.write(row, kv);
+      if (index > 0 && index % 100 == 0)
+        context.setStatus("Wrote " + index);
+    }
+  }
+}

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java?rev=1176171&r1=1176170&r2=1176171&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java Tue Sep 27 02:41:16 2011
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Partitioner;
 
@@ -44,15 +45,55 @@ import org.apache.hadoop.mapreduce.Parti
  */
 public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
 implements Configurable {
-  private final Log LOG = LogFactory.getLog(this.getClass());
+  private final static Log LOG = LogFactory.getLog(SimpleTotalOrderPartitioner.class);
+
+  @Deprecated
   public static final String START = "hbase.simpletotalorder.start";
+  @Deprecated
   public static final String END = "hbase.simpletotalorder.end";
+
+  static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
+  static final String END_BASE64 = "hbase.simpletotalorder.end.base64";
+
   private Configuration c;
   private byte [] startkey;
   private byte [] endkey;
   private byte [][] splits;
   private int lastReduces = -1;
 
+  public static void setStartKey(Configuration conf, byte[] startKey) {
+    conf.set(START_BASE64, Base64.encodeBytes(startKey));
+  }
+
+  public static void setEndKey(Configuration conf, byte[] endKey) {
+    conf.set(END_BASE64, Base64.encodeBytes(endKey));
+  }
+
+  @SuppressWarnings("deprecation")
+  static byte[] getStartKey(Configuration conf) {
+    return getKeyFromConf(conf, START_BASE64, START);
+  }
+
+  @SuppressWarnings("deprecation")
+  static byte[] getEndKey(Configuration conf) {
+    return getKeyFromConf(conf, END_BASE64, END);
+  }
+
+  private static byte[] getKeyFromConf(Configuration conf,
+      String base64Key, String deprecatedKey) {
+    String encoded = conf.get(base64Key);
+    if (encoded != null) {
+      return Base64.decode(encoded);
+    }
+    String oldStyleVal = conf.get(deprecatedKey);
+    if (oldStyleVal == null) {
+      return null;
+    }
+    LOG.warn("Using deprecated configuration " + deprecatedKey +
+        " - please use static accessor methods instead.");
+    return Bytes.toBytes(oldStyleVal);
+  }
+
   @Override
   public int getPartition(final ImmutableBytesWritable key, final VALUE value,
       final int reduces) {
@@ -87,10 +128,12 @@ implements Configurable {
   @Override
   public void setConf(Configuration conf) {
     this.c = conf;
-    String startStr = this.c.get(START);
-    String endStr = this.c.get(END);
-    LOG.info("startkey=" + startStr + ", endkey=" + endStr);
-    this.startkey = Bytes.toBytes(startStr);
-    this.endkey = Bytes.toBytes(endStr);
+    this.startkey = getStartKey(conf);
+    this.endkey = getEndKey(conf);
+    if (startkey == null || endkey == null) {
+      throw new RuntimeException(this.getClass() + " not configured");
+    }
+    LOG.info("startkey=" + Bytes.toStringBinary(startkey) +
+        ", endkey=" + Bytes.toStringBinary(endkey));
   }
 }
\ No newline at end of file
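
A brief sketch of configuring the partitioner through the new static accessors,
which carry arbitrary binary keys safely via base64 instead of the deprecated raw
START/END string properties; the keys shown are examples only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.Job;

    public class PartitionerConfigSketch {
      public static void configure(Job job) {
        Configuration conf = job.getConfiguration();
        SimpleTotalOrderPartitioner.setStartKey(conf, Bytes.toBytes("aaa"));
        SimpleTotalOrderPartitioner.setEndKey(conf, Bytes.toBytes("zzz"));
        job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
      }
    }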

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1176171&r1=1176170&r2=1176171&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Tue Sep 27 02:41:16 2011
@@ -24,7 +24,18 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
@@ -34,13 +45,18 @@ import org.apache.hadoop.hbase.util.Base
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.ZooKeeper;
+
+import com.google.common.base.Function;
 
 /**
  * Utility for {@link TableMapper} and {@link TableReducer}
  */
 @SuppressWarnings("unchecked")
 public class TableMapReduceUtil {
+  static Log LOG = LogFactory.getLog(TableMapReduceUtil.class);
 
   /**
    * Use this before submitting a TableMap job. It will appropriately set up
@@ -222,4 +238,106 @@ public class TableMapReduceUtil {
     job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
   }
 
+  /**
+   * Add the HBase dependency jars as well as jars for any of the configured
+   * job classes to the job configuration, so that JobClient will ship them
+   * to the cluster and add them to the DistributedCache.
+   */
+  public static void addDependencyJars(Job job) throws IOException {
+    try {
+      addDependencyJars(job.getConfiguration(),
+          ZooKeeper.class,
+          Function.class, // Guava collections
+          job.getMapOutputKeyClass(),
+          job.getMapOutputValueClass(),
+          job.getOutputKeyClass(),
+          job.getOutputValueClass(),
+          job.getOutputFormatClass(),
+          job.getPartitionerClass(),
+          job.getCombinerClass());
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Add the jars containing the given classes to the job's configuration
+   * such that JobClient will ship them to the cluster and add them to
+   * the DistributedCache.
+   */
+  public static void addDependencyJars(Configuration conf,
+      Class... classes) throws IOException {
+
+    FileSystem localFs = FileSystem.getLocal(conf);
+
+    Set<String> jars = new HashSet<String>();
+    for (Class clazz : classes) {
+      if (clazz == null) continue;
+
+      String pathStr = findContainingJar(clazz);
+      if (pathStr == null) {
+        LOG.warn("Could not find jar for class " + clazz +
+            " in order to ship it to the cluster.");
+        continue;
+      }
+      Path path = new Path(pathStr);
+      if (!localFs.exists(path)) {
+        LOG.warn("Could not validate jar file " + path + " for class "
+            + clazz);
+        continue;
+      }
+      jars.add(path.makeQualified(localFs).toString());
+    }
+    if (jars.isEmpty()) return;
+
+    String tmpJars = conf.get("tmpjars");
+    if (tmpJars == null) {
+      tmpJars = StringUtils.arrayToString(jars.toArray(new String[0]));
+    } else {
+      tmpJars += "," + StringUtils.arrayToString(jars.toArray(new String[0]));
+    }
+    conf.set("tmpjars", tmpJars);
+  }
+
+  /**
+   * Find a jar that contains a class of the same name, if any.
+   * It will return a jar file, even if that is not the first thing
+   * on the class path that has a class with the same name.
+   *
+   * This is shamelessly copied from JobConf
+   *
+   * @param my_class the class to find.
+   * @return a jar file that contains the class, or null.
+   * (Any IOException from the classloader lookup is rethrown as a RuntimeException.)
+   */
+  private static String findContainingJar(Class my_class) {
+    ClassLoader loader = my_class.getClassLoader();
+    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
+    try {
+      for(Enumeration itr = loader.getResources(class_file);
+          itr.hasMoreElements();) {
+        URL url = (URL) itr.nextElement();
+        if ("jar".equals(url.getProtocol())) {
+          String toReturn = url.getPath();
+          if (toReturn.startsWith("file:")) {
+            toReturn = toReturn.substring("file:".length());
+          }
+          // URLDecoder is a misnamed class, since it actually decodes
+          // x-www-form-urlencoded MIME type rather than actual
+          // URL encoding (which the file path has). Therefore it would
+          // decode +s to ' 's which is incorrect (spaces are actually
+          // either unencoded or encoded as "%20"). Replace +s first, so
+          // that they are kept sacred during the decoding process.
+          toReturn = toReturn.replaceAll("\\+", "%2B");
+          toReturn = URLDecoder.decode(toReturn, "UTF-8");
+          return toReturn.replaceAll("!.*$", "");
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return null;
+  }
+
+
 }
\ No newline at end of file
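
For reference, a job driver would typically call the new addDependencyJars(Job) just before submission, so that the HBase, ZooKeeper and Guava jars (plus the jars containing the job's own key/value, output-format, partitioner and combiner classes) reach the tasks via the "tmpjars"/DistributedCache mechanism implemented above. A minimal sketch; the class and job names are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;

    public class AddDependencyJarsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();    // an HBase-aware conf in practice
        Job job = new Job(conf, "hbase-mr-sketch");  // placeholder job name
        job.setJarByClass(AddDependencyJarsSketch.class);
        // ... initTableMapperJob(...) / output setup would normally go here ...

        // Resolves the containing jar of each dependency class via
        // findContainingJar() and appends it to the "tmpjars" property,
        // so JobClient ships it and adds it to the DistributedCache.
        TableMapReduceUtil.addDependencyJars(job);

        System.out.println("tmpjars = " + job.getConfiguration().get("tmpjars"));
      }
    }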

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java?rev=1176171&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java Tue Sep 27 02:41:16 2011
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce.hadoopbackport;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Utility for collecting samples and writing a partition file for
+ * {@link TotalOrderPartitioner}.
+ *
+ * This is an identical copy of o.a.h.mapreduce.lib.partition.InputSampler
+ * from Hadoop trunk at r910774, with the exception of replacing
+ * TaskAttemptContextImpl with TaskAttemptContext.
+ */
+public class InputSampler<K,V> extends Configured implements Tool  {
+
+  private static final Log LOG = LogFactory.getLog(InputSampler.class);
+
+  static int printUsage() {
+    System.out.println("sampler -r <reduces>\n" +
+      "      [-inFormat <input format class>]\n" +
+      "      [-keyClass <map input & output key class>]\n" +
+      "      [-splitRandom <double pcnt> <numSamples> <maxsplits> | " +
+      "// Sample from random splits at random (general)\n" +
+      "       -splitSample <numSamples> <maxsplits> | " +
+      "             // Sample from first records in splits (random data)\n"+
+      "       -splitInterval <double pcnt> <maxsplits>]" +
+      "             // Sample from splits at intervals (sorted data)");
+    System.out.println("Default sampler: -splitRandom 0.1 10000 10");
+    ToolRunner.printGenericCommandUsage(System.out);
+    return -1;
+  }
+
+  public InputSampler(Configuration conf) {
+    setConf(conf);
+  }
+
+  /**
+   * Interface to sample using an
+   * {@link org.apache.hadoop.mapreduce.InputFormat}.
+   */
+  public interface Sampler<K,V> {
+    /**
+     * For a given job, collect and return a subset of the keys from the
+     * input data.
+     */
+    K[] getSample(InputFormat<K,V> inf, Job job)
+    throws IOException, InterruptedException;
+  }
+
+  /**
+   * Samples the first n records from s splits.
+   * Inexpensive way to sample random data.
+   */
+  public static class SplitSampler<K,V> implements Sampler<K,V> {
+
+    private final int numSamples;
+    private final int maxSplitsSampled;
+
+    /**
+     * Create a SplitSampler sampling <em>all</em> splits.
+     * Takes the first numSamples / numSplits records from each split.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     */
+    public SplitSampler(int numSamples) {
+      this(numSamples, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new SplitSampler.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     */
+    public SplitSampler(int numSamples, int maxSplitsSampled) {
+      this.numSamples = numSamples;
+      this.maxSplitsSampled = maxSplitsSampled;
+    }
+
+    /**
+     * From each split sampled, take the first numSamples / numSplits records.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, Job job)
+        throws IOException, InterruptedException {
+      List<InputSplit> splits = inf.getSplits(job);
+      ArrayList<K> samples = new ArrayList<K>(numSamples);
+      int splitsToSample = Math.min(maxSplitsSampled, splits.size());
+      int splitStep = splits.size() / splitsToSample;
+      int samplesPerSplit = numSamples / splitsToSample;
+      long records = 0;
+      for (int i = 0; i < splitsToSample; ++i) {
+        RecordReader<K,V> reader = inf.createRecordReader(
+          splits.get(i * splitStep),
+          new TaskAttemptContext(job.getConfiguration(),
+                                 new TaskAttemptID()));
+        while (reader.nextKeyValue()) {
+          samples.add(reader.getCurrentKey());
+          ++records;
+          if ((i+1) * samplesPerSplit <= records) {
+            break;
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Sample from random points in the input.
+   * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
+   * each split.
+   */
+  public static class RandomSampler<K,V> implements Sampler<K,V> {
+    private double freq;
+    private final int numSamples;
+    private final int maxSplitsSampled;
+
+    /**
+     * Create a new RandomSampler sampling <em>all</em> splits.
+     * This will read every split at the client, which is very expensive.
+     * @param freq Probability with which a key will be chosen.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     */
+    public RandomSampler(double freq, int numSamples) {
+      this(freq, numSamples, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new RandomSampler.
+     * @param freq Probability with which a key will be chosen.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     */
+    public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
+      this.freq = freq;
+      this.numSamples = numSamples;
+      this.maxSplitsSampled = maxSplitsSampled;
+    }
+
+    /**
+     * Randomize the split order, then take the specified number of keys from
+     * each split sampled, where each key is selected with the specified
+     * probability and possibly replaced by a subsequently selected key when
+     * the quota of keys from that split is satisfied.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, Job job)
+        throws IOException, InterruptedException {
+      List<InputSplit> splits = inf.getSplits(job);
+      ArrayList<K> samples = new ArrayList<K>(numSamples);
+      int splitsToSample = Math.min(maxSplitsSampled, splits.size());
+
+      Random r = new Random();
+      long seed = r.nextLong();
+      r.setSeed(seed);
+      LOG.debug("seed: " + seed);
+      // shuffle splits
+      for (int i = 0; i < splits.size(); ++i) {
+        InputSplit tmp = splits.get(i);
+        int j = r.nextInt(splits.size());
+        splits.set(i, splits.get(j));
+        splits.set(j, tmp);
+      }
+      // our target rate is in terms of the maximum number of sample splits,
+      // but we accept the possibility of sampling additional splits to hit
+      // the target sample keyset
+      for (int i = 0; i < splitsToSample ||
+                     (i < splits.size() && samples.size() < numSamples); ++i) {
+        RecordReader<K,V> reader = inf.createRecordReader(splits.get(i),
+          new TaskAttemptContext(job.getConfiguration(),
+                                 new TaskAttemptID()));
+        while (reader.nextKeyValue()) {
+          if (r.nextDouble() <= freq) {
+            if (samples.size() < numSamples) {
+              samples.add(reader.getCurrentKey());
+            } else {
+              // When exceeding the maximum number of samples, replace a
+              // random element with this one, then adjust the frequency
+              // to reflect the possibility of existing elements being
+              // pushed out
+              int ind = r.nextInt(numSamples);
+              if (ind != numSamples) {
+                samples.set(ind, reader.getCurrentKey());
+              }
+              freq *= (numSamples - 1) / (double) numSamples;
+            }
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Sample from s splits at regular intervals.
+   * Useful for sorted data.
+   */
+  public static class IntervalSampler<K,V> implements Sampler<K,V> {
+    private final double freq;
+    private final int maxSplitsSampled;
+
+    /**
+     * Create a new IntervalSampler sampling <em>all</em> splits.
+     * @param freq The frequency with which records will be emitted.
+     */
+    public IntervalSampler(double freq) {
+      this(freq, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new IntervalSampler.
+     * @param freq The frequency with which records will be emitted.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     * @see #getSample
+     */
+    public IntervalSampler(double freq, int maxSplitsSampled) {
+      this.freq = freq;
+      this.maxSplitsSampled = maxSplitsSampled;
+    }
+
+    /**
+     * For each split sampled, emit when the ratio of the number of records
+     * retained to the total record count is less than the specified
+     * frequency.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, Job job)
+        throws IOException, InterruptedException {
+      List<InputSplit> splits = inf.getSplits(job);
+      ArrayList<K> samples = new ArrayList<K>();
+      int splitsToSample = Math.min(maxSplitsSampled, splits.size());
+      int splitStep = splits.size() / splitsToSample;
+      long records = 0;
+      long kept = 0;
+      for (int i = 0; i < splitsToSample; ++i) {
+        RecordReader<K,V> reader = inf.createRecordReader(
+          splits.get(i * splitStep),
+          new TaskAttemptContext(job.getConfiguration(),
+                                 new TaskAttemptID()));
+        while (reader.nextKeyValue()) {
+          ++records;
+          if ((double) kept / records < freq) {
+            ++kept;
+            samples.add(reader.getCurrentKey());
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Write a partition file for the given job, using the Sampler provided.
+   * Queries the sampler for a sample keyset, sorts by the output key
+   * comparator, selects the keys for each rank, and writes to the destination
+   * returned from {@link TotalOrderPartitioner#getPartitionFile}.
+   */
+  @SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
+  public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler)
+      throws IOException, ClassNotFoundException, InterruptedException {
+    Configuration conf = job.getConfiguration();
+    final InputFormat inf =
+        ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
+    int numPartitions = job.getNumReduceTasks();
+    K[] samples = sampler.getSample(inf, job);
+    LOG.info("Using " + samples.length + " samples");
+    RawComparator<K> comparator =
+      (RawComparator<K>) job.getSortComparator();
+    Arrays.sort(samples, comparator);
+    Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
+    FileSystem fs = dst.getFileSystem(conf);
+    if (fs.exists(dst)) {
+      fs.delete(dst, false);
+    }
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs,
+      conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
+    NullWritable nullValue = NullWritable.get();
+    float stepSize = samples.length / (float) numPartitions;
+    int last = -1;
+    for(int i = 1; i < numPartitions; ++i) {
+      int k = Math.round(stepSize * i);
+      while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
+        ++k;
+      }
+      writer.append(samples[k], nullValue);
+      last = k;
+    }
+    writer.close();
+  }
+
+  /**
+   * Driver for InputSampler from the command line.
+   * Configures a {@link Job} instance and calls {@link #writePartitionFile}.
+   */
+  public int run(String[] args) throws Exception {
+    Job job = new Job(getConf());
+    ArrayList<String> otherArgs = new ArrayList<String>();
+    Sampler<K,V> sampler = null;
+    for(int i=0; i < args.length; ++i) {
+      try {
+        if ("-r".equals(args[i])) {
+          job.setNumReduceTasks(Integer.parseInt(args[++i]));
+        } else if ("-inFormat".equals(args[i])) {
+          job.setInputFormatClass(
+              Class.forName(args[++i]).asSubclass(InputFormat.class));
+        } else if ("-keyClass".equals(args[i])) {
+          job.setMapOutputKeyClass(
+              Class.forName(args[++i]).asSubclass(WritableComparable.class));
+        } else if ("-splitSample".equals(args[i])) {
+          int numSamples = Integer.parseInt(args[++i]);
+          int maxSplits = Integer.parseInt(args[++i]);
+          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
+          sampler = new SplitSampler<K,V>(numSamples, maxSplits);
+        } else if ("-splitRandom".equals(args[i])) {
+          double pcnt = Double.parseDouble(args[++i]);
+          int numSamples = Integer.parseInt(args[++i]);
+          int maxSplits = Integer.parseInt(args[++i]);
+          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
+          sampler = new RandomSampler<K,V>(pcnt, numSamples, maxSplits);
+        } else if ("-splitInterval".equals(args[i])) {
+          double pcnt = Double.parseDouble(args[++i]);
+          int maxSplits = Integer.parseInt(args[++i]);
+          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
+          sampler = new IntervalSampler<K,V>(pcnt, maxSplits);
+        } else {
+          otherArgs.add(args[i]);
+        }
+      } catch (NumberFormatException except) {
+        System.out.println("ERROR: Integer expected instead of " + args[i]);
+        return printUsage();
+      } catch (ArrayIndexOutOfBoundsException except) {
+        System.out.println("ERROR: Required parameter missing from " +
+            args[i-1]);
+        return printUsage();
+      }
+    }
+    if (job.getNumReduceTasks() <= 1) {
+      System.err.println("Sampler requires more than one reducer");
+      return printUsage();
+    }
+    if (otherArgs.size() < 2) {
+      System.out.println("ERROR: Wrong number of parameters: ");
+      return printUsage();
+    }
+    if (null == sampler) {
+      sampler = new RandomSampler<K,V>(0.1, 10000, 10);
+    }
+
+    Path outf = new Path(otherArgs.remove(otherArgs.size() - 1));
+    TotalOrderPartitioner.setPartitionFile(getConf(), outf);
+    for (String s : otherArgs) {
+      FileInputFormat.addInputPath(job, new Path(s));
+    }
+    InputSampler.<K,V>writePartitionFile(job, sampler);
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    InputSampler<?,?> sampler = new InputSampler(new Configuration());
+    int res = ToolRunner.run(sampler, args);
+    System.exit(res);
+  }
+}
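
Putting the backport together: the usual pattern is to point the TotalOrderPartitioner at a partition file and have InputSampler write that file before the job is submitted. A hedged sketch, assuming the TotalOrderPartitioner backport lives in the same hadoopbackport package and using the same 0.1/10000/10 defaults as the command-line driver above; the job name, input path and partition-file path are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.mapreduce.hadoopbackport.InputSampler;
    import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class SamplerSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "total-order-sketch");          // placeholder name
        job.setNumReduceTasks(8);
        job.setPartitionerClass(TotalOrderPartitioner.class);
        FileInputFormat.addInputPath(job, new Path("/input"));  // placeholder path

        // The partitioner and the sampler must agree on the partition file.
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
            new Path("/tmp/_partitions"));                      // placeholder path

        // Sample keys with probability 0.1, up to 10000 samples from at most
        // 10 splits, then write numReduceTasks-1 split points to the file.
        InputSampler.writePartitionFile(job,
            new InputSampler.RandomSampler<Object, Object>(0.1, 10000, 10));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }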


