hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject svn commit: r1350105 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/coprocessor/ main/java/org/apache/hadoop/hbase/rest/client/ main/java/org/apache/hadoop/hbase/util/ test/java/org/apach...
Date Thu, 14 Jun 2012 06:54:19 GMT
Author: tedyu
Date: Thu Jun 14 06:54:18 2012
New Revision: 1350105

URL: http://svn.apache.org/viewvc?rev=1350105&view=rev
Log:
HBASE-5924 In the client code, don't wait for all the requests to be executed before resubmitting a request in error (N Keywal)

Submitted by:	N Keywal
Reviewed by:	Stack, Ted

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Triple.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1350105&r1=1350104&r2=1350105&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Thu Jun 14 06:54:18 2012
@@ -290,7 +290,9 @@ public interface HConnection extends Abo
    * processed successfully.
    * @throws IOException if there are problems talking to META. Per-item
    * exceptions are stored in the results array.
+   * @deprecated since 0.96 - Use {@link HTableInterface#batch} instead
    */
+  @Deprecated
   public void processBatch(List<? extends Row> actions, final byte[] tableName,
       ExecutorService pool, Object[] results)
       throws IOException, InterruptedException;
@@ -298,7 +300,9 @@ public interface HConnection extends Abo
   /**
    * Parameterized batch processing, allowing varying return types for different
    * {@link Row} implementations.
+   * @deprecated since 0.96 - Use {@link HTableInterface#batchCallback} instead
    */
+  @Deprecated
   public <R> void processBatchCallback(List<? extends Row> list,
       byte[] tableName,
       ExecutorService pool,

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1350105&r1=1350104&r2=1350105&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Thu Jun 14 06:54:18 2012
@@ -27,16 +27,8 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -67,8 +59,6 @@ import org.apache.hadoop.hbase.ServerNam
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.AdminProtocol;
-import org.apache.hadoop.hbase.client.ClientProtocol;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
@@ -82,13 +72,8 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.Addressing;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.SoftValueSortedMap;
-import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.util.*;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
 import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
@@ -1709,23 +1694,25 @@ public class HConnectionManager {
       // way around.
       final HConnection connection = this;
       return new Callable<MultiResponse>() {
-       public MultiResponse call() throws IOException {
-         ServerCallable<MultiResponse> callable =
-           new ServerCallable<MultiResponse>(connection, tableName, null) {
-             public MultiResponse call() throws IOException {
-               return ProtobufUtil.multi(server, multi);
-            }
-             @Override
-             public void connect(boolean reload) throws IOException {
-               server = connection.getClient(
-                 loc.getHostname(), loc.getPort());
-             }
-           };
-         return callable.withoutRetries();
-       }
-     };
+        public MultiResponse call() throws IOException {
+          ServerCallable<MultiResponse> callable =
+            new ServerCallable<MultiResponse>(connection, tableName, null) {
+              public MultiResponse call() throws IOException {
+                return ProtobufUtil.multi(server, multi);
+              }
+
+              @Override
+              public void connect(boolean reload) throws IOException {
+                server = connection.getClient(
+                  loc.getHostname(), loc.getPort());
+              }
+            };
+          return callable.withoutRetries();
+        }
+      };
    }
 
+
     void updateCachedLocation(HRegionLocation hrl, String hostname, int port) {
       HRegionLocation newHrl = new HRegionLocation(hrl.getRegionInfo(), hostname, port);
       synchronized (this.cachedRegionLocations) {
@@ -1741,30 +1728,20 @@ public class HConnectionManager {
       }
     }
 
-    private void updateCachedLocations(
-      UpdateHistory updateHistory, HRegionLocation hrl, Object t) {
-      updateCachedLocations(updateHistory, hrl, null, null, t);
-    }
-
-    private void updateCachedLocations(UpdateHistory updateHistory, byte[] tableName, Row row,
-      Object t) {
-      updateCachedLocations(updateHistory, null, tableName, row, t);
+    private void updateCachedLocations(byte[] tableName, Row row, Object t) {
+      updateCachedLocations(null, tableName, row, t);
     }
 
     /**
      * Update the location with the new value (if the exception is a RegionMovedException) or delete
      *  it from the cache.
-     * We need to keep an history of the modifications, because we can have first an update then a
-     *  delete. The delete would remove the update.
-     * @param updateHistory - The set used for the history
      * @param hrl - can be null. If it's the case, tableName and row should not be null
      * @param tableName - can be null if hrl is not null.
      * @param row  - can be null if hrl is not null.
      * @param exception - An object (to simplify user code) on which we will try to find a nested
      *                  or wrapped or both RegionMovedException
      */
-    private void updateCachedLocations(
-      UpdateHistory updateHistory, final HRegionLocation hrl, final byte[] tableName,
+    private void updateCachedLocations(final HRegionLocation hrl, final byte[] tableName,
       Row row, final Object exception) {
 
       if ((row == null || tableName == null) && hrl == null){
@@ -1781,15 +1758,6 @@ public class HConnectionManager {
         return;
       }
 
-      final String regionName = myLoc.getRegionInfo().getRegionNameAsString();
-
-      if (updateHistory.contains(regionName)) {
-        // Already updated/deleted => nothing to do
-        return;
-      }
-
-      updateHistory.add(regionName);
-
       final RegionMovedException rme = RegionMovedException.find(exception);
       if (rme != null) {
         LOG.info("Region " + myLoc.getRegionInfo().getRegionNameAsString() + " moved from " +
@@ -1814,11 +1782,332 @@ public class HConnectionManager {
         throw new IllegalArgumentException(
           "argument results must be the same size as argument list");
       }
-
       processBatchCallback(list, tableName, pool, results, null);
     }
 
     /**
+     * Send the queries in parallel on the different region servers. Retries on failures.
+     * If the method returns it means that there is no error, and the 'results' array will
+     * contain no exception. On error, an exception is thrown, and the 'results' array will
+     * contain results and exceptions.
+     * @deprecated since 0.96 - Use {@link HTableInterface#batchCallback} instead
+     */
+    @Override
+    @Deprecated
+    public <R> void processBatchCallback(
+      List<? extends Row> list,
+      byte[] tableName,
+      ExecutorService pool,
+      Object[] results,
+      Batch.Callback<R> callback)
+      throws IOException, InterruptedException {
+
+      Process<R> p = new Process<R>(this, list, tableName, pool, results, callback);
+      p.processBatchCallback();
+    }
+
+
+    /**
+     * Methods and attributes to manage a batch process are grouped into this single class.
+     * Creating one Process<R> per batch process ensures multithread safety.
+     *
+     * This code should be moved to HTable once processBatchCallback is not supported anymore in
+     * the HConnection interface.
+     */
+    private static class Process<R> {
+      // Info on the queries and their context
+      private final HConnectionImplementation hci;
+      private final List<? extends Row> rows;
+      private final byte[] tableName;
+      private final ExecutorService pool;
+      private final Object[] results;
+      private final Batch.Callback<R> callback;
+
+      // Error management: these lists are filled by the errors on the final try. Indexes
+      //  are consistent, i.e. exceptions[i] matches failedActions[i] and failedAddresses[i]
+      private final List<Throwable> exceptions;
+      private final List<Row> failedActions;
+      private final List<String> failedAddresses;
+
+      // Used during the batch process
+      private final List<Action<R>> toReplay;
+      private final LinkedList<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>>
+        inProgress;
+      private int curNumRetries;
+
+      // Notified when a task is done
+      private final List<MultiAction<R>> finishedTasks = new ArrayList<MultiAction<R>>();
+
+      private Process(HConnectionImplementation hci, List<? extends Row> list,
+                       byte[] tableName, ExecutorService pool, Object[] results,
+                       Batch.Callback<R> callback){
+        this.hci = hci;
+        this.rows = list;
+        this.tableName = tableName;
+        this.pool = pool;
+        this.results = results;
+        this.callback = callback;
+        this.toReplay = new ArrayList<Action<R>>();
+        this.inProgress =
+          new LinkedList<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>>();
+        this.exceptions = new ArrayList<Throwable>();
+        this.failedActions = new ArrayList<Row>();
+        this.failedAddresses = new ArrayList<String>();
+        this.curNumRetries = 0;
+      }
+
+
+      /**
+       * Group a list of actions per region servers, and send them. The created MultiActions are
+       *  added to the inProgress list.
+       * @param actionsList
+       * @param sleepTime - sleep time before actually executing the actions. Can be zero.
+       * @throws IOException - if we can't locate a region after multiple retries.
+       */
+      private void submit(List<Action<R>> actionsList, final long sleepTime) throws IOException {
+        // group per location => regions server
+        final Map<HRegionLocation, MultiAction<R>> actionsByServer =
+          new HashMap<HRegionLocation, MultiAction<R>>();
+        for (Action<R> aAction : actionsList) {
+          final Row row = aAction.getAction();
+
+          if (row != null) {
+            final HRegionLocation loc = hci.locateRegion(this.tableName, row.getRow(), true);
+            if (loc == null) {
+              throw new IOException("No location found, aborting submit.");
+            }
+
+            final byte[] regionName = loc.getRegionInfo().getRegionName();
+            MultiAction<R> actions = actionsByServer.get(loc);
+            if (actions == null) {
+              actions = new MultiAction<R>();
+              actionsByServer.put(loc, actions);
+            }
+            actions.add(regionName, aAction);
+          }
+        }
+
+        // Send the queries and add them to the inProgress list
+        for (Entry<HRegionLocation, MultiAction<R>> e : actionsByServer.entrySet()) {
+          Callable<MultiResponse> callable =
+            createDelayedCallable(sleepTime, e.getKey(), e.getValue());
+          Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> p =
+            new Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>(
+              e.getValue(), e.getKey(), this.pool.submit(callable));
+          this.inProgress.addLast(p);
+        }
+      }
+
+
+      private void addToErrorsLists(Exception ex, Row row, Triple<MultiAction<R>,
+          HRegionLocation, Future<MultiResponse>> obj) {
+        this.exceptions.add(ex);
+        this.failedActions.add(row);
+        this.failedAddresses.add(obj.getSecond().getHostnamePort());
+      }
+
+     /**
+      * Resubmit the actions which have failed, after a sleep time.
+      * @throws IOException
+      */
+      private void doRetry() throws IOException{
+          final long sleepTime = ConnectionUtils.getPauseTime(hci.pause, this.curNumRetries);
+          submit(this.toReplay, sleepTime);
+          this.toReplay.clear();
+      }
+
+      /**
+       * Parameterized batch processing, allowing varying return types for
+       * different {@link Row} implementations.
+       * Throws an exception on error. If there are no exceptions, it means that the 'results'
+       *  array is clean.
+       */
+      private void processBatchCallback() throws IOException, InterruptedException {
+        if (this.results.length != this.rows.size()) {
+          throw new IllegalArgumentException(
+            "argument results (size="+results.length+") must be the same size as " +
+              "argument list (size="+this.rows.size()+")");
+        }
+        if (this.rows.isEmpty()) {
+          return;
+        }
+
+        // We keep the number of retries per action.
+        int[] nbRetries = new int[this.results.length];
+
+        // Build the action list. This list won't change after being created, hence the
+        //  indexes will remain constant, allowing a direct lookup.
+        final List<Action<R>> listActions = new ArrayList<Action<R>>(this.rows.size());
+        for (int i = 0; i < this.rows.size(); i++) {
+          Action<R> action = new Action<R>(this.rows.get(i), i);
+          listActions.add(action);
+        }
+
+        // execute the actions. We will analyze and resubmit the actions in a 'while' loop.
+        submit(listActions, 0);
+
+        // lastRetry is true if either:
+        //  we had an exception 'DoNotRetry'
+        //  we had more than numRetries for any action
+        //  In this case, we will finish the current retries but we won't start new ones.
+        boolean lastRetry = false;
+        // despite its name numRetries means number of tries. So if numRetries == 1 it means we
+        //  won't retry. And we compare vs. 2 in case someone set it to zero.
+        boolean noRetry = (hci.numRetries < 2);
+
+        // Analyze and resubmit until all actions are done successfully or failed after numRetries
+        while (!this.inProgress.isEmpty()) {
+
+          // We need the original multi action to find out what actions to replay if
+          //  we have a 'total' failure of the Future<MultiResponse>
+          // We need the HRegionLocation as we give it back if we go out of retries
+          Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> currentTask =
+            removeFirstDone();
+
+          // Get the answer, keep the exception if any as we will use it for the analysis
+          MultiResponse responses = null;
+          ExecutionException exception = null;
+          try {
+            responses = currentTask.getThird().get();
+          } catch (ExecutionException e) {
+            exception = e;
+          }
+
+          // Error case: no result at all for this multi action. We need to redo all actions
+          if (responses == null) {
+            for (List<Action<R>> actions : currentTask.getFirst().actions.values()) {
+              for (Action<R> action : actions) {
+                Row row = action.getAction();
+                hci.updateCachedLocations(this.tableName, row, exception);
+                if (noRetry) {
+                  addToErrorsLists(exception, row, currentTask);
+                } else {
+                  lastRetry = addToReplay(nbRetries, action);
+                }
+              }
+            }
+          } else { // Success or partial success
+            // Analyze detailed results. We can still have individual failures to be redone.
+            // two specific exceptions are managed:
+            //  - DoNotRetryIOException: we continue to retry for other actions
+            //  - RegionMovedException: we update the cache with the new region location
+            for (Entry<byte[], List<Pair<Integer, Object>>> resultsForRS :
+                responses.getResults().entrySet()) {
+              for (Pair<Integer, Object> regionResult : resultsForRS.getValue()) {
+                Action<R> correspondingAction = listActions.get(regionResult.getFirst());
+                Object result = regionResult.getSecond();
+                this.results[correspondingAction.getOriginalIndex()] = result;
+
+                // Failure: retry if it makes sense, else update the error lists
+                if (result == null || result instanceof Throwable) {
+                  Row row = correspondingAction.getAction();
+                  hci.updateCachedLocations(this.tableName, row, result);
+                  if (result instanceof DoNotRetryIOException || noRetry) {
+                    addToErrorsLists((Exception)result, row, currentTask);
+                  } else {
+                    lastRetry = addToReplay(nbRetries, correspondingAction);
+                  }
+                } else // success
+                  if (callback != null) {
+                    this.callback.update(resultsForRS.getKey(),
+                      this.rows.get(regionResult.getFirst()).getRow(),
+                      (R) result);
+                }
+              }
+            }
+          }
+
+          // Retry all actions in toReplay then clear it.
+          if (!noRetry && !toReplay.isEmpty()) {
+            doRetry();
+            if (lastRetry) {
+              noRetry = true;
+            }
+          }
+        }
+
+        if (!exceptions.isEmpty()) {
+          throw new RetriesExhaustedWithDetailsException(this.exceptions,
+            this.failedActions,
+            this.failedAddresses);
+        }
+      }
+
+      /**
+       * Put the action that has to be retried in the Replay list.
+       * @return true if we're out of numRetries and it's the last retry.
+       */
+      private boolean addToReplay(int[] nbRetries, Action<R> action) {
+        this.toReplay.add(action);
+        nbRetries[action.getOriginalIndex()]++;
+        if (nbRetries[action.getOriginalIndex()] > this.curNumRetries) {
+          this.curNumRetries = nbRetries[action.getOriginalIndex()];
+        }
+        // numRetries means number of tries, while curNumRetries means current number of retries. So
+        //  we need to add 1 to make them comparable. And as we look for the last try we compare
+        //  with '>=' and not '>'. And we need curNumRetries to mean what it says as we don't want
+        //  to initialize it to 1.
+        return ( (this.curNumRetries +1) >= hci.numRetries);
+      }
+
+      /**
+       * Wait for one of the tasks to be done, and remove it from the inProgress list.
+       * @return the task that is done.
+       */
+      private Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>
+      removeFirstDone() throws InterruptedException {
+        while (true) {
+          synchronized (finishedTasks) {
+            if (!finishedTasks.isEmpty()) {
+              MultiAction<R> done = finishedTasks.remove(finishedTasks.size() - 1);
+
+              // We now need to remove it from the inProgress part.
+              Iterator<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>> it =
+                inProgress.iterator();
+              while (it.hasNext()) {
+                Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> task = it.next();
+                if (task.getFirst() == done) { // We have the exact object. No java equals here.
+                  it.remove();
+                  return task;
+                }
+              }
+              LOG.error("Development error: We didn't see a task in the list. " +
+                done.getRegions());
+            }
+            finishedTasks.wait(10);
+          }
+        }
+      }
+
+      private Callable<MultiResponse> createDelayedCallable(
+        final long delay, final HRegionLocation loc, final MultiAction<R> multi) {
+
+        final Callable<MultiResponse> delegate = hci.createCallable(loc, multi, tableName);
+
+        return new Callable<MultiResponse>() {
+          private final long creationTime = System.currentTimeMillis();
+
+          @Override
+          public MultiResponse call() throws Exception {
+            try {
+              final long waitingTime = delay + creationTime - System.currentTimeMillis();
+              if (waitingTime > 0) {
+                Thread.sleep(waitingTime);
+              }
+              return delegate.call();
+            } finally {
+              synchronized (finishedTasks) {
+                finishedTasks.add(multi);
+                finishedTasks.notifyAll();
+              }
+            }
+          }
+        };
+      }
+    }
+
+
+    /**
      * Executes the given
      * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call}
      * callable for each row in the
@@ -1884,182 +2173,6 @@ public class HConnectionManager {
       }
     }
 
-    private static class UpdateHistory{
-      private final Set<String> updateHistory = new HashSet<String>(100); // size: if we're doing a
-      // rolling restart we may have 100 regions with a wrong location if we're really unlucky
-
-      public boolean contains(String regionName) {
-        return updateHistory.contains(regionName);
-      }
-
-      public void add(String regionName) {
-        updateHistory.add(regionName);
-      }
-    }
-
-    /**
-     * Parameterized batch processing, allowing varying return types for
-     * different {@link Row} implementations.
-     */
-    @Override
-    public <R> void processBatchCallback(
-        List<? extends Row> list,
-        byte[] tableName,
-        ExecutorService pool,
-        Object[] results,
-        Batch.Callback<R> callback)
-    throws IOException, InterruptedException {
-      // This belongs in HTable!!! Not in here.  St.Ack
-
-      // results must be the same size as list
-      if (results.length != list.size()) {
-        throw new IllegalArgumentException(
-            "argument results must be the same size as argument list");
-      }
-      if (list.isEmpty()) {
-        return;
-      }
-
-      // Keep track of the most recent servers for any given item for better
-      // exceptional reporting.  We keep HRegionLocation to save on parsing.
-      // Later below when we use lastServers, we'll pull what we need from
-      // lastServers.
-      HRegionLocation [] lastServers = new HRegionLocation[results.length];
-      List<Row> workingList = new ArrayList<Row>(list);
-      boolean retry = true;
-      // count that helps presize actions array
-      int actionCount = 0;
-
-      for (int tries = 0; tries < numRetries && retry; ++tries) {
-        UpdateHistory updateHistory = new UpdateHistory();
-
-        // sleep first, if this is a retry
-        if (tries >= 1) {
-          long sleepTime = ConnectionUtils.getPauseTime(this.pause, tries);
-          LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
-          Thread.sleep(sleepTime);
-        }
-        // step 1: break up into regionserver-sized chunks and build the data structs
-        Map<HRegionLocation, MultiAction<R>> actionsByServer =
-          new HashMap<HRegionLocation, MultiAction<R>>();
-        for (int i = 0; i < workingList.size(); i++) {
-          Row row = workingList.get(i);
-          if (row != null) {
-            HRegionLocation loc = locateRegion(tableName, row.getRow(), true);
-            byte[] regionName = loc.getRegionInfo().getRegionName();
-
-            MultiAction<R> actions = actionsByServer.get(loc);
-            if (actions == null) {
-              actions = new MultiAction<R>();
-              actionsByServer.put(loc, actions);
-            }
-
-            Action<R> action = new Action<R>(row, i);
-            lastServers[i] = loc;
-            actions.add(regionName, action);
-          }
-        }
-
-        // step 2: make the requests
-
-        Map<HRegionLocation, Future<MultiResponse>> futures =
-            new HashMap<HRegionLocation, Future<MultiResponse>>(
-                actionsByServer.size());
-
-        for (Entry<HRegionLocation, MultiAction<R>> e: actionsByServer.entrySet()) {
-          futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
-        }
-
-        // step 3: collect the failures and successes and prepare for retry
-
-        for (Entry<HRegionLocation, Future<MultiResponse>> responsePerServer
-             : futures.entrySet()) {
-          HRegionLocation loc = responsePerServer.getKey();
-
-          try {
-            Future<MultiResponse> future = responsePerServer.getValue();
-            MultiResponse resp = future.get();
-
-            if (resp == null) {
-              // Entire server failed
-              LOG.debug("Failed all for server: " + loc.getHostnamePort() +
-                ", removing from cache");
-              continue;
-            }
-
-            for (Entry<byte[], List<Pair<Integer,Object>>> e : resp.getResults().entrySet()) {
-              byte[] regionName = e.getKey();
-              List<Pair<Integer, Object>> regionResults = e.getValue();
-              for (Pair<Integer, Object> regionResult : regionResults) {
-                if (regionResult == null) {
-                  // if the first/only record is 'null' the entire region failed.
-                  LOG.debug("Failures for region: " +
-                      Bytes.toStringBinary(regionName) +
-                      ", removing from cache");
-                } else {
-                  // Result might be an Exception, including DNRIOE
-                  results[regionResult.getFirst()] = regionResult.getSecond();
-                  if (callback != null && !(regionResult.getSecond() instanceof Throwable)) {
-                    callback.update(e.getKey(),
-                        list.get(regionResult.getFirst()).getRow(),
-                        (R)regionResult.getSecond());
-                  }
-                }
-              }
-            }
-          } catch (ExecutionException e) {
-            LOG.debug("Failed all from " + loc, e);
-            updateCachedLocations(updateHistory, loc, e);
-          }
-        }
-
-        // step 4: identify failures and prep for a retry (if applicable).
-
-        // Find failures (i.e. null Result), and add them to the workingList (in
-        // order), so they can be retried.
-        retry = false;
-        workingList.clear();
-        actionCount = 0;
-        for (int i = 0; i < results.length; i++) {
-          // if null (fail) or instanceof Throwable && not instanceof DNRIOE
-          // then retry that row. else dont.
-          if (results[i] == null ||
-              (results[i] instanceof Throwable &&
-                  !(results[i] instanceof DoNotRetryIOException))) {
-
-            retry = true;
-            actionCount++;
-            Row row = list.get(i);
-            workingList.add(row);
-            updateCachedLocations(updateHistory, tableName, row, results[i]);
-          } else {
-            if (results[i] != null && results[i] instanceof Throwable) {
-              actionCount++;
-            }
-            // add null to workingList, so the order remains consistent with the original list argument.
-            workingList.add(null);
-          }
-        }
-      }
-
-      List<Throwable> exceptions = new ArrayList<Throwable>(actionCount);
-      List<Row> actions = new ArrayList<Row>(actionCount);
-      List<String> addresses = new ArrayList<String>(actionCount);
-
-      for (int i = 0 ; i < results.length; i++) {
-        if (results[i] == null || results[i] instanceof Throwable) {
-          exceptions.add((Throwable)results[i]);
-          actions.add(list.get(i));
-          addresses.add(lastServers[i].getHostnamePort());
-        }
-      }
-
-      if (!exceptions.isEmpty()) {
-        throw new RetriesExhaustedWithDetailsException(exceptions,
-            actions,
-            addresses);
-      }
-    }
 
     /*
      * Return the number of cached region for a table. It will only be called
@@ -2078,6 +2191,8 @@ public class HConnectionManager {
       }
     }
 
+
+
     /**
      * Check the region cache to see whether a region is cached yet or not.
      * Called by unit tests.

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1350105&r1=1350104&r2=1350105&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java Thu Jun 14 06:54:18 2012
@@ -23,6 +23,7 @@ import java.io.Closeable;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.lang.reflect.Proxy;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -707,22 +708,33 @@ public class HTable implements HTableInt
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
-  public synchronized void batch(final List<?extends Row> actions, final Object[] results)
+  public void batch(final List<?extends Row> actions, final Object[] results)
       throws InterruptedException, IOException {
-    connection.processBatch(actions, tableName, pool, results);
+    connection.processBatchCallback(actions, tableName, pool, results, null);
+  }
+
+  @Override
+  public Object[] batch(final List<? extends Row> actions)
+     throws InterruptedException, IOException {
+    Object[] results = new Object[actions.size()];
+    connection.processBatchCallback(actions, tableName, pool, results, null);
+    return results;
+  }
+
+  @Override
+  public <R> void batchCallback(
+    final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
+    throws IOException, InterruptedException {
+    connection.processBatchCallback(actions, tableName, pool, results, callback);
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
-  public synchronized Object[] batch(final List<? extends Row> actions) throws InterruptedException, IOException {
+  public <R> Object[] batchCallback(
+    final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
+      InterruptedException {
     Object[] results = new Object[actions.size()];
-    connection.processBatch(actions, tableName, pool, results);
+    connection.processBatchCallback(actions, tableName, pool, results, callback);
     return results;
   }
 
@@ -985,41 +997,63 @@ public class HTable implements HTableInt
    */
   @Override
   public void flushCommits() throws IOException {
+    Object[] results = new Object[writeBuffer.size()];
+    boolean success = false;
     try {
-      Object[] results = new Object[writeBuffer.size()];
-      try {
-        this.connection.processBatch(writeBuffer, tableName, pool, results);
-      } catch (InterruptedException e) {
-        throw new IOException(e);
-      } finally {
-        // mutate list so that it is empty for complete success, or contains
-        // only failed records results are returned in the same order as the
-        // requests in list walk the list backwards, so we can remove from list
-        // without impacting the indexes of earlier members
-        for (int i = results.length - 1; i>=0; i--) {
-          if (results[i] instanceof Result) {
-            // successful Puts are removed from the list here.
-            writeBuffer.remove(i);
-          }
-        }
-      }
+      this.connection.processBatch(writeBuffer, tableName, pool, results);
+      success = true;
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException(e.getMessage());
     } finally {
-      if (clearBufferOnFail) {
+      // mutate list so that it is empty for complete success, or contains
+      // only failed records. Results are returned in the same order as the
+      // requests in list. Walk the list backwards, so we can remove from list
+      // without impacting the indexes of earlier members
+      currentWriteBufferSize = 0;
+      if (success || clearBufferOnFail) {
         writeBuffer.clear();
-        currentWriteBufferSize = 0;
       } else {
-        // the write buffer was adjusted by processBatchOfPuts
-        currentWriteBufferSize = 0;
-        for (Put aPut : writeBuffer) {
-          currentWriteBufferSize += aPut.heapSize();
+        for (int i = results.length - 1; i >= 0; i--) {
+          if (results[i] instanceof Result) {
+            writeBuffer.remove(i);
+          } else {
+            currentWriteBufferSize += writeBuffer.get(i).heapSize();
+          }
         }
       }
     }
   }
 
   /**
-   * {@inheritDoc}
+   * Process a mixed batch of Get, Put and Delete actions. All actions for a
+   * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
+   *
+   *
+   * @param actions The collection of actions.
+   * @param results An empty array, same size as list. If an exception is thrown,
+   * you can test here for partial results, and to determine which actions
+   * processed successfully.
+   * @throws IOException if there are problems talking to META. Per-item
+   * exceptions are stored in the results array.
    */
+  public <R> void processBatchCallback(
+    final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
+    throws IOException, InterruptedException {
+    connection.processBatchCallback(list, tableName, pool, results, callback);
+  }
+
+
+  /**
+   * Parameterized batch processing, allowing varying return types for different
+   * {@link Row} implementations.
+   */
+  public void processBatch(final List<? extends Row> list, final Object[] results)
+    throws IOException, InterruptedException {
+
+    this.processBatchCallback(list, results, null);
+  }
+
+
   @Override
   public void close() throws IOException {
     if (this.closed) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1350105&r1=1350104&r2=1350105&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Thu Jun 14 06:54:18 2012
@@ -107,6 +107,23 @@ public interface HTableInterface extends
   Object[] batch(final List<? extends Row> actions) throws IOException, InterruptedException;
 
   /**
+   * Same as {@link #batch(List, Object[])}, but with a callback.
+   * @since 0.96.0
+   */
+  public <R> void batchCallback(
+    final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
+    throws IOException, InterruptedException;
+
+
+  /**
+   * Same as {@link #batch(List)}, but with a callback.
+   * @since 0.96.0
+   */
+  public <R> Object[] batchCallback(
+    List<? extends Row> actions, Batch.Callback<R> callback) throws IOException,
+    InterruptedException;
+
+  /**
    * Extracts certain cells from a given row.
    * @param get The object that specifies what data to fetch and from which row.
    * @return The data coming from the specified row, if it exists.  If the row

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1350105&r1=1350104&r2=1350105&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java Thu Jun 14 06:54:18 2012
@@ -501,6 +501,18 @@ public abstract class CoprocessorHost<E 
       }
 
       @Override
+      public <R> void batchCallback(List<? extends Row> actions, Object[] results,
+          Batch.Callback<R> callback) throws IOException, InterruptedException {
+        table.batchCallback(actions, results, callback);
+      }
+
+      @Override
+      public <R> Object[] batchCallback(List<? extends Row> actions,
+          Batch.Callback<R> callback) throws IOException, InterruptedException {
+        return table.batchCallback(actions, callback);
+      }
+
+      @Override
       public Result[] get(List<Get> gets) throws IOException {
         return table.get(gets);
       }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java?rev=1350105&r1=1350104&r2=1350105&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java Thu Jun 14 06:54:18 2012
@@ -694,6 +694,18 @@ public class RemoteHTable implements HTa
   }
 
   @Override
+  public <R> void batchCallback(List<? extends Row> actions, Object[] results,
+      Batch.Callback<R> callback) throws IOException, InterruptedException {
+    throw new IOException("batchCallback not supported");
+  }
+
+  @Override
+  public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback)
+   throws IOException, InterruptedException {
+    throw new IOException("batchCallback not supported");
+  }
+
+  @Override
   public Result[] get(List<Get> gets) throws IOException {
     throw new IOException("get(List<Get>) not supported");
   }

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Triple.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Triple.java?rev=1350105&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Triple.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Triple.java Thu Jun 14 06:54:18 2012
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+/**
+ * Utility class to manage a triple.
+ */
+public class Triple<A, B, C> {
+  private A first;
+  private B second;
+  private C third;
+
+  public Triple(A first, B second, C third) {
+    this.first = first;
+    this.second = second;
+    this.third = third;
+  }
+
+  public int hashCode() {
+    int hashFirst = (first != null ? first.hashCode() : 0);
+    int hashSecond = (second != null ? second.hashCode() : 0);
+    int hashThird = (third != null ? third.hashCode() : 0);
+
+    return (hashFirst >> 1) ^ hashSecond ^ (hashThird << 1);
+  }
+
+  public boolean equals(Object obj) {
+    if (!(obj instanceof Triple)) {
+      return false;
+    }
+
+    Triple<?, ?, ?> otherTriple = (Triple<?, ?, ?>) obj;
+
+    if (first != otherTriple.first && (first != null && !(first.equals(otherTriple.first))))
+      return false;
+    if (second != otherTriple.second && (second != null && !(second.equals(otherTriple.second))))
+      return false;
+    if (third != otherTriple.third && (third != null && !(third.equals(otherTriple.third))))
+      return false;
+
+    return true;
+  }
+
+  public String toString() {
+    return "(" + first + ", " + second + "," + third + " )";
+  }
+
+  public A getFirst() {
+    return first;
+  }
+
+  public void setFirst(A first) {
+    this.first = first;
+  }
+
+  public B getSecond() {
+    return second;
+  }
+
+  public void setSecond(B second) {
+    this.second = second;
+  }
+
+  public C getThird() {
+    return third;
+  }
+
+  public void setThird(C third) {
+    this.third = third;
+  }
+}
+
+
+

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1350105&r1=1350104&r2=1350105&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Thu Jun 14 06:54:18 2012
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertFal
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.SynchronousQueue;
@@ -39,7 +40,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.HTable.DaemonThreadFactory;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.rest.protobuf.generated.ScannerMessage;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -222,7 +222,12 @@ public class TestHCM {
       table.put(put3);
       Assert.assertFalse("Unreachable point", true);
     }catch (Throwable e){
-      LOG.info("Put done, expected exception caught: "+e.getClass());
+      LOG.info("Put done, exception caught: "+e.getClass());
+      // Now check that we have the exception we wanted
+      Assert.assertTrue(e instanceof RetriesExhaustedWithDetailsException);
+      RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException)e;
+      Assert.assertTrue(re.getNumExceptions() == 1);
+      Assert.assertTrue(Arrays.equals(re.getRow(0).getRow(), ROW));
     }
     Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
     Assert.assertEquals(
@@ -452,6 +457,7 @@ public class TestHCM {
     conn.close();
   }
 
+
   @org.junit.Rule
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
     new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java?rev=1350105&r1=1350104&r2=1350105&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java Thu Jun 14 06:54:18 2012
@@ -204,6 +204,8 @@ public class TestMasterCoprocessorExcept
     String loadedCoprocessors = master.getLoadedCoprocessors();
     assertTrue(loadedCoprocessors.equals("[" + coprocessorName + "]"));
 
+
+
     // Verify that BuggyMasterObserver has been removed due to its misbehavior
     // by creating another table: should not have a problem this time.
     HTableDescriptor htd2 = new HTableDescriptor(TEST_TABLE2);

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java?rev=1350105&r1=1350104&r2=1350105&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java Thu Jun 14 06:54:18 2012
@@ -21,7 +21,9 @@
 package org.apache.hadoop.hbase.coprocessor;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 
+import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -49,47 +51,16 @@ import static org.junit.Assert.*;
  */
 @Category(MediumTests.class)
 public class TestRegionServerCoprocessorExceptionWithAbort {
-  static final Log LOG = LogFactory.getLog(TestRegionObserverInterface.class);
-
-  private class zkwAbortable implements Abortable {
-    @Override
-    public void abort(String why, Throwable e) {
-      throw new RuntimeException("Fatal ZK rs tracker error, why=", e);
-    }
-    @Override
-    public boolean isAborted() {
-      return false;
-    }
-  };
-
-  private class RSTracker extends ZooKeeperNodeTracker {
-    public boolean regionZKNodeWasDeleted = false;
-    public String rsNode;
-    private Thread mainThread;
-
-    public RSTracker(ZooKeeperWatcher zkw, String rsNode, Thread mainThread) {
-      super(zkw, rsNode, new zkwAbortable());
-      this.rsNode = rsNode;
-      this.mainThread = mainThread;
-    }
-
-    @Override
-    public synchronized void nodeDeleted(String path) {
-      if (path.equals(rsNode)) {
-        regionZKNodeWasDeleted = true;
-        mainThread.interrupt();
-      }
-    }
-  }
-  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  static final int timeout = 30000;
+  static final Log LOG = LogFactory.getLog(TestRegionServerCoprocessorExceptionWithAbort.class);
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final String TABLE_NAME = "observed_table";
 
   @BeforeClass
   public static void setupBeforeClass() throws Exception {
     // set configure to indicate which cp should be loaded
     Configuration conf = TEST_UTIL.getConfiguration();
-    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
-        BuggyRegionObserver.class.getName());
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);  // Let's fail fast.
+    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, BuggyRegionObserver.class.getName());
     conf.set("hbase.coprocessor.abortonerror", "true");
     TEST_UTIL.startMiniCluster(2);
   }
@@ -101,59 +72,31 @@ public class TestRegionServerCoprocessor
 
   @Test
   public void testExceptionFromCoprocessorDuringPut()
-      throws IOException {
+    throws IOException {
     // When we try to write to TEST_TABLE, the buggy coprocessor will
     // cause a NullPointerException, which will cause the regionserver (which
     // hosts the region we attempted to write to) to abort.
-    byte[] TEST_TABLE = Bytes.toBytes("observed_table");
+    byte[] TEST_TABLE = Bytes.toBytes(TABLE_NAME);
     byte[] TEST_FAMILY = Bytes.toBytes("aaa");
 
     HTable table = TEST_UTIL.createTable(TEST_TABLE, TEST_FAMILY);
-    TEST_UTIL.waitUntilAllRegionsAssigned(
-        TEST_UTIL.createMultiRegions(table, TEST_FAMILY));
+    TEST_UTIL.waitUntilAllRegionsAssigned(TEST_UTIL.createMultiRegions(table, TEST_FAMILY));
 
     // Note which regionServer will abort (after put is attempted).
-    final HRegionServer regionServer =
-        TEST_UTIL.getRSForFirstRegionInTable(TEST_TABLE);
+    final HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(TEST_TABLE);
 
-    // add watch so we can know when this regionserver aborted.
-    ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
-        "unittest", new zkwAbortable());
-
-    RSTracker rsTracker = new RSTracker(zkw,
-        "/hbase/rs/"+regionServer.getServerName(), Thread.currentThread());
-    rsTracker.start();
-    zkw.registerListener(rsTracker);
+    final byte[] ROW = Bytes.toBytes("aaa");
+    Put put = new Put(ROW);
+    put.add(TEST_FAMILY, ROW, ROW);
 
-    boolean caughtInterruption = false;
+    Assert.assertFalse("The region server should be available", regionServer.isAborted());
     try {
-      final byte[] ROW = Bytes.toBytes("aaa");
-      Put put = new Put(ROW);
-      put.add(TEST_FAMILY, ROW, ROW);
       table.put(put);
-    } catch (IOException e) {
-      // Depending on exact timing of the threads involved, zkw's interruption
-      // might be caught here ...
-      if (e.getCause().getClass().equals(InterruptedException.class)) {
-	LOG.debug("caught interruption here (during put()).");
-        caughtInterruption = true;
-      } else {
-        fail("put() failed: " + e);
-      }
-    }
-    if (caughtInterruption == false) {
-      try {
-        Thread.sleep(timeout);
-        fail("RegionServer did not abort within 30 seconds.");
-      } catch (InterruptedException e) {
-        // .. or it might be caught here.
-	LOG.debug("caught interruption here (during sleep()).");
-        caughtInterruption = true;
-      }
+      fail("The put should have failed, as the coprocessor is buggy");
+    } catch (IOException ignored) {
+      // Expected.
     }
-    assertTrue("Main thread caught interruption.",caughtInterruption);
-    assertTrue("RegionServer aborted on coprocessor exception, as expected.",
-        rsTracker.regionZKNodeWasDeleted);
+    Assert.assertTrue("The region server should have aborted", regionServer.isAborted());
     table.close();
   }
 
@@ -162,11 +105,9 @@ public class TestRegionServerCoprocessor
     public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
                        final Put put, final WALEdit edit,
                        final boolean writeToWAL) {
-      String tableName =
-          c.getEnvironment().getRegion().getRegionInfo().getTableNameAsString();
-      if (tableName.equals("observed_table")) {
-        Integer i = null;
-        i = i + 1;
+      String tableName = c.getEnvironment().getRegion().getRegionInfo().getTableNameAsString();
+      if (TABLE_NAME.equals(tableName)) {
+        throw new NullPointerException("Buggy coprocessor");
       }
     }
   }
@@ -175,4 +116,3 @@ public class TestRegionServerCoprocessor
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
     new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
 }
-



Mime
View raw message