hbase-commits mailing list archives

From st...@apache.org
Subject svn commit: r991397 [3/15] - in /hbase/trunk: ./ bin/ conf/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/avro/ src/main/java/org/apache/hadoop/hbase/catalog/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/or...
Date Tue, 31 Aug 2010 23:51:50 GMT
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Tue Aug 31 23:51:44 2010
@@ -19,9 +19,28 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -30,10 +49,12 @@ import org.apache.hadoop.hbase.HRegionLo
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MasterAddressTracker;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
@@ -41,35 +62,13 @@ import org.apache.hadoop.hbase.ipc.HMast
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.MetaUtils;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.SoftValueSortedMap;
 import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.hbase.zookeeper.ZKTableDisable;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.Map.Entry;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.CopyOnWriteArraySet;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * A non-instantiable class that manages connections to multiple tables in
@@ -111,16 +110,15 @@ public class HConnectionManager {
       }
   };
 
-  private static final Map<String, ClientZKWatcher> ZK_WRAPPERS =
-    new HashMap<String, ClientZKWatcher>();
-
   /**
    * Get the connection object for the instance specified by the configuration
    * If no current connection exists, create a new connection for that instance
    * @param conf configuration
    * @return HConnection object for the instance specified by the configuration
+   * @throws ZooKeeperConnectionException
    */
-  public static HConnection getConnection(Configuration conf) {
+  public static HConnection getConnection(Configuration conf)
+  throws ZooKeeperConnectionException {
     TableServers connection;
     Integer key = HBaseConfiguration.hashCode(conf);
     synchronized (HBASE_INSTANCES) {
@@ -152,6 +150,7 @@ public class HConnectionManager {
   /**
    * Delete information for all connections.
    * @param stopProxy stop the proxy as well
+   * @throws IOException
    */
   public static void deleteAllConnections(boolean stopProxy) {
     synchronized (HBASE_INSTANCES) {
@@ -161,100 +160,16 @@ public class HConnectionManager {
         }
       }
     }
-    synchronized (ZK_WRAPPERS) {
-      for (ClientZKWatcher watch : ZK_WRAPPERS.values()) {
-        watch.resetZooKeeper();
-      }
-    }
-  }
-
-  /**
-   * Get a watcher of a zookeeper connection for a given quorum address.
-   * If the connection isn't established, a new one is created.
-   * This acts like a multiton.
-   * @param conf configuration
-   * @return ZKW watcher
-   * @throws IOException if a remote or network exception occurs
-   */
-  public static synchronized ClientZKWatcher getClientZooKeeperWatcher(
-      Configuration conf) throws IOException {
-    if (!ZK_WRAPPERS.containsKey(
-        ZooKeeperWrapper.getZookeeperClusterKey(conf))) {
-      ZK_WRAPPERS.put(ZooKeeperWrapper.getZookeeperClusterKey(conf),
-          new ClientZKWatcher(conf));
-    }
-    return ZK_WRAPPERS.get(ZooKeeperWrapper.getZookeeperClusterKey(conf));
-  }
-
-  /**
-   * This class is responsible to handle connection and reconnection
-   * to a zookeeper quorum.
-   *
-   */
-  public static class ClientZKWatcher implements Watcher {
-
-    static final Log LOG = LogFactory.getLog(ClientZKWatcher.class);
-    private ZooKeeperWrapper zooKeeperWrapper;
-    private Configuration conf;
-
-    /**
-     * Takes a configuration to pass it to ZKW but won't instanciate it
-     * @param conf configuration
-     */
-    public ClientZKWatcher(Configuration conf) {
-      this.conf = conf;
-    }
-
-    /**
-     * Called by ZooKeeper when an event occurs on our connection. We use this to
-     * detect our session expiring. When our session expires, we have lost our
-     * connection to ZooKeeper. Our handle is dead, and we need to recreate it.
-     *
-     * See http://hadoop.apache.org/zookeeper/docs/current/zookeeperProgrammers.html#ch_zkSessions
-     * for more information.
-     *
-     * @param event WatchedEvent witnessed by ZooKeeper.
-     */
-    public void process(final WatchedEvent event) {
-      final KeeperState state = event.getState();
-      if (!state.equals(KeeperState.SyncConnected)) {
-        LOG.warn("No longer connected to ZooKeeper, current state: " + state);
-        resetZooKeeper();
-      }
-    }
-
-    /**
-     * Get this watcher's ZKW, instantiate it if necessary.
-     * @return ZKW
-     * @throws java.io.IOException if a remote or network exception occurs
-     */
-    public synchronized ZooKeeperWrapper getZooKeeperWrapper() throws IOException {
-      if (zooKeeperWrapper == null) {
-        zooKeeperWrapper =
-            ZooKeeperWrapper.createInstance(conf, HConnectionManager.class.getName());
-        zooKeeperWrapper.registerListener(this);
-      }
-      return zooKeeperWrapper;
-    }
-
-    /**
-     * Clear this connection to zookeeper.
-     */
-    private synchronized void resetZooKeeper() {
-      if (zooKeeperWrapper != null) {
-        zooKeeperWrapper.close();
-        zooKeeperWrapper = null;
-      }
-    }
   }
 
   /**
    * It is provided for unit test cases which verify the behavior of region
    * location cache prefetch.
    * @return Number of cached regions for the table.
+   * @throws ZooKeeperConnectionException
    */
   static int getCachedRegionCount(Configuration conf,
-      byte[] tableName) {
+      byte[] tableName) throws ZooKeeperConnectionException {
     TableServers connection = (TableServers)getConnection(conf);
     return connection.getNumberOfCachedRegionLocations(tableName);
   }
@@ -263,15 +178,16 @@ public class HConnectionManager {
    * It's provided for unit test cases which verify the behavior of region
    * location cache prefetch.
    * @return true if the region where the table and row reside is cached.
+   * @throws ZooKeeperConnectionException
    */
   static boolean isRegionCached(Configuration conf,
-      byte[] tableName, byte[] row) {
+      byte[] tableName, byte[] row) throws ZooKeeperConnectionException {
     TableServers connection = (TableServers)getConnection(conf);
     return connection.isRegionCached(tableName, row);
   }
 
   /* Encapsulates finding the servers for an HBase instance */
-  static class TableServers implements ServerConnection {
+  static class TableServers implements ServerConnection, Abortable {
     static final Log LOG = LogFactory.getLog(TableServers.class);
     private final Class<? extends HRegionInterface> serverInterfaceClass;
     private final long pause;
@@ -284,6 +200,10 @@ public class HConnectionManager {
     private volatile boolean closed;
     private volatile HMasterInterface master;
     private volatile boolean masterChecked;
+    // ZooKeeper reference
+    private ZooKeeperWatcher zooKeeper;
+    // ZooKeeper-based master address tracker
+    private MasterAddressTracker masterAddressTracker;
 
     private final Object rootRegionLock = new Object();
     private final Object metaRegionLock = new Object();
@@ -312,7 +232,8 @@ public class HConnectionManager {
      * @param conf Configuration object
      */
     @SuppressWarnings("unchecked")
-    public TableServers(Configuration conf) {
+    public TableServers(Configuration conf)
+    throws ZooKeeperConnectionException {
       this.conf = conf;
 
       String serverClassName =
@@ -340,14 +261,21 @@ public class HConnectionManager {
       this.prefetchRegionLimit = conf.getInt("hbase.client.prefetch.limit",
           10);
 
+      // initialize zookeeper and master address manager
+      getZooKeeperWatcher();
+      masterAddressTracker = new MasterAddressTracker(zooKeeper, this);
+      zooKeeper.registerListener(masterAddressTracker);
+      masterAddressTracker.start();
+
       this.master = null;
       this.masterChecked = false;
     }
 
     private long getPauseTime(int tries) {
       int ntries = tries;
-      if (ntries >= HConstants.RETRY_BACKOFF.length)
+      if (ntries >= HConstants.RETRY_BACKOFF.length) {
         ntries = HConstants.RETRY_BACKOFF.length - 1;
+      }
       return this.pause * HConstants.RETRY_BACKOFF[ntries];
     }
 
@@ -365,12 +293,14 @@ public class HConnectionManager {
       this.rootRegionLocation = rootRegion;
     }
 
-    public HMasterInterface getMaster() throws MasterNotRunningException {
-      ZooKeeperWrapper zk;
-      try {
-        zk = getZooKeeperWrapper();
-      } catch (IOException e) {
-        throw new MasterNotRunningException(e);
+    public HMasterInterface getMaster()
+    throws MasterNotRunningException, ZooKeeperConnectionException {
+
+      // Check if we already have a good master connection
+      if (master != null) {
+        if(master.isMasterRunning()) {
+          return master;
+        }
       }
 
       HServerAddress masterLocation = null;
@@ -382,7 +312,11 @@ public class HConnectionManager {
         tries++) {
 
           try {
-            masterLocation = zk.readMasterAddressOrThrow();
+            masterLocation = masterAddressTracker.getMasterAddress();
+            if(masterLocation == null) {
+              LOG.info("ZooKeeper available but no active master location found");
+              throw new MasterNotRunningException();
+            }
 
             HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy(
                 HMasterInterface.class, HBaseRPCProtocolVersion.versionID,
@@ -424,20 +358,20 @@ public class HConnectionManager {
       return this.master;
     }
 
-    public boolean isMasterRunning() {
+    public boolean isMasterRunning()
+    throws MasterNotRunningException, ZooKeeperConnectionException {
       if (this.master == null) {
-        try {
-          getMaster();
-
-        } catch (MasterNotRunningException e) {
-          return false;
-        }
+        getMaster();
+      }
+      boolean isRunning = master.isMasterRunning();
+      if(isRunning) {
+        return true;
       }
-      return true;
+      throw new MasterNotRunningException();
     }
 
     public boolean tableExists(final byte [] tableName)
-    throws MasterNotRunningException {
+    throws MasterNotRunningException, ZooKeeperConnectionException {
       getMaster();
       if (tableName == null) {
         throw new IllegalArgumentException("Table name cannot be null");
@@ -537,15 +471,11 @@ public class HConnectionManager {
     }
 
     /*
-     * If online == true
-     *   Returns true if all regions are online
-     *   Returns false in any other case
-     * If online == false
-     *   Returns true if all regions are offline
-     *   Returns false in any other case
+     * @param online If true, check that the table is online; if false, check that it is offline.
      */
     private boolean testTableOnlineState(byte[] tableName, boolean online)
     throws IOException {
+      // TODO: Replace w/ CatalogTracker-based tableExists test.
       if (!tableExists(tableName)) {
         throw new TableNotFoundException(Bytes.toString(tableName));
       }
@@ -553,53 +483,14 @@ public class HConnectionManager {
         // The root region is always enabled
         return true;
       }
-      int rowsScanned = 0;
-      int rowsOffline = 0;
-      byte[] startKey =
-        HRegionInfo.createRegionName(tableName, null, HConstants.ZEROES, false);
-      byte[] endKey;
-      HRegionInfo currentRegion;
-      Scan scan = new Scan(startKey);
-      scan.addColumn(HConstants.CATALOG_FAMILY,
-          HConstants.REGIONINFO_QUALIFIER);
-      int rows = this.conf.getInt("hbase.meta.scanner.caching", 100);
-      scan.setCaching(rows);
-      ScannerCallable s = new ScannerCallable(this,
-          (Bytes.equals(tableName, HConstants.META_TABLE_NAME) ?
-              HConstants.ROOT_TABLE_NAME : HConstants.META_TABLE_NAME), scan);
       try {
-        // Open scanner
-        getRegionServerWithRetries(s);
-        do {
-          currentRegion = s.getHRegionInfo();
-          Result r;
-          Result [] rrs;
-          while ((rrs = getRegionServerWithRetries(s)) != null && rrs.length > 0) {
-            r = rrs[0];
-            byte [] value = r.getValue(HConstants.CATALOG_FAMILY,
-              HConstants.REGIONINFO_QUALIFIER);
-            if (value != null) {
-              HRegionInfo info = Writables.getHRegionInfoOrNull(value);
-              if (info != null) {
-                if (Bytes.equals(info.getTableDesc().getName(), tableName)) {
-                  rowsScanned += 1;
-                  rowsOffline += info.isOffline() ? 1 : 0;
-                }
-              }
-            }
-          }
-          endKey = currentRegion.getEndKey();
-        } while (!(endKey == null ||
-            Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY)));
-      } finally {
-        s.setClose();
-        // Doing below will call 'next' again and this will close the scanner
-        // Without it we leave scanners open.
-        getRegionServerWithRetries(s);
-      }
-      LOG.debug("Rowscanned=" + rowsScanned + ", rowsOffline=" + rowsOffline);
-      boolean onOffLine = online? rowsOffline == 0: rowsOffline == rowsScanned;
-      return rowsScanned > 0 && onOffLine;
+        List<String> tables = ZKTableDisable.getDisabledTables(this.zooKeeper);
+        String searchStr = Bytes.toString(tableName);
+        boolean disabled = tables.contains(searchStr);
+        return online? !disabled: disabled;
+      } catch (KeeperException e) {
+        throw new IOException("Failed listing disabled tables", e);
+      }
     }
 
     private static class HTableDescriptorFinder
@@ -642,6 +533,20 @@ public class HConnectionManager {
       return result;
     }
 
+    @Override
+    public HRegionLocation locateRegion(final byte [] regionName)
+    throws IOException {
+      // TODO implement.  use old stuff or new stuff?
+      return null;
+    }
+
+    @Override
+    public List<HRegionLocation> locateRegions(final byte [] tableName)
+    throws IOException {
+      // TODO implement.  use old stuff or new stuff?
+      return null;
+    }
+
     public HRegionLocation locateRegion(final byte [] tableName,
         final byte [] row)
     throws IOException{
@@ -950,8 +855,7 @@ public class HConnectionManager {
      * Delete a cached location, if it satisfies the table name and row
      * requirements.
      */
-    void deleteCachedLocation(final byte [] tableName,
-                                      final byte [] row) {
+    void deleteCachedLocation(final byte [] tableName, final byte [] row) {
       synchronized (this.cachedRegionLocations) {
         SoftValueSortedMap<byte [], HRegionLocation> tableLocations =
             getTableLocations(tableName);
@@ -998,7 +902,7 @@ public class HConnectionManager {
      * Allows flushing the region cache.
      */
     public void clearRegionCache() {
-      cachedRegionLocations.clear();
+     cachedRegionLocations.clear();
     }
 
     /*
@@ -1033,6 +937,7 @@ public class HConnectionManager {
                 regionServer.getInetSocketAddress(), this.conf,
                 this.maxRPCAttempts, this.rpcTimeout);
           } catch (RemoteException e) {
+            LOG.warn("Remove exception connecting to RS", e);
             throw RemoteExceptionHandler.decodeRemoteException(e);
           }
           this.servers.put(regionServer.toString(), server);
@@ -1047,13 +952,27 @@ public class HConnectionManager {
       return getHRegionConnection(regionServer, false);
     }
 
-    public synchronized ZooKeeperWrapper getZooKeeperWrapper()
-        throws IOException {
-      return HConnectionManager.getClientZooKeeperWatcher(conf)
-          .getZooKeeperWrapper();
+    /**
+     * Get the ZooKeeper instance for this TableServers instance.
+     *
+     * If ZK has not been initialized yet, this will connect to ZK.
+     * @return zookeeper reference
+     * @throws ZooKeeperConnectionException if there's a problem connecting to zk
+     */
+    public synchronized ZooKeeperWatcher getZooKeeperWatcher()
+        throws ZooKeeperConnectionException {
+      if(zooKeeper == null) {
+        try {
+          zooKeeper = new ZooKeeperWatcher(conf,
+              ZKUtil.getZooKeeperClusterKey(conf), this);
+        } catch (IOException e) {
+          throw new ZooKeeperConnectionException(e);
+        }
+      }
+      return zooKeeper;
     }
 
-    /*
+    /**
      * Repeatedly try to find the root region in ZK
      * @return HRegionLocation for root region if found
      * @throws NoServerForRegionException - if the root region can not be
@@ -1065,7 +984,12 @@ public class HConnectionManager {
 
       // We lazily instantiate the ZooKeeper object because we don't want to
       // make the constructor have to throw IOException or handle it itself.
-      ZooKeeperWrapper zk = getZooKeeperWrapper();
+      ZooKeeperWatcher zk;
+      try {
+        zk = getZooKeeperWatcher();
+      } catch (IOException e) {
+        throw new ZooKeeperConnectionException(e);
+      }
 
       HServerAddress rootRegionAddress = null;
       for (int tries = 0; tries < numRetries; tries++) {
@@ -1074,7 +998,13 @@ public class HConnectionManager {
         while (rootRegionAddress == null && localTimeouts < numRetries) {
           // Don't read root region until we're out of safe mode so we know
           // that the meta regions have been assigned.
-          rootRegionAddress = zk.readRootRegionLocation();
+          try {
+            rootRegionAddress = ZKUtil.getDataAsAddress(zk, zk.rootServerZNode);
+          } catch (KeeperException e) {
+            LOG.error("Unexpected ZooKeeper error attempting to read the root " +
+                "region server address");
+            throw new IOException(e);
+          }
           if (rootRegionAddress == null) {
             try {
               if (LOG.isDebugEnabled()) {
@@ -1214,39 +1144,177 @@ public class HConnectionManager {
       return location;
     }
 
-    /**
-     * @deprecated Use HConnectionManager::processBatch instead.
+    /*
+     * Helper class for batch updates.
+     * Holds code shared by batch puts and batch deletes.
      */
-    public int processBatchOfRows(final ArrayList<Put> list, final byte[] tableName, ExecutorService pool)
-    throws IOException {
-      Result[] results = new Result[list.size()];
-      processBatch((List) list, tableName, pool, results);
-      int count = 0;
-      for (Result r : results) {
-        if (r != null) {
-          count++;
+    private abstract class Batch {
+      final HConnection c;
+
+      private Batch(final HConnection c) {
+        this.c = c;
+      }
+
+      /**
+       * This is the method subclasses must implement.
+       * @param currentList current list of rows
+       * @param row row we are processing
+       * @param tableName table we are processing
+       * @return Index of the last item successfully processed, or -1 if all succeeded.
+       * @throws IOException if a remote or network exception occurs
+       * @throws RuntimeException other undefined exception
+       */
+      abstract int doCall(final List<? extends Row> currentList,
+        final byte [] row, final byte [] tableName)
+      throws IOException, RuntimeException;
+
+      /**
+       * Process the passed <code>list</code>.
+       * @param list list of rows to process
+       * @param tableName table we are processing
+       * @return Count of rows processed.
+       * @throws IOException if a remote or network exception occurs
+       */
+      int process(final List<? extends Row> list, final byte[] tableName)
+      throws IOException {
+        byte [] region = getRegionName(tableName, list.get(0).getRow(), false);
+        byte [] currentRegion = region;
+        boolean isLastRow;
+        boolean retryOnlyOne = false;
+        List<Row> currentList = new ArrayList<Row>();
+        int i, tries;
+        for (i = 0, tries = 0; i < list.size() && tries < numRetries; i++) {
+          Row row = list.get(i);
+          currentList.add(row);
+          // If the next record goes to a new region, process and then clear
+          // currentList during this iteration.
+          isLastRow = (i + 1) == list.size();
+          if (!isLastRow) {
+            region = getRegionName(tableName, list.get(i + 1).getRow(), false);
+          }
+          if (!Bytes.equals(currentRegion, region) || isLastRow || retryOnlyOne) {
+            int index = doCall(currentList, row.getRow(), tableName);
+            // index is -1 if all processed successfully, else it is the index
+            // of the last record successfully processed.
+            if (index != -1) {
+              if (tries == numRetries - 1) {
+                throw new RetriesExhaustedException("Some server, retryOnlyOne=" +
+                  retryOnlyOne + ", index=" + index + ", islastrow=" + isLastRow +
+                  ", tries=" + tries + ", numtries=" + numRetries + ", i=" + i +
+                  ", listsize=" + list.size() + ", region=" +
+                  Bytes.toStringBinary(region), currentRegion, row.getRow(),
+                  tries, new ArrayList<Throwable>());
+              }
+              tries = doBatchPause(currentRegion, tries);
+              i = i - currentList.size() + index;
+              retryOnlyOne = true;
+              // Reload location.
+              region = getRegionName(tableName, list.get(i + 1).getRow(), true);
+            } else {
+              // Reset these flags/counters on successful batch Put
+              retryOnlyOne = false;
+              tries = 0;
+            }
+            currentRegion = region;
+            currentList.clear();
+          }
         }
+        return i;
+      }
+
+      /*
+       * @param t table name
+       * @param r row
+       * @param re whether to reload the cached region location
+       * @return Region name that holds passed row <code>r</code>
+       * @throws IOException
+       */
+      private byte [] getRegionName(final byte [] t, final byte [] r,
+        final boolean re)
+      throws IOException {
+        HRegionLocation location = getRegionLocationForRowWithRetries(t, r, re);
+        return location.getRegionInfo().getRegionName();
+      }
+
+      /*
+       * Do pause processing before retrying...
+       * @param currentRegion
+       * @param tries
+       * @return New value for tries.
+       */
+      private int doBatchPause(final byte [] currentRegion, final int tries) {
+        int localTries = tries;
+        long sleepTime = getPauseTime(tries);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Reloading region " + Bytes.toStringBinary(currentRegion) +
+            " location because regionserver didn't accept updates; tries=" +
+            tries + " of max=" + numRetries + ", waiting=" + sleepTime + "ms");
+        }
+        try {
+          Thread.sleep(sleepTime);
+          localTries++;
+        } catch (InterruptedException e) {
+          // continue
+        }
+        return localTries;
       }
-      return (count == list.size() ? -1 : count);
     }
 
-    /**
-     * @deprecated Use HConnectionManager::processBatch instead.
-     */
-    public int processBatchOfDeletes(final List<Delete> list,
-      final byte[] tableName, ExecutorService pool)
+    public int processBatchOfRows(final ArrayList<Put> list,
+      final byte[] tableName)
     throws IOException {
-      Result[] results = new Result[list.size()];
-      processBatch((List) list, tableName, pool, results);
-      int count = 0;
-      for (Result r : results) {
-        if (r != null) {
-          count++;
-        }
+      if (list.isEmpty()) {
+        return 0;
+      }
+      if (list.size() > 1) {
+        Collections.sort(list);
       }
-      return (count == list.size() ? -1 : count);
+      Batch b = new Batch(this) {
+        @SuppressWarnings("unchecked")
+        @Override
+        int doCall(final List<? extends Row> currentList, final byte [] row,
+          final byte [] tableName)
+        throws IOException, RuntimeException {
+          final List<Put> puts = (List<Put>)currentList;
+          return getRegionServerWithRetries(new ServerCallable<Integer>(this.c,
+              tableName, row) {
+            public Integer call() throws IOException {
+              return server.put(location.getRegionInfo().getRegionName(), puts);
+            }
+          });
+        }
+      };
+      return b.process(list, tableName);
     }
 
+    public int processBatchOfDeletes(final List<Delete> list,
+      final byte[] tableName)
+    throws IOException {
+      if (list.isEmpty()) {
+        return 0;
+      }
+      if (list.size() > 1) {
+        Collections.sort(list);
+      }
+      Batch b = new Batch(this) {
+        @SuppressWarnings("unchecked")
+        @Override
+        int doCall(final List<? extends Row> currentList, final byte [] row,
+          final byte [] tableName)
+        throws IOException, RuntimeException {
+          final List<Delete> deletes = (List<Delete>)currentList;
+          return getRegionServerWithRetries(new ServerCallable<Integer>(this.c,
+                tableName, row) {
+              public Integer call() throws IOException {
+                return server.delete(location.getRegionInfo().getRegionName(),
+                  deletes);
+              }
+            });
+          }
+        };
+        return b.process(list, tableName);
+      }
+
     void close(boolean stopProxy) {
       if (master != null) {
         if (stopProxy) {
@@ -1262,133 +1330,108 @@ public class HConnectionManager {
       }
     }
 
-    private Callable<MultiResponse> createCallable(
-        final HServerAddress address,
-        final MultiAction multi,
-        final byte [] tableName) {
-  	  final HConnection connection = this;
-  	  return new Callable<MultiResponse>() {
-  	    public MultiResponse call() throws IOException {
-  	      return getRegionServerWithoutRetries(
-  	          new ServerCallable<MultiResponse>(connection, tableName, null) {
-  	            public MultiResponse call() throws IOException {
-  	              return server.multi(multi);
-  	            }
-  	            @Override
-  	            public void instantiateServer(boolean reload) throws IOException {
-  	              server = connection.getHRegionConnection(address);
-  	            }
-  	          }
-  	      );
-  	    }
-  	  };
-  	}
-
-    public void processBatch(List<Row> list,
-        final byte[] tableName,
-        ExecutorService pool,
-        Result[] results) throws IOException {
-
-      // results must be the same size as list
-      if (results.length != list.size()) {
-        throw new IllegalArgumentException("argument results must be the same size as argument list");
-      }
-
-      if (list.size() == 0) {
-        return;
-      }
-
-      List<Row> workingList = new ArrayList<Row>(list);
-      final boolean singletonList = (list.size() == 1);
-      boolean retry = true;
+    /**
+     * Process a batch of Puts on the given executor service.
+     * <p>
+     * In the case of an exception, we take different actions depending on the
+     * situation:
+     *  - If the exception is a DoNotRetryException, we rethrow it and leave the
+     *    'list' parameter in an indeterminate state.
+     *  - If the 'list' parameter is a singleton, we directly throw the specific
+     *    exception for that put.
+     *  - Otherwise, we throw a generic exception indicating that an error occurred.
+     *    The 'list' parameter is mutated to contain those puts that did not succeed.
+     *
+     * @param list the puts to make - successful puts will be removed.
+     * @param pool thread pool to execute requests on
+     */
+    public void processBatchOfPuts(List<Put> list,
+        final byte[] tableName, ExecutorService pool) throws IOException {
+      boolean singletonList = list.size() == 1;
       Throwable singleRowCause = null;
-
-      for (int tries = 0; tries < numRetries && retry; ++tries) {
-
-        // sleep first, if this is a retry
-        if (tries >= 1) {
-          long sleepTime = getPauseTime(tries);
-          LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
-          try {
-            Thread.sleep(sleepTime);
-          } catch (InterruptedException ignore) {
-            LOG.debug("Interupted");
-            Thread.currentThread().interrupt();
-            break;
-          }
-        }
-
-        // step 1: break up into regionserver-sized chunks and build the data structs
-
-        Map<HServerAddress, MultiAction> actionsByServer = new HashMap<HServerAddress, MultiAction>();
-        for (int i=0; i<workingList.size(); i++) {
-          Row row = workingList.get(i);
-          if (row != null) {
-            HRegionLocation loc = locateRegion(tableName, row.getRow(), true);
-            HServerAddress address = loc.getServerAddress();
-            byte[] regionName = loc.getRegionInfo().getRegionName();
-
-            MultiAction actions = actionsByServer.get(address);
-            if (actions == null) {
-              actions = new MultiAction();
-              actionsByServer.put(address, actions);
-            }
-
-            Action action = new Action(regionName, row, i);
-            actions.add(regionName, action);
-          }
-        }
-
-        // step 2: make the requests
-
-        Map<HServerAddress,Future<MultiResponse>> futures =
-            new HashMap<HServerAddress, Future<MultiResponse>>(actionsByServer.size());
-
-        for (Entry<HServerAddress, MultiAction> e : actionsByServer.entrySet()) {
-          futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
-        }
-
-        // step 3: collect the failures and successes and prepare for retry
-
-        for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures.entrySet()) {
-          HServerAddress address = responsePerServer.getKey();
-
+      for ( int tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) {
+        Collections.sort(list);
+        Map<HServerAddress, MultiPut> regionPuts =
+            new HashMap<HServerAddress, MultiPut>();
+        // step 1:
+        //  break up into regionserver-sized chunks and build the data structs
+        for ( Put put : list ) {
+          byte [] row = put.getRow();
+
+          HRegionLocation loc = locateRegion(tableName, row, true);
+          HServerAddress address = loc.getServerAddress();
+          byte [] regionName = loc.getRegionInfo().getRegionName();
+
+          MultiPut mput = regionPuts.get(address);
+          if (mput == null) {
+            mput = new MultiPut(address);
+            regionPuts.put(address, mput);
+          }
+          mput.add(regionName, put);
+        }
+
+        // step 2:
+        //  make the requests
+        // Discard the map, just use a list now, makes error recovery easier.
+        List<MultiPut> multiPuts = new ArrayList<MultiPut>(regionPuts.values());
+
+        List<Future<MultiPutResponse>> futures =
+            new ArrayList<Future<MultiPutResponse>>(regionPuts.size());
+        for ( MultiPut put : multiPuts ) {
+          futures.add(pool.submit(createPutCallable(put.address,
+              put,
+              tableName)));
+        }
+        // RUN!
+        List<Put> failed = new ArrayList<Put>();
+
+        // step 3:
+        //  collect the failures and tries from step 1.
+        for (int i = 0; i < futures.size(); i++ ) {
+          Future<MultiPutResponse> future = futures.get(i);
+          MultiPut request = multiPuts.get(i);
           try {
-            // Gather the results for one server
-            Future<MultiResponse> future = responsePerServer.getValue();
-
-            // Not really sure what a reasonable timeout value is. Here's a first try.
-
-            MultiResponse resp = future.get();
+            MultiPutResponse resp = future.get();
 
-            if (resp == null) {
-              // Entire server failed
-              LOG.debug("Failed all for server: " + address + ", removing from cache");
-            } else {
-              // For each region
-              for (Entry<byte[], List<Pair<Integer,Result>>> e : resp.getResults().entrySet()) {
-                byte[] regionName = e.getKey();
-                List<Pair<Integer, Result>> regionResults = e.getValue();
-                for (int i = 0; i < regionResults.size(); i++) {
-                  Pair<Integer, Result> regionResult = regionResults.get(i);
-                  if (regionResult.getSecond() == null) {
-                    // failed
-                    LOG.debug("Failures for region: " + Bytes.toStringBinary(regionName) + ", removing from cache");
-                  } else {
-                    // success
-                    results[regionResult.getFirst()] = regionResult.getSecond();
-                  }
-                }
+            // For each region
+            for (Map.Entry<byte[], List<Put>> e : request.puts.entrySet()) {
+              Integer result = resp.getAnswer(e.getKey());
+              if (result == null) {
+                // failed
+                LOG.debug("Failed all for region: " +
+                    Bytes.toStringBinary(e.getKey()) + ", removing from cache");
+                failed.addAll(e.getValue());
+              } else if (result >= 0) {
+                // some failures
+                List<Put> lst = e.getValue();
+                failed.addAll(lst.subList(result, lst.size()));
+                LOG.debug("Failed past " + result + " for region: " +
+                    Bytes.toStringBinary(e.getKey()) + ", removing from cache");
               }
             }
           } catch (InterruptedException e) {
-            LOG.debug("Failed all from " + address, e);
-            Thread.currentThread().interrupt();
-            break;
+            // go into the failed list.
+            LOG.debug("Failed all from " + request.address, e);
+            failed.addAll(request.allPuts());
           } catch (ExecutionException e) {
-            LOG.debug("Failed all from " + address, e);
+            Throwable cause = e.getCause();
+            // Don't print stack trace if NSRE; NSRE is 'normal' operation.
+            if (cause instanceof NotServingRegionException) {
+              String msg = cause.getMessage();
+              if (msg != null && msg.length() > 0) {
+                // msg is the exception as a String... we just want first line.
+                msg = msg.split("[\\n\\r]+\\s*at")[0];
+              }
+              LOG.debug("Failed execution of all on " + request.address +
+                " because: " + msg);
+            } else {
+              // all go into the failed list.
+              LOG.debug("Failed execution of all on " + request.address,
+                e.getCause());
+            }
+            failed.addAll(request.allPuts());
 
-            // Just give up, leaving the batch incomplete
+            // Just give up, leaving the batch put list in an untouched/semi-committed state
             if (e.getCause() instanceof DoNotRetryIOException) {
               throw (DoNotRetryIOException) e.getCause();
             }
@@ -1399,57 +1442,56 @@ public class HConnectionManager {
             }
           }
         }
+        list.clear();
+        if (!failed.isEmpty()) {
+          for (Put failedPut: failed) {
+            deleteCachedLocation(tableName, failedPut.getRow());
+          }
 
-        // Find failures (i.e. null Result), and add them to the workingList (in
-        // order), so they can be retried.
-        retry = false;
-        workingList.clear();
-        for (int i = 0; i < results.length; i++) {
-          if (results[i] == null) {
-            retry = true;
-            Row row = list.get(i);
-            workingList.add(row);
-            deleteCachedLocation(tableName, row.getRow());
-          } else {
-            // add null to workingList, so the order remains consistent with the original list argument.
-            workingList.add(null);
+          list.addAll(failed);
+
+          long sleepTime = getPauseTime(tries);
+          LOG.debug("processBatchOfPuts had some failures, sleeping for " + sleepTime +
+              " ms!");
+          try {
+            Thread.sleep(sleepTime);
+          } catch (InterruptedException ignored) {
           }
         }
       }
-
-      if (Thread.currentThread().isInterrupted()) {
-        throw new IOException("Aborting attempt because of a thread interruption");
-      }
-
-      if (retry) {
-        // ran out of retries and didn't successfully finish everything!
-        if (singleRowCause != null) {
+      if (!list.isEmpty()) {
+        if (singletonList && singleRowCause != null) {
           throw new IOException(singleRowCause);
-        } else {
-          throw new RetriesExhaustedException("Still had " + workingList.size()
-              + " actions left after retrying " + numRetries + " times.");
         }
+
+        // ran out of retries and didn't succeed with everything!
+        throw new RetriesExhaustedException("Still had " + list.size() + " puts left after retrying " +
+            numRetries + " times.");
       }
     }
 
-    /**
-     * @deprecated Use HConnectionManager::processBatch instead.
-     */
-    public void processBatchOfPuts(List<Put> list,
-        final byte[] tableName,
-        ExecutorService pool) throws IOException {
-      Result[] results = new Result[list.size()];
-      processBatch((List) list, tableName, pool, results);
-
-      // mutate list so that it is empty for complete success, or contains only failed records
-      // results are returned in the same order as the requests in list
-      // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
-      for (int i = results.length - 1; i>=0; i--) {
-        // if result is not null, it succeeded
-        if (results[i] != null) {
-          list.remove(i);
+
+    private Callable<MultiPutResponse> createPutCallable(
+        final HServerAddress address, final MultiPut puts,
+        final byte [] tableName) {
+      final HConnection connection = this;
+      return new Callable<MultiPutResponse>() {
+        public MultiPutResponse call() throws IOException {
+          return getRegionServerWithoutRetries(
+              new ServerCallable<MultiPutResponse>(connection, tableName, null) {
+                public MultiPutResponse call() throws IOException {
+                  MultiPutResponse resp = server.multiPut(puts);
+                  resp.request = puts;
+                  return resp;
+                }
+                @Override
+                public void instantiateServer(boolean reload) throws IOException {
+                  server = connection.getHRegionConnection(address);
+                }
+              }
+          );
         }
-      }
+      };
     }
 
     private Throwable translateException(Throwable t) throws IOException {
@@ -1515,5 +1557,15 @@ public class HConnectionManager {
             new HRegionLocation(e.getKey(), e.getValue()));
       }
     }
+
+    @Override
+    public void abort(final String msg, Throwable t) {
+      if (t != null) LOG.fatal(msg, t);
+      else LOG.fatal(msg);
+      if(zooKeeper != null) {
+        zooKeeper.close();
+        zooKeeper = null;
+      }
+    }
   }
-}
\ No newline at end of file
+}
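
The client-facing effect of the HConnectionManager changes above: getConnection
and isMasterRunning now declare checked exceptions, so existing callers must be
updated. A minimal caller sketch, assuming only the signatures shown in this
diff (the class name and output are illustrative, not part of the commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.MasterNotRunningException;
    import org.apache.hadoop.hbase.ZooKeeperConnectionException;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HConnectionManager;

    public class GetConnectionExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        try {
          // Now fails fast with a checked exception if the ZooKeeper quorum
          // is unreachable, instead of deferring the failure.
          HConnection connection = HConnectionManager.getConnection(conf);
          // Now throws MasterNotRunningException rather than returning false
          // when the MasterAddressTracker finds no active master.
          System.out.println("Master running: " + connection.isMasterRunning());
        } catch (ZooKeeperConnectionException e) {
          System.err.println("Could not connect to ZooKeeper: " + e);
        } catch (MasterNotRunningException e) {
          System.err.println("No active master: " + e);
        }
      }
    }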

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java Tue Aug 31 23:51:44 2010
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -45,7 +46,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -77,7 +77,7 @@ public class HTable implements HTableInt
   private long currentWriteBufferSize;
   protected int scannerCaching;
   private int maxKeyValueSize;
-  private ExecutorService pool;  // For Multi
+
   private long maxScannerResultSize;
 
   /**
@@ -144,11 +144,13 @@ public class HTable implements HTableInt
       HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
       HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
     this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
-    
-    int nrThreads = conf.getInt("hbase.htable.threads.max", getCurrentNrHRS());
-    if (nrThreads == 0) {
-      nrThreads = 1; // is there a better default?
+
+    int nrHRS = getCurrentNrHRS();
+    if (nrHRS == 0) {
+      // No servers running -- set default of 10 threads.
+      nrHRS = 10;
     }
+    int nrThreads = conf.getInt("hbase.htable.threads.max", nrHRS);
 
     // Unfortunately Executors.newCachedThreadPool does not allow us to
     // set the maximum size of the pool, so we have to do it ourselves.
@@ -169,12 +171,13 @@ public class HTable implements HTableInt
    * @throws IOException if a remote or network exception occurs
    */
   int getCurrentNrHRS() throws IOException {
-    return HConnectionManager
-      .getClientZooKeeperWatcher(this.configuration)
-      .getZooKeeperWrapper()
-      .getRSDirectoryCount();
+    HBaseAdmin admin = new HBaseAdmin(this.configuration);
+    return admin.getClusterStatus().getServers();
   }
 
+  // For multiput
+  private ExecutorService pool;
+
   /**
   * Tells whether a table is enabled or not.
    * @param tableName Name of table to check.
@@ -505,40 +508,6 @@ public class HTable implements HTableInt
     );
   }
 
-  /**
-   * Method that does a batch call on Deletes, Gets and Puts.
-   *
-   * @param actions list of Get, Put, Delete objects
-   * @param results Empty Result[], same size as actions. Provides access to partial
-   * results, in case an exception is thrown. A null in the result array means that
-   * the call for that action failed, even after retries
-   * @throws IOException
-   */
-  public synchronized void batch(final List<Row> actions, final Result[] results) throws IOException {
-    connection.processBatch(actions, tableName, pool, results);
-  }
-
-  /**
-   * Method that does a batch call on Deletes, Gets and Puts.
-   * 
-   * @param actions list of Get, Put, Delete objects
-   * @return the results from the actions. A null in the return array means that
-   * the call for that action failed, even after retries
-   * @throws IOException
-   */
-  public synchronized Result[] batch(final List<Row> actions) throws IOException {
-    Result[] results = new Result[actions.size()];
-    connection.processBatch(actions, tableName, pool, results);
-    return results;
-  }
-
-  /**
-   * Deletes the specified cells/row.
-   * 
-   * @param delete The object that specifies what to delete.
-   * @throws IOException if a remote or network exception occurs.
-   * @since 0.20.0
-   */
   public void delete(final Delete delete)
   throws IOException {
     connection.getRegionServerWithRetries(
@@ -551,28 +520,13 @@ public class HTable implements HTableInt
     );
   }
 
-  /**
-   * Deletes the specified cells/rows in bulk.
-   * @param deletes List of things to delete. As a side effect, it will be modified:
-   * successful {@link Delete}s are removed. The ordering of the list will not change. 
-   * @throws IOException if a remote or network exception occurs. In that case
-   * the {@code deletes} argument will contain the {@link Delete} instances
-   * that have not be successfully applied.
-   * @since 0.20.1
-   */
   public void delete(final List<Delete> deletes)
   throws IOException {
-    Result[] results = new Result[deletes.size()];
-    connection.processBatch((List) deletes, tableName, pool, results);
-
-    // mutate list so that it is empty for complete success, or contains only failed records
-    // results are returned in the same order as the requests in list
-    // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
-    for (int i = results.length - 1; i>=0; i--) {
-      // if result is not null, it succeeded
-      if (results[i] != null) {
-        deletes.remove(i);
-      }
+    int last = 0;
+    try {
+      last = connection.processBatchOfDeletes(deletes, this.tableName);
+    } finally {
+      deletes.subList(0, last).clear();
     }
   }
 
@@ -601,7 +555,6 @@ public class HTable implements HTableInt
     return incrementColumnValue(row, family, qualifier, amount, true);
   }
 
-  @SuppressWarnings({"ThrowableInstanceNeverThrown"})
   public long incrementColumnValue(final byte [] row, final byte [] family,
       final byte [] qualifier, final long amount, final boolean writeToWAL)
   throws IOException {
@@ -675,7 +628,7 @@ public class HTable implements HTableInt
           public Boolean call() throws IOException {
             return server.checkAndDelete(
                 location.getRegionInfo().getRegionName(),
-                row, family, qualifier, value, delete) 
+                row, family, qualifier, value, delete)
             ? Boolean.TRUE : Boolean.FALSE;
           }
         }
@@ -704,17 +657,10 @@ public class HTable implements HTableInt
     );
   }
 
-  /**
-   * Executes all the buffered {@link Put} operations.
-   * <p>
-   * This method gets called once automatically for every {@link Put} or batch
-   * of {@link Put}s (when {@link #batch(List)} is used) when
-   * {@link #isAutoFlush()} is {@code true}.
-   * @throws IOException if a remote or network exception occurs.
-   */
   public void flushCommits() throws IOException {
     try {
-      connection.processBatchOfPuts(writeBuffer, tableName, pool);
+      connection.processBatchOfPuts(writeBuffer,
+          tableName, pool);
     } finally {
       // the write buffer was adjusted by processBatchOfPuts
       currentWriteBufferSize = 0;
@@ -1153,10 +1099,12 @@ public class HTable implements HTableInt
             Thread t = new Thread(group, r,
                                   namePrefix + threadNumber.getAndIncrement(),
                                   0);
-            if (!t.isDaemon())
-                t.setDaemon(true);
-            if (t.getPriority() != Thread.NORM_PRIORITY)
-                t.setPriority(Thread.NORM_PRIORITY);
+            if (!t.isDaemon()) {
+              t.setDaemon(true);
+            }
+            if (t.getPriority() != Thread.NORM_PRIORITY) {
+              t.setPriority(Thread.NORM_PRIORITY);
+            }
             return t;
         }
   }
@@ -1168,9 +1116,10 @@ public class HTable implements HTableInt
    * @param tableName name of table to configure.
    * @param enable Set to true to enable region cache prefetch. Or set to
    * false to disable it.
+   * @throws ZooKeeperConnectionException
    */
   public static void setRegionCachePrefetch(final byte[] tableName,
-      boolean enable) {
+      boolean enable) throws ZooKeeperConnectionException {
     HConnectionManager.getConnection(HBaseConfiguration.create()).
     setRegionCachePrefetch(tableName, enable);
   }
@@ -1183,9 +1132,10 @@ public class HTable implements HTableInt
    * @param tableName name of table to configure.
    * @param enable Set to true to enable region cache prefetch. Or set to
    * false to disable it.
+   * @throws ZooKeeperConnectionException
    */
   public static void setRegionCachePrefetch(final Configuration conf,
-      final byte[] tableName, boolean enable) {
+      final byte[] tableName, boolean enable) throws ZooKeeperConnectionException {
     HConnectionManager.getConnection(conf).setRegionCachePrefetch(
         tableName, enable);
   }
@@ -1196,9 +1146,10 @@ public class HTable implements HTableInt
    * @param tableName name of table to check
   * @return true if table's region cache prefetch is enabled. Otherwise
    * it is disabled.
+   * @throws ZooKeeperConnectionException
    */
   public static boolean getRegionCachePrefetch(final Configuration conf,
-      final byte[] tableName) {
+      final byte[] tableName) throws ZooKeeperConnectionException {
     return HConnectionManager.getConnection(conf).getRegionCachePrefetch(
         tableName);
   }
@@ -1208,8 +1159,9 @@ public class HTable implements HTableInt
    * @param tableName name of table to check
   * @return true if table's region cache prefetch is enabled. Otherwise
    * it is disabled.
+   * @throws ZooKeeperConnectionException
    */
-  public static boolean getRegionCachePrefetch(final byte[] tableName) {
+  public static boolean getRegionCachePrefetch(final byte[] tableName) throws ZooKeeperConnectionException {
     return HConnectionManager.getConnection(HBaseConfiguration.create()).
     getRegionCachePrefetch(tableName);
   }
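
A note on the reworked HTable.delete(List) above: processBatchOfDeletes returns
how many deletes at the head of the (sorted) list were applied, and the finally
block clears that prefix; if the call throws, last stays 0 and the list is left
untrimmed. A hedged caller sketch (the table name and row keys are illustrative):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.util.Bytes;

    public class DeleteListExample {
      public static void main(String[] args) throws IOException {
        HTable table = new HTable(HBaseConfiguration.create(), "mytable");
        List<Delete> deletes = new ArrayList<Delete>();
        deletes.add(new Delete(Bytes.toBytes("row1")));
        deletes.add(new Delete(Bytes.toBytes("row2")));
        try {
          table.delete(deletes);  // applied deletes are removed from the list
        } catch (IOException e) {
          // On exception the list is untrimmed; some deletes may nonetheless
          // have already been applied server-side.
          System.err.println(deletes.size() + " deletes still listed");
          throw e;
        }
      }
    }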

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java Tue Aug 31 23:51:44 2010
@@ -20,6 +20,10 @@
 
 package org.apache.hadoop.hbase.client;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -27,8 +31,6 @@ import org.apache.hadoop.hbase.TableNotF
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 
-import java.io.IOException;
-
 /**
  * Scanner class that contains the <code>.META.</code> table scanning logic
  * and uses a Retryable scanner. Provided visitors will be called
@@ -38,7 +40,6 @@ import java.io.IOException;
  * minor releases.
  */
 public class MetaScanner {
-
   /**
   * Scans the meta table and calls a visitor on each RowResult, using an
   * empty start row value as table name.
@@ -50,7 +51,7 @@ public class MetaScanner {
   public static void metaScan(Configuration configuration,
       MetaScannerVisitor visitor)
   throws IOException {
-    metaScan(configuration, visitor, HConstants.EMPTY_START_ROW);
+    metaScan(configuration, visitor, null);
   }
 
   /**
@@ -170,6 +171,32 @@ public class MetaScanner {
   }
 
   /**
+   * Lists all of the regions currently in META.
+   * @return List of all regions currently in <code>.META.</code>
+   * @throws IOException
+   */
+  public static List<HRegionInfo> listAllRegions(Configuration conf)
+  throws IOException {
+    final List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
+    MetaScannerVisitor visitor =
+      new MetaScannerVisitor() {
+        @Override
+        public boolean processRow(Result result) throws IOException {
+          if (result == null || result.isEmpty()) {
+            return true;
+          }
+          HRegionInfo regionInfo = Writables.getHRegionInfo(
+              result.getValue(HConstants.CATALOG_FAMILY,
+                  HConstants.REGIONINFO_QUALIFIER));
+          regions.add(regionInfo);
+          return true;
+        }
+    };
+    metaScan(conf, visitor);
+    return regions;
+  }
+
+  /**
    * Visitor class called to process each row of the .META. table
    */
   public interface MetaScannerVisitor {
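
The new MetaScanner.listAllRegions above wraps metaScan with a visitor that
collects every HRegionInfo found in .META.; a minimal usage sketch (the class
name is illustrative, not part of the commit):

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.client.MetaScanner;

    public class ListRegionsExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // One call replaces writing a MetaScannerVisitor by hand.
        List<HRegionInfo> regions = MetaScanner.listAllRegions(conf);
        for (HRegionInfo region : regions) {
          System.out.println(region.getRegionNameAsString());
        }
      }
    }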

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.HServerAddress;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.DataInput;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.TreeMap;
+
+/**
+ * Container for Actions (i.e. Get, Delete, or Put), which are grouped by
+ * regionName. Intended to be used with HConnectionManager.processBatch()
+ */
+public final class MultiAction implements Writable {
+
+  // map of regions to lists of puts/gets/deletes for that region.
+  public Map<byte[], List<Action>> actions = new TreeMap<byte[], List<Action>>(
+      Bytes.BYTES_COMPARATOR);
+
+  public MultiAction() {
+  }
+
+  /**
+   * Get the total number of Actions
+   * 
+   * @return total number of Actions for all groups in this container.
+   */
+  public int size() {
+    int size = 0;
+    for (List<Action> l : actions.values()) {
+      size += l.size();
+    }
+    return size;
+  }
+
+  /**
+   * Add an Action to this container based on its regionName. If the regionName
+   * is wrong, the initial execution will fail, but will be automatically
+   * retried after looking up the correct region.
+   *
+   * @param regionName name of the region the Action is destined for
+   * @param a the Action to add
+   */
+  public void add(byte[] regionName, Action a) {
+    List<Action> rsActions = actions.get(regionName);
+    if (rsActions == null) {
+      rsActions = new ArrayList<Action>();
+      actions.put(regionName, rsActions);
+    }
+    rsActions.add(a);
+  }
+
+  /**
+   * @return All actions from all regions in this container
+   */
+  public List<Action> allActions() {
+    List<Action> res = new ArrayList<Action>();
+    for (List<Action> lst : actions.values()) {
+      res.addAll(lst);
+    }
+    return res;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(actions.size());
+    for (Map.Entry<byte[], List<Action>> e : actions.entrySet()) {
+      Bytes.writeByteArray(out, e.getKey());
+      List<Action> lst = e.getValue();
+      out.writeInt(lst.size());
+      for (Action a : lst) {
+        HbaseObjectWritable.writeObject(out, a, Action.class, null);
+      }
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    actions.clear();
+    int mapSize = in.readInt();
+    for (int i = 0; i < mapSize; i++) {
+      byte[] key = Bytes.readByteArray(in);
+      int listSize = in.readInt();
+      List<Action> lst = new ArrayList<Action>(listSize);
+      for (int j = 0; j < listSize; j++) {
+        lst.add((Action) HbaseObjectWritable.readObject(in, null));
+      }
+      actions.put(key, lst);
+    }
+  }
+
+}

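A short sketch of how a client-side batch might be grouped with MultiAction before going out in one RPC. The Action constructor shown is an assumption -- its real signature is defined in Action.java elsewhere in this patch -- and the region names are placeholders.

    import org.apache.hadoop.hbase.client.Action;
    import org.apache.hadoop.hbase.client.MultiAction;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MultiActionExample {
      static MultiAction group(byte[] region1, byte[] region2) {
        MultiAction multi = new MultiAction();
        Put p1 = new Put(Bytes.toBytes("row-a"));
        p1.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v1"));
        Put p2 = new Put(Bytes.toBytes("row-m"));
        p2.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v2"));
        // Assumed Action(regionName, row, originalIndex) shape: it pairs the
        // operation with its index in the caller's submission list.
        multi.add(region1, new Action(region1, p1, 0));
        multi.add(region2, new Action(region2, p2, 1));
        return multi; // multi.size() == 2
      }
    }
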
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java Tue Aug 31 23:51:44 2010
@@ -34,7 +34,6 @@ import java.util.Map;
 import java.util.TreeMap;
 
 /**
- * @deprecated Use MultiAction instead
  * Data type class for putting multiple regions' worth of puts in one RPC.
  */
 public class MultiPut implements Writable {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java Tue Aug 31 23:51:44 2010
@@ -30,7 +30,6 @@ import java.util.Map;
 import java.util.TreeMap;
 
 /**
- * @deprecated Replaced by MultiResponse
  * Response class for MultiPut.
  */
 public class MultiPutResponse implements Writable {

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.HServerAddress;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.DataInput;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.TreeMap;
+
+/**
+ * A container for Result objects, grouped by regionName.
+ */
+public class MultiResponse implements Writable {
+
+  // map of regionName to list of (Results paired to the original index for that
+  // Result)
+  private Map<byte[], List<Pair<Integer, Result>>> results = new TreeMap<byte[], List<Pair<Integer, Result>>>(
+      Bytes.BYTES_COMPARATOR);
+
+  public MultiResponse() {
+  }
+
+  /**
+   * @return Number of pairs in this container
+   */
+  public int size() {
+    int size = 0;
+    for (Collection<?> c : results.values()) {
+      size += c.size();
+    }
+    return size;
+  }
+
+  /**
+   * Add the pair to the container, grouped by the regionName.
+   *
+   * @param regionName name of the region the result came from
+   * @param r
+   *          First item in the pair is the original index of the Action
+   *          (request). Second item is the Result. The Result will be empty
+   *          for successful Put and Delete actions.
+   */
+  public void add(byte[] regionName, Pair<Integer, Result> r) {
+    List<Pair<Integer, Result>> rs = results.get(regionName);
+    if (rs == null) {
+      rs = new ArrayList<Pair<Integer, Result>>();
+      results.put(regionName, rs);
+    }
+    rs.add(r);
+  }
+
+  public Map<byte[], List<Pair<Integer, Result>>> getResults() {
+    return results;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(results.size());
+    for (Map.Entry<byte[], List<Pair<Integer, Result>>> e : results.entrySet()) {
+      Bytes.writeByteArray(out, e.getKey());
+      List<Pair<Integer, Result>> lst = e.getValue();
+      out.writeInt(lst.size());
+      for (Pair<Integer, Result> r : lst) {
+        out.writeInt(r.getFirst());
+        HbaseObjectWritable.writeObject(out, r.getSecond(), Result.class, null);
+      }
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    results.clear();
+    int mapSize = in.readInt();
+    for (int i = 0; i < mapSize; i++) {
+      byte[] key = Bytes.readByteArray(in);
+      int listSize = in.readInt();
+      List<Pair<Integer, Result>> lst = new ArrayList<Pair<Integer, Result>>(
+          listSize);
+      for (int j = 0; j < listSize; j++) {
+        Integer idx = in.readInt();
+        Result r = (Result) HbaseObjectWritable.readObject(in, null);
+        lst.add(new Pair<Integer, Result>(idx, r));
+      }
+      results.put(key, lst);
+    }
+  }
+
+}

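A sketch of how a caller might unpack a MultiResponse after the RPC returns, using only the methods shown above: the original-index half of each Pair restores server-side results to the caller's submission order.

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hbase.client.MultiResponse;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Pair;

    public class MultiResponseExample {
      /** Restores per-region results to the caller's original order. */
      static Result[] flatten(MultiResponse response) {
        Result[] results = new Result[response.size()];
        for (Map.Entry<byte[], List<Pair<Integer, Result>>> e :
            response.getResults().entrySet()) {
          for (Pair<Integer, Result> pair : e.getValue()) {
            // getFirst() is the original Action index, getSecond() its Result.
            results[pair.getFirst()] = pair.getSecond();
          }
        }
        return results;
      }
    }
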
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedException.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedException.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedException.java Tue Aug 31 23:51:44 2010
@@ -31,6 +31,10 @@ public class RetriesExhaustedException e
     super(msg);
   }
 
+  public RetriesExhaustedException(final String msg, final IOException e) {
+    super(msg, e);
+  }
+
   /**
    * Create a new RetriesExhaustedException from the list of prior failures.
    * @param serverName name of HRegionServer

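The new constructor above lets callers chain the last underlying failure as the cause so it survives into stack traces. A hedged sketch of the intended call pattern (the retry counter and saved exception are assumed to come from a caller-managed retry loop):

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.RetriesExhaustedException;

    public class RetryExample {
      static void giveUp(int numRetries, IOException lastFailure)
          throws RetriesExhaustedException {
        // Wrap the final failure rather than discarding it.
        throw new RetriesExhaustedException(
            "Failed after " + numRetries + " attempts", lastFailure);
      }
    }
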
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerConnectionManager.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerConnectionManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerConnectionManager.java Tue Aug 31 23:51:44 2010
@@ -21,6 +21,7 @@
 package org.apache.hadoop.hbase.client;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 
 
 /**
@@ -38,8 +39,9 @@ public class ServerConnectionManager ext
    * If no current connection exists, create a new connection for that instance
    * @param conf configuration
    * @return HConnection object for the instance specified by the configuration
+   * @throws ZooKeeperConnectionException
    */
-  public static ServerConnection getConnection(Configuration conf) {
+  public static ServerConnection getConnection(Configuration conf) throws ZooKeeperConnectionException {
     return (ServerConnection) HConnectionManager.getConnection(conf);
   }
 }

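Since getConnection() can now fail while locating the cluster in ZooKeeper, callers must handle or declare ZooKeeperConnectionException. A minimal sketch, assuming the usual HBaseConfiguration factory:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.ZooKeeperConnectionException;
    import org.apache.hadoop.hbase.client.ServerConnection;
    import org.apache.hadoop.hbase.client.ServerConnectionManager;

    public class ServerConnectionExample {
      static ServerConnection connect() throws ZooKeeperConnectionException {
        Configuration conf = HBaseConfiguration.create();
        // Fails fast here if the ZooKeeper quorum is unreachable.
        return ServerConnectionManager.getConnection(conf);
      }
    }
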
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,223 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.executor;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Server;
+
+
+/**
+ * Abstract base class for all HBase event handlers. Subclasses should
+ * implement the {@link #process()} method.  Subclasses should also do all
+ * necessary checks up front in their constructor if possible -- check that
+ * the table exists, is disabled, etc. -- so they fail fast rather than later
+ * when process is running.  Do it this way because process may be invoked
+ * directly, but event handlers are also run in an executor context -- i.e.
+ * asynchronously -- and in that case exceptions thrown at process time will
+ * not be seen by the invoker, at least not until we implement a call-back
+ * mechanism so the client can pick them up later.
+ * <p>
+ * Event handlers have an {@link EventType}.
+ * {@link EventType} is a list of ALL handler event types.  We need to keep
+ * a full list in one place -- and an enum is a good shorthand for an
+ * implementation -- because event handlers can be passed to executors when
+ * they are to be run asynchronously. The
+ * hbase executor, see {@link ExecutorService}, switches on the event type
+ * when routing a handler to its executor.
+ * <p>
+ * Event listeners can be installed and will be called pre- and post-process
+ * if this EventHandler is run in a Thread (it's a Runnable, so this happens
+ * when its {@link #run()} method gets called).  Implement an
+ * {@link EventHandlerListener} and register it using
+ * {@link #setListener(EventHandlerListener)}.
+ * @see ExecutorService
+ */
+public abstract class EventHandler implements Runnable, Comparable<Runnable> {
+  private static final Log LOG = LogFactory.getLog(EventHandler.class);
+
+  // type of event this object represents
+  protected EventType eventType;
+
+  protected Server server;
+
+  // sequence id generator for default FIFO ordering of events
+  protected static AtomicLong seqids = new AtomicLong(0);
+
+  // sequence id for this event
+  private final long seqid;
+
+  // Listener to call pre- and post- processing.  May be null.
+  private EventHandlerListener listener;
+
+  /**
+   * This interface provides pre- and post-process hooks for events.
+   */
+  public interface EventHandlerListener {
+    /**
+     * Called before any event is processed
+     * @param The event handler whose process method is about to be called.
+     */
+    public void beforeProcess(EventHandler event);
+    /**
+     * Called after any event is processed
+     * @param The event handler whose process method is about to be called.
+     */
+    public void afterProcess(EventHandler event);
+  }
+
+  /**
+   * List of all HBase event handler types.  Event types are named by a
+   * convention: event type names specify the component from which the event
+   * originated and then where it's destined -- e.g. the RS2ZK_ prefix means
+   * the event came from a regionserver destined for zookeeper -- and then
+   * what the event is; e.g. REGION_OPENING.
+   * 
+   * <p>We give the enums indices so we can add types later and keep them
+   * grouped together rather than have to add them always to the end as we
+   * would have to if we used raw enum ordinals.
+   */
+  public enum EventType {
+    // Messages originating from RS (NOTE: there is NO direct communication from
+    // RS to Master). These are a result of RS updates into ZK.
+    RS2ZK_REGION_CLOSING      (1),   // RS is in process of closing a region
+    RS2ZK_REGION_CLOSED       (2),   // RS has finished closing a region
+    RS2ZK_REGION_OPENING      (3),   // RS is in process of opening a region
+    RS2ZK_REGION_OPENED       (4),   // RS has finished opening a region
+
+    // Messages originating from Master to RS
+    M2RS_OPEN_REGION          (20),  // Master asking RS to open a region
+    M2RS_OPEN_ROOT            (21),  // Master asking RS to open root
+    M2RS_OPEN_META            (22),  // Master asking RS to open meta
+    M2RS_CLOSE_REGION         (23),  // Master asking RS to close a region
+    M2RS_CLOSE_ROOT           (24),  // Master asking RS to close root
+    M2RS_CLOSE_META           (25),  // Master asking RS to close meta
+
+    // Messages originating from Client to Master
+    C2M_DELETE_TABLE          (40),   // Client asking Master to delete a table
+    C2M_DISABLE_TABLE         (41),   // Client asking Master to disable a table
+    C2M_ENABLE_TABLE          (42),   // Client asking Master to enable a table
+    C2M_MODIFY_TABLE          (43),   // Client asking Master to modify a table
+    C2M_ADD_FAMILY            (44),   // Client asking Master to add family to table
+    C2M_DELETE_FAMILY         (45),   // Client asking Master to delete family of table
+    C2M_MODIFY_FAMILY         (46),   // Client asking Master to modify family of table
+
+    // Updates from master to ZK. This is done by the master and there is
+    // nothing to process by either Master or RS
+    M2ZK_REGION_OFFLINE       (50),  // Master adds this region as offline in ZK
+
+    // Master controlled events to be executed on the master
+    M_SERVER_SHUTDOWN         (70);  // Master is processing shutdown of a RS
+
+    /**
+     * Constructor.  The value is not stored; it exists only to group and
+     * order the event types in the source (see the class comment above).
+     */
+    EventType(int value) {}
+  }
+
+  /**
+   * Default base class constructor.
+   */
+  public EventHandler(Server server, EventType eventType) {
+    this.server = server;
+    this.eventType = eventType;
+    seqid = seqids.incrementAndGet();
+  }
+
+  public void run() {
+    try {
+      if (getListener() != null) getListener().beforeProcess(this);
+      process();
+      if (getListener() != null) getListener().afterProcess(this);
+    } catch(Throwable t) {
+      LOG.error("Caught throwable while processing event " + eventType, t);
+    }
+  }
+
+  /**
+   * This method is the main processing loop to be implemented by the various
+   * subclasses.
+   * @throws IOException
+   */
+  public abstract void process() throws IOException;
+
+  /**
+   * Return the event type.
+   * @return the event type of this handler
+   */
+  public EventType getEventType() {
+    return this.eventType;
+  }
+
+  /**
+   * Get the priority level for this handler instance.  This uses natural
+   * ordering so lower numbers are higher priority.
+   * <p>
+   * Lowest priority is Integer.MAX_VALUE.  Highest priority is 0.
+   * <p>
+   * Subclasses should override this method to allow prioritizing handlers.
+   * <p>
+   * Handlers with the same priority are handled in FIFO order.
+   * <p>
+   * @return Integer.MAX_VALUE by default, override to set higher priorities
+   */
+  public int getPriority() {
+    return Integer.MAX_VALUE;
+  }
+
+  /**
+   * @return This event's sequence id.
+   */
+  public long getSeqid() {
+    return this.seqid;
+  }
+
+  /**
+   * Default prioritized runnable comparator which implements a FIFO ordering.
+   * <p>
+   * Subclasses should not override this.  Instead, if they want to implement
+   * priority beyond FIFO, they should override {@link #getPriority()}.
+   */
+  @Override
+  public int compareTo(Runnable o) {
+    EventHandler eh = (EventHandler)o;
+    if(getPriority() != eh.getPriority()) {
+      return (getPriority() < eh.getPriority()) ? -1 : 1;
+    }
+    return (this.seqid < eh.seqid) ? -1 : 1;
+  }
+
+  /**
+   * @return Current listener or null if none set.
+   */
+  public synchronized EventHandlerListener getListener() {
+    return listener;
+  }
+
+  /**
+   * @param listener Listener to call pre- and post- {@link #process()}.
+   */
+  public synchronized void setListener(EventHandlerListener listener) {
+    this.listener = listener;
+  }
+}
\ No newline at end of file

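A minimal sketch of an EventHandler subclass, assuming a Server instance is available from the hosting component; the logging body is a placeholder for real work.

    import java.io.IOException;

    import org.apache.hadoop.hbase.Server;
    import org.apache.hadoop.hbase.executor.EventHandler;

    public class LoggingHandler extends EventHandler {
      public LoggingHandler(Server server) {
        // Per the class comment, fail-fast checks belong here.
        super(server, EventType.M_SERVER_SHUTDOWN);
      }

      @Override
      public void process() throws IOException {
        // Real handlers do their work here; when run in a thread, run()
        // wraps this call with the listener's before/after hooks.
        System.out.println("processing " + getEventType()
            + " seqid=" + getSeqid());
      }
    }
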
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,285 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.executor;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
+import org.apache.hadoop.hbase.executor.EventHandler.EventType;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This is a generic executor service. This component abstracts a
+ * threadpool, a queue to which {@link EventHandler}s can be submitted,
+ * and a <code>Runnable</code> that handles the object that is added to the queue.
+ *
+ * <p>In order to create a new service, create an instance of this class and
+ * then do: <code>instance.startExecutorService("myService", maxThreads);</code>.
+ * When done call {@link #shutdown()}.
+ *
+ * <p>In order to use the service created above, call
+ * {@link #submit(EventHandler)}. Install pre- and post-processing listeners
+ * by registering your implementation of {@link EventHandler.EventHandlerListener}
+ * with {@link #registerListener(EventType, EventHandlerListener)}.  Be sure
+ * to deregister your listener when done via {@link #unregisterListener(EventType)}.
+ */
+public class ExecutorService {
+  private static final Log LOG = LogFactory.getLog(ExecutorService.class);
+
+  // hold the all the executors created in a map addressable by their names
+  private final ConcurrentHashMap<String, Executor> executorMap =
+    new ConcurrentHashMap<String, Executor>();
+
+  // listeners that are called before and after an event is processed
+  private ConcurrentHashMap<EventHandler.EventType, EventHandlerListener> eventHandlerListeners =
+    new ConcurrentHashMap<EventHandler.EventType, EventHandlerListener>();
+
+  // Name of the server hosting this executor service.
+  private final String servername;
+
+  /**
+   * The following is a list of all executor types, both those that run in the
+   * master and those that run in the regionserver.
+   */
+  public enum ExecutorType {
+
+    // Master executor services
+    MASTER_CLOSE_REGION        (1),
+    MASTER_OPEN_REGION         (2),
+    MASTER_SERVER_OPERATIONS   (3),
+    MASTER_TABLE_OPERATIONS    (4),
+    MASTER_RS_SHUTDOWN         (5),
+
+    // RegionServer executor services
+    RS_OPEN_REGION             (20),
+    RS_OPEN_ROOT               (21),
+    RS_OPEN_META               (22),
+    RS_CLOSE_REGION            (23),
+    RS_CLOSE_ROOT              (24),
+    RS_CLOSE_META              (25);
+
+    ExecutorType(int value) {}
+
+    /**
+     * @param serverName Name of the hosting server.
+     * @return The executor type concatenated with the passed serverName.
+     */
+    String getExecutorName(String serverName) {
+      return this.toString() + "-" + serverName;
+    }
+  }
+
+  /**
+   * Returns the executor type that services the passed event handler type
+   * (each executor type names a thread pool instance).
+   * @param type EventHandler type.
+   * @return ExecutorType that handles events of the passed type
+   */
+  public ExecutorType getExecutorServiceType(final EventHandler.EventType type) {
+    switch(type) {
+      // Master executor services
+
+      case RS2ZK_REGION_CLOSED:
+        return ExecutorType.MASTER_CLOSE_REGION;
+
+      case RS2ZK_REGION_OPENED:
+        return ExecutorType.MASTER_OPEN_REGION;
+
+      case M_SERVER_SHUTDOWN:
+        return ExecutorType.MASTER_SERVER_OPERATIONS;
+
+      case C2M_DELETE_TABLE:
+      case C2M_DISABLE_TABLE:
+      case C2M_ENABLE_TABLE:
+      case C2M_MODIFY_TABLE:
+        return ExecutorType.MASTER_TABLE_OPERATIONS;
+
+      // RegionServer executor services
+
+      case M2RS_OPEN_REGION:
+        return ExecutorType.RS_OPEN_REGION;
+
+      case M2RS_OPEN_ROOT:
+        return ExecutorType.RS_OPEN_ROOT;
+
+      case M2RS_OPEN_META:
+        return ExecutorType.RS_OPEN_META;
+
+      case M2RS_CLOSE_REGION:
+        return ExecutorType.RS_CLOSE_REGION;
+
+      case M2RS_CLOSE_ROOT:
+        return ExecutorType.RS_CLOSE_ROOT;
+
+      case M2RS_CLOSE_META:
+        return ExecutorType.RS_CLOSE_META;
+
+      default:
+        throw new RuntimeException("Unhandled event type " + type);
+    }
+  }
+
+  /**
+   * Constructor.
+   * @param servername Name of the hosting server.
+   */
+  public ExecutorService(final String servername) {
+    super();
+    this.servername = servername;
+  }
+
+  /**
+   * Start an executor service with a given name. If a service is already
+   * started with the same name, this throws a RuntimeException.
+   * @param name Name of the service to start.
+   * @param maxThreads Maximum number of threads the pool may run.
+   */
+  void startExecutorService(String name, int maxThreads) {
+    if (this.executorMap.get(name) != null) {
+      throw new RuntimeException("An executor service with the name " + name +
+        " is already running!");
+    }
+    Executor hbes = new Executor(name, maxThreads, this.eventHandlerListeners);
+    if (this.executorMap.putIfAbsent(name, hbes) != null) {
+      throw new RuntimeException("An executor service with the name " + name +
+      " is already running (2)!");
+    }
+    LOG.debug("Starting executor service: " + name);
+  }
+
+  boolean isExecutorServiceRunning(String name) {
+    return this.executorMap.containsKey(name);
+  }
+
+  public void shutdown() {
+    for(Entry<String, Executor> entry: this.executorMap.entrySet()) {
+      List<Runnable> wasRunning =
+        entry.getValue().threadPoolExecutor.shutdownNow();
+      if (!wasRunning.isEmpty()) {
+        LOG.info(entry.getKey() + " had " + wasRunning + " on shutdown");
+      }
+    }
+    this.executorMap.clear();
+  }
+
+  Executor getExecutor(final ExecutorType type) {
+    return getExecutor(type.getExecutorName(this.servername));
+  }
+
+  Executor getExecutor(String name) {
+    Executor executor = this.executorMap.get(name);
+    if (executor == null) {
+      LOG.debug("Executor service [" + name + "] not found in " + this.executorMap);
+    }
+    return executor;
+  }
+
+
+  public void startExecutorService(final ExecutorType type, final int maxThreads) {
+    String name = type.getExecutorName(this.servername);
+    if (isExecutorServiceRunning(name)) {
+      LOG.debug("Executor service " + toString() + " already running on " +
+        this.servername);
+      return;
+    }
+    startExecutorService(name, maxThreads);
+  }
+
+  public void submit(final EventHandler eh) {
+    getExecutor(getExecutorServiceType(eh.getEventType())).submit(eh);
+  }
+
+  /**
+   * Subscribe to updates before and after processing instances of
+   * {@link EventHandler.EventType}.  Currently only one listener per
+   * event type is supported.
+   * @param type Type of event we're registering the listener for
+   * @param listener The listener to run.
+   */
+  public void registerListener(final EventHandler.EventType type,
+      final EventHandlerListener listener) {
+    this.eventHandlerListeners.put(type, listener);
+  }
+
+  /**
+   * Stop receiving updates before and after processing instances of
+   * {@link EventHandler.EventType}
+   * @param type Type of event we're deregistering the listener for
+   * @return The listener we removed or null if we did not remove it.
+   */
+  public EventHandlerListener unregisterListener(final EventHandler.EventType type) {
+    return this.eventHandlerListeners.remove(type);
+  }
+
+  /**
+   * Executor instance.
+   */
+  private static class Executor {
+    // default number of threads in the pool
+    private int corePoolSize = 1;
+    // how long to retain excess threads
+    private long keepAliveTimeInMillis = 1000;
+    // the thread pool executor that services the requests
+    private final ThreadPoolExecutor threadPoolExecutor;
+    // work queue to use - unbounded queue
+    BlockingQueue<Runnable> workQueue = new PriorityBlockingQueue<Runnable>();
+    private final AtomicInteger threadid = new AtomicInteger(0);
+    private final String name;
+    private final Map<EventHandler.EventType, EventHandlerListener> eventHandlerListeners;
+
+    protected Executor(String name, int maxThreads,
+        final Map<EventHandler.EventType, EventHandlerListener> eventHandlerListeners) {
+      this.name = name;
+      this.eventHandlerListeners = eventHandlerListeners;
+      // create the thread pool executor
+      this.threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxThreads,
+          keepAliveTimeInMillis, TimeUnit.MILLISECONDS, workQueue);
+      // name the threads for this threadpool
+      ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
+      tfb.setNameFormat(this.name + "-" + this.threadid.incrementAndGet());
+      this.threadPoolExecutor.setThreadFactory(tfb.build());
+    }
+
+    /**
+     * Submit the event to the queue for handling.
+     * @param event The event handler to queue.
+     */
+    void submit(final EventHandler event) {
+      // If there is a listener for this type, make sure we call the before
+      // and after process methods.
+      EventHandlerListener listener =
+        this.eventHandlerListeners.get(event.getEventType());
+      if (listener != null) {
+        event.setListener(listener);
+      }
+      this.threadPoolExecutor.execute(event);
+    }
+  }
+}
\ No newline at end of file

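Tying the two new classes together, a hedged sketch of starting a pool and submitting a handler; it reuses the LoggingHandler sketched after EventHandler.java above, and the server name and thread count are illustrative.

    import org.apache.hadoop.hbase.Server;
    import org.apache.hadoop.hbase.executor.ExecutorService;
    import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;

    public class ExecutorServiceExample {
      static void runOnce(Server server) {
        // Executors are keyed by type plus server name: one service per server.
        ExecutorService service = new ExecutorService("master-1");
        service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, 3);
        // submit() switches on the handler's EventType; M_SERVER_SHUTDOWN
        // routes to MASTER_SERVER_OPERATIONS (see getExecutorServiceType).
        service.submit(new LoggingHandler(server));
        // Drain and stop all pools when done.
        service.shutdown();
      }
    }
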

