hbase-commits mailing list archives

From t...@apache.org
Subject svn commit: r950321 [2/3] - in /hbase/trunk: ./ src/docs/src/documentation/content/xdocs/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/io/ src/main/java/org/apache/hadoop/hbase/io/hfile/ src/main/java/org/apache/hadoop/h...
Date Wed, 02 Jun 2010 00:40:49 GMT
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TotalOrderPartitioner.java?rev=950321&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TotalOrderPartitioner.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TotalOrderPartitioner.java Wed Jun  2 00:40:48 2010
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce.hadoopbackport;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Partitioner effecting a total order by reading split points from
+ * an externally generated source.
+ * 
+ * This is an identical copy of o.a.h.mapreduce.lib.partition.TotalOrderPartitioner
+ * from Hadoop trunk at r910774.
+ */
+public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
+    extends Partitioner<K,V> implements Configurable {
+
+  private Node partitions;
+  public static final String DEFAULT_PATH = "_partition.lst";
+  public static final String PARTITIONER_PATH = 
+    "mapreduce.totalorderpartitioner.path";
+  public static final String MAX_TRIE_DEPTH = 
+    "mapreduce.totalorderpartitioner.trie.maxdepth"; 
+  public static final String NATURAL_ORDER = 
+    "mapreduce.totalorderpartitioner.naturalorder";
+  Configuration conf;
+
+  public TotalOrderPartitioner() { }
+
+  /**
+   * Read in the partition file and build indexing data structures.
+   * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
+   * <tt>mapreduce.totalorderpartitioner.naturalorder</tt> is not false, a
+   * trie of the first <tt>mapreduce.totalorderpartitioner.trie.maxdepth</tt> (default 200) bytes
+   * will be built. Otherwise, keys will be located using a binary search of
+   * the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
+   * defined for this job. The input file must be sorted with the same
+   * comparator and contain {@link Job#getNumReduceTasks()} - 1 keys.
+   */
+  @SuppressWarnings("unchecked") // keytype from conf not static
+  public void setConf(Configuration conf) {
+    try {
+      this.conf = conf;
+      String parts = getPartitionFile(conf);
+      final Path partFile = new Path(parts);
+      final FileSystem fs = (DEFAULT_PATH.equals(parts))
+        ? FileSystem.getLocal(conf)     // assume in DistributedCache
+        : partFile.getFileSystem(conf);
+
+      Job job = new Job(conf);
+      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
+      K[] splitPoints = readPartitions(fs, partFile, keyClass, conf);
+      if (splitPoints.length != job.getNumReduceTasks() - 1) {
+        throw new IOException("Wrong number of partitions in keyset:"
+            + splitPoints.length);
+      }
+      RawComparator<K> comparator =
+        (RawComparator<K>) job.getSortComparator();
+      for (int i = 0; i < splitPoints.length - 1; ++i) {
+        if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
+          throw new IOException("Split points are out of order");
+        }
+      }
+      boolean natOrder =
+        conf.getBoolean(NATURAL_ORDER, true);
+      if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
+        partitions = buildTrie((BinaryComparable[])splitPoints, 0,
+            splitPoints.length, new byte[0],
+            // Now that blocks of identical splitless trie nodes are
+            // shared rather than duplicated, and we build a leaf for any
+            // trie node with only one split point, the only reason for a
+            // depth limit is to avoid stack overflow or bloat in the
+            // pathological case where the split points are long and mostly
+            // look like bytes iii...iixii...iii.  Therefore, we make the
+            // default depth limit large but not huge.
+            conf.getInt(MAX_TRIE_DEPTH, 200));
+      } else {
+        partitions = new BinarySearchNode(splitPoints, comparator);
+      }
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Can't read partitions file", e);
+    }
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+  
+  // by construction, we know if our keytype
+  @SuppressWarnings("unchecked") // is memcmp-able and uses the trie
+  public int getPartition(K key, V value, int numPartitions) {
+    return partitions.findPartition(key);
+  }
+
+  /**
+   * Set the path to the SequenceFile storing the sorted partition keyset.
+   * It must be the case that for <tt>R</tt> reduces, there are <tt>R-1</tt>
+   * keys in the SequenceFile.
+   */
+  public static void setPartitionFile(Configuration conf, Path p) {
+    conf.set(PARTITIONER_PATH, p.toString());
+  }
+
+  /**
+   * Get the path to the SequenceFile storing the sorted partition keyset.
+   * @see #setPartitionFile(Configuration, Path)
+   */
+  public static String getPartitionFile(Configuration conf) {
+    return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
+  }
+
+  /**
+   * Interface to the partitioner to locate a key in the partition keyset.
+   */
+  interface Node<T> {
+    /**
+     * Locate partition in keyset K, st [Ki..Ki+1) defines a partition,
+     * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
+     */
+    int findPartition(T key);
+  }
+
+  /**
+   * Base class for trie nodes. If the keytype is memcmp-able, this builds
+   * tries of the first <tt>mapreduce.totalorderpartitioner.trie.maxdepth</tt>
+   * bytes.
+   */
+  static abstract class TrieNode implements Node<BinaryComparable> {
+    private final int level;
+    TrieNode(int level) {
+      this.level = level;
+    }
+    int getLevel() {
+      return level;
+    }
+  }
+
+  /**
+   * For types that are not {@link org.apache.hadoop.io.BinaryComparable} or
+   * where natural ordering is disabled by <tt>mapreduce.totalorderpartitioner.naturalorder</tt>,
+   * search the partition keyset with a binary search.
+   */
+  class BinarySearchNode implements Node<K> {
+    private final K[] splitPoints;
+    private final RawComparator<K> comparator;
+    BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
+      this.splitPoints = splitPoints;
+      this.comparator = comparator;
+    }
+    public int findPartition(K key) {
+      final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
+      return (pos < 0) ? -pos : pos;
+    }
+  }
+
+  /**
+   * An inner trie node that contains 256 children based on the next
+   * character.
+   */
+  class InnerTrieNode extends TrieNode {
+    private TrieNode[] child = new TrieNode[256];
+
+    InnerTrieNode(int level) {
+      super(level);
+    }
+    public int findPartition(BinaryComparable key) {
+      int level = getLevel();
+      if (key.getLength() <= level) {
+        return child[0].findPartition(key);
+      }
+      return child[0xFF & key.getBytes()[level]].findPartition(key);
+    }
+  }
+  
+  /**
+   * @param level        the tree depth at this node
+   * @param splitPoints  the full split point vector, which holds
+   *                     the split point or points this leaf node
+   *                     should contain
+   * @param lower        first INcluded element of splitPoints
+   * @param upper        first EXcluded element of splitPoints
+   * @return  a leaf node.  They come in three kinds: no split points 
+   *          [and findPartition returns a canned index], one split
+   *          point [and we compare with a single comparand], or more
+   *          than one [and we do a binary search].  The last case is
+   *          rare.
+   */
+  private TrieNode LeafTrieNodeFactory
+             (int level, BinaryComparable[] splitPoints, int lower, int upper) {
+      switch (upper - lower) {
+      case 0:
+          return new UnsplitTrieNode(level, lower);
+          
+      case 1:
+          return new SinglySplitTrieNode(level, splitPoints, lower);
+          
+      default:
+          return new LeafTrieNode(level, splitPoints, lower, upper);
+      }
+  }
+
+  /**
+   * A leaf trie node that scans for the key between lower..upper.
+   * 
+   * We don't generate many of these now, since we usually continue trie-ing
+   * when more than one split point remains at this level, and we make
+   * separate object types for nodes with zero or one split point.
+   */
+  private class LeafTrieNode extends TrieNode {
+    final int lower;
+    final int upper;
+    final BinaryComparable[] splitPoints;
+    LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) {
+      super(level);
+      this.lower = lower;
+      this.upper = upper;
+      this.splitPoints = splitPoints;
+    }
+    public int findPartition(BinaryComparable key) {
+      final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
+      return (pos < 0) ? -pos : pos;
+    }
+  }
+  
+  private class UnsplitTrieNode extends TrieNode {
+      final int result;
+      
+      UnsplitTrieNode(int level, int value) {
+          super(level);
+          this.result = value;
+      }
+      
+      public int findPartition(BinaryComparable key) {
+          return result;
+      }
+  }
+  
+  private class SinglySplitTrieNode extends TrieNode {
+      final int               lower;
+      final BinaryComparable  mySplitPoint;
+      
+      SinglySplitTrieNode(int level, BinaryComparable[] splitPoints, int lower) {
+          super(level);
+          this.lower = lower;
+          this.mySplitPoint = splitPoints[lower];
+      }
+      
+      public int findPartition(BinaryComparable key) {
+          return lower + (key.compareTo(mySplitPoint) < 0 ? 0 : 1);
+      }
+  }
+
+
+  /**
+   * Read the cut points from the given sequence file.
+   * @param fs The file system
+   * @param p The path to read
+   * @param keyClass The map output key class
+   * @param conf The job configuration
+   * @throws IOException
+   */
+                                 // matching key types enforced by passing in
+  @SuppressWarnings("unchecked") // map output key class
+  private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
+      Configuration conf) throws IOException {
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
+    ArrayList<K> parts = new ArrayList<K>();
+    K key = ReflectionUtils.newInstance(keyClass, conf);
+    NullWritable value = NullWritable.get();
+    while (reader.next(key, value)) {
+      parts.add(key);
+      key = ReflectionUtils.newInstance(keyClass, conf);
+    }
+    reader.close();
+    return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
+  }
+  
+  /**
+   *
+   * This object carries a trie node that can be reused, if such a node
+   * exists.  Two adjacent trie node slots that contain no split points
+   * can be filled with the same trie node, even if they are not on the
+   * same level.  See buildTrieRec, below.
+   *
+   */
+  private class CarriedTrieNodeRef
+  {
+      TrieNode   content;
+      
+      CarriedTrieNodeRef() {
+          content = null;
+      }
+  }
+
+  
+  /**
+   * Given a sorted set of cut points, build a trie that will find the correct
+   * partition quickly.
+   * @param splits the list of cut points
+   * @param lower the lower bound of partitions 0..numPartitions-1
+   * @param upper the upper bound of partitions 0..numPartitions-1
+   * @param prefix the prefix that we have already checked against
+   * @param maxDepth the maximum depth we will build a trie for
+   * @return the trie node that will divide the splits correctly
+   */
+  private TrieNode buildTrie(BinaryComparable[] splits, int lower,
+          int upper, byte[] prefix, int maxDepth) {
+      return buildTrieRec
+               (splits, lower, upper, prefix, maxDepth, new CarriedTrieNodeRef());
+  }
+  
+  /**
+   * This is the core of buildTrie.  The interface, and stub, above, just adds
+   * an empty CarriedTrieNodeRef.  
+   * 
+   * We build trie nodes in depth first order, which is also in key space
+   * order.  Every leaf node is referenced as a slot in a parent internal
+   * node.  If two adjacent slots [in the DFO] hold leaf nodes that have
+   * no split point, then they are not separated by a split point either, 
+   * because there's no place in key space for that split point to exist.
+   * 
+   * When that happens, the leaf nodes would be semantically identical, and
+   * we reuse the object.  A single CarriedTrieNodeRef "ref" lives for the 
+   * duration of the tree-walk.  ref carries a potentially reusable, unsplit
+   * leaf node for such reuse until a leaf node with a split arises, which 
+   * breaks the chain until we need to make a new unsplit leaf node.
+   * 
+   * Note that this use of CarriedTrieNodeRef means that if this code is
+   * modified in any way, we still need to make or fill in the subnodes of
+   * internal nodes in key space order.
+   */
+  private TrieNode buildTrieRec(BinaryComparable[] splits, int lower,
+      int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) {
+    final int depth = prefix.length;
+    // We generate leaves for a single split point as well as for 
+    // no split points.
+    if (depth >= maxDepth || lower >= upper - 1) {
+        // If we have two consecutive requests for an unsplit trie node, we
+        // can deliver the same one the second time.
+        if (lower == upper && ref.content != null) {
+            return ref.content;
+        }
+        TrieNode  result = LeafTrieNodeFactory(depth, splits, lower, upper);
+        ref.content = lower == upper ? result : null;
+        return result;
+    }
+    InnerTrieNode result = new InnerTrieNode(depth);
+    byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
+    // append an extra byte on to the prefix
+    int         currentBound = lower;
+    for(int ch = 0; ch < 0xFF; ++ch) {
+      trial[depth] = (byte) (ch + 1);
+      lower = currentBound;
+      while (currentBound < upper) {
+        if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
+          break;
+        }
+        currentBound += 1;
+      }
+      trial[depth] = (byte) ch;
+      result.child[0xFF & ch]
+                   = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
+    }
+    // pick up the rest
+    trial[depth] = (byte)0xFF;
+    result.child[0xFF] 
+                 = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
+    
+    return result;
+  }
+}
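For context, here is a minimal sketch (not part of this commit) of how a job could wire up the backported partitioner once a SequenceFile of sorted split keys exists. The ImmutableBytesWritable key type and the PartitionerWiring class name are assumptions for illustration only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
    import org.apache.hadoop.mapreduce.Job;

    public class PartitionerWiring {
      // The SequenceFile at partitionsFile must hold numReduceTasks - 1 keys,
      // sorted with the job's sort comparator (see setConf above).
      public static void configure(Job job, Path partitionsFile) {
        Configuration conf = job.getConfiguration();
        TotalOrderPartitioner.setPartitionFile(conf, partitionsFile);
        job.setPartitionerClass(TotalOrderPartitioner.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      }
    }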

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Jun  2 00:40:48 2010
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -378,11 +379,10 @@ public class HRegion implements HConstan
    * @return True if this region has references.
    */
   boolean hasReferences() {
-    for (Map.Entry<byte [], Store> e: this.stores.entrySet()) {
-      for (Map.Entry<Long, StoreFile> ee:
-          e.getValue().getStorefiles().entrySet()) {
+    for (Store store : this.stores.values()) {
+      for (StoreFile sf : store.getStorefiles()) {
         // Found a reference, return.
-        if (ee.getValue().isReference()) return true;
+        if (sf.isReference()) return true;
       }
     }
     return false;
@@ -1883,6 +1883,23 @@ public class HRegion implements HConstan
       }
     }
   }
+  
+  public void bulkLoadHFile(String hfilePath, byte[] familyName)
+  throws IOException {
+    splitsAndClosesLock.readLock().lock();
+    try {
+      Store store = getStore(familyName);
+      if (store == null) {
+        throw new DoNotRetryIOException(
+            "No such column family " + Bytes.toStringBinary(familyName));
+      }
+      store.bulkLoadHFile(hfilePath);
+    } finally {
+      splitsAndClosesLock.readLock().unlock();
+    }
+    
+  }
+
 
   @Override
   public boolean equals(Object o) {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Jun  2 00:40:48 2010
@@ -1979,6 +1979,14 @@ public class HRegionServer implements HC
     }
   }
 
+  @Override
+  public void bulkLoadHFile(
+      String hfilePath, byte[] regionName, byte[] familyName)
+  throws IOException {
+    HRegion region = getRegion(regionName);
+    region.bulkLoadHFile(hfilePath, familyName);
+  }
+  
   Map<String, Integer> rowlocks =
     new ConcurrentHashMap<String, Integer>();
 
@@ -2428,4 +2436,5 @@ public class HRegionServer implements HC
         HRegionServer.class);
     doMain(args, regionServerClass);
   }
+
 }
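The region server side above only resolves the region name and delegates to the new HRegion.bulkLoadHFile, which in turn hands the file to the matching Store. Below is a hedged, test-style sketch of driving that entry point directly; the "f1" family and the BulkLoadSketch class are placeholders, not part of this commit.

    import java.io.IOException;
    import org.apache.hadoop.hbase.regionserver.HRegion;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BulkLoadSketch {
      // Hands a pre-written HFile to the region's "f1" store. The HFile's row
      // keys must fall inside the region boundaries, otherwise Store.bulkLoadHFile
      // rejects it with a WrongRegionException.
      public static void loadInto(HRegion region, String hfilePath) throws IOException {
        region.bulkLoadHFile(hfilePath, Bytes.toBytes("f1"));
      }
    }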

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Wed Jun  2 00:40:48 2010
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -47,11 +48,15 @@ import org.apache.hadoop.hbase.util.FSUt
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -59,6 +64,7 @@ import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -115,13 +121,11 @@ public class Store implements HConstants
   private final boolean inMemory;
 
   /*
-   * Sorted Map of readers keyed by maximum edit sequence id (Most recent should
-   * be last in in list).  ConcurrentSkipListMap is lazily consistent so no
-   * need to lock it down when iterating; iterator view is that of when the
-   * iterator was taken out.
+   * List of store files inside this store. This is an immutable list that
+   * is atomically replaced when its contents change.
    */
-  private final NavigableMap<Long, StoreFile> storefiles =
-    new ConcurrentSkipListMap<Long, StoreFile>();
+  private ImmutableList<StoreFile> storefiles = null;
+
 
   // All access must be synchronized.
   private final CopyOnWriteArraySet<ChangedReadersObserver> changedReaderObservers =
@@ -222,7 +226,7 @@ public class Store implements HConstants
     this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
 
     // loadStoreFiles calculates this.maxSeqId. as side-effect.
-    this.storefiles.putAll(loadStoreFiles());
+    this.storefiles = ImmutableList.copyOf(loadStoreFiles());
 
     this.maxSeqIdBeforeLogRecovery = this.maxSeqId;
 
@@ -395,9 +399,9 @@ public class Store implements HConstants
    * Creates a series of StoreFile loaded from the given directory.
    * @throws IOException
    */
-  private Map<Long, StoreFile> loadStoreFiles()
+  private List<StoreFile> loadStoreFiles()
   throws IOException {
-    Map<Long, StoreFile> results = new HashMap<Long, StoreFile>();
+    ArrayList<StoreFile> results = new ArrayList<StoreFile>();
     FileStatus files[] = this.fs.listStatus(this.homedir);
     for (int i = 0; files != null && i < files.length; i++) {
       // Skip directories.
@@ -422,20 +426,15 @@ public class Store implements HConstants
           "Verify!", ioe);
         continue;
       }
-      long storeSeqId = curfile.getMaxSequenceId();
-      if (storeSeqId > this.maxSeqId) {
-        this.maxSeqId = storeSeqId;
-      }
       long length = curfile.getReader().length();
       this.storeSize += length;
       if (LOG.isDebugEnabled()) {
-        LOG.debug("loaded " + FSUtils.getPath(p) + ", isReference=" +
-          curfile.isReference() + ", sequence id=" + storeSeqId +
-          ", length=" + length + ", majorCompaction=" +
-          curfile.isMajorCompaction());
+        LOG.debug("loaded " + curfile.toStringDetailed());
       }
-      results.put(Long.valueOf(storeSeqId), curfile);
+      results.add(curfile);
     }
+    maxSeqId = StoreFile.getMaxSequenceIdInList(results);
+    Collections.sort(results, StoreFile.Comparators.FLUSH_TIME);
     return results;
   }
 
@@ -472,10 +471,77 @@ public class Store implements HConstants
   /**
    * @return All store files.
    */
-  NavigableMap<Long, StoreFile> getStorefiles() {
+  List<StoreFile> getStorefiles() {
     return this.storefiles;
   }
 
+  public void bulkLoadHFile(String srcPathStr) throws IOException {
+    Path srcPath = new Path(srcPathStr);
+    
+    HFile.Reader reader  = null;
+    try {
+      LOG.info("Validating hfile at " + srcPath + " for inclusion in "
+          + "store " + this + " region " + this.region);
+      reader = new HFile.Reader(srcPath.getFileSystem(conf),
+          srcPath, null, false);
+      reader.loadFileInfo();
+      
+      byte[] firstKey = reader.getFirstRowKey();
+      byte[] lastKey = reader.getLastRowKey();
+      
+      LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
+          " last=" + Bytes.toStringBinary(lastKey));
+      LOG.debug("Region bounds: first=" +
+          Bytes.toStringBinary(region.getStartKey()) +
+          " last=" + Bytes.toStringBinary(region.getEndKey()));
+      
+      HRegionInfo hri = region.getRegionInfo();
+      if (!hri.containsRange(firstKey, lastKey)) {
+        throw new WrongRegionException(
+            "Bulk load file " + srcPathStr + " does not fit inside region " 
+            + this.region);
+      }
+    } finally {
+      if (reader != null) reader.close();
+    }
+
+    // Move the file if it's on another filesystem
+    FileSystem srcFs = srcPath.getFileSystem(conf);
+    if (!srcFs.equals(fs)) {
+      LOG.info("File " + srcPath + " on different filesystem than " +
+          "destination store - moving to this filesystem.");
+      Path tmpDir = new Path(homedir, "_tmp");
+      Path tmpPath = StoreFile.getRandomFilename(fs, tmpDir);
+      FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf);
+      LOG.info("Copied to temporary path on dst filesystem: " + tmpPath);
+      srcPath = tmpPath;
+    }
+    
+    Path dstPath = StoreFile.getRandomFilename(fs, homedir);
+    LOG.info("Renaming bulk load file " + srcPath + " to " + dstPath);
+    StoreFile.rename(fs, srcPath, dstPath);
+    
+    StoreFile sf = new StoreFile(fs, dstPath, blockcache,
+        this.conf, this.family.getBloomFilterType(), this.inMemory);
+    sf.createReader();
+    
+    LOG.info("Moved hfile " + srcPath + " into store directory " +
+        homedir + " - updating store file list.");
+
+    // Append the new storefile into the list
+    this.lock.writeLock().lock();
+    try {
+      ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
+      newFiles.add(sf);
+      this.storefiles = ImmutableList.copyOf(newFiles);
+      notifyChangedReadersObservers();
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+    LOG.info("Successfully loaded store file " + srcPath
+        + " into store " + this + " (new location: " + dstPath + ")"); 
+  }
+
   /**
    * Close all the readers
    *
@@ -484,13 +550,14 @@ public class Store implements HConstants
    *
    * @throws IOException
    */
-  List<StoreFile> close() throws IOException {
+  ImmutableList<StoreFile> close() throws IOException {
     this.lock.writeLock().lock();
     try {
-      ArrayList<StoreFile> result =
-        new ArrayList<StoreFile>(storefiles.values());
+      ImmutableList<StoreFile> result = storefiles;
+      
       // Clear so metrics doesn't find them.
-      this.storefiles.clear();
+      storefiles = ImmutableList.of();
+      
       for (StoreFile f: result) {
         f.closeReader();
       }
@@ -591,19 +658,19 @@ public class Store implements HConstants
 
   /*
    * Change storefiles adding into place the Reader produced by this new flush.
-   * @param logCacheFlushId
    * @param sf
    * @param set That was used to make the passed file <code>p</code>.
    * @throws IOException
    * @return Whether compaction is required.
    */
-  private boolean updateStorefiles(final long logCacheFlushId,
-    final StoreFile sf, final SortedSet<KeyValue> set)
+  private boolean updateStorefiles(final StoreFile sf,
+                                   final SortedSet<KeyValue> set)
   throws IOException {
     this.lock.writeLock().lock();
     try {
-      this.storefiles.put(Long.valueOf(logCacheFlushId), sf);
-
+      ArrayList<StoreFile> newList = new ArrayList<StoreFile>(storefiles);
+      newList.add(sf);
+      storefiles = ImmutableList.copyOf(newList);
       this.memstore.clearSnapshot(set);
 
       // Tell listeners of the change in readers.
@@ -670,15 +737,14 @@ public class Store implements HConstants
     boolean majorcompaction = mc;
     synchronized (compactLock) {
       // filesToCompact are sorted oldest to newest.
-      List<StoreFile> filesToCompact =
-        new ArrayList<StoreFile>(this.storefiles.values());
+      List<StoreFile> filesToCompact = this.storefiles;
       if (filesToCompact.isEmpty()) {
         LOG.debug(this.storeNameStr + ": no store files to compact");
         return null;
       }
 
       // Max-sequenceID is the last key of the storefiles TreeMap
-      long maxId = this.storefiles.lastKey().longValue();
+      long maxId = StoreFile.getMaxSequenceIdInList(storefiles);
 
       // Check to see if we need to do a major compaction on this region.
       // If so, change doMajorCompaction to true to skip the incremental
@@ -819,10 +885,7 @@ public class Store implements HConstants
    * @return True if we should run a major compaction.
    */
   boolean isMajorCompaction() throws IOException {
-    List<StoreFile> filesToCompact = null;
-    // filesToCompact are sorted oldest to newest.
-    filesToCompact = new ArrayList<StoreFile>(this.storefiles.values());
-    return isMajorCompaction(filesToCompact);
+    return isMajorCompaction(storefiles);
   }
 
   /*
@@ -990,16 +1053,18 @@ public class Store implements HConstants
         // delete old store files until we have sent out notification of
         // change in case old files are still being accessed by outstanding
         // scanners.
-        for (Map.Entry<Long, StoreFile> e: this.storefiles.entrySet()) {
-          if (compactedFiles.contains(e.getValue())) {
-            this.storefiles.remove(e.getKey());
+        ArrayList<StoreFile> newStoreFiles = new ArrayList<StoreFile>();
+        for (StoreFile sf : storefiles) {
+          if (!compactedFiles.contains(sf)) {
+            newStoreFiles.add(sf);
           }
         }
+        
         // If a StoreFile result, move it into place.  May be null.
         if (result != null) {
-          Long orderVal = Long.valueOf(result.getMaxSequenceId());
-          this.storefiles.put(orderVal, result);
+          newStoreFiles.add(result);
         }
+        this.storefiles = ImmutableList.copyOf(newStoreFiles);
 
         // WARN ugly hack here, but necessary sadly.
         // TODO why is this necessary? need a comment here if it's unintuitive!
@@ -1020,7 +1085,7 @@ public class Store implements HConstants
       }
       // 4. Compute new store size
       this.storeSize = 0L;
-      for (StoreFile hsf : this.storefiles.values()) {
+      for (StoreFile hsf : this.storefiles) {
         Reader r = hsf.getReader();
         if (r == null) {
           LOG.warn("StoreFile " + hsf + " has a null Reader");
@@ -1094,10 +1159,9 @@ public class Store implements HConstants
       this.memstore.getRowKeyAtOrBefore(state);
       // Check if match, if we got a candidate on the asked for 'kv' row.
       // Process each store file. Run through from newest to oldest.
-      Map<Long, StoreFile> m = this.storefiles.descendingMap();
-      for (Map.Entry<Long, StoreFile> e : m.entrySet()) {
+      for (StoreFile sf : Iterables.reverse(storefiles)) {
         // Update the candidate keys from the current map file
-        rowAtOrBeforeFromStoreFile(e.getValue(), state);
+        rowAtOrBeforeFromStoreFile(sf, state);
       }
       return state.getCandidate();
     } finally {
@@ -1232,9 +1296,8 @@ public class Store implements HConstants
       // Not splitable if we find a reference store file present in the store.
       boolean splitable = true;
       long maxSize = 0L;
-      Long mapIndex = Long.valueOf(0L);
-      for (Map.Entry<Long, StoreFile> e: storefiles.entrySet()) {
-        StoreFile sf = e.getValue();
+      StoreFile largestSf = null;
+      for (StoreFile sf : storefiles) {
         if (splitable) {
           splitable = !sf.isReference();
           if (!splitable) {
@@ -1254,13 +1317,12 @@ public class Store implements HConstants
         if (size > maxSize) {
           // This is the largest one so far
           maxSize = size;
-          mapIndex = e.getKey();
+          largestSf = sf;
         }
       }
-      StoreFile sf = this.storefiles.get(mapIndex);
-      HFile.Reader r = sf.getReader();
+      HFile.Reader r = largestSf.getReader();
       if (r == null) {
-        LOG.warn("Storefile " + sf + " Reader is null");
+        LOG.warn("Storefile " + largestSf + " Reader is null");
         return null;
       }
       // Get first, last, and mid keys.  Midkey is the key that starts block
@@ -1333,7 +1395,7 @@ public class Store implements HConstants
    */
   long getStorefilesSize() {
     long size = 0;
-    for (StoreFile s: storefiles.values()) {
+    for (StoreFile s: storefiles) {
       Reader r = s.getReader();
       if (r == null) {
         LOG.warn("StoreFile " + s + " has a null Reader");
@@ -1349,7 +1411,7 @@ public class Store implements HConstants
    */
   long getStorefilesIndexSize() {
     long size = 0;
-    for (StoreFile s: storefiles.values()) {
+    for (StoreFile s: storefiles) {
       Reader r = s.getReader();
       if (r == null) {
         LOG.warn("StoreFile " + s + " has a null Reader");
@@ -1449,7 +1511,7 @@ public class Store implements HConstants
 
       // Get storefiles for this store
       List<HFileScanner> storefileScanners = new ArrayList<HFileScanner>();
-      for (StoreFile sf : this.storefiles.descendingMap().values()) {
+      for (StoreFile sf : Iterables.reverse(this.storefiles)) {
         HFile.Reader r = sf.getReader();
         if (r == null) {
           LOG.warn("StoreFile " + sf + " has a null Reader");
@@ -1565,7 +1627,7 @@ public class Store implements HConstants
       }
       // Add new file to store files.  Clear snapshot too while we have
       // the Store write lock.
-      return Store.this.updateStorefiles(cacheFlushId, storeFile, snapshot);
+      return Store.this.updateStorefiles(storeFile, snapshot);
     }
   }
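The main structural change in Store.java is replacing the ConcurrentSkipListMap keyed by sequence id with an ImmutableList that is copied and swapped under the write lock whenever the file set changes, so readers can iterate without locking. A standalone sketch of that copy-on-write idiom follows; the class and field names are illustrative, not HBase's.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import com.google.common.collect.ImmutableList;

    public class CopyOnWriteFiles<T> {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private volatile ImmutableList<T> files = ImmutableList.of();

      // Readers just grab the current reference; the list itself never mutates.
      public List<T> current() {
        return files;
      }

      // Writers copy, mutate the copy, and publish it with a single reference swap.
      public void add(T newFile) {
        lock.writeLock().lock();
        try {
          List<T> copy = new ArrayList<T>(files);
          copy.add(newFile);
          files = ImmutableList.copyOf(copy);
        } finally {
          lock.writeLock().unlock();
        }
      }
    }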
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Wed Jun  2 00:40:48 2010
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
-import org.apache.hadoop.hbase.KeyValue.KeyComparator;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -42,14 +41,20 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.Hash;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Ordering;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryUsage;
 import java.nio.ByteBuffer;
-import java.text.DecimalFormat;
 import java.text.NumberFormat;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.Random;
@@ -99,15 +104,28 @@ public class StoreFile implements HConst
   /** Constant for major compaction meta */
   public static final byte [] MAJOR_COMPACTION_KEY =
     Bytes.toBytes("MAJOR_COMPACTION_KEY");
+  
   // If true, this file was product of a major compaction.  Its then set
   // whenever you get a Reader.
   private AtomicBoolean majorCompaction = null;
 
+  /** Meta key set when store file is a result of a bulk load */
+  public static final byte[] BULKLOAD_TASK_KEY =
+    Bytes.toBytes("BULKLOAD_SOURCE_TASK");
+  public static final byte[] BULKLOAD_TIME_KEY =
+    Bytes.toBytes("BULKLOAD_TIMESTAMP");
+
+  
   static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META";
   static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";
   static final byte[] BLOOM_FILTER_TYPE_KEY = 
     Bytes.toBytes("BLOOM_FILTER_TYPE");
 
+  /**
+   * Map of the metadata entries in the corresponding HFile
+   */
+  private Map<byte[], byte[]> metadataMap;
+
   /*
    * Regex that will work for straight filenames and for reference names.
    * If reference, then the regex has more than just one group.  Group 1 is
@@ -123,6 +141,7 @@ public class StoreFile implements HConst
   private final Configuration conf;
   private final BloomType bloomType;
 
+
   /**
    * Constructor, loads a reader and it's indices, etc. May allocate a
    * substantial amount of ram depending on the underlying files (10-20MB?).
@@ -183,7 +202,8 @@ public class StoreFile implements HConst
    * @return True if the path has format of a HStoreFile reference.
    */
   public static boolean isReference(final Path p) {
-    return isReference(p, REF_NAME_PARSER.matcher(p.getName()));
+    return !p.getName().startsWith("_") &&
+      isReference(p, REF_NAME_PARSER.matcher(p.getName()));
   }
 
   /**
@@ -244,6 +264,38 @@ public class StoreFile implements HConst
     }
     return this.sequenceid;
   }
+  
+  /**
+   * Return the highest sequence ID found across all storefiles in
+   * the given list. Store files that were created by a mapreduce
+   * bulk load are ignored, as they do not correspond to any edit
+   * log items.
+   * @return 0 if no non-bulk-load files are provided
+   */
+  public static long getMaxSequenceIdInList(List<StoreFile> sfs) {
+    long max = 0;
+    for (StoreFile sf : sfs) {
+      if (!sf.isBulkLoadResult()) {
+        max = Math.max(max, sf.getMaxSequenceId());
+      }
+    }
+    return max;
+  }
+
+  /**
+   * @return true if this storefile was created by HFileOutputFormat
+   * for a bulk load.
+   */
+  boolean isBulkLoadResult() {
+    return metadataMap.containsKey(BULKLOAD_TIME_KEY);
+  }
+
+  /**
+   * Return the timestamp at which this bulk load file was generated.
+   */
+  public long getBulkLoadTimestamp() {
+    return Bytes.toLong(metadataMap.get(BULKLOAD_TIME_KEY));
+  }
 
   /**
    * Returns the block cache or <code>null</code> in case none should be used.
@@ -297,9 +349,9 @@ public class StoreFile implements HConst
           this.inMemory);
     }
     // Load up indices and fileinfo.
-    Map<byte [], byte []> map = this.reader.loadFileInfo();
+    metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
     // Read in our metadata.
-    byte [] b = map.get(MAX_SEQ_ID_KEY);
+    byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
     if (b != null) {
       // By convention, if halfhfile, top half has a sequence number > bottom
       // half. Thats why we add one in below. Its done for case the two halves
@@ -314,7 +366,7 @@ public class StoreFile implements HConst
       }
 
     }
-    b = map.get(MAJOR_COMPACTION_KEY);
+    b = metadataMap.get(MAJOR_COMPACTION_KEY);
     if (b != null) {
       boolean mc = Bytes.toBoolean(b);
       if (this.majorCompaction == null) {
@@ -371,10 +423,28 @@ public class StoreFile implements HConst
   }
 
   @Override
-  public String toString() {
+  public String toString() {    
     return this.path.toString() +
       (isReference()? "-" + this.referencePath + "-" + reference.toString(): "");
   }
+  
+  /**
+   * @return a detailed description of this StoreFile, suitable for debug output
+   */
+  public String toStringDetailed() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(this.path.toString());
+    sb.append(", isReference=").append(isReference());
+    sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
+    if (isBulkLoadResult()) {
+      sb.append(", bulkLoadTS=").append(getBulkLoadTimestamp());
+    } else {
+      sb.append(", seqid=").append(getMaxSequenceId());
+    }
+    sb.append(", majorCompaction=").append(isMajorCompaction());
+
+    return sb.toString();
+  }
 
   /**
    * Utility to help with rename.
@@ -813,4 +883,44 @@ public class StoreFile implements HConst
     }
     
   }
+  
+  /**
+   * Useful comparators for comparing StoreFiles.
+   */
+  abstract static class Comparators {
+    /**
+     * Comparator that compares based on the flush time of
+     * the StoreFiles. All bulk loads are placed before all non-
+     * bulk loads, and then all files are sorted by sequence ID.
+     * If there are ties, the path name is used as a tie-breaker.
+     */
+    static final Comparator<StoreFile> FLUSH_TIME =
+      Ordering.compound(ImmutableList.of(
+          Ordering.natural().onResultOf(new GetBulkTime()),
+          Ordering.natural().onResultOf(new GetSeqId()),
+          Ordering.natural().onResultOf(new GetPathName())
+      ));
+
+    private static class GetBulkTime implements Function<StoreFile, Long> {
+      @Override
+      public Long apply(StoreFile sf) {
+        if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
+        return sf.getBulkLoadTimestamp();
+      }
+    }
+    private static class GetSeqId implements Function<StoreFile, Long> {
+      @Override
+      public Long apply(StoreFile sf) {
+        if (sf.isBulkLoadResult()) return -1L;
+        return sf.getMaxSequenceId();
+      }
+    }
+    private static class GetPathName implements Function<StoreFile, String> {
+      @Override
+      public String apply(StoreFile sf) {
+        return sf.getPath().getName();
+      }
+    }
+
+  }
 }
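The new comparator and the sequence-id helper are meant to be used together, as Store.loadStoreFiles now does. Comparators and FLUSH_TIME are package-private, so a sketch using them has to live in the regionserver package; the class below is illustrative only.

    package org.apache.hadoop.hbase.regionserver;

    import java.util.Collections;
    import java.util.List;

    public class StoreFileOrderingSketch {
      // Sort bulk-loaded files (by bulk timestamp) ahead of flushed files
      // (by sequence id), then report the highest sequence id among the
      // flushed files; bulk loads carry no edit log entries.
      public static long sortAndGetMaxSeqId(List<StoreFile> files) {
        Collections.sort(files, StoreFile.Comparators.FLUSH_TIME);
        return StoreFile.getMaxSequenceIdInList(files);
      }
    }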

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Wed Jun  2 00:40:48 2010
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.client.Sc
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.NavigableSet;
 
 /**
@@ -126,9 +125,12 @@ class StoreScanner implements KeyValueSc
    */
   private List<KeyValueScanner> getScanners() throws IOException {
     // First the store file scanners
-    Map<Long, StoreFile> map = this.store.getStorefiles().descendingMap();
+    
+    // TODO this used to get the store files in descending order,
+    // but now we get them in ascending order, which I think is
+    // actually more correct, since the memstore scanner gets added at the end.
     List<StoreFileScanner> sfScanners = StoreFileScanner
-      .getScannersForStoreFiles(map.values(), cacheBlocks, isGet);
+      .getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet);
     List<KeyValueScanner> scanners =
       new ArrayList<KeyValueScanner>(sfScanners.size()+1);
     scanners.addAll(sfScanners);
@@ -143,9 +145,8 @@ class StoreScanner implements KeyValueSc
   private List<KeyValueScanner> getScanners(Scan scan, 
       final NavigableSet<byte[]> columns) throws IOException {
     // First the store file scanners
-    Map<Long, StoreFile> map = this.store.getStorefiles().descendingMap();
     List<StoreFileScanner> sfScanners = StoreFileScanner
-      .getScannersForStoreFiles(map.values(), cacheBlocks, isGet);
+      .getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet);
     List<KeyValueScanner> scanners =
       new ArrayList<KeyValueScanner>(sfScanners.size()+1);
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java Wed Jun  2 00:40:48 2010
@@ -34,6 +34,7 @@ import java.io.UnsupportedEncodingExcept
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
+import java.util.Iterator;
 
 /**
  * Utility class that handles byte arrays, conversions to/from other types,
@@ -890,6 +891,16 @@ public class Bytes {
   }
 
   /**
+   * Return true if the byte array on the right is a prefix of the byte
+   * array on the left.
+   */
+  public static boolean startsWith(byte[] bytes, byte[] prefix) {
+    return bytes != null && prefix != null &&
+      bytes.length >= prefix.length &&
+      compareTo(bytes, 0, prefix.length, prefix, 0, prefix.length) == 0;      
+  }
+
+  /**
    * @param b bytes to hash
    * @return Runs {@link WritableComparator#hashBytes(byte[], int)} on the
    * passed in array.  This method is what {@link org.apache.hadoop.io.Text} and
@@ -1016,6 +1027,22 @@ public class Bytes {
    * @return Array of dividing values
    */
   public static byte [][] split(final byte [] a, final byte [] b, final int num) {
+    byte[][] ret = new byte[num+2][];
+    int i = 0;
+    Iterable<byte[]> iter = iterateOnSplits(a, b, num);
+    if (iter == null) return null;
+    for (byte[] elem : iter) {
+      ret[i++] = elem;
+    }
+    return ret;
+  }
+  
+  /**
+   * Iterate over keys within the passed inclusive range.
+   */
+  public static Iterable<byte[]> iterateOnSplits(
+      final byte[] a, final byte[]b, final int num)
+  {  
     byte [] aPadded;
     byte [] bPadded;
     if (a.length < b.length) {
@@ -1035,14 +1062,14 @@ public class Bytes {
       throw new IllegalArgumentException("num cannot be < 0");
     }
     byte [] prependHeader = {1, 0};
-    BigInteger startBI = new BigInteger(add(prependHeader, aPadded));
-    BigInteger stopBI = new BigInteger(add(prependHeader, bPadded));
-    BigInteger diffBI = stopBI.subtract(startBI);
-    BigInteger splitsBI = BigInteger.valueOf(num + 1);
+    final BigInteger startBI = new BigInteger(add(prependHeader, aPadded));
+    final BigInteger stopBI = new BigInteger(add(prependHeader, bPadded));
+    final BigInteger diffBI = stopBI.subtract(startBI);
+    final BigInteger splitsBI = BigInteger.valueOf(num + 1);
     if(diffBI.compareTo(splitsBI) < 0) {
       return null;
     }
-    BigInteger intervalBI;
+    final BigInteger intervalBI;
     try {
       intervalBI = diffBI.divide(splitsBI);
     } catch(Exception e) {
@@ -1050,20 +1077,42 @@ public class Bytes {
       return null;
     }
 
-    byte [][] result = new byte[num+2][];
-    result[0] = a;
+    final Iterator<byte[]> iterator = new Iterator<byte[]>() {
+      private int i = -1;
+      
+      @Override
+      public boolean hasNext() {
+        return i < num+1;
+      }
 
-    for (int i = 1; i <= num; i++) {
-      BigInteger curBI = startBI.add(intervalBI.multiply(BigInteger.valueOf(i)));
-      byte [] padded = curBI.toByteArray();
-      if (padded[1] == 0)
-        padded = tail(padded, padded.length - 2);
-      else
-        padded = tail(padded, padded.length - 1);
-      result[i] = padded;
-    }
-    result[num+1] = b;
-    return result;
+      @Override
+      public byte[] next() {
+        i++;
+        if (i == 0) return a;
+        if (i == num + 1) return b;
+        
+        BigInteger curBI = startBI.add(intervalBI.multiply(BigInteger.valueOf(i)));
+        byte [] padded = curBI.toByteArray();
+        if (padded[1] == 0)
+          padded = tail(padded, padded.length - 2);
+        else
+          padded = tail(padded, padded.length - 1);
+        return padded;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+      
+    };
+    
+    return new Iterable<byte[]>() {
+      @Override
+      public Iterator<byte[]> iterator() {
+        return iterator;
+      }
+    };
   }
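A quick usage sketch for the reworked split helpers (not part of this commit): both split and iterateOnSplits return null when the range is too narrow to yield num interior points, which the caller must handle.

    import org.apache.hadoop.hbase.util.Bytes;

    public class SplitSketch {
      public static void main(String[] args) {
        // "aaa", nine interior split points, "zzz": eleven keys in total.
        byte[][] bounds = Bytes.split(Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 9);
        for (byte[] key : bounds) {
          System.out.println(Bytes.toStringBinary(key));
        }
        // The same keys, produced lazily without materializing the array.
        for (byte[] key : Bytes.iterateOnSplits(Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 9)) {
          System.out.println(Bytes.toStringBinary(key));
        }
      }
    }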
 
   /**

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Wed Jun  2 00:40:48 2010
@@ -21,7 +21,9 @@ package org.apache.hadoop.hbase;
 
 import java.io.File;
 import java.io.IOException;
+import java.security.MessageDigest;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 
@@ -47,8 +49,10 @@ import org.apache.hadoop.hbase.util.FSUt
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.zookeeper.ZooKeeper;
+import static org.junit.Assert.*;
 
 /**
  * Facility for testing HBase. Added as tool to abet junit4 testing.  Replaces
@@ -76,7 +80,7 @@ public class HBaseTestingUtility {
   public static final String TEST_DIRECTORY_KEY = "test.build.data";
 
   /**
-   * Default parent directory for test output.
+   * Default parent directory for test output.
    */
   public static final String DEFAULT_TEST_DIRECTORY = "target/build/data";
 
@@ -484,6 +488,19 @@ public class HBaseTestingUtility {
     return count;
   }
 
+  /**
+   * Return an MD5 digest computed over the row keys of a table.
+   */
+  public String checksumRows(final HTable table) throws Exception {
+    Scan scan = new Scan();
+    ResultScanner results = table.getScanner(scan);
+    MessageDigest digest = MessageDigest.getInstance("MD5");
+    for (Result res : results) {
+      digest.update(res.getRow());
+    }
+    results.close();
+    return new MD5Hash(digest.digest()).toString(); // MessageDigest.toString() does not include the digest bytes
+  }
   
   /**
    * Creates many regions names "aaa" to "zzz".
@@ -520,7 +537,13 @@ public class HBaseTestingUtility {
       Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
       Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
     };
-
+    return createMultiRegions(c, table, columnFamily, KEYS);
+  }
+  
+  public int createMultiRegions(final Configuration c, final HTable table,
+      final byte[] columnFamily, byte [][] startKeys)
+  throws IOException {
+    Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
     HTable meta = new HTable(c, HConstants.META_TABLE_NAME);
     HTableDescriptor htd = table.getTableDescriptor();
     if(!htd.hasFamily(columnFamily)) {
@@ -531,13 +554,13 @@ public class HBaseTestingUtility {
     // setup already has the "<tablename>,,123456789" row with an empty start
     // and end key. Adding the custom regions below adds those blindly,
     // including the new start region from empty to "bbb". lg
-    List<byte[]> rows = getMetaTableRows();
+    List<byte[]> rows = getMetaTableRows(htd.getName());
     // add custom ones
     int count = 0;
-    for (int i = 0; i < KEYS.length; i++) {
-      int j = (i + 1) % KEYS.length;
+    for (int i = 0; i < startKeys.length; i++) {
+      int j = (i + 1) % startKeys.length;
       HRegionInfo hri = new HRegionInfo(table.getTableDescriptor(),
-        KEYS[i], KEYS[j]);
+        startKeys[i], startKeys[j]);
       Put put = new Put(hri.getRegionName());
       put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
         Writables.getBytes(hri));
@@ -574,6 +597,29 @@ public class HBaseTestingUtility {
     s.close();
     return rows;
   }
+  
+  /**
+   * Returns all rows from the .META. table for a given user table
+   *
+   * @throws IOException When reading the rows fails.
+   */
+  public List<byte[]> getMetaTableRows(byte[] tableName) throws IOException {
+    HTable t = new HTable(this.conf, HConstants.META_TABLE_NAME);
+    List<byte[]> rows = new ArrayList<byte[]>();
+    ResultScanner s = t.getScanner(new Scan());
+    for (Result result : s) {
+      HRegionInfo info = Writables.getHRegionInfo(
+          result.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
+      HTableDescriptor desc = info.getTableDesc();
+      if (Bytes.compareTo(desc.getName(), tableName) == 0) {
+        LOG.info("getMetaTableRows: row -> " +
+            Bytes.toStringBinary(result.getRow()));
+        rows.add(result.getRow());
+      }
+    }
+    s.close();
+    return rows;
+  }
 
   /**
    * Starts a <code>MiniMRCluster</code> with a default number of
@@ -600,6 +646,8 @@ public class HBaseTestingUtility {
     mrCluster = new MiniMRCluster(servers,
       FileSystem.get(c).getUri().toString(), 1);
     LOG.info("Mini mapreduce cluster started");
+    c.set("mapred.job.tracker",
+        mrCluster.createJobConf().get("mapred.job.tracker"));
   }
 
   /**
@@ -610,6 +658,8 @@ public class HBaseTestingUtility {
     if (mrCluster != null) {
       mrCluster.shutdown();
     }
+    // Restore configuration to point to local jobtracker
+    conf.set("mapred.job.tracker", "local");
     LOG.info("Mini mapreduce cluster stopped");
   }
 
@@ -746,4 +796,19 @@ public class HBaseTestingUtility {
   public FileSystem getTestFileSystem() throws IOException {
     return FileSystem.get(conf);
   }
+
+  public void cleanupTestDir() throws IOException {
+    getTestDir().getFileSystem(conf).delete(getTestDir(), true);    
+  }
+
+  public void waitTableAvailable(byte[] table, long timeoutMillis)
+  throws InterruptedException, IOException {
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    long startWait = System.currentTimeMillis();
+    while (!admin.isTableAvailable(table)) {
+      assertTrue("Timed out waiting for table " + Bytes.toStringBinary(table),
+          System.currentTimeMillis() - startWait < timeoutMillis);
+      Thread.sleep(500);
+    }
+  }
 }
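A test-style sketch combining the new testing-utility helpers: pre-split a table on custom start keys, wait for it to come online, then checksum its row keys. The "info" family, the keys, and the 30 second timeout are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseTestingUtility;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MultiRegionTestSketch {
      public static String setUpAndChecksum(HBaseTestingUtility util, Configuration conf,
          HTable table) throws Exception {
        byte[][] startKeys = new byte[][] {
          Bytes.toBytes("bbb"), Bytes.toBytes("mmm"), Bytes.toBytes("yyy")
        };
        util.createMultiRegions(conf, table, Bytes.toBytes("info"), startKeys);
        util.waitTableAvailable(table.getTableName(), 30000);
        return util.checksumRows(table);
      }
    }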

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/NMapInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/NMapInputFormat.java?rev=950321&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/NMapInputFormat.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/NMapInputFormat.java Wed Jun  2 00:40:48 2010
@@ -0,0 +1,127 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Input format that creates as many map tasks as configured in
+ * mapred.map.tasks, each provided with a single NullWritable record.
+ * This can be useful for mappers that don't have any real input
+ * (e.g. when the mapper is simply producing random data as
+ * output).
+ */
+public class NMapInputFormat extends InputFormat<NullWritable, NullWritable> {
+
+  @Override
+  public RecordReader<NullWritable, NullWritable> createRecordReader(
+      InputSplit split,
+      TaskAttemptContext tac) throws IOException, InterruptedException {
+    return new SingleRecordReader<NullWritable, NullWritable>(
+        NullWritable.get(), NullWritable.get());
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException,
+      InterruptedException {
+    int count = context.getConfiguration().getInt("mapred.map.tasks", 1);
+    List<InputSplit> splits = new ArrayList<InputSplit>(count);
+    for (int i = 0; i < count; i++) {
+      splits.add(new NullInputSplit());
+    }
+    return splits;
+  }
+
+  private static class NullInputSplit extends InputSplit implements Writable {
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      return 0;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      return new String[] {};
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+    }
+  }
+  
+  private static class SingleRecordReader<K, V>
+    extends RecordReader<K, V> {
+    
+    private final K key;
+    private final V value;
+    boolean providedKey = false;
+
+    SingleRecordReader(K key, V value) {
+      this.key = key;
+      this.value = value;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public K getCurrentKey() {
+      return key;
+    }
+
+    @Override
+    public V getCurrentValue() {
+      return value;
+    }
+
+    @Override
+    public float getProgress() {
+      return 0;
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext tac) {
+    }
+
+    @Override
+    public boolean nextKeyValue() {
+      if (providedKey) return false;
+      providedKey = true;
+      return true;
+    }
+    
+  }
+}
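
As the class comment says, NMapInputFormat simply fans out empty splits, so a data-generating job only has to point its input format at it and set the desired task count. A rough sketch, assuming a hypothetical mapper MyGeneratorMapper that ignores its NullWritable input and emits synthetic records:

    Configuration conf = HBaseConfiguration.create();
    conf.setInt("mapred.map.tasks", 8);            // getSplits() will return 8 empty splits
    Job job = new Job(conf, "generate-data");      // hypothetical job name
    job.setInputFormatClass(NMapInputFormat.class);
    job.setMapperClass(MyGeneratorMapper.class);   // placeholder: any mapper that needs no real input
    job.setNumReduceTasks(0);                      // map-only in this sketch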

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java Wed Jun  2 00:40:48 2010
@@ -19,28 +19,35 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
-import java.io.DataInput;
-import java.io.DataOutput;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Arrays;
 import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PerformanceEvaluation;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * Simple test for {@link KeyValueSortReducer} and {@link HFileOutputFormat}.
@@ -49,144 +56,257 @@ import org.apache.hadoop.mapreduce.lib.o
  * emits keys and values like those of {@link PerformanceEvaluation}.  Makes
  * as many splits as "mapred.map.tasks" maps.
  */
-public class TestHFileOutputFormat extends HBaseTestCase {
+public class TestHFileOutputFormat {
   private final static int ROWSPERSPLIT = 1024;
 
-  /*
-   * InputFormat that makes keys and values like those used in
-   * PerformanceEvaluation.  Makes as many splits as there are configured
-   * maps ("mapred.map.tasks").
+  private static final byte[] FAMILY_NAME = PerformanceEvaluation.FAMILY_NAME;
+  private static final byte[] TABLE_NAME = Bytes.toBytes("TestTable");
+  
+  private HBaseTestingUtility util = new HBaseTestingUtility();
+  
+  private static Log LOG = LogFactory.getLog(TestHFileOutputFormat.class);
+  
+  /**
+   * Simple mapper that makes KeyValue output.
    */
-  static class PEInputFormat extends InputFormat<ImmutableBytesWritable, ImmutableBytesWritable> {
-    /* Split that holds nothing but split index.
-     */
-    static class PEInputSplit extends InputSplit implements Writable {
-      private int index = -1;
-
-      PEInputSplit() {
-        super();
-      }
-
-      PEInputSplit(final int i) {
-        this.index = i;
-      }
-
-      int getIndex() {
-        return this.index;
-      }
-
-      public long getLength() throws IOException, InterruptedException {
-        return ROWSPERSPLIT;
-      }
-
-      public String [] getLocations() throws IOException, InterruptedException {
-        return new String [] {};
-      }
-
-      public void readFields(DataInput in) throws IOException {
-        this.index = in.readInt();
-      }
-
-      public void write(DataOutput out) throws IOException {
-        out.writeInt(this.index);
-      }
-    }
-
-    public RecordReader<ImmutableBytesWritable, ImmutableBytesWritable> createRecordReader(
-        InputSplit split, TaskAttemptContext context) throws IOException,
+  static class RandomKVGeneratingMapper
+  extends Mapper<NullWritable, NullWritable,
+                 ImmutableBytesWritable, KeyValue> {
+    
+    private int keyLength;
+    private static final int KEYLEN_DEFAULT=10;
+    private static final String KEYLEN_CONF="randomkv.key.length";
+
+    private int valLength;
+    private static final int VALLEN_DEFAULT=10;
+    private static final String VALLEN_CONF="randomkv.val.length";
+    
+    @Override
+    protected void setup(Context context) throws IOException,
         InterruptedException {
-      final int startrow = ((PEInputSplit)split).getIndex() * ROWSPERSPLIT;
-      return new RecordReader<ImmutableBytesWritable, ImmutableBytesWritable>() {
-        // Starts at a particular row
-        private int counter = startrow;
-        private ImmutableBytesWritable key;
-        private ImmutableBytesWritable value;
-        private final Random random = new Random(System.currentTimeMillis());
-
-        public void close() throws IOException {
-          // Nothing to do.
-        }
-
-        public ImmutableBytesWritable getCurrentKey()
-        throws IOException, InterruptedException {
-          return this.key;
-        }
-
-        public ImmutableBytesWritable getCurrentValue()
-        throws IOException, InterruptedException {
-          return this.value;
-        }
-
-        public float getProgress() throws IOException, InterruptedException {
-          return ((float)(ROWSPERSPLIT - this.counter) / (float)this.counter);
-        }
-
-        public void initialize(InputSplit arg0, TaskAttemptContext arg1)
-            throws IOException, InterruptedException {
-          // Nothing to do.
-
-        }
-
-        public boolean nextKeyValue() throws IOException, InterruptedException {
-          if (this.counter - startrow > ROWSPERSPLIT) return false;
-          this.counter++;
-          this.key = new ImmutableBytesWritable(PerformanceEvaluation.format(this.counter));
-          this.value = new ImmutableBytesWritable(PerformanceEvaluation.generateValue(this.random));
-          return true;
-        }
-      };
+      super.setup(context);
+      
+      Configuration conf = context.getConfiguration();
+      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
+      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
     }
 
-    public List<InputSplit> getSplits(JobContext context)
-    throws IOException, InterruptedException {
-      int count = context.getConfiguration().getInt("mapred.map.tasks", 1);
-      List<InputSplit> splits = new ArrayList<InputSplit>(count);
-      for (int i = 0; i < count; i++) {
-        splits.add(new PEInputSplit(i));
+    protected void map(
+        NullWritable n1, NullWritable n2,
+        Mapper<NullWritable, NullWritable,
+               ImmutableBytesWritable,KeyValue>.Context context)
+        throws java.io.IOException, InterruptedException
+    {
+
+      byte keyBytes[] = new byte[keyLength];
+      byte valBytes[] = new byte[valLength];
+      
+      Random random = new Random(System.currentTimeMillis());
+      for (int i = 0; i < ROWSPERSPLIT; i++) {
+
+        random.nextBytes(keyBytes);
+        random.nextBytes(valBytes);
+        ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
+
+        KeyValue kv = new KeyValue(keyBytes, PerformanceEvaluation.FAMILY_NAME,
+            PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+        context.write(key, kv);
       }
-      return splits;
     }
   }
 
-  /**
-   * Simple mapper that makes KeyValue output.
-   */
-  static class PEtoKVMapper extends Mapper<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue> {
-    protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value,
-      org.apache.hadoop.mapreduce.Mapper<ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,KeyValue>.Context context)
-    throws java.io.IOException ,InterruptedException {
-      context.write(key, new KeyValue(key.get(), PerformanceEvaluation.FAMILY_NAME,
-        PerformanceEvaluation.QUALIFIER_NAME, value.get()));
-    }
+  @Before
+  public void cleanupDir() throws IOException {
+    util.cleanupTestDir();
+  }
+  
+  
+  private void setupRandomGeneratorMapper(Job job) {
+    job.setInputFormatClass(NMapInputFormat.class);
+    job.setMapperClass(RandomKVGeneratingMapper.class);
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    job.setMapOutputValueClass(KeyValue.class);
   }
 
   /**
    * Run small MR job.
    */
+  @Test
   public void testWritingPEData() throws Exception {
+    Configuration conf = util.getConfiguration();
+    Path testDir = HBaseTestingUtility.getTestDir("testWritingPEData");
+    FileSystem fs = testDir.getFileSystem(conf);
+    
     // Set down this value or we OOME in eclipse.
-    this.conf.setInt("io.sort.mb", 20);
+    conf.setInt("io.sort.mb", 20);
     // Write a few files.
-    this.conf.setLong("hbase.hregion.max.filesize", 64 * 1024);
-    Job job = new Job(this.conf, getName());
-    job.setInputFormatClass(TestHFileOutputFormat.PEInputFormat.class);
-    job.setMapperClass(TestHFileOutputFormat.PEtoKVMapper.class);
-    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-    job.setMapOutputValueClass(KeyValue.class);
+    conf.setLong("hbase.hregion.max.filesize", 64 * 1024);
+    
+    Job job = new Job(conf, "testWritingPEData");
+    setupRandomGeneratorMapper(job);
     // This partitioner doesn't work well for number keys but using it anyways
     // just to demonstrate how to configure it.
+    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
+    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
+    
+    Arrays.fill(startKey, (byte)0);
+    Arrays.fill(endKey, (byte)0xff);
+    
     job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
     // Set start and end rows for partitioner.
-    job.getConfiguration().set(SimpleTotalOrderPartitioner.START,
-      Bytes.toString(PerformanceEvaluation.format(0)));
-    int rows = this.conf.getInt("mapred.map.tasks", 1) * ROWSPERSPLIT;
-    job.getConfiguration().set(SimpleTotalOrderPartitioner.END,
-      Bytes.toString(PerformanceEvaluation.format(rows)));
+    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
+    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
     job.setReducerClass(KeyValueSortReducer.class);
     job.setOutputFormatClass(HFileOutputFormat.class);
-    FileOutputFormat.setOutputPath(job, this.testDir);
+    job.setNumReduceTasks(4);
+    
+    FileOutputFormat.setOutputPath(job, testDir);
     assertTrue(job.waitForCompletion(false));
-    FileStatus [] files = this.fs.listStatus(this.testDir);
+    FileStatus [] files = fs.listStatus(testDir);
     assertTrue(files.length > 0);
   }
-}
\ No newline at end of file
+  
+  @Test
+  public void testJobConfiguration() throws Exception {
+    Job job = new Job();
+    HTable table = Mockito.mock(HTable.class);
+    byte[][] mockKeys = new byte[][] {
+        HConstants.EMPTY_BYTE_ARRAY,
+        Bytes.toBytes("aaa"),
+        Bytes.toBytes("ggg"),
+        Bytes.toBytes("zzz")
+    };
+    Mockito.doReturn(mockKeys).when(table).getStartKeys();
+    
+    HFileOutputFormat.configureIncrementalLoad(job, table);
+    assertEquals(4, job.getNumReduceTasks());
+  }
+  
+  private byte [][] generateRandomStartKeys(int numKeys) {
+    Random random = new Random();
+    byte[][] ret = new byte[numKeys][];
+    // first region start key is always empty
+    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
+    for (int i = 1; i < numKeys; i++) {
+      ret[i] = PerformanceEvaluation.generateValue(random);
+    }
+    return ret;
+  }
+  
+  @Test
+  public void testMRIncrementalLoad() throws Exception {
+    doIncrementalLoadTest(false);
+  }
+  
+  @Test
+  public void testMRIncrementalLoadWithSplit() throws Exception {
+    doIncrementalLoadTest(true);
+  }
+  
+  private void doIncrementalLoadTest(
+      boolean shouldChangeRegions) throws Exception {
+    Configuration conf = util.getConfiguration();
+    Path testDir = HBaseTestingUtility.getTestDir("testLocalMRIncrementalLoad");
+    byte[][] startKeys = generateRandomStartKeys(5);
+    
+    try {
+      util.startMiniCluster();
+      HBaseAdmin admin = new HBaseAdmin(conf);
+      HTable table = util.createTable(TABLE_NAME, FAMILY_NAME);
+      int numRegions = util.createMultiRegions(
+          util.getConfiguration(), table, FAMILY_NAME,
+          startKeys);
+      assertEquals("Should make 5 regions",
+          numRegions, 5);
+      assertEquals("Should start with empty table",
+          0, util.countRows(table));
+
+      // Generate the bulk load files
+      util.startMiniMapReduceCluster();
+      runIncrementalPELoad(conf, table, testDir);
+      // This doesn't write into the table, just makes files
+      assertEquals("HFOF should not touch actual table",
+          0, util.countRows(table));
+  
+      if (shouldChangeRegions) {
+        LOG.info("Changing regions in table");
+        admin.disableTable(table.getTableName());
+        byte[][] newStartKeys = generateRandomStartKeys(15);
+        util.createMultiRegions(util.getConfiguration(),
+            table, FAMILY_NAME, newStartKeys);
+        admin.enableTable(table.getTableName());
+        while (table.getRegionsInfo().size() != 15 ||
+            !admin.isTableAvailable(table.getTableName())) {
+          Thread.sleep(1000);
+          LOG.info("Waiting for new region assignment to happen");
+        }
+      }
+      
+      // Perform the actual load
+      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
+      
+      // Ensure data shows up
+      int expectedRows = conf.getInt("mapred.map.tasks", 1) * ROWSPERSPLIT;
+      assertEquals("LoadIncrementalHFiles should put expected data in table",
+          expectedRows, util.countRows(table));
+      String tableDigestBefore = util.checksumRows(table);
+            
+      // Cause regions to reopen
+      admin.disableTable(TABLE_NAME);
+      while (table.getRegionsInfo().size() != 0) {
+        Thread.sleep(1000);
+        LOG.info("Waiting for table to disable"); 
+      }
+      admin.enableTable(TABLE_NAME);
+      util.waitTableAvailable(TABLE_NAME, 30000);
+      
+      assertEquals("Data should remain after reopening of regions",
+          tableDigestBefore, util.checksumRows(table));
+    } finally {
+      util.shutdownMiniMapReduceCluster();
+      util.shutdownMiniCluster();
+    }
+  }
+  
+  
+  
+  private void runIncrementalPELoad(
+      Configuration conf, HTable table, Path outDir)
+  throws Exception {
+    Job job = new Job(conf, "testLocalMRIncrementalLoad");
+    setupRandomGeneratorMapper(job);
+    HFileOutputFormat.configureIncrementalLoad(job, table);
+    FileOutputFormat.setOutputPath(job, outDir);
+    
+    assertEquals(table.getRegionsInfo().size(),
+        job.getNumReduceTasks());
+    
+    assertTrue(job.waitForCompletion(true));
+  }
+  
+  public static void main(String args[]) throws Exception {
+    new TestHFileOutputFormat().manualTest(args);
+  }
+  
+  public void manualTest(String args[]) throws Exception {
+    Configuration conf = HBaseConfiguration.create();    
+    util = new HBaseTestingUtility(conf);
+    if ("newtable".equals(args[0])) {
+      byte[] tname = Bytes.toBytes(args[1]);
+      HTable table = util.createTable(tname, FAMILY_NAME);
+      HBaseAdmin admin = new HBaseAdmin(conf);
+      admin.disableTable(tname);
+      util.createMultiRegions(conf, table, FAMILY_NAME,
+          generateRandomStartKeys(5));
+      admin.enableTable(tname);
+    } else if ("incremental".equals(args[0])) {
+      byte[] tname = Bytes.toBytes(args[1]);
+      HTable table = new HTable(conf, tname);
+      Path outDir = new Path("incremental-out");
+      runIncrementalPELoad(conf, table, outDir);
+    } else {
+      throw new RuntimeException(
+          "usage: TestHFileOutputFormat newtable | incremental");
+    }
+  }
+}
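
Taken together, runIncrementalPELoad() and doIncrementalLoadTest() amount to the usual two-step bulk-load pattern: write region-aligned HFiles with a job configured by HFileOutputFormat.configureIncrementalLoad(), then move them into the table with LoadIncrementalHFiles. A condensed driver sketch, assuming the target table already exists and reusing RandomKVGeneratingMapper purely as a stand-in for any mapper that emits ImmutableBytesWritable/KeyValue pairs (the output path and job name are made up):

    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, Bytes.toBytes("TestTable"));   // existing, pre-split table
    Path out = new Path("/tmp/bulk-out");                          // hypothetical output directory

    Job job = new Job(conf, "bulk-prepare");
    job.setInputFormatClass(NMapInputFormat.class);
    job.setMapperClass(RandomKVGeneratingMapper.class);            // stand-in mapper from this test
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    // Sets the total-order partitioner, the sort reducer and one reduce task per region.
    HFileOutputFormat.configureIncrementalLoad(job, table);
    FileOutputFormat.setOutputPath(job, out);

    if (job.waitForCompletion(true)) {
      // Moves the generated HFiles into the regions of the live table.
      new LoadIncrementalHFiles(conf).doBulkLoad(out, table);
    }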

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java?rev=950321&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java Wed Jun  2 00:40:48 2010
@@ -0,0 +1,71 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+import static org.junit.Assert.*;
+
+public class TestImportTsv {
+  @Test
+  public void testTsvParser() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d");
+    assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(0));
+    assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(0));
+    assertBytesEquals(Bytes.toBytes("col_b"), parser.getFamily(1));
+    assertBytesEquals(Bytes.toBytes("qual"), parser.getQualifier(1));
+    assertNull(parser.getFamily(2));
+    assertNull(parser.getQualifier(2));
+    
+    byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
+    ParsedLine parsed = parser.parse(line, line.length);
+    checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
+    assertEquals(2, parser.getRowKeyColumnIndex());
+  }
+
+  private void checkParsing(ParsedLine parsed, Iterable<String> expected) {
+    ArrayList<String> parsedCols = new ArrayList<String>();
+    for (int i = 0; i < parsed.getColumnCount(); i++) {
+      parsedCols.add(Bytes.toString(
+          parsed.getLineBytes(),
+          parsed.getColumnOffset(i),
+          parsed.getColumnLength(i)));
+    }
+    if (!Iterables.elementsEqual(parsedCols, expected)) {
+      fail("Expected: " + Joiner.on(",").join(expected) + "\n" + 
+          "Got:" + Joiner.on(",").join(parsedCols));
+    }
+  }
+  
+  private void assertBytesEquals(byte[] a, byte[] b) {
+    assertEquals(Bytes.toStringBinary(a), Bytes.toStringBinary(b));
+  }
+}
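
For reference, the parser exercised above is used roughly like this (the column specification and input line are made up for illustration):

    TsvParser parser = new TsvParser("HBASE_ROW_KEY,d:count");     // hypothetical column spec
    byte[] line = Bytes.toBytes("row1\t42");
    ParsedLine parsed = parser.parse(line, line.length);           // throws BadTsvLineException on malformed input
    int keyCol = parser.getRowKeyColumnIndex();                    // 0 for this spec
    String rowKey = Bytes.toString(parsed.getLineBytes(),
        parsed.getColumnOffset(keyCol), parsed.getColumnLength(keyCol));   // "row1"
    byte[] family = parser.getFamily(1);                           // "d"
    byte[] qualifier = parser.getQualifier(1);                     // "count"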

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java?rev=950321&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java Wed Jun  2 00:40:48 2010
@@ -0,0 +1,188 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test cases for the "load" half of the HFileOutputFormat bulk load
+ * functionality. These tests run faster than the full MR cluster
+ * tests in TestHFileOutputFormat.
+ */
+public class TestLoadIncrementalHFiles {
+
+  private static final byte[] TABLE = Bytes.toBytes("mytable");
+  private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
+  private static final byte[] FAMILY = Bytes.toBytes("myfam");
+
+  private static final byte[][] SPLIT_KEYS = new byte[][] {
+    Bytes.toBytes("ddd"),
+    Bytes.toBytes("ppp")
+  };
+
+  public static int BLOCKSIZE = 64*1024;
+  public static String COMPRESSION =
+    Compression.Algorithm.NONE.getName();
+
+  private HBaseTestingUtility util = new HBaseTestingUtility();
+
+  /**
+   * Test case that creates some regions and loads
+   * HFiles that fit snugly inside those regions
+   */
+  @Test
+  public void testSimpleLoad() throws Exception {
+    runTest("testSimpleLoad",
+        new byte[][][] {
+          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+          new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
+    });
+  }
+
+  /**
+   * Test case that creates some regions and loads
+   * HFiles that cross the boundaries of those regions
+   */
+  @Test
+  public void testRegionCrossingLoad() throws Exception {
+    runTest("testRegionCrossingLoad",
+        new byte[][][] {
+          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
+          new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
+    });
+  }
+
+  private void runTest(String testName, byte[][][] hfileRanges)
+  throws Exception {
+    Path dir = HBaseTestingUtility.getTestDir(testName);
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+
+    int hfileIdx = 0;
+    for (byte[][] range : hfileRanges) {
+      byte[] from = range[0];
+      byte[] to = range[1];
+      createHFile(fs, new Path(familyDir, "hfile_" + hfileIdx++),
+          FAMILY, QUALIFIER, from, to, 1000);
+    }
+    int expectedRows = hfileIdx * 1000;
+
+
+    util.startMiniCluster();
+    try {
+      HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
+      HTableDescriptor htd = new HTableDescriptor(TABLE);
+      htd.addFamily(new HColumnDescriptor(FAMILY));
+      admin.createTable(htd, SPLIT_KEYS);
+
+      HTable table = new HTable(TABLE);
+      util.waitTableAvailable(TABLE, 30000);
+      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
+          util.getConfiguration());
+      loader.doBulkLoad(dir, table);
+
+      assertEquals(expectedRows, util.countRows(table));
+    } finally {
+      util.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testSplitStoreFile() throws IOException {
+    Path dir = HBaseTestingUtility.getTestDir("testSplitHFile");
+    FileSystem fs = util.getTestFileSystem();
+    Path testIn = new Path(dir, "testhfile");
+    HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
+    createHFile(fs, testIn, FAMILY, QUALIFIER,
+        Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
+
+    Path bottomOut = new Path(dir, "bottom.out");
+    Path topOut = new Path(dir, "top.out");
+
+    LoadIncrementalHFiles.splitStoreFile(
+        util.getConfiguration(), testIn,
+        familyDesc, Bytes.toBytes("ggg"),
+        bottomOut,
+        topOut);
+
+    int rowCount = verifyHFile(bottomOut);
+    rowCount += verifyHFile(topOut);
+    assertEquals(1000, rowCount);
+  }
+
+  private int verifyHFile(Path p) throws IOException {
+    Configuration conf = util.getConfiguration();
+    HFile.Reader reader = new HFile.Reader(
+        p.getFileSystem(conf), p, null, false);
+    reader.loadFileInfo();
+    HFileScanner scanner = reader.getScanner(false, false);
+    scanner.seekTo();
+    int count = 0;
+    do {
+      count++;
+    } while (scanner.next());
+    assertTrue(count > 0);
+    return count;
+  }
+
+
+  /**
+   * Create an HFile with the given number of rows between a given
+   * start key and end key.
+   * TODO put me in an HFileTestUtil or something?
+   */
+  static void createHFile(
+      FileSystem fs, Path path,
+      byte[] family, byte[] qualifier,
+      byte[] startKey, byte[] endKey, int numRows) throws IOException
+  {
+    HFile.Writer writer = new HFile.Writer(fs, path, BLOCKSIZE, COMPRESSION,
+        KeyValue.KEY_COMPARATOR);
+    try {
+      // subtract 2 since iterateOnSplits doesn't include boundary keys
+      for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows-2)) {
+        KeyValue kv = new KeyValue(key, family, qualifier, key);
+        writer.append(kv);
+      }
+    } finally {
+      writer.close();
+    }
+  }
+}
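
The directory layout that doBulkLoad() expects is the one runTest() builds: a top-level directory with one subdirectory per column family, each holding HFiles whose key ranges fall inside (or across) the table's regions. A minimal sketch using the createHFile() helper above (the test-dir name and key range are illustrative, and the table is assumed to already exist):

    FileSystem fs = util.getTestFileSystem();
    Path dir = HBaseTestingUtility.getTestDir("bulkLayoutExample").makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(FAMILY));        // <dir>/<family>/
    // 1000 rows with keys spread between "aaa" and "mmm".
    createHFile(fs, new Path(familyDir, "hfile_0"), FAMILY, QUALIFIER,
        Bytes.toBytes("aaa"), Bytes.toBytes("mmm"), 1000);
    new LoadIncrementalHFiles(util.getConfiguration()).doBulkLoad(dir, table);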

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Wed Jun  2 00:40:48 2010
@@ -207,8 +207,7 @@ public class TestCompaction extends HBas
     // they were deleted.
     int count = 0;
     boolean containsStartRow = false;
-    for (StoreFile f: this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles().
-        values()) {
+    for (StoreFile f: this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
       HFileScanner scanner = f.getReader().getScanner(false, false);
       scanner.seekTo();
       do {
@@ -239,7 +238,7 @@ public class TestCompaction extends HBas
   private int count() throws IOException {
     int count = 0;
     for (StoreFile f: this.r.stores.
-        get(COLUMN_FAMILY_TEXT).getStorefiles().values()) {
+        get(COLUMN_FAMILY_TEXT).getStorefiles()) {
       HFileScanner scanner = f.getReader().getScanner(false, false);
       if (!scanner.seekTo()) {
         continue;

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionInfo.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionInfo.java?rev=950321&r1=950320&r2=950321&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionInfo.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionInfo.java Wed Jun  2 00:40:48 2010
@@ -19,12 +19,16 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.MD5Hash;
 
-public class TestHRegionInfo extends HBaseTestCase {
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestHRegionInfo {
+  @Test
   public void testCreateHRegionInfoName() throws Exception {
     String tableName = "tablename";
     final byte [] tn = Bytes.toBytes(tableName);
@@ -47,4 +51,32 @@ public class TestHRegionInfo extends HBa
                  + id + "." + md5HashInHex + ".",
                  nameStr);
   }
+  
+  @Test
+  public void testContainsRange() {
+    HTableDescriptor tableDesc = new HTableDescriptor("testtable");
+    HRegionInfo hri = new HRegionInfo(
+        tableDesc, Bytes.toBytes("a"), Bytes.toBytes("g"));
+    // Single row range at start of region
+    assertTrue(hri.containsRange(Bytes.toBytes("a"), Bytes.toBytes("a")));
+    // Fully contained range
+    assertTrue(hri.containsRange(Bytes.toBytes("b"), Bytes.toBytes("c")));
+    // Range overlapping start of region
+    assertTrue(hri.containsRange(Bytes.toBytes("a"), Bytes.toBytes("c")));
+    // Fully contained single-row range
+    assertTrue(hri.containsRange(Bytes.toBytes("c"), Bytes.toBytes("c")));
+    // Range that overlaps end key and hence doesn't fit
+    assertFalse(hri.containsRange(Bytes.toBytes("a"), Bytes.toBytes("g")));
+    // Single row range on end key
+    assertFalse(hri.containsRange(Bytes.toBytes("g"), Bytes.toBytes("g")));
+    // Single row range entirely outside
+    assertFalse(hri.containsRange(Bytes.toBytes("z"), Bytes.toBytes("z")));
+    
+    // Degenerate range
+    try {
+      hri.containsRange(Bytes.toBytes("z"), Bytes.toBytes("a"));
+      fail("Invalid range did not throw IAE");
+    } catch (IllegalArgumentException iae) {
+    }
+  }
 }


