hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From raw...@apache.org
Subject svn commit: r991289 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/io/ src/main/java/org/apache/hadoop/hbase/ipc/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/test/java/org/apache/ha...
Date Tue, 31 Aug 2010 18:53:33 GMT
Author: rawson
Date: Tue Aug 31 18:53:32 2010
New Revision: 991289

URL: http://svn.apache.org/viewvc?rev=991289&view=rev
Log:
HBASE-1845  MultiGet, MultiDelete, and MultiPut - batched to the 
            appropriate region servers (Marc Limotte via Ryan)

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Action.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestMultiParallel.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Get.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Row.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=991289&r1=991288&r2=991289&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Aug 31 18:53:32 2010
@@ -855,6 +855,8 @@ Release 0.21.0 - Unreleased
    HBASE-2904  Smart seeking using filters (Pranav via Ryan)
    HBASE-2922  HLog preparation and cleanup are done under the updateLock, 
                major slowdown
+   HBASE-1845  MultiGet, MultiDelete, and MultiPut - batched to the 
+   	       appropriate region servers (Marc Limotte via Ryan)
 
   NEW FEATURES
    HBASE-1961  HBase EC2 scripts

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Action.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Action.java?rev=991289&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Action.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Action.java Tue Aug 31 18:53:32 2010
@@ -0,0 +1,100 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+/*
+ * A Get, Put or Delete associated with its region.  Used internally by  
+ * {@link HTable#batch} to associate the action with its region and maintain 
+ * the index from the original request. 
+ */
+public class Action implements Writable, Comparable {
+
+  private byte[] regionName;
+  private Row action;
+  private int originalIndex;
+  private Result result;
+
+  public Action() {
+    super();
+  }
+
+  public Action(byte[] regionName, Row action, int originalIndex) {
+    super();
+    this.regionName = regionName;
+    this.action = action;
+    this.originalIndex = originalIndex;
+  }
+
+  public byte[] getRegionName() {
+    return regionName;
+  }
+
+  public void setRegionName(byte[] regionName) {
+    this.regionName = regionName;
+  }
+
+  public Result getResult() {
+    return result;
+  }
+
+  public void setResult(Result result) {
+    this.result = result;
+  }
+
+  public Row getAction() {
+    return action;
+  }
+
+  public int getOriginalIndex() {
+    return originalIndex;
+  }
+
+  @Override
+  public int compareTo(Object o) {
+    return action.compareTo(((Action) o).getAction());
+  }
+
+  // ///////////////////////////////////////////////////////////////////////////
+  // Writable
+  // ///////////////////////////////////////////////////////////////////////////
+
+  public void write(final DataOutput out) throws IOException {
+    Bytes.writeByteArray(out, regionName);
+    HbaseObjectWritable.writeObject(out, action, Row.class, null);
+    out.writeInt(originalIndex);
+    HbaseObjectWritable.writeObject(out, result, Result.class, null);
+  }
+
+  public void readFields(final DataInput in) throws IOException {
+    this.regionName = Bytes.readByteArray(in);
+    this.action = (Row) HbaseObjectWritable.readObject(in, null);
+    this.originalIndex = in.readInt();
+    this.result = (Result) HbaseObjectWritable.readObject(in, null);
+  }
+
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Get.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Get.java?rev=991289&r1=991288&r2=991289&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Get.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Get.java Tue Aug 31 18:53:32 2010
@@ -60,7 +60,7 @@ import java.util.TreeSet;
  * <p>
  * To add a filter, execute {@link #setFilter(Filter) setFilter}.
  */
-public class Get implements Writable {
+public class Get implements Writable, Row, Comparable<Row> {
   private static final byte GET_VERSION = (byte)1;
 
   private byte [] row = null;
@@ -325,6 +325,11 @@ public class Get implements Writable {
     return sb.toString();
   }
 
+  //Row
+  public int compareTo(Row other) {
+    return Bytes.compareTo(this.getRow(), other.getRow());
+  }
+  
   //Writable
   public void readFields(final DataInput in)
   throws IOException {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=991289&r1=991288&r2=991289&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Tue Aug 31 18:53:32 2010
@@ -194,9 +194,24 @@ public interface HConnection {
    * @throws IOException if a remote or network exception occurs
    * @throws RuntimeException other unspecified error
    */
-  public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable) 
+  public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
   throws IOException, RuntimeException;
 
+  /**
+   * Process a mixed batch of Get, Put and Delete actions. All actions for a
+   * RegionServer are forwarded in one RPC call.
+   * 
+   * @param actions The collection of actions.
+   * @param tableName Name of the hbase table
+   * @param pool thread pool for parallel execution
+   * @param results An empty array, same size as list. If an exception is thrown,
+   * you can test here for partial results, and to determine which actions
+   * processed successfully.
+   * @throws IOException
+   */
+  public void processBatch(List<Row> actions, final byte[] tableName,
+      ExecutorService pool, Result[] results)
+  throws IOException;
 
   /**
    * Process a batch of Puts. Does the retries.
@@ -204,20 +219,32 @@ public interface HConnection {
    * @param tableName The name of the table
    * @return Count of committed Puts.  On fault, < list.size().
    * @throws IOException if a remote or network exception occurs
+   * @deprecated Use HConnectionManager::processBatch instead.
    */
-  public int processBatchOfRows(ArrayList<Put> list, byte[] tableName)
+  public int processBatchOfRows(ArrayList<Put> list, byte[] tableName, ExecutorService pool)
   throws IOException;
 
   /**
    * Process a batch of Deletes. Does the retries.
    * @param list A batch of Deletes to process.
-   * @return Count of committed Deletes. On fault, < list.size().
    * @param tableName The name of the table
+   * @return Count of committed Deletes. On fault, < list.size().
    * @throws IOException if a remote or network exception occurs
+   * @deprecated Use HConnectionManager::processBatch instead.
    */
-  public int processBatchOfDeletes(List<Delete> list, byte[] tableName)
+  public int processBatchOfDeletes(List<Delete> list, byte[] tableName, ExecutorService pool)
   throws IOException;
 
+  /**
+   * Process a batch of Puts.
+   *
+   * @param list The collection of actions. The list is mutated: all successful Puts 
+   * are removed from the list.
+   * @param tableName Name of the hbase table
+   * @param pool Thread pool for parallel execution
+   * @throws IOException
+   * @deprecated Use HConnectionManager::processBatch instead.
+   */
   public void processBatchOfPuts(List<Put> list,
                                  final byte[] tableName, ExecutorService pool) throws IOException;
 
@@ -234,7 +261,7 @@ public interface HConnection {
   /**
    * Check whether region cache prefetch is enabled or not.
    * @param tableName name of table to check
-   * @return true if table's region cache prefecth is enabled. Otherwise
+   * @return true if table's region cache prefetch is enabled. Otherwise
    * it is disabled.
    */
   public boolean getRegionCachePrefetch(final byte[] tableName);

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=991289&r1=991288&r2=991289&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Tue Aug 31 18:53:32 2010
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.ipc.HMast
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.MetaUtils;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.SoftValueSortedMap;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
@@ -59,11 +60,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.CopyOnWriteArraySet;
 
@@ -994,7 +998,7 @@ public class HConnectionManager {
      * Allows flushing the region cache.
      */
     public void clearRegionCache() {
-     cachedRegionLocations.clear();
+      cachedRegionLocations.clear();
     }
 
     /*
@@ -1210,168 +1214,38 @@ public class HConnectionManager {
       return location;
     }
 
-    /*
-     * Helper class for batch updates.
-     * Holds code shared doing batch puts and batch deletes.
+    /**
+     * @deprecated Use HConnectionManager::processBatch instead.
      */
-    private abstract class Batch {
-      final HConnection c;
-
-      private Batch(final HConnection c) {
-        this.c = c;
-      }
-
-      /**
-       * This is the method subclasses must implement.
-       * @param currentList current list of rows
-       * @param tableName table we are processing
-       * @param row row
-       * @return Count of items processed or -1 if all.
-       * @throws IOException if a remote or network exception occurs
-       * @throws RuntimeException other undefined exception
-       */
-      abstract int doCall(final List<? extends Row> currentList,
-        final byte [] row, final byte [] tableName)
-      throws IOException, RuntimeException;
-
-      /**
-       * Process the passed <code>list</code>.
-       * @param list list of rows to process
-       * @param tableName table we are processing
-       * @return Count of how many added or -1 if all added.
-       * @throws IOException if a remote or network exception occurs
-       */
-      int process(final List<? extends Row> list, final byte[] tableName)
-      throws IOException {
-        byte [] region = getRegionName(tableName, list.get(0).getRow(), false);
-        byte [] currentRegion = region;
-        boolean isLastRow;
-        boolean retryOnlyOne = false;
-        List<Row> currentList = new ArrayList<Row>();
-        int i, tries;
-        for (i = 0, tries = 0; i < list.size() && tries < numRetries; i++) {
-          Row row = list.get(i);
-          currentList.add(row);
-          // If the next record goes to a new region, then we are to clear
-          // currentList now during this cycle.
-          isLastRow = (i + 1) == list.size();
-          if (!isLastRow) {
-            region = getRegionName(tableName, list.get(i + 1).getRow(), false);
-          }
-          if (!Bytes.equals(currentRegion, region) || isLastRow || retryOnlyOne) {
-            int index = doCall(currentList, row.getRow(), tableName);
-            // index is == -1 if all processed successfully, else its index
-            // of last record successfully processed.
-            if (index != -1) {
-              if (tries == numRetries - 1) {
-                throw new RetriesExhaustedException("Some server, retryOnlyOne=" +
-                  retryOnlyOne + ", index=" + index + ", islastrow=" + isLastRow +
-                  ", tries=" + tries + ", numtries=" + numRetries + ", i=" + i +
-                  ", listsize=" + list.size() + ", region=" +
-                  Bytes.toStringBinary(region), currentRegion, row.getRow(),
-                  tries, new ArrayList<Throwable>());
-              }
-              tries = doBatchPause(currentRegion, tries);
-              i = i - currentList.size() + index;
-              retryOnlyOne = true;
-              // Reload location.
-              region = getRegionName(tableName, list.get(i + 1).getRow(), true);
-            } else {
-              // Reset these flags/counters on successful batch Put
-              retryOnlyOne = false;
-              tries = 0;
-            }
-            currentRegion = region;
-            currentList.clear();
-          }
-        }
-        return i;
-      }
-
-      /*
-       * @param t
-       * @param r
-       * @param re
-       * @return Region name that holds passed row <code>r</code>
-       * @throws IOException
-       */
-      private byte [] getRegionName(final byte [] t, final byte [] r,
-        final boolean re)
-      throws IOException {
-        HRegionLocation location = getRegionLocationForRowWithRetries(t, r, re);
-        return location.getRegionInfo().getRegionName();
-      }
-
-      /*
-       * Do pause processing before retrying...
-       * @param currentRegion
-       * @param tries
-       * @return New value for tries.
-       */
-      private int doBatchPause(final byte [] currentRegion, final int tries) {
-        int localTries = tries;
-        long sleepTime = getPauseTime(tries);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Reloading region " + Bytes.toStringBinary(currentRegion) +
-            " location because regionserver didn't accept updates; tries=" +
-            tries + " of max=" + numRetries + ", waiting=" + sleepTime + "ms");
-        }
-        try {
-          Thread.sleep(sleepTime);
-          localTries++;
-        } catch (InterruptedException e) {
-          // continue
-        }
-        return localTries;
-      }
-    }
-
-    public int processBatchOfRows(final ArrayList<Put> list,
-      final byte[] tableName)
+    public int processBatchOfRows(final ArrayList<Put> list, final byte[] tableName, ExecutorService pool)
     throws IOException {
-      if (list.isEmpty()) return 0;
-      if (list.size() > 1) Collections.sort(list);
-      Batch b = new Batch(this) {
-        @SuppressWarnings("unchecked")
-        @Override
-        int doCall(final List<? extends Row> currentList, final byte [] row,
-          final byte [] tableName)
-        throws IOException, RuntimeException {
-          final List<Put> puts = (List<Put>)currentList;
-          return getRegionServerWithRetries(new ServerCallable<Integer>(this.c,
-              tableName, row) {
-            public Integer call() throws IOException {
-              return server.put(location.getRegionInfo().getRegionName(), puts);
-            }
-          });
+      Result[] results = new Result[list.size()];
+      processBatch((List) list, tableName, pool, results);
+      int count = 0;
+      for (Result r : results) {
+        if (r != null) {
+          count++;
         }
-      };
-      return b.process(list, tableName);
+      }
+      return (count == list.size() ? -1 : count);
     }
 
+    /**
+     * @deprecated Use HConnectionManager::processBatch instead.
+     */
     public int processBatchOfDeletes(final List<Delete> list,
-      final byte[] tableName)
+      final byte[] tableName, ExecutorService pool)
     throws IOException {
-      if (list.isEmpty()) return 0;
-      if (list.size() > 1) Collections.sort(list);
-      Batch b = new Batch(this) {
-        @SuppressWarnings("unchecked")
-        @Override
-        int doCall(final List<? extends Row> currentList, final byte [] row,
-          final byte [] tableName)
-        throws IOException, RuntimeException {
-          final List<Delete> deletes = (List<Delete>)currentList;
-          return getRegionServerWithRetries(new ServerCallable<Integer>(this.c,
-                tableName, row) {
-              public Integer call() throws IOException {
-                return server.delete(location.getRegionInfo().getRegionName(),
-                  deletes);
-              }
-            });
-          }
-        };
-        return b.process(list, tableName);
+      Result[] results = new Result[list.size()];
+      processBatch((List) list, tableName, pool, results);
+      int count = 0;
+      for (Result r : results) {
+        if (r != null) {
+          count++;
+        }
       }
+      return (count == list.size() ? -1 : count);
+    }
 
     void close(boolean stopProxy) {
       if (master != null) {
@@ -1388,108 +1262,133 @@ public class HConnectionManager {
       }
     }
 
-    /**
-     * Process a batch of Puts on the given executor service.
-     *
-     * @param list the puts to make - successful puts will be removed.
-     * @param pool thread pool to execute requests on
-     *
-     * In the case of an exception, we take different actions depending on the
-     * situation:
-     *  - If the exception is a DoNotRetryException, we rethrow it and leave the
-     *    'list' parameter in an indeterminate state.
-     *  - If the 'list' parameter is a singleton, we directly throw the specific
-     *    exception for that put.
-     *  - Otherwise, we throw a generic exception indicating that an error occurred.
-     *    The 'list' parameter is mutated to contain those puts that did not succeed.
-     */
-    public void processBatchOfPuts(List<Put> list,
-                                   final byte[] tableName, ExecutorService pool) throws IOException {
-      boolean singletonList = list.size() == 1;
+    private Callable<MultiResponse> createCallable(
+        final HServerAddress address,
+        final MultiAction multi,
+        final byte [] tableName) {
+  	  final HConnection connection = this;
+  	  return new Callable<MultiResponse>() {
+  	    public MultiResponse call() throws IOException {
+  	      return getRegionServerWithoutRetries(
+  	          new ServerCallable<MultiResponse>(connection, tableName, null) {
+  	            public MultiResponse call() throws IOException {
+  	              return server.multi(multi);
+  	            }
+  	            @Override
+  	            public void instantiateServer(boolean reload) throws IOException {
+  	              server = connection.getHRegionConnection(address);
+  	            }
+  	          }
+  	      );
+  	    }
+  	  };
+  	}
+
+    public void processBatch(List<Row> list,
+        final byte[] tableName,
+        ExecutorService pool,
+        Result[] results) throws IOException {
+
+      // results must be the same size as list
+      if (results.length != list.size()) {
+        throw new IllegalArgumentException("argument results must be the same size as argument list");
+      }
+
+      if (list.size() == 0) {
+        return;
+      }
+
+      List<Row> workingList = new ArrayList<Row>(list);
+      final boolean singletonList = (list.size() == 1);
+      boolean retry = true;
       Throwable singleRowCause = null;
-      for ( int tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) {
-        Collections.sort(list);
-        Map<HServerAddress, MultiPut> regionPuts =
-            new HashMap<HServerAddress, MultiPut>();
-        // step 1:
-        //  break up into regionserver-sized chunks and build the data structs
-        for ( Put put : list ) {
-          byte [] row = put.getRow();
-
-          HRegionLocation loc = locateRegion(tableName, row, true);
-          HServerAddress address = loc.getServerAddress();
-          byte [] regionName = loc.getRegionInfo().getRegionName();
-
-          MultiPut mput = regionPuts.get(address);
-          if (mput == null) {
-            mput = new MultiPut(address);
-            regionPuts.put(address, mput);
-          }
-          mput.add(regionName, put);
-        }
-
-        // step 2:
-        //  make the requests
-        // Discard the map, just use a list now, makes error recovery easier.
-        List<MultiPut> multiPuts = new ArrayList<MultiPut>(regionPuts.values());
-
-        List<Future<MultiPutResponse>> futures =
-            new ArrayList<Future<MultiPutResponse>>(regionPuts.size());
-        for ( MultiPut put : multiPuts ) {
-          futures.add(pool.submit(createPutCallable(put.address,
-              put,
-              tableName)));
-        }
-        // RUN!
-        List<Put> failed = new ArrayList<Put>();
-
-        // step 3:
-        //  collect the failures and tries from step 1.
-        for (int i = 0; i < futures.size(); i++ ) {
-          Future<MultiPutResponse> future = futures.get(i);
-          MultiPut request = multiPuts.get(i);
+
+      for (int tries = 0; tries < numRetries && retry; ++tries) {
+
+        // sleep first, if this is a retry
+        if (tries >= 1) {
+          long sleepTime = getPauseTime(tries);
+          LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
           try {
-            MultiPutResponse resp = future.get();
+            Thread.sleep(sleepTime);
+          } catch (InterruptedException ignore) {
+            LOG.debug("Interupted");
+            Thread.currentThread().interrupt();
+            break;
+          }
+        }
+
+        // step 1: break up into regionserver-sized chunks and build the data structs
+
+        Map<HServerAddress, MultiAction> actionsByServer = new HashMap<HServerAddress, MultiAction>();
+        for (int i=0; i<workingList.size(); i++) {
+          Row row = workingList.get(i);
+          if (row != null) {
+            HRegionLocation loc = locateRegion(tableName, row.getRow(), true);
+            HServerAddress address = loc.getServerAddress();
+            byte[] regionName = loc.getRegionInfo().getRegionName();
+
+            MultiAction actions = actionsByServer.get(address);
+            if (actions == null) {
+              actions = new MultiAction();
+              actionsByServer.put(address, actions);
+            }
+
+            Action action = new Action(regionName, row, i);
+            actions.add(regionName, action);
+          }
+        }
+
+        // step 2: make the requests
+
+        Map<HServerAddress,Future<MultiResponse>> futures =
+            new HashMap<HServerAddress, Future<MultiResponse>>(actionsByServer.size());
+
+        for (Entry<HServerAddress, MultiAction> e : actionsByServer.entrySet()) {
+          futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
+        }
 
-            // For each region
-            for (Map.Entry<byte[], List<Put>> e : request.puts.entrySet()) {
-              Integer result = resp.getAnswer(e.getKey());
-              if (result == null) {
-                // failed
-                LOG.debug("Failed all for region: " +
-                    Bytes.toStringBinary(e.getKey()) + ", removing from cache");
-                failed.addAll(e.getValue());
-              } else if (result >= 0) {
-                // some failures
-                List<Put> lst = e.getValue();
-                failed.addAll(lst.subList(result, lst.size()));
-                LOG.debug("Failed past " + result + " for region: " +
-                    Bytes.toStringBinary(e.getKey()) + ", removing from cache");
+        // step 3: collect the failures and successes and prepare for retry
+
+        for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures.entrySet()) {
+          HServerAddress address = responsePerServer.getKey();
+
+          try {
+            // Gather the results for one server
+            Future<MultiResponse> future = responsePerServer.getValue();
+
+            // Not really sure what a reasonable timeout value is. Here's a first try.
+
+            MultiResponse resp = future.get();
+
+            if (resp == null) {
+              // Entire server failed
+              LOG.debug("Failed all for server: " + address + ", removing from cache");
+            } else {
+              // For each region
+              for (Entry<byte[], List<Pair<Integer,Result>>> e : resp.getResults().entrySet()) {
+                byte[] regionName = e.getKey();
+                List<Pair<Integer, Result>> regionResults = e.getValue();
+                for (int i = 0; i < regionResults.size(); i++) {
+                  Pair<Integer, Result> regionResult = regionResults.get(i);
+                  if (regionResult.getSecond() == null) {
+                    // failed
+                    LOG.debug("Failures for region: " + Bytes.toStringBinary(regionName) + ", removing from cache");
+                  } else {
+                    // success
+                    results[regionResult.getFirst()] = regionResult.getSecond();
+                  }
+                }
               }
             }
           } catch (InterruptedException e) {
-            // go into the failed list.
-            LOG.debug("Failed all from " + request.address, e);
-            failed.addAll(request.allPuts());
+            LOG.debug("Failed all from " + address, e);
+            Thread.currentThread().interrupt();
+            break;
           } catch (ExecutionException e) {
-            Throwable cause = e.getCause();
-            // Don't print stack trace if NSRE; NSRE is 'normal' operation.
-            if (cause instanceof NotServingRegionException) {
-              String msg = cause.getMessage();
-              if (msg != null && msg.length() > 0) {
-                // msg is the exception as a String... we just want first line.
-                msg = msg.split("[\\n\\r]+\\s*at")[0];
-              }
-              LOG.debug("Failed execution of all on " + request.address +
-                " because: " + msg);
-            } else {
-              // all go into the failed list.
-              LOG.debug("Failed execution of all on " + request.address,
-                e.getCause());
-            }
-            failed.addAll(request.allPuts());
+            LOG.debug("Failed all from " + address, e);
 
-            // Just give up, leaving the batch put list in an untouched/semi-committed state
+            // Just give up, leaving the batch incomplete
             if (e.getCause() instanceof DoNotRetryIOException) {
               throw (DoNotRetryIOException) e.getCause();
             }
@@ -1500,56 +1399,57 @@ public class HConnectionManager {
             }
           }
         }
-        list.clear();
-        if (!failed.isEmpty()) {
-          for (Put failedPut: failed) {
-            deleteCachedLocation(tableName, failedPut.getRow());
-          }
-
-          list.addAll(failed);
 
-          long sleepTime = getPauseTime(tries);
-          LOG.debug("processBatchOfPuts had some failures, sleeping for " + sleepTime +
-              " ms!");
-          try {
-            Thread.sleep(sleepTime);
-          } catch (InterruptedException ignored) {
+        // Find failures (i.e. null Result), and add them to the workingList (in
+        // order), so they can be retried.
+        retry = false;
+        workingList.clear();
+        for (int i = 0; i < results.length; i++) {
+          if (results[i] == null) {
+            retry = true;
+            Row row = list.get(i);
+            workingList.add(row);
+            deleteCachedLocation(tableName, row.getRow());
+          } else {
+            // add null to workingList, so the order remains consistent with the original list argument.
+            workingList.add(null);
           }
         }
       }
-      if (!list.isEmpty()) {
-        if (singletonList && singleRowCause != null) {
+
+      if (Thread.currentThread().isInterrupted()) {
+        throw new IOException("Aborting attempt because of a thread interruption");
+      }
+
+      if (retry) {
+        // ran out of retries and didn't successfully finish everything!
+        if (singleRowCause != null) {
           throw new IOException(singleRowCause);
+        } else {
+          throw new RetriesExhaustedException("Still had " + workingList.size()
+              + " actions left after retrying " + numRetries + " times.");
         }
-
-        // ran out of retries and didnt succeed everything!
-        throw new RetriesExhaustedException("Still had " + list.size() + " puts left after retrying " +
-            numRetries + " times.");
       }
     }
 
-
-    private Callable<MultiPutResponse> createPutCallable(
-        final HServerAddress address, final MultiPut puts,
-        final byte [] tableName) {
-      final HConnection connection = this;
-      return new Callable<MultiPutResponse>() {
-        public MultiPutResponse call() throws IOException {
-          return getRegionServerWithoutRetries(
-              new ServerCallable<MultiPutResponse>(connection, tableName, null) {
-                public MultiPutResponse call() throws IOException {
-                  MultiPutResponse resp = server.multiPut(puts);
-                  resp.request = puts;
-                  return resp;
-                }
-                @Override
-                public void instantiateServer(boolean reload) throws IOException {
-                  server = connection.getHRegionConnection(address);
-                }
-              }
-          );
+    /**
+     * @deprecated Use HConnectionManager::processBatch instead.
+     */
+    public void processBatchOfPuts(List<Put> list,
+        final byte[] tableName,
+        ExecutorService pool) throws IOException {
+      Result[] results = new Result[list.size()];
+      processBatch((List) list, tableName, pool, results);
+
+      // mutate list so that it is empty for complete success, or contains only failed records
+      // results are returned in the same order as the requests in list
+      // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
+      for (int i = results.length - 1; i>=0; i--) {
+        // if result is not null, it succeeded
+        if (results[i] != null) {
+          list.remove(i);
         }
-      };
+      }
     }
 
     private Throwable translateException(Throwable t) throws IOException {
@@ -1616,4 +1516,4 @@ public class HConnectionManager {
       }
     }
   }
-}
+}
\ No newline at end of file

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=991289&r1=991288&r2=991289&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java Tue Aug 31 18:53:32 2010
@@ -45,6 +45,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -76,7 +77,7 @@ public class HTable implements HTableInt
   private long currentWriteBufferSize;
   protected int scannerCaching;
   private int maxKeyValueSize;
-
+  private ExecutorService pool;  // For Multi
   private long maxScannerResultSize;
 
   /**
@@ -143,13 +144,11 @@ public class HTable implements HTableInt
       HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
       HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
     this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
-
-    int nrHRS = getCurrentNrHRS();
-    if (nrHRS == 0) {
-      // No servers running -- set default of 10 threads.
-      nrHRS = 10;
+    
+    int nrThreads = conf.getInt("hbase.htable.threads.max", getCurrentNrHRS());
+    if (nrThreads == 0) {
+      nrThreads = 1; // is there a better default?
     }
-    int nrThreads = conf.getInt("hbase.htable.threads.max", nrHRS);
 
     // Unfortunately Executors.newCachedThreadPool does not allow us to
     // set the maximum size of the pool, so we have to do it ourselves.
@@ -176,9 +175,6 @@ public class HTable implements HTableInt
       .getRSDirectoryCount();
   }
 
-  // For multiput
-  private ExecutorService pool;
-
   /**
    * Tells whether or not a table is enabled or not.
    * @param tableName Name of table to check.
@@ -509,6 +505,40 @@ public class HTable implements HTableInt
     );
   }
 
+  /**
+   * Method that does a batch call on Deletes, Gets and Puts.
+   *
+   * @param actions list of Get, Put, Delete objects
+   * @param results Empty Result[], same size as actions. Provides access to partial
+   * results, in case an exception is thrown. A null in the result array means that
+   * the call for that action failed, even after retries
+   * @throws IOException
+   */
+  public synchronized void batch(final List<Row> actions, final Result[] results) throws IOException {
+    connection.processBatch(actions, tableName, pool, results);
+  }
+
+  /**
+   * Method that does a batch call on Deletes, Gets and Puts.
+   * 
+   * @param actions list of Get, Put, Delete objects
+   * @return the results from the actions. A null in the return array means that
+   * the call for that action failed, even after retries
+   * @throws IOException
+   */
+  public synchronized Result[] batch(final List<Row> actions) throws IOException {
+    Result[] results = new Result[actions.size()];
+    connection.processBatch(actions, tableName, pool, results);
+    return results;
+  }
+
+  /**
+   * Deletes the specified cells/row.
+   * 
+   * @param delete The object that specifies what to delete.
+   * @throws IOException if a remote or network exception occurs.
+   * @since 0.20.0
+   */
   public void delete(final Delete delete)
   throws IOException {
     connection.getRegionServerWithRetries(
@@ -521,13 +551,28 @@ public class HTable implements HTableInt
     );
   }
 
+  /**
+   * Deletes the specified cells/rows in bulk.
+   * @param deletes List of things to delete. As a side effect, it will be modified:
+   * successful {@link Delete}s are removed. The ordering of the list will not change. 
+   * @throws IOException if a remote or network exception occurs. In that case
+   * the {@code deletes} argument will contain the {@link Delete} instances
+   * that have not been successfully applied.
+   * @since 0.20.1
+   */
   public void delete(final List<Delete> deletes)
   throws IOException {
-    int last = 0;
-    try {
-      last = connection.processBatchOfDeletes(deletes, this.tableName);
-    } finally {
-      deletes.subList(0, last).clear();
+    Result[] results = new Result[deletes.size()];
+    connection.processBatch((List) deletes, tableName, pool, results);
+
+    // mutate list so that it is empty for complete success, or contains only failed records
+    // results are returned in the same order as the requests in list
+    // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
+    for (int i = results.length - 1; i>=0; i--) {
+      // if result is not null, it succeeded
+      if (results[i] != null) {
+        deletes.remove(i);
+      }
     }
   }
 
@@ -659,10 +704,17 @@ public class HTable implements HTableInt
     );
   }
 
+  /**
+   * Executes all the buffered {@link Put} operations.
+   * <p>
+   * This method gets called once automatically for every {@link Put} or batch
+   * of {@link Put}s (when {@link #batch(List)} is used) when
+   * {@link #isAutoFlush()} is {@code true}.
+   * @throws IOException if a remote or network exception occurs.
+   */
   public void flushCommits() throws IOException {
     try {
-      connection.processBatchOfPuts(writeBuffer,
-          tableName, pool);
+      connection.processBatchOfPuts(writeBuffer, tableName, pool);
     } finally {
       // the write buffer was adjusted by processBatchOfPuts
       currentWriteBufferSize = 0;

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java?rev=991289&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java Tue Aug 31 18:53:32 2010
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.HServerAddress;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.DataInput;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.TreeMap;
+
+/**
+ * Container for Actions (i.e. Get, Delete, or Put), which are grouped by
+ * regionName. Intended to be used with HConnectionManager.processBatch()
+ */
+public final class MultiAction implements Writable {
+
+  // map of regions to lists of puts/gets/deletes for that region.
+  public Map<byte[], List<Action>> actions = new TreeMap<byte[], List<Action>>(
+      Bytes.BYTES_COMPARATOR);
+
+  public MultiAction() {
+  }
+
+  /**
+   * Get the total number of Actions
+   * 
+   * @return total number of Actions for all groups in this container.
+   */
+  public int size() {
+    int size = 0;
+    for (List l : actions.values()) {
+      size += l.size();
+    }
+    return size;
+  }
+
+  /**
+   * Add an Action to this container based on its regionName. If the regionName
+   * is wrong, the initial execution will fail, but will be automatically
+   * retried after looking up the correct region.
+   * 
+   * @param regionName
+   * @param a
+   */
+  public void add(byte[] regionName, Action a) {
+    List<Action> rsActions = actions.get(regionName);
+    if (rsActions == null) {
+      rsActions = new ArrayList<Action>();
+      actions.put(regionName, rsActions);
+    }
+    rsActions.add(a);
+  }
+
+  /**
+   * @return All actions from all regions in this container
+   */
+  public List<Action> allActions() {
+    List<Action> res = new ArrayList<Action>();
+    for (List<Action> lst : actions.values()) {
+      res.addAll(lst);
+    }
+    return res;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(actions.size());
+    for (Map.Entry<byte[], List<Action>> e : actions.entrySet()) {
+      Bytes.writeByteArray(out, e.getKey());
+      List<Action> lst = e.getValue();
+      out.writeInt(lst.size());
+      for (Action a : lst) {
+        HbaseObjectWritable.writeObject(out, a, Action.class, null);
+      }
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    actions.clear();
+    int mapSize = in.readInt();
+    for (int i = 0; i < mapSize; i++) {
+      byte[] key = Bytes.readByteArray(in);
+      int listSize = in.readInt();
+      List<Action> lst = new ArrayList<Action>(listSize);
+      for (int j = 0; j < listSize; j++) {
+        lst.add((Action) HbaseObjectWritable.readObject(in, null));
+      }
+      actions.put(key, lst);
+    }
+  }
+
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java?rev=991289&r1=991288&r2=991289&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java Tue Aug 31 18:53:32 2010
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.TreeMap;
 
 /**
+ * @deprecated Use MultiAction instead
  * Data type class for putting multiple regions worth of puts in one RPC.
  */
 public class MultiPut implements Writable {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java?rev=991289&r1=991288&r2=991289&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java Tue Aug 31 18:53:32 2010
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.TreeMap;
 
 /**
+ * @deprecated Replaced by MultiResponse
  * Response class for MultiPut.
  */
 public class MultiPutResponse implements Writable {

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java?rev=991289&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java Tue Aug 31 18:53:32 2010
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.HServerAddress;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.DataInput;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.TreeMap;
+
+/**
+ * A container for Result objects, grouped by regionName.
+ */
+public class MultiResponse implements Writable {
+
+  // map of regionName to list of (Results paired to the original index for that
+  // Result)
+  private Map<byte[], List<Pair<Integer, Result>>> results = new TreeMap<byte[], List<Pair<Integer, Result>>>(
+      Bytes.BYTES_COMPARATOR);
+
+  public MultiResponse() {
+  }
+
+  /**
+   * @return Number of pairs in this container
+   */
+  public int size() {
+    int size = 0;
+    for (Collection<?> c : results.values()) {
+      size += c.size();
+    }
+    return size;
+  }
+
+  /**
+   * Add the pair to the container, grouped by the regionName
+   * 
+   * @param regionName
+   * @param r
+   *          First item in the pair is the original index of the Action
+   *          (request). Second item is the Result. Result will be empty for
+   *          successful Put and Delete actions.
+   */
+  public void add(byte[] regionName, Pair<Integer, Result> r) {
+    List<Pair<Integer, Result>> rs = results.get(regionName);
+    if (rs == null) {
+      rs = new ArrayList<Pair<Integer, Result>>();
+      results.put(regionName, rs);
+    }
+    rs.add(r);
+  }
+
+  public Map<byte[], List<Pair<Integer, Result>>> getResults() {
+    return results;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(results.size());
+    for (Map.Entry<byte[], List<Pair<Integer, Result>>> e : results.entrySet()) {
+      Bytes.writeByteArray(out, e.getKey());
+      List<Pair<Integer, Result>> lst = e.getValue();
+      out.writeInt(lst.size());
+      for (Pair<Integer, Result> r : lst) {
+        out.writeInt(r.getFirst());
+        HbaseObjectWritable.writeObject(out, r.getSecond(), Result.class, null);
+      }
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    results.clear();
+    int mapSize = in.readInt();
+    for (int i = 0; i < mapSize; i++) {
+      byte[] key = Bytes.readByteArray(in);
+      int listSize = in.readInt();
+      List<Pair<Integer, Result>> lst = new ArrayList<Pair<Integer, Result>>(
+          listSize);
+      for (int j = 0; j < listSize; j++) {
+        Integer idx = in.readInt();
+        Result r = (Result) HbaseObjectWritable.readObject(in, null);
+        lst.add(new Pair<Integer, Result>(idx, r));
+      }
+      results.put(key, lst);
+    }
+  }
+
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Row.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Row.java?rev=991289&r1=991288&r2=991289&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Row.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Row.java Tue Aug 31 18:53:32 2010
@@ -19,12 +19,15 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import org.apache.hadoop.io.WritableComparable;
+
 /**
  * Has a row.
  */
-interface Row {
+public interface Row extends WritableComparable<Row> {
   /**
    * @return The row.
    */
   public byte [] getRow();
-}
\ No newline at end of file
+  
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=991289&r1=991288&r2=991289&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Tue Aug 31 18:53:32 2010
@@ -45,8 +45,12 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.MultiAction;
+import org.apache.hadoop.hbase.client.Action;
+import org.apache.hadoop.hbase.client.MultiResponse;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.MultiPutResponse;
 import org.apache.hadoop.hbase.client.MultiPut;
@@ -172,6 +176,13 @@ public class HbaseObjectWritable impleme
     // List
     addToMap(List.class, code++);
     addToMap(ColumnPrefixFilter.class, code++);
+    
+    // Multi
+    addToMap(Row.class, code++);
+    addToMap(Action.class, code++);
+    addToMap(MultiAction.class, code++);
+    addToMap(MultiResponse.class, code++);
+    
   }
 
   private Class<?> declaredClass;

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=991289&r1=991288&r2=991289&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Tue Aug 31 18:53:32 2010
@@ -24,6 +24,8 @@ import org.apache.hadoop.hbase.HServerIn
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.MultiAction;
+import org.apache.hadoop.hbase.client.MultiResponse;
 import org.apache.hadoop.hbase.client.MultiPut;
 import org.apache.hadoop.hbase.client.MultiPutResponse;
 import org.apache.hadoop.hbase.client.Put;
@@ -271,6 +273,13 @@ public interface HRegionInterface extend
    */
   public HServerInfo getHServerInfo() throws IOException;
 
+  /**
+   * Method used for doing multiple actions(Deletes, Gets and Puts) in one call
+   * @param multi
+   * @return MultiResponse
+   * @throws IOException
+   */
+  public MultiResponse multi(MultiAction multi) throws IOException;
 
   /**
    * Multi put for putting multiple regions worth of puts at once.

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=991289&r1=991288&r2=991289&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Aug 31 18:53:32 2010
@@ -81,12 +81,16 @@ import org.apache.hadoop.hbase.YouAreDea
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HMsg.Type;
 import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.client.Action;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.MultiAction;
 import org.apache.hadoop.hbase.client.MultiPut;
 import org.apache.hadoop.hbase.client.MultiPutResponse;
+import org.apache.hadoop.hbase.client.MultiResponse;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ServerConnection;
 import org.apache.hadoop.hbase.client.ServerConnectionManager;
@@ -1687,7 +1691,6 @@ public class HRegionServer implements HR
   throws IOException {
     if (put.getRow() == null)
       throw new IllegalArgumentException("update has null row");
-
     checkOpen();
     this.requestCount.incrementAndGet();
     HRegion region = getRegion(regionName);
@@ -2376,19 +2379,64 @@ public class HRegionServer implements HR
   public HServerInfo getHServerInfo() throws IOException {
     return serverInfo;
   }
+  
+  @Override
+  public MultiResponse multi(MultiAction multi) throws IOException {
+    MultiResponse response = new MultiResponse();
+    for (Map.Entry<byte[], List<Action>> e : multi.actions.entrySet()) {
+      byte[] regionName = e.getKey();
+      List<Action> actionsForRegion = e.getValue();
+      // sort based on the row id - this helps in the case where we reach the
+      // end of a region, so that we don't have to try the rest of the 
+      // actions in the list.
+      Collections.sort(actionsForRegion);
+      Row action = null;
+      try {
+        for (Action a : actionsForRegion) {
+          action = a.getAction();
+          if (action instanceof Delete) {
+            delete(regionName, (Delete) action);
+            response.add(regionName, new Pair<Integer, Result>(
+                a.getOriginalIndex(), new Result()));
+          } else if (action instanceof Get) {
+            response.add(regionName, new Pair<Integer, Result>(
+                a.getOriginalIndex(), get(regionName, (Get) action)));
+          } else if (action instanceof Put) {
+            put(regionName, (Put) action);
+            response.add(regionName, new Pair<Integer, Result>(
+                a.getOriginalIndex(), new Result()));
+          } else {
+            LOG.debug("Error: invalid Action, row must be a Get, Delete or Put.");
+            throw new IllegalArgumentException("Invalid Action, row must be a Get, Delete or Put.");
+          }
+        }
+      } catch (IOException ioe) {
+          if (multi.size() == 1) {
+            throw ioe;
+          } else {
+            LOG.error("Exception found while attempting " + action.toString()
+                + " " + StringUtils.stringifyException(ioe));
+            response.add(regionName,null);
+            // stop processing on this region, continue to the next.
+          }
+        }
+      }
+      
+      return response;
+    }
 
+  /**
+   * @deprecated Use HRegionServer.multi(MultiAction action) instead
+   */
   @Override
   public MultiPutResponse multiPut(MultiPut puts) throws IOException {
     MultiPutResponse resp = new MultiPutResponse();
-
     // do each region as it's own.
     for( Map.Entry<byte[], List<Put>> e: puts.puts.entrySet()) {
       int result = put(e.getKey(), e.getValue());
       resp.addResult(e.getKey(), result);
-
       e.getValue().clear(); // clear some RAM
     }
-
     return resp;
   }
 
@@ -2403,7 +2451,7 @@ public class HRegionServer implements HR
   public int getThreadWakeFrequency() {
     return threadWakeFrequency;
   }
-
+  
   //
   // Main program and support routines
   //

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestMultiParallel.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestMultiParallel.java?rev=991289&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestMultiParallel.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestMultiParallel.java Tue Aug 31 18:53:32 2010
@@ -0,0 +1,406 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+public class TestMultiParallel extends MultiRegionTable {
+
+  private static final byte[] VALUE = Bytes.toBytes("value");
+  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
+  private static final String FAMILY = "family";
+  private static final String TEST_TABLE = "multi_test_table";
+  private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
+  private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
+
+  List<byte[]> keys = new ArrayList<byte[]>();
+
+  public TestMultiParallel() {
+    super(2, FAMILY);
+    desc = new HTableDescriptor(TEST_TABLE);
+    desc.addFamily(new HColumnDescriptor(FAMILY));
+    makeKeys();
+  }
+
+  private void makeKeys() {
+    // Create a "non-uniform" test set with the following characteristics:
+    // a) Unequal number of keys per region
+
+    // Don't use integer as a multiple, so that we have a number of keys that is
+    // not a multiple of the number of regions
+    int numKeys = (int) ((float) KEYS.length * 10.33F);
+
+    for (int i = 0; i < numKeys; i++) {
+      int kIdx = i % KEYS.length;
+      byte[] k = KEYS[kIdx];
+      byte[] cp = new byte[k.length + 1];
+      System.arraycopy(k, 0, cp, 0, k.length);
+      cp[k.length] = new Integer(i % 256).byteValue();
+      keys.add(cp);
+    }
+
+    // b) Same duplicate keys (showing multiple Gets/Puts to the same row, which
+    // should work)
+    // c) keys are not in sorted order (within a region), to ensure that the
+    // sorting code and index mapping doesn't break the functionality
+    for (int i = 0; i < 100; i++) {
+      int kIdx = i % KEYS.length;
+      byte[] k = KEYS[kIdx];
+      byte[] cp = new byte[k.length + 1];
+      System.arraycopy(k, 0, cp, 0, k.length);
+      cp[k.length] = new Integer(i % 256).byteValue();
+      keys.add(cp);
+    }
+  }
+
+  public void testBatchWithGet() throws Exception {
+    HTable table = new HTable(conf, TEST_TABLE);
+
+    // load test data
+    List<Row> puts = constructPutRequests();
+    table.batch(puts);
+
+    // create a list of gets and run it
+    List<Row> gets = new ArrayList<Row>();
+    for (byte[] k : keys) {
+      Get get = new Get(k);
+      get.addColumn(BYTES_FAMILY, QUALIFIER);
+      gets.add(get);
+    }
+    Result[] multiRes = new Result[gets.size()];
+    table.batch(gets, multiRes);
+
+    // Same gets using individual call API
+    List<Result> singleRes = new ArrayList<Result>();
+    for (Row get : gets) {
+      singleRes.add(table.get((Get) get));
+    }
+
+    // Compare results
+    assertEquals(singleRes.size(), multiRes.length);
+    for (int i = 0; i < singleRes.size(); i++) {
+      assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER));
+      KeyValue[] singleKvs = singleRes.get(i).raw();
+      KeyValue[] multiKvs = multiRes[i].raw();
+      for (int j = 0; j < singleKvs.length; j++) {
+        assertEquals(singleKvs[j], multiKvs[j]);
+        assertEquals(0, Bytes.compareTo(singleKvs[j].getValue(), multiKvs[j]
+            .getValue()));
+      }
+    }
+  }
+
+  /**
+   * Only run one Multi test with a forced RegionServer abort. Otherwise, the
+   * unit tests will take an unnecessarily long time to run.
+   * 
+   * @throws Exception
+   */
+  public void testFlushCommitsWithAbort() throws Exception {
+    doTestFlushCommits(true);
+  }
+
+  public void testFlushCommitsNoAbort() throws Exception {
+    doTestFlushCommits(false);
+  }
+
+  public void doTestFlushCommits(boolean doAbort) throws Exception {
+    // Load the data
+    HTable table = new HTable(conf, TEST_TABLE);
+    table.setAutoFlush(false);
+    table.setWriteBufferSize(10 * 1024 * 1024);
+
+    List<Row> puts = constructPutRequests();
+    for (Row put : puts) {
+      table.put((Put) put);
+    }
+    table.flushCommits();
+
+    if (doAbort) {
+      cluster.abortRegionServer(0);
+
+      // try putting more keys after the abort. same key/qual... just validating
+      // no exceptions thrown
+      puts = constructPutRequests();
+      for (Row put : puts) {
+        table.put((Put) put);
+      }
+
+      table.flushCommits();
+    }
+
+    validateLoadedData(table);
+
+    // Validate server and region count
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    ClusterStatus cs = admin.getClusterStatus();
+    assertEquals((doAbort ? 1 : 2), cs.getServers());
+    for (HServerInfo info : cs.getServerInfo()) {
+      System.out.println(info);
+      assertTrue(info.getLoad().getNumberOfRegions() > 10);
+    }
+  }
+
+  public void testBatchWithPut() throws Exception {
+
+    HTable table = new HTable(conf, TEST_TABLE);
+
+    // put multiple rows using a batch
+    List<Row> puts = constructPutRequests();
+
+    Result[] results = table.batch(puts);
+    validateSizeAndEmpty(results, keys.size());
+
+    if (true) {
+      cluster.abortRegionServer(0);
+
+      puts = constructPutRequests();
+      results = table.batch(puts);
+      validateSizeAndEmpty(results, keys.size());
+    }
+
+    validateLoadedData(table);
+  }
+
+  public void testBatchWithDelete() throws Exception {
+
+    HTable table = new HTable(conf, TEST_TABLE);
+
+    // Load some data
+    List<Row> puts = constructPutRequests();
+    Result[] results = table.batch(puts);
+    validateSizeAndEmpty(results, keys.size());
+
+    // Deletes
+    List<Row> deletes = new ArrayList<Row>();
+    for (int i = 0; i < keys.size(); i++) {
+      Delete delete = new Delete(keys.get(i));
+      delete.deleteFamily(BYTES_FAMILY);
+      deletes.add(delete);
+    }
+    results = table.batch(deletes);
+    validateSizeAndEmpty(results, keys.size());
+
+    // Get to make sure ...
+    for (byte[] k : keys) {
+      Get get = new Get(k);
+      get.addColumn(BYTES_FAMILY, QUALIFIER);
+      assertFalse(table.exists(get));
+    }
+
+  }
+
+  public void testHTableDeleteWithList() throws Exception {
+
+    HTable table = new HTable(conf, TEST_TABLE);
+
+    // Load some data
+    List<Row> puts = constructPutRequests();
+    Result[] results = table.batch(puts);
+    validateSizeAndEmpty(results, keys.size());
+
+    // Deletes
+    ArrayList<Delete> deletes = new ArrayList<Delete>();
+    for (int i = 0; i < keys.size(); i++) {
+      Delete delete = new Delete(keys.get(i));
+      delete.deleteFamily(BYTES_FAMILY);
+      deletes.add(delete);
+    }
+    table.delete(deletes);
+    assertTrue(deletes.isEmpty());
+
+    // Get to make sure ...
+    for (byte[] k : keys) {
+      Get get = new Get(k);
+      get.addColumn(BYTES_FAMILY, QUALIFIER);
+      assertFalse(table.exists(get));
+    }
+
+  }
+
+  public void testBatchWithManyColsInOneRowGetAndPut() throws Exception {
+    HTable table = new HTable(conf, TEST_TABLE);
+
+    List<Row> puts = new ArrayList<Row>();
+    for (int i = 0; i < 100; i++) {
+      Put put = new Put(ONE_ROW);
+      byte[] qual = Bytes.toBytes("column" + i);
+      put.add(BYTES_FAMILY, qual, VALUE);
+      puts.add(put);
+    }
+    Result[] results = table.batch(puts);
+
+    // validate
+    validateSizeAndEmpty(results, 100);
+
+    // get the data back and validate that it is correct
+    List<Row> gets = new ArrayList<Row>();
+    for (int i = 0; i < 100; i++) {
+      Get get = new Get(ONE_ROW);
+      byte[] qual = Bytes.toBytes("column" + i);
+      get.addColumn(BYTES_FAMILY, qual);
+      gets.add(get);
+    }
+
+    Result[] multiRes = table.batch(gets);
+
+    int idx = 0;
+    for (Result r : multiRes) {
+      byte[] qual = Bytes.toBytes("column" + idx);
+      validateResult(r, qual, VALUE);
+      idx++;
+    }
+
+  }
+
  /**
   * Submits a single batch that interleaves Gets, Puts, and a Delete against
   * pre-loaded rows, then validates each returned Result by its position in
   * the action list.  The numbered comments give each action's index; they
   * must stay aligned with the results[i] checks at the bottom.
   */
  public void testBatchWithMixedActions() throws Exception {
    HTable table = new HTable(conf, TEST_TABLE);

    // Load some data to start
    Result[] results = table.batch(constructPutRequests());
    validateSizeAndEmpty(results, keys.size());

    // Batch: get, get, put(new col), delete, get, get of put, get of deleted,
    // put
    List<Row> actions = new ArrayList<Row>();

    byte[] qual2 = Bytes.toBytes("qual2");
    byte[] val2 = Bytes.toBytes("putvalue2");

    // 0 get
    Get get = new Get(keys.get(10));
    get.addColumn(BYTES_FAMILY, QUALIFIER);
    actions.add(get);

    // 1 get
    get = new Get(keys.get(11));
    get.addColumn(BYTES_FAMILY, QUALIFIER);
    actions.add(get);

    // 2 put of new column
    Put put = new Put(keys.get(10));
    put.add(BYTES_FAMILY, qual2, val2);
    actions.add(put);

    // 3 delete
    Delete delete = new Delete(keys.get(20));
    delete.deleteFamily(BYTES_FAMILY);
    actions.add(delete);

    // 4 get
    get = new Get(keys.get(30));
    get.addColumn(BYTES_FAMILY, QUALIFIER);
    actions.add(get);

    // 5 get of the put in #2 (entire family)
    get = new Get(keys.get(10));
    get.addFamily(BYTES_FAMILY);
    actions.add(get);

    // 6 get of the delete from #3
    get = new Get(keys.get(20));
    get.addColumn(BYTES_FAMILY, QUALIFIER);
    actions.add(get);

    // 7 put of new column
    put = new Put(keys.get(40));
    put.add(BYTES_FAMILY, qual2, val2);
    actions.add(put);

    results = table.batch(actions);

    // Validation

    validateResult(results[0]);
    validateResult(results[1]);
    validateEmpty(results[2]); // Puts come back as empty Results
    validateEmpty(results[3]); // so do Deletes
    validateResult(results[4]);
    validateResult(results[5]);
    validateResult(results[5], qual2, val2); // testing second column in #5
    validateEmpty(results[6]); // deleted — the test expects the Delete at #3
                               // to be visible to the Get at #6 in-batch
    validateEmpty(results[7]);

    // validate last put, externally from the batch
    get = new Get(keys.get(40));
    get.addColumn(BYTES_FAMILY, qual2);
    Result r = table.get(get);
    validateResult(r, qual2, val2);
  }
+
+  // // Helper methods ////
+
  /** Shorthand: asserts {@code r} contains the default QUALIFIER/VALUE pair. */
  private void validateResult(Result r) {
    validateResult(r, QUALIFIER, VALUE);
  }
+
+  private void validateResult(Result r, byte[] qual, byte[] val) {
+    assertTrue(r.containsColumn(BYTES_FAMILY, qual));
+    assertEquals(0, Bytes.compareTo(val, r.getValue(BYTES_FAMILY, qual)));
+  }
+
+  private List<Row> constructPutRequests() {
+    List<Row> puts = new ArrayList<Row>();
+    for (byte[] k : keys) {
+      Put put = new Put(k);
+      put.add(BYTES_FAMILY, QUALIFIER, VALUE);
+      puts.add(put);
+    }
+    return puts;
+  }
+
+  private void validateLoadedData(HTable table) throws IOException {
+    // get the data back and validate that it is correct
+    for (byte[] k : keys) {
+      Get get = new Get(k);
+      get.addColumn(BYTES_FAMILY, QUALIFIER);
+      Result r = table.get(get);
+      assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
+      assertEquals(0, Bytes.compareTo(VALUE, r
+          .getValue(BYTES_FAMILY, QUALIFIER)));
+    }
+  }
+
+  private void validateEmpty(Result result) {
+    assertTrue(result != null);
+    assertTrue(result.getRow() == null);
+    assertEquals(0, result.raw().length);
+  }
+
+  private void validateSizeAndEmpty(Result[] results, int expectedSize) {
+    // Validate got back the same number of Result objects, all empty
+    assertEquals(expectedSize, results.length);
+    for (Result result : results) {
+      validateEmpty(result);
+    }
+  }
+
+}

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java?rev=991289&r1=991288&r2=991289&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java Tue Aug 31 18:53:32 2010
@@ -1,116 +0,0 @@
-/*
- * Copyright 2009 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class TestMultiParallelPut extends MultiRegionTable {
-  final Log LOG = LogFactory.getLog(getClass());
-  private static final byte[] VALUE = Bytes.toBytes("value");
-  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
-  private static final String FAMILY = "family";
-  private static final String TEST_TABLE = "test_table";
-  private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
-
-
-  public TestMultiParallelPut() {
-    super(2, FAMILY);
-    desc = new HTableDescriptor(TEST_TABLE);
-    desc.addFamily(new HColumnDescriptor(FAMILY));
-    makeKeys();
-  }
-
-  private void makeKeys() {
-    for (byte [] k : KEYS) {
-      byte [] cp = new byte[k.length+1];
-      System.arraycopy(k, 0, cp, 0, k.length);
-      cp[k.length] = 1;
-      keys.add(cp);
-    }
-  }
-
-  List<byte[]> keys = new ArrayList<byte[]>();
-
-  public void testParallelPut() throws Exception {
-    LOG.info("Starting testParallelPut");
-    doATest(false);
-  }
-
-  public void testParallelPutWithRSAbort() throws Exception {
-    LOG.info("Starting testParallelPutWithRSAbort");
-    doATest(true);
-  }
-
-  public void doATest(boolean doAbort) throws Exception {
-    HTable table = new HTable(TEST_TABLE);
-    table.setAutoFlush(false);
-    table.setWriteBufferSize(10 * 1024 * 1024);
-    for ( byte [] k : keys ) {
-      Put put = new Put(k);
-      put.add(BYTES_FAMILY, QUALIFIER, VALUE);
-      table.put(put);
-    }
-    table.flushCommits();
-
-    if (doAbort) {
-      LOG.info("Aborting...");
-      cluster.abortRegionServer(0);
-      // try putting more keys after the abort.
-      for ( byte [] k : keys ) {
-        Put put = new Put(k);
-        put.add(BYTES_FAMILY, QUALIFIER, VALUE);
-        table.put(put);
-      }
-      table.flushCommits();
-    }
-
-    for (byte [] k : keys ) {
-      Get get = new Get(k);
-      get.addColumn(BYTES_FAMILY, QUALIFIER);
-      Result r = table.get(get);
-      assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
-      assertEquals(0,
-          Bytes.compareTo(VALUE,
-              r.getValue(BYTES_FAMILY, QUALIFIER)));
-    }
-
-    HBaseAdmin admin = new HBaseAdmin(conf);
-    ClusterStatus cs = admin.getClusterStatus();
-    int expectedServerCount = 2;
-    if (doAbort)  expectedServerCount = 1;
-    LOG.info("Clusterstatus servers count " + cs.getServers());
-    assertEquals(expectedServerCount, cs.getServers());
-    for ( HServerInfo info : cs.getServerInfo()) {
-      LOG.info("Info from clusterstatus=" + info);
-      assertTrue(info.getLoad().getNumberOfRegions() > 8);
-    }
-  }
-}
\ No newline at end of file



Mime
View raw message