hbase-commits mailing list archives

From te...@apache.org
Subject svn commit: r1202368 - in /hbase/branches/0.90: ./ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/io/ src/main/java/org/apache/hadoop/hbase/ipc/ src/main/java/org/apache/hadoop/hbase/mapreduce/ src/main/java/org/apache/had...
Date Tue, 15 Nov 2011 18:39:26 GMT
Author: tedyu
Date: Tue Nov 15 18:39:25 2011
New Revision: 1202368

URL: http://svn.apache.org/viewvc?rev=1202368&view=rev
Log:
HBASE-4718  Backport HBASE-4552 (multi-family bulk load) to 0.90 branch (Jonathan Hsieh)

Added:
    hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
    hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
Modified:
    hbase/branches/0.90/CHANGES.txt
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.90/src/main/resources/hbase-default.xml
    hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/io/TestHbaseObjectWritable.java
    hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Modified: hbase/branches/0.90/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/CHANGES.txt?rev=1202368&r1=1202367&r2=1202368&view=diff
==============================================================================
--- hbase/branches/0.90/CHANGES.txt (original)
+++ hbase/branches/0.90/CHANGES.txt Tue Nov 15 18:39:25 2011
@@ -90,6 +90,7 @@ Release 0.90.5 - Unreleased
    HBASE-4695  WAL logs get deleted before region server can fully flush
               (gaojinchao)
    HBASE-4684  REST server is leaking ZK connections
+   HBASE-4718  Backport HBASE-4552 (multi-family bulk load) to 0.90 branch (Jonathan Hsieh)
 
   IMPROVEMENT
    HBASE-4205  Enhance HTable javadoc (Eric Charles)

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java?rev=1202368&r1=1202367&r2=1202368&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java Tue Nov 15 18:39:25 2011
@@ -363,6 +363,14 @@ public class HRegionInfo extends Version
   }
 
   /**
+   * Gets the table name.
+   * @return Table name.
+   */
+  public byte[] getTableName() {
+    return getTableName(regionName);
+  }
+
+  /**
    * Separate elements of a regionName.
    * @param regionName
    * @return Array of byte[] containing tableName, startKey and id
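
For illustration, a minimal sketch of the new accessor in use: filtering a region server's online regions down to a single table, the way the new split-recovery test below does in forceSplit(). The helper method and its Iterable parameter are illustrative, not part of the patch.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RegionsOfTableSketch {
      // Collect the regions of a given table from a server's online-region list,
      // comparing table names via the new HRegionInfo#getTableName() accessor.
      static List<HRegionInfo> regionsOf(Iterable<HRegionInfo> onlineRegions, String table) {
        List<HRegionInfo> matches = new ArrayList<HRegionInfo>();
        byte[] tableName = Bytes.toBytes(table);
        for (HRegionInfo hri : onlineRegions) {
          if (Bytes.equals(hri.getTableName(), tableName)) {
            matches.add(hri);
          }
        }
        return matches;
      }
    }
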

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=1202368&r1=1202367&r2=1202368&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Tue Nov 15 18:39:25 2011
@@ -18,9 +18,15 @@
 
 package org.apache.hadoop.hbase.io;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
 import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -203,6 +209,9 @@ public class HbaseObjectWritable impleme
     addToMap(Increment.class, code++);
 
     addToMap(KeyOnlyFilter.class, code++);
+    
+    // serializable
+    addToMap(Serializable.class, code++);
 
   }
 
@@ -307,6 +316,9 @@ public class HbaseObjectWritable impleme
       else if (Writable.class.isAssignableFrom(c)) {
         code = CLASS_TO_CODE.get(Writable.class);
       }
+      else if (Serializable.class.isAssignableFrom(c)){
+        code = CLASS_TO_CODE.get(Serializable.class);
+      }
     }
     if (code == null) {
       LOG.error("Unsupported type " + c);
@@ -423,6 +435,28 @@ public class HbaseObjectWritable impleme
         writeClassCode(out, c);
       }
       ((Writable)instanceObj).write(out);
+    } else if (Serializable.class.isAssignableFrom(declClass)) {
+      Class <?> c = instanceObj.getClass();
+      Byte code = CLASS_TO_CODE.get(c);
+      if (code == null) {
+        out.writeByte(NOT_ENCODED);
+        Text.writeString(out, c.getName());
+      } else {
+        writeClassCode(out, c);
+      }
+      ByteArrayOutputStream bos = null;
+      ObjectOutputStream oos = null;
+      try{
+        bos = new ByteArrayOutputStream();
+        oos = new ObjectOutputStream(bos);
+        oos.writeObject(instanceObj);
+        byte[] value = bos.toByteArray();
+        out.writeInt(value.length);
+        out.write(value);
+      } finally {
+        if(bos!=null) bos.close();
+        if(oos!=null) oos.close();
+      }
     } else {
       throw new IOException("Can't write: "+instanceObj+" as "+declClass);
     }
@@ -502,7 +536,7 @@ public class HbaseObjectWritable impleme
     } else if (declaredClass.isEnum()) {         // enum
       instance = Enum.valueOf((Class<? extends Enum>) declaredClass,
         Text.readString(in));
-    } else {                                      // Writable
+    } else {                                      // Writable or Serializable
       Class instanceClass = null;
       Byte b = in.readByte();
       if (b.byteValue() == NOT_ENCODED) {
@@ -516,17 +550,36 @@ public class HbaseObjectWritable impleme
       } else {
         instanceClass = CODE_TO_CLASS.get(b);
       }
-      Writable writable = WritableFactories.newInstance(instanceClass, conf);
-      try {
-        writable.readFields(in);
-      } catch (Exception e) {
-        LOG.error("Error in readFields", e);
-        throw new IOException("Error in readFields" , e);
-      }
-      instance = writable;
-      if (instanceClass == NullInstance.class) {  // null
-        declaredClass = ((NullInstance)instance).declaredClass;
-        instance = null;
+      if(Writable.class.isAssignableFrom(instanceClass)){
+        Writable writable = WritableFactories.newInstance(instanceClass, conf);
+        try {
+          writable.readFields(in);
+        } catch (Exception e) {
+          LOG.error("Error in readFields", e);
+          throw new IOException("Error in readFields" , e);
+        }
+        instance = writable;
+        if (instanceClass == NullInstance.class) {  // null
+          declaredClass = ((NullInstance)instance).declaredClass;
+          instance = null;
+        }
+      } else {
+        int length = in.readInt();
+        byte[] objectBytes = new byte[length];
+        in.readFully(objectBytes);
+        ByteArrayInputStream bis = null;
+        ObjectInputStream ois = null;
+        try { 
+          bis = new ByteArrayInputStream(objectBytes);
+          ois = new ObjectInputStream(bis);
+          instance = ois.readObject();
+        } catch (ClassNotFoundException e) {
+          LOG.error("Error in readFields", e);
+          throw new IOException("Error in readFields", e);
+        } finally {
+          if(bis!=null) bis.close();
+          if(ois!=null) ois.close();
+        }
       }
     }
     if (objectWritable != null) {                 // store values
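
The new Serializable branch falls back to standard Java serialization, writing the object as a length-prefixed byte array after the class code. A minimal standalone sketch of that wire pattern follows; the class and method names are illustrative and only java.io is used, so nothing here depends on HbaseObjectWritable itself.

    import java.io.*;

    public class SerializableCodecSketch {
      // Serialize obj with ObjectOutputStream and write it length-prefixed.
      static void writeSerializable(DataOutput out, Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        try {
          oos.writeObject(obj);
          oos.flush();
          byte[] value = bos.toByteArray();
          out.writeInt(value.length);   // length prefix
          out.write(value);             // serialized payload
        } finally {
          oos.close();
        }
      }

      // Read the length prefix, then deserialize the payload.
      static Object readSerializable(DataInput in) throws IOException, ClassNotFoundException {
        int length = in.readInt();
        byte[] objectBytes = new byte[length];
        in.readFully(objectBytes);
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(objectBytes));
        try {
          return ois.readObject();
        } finally {
          ois.close();
        }
      }

      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        writeSerializable(new DataOutputStream(buf), "test phrase");
        Object roundTripped = readSerializable(
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(roundTripped);  // prints: test phrase
      }
    }

Running main round-trips the string the same way the new testCustomSerializable below exercises a CustomSerializable through doType().
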

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1202368&r1=1202367&r2=1202368&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Tue Nov 15 18:39:25 2011
@@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.ipc;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.util.List;
-import java.util.NavigableSet;
 
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -40,6 +39,7 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.ipc.RemoteException;
 
 /**
@@ -304,8 +304,9 @@ public interface HRegionInterface extend
   /**
    * Bulk load an HFile into an open region
    */
+  @Deprecated
   public void bulkLoadHFile(String hfilePath, byte[] regionName, byte[] familyName)
-  throws IOException;
+      throws IOException;
 
   // Master methods
 
@@ -419,4 +420,17 @@ public interface HRegionInterface extend
    * @throws IOException
    */
   public void replicateLogEntries(HLog.Entry[] entries) throws IOException;
+
+  /**
+   * Atomically bulk load multiple HFiles (say from different column families)
+   * into an open region.
+   * 
+   * @param familyPaths List of (family, hfile path) pairs
+   * @param regionName name of region to load hfiles into
+   * @return true if successful, false if failed recoverably
+   * @throws IOException if fails unrecoverably
+   */
+  /* TODO: Move into place above master operations after deprecation cycle */
+  public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, byte[] regionName)
+  throws IOException;
 }
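
As a hedged usage sketch, a caller of the new interface method assembles the (family, HFile path) pairs with Pair.newPair and passes them along with the region name, mirroring tryAtomicRegionLoad later in this commit. How the HRegionInterface proxy and region name are obtained is assumed and elided, and the column family names are placeholders.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.ipc.HRegionInterface;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.Pair;

    public class AtomicBulkLoadCallSketch {
      // Load two families' HFiles into one region in a single atomic RPC.
      static boolean loadRowAtomically(HRegionInterface server, byte[] regionName,
          Path cf1HFile, Path cf2HFile) throws IOException {
        List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
        familyPaths.add(Pair.newPair(Bytes.toBytes("cf1"), cf1HFile.toString()));
        familyPaths.add(Pair.newPair(Bytes.toBytes("cf2"), cf2HFile.toString()));
        return server.bulkLoadHFiles(familyPaths, regionName);
      }
    }

A false return signals a recoverable failure (for example the region has since split), so the caller regroups the files against the new boundaries and retries; an IOException is treated as unrecoverable.
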

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1202368&r1=1202367&r2=1202368&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Tue Nov 15 18:39:25 2011
@@ -22,8 +22,24 @@ package org.apache.hadoop.hbase.mapreduc
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Deque;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,7 +51,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.HConnection;
@@ -44,22 +60,29 @@ import org.apache.hadoop.hbase.client.Se
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.Reference.Range;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Tool to load the output of HFileOutputFormat into an existing table.
  * @see #usage()
  */
 public class LoadIncrementalHFiles extends Configured implements Tool {
 
-  static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
+  private static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
+  static AtomicLong regionCount = new AtomicLong(0);
 
   public static String NAME = "completebulkload";
 
@@ -86,7 +109,7 @@ public class LoadIncrementalHFiles exten
    * region boundary, and each part is added back into the queue.
    * The import process finishes when the queue is empty.
    */
-  private static class LoadQueueItem {
+  static class LoadQueueItem {
     final byte[] family;
     final Path hfilePath;
 
@@ -94,13 +117,17 @@ public class LoadIncrementalHFiles exten
       this.family = family;
       this.hfilePath = hfilePath;
     }
+
+    public String toString() {
+      return "family:"+ Bytes.toString(family) + " path:" + hfilePath.toString();
+    }
   }
 
   /**
    * Walk the given directory for all HFiles, and return a Queue
    * containing all such files.
    */
-  private Deque<LoadQueueItem> discoverLoadQueue(Path hfofDir)
+  private void discoverLoadQueue(Deque<LoadQueueItem> ret, Path hfofDir)
   throws IOException {
     FileSystem fs = hfofDir.getFileSystem(getConf());
 
@@ -114,7 +141,6 @@ public class LoadIncrementalHFiles exten
       throw new FileNotFoundException("No families found in " + hfofDir);
     }
 
-    Deque<LoadQueueItem> ret = new LinkedList<LoadQueueItem>();
     for (FileStatus stat : familyDirStatuses) {
       if (!stat.isDir()) {
         LOG.warn("Skipping non-directory " + stat.getPath());
@@ -130,21 +156,21 @@ public class LoadIncrementalHFiles exten
         ret.add(new LoadQueueItem(family, hfile));
       }
     }
-    return ret;
   }
 
   /**
    * Perform a bulk load of the given directory into the given
-   * pre-existing table.
+   * pre-existing table.  This method is not threadsafe.
+   * 
    * @param hfofDir the directory that was provided as the output path
    * of a job using HFileOutputFormat
    * @param table the table to load into
    * @throws TableNotFoundException if table does not yet exist
    */
-  public void doBulkLoad(Path hfofDir, HTable table)
+  public void doBulkLoad(Path hfofDir, final HTable table)
     throws TableNotFoundException, IOException
   {
-    HConnection conn = table.getConnection();
+    final HConnection conn = table.getConnection();
 
     if (!conn.isTableAvailable(table.getTableName())) {
       throw new TableNotFoundException("Table " +
@@ -152,13 +178,57 @@ public class LoadIncrementalHFiles exten
           "is not currently available.");
     }
 
-    Deque<LoadQueueItem> queue = null;
+    // initialize thread pools
+    int nrThreads = getConf().getInt("hbase.loadincremental.threads.max",
+        Runtime.getRuntime().availableProcessors());
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    builder.setNameFormat("LoadIncrementalHFiles-%1$d");
+    ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
+        60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(),
+        builder.build());
+    ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
+
+    // LQI queue does not need to be threadsafe -- all operations on this queue
+    // happen in this thread
+    Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
     try {
-      queue = discoverLoadQueue(hfofDir);
+      discoverLoadQueue(queue, hfofDir);
+      int count = 0;
+
+      if (queue.isEmpty()) {
+        LOG.warn("Bulk load operation did not find any files to load in " +
+        "directory " + hfofDir.toUri() + ".  Does it contain files in " +
+        "subdirectories that correspond to column family names?");
+      }
+
+      // Assumes that region splits can happen while this occurs.
       while (!queue.isEmpty()) {
-        LoadQueueItem item = queue.remove();
-        tryLoad(item, conn, table.getTableName(), queue);
+        // need to reload split keys each iteration.
+        final Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
+        if (count != 0) {
+          LOG.info("Split occured while grouping HFiles, retry attempt " +
+              + count + " with " + queue.size() + " files remaining to group or split");
+        }
+
+        int maxRetries = getConf().getInt("hbase.bulkload.retries.number", 0);
+        if (maxRetries != 0 && count >= maxRetries) {
+          LOG.error("Retry attempted " + count +  " times without completing, bailing out");
+          return;
+        }
+        count++;
+
+        // Using ByteBuffer for byte[] equality semantics
+        Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
+            pool, queue, startEndKeys);
+
+        bulkLoadPhase(table, conn, pool, queue, regionGroups);
+
+        // NOTE: The next iteration's split / group could happen in parallel to
+        // atomic bulkloads assuming that there are splits and no merges, and
+        // that we can atomically pull out the groups we want to retry.
       }
+
     } finally {
       if (queue != null && !queue.isEmpty()) {
         StringBuilder err = new StringBuilder();
@@ -174,15 +244,148 @@ public class LoadIncrementalHFiles exten
   }
 
   /**
-   * Attempt to load the given load queue item into its target region server.
+   * This takes the LQI's grouped by likely regions and attempts to bulk load
+   * them.  Any failures are re-queued for another pass with the
+   * groupOrSplitPhase.
+   */
+  protected void bulkLoadPhase(final HTable table, final HConnection conn,
+      ExecutorService pool, Deque<LoadQueueItem> queue,
+      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
+    // atomically bulk load the groups.
+    Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<Future<List<LoadQueueItem>>>();
+    for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
+      final byte[] first = e.getKey().array();
+      final Collection<LoadQueueItem> lqis =  e.getValue();
+
+      final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
+        public List<LoadQueueItem> call() throws Exception {
+          List<LoadQueueItem> toRetry = tryAtomicRegionLoad(conn, table.getTableName(), first, lqis);
+          return toRetry;
+        }
+      };
+      loadingFutures.add(pool.submit(call));
+    }
+
+    // get all the results.
+    for (Future<List<LoadQueueItem>> future : loadingFutures) {
+      try {
+        List<LoadQueueItem> toRetry = future.get();
+
+        // LQIs that are requeued to be regrouped.
+        queue.addAll(toRetry);
+
+      } catch (ExecutionException e1) {
+        Throwable t = e1.getCause();
+        if (t instanceof IOException) {
+          // At this point something unrecoverable has happened.
+          // TODO Implement bulk load recovery
+          throw new IOException("BulkLoad encountered an unrecoverable problem", t);
+        }
+        LOG.error("Unexpected execution exception during bulk load", e1);
+        throw new IllegalStateException(t);
+      } catch (InterruptedException e1) {
+        LOG.error("Unexpected interrupted exception during bulk load", e1);
+        throw new IllegalStateException(e1);
+      }
+    }
+  }
+
+  /**
+   * @return A Multimap<startkey, LoadQueueItem> that groups LQI by likely
+   * bulk load region targets.
+   */
+  private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final HTable table,
+      ExecutorService pool, Deque<LoadQueueItem> queue,
+      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+    // <region start key, LQI> need synchronized only within this scope of this
+    // phase because of the puts that happen in futures.
+    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
+    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
+
+    // drain LQIs and figure out bulk load groups
+    Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>();
+    while (!queue.isEmpty()) {
+      final LoadQueueItem item = queue.remove();
+      
+      final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
+        public List<LoadQueueItem> call() throws Exception {
+          List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
+          return splits;
+        }
+      };
+      splittingFutures.add(pool.submit(call));
+    }
+    // get all the results.  All grouping and splitting must finish before
+    // we can attempt the atomic loads.
+    for (Future<List<LoadQueueItem>> lqis : splittingFutures) {
+      try {
+        List<LoadQueueItem> splits = lqis.get();
+        if (splits != null) {
+          queue.addAll(splits);
+        }
+      } catch (ExecutionException e1) {
+        Throwable t = e1.getCause();
+        if (t instanceof IOException) {
+          LOG.error("IOException during splitting", e1);
+          throw (IOException)t; // would have been thrown if not parallelized,
+        }
+        LOG.error("Unexpected execution exception during splitting", e1);
+        throw new IllegalStateException(t);
+      } catch (InterruptedException e1) {
+        LOG.error("Unexpected interrupted exception during splitting", e1);
+        throw new IllegalStateException(e1);
+      }
+    }
+    return regionGroups;
+  }
+
+  // unique file name for the table
+  String getUniqueName(byte[] tableName) {
+    String name = Bytes.toStringBinary(tableName) + "," + regionCount.incrementAndGet();
+    return name;
+  }
+
+  protected List<LoadQueueItem> splitStoreFile(final LoadQueueItem item,
+      final HTable table, byte[] startKey,
+      byte[] splitKey) throws IOException {
+    final Path hfilePath = item.hfilePath;
+
+    // We use a '_' prefix which is ignored when walking directory trees
+    // above.
+    final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
+
+    LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
+    "region. Splitting...");
+
+    String uniqueName = getUniqueName(table.getTableName());
+    HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
+    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
+    Path topOut = new Path(tmpDir, uniqueName + ".top");
+    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey,
+        botOut, topOut);
+
+    // Add these back at the *front* of the queue, so there's a lower
+    // chance that the region will just split again before we get there.
+    List<LoadQueueItem> lqis = new ArrayList<LoadQueueItem>(2);
+    lqis.add(new LoadQueueItem(item.family, botOut));
+    lqis.add(new LoadQueueItem(item.family, topOut));
+
+    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
+    return lqis;
+  }
+
+  /**
+   * Attempt to assign the given load queue item into its target region group.
    * If the hfile boundary no longer fits into a region, physically splits
-   * the hfile such that the new bottom half will fit, and adds the two
-   * resultant hfiles back into the load queue.
+   * the hfile such that the new bottom half will fit and returns the list of
+   * LQI's corresponding to the resultant hfiles.
+   *
+   * protected for testing
    */
-  private void tryLoad(final LoadQueueItem item,
-      HConnection conn, final byte[] table,
-      final Deque<LoadQueueItem> queue)
-  throws IOException {
+  protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
+      final LoadQueueItem item, final HTable table,
+      final Pair<byte[][], byte[][]> startEndKeys)
+      throws IOException {
     final Path hfilePath = item.hfilePath;
     final FileSystem fs = hfilePath.getFileSystem(getConf());
     HFile.Reader hfr = new HFile.Reader(fs, hfilePath, null, false);
@@ -200,44 +403,87 @@ public class LoadIncrementalHFiles exten
         " last="  + Bytes.toStringBinary(last));
     if (first == null || last == null) {
       assert first == null && last == null;
+      // TODO what if this is due to a bad HFile?
       LOG.info("hfile " + hfilePath + " has no entries, skipping");
-      return;
+      return null;
+    }
+    if (Bytes.compareTo(first, last) > 0) {
+      throw new IllegalArgumentException(
+      "Invalid range: " + Bytes.toStringBinary(first) +
+      " > " + Bytes.toStringBinary(last));
+    }
+    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
+        Bytes.BYTES_COMPARATOR);
+    if (idx < 0) {
+      // not on boundary, returns -(insertion index).  Calculate region it
+      // would be in.
+      idx = -(idx + 1) - 1;
+    }
+    final int indexForCallable = idx;
+    boolean lastKeyInRange =
+      Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
+      Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
+    if (!lastKeyInRange) {
+      List<LoadQueueItem> lqis = splitStoreFile(item, table,
+          startEndKeys.getFirst()[indexForCallable],
+          startEndKeys.getSecond()[indexForCallable]);
+      return lqis;
     }
 
-    // We use a '_' prefix which is ignored when walking directory trees
-    // above.
-    final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
+    // group regions.
+    regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
+    return null;
+  }
 
-    conn.getRegionServerWithRetries(
-      new ServerCallable<Void>(conn, table, first) {
-        @Override
-        public Void call() throws Exception {
-          LOG.debug("Going to connect to server " + location +
-              "for row " + Bytes.toStringBinary(row));
-          HRegionInfo hri = location.getRegionInfo();
-          if (!hri.containsRange(first, last)) {
-            LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
-                "region. Splitting...");
-
-            HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family);
-            Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom");
-            Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top");
-            splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(),
-                botOut, topOut);
-
-            // Add these back at the *front* of the queue, so there's a lower
-            // chance that the region will just split again before we get there.
-            queue.addFirst(new LoadQueueItem(item.family, botOut));
-            queue.addFirst(new LoadQueueItem(item.family, topOut));
-            LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
-            return null;
-          }
-
-          byte[] regionName = location.getRegionInfo().getRegionName();
-          server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
-          return null;
-        }
-      });
+  /**
+   * Attempts to do an atomic load of many hfiles into a region.  If it fails,
+   * it returns a list of hfiles that need to be retried.  If it is successful
+   * it will return an empty list.
+   * 
+   * NOTE: To maintain row atomicity guarantees, region server callable should
+   * succeed atomically and fail atomically.
+   * 
+   * Protected for testing.
+   * 
+   * @return empty list if success, list of items to retry on recoverable
+   * failure
+   */
+  protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
+      byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis) throws IOException {
+
+    final List<Pair<byte[], String>> famPaths =
+      new ArrayList<Pair<byte[], String>>(lqis.size());
+    for (LoadQueueItem lqi : lqis) {
+      famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
+    }
+
+    final ServerCallable<Boolean> svrCallable = new ServerCallable<Boolean>(conn,
+        tableName, first) {
+      @Override
+      public Boolean call() throws Exception {
+        LOG.debug("Going to connect to server " + location + " for row "
+            + Bytes.toStringBinary(row));
+        byte[] regionName = location.getRegionInfo().getRegionName();
+        return server.bulkLoadHFiles(famPaths, regionName);
+      }
+    };
+
+    try {
+      List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
+      boolean success = conn.getRegionServerWithRetries(svrCallable);
+      if (!success) {
+        LOG.warn("Attempt to bulk load region containing "
+            + Bytes.toStringBinary(first) + " into table "
+            + Bytes.toStringBinary(tableName)  + " with files " + lqis
+            + " failed.  This is recoverable and they will be retried.");
+        toRetry.addAll(lqis); // return lqi's to retry
+      }
+      // success
+      return toRetry;
+    } catch (IOException e) {
+      LOG.error("Encountered unrecoverable error from region server", e);
+      throw e;
+    }
   }
 
   /**
@@ -320,7 +566,8 @@ public class LoadIncrementalHFiles exten
   }
 
   public static void main(String[] args) throws Exception {
-    ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args);
+    int ret = ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args);
+    System.exit(ret);
   }
 
 }
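
doBulkLoad now follows a group-then-load loop: re-read the table's start/end keys, group or split the queued HFiles against them in a thread pool, attempt the atomic per-region loads, requeue anything that failed recoverably, and give up only after hbase.bulkload.retries.number passes (0 meaning never). A simplified, self-contained skeleton of that loop is sketched below; WorkItem and the Phase callback are illustrative stand-ins for LoadQueueItem and the groupOrSplitPhase/bulkLoadPhase pair.

    import java.io.IOException;
    import java.util.Deque;

    public class BulkLoadRetrySkeleton {
      static class WorkItem { /* family + hfile path in the real code */ }

      interface Phase {
        // Groups the queued items under the current region boundaries and attempts
        // the atomic loads; recoverable failures are pushed back onto the queue.
        void groupAndLoad(Deque<WorkItem> queue) throws IOException;
      }

      static void run(Deque<WorkItem> queue, Phase phase, int maxRetries)
          throws IOException {
        int count = 0;
        // Region splits can happen between passes, so boundaries are re-read and
        // the remaining items regrouped on every iteration.
        while (!queue.isEmpty()) {
          if (maxRetries != 0 && count >= maxRetries) {
            // Mirrors hbase.bulkload.retries.number: 0 means never give up.
            return;
          }
          count++;
          phase.groupAndLoad(queue);
        }
      }
    }
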

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1202368&r1=1202367&r2=1202368&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Nov 15 18:39:25 2011
@@ -89,6 +89,7 @@ import org.apache.hadoop.hbase.util.Envi
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;
 
@@ -2212,24 +2213,113 @@ public class HRegion implements HeapSize
     }
     return lid;
   }
+  
+  /**
+   * Determines whether multiple column families are present
+   * Precondition: familyPaths is not null
+   *
+   * @param familyPaths List of Pair<byte[] column family, String hfilePath>
+   */
+  private static boolean hasMultipleColumnFamilies(
+      List<Pair<byte[], String>> familyPaths) {
+    boolean multipleFamilies = false;
+    byte[] family = null;
+    for (Pair<byte[], String> pair : familyPaths) {
+      byte[] fam = pair.getFirst();
+      if (family == null) {
+        family = fam;
+      } else if (!Bytes.equals(family, fam)) {
+        multipleFamilies = true;
+        break;
+      }
+    }
+    return multipleFamilies;
+  }
 
-  public void bulkLoadHFile(String hfilePath, byte[] familyName)
+  /**
+   * Attempts to atomically load a group of hfiles.  This is critical for loading
+   * rows with multiple column families atomically.
+   *
+   * @param familyPaths List of Pair<byte[] column family, String hfilePath>
+   * @return true if successful, false if failed recoverably
+   * @throws IOException if failed unrecoverably.
+   */
+  public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths)
   throws IOException {
-    startRegionOperation();
+    // we need writeLock for multi-family bulk load
+    startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
     try {
-      Store store = getStore(familyName);
-      if (store == null) {
-        throw new DoNotRetryIOException(
-            "No such column family " + Bytes.toStringBinary(familyName));
+      // There possibly was a split that happened between when the split keys
+      // were gathered and before the HRegion's write lock was taken.  We need
+      // to validate the HFile region before attempting to bulk load all of them.
+      List<IOException> ioes = new ArrayList<IOException>();
+      List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
+      for (Pair<byte[], String> p : familyPaths) {
+        byte[] familyName = p.getFirst();
+        String path = p.getSecond();
+
+        Store store = getStore(familyName);
+        if (store == null) {
+          IOException ioe = new DoNotRetryIOException(
+              "No such column family " + Bytes.toStringBinary(familyName));
+          ioes.add(ioe);
+          failures.add(p);
+        }
+
+        try {
+          store.assertBulkLoadHFileOk(new Path(path));
+        } catch (WrongRegionException wre) {
+          // recoverable (file doesn't fit in region)
+          failures.add(p);
+        } catch (IOException ioe) {
+          // unrecoverable (hdfs problem)
+          ioes.add(ioe);
+        }
+      }
+
+
+      // validation failed, bail out before doing anything permanent.
+      if (failures.size() != 0) {
+        StringBuilder list = new StringBuilder();
+        for (Pair<byte[], String> p : failures) {
+          list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
+            .append(p.getSecond());
+        }
+        // problem when validating
+        LOG.warn("There was a recoverable bulk load failure likely due to a" +
+            " split.  These (family, HFile) pairs were not loaded: " + list);
+        return false;
       }
-      store.bulkLoadHFile(hfilePath);
+
+      // validation failed because of some sort of IO problem.
+      if (ioes.size() != 0) {
+        LOG.error("There were IO errors when checking if bulk load is ok.  " +
+            "throwing exception!");
+        throw MultipleIOException.createIOException(ioes);
+      }
+
+      for (Pair<byte[], String> p : familyPaths) {
+        byte[] familyName = p.getFirst();
+        String path = p.getSecond();
+        Store store = getStore(familyName);
+        try {
+          store.bulkLoadHFile(path);
+        } catch (IOException ioe) {
+          // a failure here causes an atomicity violation that we currently 
+          // cannot recover from since it is likely a failed hdfs operation.
+
+          // TODO Need a better story for reverting partial failures due to HDFS.
+          LOG.error("There was a partial failure due to IO when attempting to" +
+              " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond());
+          throw ioe;
+        }
+      }
+      return true;
     } finally {
-      closeRegionOperation();
+      closeBulkRegionOperation();
     }
-
   }
 
-
   @Override
   public boolean equals(Object o) {
     if (!(o instanceof HRegion)) {
@@ -3393,6 +3483,38 @@ public class HRegion implements HeapSize
   }
 
   /**
+   * This method needs to be called before any public call that reads or
+   * modifies stores in bulk. It has to be called just before a try.
+   * #closeBulkRegionOperation needs to be called in the try's finally block.
+   * Acquires a write lock and checks if the region is closing or closed.
+   * @throws NotServingRegionException when the region is closing or closed
+   */
+  private void startBulkRegionOperation(boolean writeLockNeeded)
+  throws NotServingRegionException {
+    if (this.closing.get()) {
+      throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
+          " is closing");
+    }
+    if (writeLockNeeded) lock.writeLock().lock();
+    else lock.readLock().lock();
+    if (this.closed.get()) {
+      if (writeLockNeeded) lock.writeLock().unlock();
+      else lock.readLock().unlock();
+      throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
+          " is closed");
+    }
+  }
+
+  /**
+   * Closes the lock. This needs to be called in the finally block corresponding
+   * to the try block of #startBulkRegionOperation
+   */
+  private void closeBulkRegionOperation(){
+    if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
+    else lock.readLock().unlock();
+  }
+
+  /**
    * A mocked list implementaion - discards all updates.
    */
   private static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() {
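
The new lock discipline is: take the region's read lock for a single-family load, take the write lock when several families must move together, and re-check the closed flag after acquiring. A standalone sketch of just that discipline, using plain java.util.concurrent and illustrative exception messages:

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class BulkRegionLockSketch {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private final AtomicBoolean closing = new AtomicBoolean(false);
      private final AtomicBoolean closed = new AtomicBoolean(false);

      void startBulkRegionOperation(boolean writeLockNeeded) throws IOException {
        if (closing.get()) {
          throw new IOException("region is closing");
        }
        if (writeLockNeeded) lock.writeLock().lock();
        else lock.readLock().lock();
        if (closed.get()) {
          // Undo the lock before failing so no finally block is left dangling.
          if (writeLockNeeded) lock.writeLock().unlock();
          else lock.readLock().unlock();
          throw new IOException("region is closed");
        }
      }

      void closeBulkRegionOperation() {
        // Release whichever lock this thread holds.
        if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
        else lock.readLock().unlock();
      }
    }

A caller wraps the work in try { startBulkRegionOperation(multipleFamilies); ... } finally { closeBulkRegionOperation(); }, as bulkLoadHFiles does above.
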

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1202368&r1=1202367&r2=1202368&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Nov 15 18:39:25 2011
@@ -2076,11 +2076,26 @@ public class HRegionServer implements HR
   }
 
   @Override
-  public void bulkLoadHFile(String hfilePath, byte[] regionName,
-      byte[] familyName) throws IOException {
+  public void bulkLoadHFile(String hfilePath, byte[] regionName, byte[] familyName)
+      throws IOException {
+    checkOpen();
+    HRegion region = getRegion(regionName);
+    List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
+    familyPaths.add(new Pair<byte[], String>(familyName, hfilePath));
+    region.bulkLoadHFiles(familyPaths);
+  }
+
+  /**
+   * Atomically bulk load several HFiles into an open region
+   * @return true if successful, false if failed recoverably (no action taken)
+   * @throws IOException if failed unrecoverably
+   */
+  @Override
+  public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
+      byte[] regionName) throws IOException {
     checkOpen();
     HRegion region = getRegion(regionName);
-    region.bulkLoadHFile(hfilePath, familyName);
+    return region.bulkLoadHFiles(familyPaths);
   }
 
   Map<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1202368&r1=1202367&r2=1202368&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Nov 15 18:39:25 2011
@@ -319,9 +319,12 @@ public class Store implements HeapSize {
     return this.storefiles;
   }
 
-  public void bulkLoadHFile(String srcPathStr) throws IOException {
-    Path srcPath = new Path(srcPathStr);
-
+  /**
+   * This throws a WrongRegionException if the bulkHFile does not fit in this
+   * region.
+   *
+   */
+  void assertBulkLoadHFileOk(Path srcPath) throws IOException {
     HFile.Reader reader  = null;
     try {
       LOG.info("Validating hfile at " + srcPath + " for inclusion in "
@@ -345,12 +348,21 @@ public class Store implements HeapSize {
       HRegionInfo hri = region.getRegionInfo();
       if (!hri.containsRange(firstKey, lastKey)) {
         throw new WrongRegionException(
-            "Bulk load file " + srcPathStr + " does not fit inside region "
+            "Bulk load file " + srcPath.toString() + " does not fit inside region "
             + this.region);
       }
     } finally {
       if (reader != null) reader.close();
     }
+  }
+
+  /**
+   * This method should only be called from HRegion.  It is assumed that the 
+   * ranges of values in the HFile fit within the stores assigned region. 
+   * (assertBulkLoadHFileOk checks this)
+   */
+  void bulkLoadHFile(String srcPathStr) throws IOException {
+    Path srcPath = new Path(srcPathStr);
 
     // Move the file if it's on another filesystem
     FileSystem srcFs = srcPath.getFileSystem(conf);
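
Splitting Store into assertBulkLoadHFileOk (validation) and bulkLoadHFile (the actual move) lets HRegion check every family's file before moving any of them. A simplified sketch of that validate-all-then-commit-all idiom follows, with illustrative FileCheck/FileCommit interfaces standing in for the Store calls; the real code additionally distinguishes a recoverable WrongRegionException from other, unrecoverable IO errors.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class ValidateThenCommitSketch {
      interface FileCheck  { void check(String path) throws IOException; }
      interface FileCommit { void commit(String path) throws IOException; }

      // Returns false (nothing moved) if any file fails validation; only when
      // every file checks out are the commits performed, so a recoverable problem
      // with one family never leaves the group half-loaded.
      static boolean loadAll(List<String> paths, FileCheck check, FileCommit commit)
          throws IOException {
        List<String> failures = new ArrayList<String>();
        for (String p : paths) {
          try {
            check.check(p);
          } catch (IOException e) {
            failures.add(p);
          }
        }
        if (!failures.isEmpty()) {
          return false;  // caller can regroup/split and retry
        }
        for (String p : paths) {
          commit.commit(p);  // a failure here is unrecoverable in the real code
        }
        return true;
      }
    }
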

Modified: hbase/branches/0.90/src/main/resources/hbase-default.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/resources/hbase-default.xml?rev=1202368&r1=1202367&r2=1202368&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/resources/hbase-default.xml (original)
+++ hbase/branches/0.90/src/main/resources/hbase-default.xml Tue Nov 15 18:39:25 2011
@@ -139,6 +139,14 @@
     server, getting a cell's value, starting a row update, etc.
     Default: 10.
     </description>
+  </property> 
+  <property>
+    <name>hbase.bulkload.retries.number</name>
+    <value>0</value>
+    <description>Maximum retries.  This is the maximum number of iterations
+    atomic bulk loads are attempted in the face of splitting operations.
+    0 means never give up.  Default: 0.
+    </description>
   </property>
   <property>
     <name>hbase.client.scanner.caching</name>
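
The new hbase.bulkload.retries.number property bounds the number of group-and-load passes described above. A hedged example of overriding it programmatically before running the tool; the output directory and table name arguments are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.util.ToolRunner;

    public class BulkLoadWithBoundedRetries {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Give up after 10 group-and-load passes instead of retrying forever (0).
        conf.setInt("hbase.bulkload.retries.number", 10);
        int ret = ToolRunner.run(new LoadIncrementalHFiles(conf),
            new String[] { "/bulk/output/dir", "mytable" });
        System.exit(ret);
      }
    }
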

Modified: hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/io/TestHbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/io/TestHbaseObjectWritable.java?rev=1202368&r1=1202367&r2=1202368&view=diff
==============================================================================
--- hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/io/TestHbaseObjectWritable.java (original)
+++ hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/io/TestHbaseObjectWritable.java Tue Nov 15 18:39:25 2011
@@ -63,7 +63,7 @@ public class TestHbaseObjectWritable ext
     // Do unsupported type.
     boolean exception = false;
     try {
-      doType(conf, new File("a"), File.class);
+      doType(conf, new Object(), Object.class);
     } catch (UnsupportedOperationException uoe) {
       exception = true;
     }
@@ -121,6 +121,17 @@ public class TestHbaseObjectWritable ext
     assertTrue(child instanceof CustomFilter);
     assertEquals("mykey", ((CustomFilter)child).getKey());
   }
+  
+  public void testCustomSerializable() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+
+    // test proper serialization of un-encoded serialized java objects
+    CustomSerializable custom = new CustomSerializable("test phrase");
+    Object obj = doType(conf, custom, CustomSerializable.class);
+    assertTrue(obj instanceof Serializable);
+    assertTrue(obj instanceof CustomSerializable);
+    assertEquals("test phrase", ((CustomSerializable)obj).getValue());
+  }
 
   private Object doType(final Configuration conf, final Object value,
       final Class<?> clazz)
@@ -136,6 +147,27 @@ public class TestHbaseObjectWritable ext
     dis.close();
     return product;
   }
+  
+  public static class CustomSerializable implements Serializable {
+    private static final long serialVersionUID = 1048445561865740632L;
+    private String value = null;
+    
+    public CustomSerializable() {
+    }
+    
+    public CustomSerializable(String value) {
+      this.value = value;
+    }
+    
+    public String getValue() {
+      return value;
+    }
+    
+    public void setValue(String value) {
+      this.value = value;
+    }
+    
+  }
 
   public static class CustomWritable implements Writable {
     private String value = null;
@@ -180,4 +212,4 @@ public class TestHbaseObjectWritable ext
       this.key = Text.readString(in);
     }
   }
-}
\ No newline at end of file
+}

Added: hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java?rev=1202368&view=auto
==============================================================================
--- hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java (added)
+++ hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java Tue Nov 15 18:39:25 2011
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Multimap;
+
+/**
+ * Test cases for the atomic load error handling of the bulk load functionality.
+ */
+public class TestLoadIncrementalHFilesSplitRecovery {
+  final static Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
+
+  private static HBaseTestingUtility util;
+
+  final static int NUM_CFS = 10;
+  final static byte[] QUAL = Bytes.toBytes("qual");
+  final static int ROWCOUNT = 100;
+
+  private final static byte[][] families = new byte[NUM_CFS][];
+  static {
+    for (int i = 0; i < NUM_CFS; i++) {
+      families[i] = Bytes.toBytes(family(i));
+    }
+  }
+
+  static byte[] rowkey(int i) {
+    return Bytes.toBytes(String.format("row_%08d", i));
+  }
+
+  static String family(int i) {
+    return String.format("family_%04d", i);
+  }
+
+  static byte[] value(int i) {
+    return Bytes.toBytes(String.format("%010d", i));
+  }
+
+  public static void buildHFiles(FileSystem fs, Path dir, int value)
+      throws IOException {
+    byte[] val = value(value);
+    for (int i = 0; i < NUM_CFS; i++) {
+      Path testIn = new Path(dir, family(i));
+
+      TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
+          Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
+    }
+  }
+
+  /**
+   * Creates a table with given table name and specified number of column
+   * families if the table does not already exist.
+   */
+  private void setupTable(String table, int cfs) throws IOException {
+    try {
+      LOG.info("Creating table " + table);
+      HTableDescriptor htd = new HTableDescriptor(table);
+      for (int i = 0; i < 10; i++) {
+        htd.addFamily(new HColumnDescriptor(family(i)));
+      }
+
+      HBaseAdmin admin = util.getHBaseAdmin();
+      admin.createTable(htd);
+    } catch (TableExistsException tee) {
+      LOG.info("Table " + table + " already exists");
+    }
+  }
+
+  private Path buildBulkFiles(String table, int value) throws Exception {
+    Path dir = HBaseTestingUtility.getTestDir(table);
+    Path bulk1 = new Path(dir, table+value);
+    FileSystem fs = util.getTestFileSystem();
+    buildHFiles(fs, bulk1, value);
+    return bulk1;
+  }
+
+  /**
+   * Populate table with known values.
+   */
+  private void populateTable(String table, int value) throws Exception {
+    // create HFiles for different column families
+    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+        util.getConfiguration());
+    Path bulk1 = buildBulkFiles(table, value);
+    HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
+    lih.doBulkLoad(bulk1, t);
+  }
+
+  /**
+   * Split the known table in half.  (this is hard coded for this test suite)
+   */
+  private void forceSplit(String table) {
+    try {
+      // need the region server call to be synchronous, but it isn't visible.
+      HRegionServer hrs = util.getRSForFirstRegionInTable(Bytes
+          .toBytes(table));
+
+      for (HRegionInfo hri : hrs.getOnlineRegions()) {
+        if (Bytes.equals(hri.getTableName(), Bytes.toBytes(table))) {
+          // splitRegion doesn't work if startkey/endkey are null
+          hrs.splitRegion(hri, rowkey(ROWCOUNT / 2)); // hard code split
+        }
+      }
+
+      // verify that split completed.
+      int regions;
+      do {
+        regions = 0;
+        for (HRegionInfo hri : hrs.getOnlineRegions()) {
+          if (Bytes.equals(hri.getTableName(), Bytes.toBytes(table))) {
+            regions++;
+          }
+        }
+        if (regions != 2) {
+          LOG.info("Taking some time to complete split...");
+          Thread.sleep(250);
+        }
+      } while (regions != 2);
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    util = new HBaseTestingUtility();
+    util.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  /**
+   * Checks that all columns have the expected value and that there is the
+   * expected number of rows.
+   */
+  void assertExpectedTable(String table, int count, int value) {
+    try {
+      HTable t = new HTable(util.getConfiguration(), table);
+      Scan s = new Scan();
+      ResultScanner sr = t.getScanner(s);
+      int i = 0;
+      for (Result r : sr) {
+        i++;
+        for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
+          for (byte[] val : nm.values()) {
+            assertTrue(Bytes.equals(val, value(value)));
+          }
+        }
+      }
+      assertEquals(count, i);
+    } catch (IOException e) {
+      fail("Failed due to exception");
+    }
+  }
+
+  /**
+   * Test that shows that exception thrown from the RS side will result in an
+   * exception on the LIHFile client.
+   */
+  @Test(expected=IOException.class)
+  public void testBulkLoadPhaseFailure() throws Exception {
+    String table = "bulkLoadPhaseFailure";
+    setupTable(table, 10);
+
+    final AtomicInteger attmptedCalls = new AtomicInteger();
+    final AtomicInteger failedCalls = new AtomicInteger();
+    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+        util.getConfiguration()) {
+
+      protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
+          byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis) throws IOException {
+        int i = attmptedCalls.incrementAndGet();
+        if (i == 1) {
+          HConnection errConn = mock(HConnection.class);
+          try {
+            doThrow(new IOException("injecting bulk load error")).when(errConn)
+                .getRegionServerWithRetries((ServerCallable) anyObject());
+          } catch (Exception e) {
+            LOG.fatal("mocking cruft, should never happen", e);
+            throw new RuntimeException("mocking cruft, should never happen");
+          }
+          failedCalls.incrementAndGet();
+          return super.tryAtomicRegionLoad(errConn, tableName, first, lqis);
+        }
+
+        return super.tryAtomicRegionLoad(conn, tableName, first, lqis);
+      }
+    };
+
+    // create HFiles for different column families
+    Path dir = buildBulkFiles(table, 1);
+    HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
+    lih.doBulkLoad(dir, t);
+
+    fail("doBulkLoad should have thrown an exception");
+  }
+
+  /**
+   * This test exercises the path where there is a split after initial
+   * validation but before the atomic bulk load call. We cannot use presplitting
+   * to test this path, so we actually inject a split just before the atomic
+   * region load.
+   */
+  @Test
+  public void testSplitWhileBulkLoadPhase() throws Exception {
+    final String table = "splitWhileBulkloadPhase";
+    setupTable(table, 10);
+    populateTable(table,1);
+    assertExpectedTable(table, ROWCOUNT, 1);
+
+    // Now let's cause trouble.  This will occur after checks and cause bulk
+    // files to fail when attempt to atomically import.  This is recoverable.
+    final AtomicInteger attemptedCalls = new AtomicInteger();
+    LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(
+        util.getConfiguration()) {
+
+      protected void bulkLoadPhase(final HTable htable, final HConnection conn,
+          ExecutorService pool, Deque<LoadQueueItem> queue,
+          final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
+        int i = attemptedCalls.incrementAndGet();
+        if (i == 1) {
+          // On first attempt force a split.
+          forceSplit(table);
+        }
+
+        super.bulkLoadPhase(htable, conn, pool, queue, regionGroups);
+      }
+    };
+
+    // create HFiles for different column families
+    HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
+    Path bulk = buildBulkFiles(table, 2);
+    lih2.doBulkLoad(bulk, t);
+
+    // check that data was loaded
+    // The three expected attempts are 1) failure because need to split, 2)
+    // load of split top 3) load of split bottom
+    assertEquals(attemptedCalls.get(), 3);
+    assertExpectedTable(table, ROWCOUNT, 2);
+  }
+
+  /**
+   * This test splits a table and attempts to bulk load.  The bulk import files
+   * should be split before atomically importing.
+   */
+  @Test
+  public void testGroupOrSplitPresplit() throws Exception {
+    final String table = "groupOrSplitPresplit";
+    setupTable(table, 10);
+    populateTable(table, 1);
+    assertExpectedTable(table, ROWCOUNT, 1);
+    forceSplit(table);
+
+    final AtomicInteger countedLqis= new AtomicInteger();
+    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+        util.getConfiguration()) {
+      protected List<LoadQueueItem> groupOrSplit(
+          Multimap<ByteBuffer, LoadQueueItem> regionGroups,
+          final LoadQueueItem item, final HTable htable,
+          final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+        List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
+        if (lqis != null) {
+          countedLqis.addAndGet(lqis.size());
+        }
+        return lqis;
+      }
+    };
+
+    // create HFiles for different column families
+    Path bulk = buildBulkFiles(table, 2);
+    HTable ht = new HTable(util.getConfiguration(), Bytes.toBytes(table));
+    lih.doBulkLoad(bulk, ht);
+
+    assertExpectedTable(table, ROWCOUNT, 2);
+    assertEquals(20, countedLqis.get());
+  }
+
+  /**
+   * This simulates a remote exception which should cause LoadIncrementalHFiles
+   * to exit with an exception.
+   */
+  @Test(expected = IOException.class)
+  public void testGroupOrSplitFailure() throws Exception {
+    String table = "groupOrSplitFailure";
+    setupTable(table, 10);
+
+    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+        util.getConfiguration()) {
+      int i = 0;
+
+      protected List<LoadQueueItem> groupOrSplit(
+          Multimap<ByteBuffer, LoadQueueItem> regionGroups,
+          final LoadQueueItem item, final HTable table,
+          final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+        i++;
+
+        if (i == 5) {
+          throw new IOException("failure");
+        }
+        return super.groupOrSplit(regionGroups, item, table, startEndKeys);
+      }
+    };
+
+    // create HFiles for different column families
+    Path dir = buildBulkFiles(table,1);
+    HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
+    lih.doBulkLoad(dir, t);
+
+    fail("doBulkLoad should have thrown an exception");
+  }
+}
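
For readers adapting client code to the backported API, here is a minimal sketch (not part of the patch) of driving the multi-family load outside the test harness. It assumes the HFiles have already been written under one subdirectory per column family, for example by a MapReduce job using HFileOutputFormat; the class name, table name, and output directory below are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public class BulkLoadDriver {                      // hypothetical driver class
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Layout expected by doBulkLoad: <dir>/<family>/<hfile>, one subdir per family.
        Path bulkDir = new Path("/tmp/bulk_output");   // placeholder output directory
        HTable table = new HTable(conf, "mytable");    // placeholder table name
        // Groups the HFiles by region, splits any file that spans a region
        // boundary, and loads each region's group of files in one atomic call.
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(bulkDir, table);
      }
    }

As the tests above exercise, doBulkLoad retries after a region split by re-splitting the affected HFiles, so the call can tolerate a table that splits concurrently.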

Added: hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java?rev=1202368&view=auto
==============================================================================
--- hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java (added)
+++ hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java Tue Nov 15 18:39:25 2011
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of
+ * the region server's bulkLoad functionality.
+ */
+public class TestHRegionServerBulkLoad {
+  final static Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
+  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private final static Configuration conf = UTIL.getConfiguration();
+  private final static byte[] QUAL = Bytes.toBytes("qual");
+  private final static int NUM_CFS = 10;
+  public static int BLOCKSIZE = 64 * 1024;
+  public static String COMPRESSION = Compression.Algorithm.NONE.getName();
+
+  private final static byte[][] families = new byte[NUM_CFS][];
+  static {
+    for (int i = 0; i < NUM_CFS; i++) {
+      families[i] = Bytes.toBytes(family(i));
+    }
+  }
+
+  static byte[] rowkey(int i) {
+    return Bytes.toBytes(String.format("row_%08d", i));
+  }
+
+  static String family(int i) {
+    return String.format("family_%04d", i);
+  }
+
+  /**
+   * Create an HFile with the given number of rows, all carrying the specified value.
+   */
+  public static void createHFile(FileSystem fs, Path path, byte[] family,
+      byte[] qualifier, byte[] value, int numRows) throws IOException {
+    HFile.Writer writer = new HFile.Writer(fs, path, BLOCKSIZE, COMPRESSION,
+        KeyValue.KEY_COMPARATOR);
+    long now = System.currentTimeMillis();
+    try {
+      // write numRows KeyValues, all carrying the same timestamp and value
+      for (int i = 0; i < numRows; i++) {
+        KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
+        writer.append(kv);
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+  /**
+   * Thread that repeatedly bulk loads a fresh set of HFiles, one per column
+   * family, into the table.
+   *
+   * Each iteration loads NUM_CFS HFiles, which adds to the number of open file
+   * handles on the region server, so every 10th iteration it requests a region
+   * compaction to keep the number of open file handles down.
+   */
+  public static class AtomicHFileLoader extends RepeatingTestThread {
+    final AtomicLong numBulkLoads = new AtomicLong();
+    final AtomicLong numCompactions = new AtomicLong();
+    private String tableName;
+
+    public AtomicHFileLoader(String tableName, TestContext ctx,
+        byte targetFamilies[][]) throws IOException {
+      super(ctx);
+      this.tableName = tableName;
+    }
+
+    public void doAnAction() throws Exception {
+      long iteration = numBulkLoads.getAndIncrement();
+      Path dir = new Path(HBaseTestingUtility.getTestDir() + "/" +
+          String.format("bulkLoad_%08d", iteration));
+
+      // create HFiles for different column families
+      FileSystem fs = UTIL.getTestFileSystem();
+      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
+      final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>(
+          NUM_CFS);
+      for (int i = 0; i < NUM_CFS; i++) {
+        Path hfile = new Path(dir, family(i));
+        byte[] fam = Bytes.toBytes(family(i));
+        createHFile(fs, hfile, fam, QUAL, val, 1000);
+        famPaths.add(new Pair<byte[], String>(fam, hfile.toString()));
+      }
+
+      // bulk load HFiles
+      HConnection conn = UTIL.getHBaseAdmin().getConnection();
+      byte[] tbl = Bytes.toBytes(tableName);
+      conn.getRegionServerWithRetries(new ServerCallable<Void>(conn, tbl, Bytes
+          .toBytes("aaa")) {
+        @Override
+        public Void call() throws Exception {
+          LOG.debug("Going to connect to server " + location + " for row "
+              + Bytes.toStringBinary(row));
+          byte[] regionName = location.getRegionInfo().getRegionName();
+          server.bulkLoadHFiles(famPaths, regionName);
+          return null;
+        }
+      });
+
+      // Periodically do compaction to reduce the number of open file handles.
+      if (numBulkLoads.get() % 10 == 0) {
+        // about 10 loads x 10 families = 100 new store files since the last compaction
+        conn.getRegionServerWithRetries(new ServerCallable<Void>(conn, tbl,
+            Bytes.toBytes("aaa")) {
+          @Override
+          public Void call() throws Exception {
+            LOG.debug("compacting " + location + " for row "
+                + Bytes.toStringBinary(row));
+            server.compactRegion(location.getRegionInfo(), true);
+            numCompactions.incrementAndGet();
+            return null;
+          }
+        });
+      }
+    }
+  }
+
+  /**
+   * Thread that does full scans of the table looking for any partially
+   * completed rows.
+   */
+  public static class AtomicScanReader extends RepeatingTestThread {
+    byte targetFamilies[][];
+    HTable table;
+    AtomicLong numScans = new AtomicLong();
+    AtomicLong numRowsScanned = new AtomicLong();
+    String TABLE_NAME;
+
+    public AtomicScanReader(String TABLE_NAME, TestContext ctx,
+        byte targetFamilies[][]) throws IOException {
+      super(ctx);
+      this.TABLE_NAME = TABLE_NAME;
+      this.targetFamilies = targetFamilies;
+      table = new HTable(conf, TABLE_NAME);
+    }
+
+    public void doAnAction() throws Exception {
+      Scan s = new Scan();
+      for (byte[] family : targetFamilies) {
+        s.addFamily(family);
+      }
+      ResultScanner scanner = table.getScanner(s);
+
+      for (Result res : scanner) {
+        byte[] lastRow = null, lastFam = null, lastQual = null;
+        byte[] gotValue = null;
+        for (byte[] family : targetFamilies) {
+          byte qualifier[] = QUAL;
+          byte thisValue[] = res.getValue(family, qualifier);
+          if (gotValue != null && thisValue != null
+              && !Bytes.equals(gotValue, thisValue)) {
+
+            StringBuilder msg = new StringBuilder();
+            msg.append("Failed on scan ").append(numScans)
+                .append(" after scanning ").append(numRowsScanned)
+                .append(" rows!\n");
+            msg.append("Current  was " + Bytes.toString(res.getRow()) + "/"
+                + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
+                + " = " + Bytes.toString(thisValue) + "\n");
+            msg.append("Previous  was " + Bytes.toString(lastRow) + "/"
+                + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual)
+                + " = " + Bytes.toString(gotValue));
+            throw new RuntimeException(msg.toString());
+          }
+
+          lastFam = family;
+          lastQual = qualifier;
+          lastRow = res.getRow();
+          gotValue = thisValue;
+        }
+        numRowsScanned.getAndIncrement();
+      }
+      numScans.getAndIncrement();
+    }
+  }
+
+  /**
+   * Creates a table with the given table name and the specified number of column
+   * families if the table does not already exist.
+   */
+  private void setupTable(String table, int cfs) throws IOException {
+    try {
+      LOG.info("Creating table " + table);
+      HTableDescriptor htd = new HTableDescriptor(table);
+      for (int i = 0; i < cfs; i++) {
+        htd.addFamily(new HColumnDescriptor(family(i)));
+      }
+
+      HBaseAdmin admin = UTIL.getHBaseAdmin();
+      admin.createTable(htd);
+    } catch (TableExistsException tee) {
+      LOG.info("Table " + table + " already exists");
+    }
+  }
+
+  /**
+   * Atomic bulk load.
+   */
+  @Test
+  public void testAtomicBulkLoad() throws Exception {
+    String TABLE_NAME = "atomicBulkLoad";
+
+    int millisToRun = 30000;
+    int numScanners = 50;
+
+    UTIL.startMiniCluster(1);
+    try {
+      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
+    } finally {
+      UTIL.shutdownMiniCluster();
+    }
+  }
+
+  void runAtomicBulkloadTest(String tableName, int millisToRun, int numScanners)
+      throws Exception {
+    setupTable(tableName, 10);
+
+    TestContext ctx = new TestContext(UTIL.getConfiguration());
+
+    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
+    ctx.addThread(loader);
+
+    List<AtomicScanReader> scanners = Lists.newArrayList();
+    for (int i = 0; i < numScanners; i++) {
+      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
+      scanners.add(scanner);
+      ctx.addThread(scanner);
+    }
+
+    ctx.startThreads();
+    ctx.waitFor(millisToRun);
+    ctx.stop();
+
+    LOG.info("Loaders:");
+    LOG.info("  loaded " + loader.numBulkLoads.get());
+    LOG.info("  compations " + loader.numCompactions.get());
+
+    LOG.info("Scanners:");
+    for (AtomicScanReader scanner : scanners) {
+      LOG.info("  scanned " + scanner.numScans.get());
+      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
+    }
+  }
+
+  /**
+   * Run the test on an HBase instance for 5 minutes. This assumes that the table
+   * under test only has a single region.
+   */
+  public static void main(String args[]) throws Exception {
+    try {
+      Configuration c = HBaseConfiguration.create();
+      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad();
+      test.setConf(c);
+      test.runAtomicBulkloadTest("atomicTableTest", 5 * 60 * 1000, 50);
+    } finally {
+      System.exit(0); // something hangs (we believe it is the LRU threadpool)
+    }
+  }
+
+  private void setConf(Configuration c) {
+    UTIL = new HBaseTestingUtility(c);
+  }
+}

Modified: hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1202368&r1=1202367&r2=1202368&view=diff
==============================================================================
--- hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Tue Nov 15 18:39:25 2011
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTru
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -50,6 +51,7 @@ import org.apache.hadoop.hbase.security.
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -193,7 +195,9 @@ public class TestWALReplay {
     byte [] row = Bytes.toBytes(tableNameStr);
     writer.append(new KeyValue(row, family, family, row));
     writer.close();
-    region.bulkLoadHFile(f.toString(), family);
+    List <Pair<byte[],String>>  hfs= new ArrayList<Pair<byte[],String>>(1);
+    hfs.add(Pair.newPair(family, f.toString()));
+    region.bulkLoadHFiles(hfs);
     // Add an edit so something in the WAL
     region.put((new Put(row)).add(family, family, family));
     wal.sync();
@@ -509,4 +513,4 @@ public class TestWALReplay {
     HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
     return wal;
   }
-}
\ No newline at end of file
+}
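
The TestWALReplay change above also shows the adjustment required of direct HRegion callers: the per-family bulkLoadHFile(String, byte[]) call is replaced by a single bulkLoadHFiles call that takes every (family, HFile path) pair at once. A minimal sketch (not part of the patch), assuming an already-open HRegion named region and using placeholder family names and paths:

    // Before the backport (one call per family):
    //   region.bulkLoadHFile(hfilePath.toString(), family);
    // After the backport (all families handed over together):
    List<Pair<byte[], String>> hfiles = new ArrayList<Pair<byte[], String>>();
    hfiles.add(Pair.newPair(Bytes.toBytes("cf1"), "/tmp/bulk/cf1/hfile-0")); // placeholder
    hfiles.add(Pair.newPair(Bytes.toBytes("cf2"), "/tmp/bulk/cf2/hfile-0")); // placeholder
    region.bulkLoadHFiles(hfiles);

The snippet relies on java.util.ArrayList/List and org.apache.hadoop.hbase.util.Bytes/Pair, and should sit in a method that may throw IOException.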


