hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From raw...@apache.org
Subject svn commit: r1033321 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/rest/client/ src/test/java/org/apache/hadoop/hbase/client/
Date Wed, 10 Nov 2010 01:54:14 GMT
Author: rawson
Date: Wed Nov 10 01:54:13 2010
New Revision: 1033321

URL: http://svn.apache.org/viewvc?rev=1033321&view=rev
Log:
HBASE-2898  MultiPut makes proper error handling impossible and leads to corrupted data


Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1033321&r1=1033320&r2=1033321&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed Nov 10 01:54:13 2010
@@ -667,6 +667,8 @@ Release 0.90.0 - Unreleased
    HBASE-3199  large response handling: some fixups and cleanups
    HBASE-3212  More testing of enable/disable uncovered base condition not in
                place; i.e. that only one enable/disable runs at a time
+   HBASE-2898  MultiPut makes proper error handling impossible and leads to 
+   	       corrupted data
 
 
   IMPROVEMENTS

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1033321&r1=1033320&r2=1033321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Wed Nov 10 01:54:13 2010
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -243,45 +242,25 @@ public interface HConnection extends Abo
   /**
    * Process a mixed batch of Get, Put and Delete actions. All actions for a
    * RegionServer are forwarded in one RPC call.
-   * 
+   *
+   *
    * @param actions The collection of actions.
    * @param tableName Name of the hbase table
    * @param pool thread pool for parallel execution
    * @param results An empty array, same size as list. If an exception is thrown,
    * you can test here for partial results, and to determine which actions
    * processed successfully.
-   * @throws IOException
+   * @throws IOException if there are problems talking to META. Per-item
+   * exceptions are stored in the results array.
    */
   public void processBatch(List<Row> actions, final byte[] tableName,
-      ExecutorService pool, Result[] results)
-  throws IOException;
-
-  /**
-   * Process a batch of Puts. Does the retries.
-   * @param list A batch of Puts to process.
-   * @param tableName The name of the table
-   * @return Count of committed Puts.  On fault, < list.size().
-   * @throws IOException if a remote or network exception occurs
-   * @deprecated Use HConnectionManager::processBatch instead.
-   */
-  public int processBatchOfRows(ArrayList<Put> list, byte[] tableName, ExecutorService pool)
-  throws IOException;
-
-  /**
-   * Process a batch of Deletes. Does the retries.
-   * @param list A batch of Deletes to process.
-   * @param tableName The name of the table
-   * @return Count of committed Deletes. On fault, < list.size().
-   * @throws IOException if a remote or network exception occurs
-   * @deprecated Use HConnectionManager::processBatch instead.
-   */
-  public int processBatchOfDeletes(List<Delete> list, byte[] tableName, ExecutorService pool)
-  throws IOException;
+      ExecutorService pool, Object[] results)
+      throws IOException, InterruptedException;
 
   /**
    * Process a batch of Puts.
    *
-   * @param list The collection of actions. The list is mutated: all successful Puts 
+   * @param list The collection of actions. The list is mutated: all successful Puts
    * are removed from the list.
    * @param tableName Name of the hbase table
    * @param pool Thread pool for parallel execution
@@ -289,7 +268,8 @@ public interface HConnection extends Abo
    * @deprecated Use HConnectionManager::processBatch instead.
    */
   public void processBatchOfPuts(List<Put> list,
-                                 final byte[] tableName, ExecutorService pool) throws IOException;
+                                 final byte[] tableName, ExecutorService pool)
+      throws IOException;
 
   /**
    * Enable or disable region cache prefetch for the table. It will be

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1033321&r1=1033320&r2=1033321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed Nov 10 01:54:13 2010
@@ -1029,39 +1029,6 @@ public class HConnectionManager {
       }
     }
 
-    /**
-     * @deprecated Use HConnectionManager::processBatch instead.
-     */
-    public int processBatchOfRows(final ArrayList<Put> list, final byte[] tableName, ExecutorService pool)
-    throws IOException {
-      Result[] results = new Result[list.size()];
-      processBatch((List) list, tableName, pool, results);
-      int count = 0;
-      for (Result r : results) {
-        if (r != null) {
-          count++;
-        }
-      }
-      return (count == list.size() ? -1 : count);
-    }
-
-    /**
-     * @deprecated Use HConnectionManager::processBatch instead.
-     */
-    public int processBatchOfDeletes(final List<Delete> list,
-      final byte[] tableName, ExecutorService pool)
-    throws IOException {
-      Result[] results = new Result[list.size()];
-      processBatch((List) list, tableName, pool, results);
-      int count = 0;
-      for (Result r : results) {
-        if (r != null) {
-          count++;
-        }
-      }
-      return (count == list.size() ? -1 : count);
-    }
-
     void close(boolean stopProxy) {
       if (master != null) {
         if (stopProxy) {
@@ -1088,28 +1055,28 @@ public class HConnectionManager {
         final HServerAddress address,
         final MultiAction multi,
         final byte [] tableName) {
-  	  final HConnection connection = this;
-  	  return new Callable<MultiResponse>() {
-  	    public MultiResponse call() throws IOException {
-  	      return getRegionServerWithoutRetries(
-  	          new ServerCallable<MultiResponse>(connection, tableName, null) {
-  	            public MultiResponse call() throws IOException {
-  	              return server.multi(multi);
-  	            }
-  	            @Override
-  	            public void instantiateServer(boolean reload) throws IOException {
-  	              server = connection.getHRegionConnection(address);
-  	            }
-  	          }
-  	      );
-  	    }
-  	  };
-  	}
+      final HConnection connection = this;
+      return new Callable<MultiResponse>() {
+        public MultiResponse call() throws IOException {
+          return getRegionServerWithoutRetries(
+              new ServerCallable<MultiResponse>(connection, tableName, null) {
+                public MultiResponse call() throws IOException {
+                  return server.multi(multi);
+                }
+                @Override
+                public void instantiateServer(boolean reload) throws IOException {
+                  server = connection.getHRegionConnection(address);
+                }
+              }
+          );
+        }
+      };
+    }
 
-    public void processBatch(List<Row> list, 
+    public void processBatch(List<Row> list,
         final byte[] tableName,
         ExecutorService pool,
-        Result[] results) throws IOException {
+        Object[] results) throws IOException, InterruptedException {
 
       // results must be the same size as list
       if (results.length != list.size()) {
@@ -1120,8 +1087,10 @@ public class HConnectionManager {
         return;
       }
 
+      // Keep track of the most recent servers for any given item for better
+      // exceptional reporting.
+      HServerAddress [] lastServers = new HServerAddress[results.length];
       List<Row> workingList = new ArrayList<Row>(list);
-      final boolean singletonList = (list.size() == 1);
       boolean retry = true;
       Throwable singleRowCause = null;
 
@@ -1131,19 +1100,13 @@ public class HConnectionManager {
         if (tries >= 1) {
           long sleepTime = getPauseTime(tries);
           LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
-          try {
-            Thread.sleep(sleepTime);
-          } catch (InterruptedException ignore) {
-            LOG.debug("Interupted");
-            Thread.currentThread().interrupt();
-            break;
-          }
+          Thread.sleep(sleepTime);
         }
 
         // step 1: break up into regionserver-sized chunks and build the data structs
 
        Map<HServerAddress, MultiAction> actionsByServer = new HashMap<HServerAddress, MultiAction>();
-        for (int i=0; i<workingList.size(); i++) {
+        for (int i = 0; i < workingList.size(); i++) {
           Row row = workingList.get(i);
           if (row != null) {
             HRegionLocation loc = locateRegion(tableName, row.getRow(), true);
@@ -1157,6 +1120,7 @@ public class HConnectionManager {
             }
 
             Action action = new Action(regionName, row, i);
+            lastServers[i] = address;
             actions.add(regionName, action);
           }
         }
@@ -1176,58 +1140,50 @@ public class HConnectionManager {
           HServerAddress address = responsePerServer.getKey();
 
           try {
-            // Gather the results for one server
             Future<MultiResponse> future = responsePerServer.getValue();
-
-            // Not really sure what a reasonable timeout value is. Here's a first try.
-
             MultiResponse resp = future.get();
 
             if (resp == null) {
               // Entire server failed
               LOG.debug("Failed all for server: " + address + ", removing from cache");
-            } else {
-              // For each region
-              for (Entry<byte[], List<Pair<Integer,Result>>> e : resp.getResults().entrySet()) {
-                byte[] regionName = e.getKey();
-                List<Pair<Integer, Result>> regionResults = e.getValue();
-                for (Pair<Integer, Result> regionResult : regionResults) {
-                  if (regionResult == null) {
-                    // if the first/only record is 'null' the entire region failed.
-                    LOG.debug("Failures for region: " + Bytes.toStringBinary(regionName) + ", removing from cache");
-                  } else {
-                    // success
-                    results[regionResult.getFirst()] = regionResult.getSecond();
-                  }
+              continue;
+            }
+
+            for (Entry<byte[], List<Pair<Integer,Object>>> e : resp.getResults().entrySet()) {
+              byte[] regionName = e.getKey();
+              List<Pair<Integer, Object>> regionResults = e.getValue();
+              for (Pair<Integer, Object> regionResult : regionResults) {
+                if (regionResult == null) {
+                  // if the first/only record is 'null' the entire region failed.
+                  LOG.debug("Failures for region: " +
+                      Bytes.toStringBinary(regionName) +
+                      ", removing from cache");
+                } else {
+                  // Result might be an Exception, including DNRIOE
+                  results[regionResult.getFirst()] = regionResult.getSecond();
                 }
               }
             }
-          } catch (InterruptedException e) {
-            LOG.debug("Failed all from " + address, e);
-            Thread.currentThread().interrupt();
-            break;
           } catch (ExecutionException e) {
             LOG.debug("Failed all from " + address, e);
-
-            // Just give up, leaving the batch incomplete
-            if (e.getCause() instanceof DoNotRetryIOException) {
-              throw (DoNotRetryIOException) e.getCause();
-            }
-
-            if (singletonList) {
-              // be richer for reporting in a 1 row case.
-              singleRowCause = e.getCause();
-            }
           }
         }
 
+        // step 4: identify failures and prep for a retry (if applicable).
+
         // Find failures (i.e. null Result), and add them to the workingList (in
         // order), so they can be retried.
         retry = false;
         workingList.clear();
         for (int i = 0; i < results.length; i++) {
-          if (results[i] == null) {
+          // if null (fail) or instanceof Throwable && not instanceof DNRIOE
+          // then retry that row. else dont.
+          if (results[i] == null ||
+              (results[i] instanceof Throwable &&
+                  !(results[i] instanceof DoNotRetryIOException))) {
+
             retry = true;
+
             Row row = list.get(i);
             workingList.add(row);
             deleteCachedLocation(tableName, row.getRow());
@@ -1238,19 +1194,31 @@ public class HConnectionManager {
         }
       }
 
-      if (Thread.currentThread().isInterrupted()) {
-        throw new IOException("Aborting attempt because of a thread interruption");
-      }
-
       if (retry) {
-        // ran out of retries and didn't successfully finish everything!
+        // Simple little check for 1 item failures.
         if (singleRowCause != null) {
           throw new IOException(singleRowCause);
-        } else {
-          throw new RetriesExhaustedException("Still had " + workingList.size()
-              + " actions left after retrying " + numRetries + " times.");
         }
       }
+
+
+      List<Throwable> exceptions = new ArrayList<Throwable>();
+      List<Row> actions = new ArrayList<Row>();
+      List<HServerAddress> addresses = new ArrayList<HServerAddress>();
+
+      for (int i = 0 ; i < results.length; i++) {
+        if (results[i] == null || results[i] instanceof Throwable) {
+          exceptions.add((Throwable)results[i]);
+          actions.add(list.get(i));
+          addresses.add(lastServers[i]);
+        }
+      }
+
+      if (!exceptions.isEmpty()) {
+        throw new RetriesExhaustedWithDetailsException(exceptions,
+            actions,
+            addresses);
+      }
     }
 
     /**
@@ -1259,16 +1227,21 @@ public class HConnectionManager {
     public void processBatchOfPuts(List<Put> list,
         final byte[] tableName,
         ExecutorService pool) throws IOException {
-      Result[] results = new Result[list.size()];
-      processBatch((List) list, tableName, pool, results);
-
-      // mutate list so that it is empty for complete success, or contains only failed records
-      // results are returned in the same order as the requests in list
-      // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
-      for (int i = results.length - 1; i>=0; i--) {
-        // if result is not null, it succeeded
-        if (results[i] != null) {
-          list.remove(i);
+      Object[] results = new Object[list.size()];
+      try {
+        processBatch((List) list, tableName, pool, results);
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      } finally {
+
+        // mutate list so that it is empty for complete success, or contains only failed records
+        // results are returned in the same order as the requests in list
+        // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
+        for (int i = results.length - 1; i>=0; i--) {
+          if (results[i] instanceof Result) {
+            // successful Puts are removed from the list here.
+            list.remove(i);
+          }
         }
       }
     }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1033321&r1=1033320&r2=1033321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed Nov 10 01:54:13 2010
@@ -553,8 +553,22 @@ public class HTable implements HTableInt
   }
 
    public Result[] get(List<Get> gets) throws IOException {
-    return batch((List) gets);
-  }
+     try {
+       Object [] r1 = batch((List)gets);
+
+       // translate.
+       Result [] results = new Result[r1.length];
+       int i=0;
+       for (Object o : r1) {
+         // batch ensures if there is a failure we get an exception instead
+         results[i++] = (Result) o;
+       }
+
+       return results;
+     } catch (InterruptedException e) {
+       throw new IOException(e);
+     }
+   }
 
   /**
    * Method that does a batch call on Deletes, Gets and Puts.  The ordering of
@@ -563,13 +577,15 @@ public class HTable implements HTableInt
    * guaranteed that the Get returns what the Put had put.
    *
    * @param actions list of Get, Put, Delete objects
-   * @param results Empty Result[], same size as actions. Provides access to partial
-   * results, in case an exception is thrown. A null in the result array means that
-   * the call for that action failed, even after retries
+   * @param results Empty Result[], same size as actions. Provides access to
+   * partial results, in case an exception is thrown. If there are any failures,
+   * there will be a null or Throwable will be in the results array, AND an
+   * exception will be thrown.
    * @throws IOException
    */
   @Override
-  public synchronized void batch(final List<Row> actions, final Result[] results) throws IOException {
+  public synchronized void batch(final List<Row> actions, final Object[] results)
+      throws InterruptedException, IOException {
     connection.processBatch(actions, tableName, pool, results);
   }
 
@@ -582,8 +598,8 @@ public class HTable implements HTableInt
    * @throws IOException
    */
   @Override
-  public synchronized Result[] batch(final List<Row> actions) throws IOException {
-    Result[] results = new Result[actions.size()];
+  public synchronized Object[] batch(final List<Row> actions) throws InterruptedException, IOException {
+    Object[] results = new Object[actions.size()];
     connection.processBatch(actions, tableName, pool, results);
     return results;
   }
@@ -616,20 +632,25 @@ public class HTable implements HTableInt
    * the {@code deletes} argument will contain the {@link Delete} instances
    * that have not be successfully applied.
    * @since 0.20.1
+   * @see {@link #batch(java.util.List, Object[])}
    */
   @Override
   public void delete(final List<Delete> deletes)
   throws IOException {
-    Result[] results = new Result[deletes.size()];
-    connection.processBatch((List) deletes, tableName, pool, results);
-
-    // mutate list so that it is empty for complete success, or contains only failed records
-    // results are returned in the same order as the requests in list
-    // walk the list backwards, so we can remove from list without impacting the indexes
of earlier members
-    for (int i = results.length - 1; i>=0; i--) {
-      // if result is not null, it succeeded
-      if (results[i] != null) {
-        deletes.remove(i);
+    Object[] results = new Object[deletes.size()];
+    try {
+      connection.processBatch((List) deletes, tableName, pool, results);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } finally {
+      // mutate list so that it is empty for complete success, or contains only failed records
+      // results are returned in the same order as the requests in list
+      // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
+      for (int i = results.length - 1; i>=0; i--) {
+        // if result is not null, it succeeded
+        if (results[i] instanceof Result) {
+          deletes.remove(i);
+        }
       }
     }
   }
@@ -806,7 +827,7 @@ public class HTable implements HTableInt
   }
 
   @Override
-  public void close() throws IOException{
+  public void close() throws IOException {
     flushCommits();
   }
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1033321&r1=1033320&r2=1033321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Wed Nov 10 01:54:13 2010
@@ -74,24 +74,25 @@ public interface HTableInterface {
    * Method that does a batch call on Deletes, Gets and Puts.
    *
    * @param actions list of Get, Put, Delete objects
-   * @param results Empty Result[], same size as actions. Provides access to partial
+   * @param results Empty Object[], same size as actions. Provides access to partial
    *                results, in case an exception is thrown. A null in the result array means that
    *                the call for that action failed, even after retries
    * @throws IOException
    * @since 0.90.0
    */
-  void batch(final List<Row> actions, final Result[] results) throws IOException;
+  void batch(final List<Row> actions, final Object[] results) throws IOException, InterruptedException;
 
   /**
    * Method that does a batch call on Deletes, Gets and Puts.
    *
+   *
    * @param actions list of Get, Put, Delete objects
    * @return the results from the actions. A null in the return array means that
    *         the call for that action failed, even after retries
    * @throws IOException
    * @since 0.90.0
    */
-  Result[] batch(final List<Row> actions) throws IOException;
+  Object[] batch(final List<Row> actions) throws IOException, InterruptedException;
 
   /**
    * Extracts certain cells from a given row.

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java?rev=1033321&r1=1033320&r2=1033321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java Wed Nov 10 01:54:13 2010
@@ -25,10 +25,14 @@ import org.apache.hadoop.hbase.io.HbaseO
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.StringUtils;
 
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.DataInput;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -42,8 +46,8 @@ public class MultiResponse implements Wr
 
   // map of regionName to list of (Results paired to the original index for that
   // Result)
-  private Map<byte[], List<Pair<Integer, Result>>> results =
-      new TreeMap<byte[], List<Pair<Integer, Result>>>(Bytes.BYTES_COMPARATOR);
+  private Map<byte[], List<Pair<Integer, Object>>> results =
+      new TreeMap<byte[], List<Pair<Integer, Object>>>(Bytes.BYTES_COMPARATOR);
 
   public MultiResponse() {
   }
@@ -68,32 +72,52 @@ public class MultiResponse implements Wr
    *          (request). Second item is the Result. Result will be empty for
    *          successful Put and Delete actions.
    */
-  public void add(byte[] regionName, Pair<Integer, Result> r) {
-    List<Pair<Integer, Result>> rs = results.get(regionName);
+  public void add(byte[] regionName, Pair<Integer, Object> r) {
+    List<Pair<Integer, Object>> rs = results.get(regionName);
     if (rs == null) {
-      rs = new ArrayList<Pair<Integer, Result>>();
+      rs = new ArrayList<Pair<Integer, Object>>();
       results.put(regionName, rs);
     }
     rs.add(r);
   }
 
-  public Map<byte[], List<Pair<Integer, Result>>> getResults() {
+  public void add(byte []regionName, int originalIndex, Object resOrEx) {
+    add(regionName, new Pair<Integer,Object>(originalIndex, resOrEx));
+  }
+
+  public Map<byte[], List<Pair<Integer, Object>>> getResults() {
     return results;
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeInt(results.size());
-    for (Map.Entry<byte[], List<Pair<Integer, Result>>> e : results.entrySet()) {
+    for (Map.Entry<byte[], List<Pair<Integer, Object>>> e : results.entrySet()) {
       Bytes.writeByteArray(out, e.getKey());
-      List<Pair<Integer, Result>> lst = e.getValue();
+      List<Pair<Integer, Object>> lst = e.getValue();
       out.writeInt(lst.size());
-      for (Pair<Integer, Result> r : lst) {
+      for (Pair<Integer, Object> r : lst) {
         if (r == null) {
           out.writeInt(-1); // Cant have index -1; on other side we recognize -1 as 'null'
         } else {
           out.writeInt(r.getFirst()); // Can this can npe!?!
-          HbaseObjectWritable.writeObject(out, r.getSecond(), Result.class, null);
+          Object obj = r.getSecond();
+          if (obj instanceof Throwable) {
+            out.writeBoolean(true); // true, Throwable/exception.
+
+            Throwable t = (Throwable) obj;
+            // serialize exception
+            WritableUtils.writeString(out, t.getClass().getName());
+            WritableUtils.writeString(out,
+                StringUtils.stringifyException(t));
+
+          } else {
+            out.writeBoolean(false); // no exception
+
+            if (! (obj instanceof Writable))
+              obj = null; // squash all non-writables to null.
+            HbaseObjectWritable.writeObject(out, obj, Result.class, null);
+          }
         }
       }
     }
@@ -106,15 +130,33 @@ public class MultiResponse implements Wr
     for (int i = 0; i < mapSize; i++) {
       byte[] key = Bytes.readByteArray(in);
       int listSize = in.readInt();
-      List<Pair<Integer, Result>> lst = new ArrayList<Pair<Integer, Result>>(
+      List<Pair<Integer, Object>> lst = new ArrayList<Pair<Integer, Object>>(
           listSize);
       for (int j = 0; j < listSize; j++) {
         Integer idx = in.readInt();
         if (idx == -1) {
           lst.add(null);
         } else {
-          Result r = (Result) HbaseObjectWritable.readObject(in, null);
-          lst.add(new Pair<Integer, Result>(idx, r));
+          boolean isException = in.readBoolean();
+          Object o = null;
+          if (isException) {
+            String klass = WritableUtils.readString(in);
+            String desc = WritableUtils.readString(in);
+            try {
+              // the type-unsafe insertion, but since we control what klass is..
+              Class<? extends Throwable> c = (Class<? extends Throwable>) Class.forName(klass);
+              Constructor<? extends Throwable> cn = c.getDeclaredConstructor(String.class);
+              o = cn.newInstance(desc);
+            } catch (ClassNotFoundException ignored) {
+            } catch (NoSuchMethodException ignored) {
+            } catch (InvocationTargetException ignored) {
+            } catch (InstantiationException ignored) {
+            } catch (IllegalAccessException ignored) {
+            }
+          } else {
+            o = HbaseObjectWritable.readObject(in, null);
+          }
+          lst.add(new Pair<Integer, Object>(idx, o));
         }
       }
       results.put(key, lst);

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java?rev=1033321&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java Wed Nov 10 01:54:13 2010
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HServerAddress;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This subclass of {@link org.apache.hadoop.hbase.client.RetriesExhaustedException}
+ * is thrown when we have more information about which rows were causing which
+ * exceptions on what servers.  You can call {@link #mayHaveClusterIssues()}
+ * and if the result is false, you have input error problems, otherwise you
+ * may have cluster issues.  You can iterate over the causes, rows and last
+ * known server addresses via {@link #getNumExceptions()} and
+ * {@link #getCause(int)}, {@link #getRow(int)} and {@link #getAddress(int)}.
+ */
+public class RetriesExhaustedWithDetailsException extends RetriesExhaustedException {
+
+  List<Throwable> exceptions;
+  List<Row> actions;
+  List<HServerAddress> addresses;
+
+  public RetriesExhaustedWithDetailsException(List<Throwable> exceptions,
+                                              List<Row> actions,
+                                              List<HServerAddress> addresses) {
+    super("Failed " + exceptions.size() + " action" +
+        pluralize(exceptions) + ": " +
+        getDesc(exceptions,actions,addresses));
+
+    this.exceptions = exceptions;
+    this.actions = actions;
+    this.addresses = addresses;
+  }
+
+  public List<Throwable> getCauses() {
+    return exceptions;
+  }
+
+  public int getNumExceptions() {
+    return exceptions.size();
+  }
+
+  public Throwable getCause(int i) {
+    return exceptions.get(i);
+  }
+
+  public Row getRow(int i) {
+    return actions.get(i);
+  }
+
+  public HServerAddress getAddress(int i) {
+    return addresses.get(i);
+  }
+
+  public boolean mayHaveClusterIssues() {
+    boolean res = false;
+
+    // If all of the exceptions are DNRIOE not exception
+    for (Throwable t : exceptions) {
+      if ( !(t instanceof DoNotRetryIOException)) {
+        res = true;
+      }
+    }
+    return res;
+  }
+
+
+  public static String pluralize(Collection<?> c) {
+    return pluralize(c.size());
+  }
+
+  public static String pluralize(int c) {
+    return c > 1 ? "s" : "";
+  }
+
+  public static String getDesc(List<Throwable> exceptions,
+                               List<Row> actions,
+                               List<HServerAddress> addresses) {
+    String s = getDesc(classifyExs(exceptions));
+    s += "servers with issues: ";
+    Set<HServerAddress> uniqAddr = new HashSet<HServerAddress>();
+    uniqAddr.addAll(addresses);
+    for(HServerAddress addr : uniqAddr) {
+      s += addr + ", ";
+    }
+    return s;
+  }
+
+  public static Map<String, Integer> classifyExs(List<Throwable> ths) {
+    Map<String, Integer> cls = new HashMap<String, Integer>();
+    for (Throwable t : ths) {
+      String name = t.getClass().getSimpleName();
+      Integer i = cls.get(name);
+      if (i == null) {
+        i = 0;
+      }
+      i += 1;
+      cls.put(name, i);
+    }
+    return cls;
+  }
+
+  public static String getDesc(Map<String,Integer> classificaton) {
+    String s = "";
+    for (Map.Entry<String, Integer> e : classificaton.entrySet()) {
+      s += e.getKey() + ": " + e.getValue() + " time" +
+          pluralize(e.getValue()) + ", ";
+    }
+    return s;
+  }
+
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1033321&r1=1033320&r2=1033321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed
Nov 10 01:54:13 2010
@@ -46,12 +46,14 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
@@ -1884,7 +1886,7 @@ public class HRegionServer implements HR
     String lockName = String.valueOf(lockId);
     Integer rl = rowlocks.get(lockName);
     if (rl == null) {
-      throw new IOException("Invalid row lock");
+      throw new UnknownRowLockException("Invalid row lock");
     }
     this.leases.renewLease(lockName);
     return rl;
@@ -2374,7 +2376,9 @@ public class HRegionServer implements HR
   @SuppressWarnings("unchecked")
   @Override
   public MultiResponse multi(MultiAction multi) throws IOException {
+
     MultiResponse response = new MultiResponse();
+
     for (Map.Entry<byte[], List<Action>> e : multi.actions.entrySet()) {
       byte[] regionName = e.getKey();
       List<Action> actionsForRegion = e.getValue();
@@ -2382,71 +2386,81 @@ public class HRegionServer implements HR
       // end of a region, so that we don't have to try the rest of the
       // actions in the list.
       Collections.sort(actionsForRegion);
-      Row action = null;
+      Row action;
       List<Action> puts = new ArrayList<Action>();
-      try {
-        for (Action a : actionsForRegion) {
-          action = a.getAction();
-          // TODO catch exceptions so we can report them on a per-item basis.
+      for (Action a : actionsForRegion) {
+        action = a.getAction();
+        int originalIndex = a.getOriginalIndex();
+
+        try {
           if (action instanceof Delete) {
             delete(regionName, (Delete) action);
-            response.add(regionName, new Pair<Integer, Result>(
-                a.getOriginalIndex(), new Result()));
+            response.add(regionName, originalIndex, new Result());
           } else if (action instanceof Get) {
-            response.add(regionName, new Pair<Integer, Result>(
-                a.getOriginalIndex(), get(regionName, (Get) action)));
+            response.add(regionName, originalIndex, get(regionName, (Get) action));
           } else if (action instanceof Put) {
-            puts.add(a);
+            puts.add(a);  // won't throw.
           } else {
             LOG.debug("Error: invalid Action, row must be a Get, Delete or Put.");
-            throw new IllegalArgumentException("Invalid Action, row must be a Get, Delete
or Put.");
+            throw new DoNotRetryIOException("Invalid Action, row must be a Get, Delete or
Put.");
           }
+        } catch (IOException ex) {
+          response.add(regionName, originalIndex, ex);
         }
+      }
 
-        // We do the puts with result.put so we can get the batching efficiency
-        // we so need. All this data munging doesn't seem great, but at least
-        // we arent copying bytes or anything.
-        if (!puts.isEmpty()) {
+      // We do the puts with result.put so we can get the batching efficiency
+      // we so need. All this data munging doesn't seem great, but at least
+      // we aren't copying bytes or anything.
+      if (!puts.isEmpty()) {
+        try {
           HRegion region = getRegion(regionName);
+
           if (!region.getRegionInfo().isMetaTable()) {
             this.cacheFlusher.reclaimMemStoreMemory();
           }
 
-          Pair<Put,Integer> [] putsWithLocks = new Pair[puts.size()];
-          int i = 0;
+          List<Pair<Put,Integer>> putsWithLocks =
+              Lists.newArrayListWithCapacity(puts.size());
           for (Action a : puts) {
             Put p = (Put) a.getAction();
 
-            Integer lock = getLockFromId(p.getLockId());
-            putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
+            Integer lock;
+            try {
+              lock = getLockFromId(p.getLockId());
+            } catch (UnknownRowLockException ex) {
+              response.add(regionName, a.getOriginalIndex(), ex);
+              continue;
+            }
+            putsWithLocks.add(new Pair<Put, Integer>(p, lock));
           }
 
           this.requestCount.addAndGet(puts.size());
 
-          OperationStatusCode[] codes = region.put(putsWithLocks);
-          for( i = 0 ; i < codes.length ; i++) {
+          OperationStatusCode[] codes =
+              region.put(putsWithLocks.toArray(new Pair[]{}));
+
+          for( int i = 0 ; i < codes.length ; i++) {
             OperationStatusCode code = codes[i];
 
             Action theAction = puts.get(i);
-            Result result = null;
+            Object result = null;
 
             if (code == OperationStatusCode.SUCCESS) {
               result = new Result();
+            } else if (code == OperationStatusCode.BAD_FAMILY) {
+              result = new NoSuchColumnFamilyException();
             }
-            // TODO turning the alternate exception into a different result
+            // FAILURE && NOT_RUN becomes null, aka: need to run again.
 
-            response.add(regionName,
-                new Pair<Integer, Result>(
-                    theAction.getOriginalIndex(), result));
+            response.add(regionName, theAction.getOriginalIndex(), result);
+          }
+        } catch (IOException ioe) {
+          // fail all the puts with the ioe in question.
+          for (Action a: puts) {
+            response.add(regionName, a.getOriginalIndex(), ioe);
           }
         }
-      } catch (IOException ioe) {
-        if (multi.size() == 1) throw ioe;
-        LOG.debug("Exception processing " +
-          org.apache.commons.lang.StringUtils.abbreviate(action.toString(), 64) +
-          "; " + ioe.getMessage());
-        response.add(regionName,null);
-        // stop processing on this region, continue to the next.
       }
     }
     return response;

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java?rev=1033321&r1=1033320&r2=1033321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerStoppedException.java
Wed Nov 10 01:54:13 2010
@@ -23,6 +23,9 @@ import org.apache.hadoop.hbase.DoNotRetr
 
 /**
 * Thrown by the region server when it is in a shutting-down state.
+ *
+ * Should NEVER be thrown to HBase clients, they will abort the call chain
+ * and not retry even though regions will transition to new servers.
  */
 @SuppressWarnings("serial")
 public class RegionServerStoppedException extends DoNotRetryIOException {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java?rev=1033321&r1=1033320&r2=1033321&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java Wed Nov
10 01:54:13 2010
@@ -604,12 +604,12 @@ public class RemoteHTable implements HTa
   }
 
   @Override
-  public void batch(List<Row> actions, Result[] results) throws IOException {
+  public void batch(List<Row> actions, Object[] results) throws IOException {
     throw new IOException("batch not supported");
   }
 
   @Override
-  public Result[] batch(List<Row> actions) throws IOException {
+  public Object[] batch(List<Row> actions) throws IOException {
     throw new IOException("batch not supported");
   }
 

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java?rev=1033321&r1=1033320&r2=1033321&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java Wed Nov
10 01:54:13 2010
@@ -35,6 +35,8 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 public class TestMultiParallel {
   private static final Log LOG = LogFactory.getLog(TestMultiParallel.class);
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@@ -137,6 +139,35 @@ public class TestMultiParallel {
     }
   }
 
+  @Test
+  public void testBadFam() throws Exception {
+    LOG.info("test=testBadFam");
+    HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
+
+    List<Row> actions = new ArrayList<Row>();
+    Put p = new Put(Bytes.toBytes("row1"));
+    p.add(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
+    actions.add(p);
+    p = new Put(Bytes.toBytes("row2"));
+    p.add(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value"));
+    actions.add(p);
+
+    // row1 and row2 should be in the same region.
+
+    Object [] r = new Object[actions.size()];
+    try {
+      table.batch(actions, r);
+      fail();
+    } catch (RetriesExhaustedWithDetailsException ex) {
+      LOG.debug(ex);
+      // good!
+      assertFalse(ex.mayHaveClusterIssues());
+    }
+    assertEquals(2, r.length);
+    assertTrue(r[0] instanceof Throwable);
+    assertTrue(r[1] instanceof Result);
+  }
+
   /**
    * Only run one Multi test with a forced RegionServer abort. Otherwise, the
    * unit tests will take an unnecessarily long time to run.
@@ -208,7 +239,7 @@ public class TestMultiParallel {
     // put multiple rows using a batch
     List<Row> puts = constructPutRequests();
 
-    Result[] results = table.batch(puts);
+    Object[] results = table.batch(puts);
     validateSizeAndEmpty(results, KEYS.length);
 
     if (true) {
@@ -228,7 +259,7 @@ public class TestMultiParallel {
 
     // Load some data
     List<Row> puts = constructPutRequests();
-    Result[] results = table.batch(puts);
+    Object[] results = table.batch(puts);
     validateSizeAndEmpty(results, KEYS.length);
 
     // Deletes
@@ -256,7 +287,7 @@ public class TestMultiParallel {
 
     // Load some data
     List<Row> puts = constructPutRequests();
-    Result[] results = table.batch(puts);
+    Object[] results = table.batch(puts);
     validateSizeAndEmpty(results, KEYS.length);
 
     // Deletes
@@ -289,7 +320,7 @@ public class TestMultiParallel {
       put.add(BYTES_FAMILY, qual, VALUE);
       puts.add(put);
     }
-    Result[] results = table.batch(puts);
+    Object[] results = table.batch(puts);
 
     // validate
     validateSizeAndEmpty(results, 100);
@@ -303,10 +334,10 @@ public class TestMultiParallel {
       gets.add(get);
     }
 
-    Result[] multiRes = table.batch(gets);
+    Object[] multiRes = table.batch(gets);
 
     int idx = 0;
-    for (Result r : multiRes) {
+    for (Object r : multiRes) {
       byte[] qual = Bytes.toBytes("column" + idx);
       validateResult(r, qual, VALUE);
       idx++;
@@ -319,7 +350,7 @@ public class TestMultiParallel {
     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
 
     // Load some data to start
-    Result[] results = table.batch(constructPutRequests());
+    Object[] results = table.batch(constructPutRequests());
     validateSizeAndEmpty(results, KEYS.length);
 
     // Batch: get, get, put(new col), delete, get, get of put, get of deleted,
@@ -383,11 +414,13 @@ public class TestMultiParallel {
 
   // // Helper methods ////
 
-  private void validateResult(Result r) {
+  private void validateResult(Object r) {
     validateResult(r, QUALIFIER, VALUE);
   }
 
-  private void validateResult(Result r, byte[] qual, byte[] val) {
+  private void validateResult(Object r1, byte[] qual, byte[] val) {
+    // TODO provide nice assert here or something.
+    Result r = (Result)r1;
     Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual));
     Assert.assertEquals(0, Bytes.compareTo(val, r.getValue(BYTES_FAMILY, qual)));
   }
@@ -415,16 +448,17 @@ public class TestMultiParallel {
     }
   }
 
-  private void validateEmpty(Result result) {
+  private void validateEmpty(Object r1) {
+    Result result = (Result)r1;
     Assert.assertTrue(result != null);
     Assert.assertTrue(result.getRow() == null);
     Assert.assertEquals(0, result.raw().length);
   }
 
-  private void validateSizeAndEmpty(Result[] results, int expectedSize) {
+  private void validateSizeAndEmpty(Object[] results, int expectedSize) {
     // Validate got back the same number of Result objects, all empty
     Assert.assertEquals(expectedSize, results.length);
-    for (Result result : results) {
+    for (Object result : results) {
       validateEmpty(result);
     }
   }



Mime
View raw message