hbase-commits mailing list archives

From st...@apache.org
Subject svn commit: r986524 [1/2] - in /hbase/branches/0.90_master_rewrite/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/executor/ main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/o...
Date Tue, 17 Aug 2010 23:36:50 GMT
Author: stack
Date: Tue Aug 17 23:36:49 2010
New Revision: 986524

URL: http://svn.apache.org/viewvc?rev=986524&view=rev
Log:
Big changes are:

Renamed HBaseExecutorService to ExecutorService and undid its staticness
so minicluster tests would work (the first server to exit was killing
the executor services on the others still running).
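
As a minimal sketch, assuming the ExecutorService and ExecutorType added
in the diff below (the server names here are made up), the de-staticized
API now looks roughly like this:

    import org.apache.hadoop.hbase.executor.ExecutorService;
    import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;

    public class PerServerExecutorsSketch {
      public static void main(String[] args) {
        // Each server now owns its own ExecutorService instance, keyed by
        // its server name, instead of sharing a JVM-wide static registry.
        ExecutorService rs1Executors = new ExecutorService("regionserver-1");
        ExecutorService rs2Executors = new ExecutorService("regionserver-2");
        rs1Executors.startExecutorService(ExecutorType.RS_CLOSE_REGION, 5);
        rs2Executors.startExecutorService(ExecutorType.RS_CLOSE_REGION, 5);

        // Shutting down one server's executors no longer kills the other's,
        // which is what minicluster tests (many servers, one JVM) need.
        rs1Executors.shutdown();
        rs2Executors.shutdown();
      }
    }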

Made it so the cluster shuts down gracefully, keyed off the state of the
cluster shutdown flag up in zk.  As soon as it is set, RSs start closing
all user regions WITHOUT updating zk, using close handlers exclusively
(now there is only one way to close a region rather than one way per
type of operation).  If the RS was NOT carrying a catalog table when all
its user regions closed, it just shuts itself down.  When the master
notices that <= 2 regionservers remain, it tells them they can close
too.  This mimics the quiesce state we used to have.  The master shuts
itself down when no regionservers remain.  Shutdown of regionservers is
now handled by the zk server-expiration watcher only; before, we watched
RSs with zk and then processed them with HMsgs on heartbeat.  Now there
is only one means of handling RS expiration.
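
Condensed into a self-contained sketch (the RegionServerSim type and the
single user region per server are illustrative assumptions, not HBase
classes; the real logic is in the HRegionServer and ServerManager hunks
below), the shutdown flow is roughly:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class ClusterShutdownSketch {
      // Stand-in for the cluster shutdown flag kept up in zk.
      static final AtomicBoolean clusterShutdown = new AtomicBoolean(false);

      static class RegionServerSim {
        final ConcurrentHashMap<String, Boolean> onlineRegions = new ConcurrentHashMap<>();
        boolean stopped;

        RegionServerSim(String name, boolean carryingCatalog) {
          onlineRegions.put("user," + name, Boolean.TRUE);
          if (carryingCatalog) onlineRegions.put(".META.", Boolean.TRUE);
        }

        // Mirrors the zk watcher firing once the flag is set: close all user
        // regions via the close handlers, WITHOUT updating zk.
        void onClusterShutdown() {
          if (!clusterShutdown.get()) return;
          onlineRegions.keySet().removeIf(r -> r.startsWith("user,"));
          // Main-loop check: not carrying catalog regions, so just go down.
          if (onlineRegions.isEmpty()) stopped = true;
        }
      }

      public static void main(String[] args) {
        RegionServerSim plainRs = new RegionServerSim("rs1", false);
        RegionServerSim catalogRs = new RegionServerSim("rs2", true);

        clusterShutdown.set(true);     // master sets the flag in zk
        plainRs.onClusterShutdown();   // no catalog regions, exits on its own
        catalogRs.onClusterShutdown(); // still carrying .META., keeps running

        // Master side: once <= 2 regionservers remain (the likely catalog
        // carriers), tell them they can close too, mimicking old quiesce.
        int remaining = (plainRs.stopped ? 0 : 1) + (catalogRs.stopped ? 0 : 1);
        if (remaining <= 2) catalogRs.stopped = true; // STOP_REGIONSERVER in the real code

        // Master would stop itself here, since no regionservers remain.
        System.out.println("plainRs stopped=" + plainRs.stopped
            + ", catalogRs stopped=" + catalogRs.stopped);
      }
    }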

M src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
  Fixes so the cluster can be restarted inside the same test; this wasn't
  working before.
M src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
  Made this test pass.  Changed the order of the tests because I was
  working on the second one last; otherwise just formatted it the same
  as the other tests.
M src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
M src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
M src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java
  Formatting of log messages.
M src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
  Added watcher on cluster status flag.
  Lots of refactoring -- lots of small coherent methods added in
  place of long reams of code -- and cleanup removing unused code.
M src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
M src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRootHandler.java
M src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java
  Changed these so they can be flagged not to update zk.
M src/main/java/org/apache/hadoop/hbase/Server.java
  Removed master-only methods; added getExecutorService so event handlers
  can be submitted easily from any context (mostly used on the master
  side); see the sketch after these notes.
M src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
  Added pretty printing of the root and meta names rather than outputting
  their hashes.
M src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
  Added handling of cluster shutdown.
M src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
  Fixed a bug by breaking processStartup into two pieces: the cleanout of
  the unassigned dir and then the assignment of all regions.  When run
  together they were erasing legitimate state during startup.
M src/main/java/org/apache/hadoop/hbase/master/HMaster.java
  Mostly changes for the new ExecutorService API.
M src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
  Recast so it is not based on statics.
A src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
  Recast so it is not based on statics; renamed from HBaseExecutorService.
D src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java
  Renamed.
D src/main/java/org/apache/hadoop/hbase/executor/NamedThreadFactory.java
  Used the Guava version instead.
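
Net effect of the executor changes on callers, as a minimal sketch (the
submitHandler helper is hypothetical; Server.getExecutorService() and
ExecutorService.submit() are the methods added in this commit):

    import org.apache.hadoop.hbase.Server;
    import org.apache.hadoop.hbase.executor.EventHandler;

    public class SubmitPatternSketch {
      // Old pattern: new SomeHandler(...).submit() looked up a static,
      // JVM-wide executor registry.  New pattern: go through the owning
      // server's ExecutorService instance.
      static void submitHandler(Server server, EventHandler handler) {
        server.getExecutorService().submit(handler);
      }
    }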

Added:
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
Removed:
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/NamedThreadFactory.java
Modified:
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/Server.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRootHandler.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java?rev=986524&r1=986523&r2=986524&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java Tue Aug 17 23:36:49 2010
@@ -109,6 +109,22 @@ public class HRegionInfo extends Version
     return encodedName;
   }
 
+  /**
+   * Use logging.
+   * @param encodedRegionName The encoded regionname.
+   * @return <code>-ROOT-</code> if passed <code>70236052</code> or
+   * <code>.META.</code> if passed <code>1028785192</code> else returns
+   * <code>encodedRegionName</code>
+   */
+  public static String prettyPrint(final String encodedRegionName) {
+    if (encodedRegionName.equals("70236052")) {
+      return encodedRegionName + "/-ROOT-";
+    } else if (encodedRegionName.equals("1028785192")) {
+      return encodedRegionName + "/.META.";
+    }
+    return encodedRegionName;
+  }
+
   /** delimiter used between portions of a region name */
   public static final int DELIMITER = ',';
 

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/Server.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/Server.java?rev=986524&r1=986523&r2=986524&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/Server.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/Server.java Tue Aug 17 23:36:49 2010
@@ -20,7 +20,7 @@
 package org.apache.hadoop.hbase;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.ServerConnection;
+import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
 /**
@@ -29,11 +29,6 @@ import org.apache.hadoop.hbase.zookeeper
  */
 public interface Server extends Abortable, Stoppable {
   /**
-   * Returns the address of the current server.
-   */
-  public HServerAddress getHServerAddress();
-
-  /**
    * Gets the configuration object for this server.
    */
   public Configuration getConfiguration();
@@ -53,7 +48,7 @@ public interface Server extends Abortabl
   public String getServerName();
 
   /**
-   * Return the server RPC connection object
+   * @return This servers executor service.
    */
-  public ServerConnection getServerConnection();
+  public ExecutorService getExecutorService();
 }
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java?rev=986524&r1=986523&r2=986524&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java Tue Aug 17 23:36:49 2010
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.executor.HBaseExecutorService.HBaseExecutorServiceType;
+import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
 
 
 /**
@@ -35,13 +35,13 @@ import org.apache.hadoop.hbase.executor.
  * implement the process() method where the actual handling of the event
  * happens.
  * <p>
- * HBaseEventType is a list of ALL events (which also corresponds to messages -
+ * EventType is a list of ALL events (which also corresponds to messages -
  * either internal to one component or between components). The event type
  * names specify the component from which the event originated, and the
  * component which is supposed to handle it.
  * <p>
  * Listeners can listen to all the events by implementing the interface
- * HBaseEventHandlerListener, and by registering themselves as a listener. They
+ * EventHandlerListener, and by registering themselves as a listener. They
  * will be called back before and after the process of every event.
  */
 public abstract class EventHandler implements Runnable, Comparable<Runnable> {
@@ -118,60 +118,51 @@ public abstract class EventHandler imple
      * handled just add the NONE executor.
      * @return name of the executor service
      */
-    public HBaseExecutorServiceType getExecutorServiceType() {
+    public ExecutorType getExecutorServiceType() {
       switch(this) {
 
         // Master executor services
 
         case RS2ZK_REGION_CLOSED:
-          return HBaseExecutorServiceType.MASTER_CLOSE_REGION;
+          return ExecutorType.MASTER_CLOSE_REGION;
 
         case RS2ZK_REGION_OPENED:
-          return HBaseExecutorServiceType.MASTER_OPEN_REGION;
+          return ExecutorType.MASTER_OPEN_REGION;
 
         case M_SERVER_SHUTDOWN:
-          return HBaseExecutorServiceType.MASTER_SERVER_OPERATIONS;
+          return ExecutorType.MASTER_SERVER_OPERATIONS;
 
         case C2M_DELETE_TABLE:
         case C2M_DISABLE_TABLE:
         case C2M_ENABLE_TABLE:
         case C2M_MODIFY_TABLE:
-          return HBaseExecutorServiceType.MASTER_TABLE_OPERATIONS;
+          return ExecutorType.MASTER_TABLE_OPERATIONS;
 
         // RegionServer executor services
 
         case M2RS_OPEN_REGION:
-          return HBaseExecutorServiceType.RS_OPEN_REGION;
+          return ExecutorType.RS_OPEN_REGION;
 
         case M2RS_OPEN_ROOT:
-          return HBaseExecutorServiceType.RS_OPEN_ROOT;
+          return ExecutorType.RS_OPEN_ROOT;
 
         case M2RS_OPEN_META:
-          return HBaseExecutorServiceType.RS_OPEN_META;
+          return ExecutorType.RS_OPEN_META;
 
         case M2RS_CLOSE_REGION:
-          return HBaseExecutorServiceType.RS_CLOSE_REGION;
+          return ExecutorType.RS_CLOSE_REGION;
 
         case M2RS_CLOSE_ROOT:
-          return HBaseExecutorServiceType.RS_CLOSE_ROOT;
+          return ExecutorType.RS_CLOSE_ROOT;
 
         case M2RS_CLOSE_META:
-          return HBaseExecutorServiceType.RS_CLOSE_META;
+          return ExecutorType.RS_CLOSE_META;
 
         default:
           throw new RuntimeException("Unhandled event type " + this.name());
       }
     }
 
-    /**
-     * Start the executor service that handles the passed in event type. The
-     * server that starts these event executor services wants to handle these
-     * event types.
-     */
-    public void startExecutorService(String serverName, int maxThreads) {
-      getExecutorServiceType().startExecutorService(serverName, maxThreads);
-    }
-
     EventType(int value) {}
 
     @Override
@@ -245,7 +236,7 @@ public abstract class EventHandler imple
    * Return the name for this event type.
    * @return
    */
-  public HBaseExecutorServiceType getEventHandlerName() {
+  public ExecutorType getExecutorType() {
     return eventType.getExecutorServiceType();
   }
 
@@ -258,19 +249,6 @@ public abstract class EventHandler imple
   }
 
   /**
-   * Submits this event object to the correct executor service.
-   */
-  public void submit() {
-    HBaseExecutorServiceType serviceType = getEventHandlerName();
-    if(serviceType == null) {
-      throw new RuntimeException("Event " + eventType + " not handled on " +
-          "this server " + server.getServerName());
-    }
-    serviceType.getExecutor(server.getServerName()).submit(this);
-  }
-
-
-  /**
    * Get the priority level for this handler instance.  This uses natural
    * ordering so lower numbers are higher priority.
    * <p>

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java?rev=986524&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java Tue Aug 17 23:36:49 2010
@@ -0,0 +1,189 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.executor;
+
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This is a generic executor service. This component abstracts a
+ * threadpool, a queue to which jobs can be submitted and a Runnable that
+ * handles the object that is added to the queue.
+ *
+ * <p>In order to create a new service, create an instance of this class and 
+ * then do: <code>instance.startExecutorService("myService");</code>.  When done
+ * call {@link #shutdown()}.
+ *
+ * In order to use the service created above, you need to override the
+ * <code>EventHandler</code> class and create an event type that submits to this
+ * service.
+ */
+public class ExecutorService {
+  private static final Log LOG = LogFactory.getLog(ExecutorService.class);
+
+  // hold the all the executors created in a map addressable by their names
+  private final ConcurrentHashMap<String, Executor> executorMap =
+    new ConcurrentHashMap<String, Executor>();
+
+  private final String servername;
+
+  /**
+   * The following is a list of names for the various executor services in both
+   * the master and the region server.
+   */
+  public enum ExecutorType {
+
+    // Master executor services
+    MASTER_CLOSE_REGION        (1),
+    MASTER_OPEN_REGION         (2),
+    MASTER_SERVER_OPERATIONS   (3),
+    MASTER_TABLE_OPERATIONS    (4),
+    MASTER_RS_SHUTDOWN         (5),
+
+    // RegionServer executor services
+    RS_OPEN_REGION             (20),
+    RS_OPEN_ROOT               (21),
+    RS_OPEN_META               (22),
+    RS_CLOSE_REGION            (23),
+    RS_CLOSE_ROOT              (24),
+    RS_CLOSE_META              (25);
+
+    ExecutorType(int value) {}
+
+    String getExecutorName(String serverName) {
+      return this.toString() + "-" + serverName;
+    }
+  }
+
+  /**
+   * Default constructor.
+   * @param servername Name of the hosting server.
+   */
+  public ExecutorService(final String servername) {
+    super();
+    this.servername = servername;
+  }
+
+  /**
+   * Start an executor service with a given name. If there was a service already
+   * started with the same name, this throws a RuntimeException.
+   * @param name Name of the service to start.
+   */
+  void startExecutorService(String name, int maxThreads) {
+    if (this.executorMap.get(name) != null) {
+      throw new RuntimeException("An executor service with the name " + name +
+        " is already running!");
+    }
+    Executor hbes = new Executor(name, maxThreads);
+    if (this.executorMap.putIfAbsent(name, hbes) != null) {
+      throw new RuntimeException("An executor service with the name " + name +
+      " is already running (2)!");
+    }
+    LOG.debug("Starting executor service: " + name);
+  }
+
+  boolean isExecutorServiceRunning(String name) {
+    return this.executorMap.containsKey(name);
+  }
+
+  public void shutdown() {
+    for(Entry<String, Executor> entry: this.executorMap.entrySet()) {
+      List<Runnable> wasRunning =
+        entry.getValue().threadPoolExecutor.shutdownNow();
+      if (!wasRunning.isEmpty()) {
+        LOG.info(entry.getKey() + " had " + wasRunning + " on shutdown");
+      }
+    }
+    this.executorMap.clear();
+  }
+
+  Executor getExecutor(final ExecutorType type) {
+    return getExecutor(type.getExecutorName(this.servername));
+  }
+
+  Executor getExecutor(String name) {
+    Executor executor = this.executorMap.get(name);
+    if (executor == null) {
+      LOG.debug("Executor service [" + name + "] not found in " + this.executorMap);
+    }
+    return executor;
+  }
+
+
+  public void startExecutorService(final ExecutorType type, final int maxThreads) {
+    String name = type.getExecutorName(this.servername);
+    if (isExecutorServiceRunning(name)) {
+      LOG.debug("Executor service " + toString() + " already running on " +
+        this.servername);
+      return;
+    }
+    startExecutorService(name, maxThreads);
+  }
+
+  public void submit(final EventHandler eh) {
+    getExecutor(eh.getExecutorType()).submit(eh);
+  }
+
+  /**
+   * Executor instance.
+   */
+  private static class Executor {
+    // default number of threads in the pool
+    private int corePoolSize = 1;
+    // how long to retain excess threads
+    private long keepAliveTimeInMillis = 1000;
+    // the thread pool executor that services the requests
+    private final ThreadPoolExecutor threadPoolExecutor;
+    // work queue to use - unbounded queue
+    BlockingQueue<Runnable> workQueue = new PriorityBlockingQueue<Runnable>();
+    private final AtomicInteger threadid = new AtomicInteger(0);
+    private final String name;
+
+    protected Executor(String name, int maxThreads) {
+      this.name = name;
+      // create the thread pool executor
+      this.threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxThreads,
+          keepAliveTimeInMillis, TimeUnit.MILLISECONDS, workQueue);
+      // name the threads for this threadpool
+      ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
+      tfb.setNameFormat(this.name + "-" + this.threadid.incrementAndGet());
+      this.threadPoolExecutor.setThreadFactory(tfb.build());
+    }
+
+    /**
+     * Submit the event to the queue for handling.
+     * @param event
+     */
+    void submit(Runnable event) {
+      this.threadPoolExecutor.execute(event);
+    }
+  }
+}
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=986524&r1=986523&r2=986524&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Tue Aug 17 23:36:49 2010
@@ -82,8 +82,8 @@ public class AssignmentManager extends Z
   private TimeoutMonitor timeoutMonitor;
 
   /** Regions currently in transition. */
-  private final Map<String,RegionState> regionsInTransition =
-    new TreeMap<String,RegionState>();
+  private final Map<String, RegionState> regionsInTransition =
+    new TreeMap<String, RegionState>();
 
   /** Plans for region movement. */
   // TODO: When do plans get cleaned out?  Ever?
@@ -136,17 +136,16 @@ public class AssignmentManager extends Z
   }
 
   /**
-   * Cluster startup.  Reset all unassigned nodes and assign all user regions.
+   * Reset all unassigned znodes.  Called on startup of master.
+   * Call {@link #assignAllUserRegions()} after root and meta have been assigned.
    * @throws IOException
    * @throws KeeperException
    */
-  void processStartup() throws IOException, KeeperException {
+  void cleanoutUnassigned() throws IOException, KeeperException {
     // Cleanup any existing ZK nodes and start watching
     ZKAssign.deleteAllNodes(watcher);
     ZKUtil.listChildrenAndWatchForNewChildren(watcher,
         watcher.assignmentZNode);
-    // Assign all existing user regions out
-    assignAllUserRegions();
   }
 
   /**
@@ -230,12 +229,13 @@ public class AssignmentManager extends Z
       // Verify this is a known server
       if(!serverManager.isServerOnline(data.getServerName())) {
         LOG.warn("Attempted to handle region transition for server " +
-            data.getServerName() + " but server is not online");
+          data.getServerName() + " but server is not online");
         return;
       }
       String encodedName = HRegionInfo.encodeRegionName(data.getRegionName());
+      String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
       LOG.debug("Handling region transition for server " +
-          data.getServerName() + " and region " + encodedName);
+        data.getServerName() + " and region " + prettyPrintedRegionName);
       RegionState regionState = regionsInTransition.get(encodedName);
       switch(data.getEventType()) {
 
@@ -244,7 +244,7 @@ public class AssignmentManager extends Z
           // times after already being in state of CLOSING
           if(regionState == null ||
               (!regionState.isPendingClose() && !regionState.isClosing())) {
-            LOG.warn("Received CLOSING for region " + encodedName +
+            LOG.warn("Received CLOSING for region " + prettyPrintedRegionName +
                 " from server " + data.getServerName() + " but region was in " +
                 " the state " + regionState + " and not " +
                 "in expected PENDING_CLOSE or CLOSING states");
@@ -258,15 +258,15 @@ public class AssignmentManager extends Z
           // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
           if(regionState == null ||
               (!regionState.isPendingClose() && !regionState.isClosing())) {
-            LOG.warn("Received CLOSED for region " + encodedName +
+            LOG.warn("Received CLOSED for region " + prettyPrintedRegionName +
                 " from server " + data.getServerName() + " but region was in " +
                 " the state " + regionState + " and not " +
                 "in expected PENDING_CLOSE or CLOSING states");
             return;
           }
           // Handle CLOSED by assigning elsewhere or stopping if a disable
-          new ClosedRegionHandler(master, this, data, regionState.getRegion())
-          .submit();
+          this.master.getExecutorService().submit(new ClosedRegionHandler(master,
+            this, data, regionState.getRegion()));
           break;
 
         case RS2ZK_REGION_OPENING:
@@ -274,7 +274,8 @@ public class AssignmentManager extends Z
           // times after already being in state of OPENING
           if(regionState == null ||
               (!regionState.isPendingOpen() && !regionState.isOpening())) {
-            LOG.warn("Received OPENING for region " + encodedName +
+            LOG.warn("Received OPENING for region " +
+                prettyPrintedRegionName +
                 " from server " + data.getServerName() + " but region was in " +
                 " the state " + regionState + " and not " +
                 "in expected PENDING_OPEN or OPENING states");
@@ -288,7 +289,8 @@ public class AssignmentManager extends Z
           // Should see OPENED after OPENING but possible after PENDING_OPEN
           if(regionState == null ||
               (!regionState.isPendingOpen() && !regionState.isOpening())) {
-            LOG.warn("Received OPENED for region " + encodedName +
+            LOG.warn("Received OPENED for region " +
+                prettyPrintedRegionName +
                 " from server " + data.getServerName() + " but region was in " +
                 " the state " + regionState + " and not " +
                 "in expected PENDING_OPEN or OPENING states");
@@ -296,17 +298,16 @@ public class AssignmentManager extends Z
           }
           // If this is a catalog table, update catalog manager accordingly
           // Moving root and meta editing over to RS who does the opening
-          LOG.debug("Processing OPENED for region " + regionState.getRegion() +
-              " which isMeta[" + regionState.getRegion().isMetaRegion() + "] " +
-              " isRoot[" + regionState.getRegion().isRootRegion() + "]");
+          LOG.debug("Processing OPENED for region " +
+            regionState.getRegion().getRegionNameAsString());
 
           // Used to have updating of root/meta locations here but it's
           // automatic in CatalogTracker now
 
           // Handle OPENED by removing from transition and deleted zk node
-          new OpenedRegionHandler(master, this, data, regionState.getRegion(),
-              serverManager.getServerInfo(data.getServerName()))
-          .submit();
+          this.master.getExecutorService().submit(
+            new OpenedRegionHandler(master, this, data, regionState.getRegion(),
+              this.serverManager.getServerInfo(data.getServerName())));
           break;
       }
     }
@@ -481,7 +482,7 @@ public class AssignmentManager extends Z
    * @param regionName server to be assigned
    */
   public void assign(HRegionInfo region) {
-    LOG.debug("Starting assignment for region " + region);
+    LOG.debug("Starting assignment for region " + region.getRegionNameAsString());
     // Grab the state of this region and synchronize on it
     String encodedName = region.getEncodedName();
     RegionState state;
@@ -535,7 +536,8 @@ public class AssignmentManager extends Z
     synchronized(regionPlans) {
       plan = regionPlans.get(encodedName);
       if(plan == null) {
-        LOG.debug("No previous transition plan for " + encodedName +
+        LOG.debug("No previous transition plan for " +
+            state.getRegion().getRegionNameAsString() +
             " so generating a random one from " + serverManager.numServers() +
             " ( " + serverManager.getOnlineServers().size() + ") available servers");
         plan = new RegionPlan(encodedName, null,
@@ -570,24 +572,27 @@ public class AssignmentManager extends Z
    * @param regionName server to be unassigned
    */
   public void unassign(HRegionInfo region) {
-    LOG.debug("Starting unassignment of region " + region + " (offlining)");
+    LOG.debug("Starting unassignment of region " +
+      region.getRegionNameAsString() + " (offlining)");
     // Check if this region is currently assigned
     if (!regions.containsKey(region)) {
-      LOG.debug("Attempted to unassign region " + region + " but it is not " +
-          "currently assigned anywhere");
+      LOG.debug("Attempted to unassign region " + region.getRegionNameAsString() +
+        " but it is not " +
+        "currently assigned anywhere");
       return;
     }
-    String regionName = region.getEncodedName();
+    String encodedName = region.getEncodedName();
     // Grab the state of this region and synchronize on it
     RegionState state;
     synchronized(regionsInTransition) {
-      state = regionsInTransition.get(regionName);
+      state = regionsInTransition.get(encodedName);
       if(state == null) {
         state = new RegionState(region, RegionState.State.PENDING_CLOSE);
-        regionsInTransition.put(regionName, state);
+        regionsInTransition.put(encodedName, state);
       } else {
-        LOG.debug("Attempting to unassign region " + region + " but it is " +
-            "already in transition (" + state.getState() + ")");
+        LOG.debug("Attempting to unassign region " +
+          region.getRegionNameAsString() + " but it is " +
+          "already in transition (" + state.getState() + ")");
         return;
       }
     }
@@ -595,7 +600,8 @@ public class AssignmentManager extends Z
     try {
       serverManager.sendRegionClose(regions.get(region), state.getRegion());
     } catch (NotServingRegionException e) {
-      LOG.warn("Attempted to close region " + region + " but got an NSRE", e);
+      LOG.warn("Attempted to close region " + region.getRegionNameAsString() +
+        " but got an NSRE", e);
     }
   }
 
@@ -678,7 +684,7 @@ public class AssignmentManager extends Z
       List<HRegionInfo> regions = entry.getValue();
       LOG.debug("Assigning " + regions.size() + " regions to " + server);
       for(HRegionInfo region : regions) {
-        LOG.debug("Assigning " + region + " to " + server);
+        LOG.debug("Assigning " + region.getRegionNameAsString() + " to " + server);
         String regionName = region.getEncodedName();
         RegionPlan plan = new RegionPlan(regionName, null,server);
         regionPlans.put(regionName, plan);
@@ -694,7 +700,7 @@ public class AssignmentManager extends Z
       throw new IOException(e);
     }
 
-    LOG.info("\n\nAll user regions have been assigned");
+    LOG.info("All user regions have been assigned");
   }
 
   private void rebuildUserRegions() throws IOException {
@@ -861,14 +867,13 @@ public class AssignmentManager extends Z
 
     @Override
     protected void chore() {
-      synchronized(regionsInTransition) {
+      synchronized (regionsInTransition) {
         // Iterate all regions in transition checking for time outs
         long now = System.currentTimeMillis();
-        for(RegionState regionState : regionsInTransition.values()) {
+        for (RegionState regionState : regionsInTransition.values()) {
           if(regionState.getStamp() + timeout <= now) {
             HRegionInfo regionInfo = regionState.getRegion();
-            String regionName = regionInfo.getEncodedName();
-            LOG.info("Region transition timed out for region " + regionName);
+            LOG.info("Regions in transition timed out:  " + regionState);
             // Expired!  Do a retry.
             switch(regionState.getState()) {
               case OFFLINE:
@@ -980,8 +985,8 @@ public class AssignmentManager extends Z
 
     @Override
     public String toString() {
-      return "RegionState (" + region.getRegionNameAsString() + ") " + state +
-             " at time " + stamp;
+      return region.getRegionNameAsString() + " state=" + state +
+        ", ts=" + stamp;
     }
 
     @Override

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=986524&r1=986523&r2=986524&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue Aug 17 23:36:49 2010
@@ -56,8 +56,8 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.ServerConnection;
 import org.apache.hadoop.hbase.client.ServerConnectionManager;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
-import org.apache.hadoop.hbase.executor.HBaseExecutorService;
-import org.apache.hadoop.hbase.executor.HBaseExecutorService.HBaseExecutorServiceType;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
 import org.apache.hadoop.hbase.ipc.HBaseServer;
@@ -155,6 +155,9 @@ implements HMasterInterface, HMasterRegi
   // Set on abort -- usually failure of our zk session
   private volatile boolean abort = false;
 
+  // Instance of the hbase executor service.
+  private ExecutorService service;
+
   /**
    * Initializes the HMaster. The steps are as follows:
    *
@@ -208,11 +211,12 @@ implements HMasterInterface, HMasterRegi
     this.connection = ServerConnectionManager.getConnection(conf);
     this.metrics = new MasterMetrics(this.getName());
     fileSystemManager = new MasterFileSystem(this);
-    serverManager = new ServerManager(this, metrics, fileSystemManager);
+    serverManager = new ServerManager(this, this.connection, metrics,
+      fileSystemManager);
     regionServerTracker = new RegionServerTracker(zooKeeper, this,
-        serverManager);
+      serverManager);
     catalogTracker = new CatalogTracker(zooKeeper, connection, this,
-        conf.getInt("hbase.master.catalog.timeout", -1));
+      conf.getInt("hbase.master.catalog.timeout", -1));
     assignmentManager = new AssignmentManager(this, serverManager, catalogTracker);
     clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this);
 
@@ -275,24 +279,20 @@ implements HMasterInterface, HMasterRegi
       // wait for minimum number of region servers to be up
       serverManager.waitForMinServers();
 
-      // TOOD: Whey do we assign root and meta on startup?  Shouldn't we check
-      // if we started the cluster before we assigning root and meta?  Perhaps
-      // they are happy where they are?  St.Ack 20100812.
-
-      // assign the root region
-      assignmentManager.assignRoot();
-      catalogTracker.waitForRoot();
-      // assign the meta region
-      assignmentManager.assignMeta();
-      catalogTracker.waitForMeta();
-      // above check waits for general meta availability but this does not
-      // guarantee that the transition has completed
-      assignmentManager.waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO);
       // start assignment of user regions, startup or failure
       if (this.clusterStarter) {
-        // We're starting up the cluster.  Create or clear out unassigned node
-        // in ZK, read all regions from META and assign them out.
-        assignmentManager.processStartup();
+        // Clean out current state of unassigned
+        assignmentManager.cleanoutUnassigned();
+        // assign the root region
+        assignmentManager.assignRoot();
+        catalogTracker.waitForRoot();
+        // assign the meta region
+        assignmentManager.assignMeta();
+        catalogTracker.waitForMeta();
+        // above check waits for general meta availability but this does not
+        // guarantee that the transition has completed
+        assignmentManager.waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO);
+        assignmentManager.assignAllUserRegions();
       } else {
         // Process existing unassigned nodes in ZK, read all regions from META,
         // rebuild in-memory state.
@@ -300,19 +300,10 @@ implements HMasterInterface, HMasterRegi
       }
       LOG.info("HMaster started on " + this.address.toString() +
         "; clusterStarter=" + this.clusterStarter);
+      // Check if we should stop every second.
       Sleeper sleeper = new Sleeper(1000, this);
-      int countOfServersStillRunning = this.serverManager.numServers();
       while (!this.stopped  && !this.abort) {
-        // Master has nothing to do
         sleeper.sleep();
-        if (this.serverManager.isClusterShutdown()) {
-          int count = this.serverManager.numServers();
-          if (count != countOfServersStillRunning) {
-            countOfServersStillRunning = count;
-            LOG.info("Regionserver(s) still running: " +
-              countOfServersStillRunning);
-          }
-        }
       }
     } catch (Throwable t) {
       abort("Unhandled exception. Starting shutdown.", t);
@@ -336,11 +327,10 @@ implements HMasterInterface, HMasterRegi
     this.rpcServer.stop();
     this.activeMasterManager.stop();
     this.zooKeeper.close();
-    HBaseExecutorService.shutdown();
+    this.service.shutdown();
     LOG.info("HMaster main thread exiting");
   }
 
-  @Override
   public HServerAddress getHServerAddress() {
     return address;
   }
@@ -373,9 +363,7 @@ implements HMasterInterface, HMasterRegi
     return this.infoServer;
   }
 
-  /**
-   * @return Return configuration being used by this server.
-   */
+  @Override
   public Configuration getConfiguration() {
     return this.conf;
   }
@@ -384,10 +372,6 @@ implements HMasterInterface, HMasterRegi
     return this.serverManager;
   }
 
-  public ServerConnection getServerConnection() {
-    return this.connection;
-  }
-
   /**
    * Get the ZK wrapper object - needed by master_jsp.java
    * @return the zookeeper wrapper
@@ -396,6 +380,11 @@ implements HMasterInterface, HMasterRegi
     return this.zooKeeper;
   }
 
+  @Override
+  public ExecutorService getExecutorService() {
+    return this.service;
+  }
+
   /*
    * Start up all services. If any of these threads gets an unhandled exception
    * then they just die with a logged message.  This should be fine because
@@ -406,18 +395,15 @@ implements HMasterInterface, HMasterRegi
   private void startServiceThreads() {
     try {
       // Start the executor service pools
-      HBaseExecutorServiceType.MASTER_OPEN_REGION.startExecutorService(
-        getServerName(),
-          conf.getInt("hbase.master.executor.openregion.threads", 5));
-      HBaseExecutorServiceType.MASTER_CLOSE_REGION.startExecutorService(
-        getServerName(),
-          conf.getInt("hbase.master.executor.closeregion.threads", 5));
-      HBaseExecutorServiceType.MASTER_SERVER_OPERATIONS.startExecutorService(
-        getServerName(),
-          conf.getInt("hbase.master.executor.serverops.threads", 5));
-      HBaseExecutorServiceType.MASTER_TABLE_OPERATIONS.startExecutorService(
-        getServerName(),
-          conf.getInt("hbase.master.executor.tableops.threads", 5));
+      this.service = new ExecutorService(getServerName());
+      this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
+        conf.getInt("hbase.master.executor.openregion.threads", 5));
+      this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
+        conf.getInt("hbase.master.executor.closeregion.threads", 5));
+      this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
+        conf.getInt("hbase.master.executor.serverops.threads", 5));
+      this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS,
+        conf.getInt("hbase.master.executor.tableops.threads", 5));
 
       // Put up info server.
       int port = this.conf.getInt("hbase.master.info.port", 60010);
@@ -685,8 +671,8 @@ implements HMasterInterface, HMasterRegi
   public void modifyTable(final byte[] tableName, HTableDescriptor htd)
   throws IOException {
     LOG.info("modifyTable(SET_HTD): " + htd);
-    new ModifyTableHandler(tableName, this, catalogTracker, fileSystemManager)
-    .submit();
+    this.service.submit(new ModifyTableHandler(tableName, this, catalogTracker,
+      fileSystemManager));
   }
 
   /**

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=986524&r1=986523&r2=986524&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Tue Aug 17 23:36:49 2010
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,6 +43,7 @@ import org.apache.hadoop.hbase.PleaseHol
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.YouAreDeadException;
+import org.apache.hadoop.hbase.client.ServerConnection;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
 import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
@@ -67,8 +67,6 @@ import org.apache.hadoop.hbase.util.Thre
 public class ServerManager {
   private static final Log LOG = LogFactory.getLog(ServerManager.class);
 
-  private final AtomicInteger availableServers = new AtomicInteger(0);
-
   // Set if we are to shutdown the cluster.
   private volatile boolean clusterShutdown = false;
 
@@ -103,6 +101,8 @@ public class ServerManager {
 
   private final OldLogsCleaner oldLogCleaner;
 
+  private final ServerConnection connection;
+
   /**
    * Dumps into log current stats on dead servers and number of servers
    * TODO: Make this a metric; dump metrics into log.
@@ -114,7 +114,7 @@ public class ServerManager {
 
     @Override
     protected void chore() {
-      int numServers = availableServers.get();
+      int numServers = numServers();
       int numDeadServers = deadServers.size();
       double averageLoad = getAverageLoad();
       String deadServersList = null;
@@ -146,10 +146,12 @@ public class ServerManager {
    * @param masterFileSystem
    */
   public ServerManager(Server master,
+      final ServerConnection connection,
       MasterMetrics masterMetrics,
       MasterFileSystem masterFileSystem) {
     this.master = master;
     this.masterMetrics = masterMetrics;
+    this.connection = connection;
     Configuration c = master.getConfiguration();
     int metaRescanInterval = c.getInt("hbase.master.meta.thread.rescanfrequency",
       60 * 1000);
@@ -246,9 +248,8 @@ public class ServerManager {
     String serverName = info.getServerName();
     info.setLoad(load);
     // TODO: Why did we update the RS location ourself?  Shouldn't RS do this?
-//    masterStatus.getZooKeeper().updateRSLocationGetWatch(info, watcher);
+    // masterStatus.getZooKeeper().updateRSLocationGetWatch(info, watcher);
     onlineServers.put(serverName, info);
-    availableServers.incrementAndGet();
     if(hri == null) {
       serverConnections.remove(serverName);
     } else {
@@ -291,7 +292,19 @@ public class ServerManager {
       return HMsg.STOP_REGIONSERVER_ARRAY;
     }
 
-    return processRegionServerAllsWell(info, mostLoadedRegions, HMsg.EMPTY_HMSG_ARRAY);
+    HMsg [] reply = null;
+    int numservers = numServers();
+    if (this.clusterShutdown) {
+      if (numservers <= 2) {
+        // Shutdown needs to be staggered; the meta regions need to close last
+        // in case they need to be updated during the close melee.  If <= 2
+        // servers left, then these are the two that were carrying root and meta
+        // most likely (TODO: This presumes unsplittable meta -- FIX). Tell
+        // these servers can shutdown now too.
+        reply = HMsg.STOP_REGIONSERVER_ARRAY;
+      }
+    }
+    return processRegionServerAllsWell(info, mostLoadedRegions, reply);
   }
 
   private boolean raceThatShouldNotHappenAnymore(final HServerInfo storedInfo,
@@ -342,7 +355,6 @@ public class ServerManager {
   private boolean removeServerInfo(final String serverName) {
     HServerInfo info = this.onlineServers.remove(serverName);
     if (info != null) {
-      this.availableServers.decrementAndGet();
       return true;
     }
     return false;
@@ -368,7 +380,12 @@ public class ServerManager {
 
   /** @return the number of active servers */
   public int numServers() {
-    return availableServers.get();
+    int num = -1;
+    // This synchronized seems gratuitous.
+    synchronized (this.onlineServers) {
+      num = this.onlineServers.size();
+    }
+    return num;
   }
 
   /**
@@ -453,18 +470,20 @@ public class ServerManager {
     }
     // Remove the server from the known servers lists and update load info
     this.onlineServers.remove(serverName);
-    this.availableServers.decrementAndGet();
     this.serverConnections.remove(serverName);
     // If cluster is going down, yes, servers are going to be expiring; don't
     // process as a dead server
     if (this.clusterShutdown) {
-      LOG.info("Cluster shutdown in progress; " + hsi.getServerName() +
-        " is down");
+      LOG.info("Cluster shutdown set; " + hsi.getServerName() +
+        " expired; onlineServers=" + this.onlineServers.size());
+      if (this.onlineServers.isEmpty()) {
+        master.stop("Cluster shutdown set; onlineServer=0");
+      }
       return;
     }
     // Add to dead servers and queue a shutdown processing.
     this.deadServers.add(serverName);
-    new ServerShutdownHandler(master).submit();
+    this.master.getExecutorService().submit(new ServerShutdownHandler(master));
     LOG.debug("Added=" + serverName +
       " to dead servers, submitted shutdown handler to be executed");
   }
@@ -562,7 +581,7 @@ public class ServerManager {
       HRegionInterface hri = serverConnections.get(info.getServerName());
       if(hri == null) {
         LOG.info("new connection");
-        hri = master.getServerConnection().getHRegionConnection(
+        hri = this.connection.getHRegionConnection(
           info.getServerAddress(), false);
         serverConnections.put(info.getServerName(), hri);
       }

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=986524&r1=986523&r2=986524&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Aug 17 23:36:49 2010
@@ -1164,7 +1164,7 @@ public class HRegion implements HeapSize
   protected InternalScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
     newScannerLock.readLock().lock();
     try {
-      if (this.closed.get()) {
+      if (this.closing.get() || this.closed.get()) {
         throw new NotServingRegionException("Region " + this + " closed");
       }
       // Verify families are all valid

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=986524&r1=986523&r2=986524&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Aug 17 23:36:49 2010
@@ -58,7 +58,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HMsg;
+import org.apache.hadoop.hbase.HMsg.Type;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HServerAddress;
@@ -73,8 +75,6 @@ import org.apache.hadoop.hbase.Stoppable
 import org.apache.hadoop.hbase.UnknownRowLockException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.YouAreDeadException;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
-import org.apache.hadoop.hbase.HMsg.Type;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -85,8 +85,8 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ServerConnection;
 import org.apache.hadoop.hbase.client.ServerConnectionManager;
-import org.apache.hadoop.hbase.executor.HBaseExecutorService;
-import org.apache.hadoop.hbase.executor.HBaseExecutorService.HBaseExecutorServiceType;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@@ -246,6 +246,9 @@ public class HRegionServer implements HR
 
   private final String machineName;
 
+  // Instance of the hbase executor service.
+  private ExecutorService service;
+
   /**
    * Starts a HRegionServer at the default location
    *
@@ -355,20 +358,10 @@ public class HRegionServer implements HR
       @Override
       public synchronized void nodeDeleted(String path) {
         super.nodeDeleted(path);
-        if (getData() == null) {
+        if (isClusterShutdown()) {
           // Cluster was just marked for shutdown.
           LOG.info("Received cluster shutdown message");
-          closeUserRegions();
-          TODO: QUEUE UP THE CLOSEREGIONHANDLERSS FOR USER REGIONS
-          if (onlineRegions.isEmpty()) {
-            // We closed all user regions and there is nothing else left
-            // on this server; just go down.
-            HRegionServer.this.stop("All user regions closed; not " +
-              "carrying catalog regions -- so stopping");
-          } else {
-            LOG.info("Closed all user regions; still carrying " +
-              onlineRegions.values());
-          }
+          closeUserRegions(false);
         }
       }
     };
@@ -376,30 +369,16 @@ public class HRegionServer implements HR
     this.clusterShutdownTracker.start();
   }
 
+  /**
+   * @return True if cluster shutdown in progress
+   */
+  private boolean isClusterShutdown() {
+    return this.clusterShutdownTracker.getData() == null;
+  }
+
   private void initializeThreads() throws IOException {
     this.workerThread = new Thread(worker);
 
-    // Start executor services
-
-    HBaseExecutorServiceType.RS_OPEN_REGION.startExecutorService(
-        getServerName(),
-        conf.getInt("hbase.regionserver.executor.openregion.threads", 5));
-    HBaseExecutorServiceType.RS_OPEN_ROOT.startExecutorService(
-        getServerName(),
-        conf.getInt("hbase.regionserver.executor.openroot.threads", 1));
-    HBaseExecutorServiceType.RS_OPEN_META.startExecutorService(
-        getServerName(),
-        conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
-    HBaseExecutorServiceType.RS_CLOSE_REGION.startExecutorService(
-        getServerName(),
-        conf.getInt("hbase.regionserver.executor.closeregion.threads", 5));
-    HBaseExecutorServiceType.RS_CLOSE_ROOT.startExecutorService(
-        getServerName(),
-        conf.getInt("hbase.regionserver.executor.closeroot.threads", 1));
-    HBaseExecutorServiceType.RS_CLOSE_META.startExecutorService(
-        getServerName(),
-        conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
-
     // Cache flushing thread.
     this.cacheFlusher = new MemStoreFlusher(conf, this);
 
@@ -430,33 +409,19 @@ public class HRegionServer implements HR
   public void run() {
     regionServerThread = Thread.currentThread();
     try {
-      MapWritable w = null;
       while (!this.stopped) {
-        w = reportForDuty();
-        if (w != null) {
-          init(w);
-          break;
-        }
-        sleeper.sleep();
-        LOG.warn("No response from master on reportForDuty. Sleeping and "
-            + "then trying again.");
+        if (tryReportForDuty()) break;
       }
-      List<HMsg> outboundMessages = new ArrayList<HMsg>();
       long lastMsg = 0;
-      // Now ask master what it wants us to do and tell it what we have done
+      List<HMsg> outboundMessages = new ArrayList<HMsg>();
+      // The main run loop.
       for (int tries = 0; !this.stopped && isHealthy();) {
-        // Try to get the root region location from zookeeper.
-        if (!haveRootRegion.get()) {
-          HServerAddress rootServer = catalogTracker.getRootLocation();
-          if (rootServer != null) {
-            // By setting the root region location, we bypass the wait imposed
-            // on
-            // HTable for all regions being assigned.
-            this.connection.setRootRegionLocation(new HRegionLocation(
-                HRegionInfo.ROOT_REGIONINFO, rootServer));
-            haveRootRegion.set(true);
-          }
+        if (isClusterShutdown() && this.onlineRegions.isEmpty()) {
+          stop("Exiting; cluster shutdown set and not carrying any regions");
+          continue;
         }
+        // Try to get the root region location from zookeeper.
+        checkRootRegionLocation();
         long now = System.currentTimeMillis();
         // Drop into the send loop if msgInterval has elapsed or if something
         // to send. If we fail talking to the master, then we'll sleep below
@@ -464,49 +429,11 @@ public class HRegionServer implements HR
         if ((now - lastMsg) >= msgInterval || !outboundMessages.isEmpty()) {
           try {
             doMetrics();
-            MemoryUsage memory =
-              ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
-            HServerLoad hsl = new HServerLoad(requestCount.get(),
-              (int)(memory.getUsed() / 1024 / 1024),
-              (int) (memory.getMax() / 1024 / 1024));
-            for (HRegion r : onlineRegions.values()) {
-              hsl.addRegionInfo(createRegionLoad(r));
-            }
-            this.serverInfo.setLoad(hsl);
-            this.requestCount.set(0);
-            addOutboundMsgs(outboundMessages);
-            HMsg msgs[] = this.hbaseMaster.regionServerReport(serverInfo,
-              outboundMessages.toArray(HMsg.EMPTY_HMSG_ARRAY),
-              getMostLoadedRegions());
+            tryRegionServerReport(outboundMessages);
             lastMsg = System.currentTimeMillis();
-            updateOutboundMsgs(outboundMessages);
-            outboundMessages.clear();
-
-            // Queue up the HMaster's instruction stream for processing
-            boolean restart = false;
-            for (int i = 0; !restart && !stopped && i < msgs.length; i++) {
-              LOG.info(msgs[i].toString());
-              this.connection.unsetRootRegionLocation();
-              switch (msgs[i].getType()) {
-
-                default:
-                  if (fsOk) {
-                    try {
-                      toDo.put(new ToDoEntry(msgs[i]));
-                    } catch (InterruptedException e) {
-                      throw new RuntimeException("Putting into msgQueue was "
-                          + "interrupted.", e);
-                    }
-                  }
-              }
-            }
             // Reset tries count if we had a successful transaction.
             tries = 0;
-
-            if (restart || this.stopped) {
-              toDo.clear();
-              continue;
-            }
+            if (this.stopped) continue;
           } catch (Exception e) { // FindBugs REC_CATCH_EXCEPTION
             // Two special exceptions could be printed out here,
             // PleaseHoldException and YouAreDeadException
@@ -534,18 +461,15 @@ public class HRegionServer implements HR
           }
         }
         now = System.currentTimeMillis();
-        HMsg msg = this.outboundMsgs.poll((msgInterval - (now - lastMsg)),
-            TimeUnit.MILLISECONDS);
-        // If we got something, add it to list of things to send.
-        if (msg != null) {
-          outboundMessages.add(msg);
-        }
+        HMsg msg = this.outboundMsgs.poll((msgInterval - (now - lastMsg)), TimeUnit.MILLISECONDS);
+        if (msg != null) outboundMessages.add(msg);
       } // for
     } catch (Throwable t) {
       if (!checkOOME(t)) {
         abort("Unhandled exception", t);
       }
     }
+    this.toDo.clear();
     this.leases.closeAfterLeasesExpire();
     this.worker.stop();
     this.server.stop();
@@ -574,16 +498,17 @@ public class HRegionServer implements HR
       // Just skip out w/o closing regions.
     } else if (abortRequested) {
       if (this.fsOk) {
+        closeAllRegions(abortRequested); // Don't leave any open file handles
         closeWAL(false);
-        closeAllRegions(); // Don't leave any open file handles
       }
       LOG.info("aborting server at: " + this.serverInfo.getServerName());
     } else {
-      closeAllRegions();
+      closeAllRegions(abortRequested);
       closeWAL(true);
       closeAllScanners();
       LOG.info("stopping server at: " + this.serverInfo.getServerName());
     }
+    waitOnAllRegionsToClose();
 
     // Make sure the proxy is down.
     if (this.hbaseMaster != null) {
@@ -599,6 +524,102 @@ public class HRegionServer implements HR
     LOG.info(Thread.currentThread().getName() + " exiting");
   }
 
+  /**
+   * Blocks until all online regions have closed.
+   */
+  private void waitOnAllRegionsToClose() {
+    // Wait till all regions are closed before going out.
+    int lastCount = -1;
+    while (!this.onlineRegions.isEmpty()) {
+      int count = this.onlineRegions.size();
+      // Only print a message if the count of regions has changed.
+      if (count != lastCount) {
+        lastCount = count;
+        LOG.info("Waiting on " + count + " regions to close");
+        // Only print out the regions still closing if there are few of them;
+        // otherwise we would swamp the log.
+        if (count < 10) {
+          LOG.debug(this.onlineRegions);
+        }
+      }
+      Threads.sleep(1000);
+    }
+  }
+
+  List<HMsg> tryRegionServerReport(final List<HMsg> outboundMessages)
+  throws IOException {
+    this.serverInfo.setLoad(buildServerLoad());
+    this.requestCount.set(0);
+    addOutboundMsgs(outboundMessages);
+    HMsg [] msgs = this.hbaseMaster.regionServerReport(this.serverInfo,
+      outboundMessages.toArray(HMsg.EMPTY_HMSG_ARRAY),
+      getMostLoadedRegions());
+    updateOutboundMsgs(outboundMessages);
+    outboundMessages.clear();
+
+    // Queue up the HMaster's instruction stream for processing
+    for (int i = 0; !this.stopped && msgs != null && i < msgs.length; i++) {
+      LOG.info(msgs[i].toString());
+      // Intercept stop regionserver messages
+      if (msgs[i].getType().equals(HMsg.Type.STOP_REGIONSERVER)) {
+        stop("Received " + msgs[i]);
+        continue;
+      }
+      this.connection.unsetRootRegionLocation();
+      switch (msgs[i].getType()) {
+        default:
+          if (fsOk) {
+            try {
+              toDo.put(new ToDoEntry(msgs[i]));
+            } catch (InterruptedException e) {
+              throw new RuntimeException("Putting into msgQueue was "
+                  + "interrupted.", e);
+            }
+          }
+      }
+    }
+    return outboundMessages;
+  }
+
+  private HServerLoad buildServerLoad() {
+    MemoryUsage memory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+    HServerLoad hsl = new HServerLoad(requestCount.get(),
+      (int)(memory.getUsed() / 1024 / 1024),
+      (int) (memory.getMax() / 1024 / 1024));
+    for (HRegion r : this.onlineRegions.values()) {
+      hsl.addRegionInfo(createRegionLoad(r));
+    }
+    return hsl;
+  }
+
+  /**
+   * @return True if {@link #reportForDuty()} got a response and initialization ran
+   * @throws IOException
+   */
+  private boolean tryReportForDuty() throws IOException {
+    MapWritable w = reportForDuty();
+    if (w != null) {
+      init(w);
+      return true;
+    }
+    sleeper.sleep();
+    LOG.warn("No response on reportForDuty. Sleeping and then retrying.");
+    return false;
+  }
+
+  private void checkRootRegionLocation() throws InterruptedException {
+    if (this.haveRootRegion.get()) return;
+    HServerAddress rootServer = catalogTracker.getRootLocation();
+    if (rootServer != null) {
+      // By setting the root region location, we bypass the wait imposed on
+      // HTable for all regions being assigned.
+      HRegionLocation hrl =
+        new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, rootServer);
+      this.connection.setRootRegionLocation(hrl);
+      this.haveRootRegion.set(true);
+    }
+  }
+
   private void closeWAL(final boolean delete) {
     try {
       if (this.hlog != null) {
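
Note: isClusterShutdown() is referenced in the run loop above but not defined in this hunk; presumably it consults a watcher-backed view of a shutdown flag kept in zookeeper. A rough standalone sketch of that kind of check only -- znode path and flag semantics are assumptions, and the real code would use a tracker rather than hitting zookeeper on every loop pass:

  import org.apache.zookeeper.KeeperException;
  import org.apache.zookeeper.ZooKeeper;

  // Sketch only, not the branch's tracker.  Assumes the flag is a znode whose
  // mere existence means "cluster shutdown requested"; the path is hypothetical.
  class ClusterShutdownFlagSketch {
    private final ZooKeeper zk;
    private final String shutdownZnode;   // e.g. "/hbase/shutdown" -- hypothetical

    ClusterShutdownFlagSketch(ZooKeeper zk, String shutdownZnode) {
      this.zk = zk;
      this.shutdownZnode = shutdownZnode;
    }

    boolean isClusterShutdown() throws KeeperException, InterruptedException {
      // exists() returns a Stat when the node is present, null otherwise.
      return zk.exists(shutdownZnode, false) != null;
    }
  }
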
@@ -995,6 +1016,22 @@ public class HRegionServer implements HR
         abort("Uncaught exception in service thread " + t.getName(), e);
       }
     };
+
+    // Start executor services
+    this.service = new ExecutorService(getServerName());
+    this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
+      conf.getInt("hbase.regionserver.executor.openregion.threads", 5));
+    this.service.startExecutorService(ExecutorType.RS_OPEN_ROOT,
+      conf.getInt("hbase.regionserver.executor.openroot.threads", 1));
+    this.service.startExecutorService(ExecutorType.RS_OPEN_META,
+      conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
+    this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
+      conf.getInt("hbase.regionserver.executor.closeregion.threads", 5));
+    this.service.startExecutorService(ExecutorType.RS_CLOSE_ROOT,
+      conf.getInt("hbase.regionserver.executor.closeroot.threads", 1));
+    this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
+      conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
+
     Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller", handler);
     Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
         handler);
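
The executor setup above boils down to one fixed-size pool per event type, sized from configuration and shut down with the server. A minimal standalone sketch of that pattern, deliberately written against java.util.concurrent rather than the hbase ExecutorService class; names are illustrative only:

  import java.util.EnumMap;
  import java.util.Map;
  import java.util.concurrent.ExecutorService;   // the j.u.c. one, not the hbase class
  import java.util.concurrent.Executors;

  // Sketch of the per-type pool pattern; the enum mirrors the types started above.
  class PerTypePoolsSketch {
    enum Type { RS_OPEN_REGION, RS_OPEN_ROOT, RS_OPEN_META,
                RS_CLOSE_REGION, RS_CLOSE_ROOT, RS_CLOSE_META }

    private final Map<Type, ExecutorService> pools =
        new EnumMap<Type, ExecutorService>(Type.class);

    // Thread counts would come from configuration, as in the patch.
    void start(Type type, int threads) {
      pools.put(type, Executors.newFixedThreadPool(threads));
    }

    void submit(Type type, Runnable handler) {
      pools.get(type).submit(handler);
    }

    void shutdown() {
      for (ExecutorService pool : pools.values()) {
        pool.shutdown();
      }
    }
  }
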
@@ -1130,7 +1167,7 @@ public class HRegionServer implements HR
     Threads.shutdown(this.cacheFlusher);
     Threads.shutdown(this.compactSplitThread);
     Threads.shutdown(this.hlogRoller);
-    HBaseExecutorService.shutdown();
+    this.service.shutdown();
   }
 
   /**
@@ -1339,98 +1376,55 @@ public class HRegionServer implements HR
     }
   }
 
-  /** Called either when the master tells us to restart or from stop() */
-  ArrayList<HRegion> closeAllRegions() {
-    List<HRegion> closedRegions = new ArrayList<HRegion>();
-    closedRegions.addAll(closeUserRegions());
-    ArrayList<HRegion> remainingRegionsToClose = new ArrayList<HRegion>();
+  /**
+   * Closes all regions.  Called on our way out.
+   * Assumes that it is not possible for new regions to be added to onlineRegions
+   * while this method runs.
+   */
+  protected void closeAllRegions(final boolean abort) {
+    closeUserRegions(abort);
+    // Only root and meta should remain.  Are we carrying root or meta?
+    HRegion meta = null;
+    HRegion root = null;
     this.lock.writeLock().lock();
     try {
-      remainingRegionsToClose.addAll(onlineRegions.values());
+      for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
+        HRegionInfo hri = e.getValue().getRegionInfo();
+        if (hri.isRootRegion()) {
+          root = e.getValue();
+        } else if (hri.isMetaRegion()) {
+          meta = e.getValue();
+        }
+        if (meta != null && root != null) break;
+      }
     } finally {
       this.lock.writeLock().unlock();
     }
-    closeRegions(remainingRegionsToClose);
-    if (!this.onlineRegions.isEmpty()) {
-      LOG.warn("Online regions is not empty: " + this.onlineRegions);
-    }
-    return remainingRegionsToClose;
+    if (meta != null) closeRegion(meta.getRegionInfo(), abort, false);
+    if (root != null) closeRegion(root.getRegionInfo(), abort, false);
   }
 
-  /*
-   * Thread to run close of a region.
+  /**
+   * Schedule closes on all user regions.
+   * @param abort Whether we're running an abort.
    */
-  private static class RegionCloserThread extends Thread {
-    private final HRegion r;
-
-    protected RegionCloserThread(final HRegion r) {
-      super(Thread.currentThread().getName() + ".regionCloser." + r.toString());
-      this.r = r;
-    }
-
-    @Override
-    public void run() {
-      try {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Closing region " + r.toString());
-        }
-        r.close();
-      } catch (Throwable e) {
-        LOG.error("Error closing region " + r.toString(),
-            RemoteExceptionHandler.checkThrowable(e));
-      }
-    }
-  }
-
-  /** Called as the first stage of cluster shutdown. */
-  List<HRegion> closeUserRegions() {
-    ArrayList<HRegion> regionsToClose = new ArrayList<HRegion>();
+  void closeUserRegions(final boolean abort) {
     this.lock.writeLock().lock();
     try {
-      synchronized (onlineRegions) {
-        for (Iterator<Map.Entry<String, HRegion>> i = onlineRegions.entrySet().iterator();
-            i.hasNext();) {
-          Map.Entry<String, HRegion> e = i.next();
+      synchronized (this.onlineRegions) {
+        for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
           HRegion r = e.getValue();
           if (!r.getRegionInfo().isMetaRegion()) {
-            regionsToClose.add(r);
-            i.remove();
+            // Don't update zk with this close transition; pass false.
+            closeRegion(r.getRegionInfo(), abort, false);
           }
         }
       }
     } finally {
       this.lock.writeLock().unlock();
     }
-    return closeRegions(regionsToClose);
-  }
-
-  List<HRegion> closeRegions(final List<HRegion> regions) {
-    // Run region closes in parallel.
-    Set<Thread> threads = new HashSet<Thread>();
-    try {
-      for (final HRegion r : regions) {
-        RegionCloserThread t = new RegionCloserThread(r);
-        t.start();
-        threads.add(t);
-      }
-    } finally {
-      for (Thread t : threads) {
-        while (t.isAlive()) {
-          try {
-            t.join();
-          } catch (InterruptedException e) {
-            e.printStackTrace();
-          }
-        }
-      }
-    }
-    return regions;
   }
 
-  //
-  // HRegionInterface
-  //
-
   public HRegionInfo getRegionInfo(final byte[] regionName)
       throws NotServingRegionException {
     requestCount.incrementAndGet();
@@ -1893,19 +1887,18 @@ public class HRegionServer implements HR
     LOG.info("Received request to open region: " +
       region.getRegionNameAsString());
     if(region.isRootRegion()) {
-      new OpenRootHandler(this, catalogTracker, region).submit();
+      this.service.submit(new OpenRootHandler(this, catalogTracker, region));
     } else if(region.isMetaRegion()) {
-      new OpenMetaHandler(this, catalogTracker, region).submit();
+      this.service.submit(new OpenMetaHandler(this, catalogTracker, region));
     } else {
-      new OpenRegionHandler(this, catalogTracker, region).submit();
+      this.service.submit(new OpenRegionHandler(this, catalogTracker, region));
     }
   }
 
   @Override
   public boolean closeRegion(HRegionInfo region)
-      throws NotServingRegionException {
-    LOG.info("Received close region: "
-        + region.getRegionNameAsString());
+  throws NotServingRegionException {
+    LOG.info("Received close region: " + region.getRegionNameAsString());
     // TODO: Need to check if this is being served here but currently undergoing
     // a split (so master needs to retry close after split is complete)
     if (!onlineRegions.containsKey(region.getEncodedName())) {
@@ -1913,13 +1906,28 @@ public class HRegionServer implements HR
       throw new NotServingRegionException("Received close for "
           + region.getRegionNameAsString() + " but we are not serving it");
     }
-    if(region.isRootRegion()) {
-      new CloseRootHandler(this, region).submit();
-    } else if(region.isMetaRegion()) {
-      new CloseMetaHandler(this, region).submit();
+    return closeRegion(region, false, true);
+  }
+
+  /**
+   * @param region Region to close
+   * @param abort True if we are aborting
+   * @param zk True if we are to update zk about the region close; if the close
+   * was orchestrated by the master, then update zk.  If the close is being run by
+   * the regionserver because it is going down, don't update zk.
+   * @return True; the appropriate close handler is always submitted.
+   */
+  protected boolean closeRegion(HRegionInfo region, final boolean abort,
+      final boolean zk) {
+    CloseRegionHandler crh = null;
+    if (region.isRootRegion()) {
+      crh = new CloseRootHandler(this, region, abort, zk);
+    } else if (region.isMetaRegion()) {
+      crh = new CloseMetaHandler(this, region, abort, zk);
     } else {
-      new CloseRegionHandler(this, region).submit();
+      crh = new CloseRegionHandler(this, region, abort, zk);
     }
+    this.service.submit(crh);
     return true;
   }
 
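
The three shapes in which this unified entry point is reached in the patch, shown against a stub for illustration only (not HBase code):

  // Stub standing in for HRegionServer.closeRegion(HRegionInfo, boolean, boolean).
  class CloseCallShapesSketch {
    boolean closeRegion(Object region, boolean abort, boolean zk) { return true; }

    void examples(Object region) {
      closeRegion(region, false, true);   // master-sent close RPC: update zk
      closeRegion(region, false, false);  // clean shutdown path: no zk updates
      closeRegion(region, true,  false);  // abort path: no zk updates
    }
  }
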
@@ -2279,13 +2287,23 @@ public class HRegionServer implements HR
   }
 
   @Override
-  public HServerAddress getHServerAddress() {
-    return this.address;
+  public ZooKeeperWatcher getZooKeeper() {
+    return zooKeeper;
   }
 
   @Override
-  public ZooKeeperWatcher getZooKeeper() {
-    return zooKeeper;
+  public String getServerName() {
+    return serverInfo.getServerName();
+  }
+ 
+  @Override
+  public ExecutorService getExecutorService() {
+    return this.service;
+  }
+
+  @Override
+  public CompactionRequestor getCompactionRequester() {
+    return this.compactSplitThread;
   }
 
   //
@@ -2413,19 +2431,4 @@ public class HRegionServer implements HR
         .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
     doMain(args, regionServerClass);
   }
-
-  @Override
-  public String getServerName() {
-    return serverInfo.getServerName();
-  }
-
-  @Override
-  public CompactionRequestor getCompactionRequester() {
-    return this.compactSplitThread;
-  }
-
-  @Override
-  public ServerConnection getServerConnection() {
-    return connection;
-  }
 }
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java?rev=986524&r1=986523&r2=986524&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java Tue Aug 17 23:36:49 2010
@@ -24,12 +24,16 @@ import org.apache.hadoop.hbase.regionser
 
 /**
  * Handles closing of the meta region on a region server.
- * <p>
- * This is executed after receiving an CLOSE RPC from the master for root.
  */
 public class CloseMetaHandler extends CloseRegionHandler {
-  public CloseMetaHandler(RegionServer server,
-      HRegionInfo regionInfo) {
-    super(server, regionInfo, false, true, EventType.M2RS_CLOSE_META);
+  // Called when the master tells us to shut down a region via a close RPC
+  public CloseMetaHandler(RegionServer server, HRegionInfo regionInfo) {
+    this(server, regionInfo, false, true);
+  }
+
+  // Called when the regionserver determines it is going down; not master orchestrated
+  public CloseMetaHandler(RegionServer server, HRegionInfo regionInfo,
+      final boolean abort, final boolean zk) {
+    super(server, regionInfo, abort, zk, EventType.M2RS_CLOSE_META);
   }
 }
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java?rev=986524&r1=986523&r2=986524&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java Tue Aug 17 23:36:49 2010
@@ -32,10 +32,13 @@ import org.apache.zookeeper.KeeperExcept
 
 /**
  * Handles closing of a region on a region server.
- * <p>
- * This is executed after receiving an CLOSE RPC from the master.
  */
 public class CloseRegionHandler extends EventHandler {
+  // NOTE on priorities when shutting down.  There are none for close; there are
+  // some for open.  I think that is right.  On shutdown, we want meta to close
+  // before root, and both to close after the user regions have closed.  What
+  // about the case where the master tells us to shut down a catalog region while
+  // we still have a running queue of user-region closes?
   private static final Log LOG = LogFactory.getLog(CloseRegionHandler.class);
 
   private final int FAILED = -1;
@@ -54,8 +57,8 @@ public class CloseRegionHandler extends 
   // CLOSING.
   private final boolean zk;
 
-  public CloseRegionHandler(RegionServer server,
-      HRegionInfo regionInfo) {
+  // This is executed after receiving a CLOSE RPC from the master.
+  public CloseRegionHandler(RegionServer server, HRegionInfo regionInfo) {
     this(server, regionInfo, false, true);
   }
 
@@ -71,9 +74,8 @@ public class CloseRegionHandler extends 
     this(server, regionInfo, abort, zk, EventType.M2RS_CLOSE_REGION);
   }
 
-  protected CloseRegionHandler(RegionServer server,
-      HRegionInfo regionInfo, boolean abort, final boolean zk,
-      EventType eventType) {
+  protected CloseRegionHandler(RegionServer server, HRegionInfo regionInfo,
+      boolean abort, final boolean zk, EventType eventType) {
     super(server, eventType);
     this.server = server;
     this.regionInfo = regionInfo;

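
On the NOTE about close priorities: nothing in this patch orders closes, but the idea it raises could look roughly like the following if all closes went through one prioritized queue with a single consumer. Class names and priority values are hypothetical:

  import java.util.concurrent.PriorityBlockingQueue;

  // Sketch only.  A single consumer draining a priority queue would let
  // user-region closes run before meta, and meta before root.
  class PrioritizedCloseSketch {
    static class Close implements Comparable<Close> {
      final int priority;      // e.g. 0 = user region, 1 = meta, 2 = root
      final Runnable work;
      Close(int priority, Runnable work) { this.priority = priority; this.work = work; }
      public int compareTo(Close other) { return this.priority - other.priority; }
    }

    private final PriorityBlockingQueue<Close> queue = new PriorityBlockingQueue<Close>();

    void submit(Close close) { queue.put(close); }

    void drainOne() throws InterruptedException {   // call from a single worker thread
      queue.take().work.run();
    }
  }
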
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRootHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRootHandler.java?rev=986524&r1=986523&r2=986524&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRootHandler.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRootHandler.java Tue Aug 17 23:36:49 2010
@@ -24,12 +24,17 @@ import org.apache.hadoop.hbase.regionser
 
 /**
  * Handles closing of the root region on a region server.
- * <p>
- * This is executed after receiving an CLOSE RPC from the master for root.
  */
 public class CloseRootHandler extends CloseRegionHandler {
-  public CloseRootHandler(RegionServer server,
-      HRegionInfo regionInfo) {
-    super(server, regionInfo, false, true, EventType.M2RS_CLOSE_ROOT);
+  // This is executed after receiving a CLOSE RPC from the master for root.
+  public CloseRootHandler(RegionServer server, HRegionInfo regionInfo) {
+    this(server, regionInfo, false, true);
+  }
+
+  // This is called directly by the regionserver when it has determined that it
+  // is shutting down.
+  public CloseRootHandler(RegionServer server, HRegionInfo regionInfo,
+      final boolean abort, final boolean zk) {
+    super(server, regionInfo, abort, zk, EventType.M2RS_CLOSE_ROOT);
   }
 }
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java?rev=986524&r1=986523&r2=986524&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java Tue Aug 17 23:36:49 2010
@@ -564,7 +564,8 @@ public class ZKAssign {
   throws KeeperException {
     String encoded = region.getEncodedName();
     if(zkw.isDebugEnabled()) {
-      zkw.debug("Attempting to transition node for " + encoded +
+      zkw.debug("Attempting to transition node " +
+        HRegionInfo.prettyPrint(encoded) +
         " from " + beginState.toString() + " to " + endState.toString());
     }
 
@@ -606,7 +607,7 @@ public class ZKAssign {
         return -1;
       }
       if(zkw.isDebugEnabled()) {
-        zkw.debug("Successfully transitioned node for " + encoded +
+        zkw.debug("Successfully transitioned node " + encoded +
           " from " + beginState + " to " + endState);
       }
       return stat.getVersion() + 1;

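
The transition logged above is a read-check-write guarded by the znode version. A bare-bones sketch of that pattern against the raw ZooKeeper client; the real method also deserializes and validates region transition data, which is omitted here:

  import java.util.Arrays;
  import org.apache.zookeeper.KeeperException;
  import org.apache.zookeeper.ZooKeeper;
  import org.apache.zookeeper.data.Stat;

  // Sketch only: transition a znode from an expected payload to a new one,
  // failing if another writer got there first.
  class VersionedTransitionSketch {
    private final ZooKeeper zk;
    VersionedTransitionSketch(ZooKeeper zk) { this.zk = zk; }

    int transition(String znode, byte[] expected, byte[] next)
        throws KeeperException, InterruptedException {
      Stat stat = new Stat();
      byte[] current = zk.getData(znode, false, stat);
      if (!Arrays.equals(current, expected)) {
        return -1;                                   // not in the expected state
      }
      // Throws KeeperException.BadVersionException on a concurrent transition.
      Stat after = zk.setData(znode, next, stat.getVersion());
      return after.getVersion();
    }
  }
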
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=986524&r1=986523&r2=986524&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Tue Aug 17 23:36:49 2010
@@ -186,14 +186,14 @@ public class ZKUtil {
   throws KeeperException {
     try {
       Stat s = zkw.getZooKeeper().exists(znode, zkw);
-      zkw.debug("Set watcher on existing znode (" + znode + ")");
+      zkw.debug("Set watcher on existing znode " + znode);
       return s != null ? true : false;
     } catch (KeeperException e) {
-      zkw.warn("Unable to set watcher on znode (" + znode + ")", e);
+      zkw.warn("Unable to set watcher on znode " + znode, e);
       zkw.keeperException(e);
       return false;
     } catch (InterruptedException e) {
-      zkw.warn("Unable to set watcher on znode (" + znode + ")", e);
+      zkw.warn("Unable to set watcher on znode " + znode, e);
       zkw.interruptedException(e);
       return false;
     }
@@ -253,15 +253,15 @@ public class ZKUtil {
       List<String> children = zkw.getZooKeeper().getChildren(znode, zkw);
       return children;
     } catch(KeeperException.NoNodeException ke) {
-      zkw.debug("Unable to list children of znode (" + znode + ") " +
+      zkw.debug("Unable to list children of znode " + znode + " " +
           "because node does not exist (not an error)");
       return null;
     } catch (KeeperException e) {
-      zkw.warn("Unable to list children of znode (" + znode + ")", e);
+      zkw.warn("Unable to list children of znode " + znode, e);
       zkw.keeperException(e);
       return null;
     } catch (InterruptedException e) {
-      zkw.warn("Unable to list children of znode (" + znode + ")", e);
+      zkw.warn("Unable to list children of znode " + znode, e);
       zkw.interruptedException(e);
       return null;
     }
@@ -397,15 +397,15 @@ public class ZKUtil {
     try {
       return !zkw.getZooKeeper().getChildren(znode, null).isEmpty();
     } catch(KeeperException.NoNodeException ke) {
-      zkw.debug("Unable to list children of znode (" + znode + ") " +
+      zkw.debug("Unable to list children of znode " + znode + " " +
       "because node does not exist (not an error)");
       return false;
     } catch (KeeperException e) {
-      zkw.warn("Unable to list children of znode (" + znode + ")", e);
+      zkw.warn("Unable to list children of znode " + znode, e);
       zkw.keeperException(e);
       return false;
     } catch (InterruptedException e) {
-      zkw.warn("Unable to list children of znode (" + znode + ")", e);
+      zkw.warn("Unable to list children of znode " + znode, e);
       zkw.interruptedException(e);
       return false;
     }
@@ -457,19 +457,19 @@ public class ZKUtil {
   throws KeeperException {
     try {
       byte [] data = zkw.getZooKeeper().getData(znode, zkw, null);
-      zkw.debug("Retrieved " + data.length + " bytes of data from znode (" +
-          znode + ") and set a watcher");
+      zkw.debug("Retrieved " + data.length + " bytes of data from znode " +
+          znode + " and set a watcher");
       return data;
     } catch (KeeperException.NoNodeException e) {
-      zkw.debug("Unable to get data of znode (" + znode + ") " +
+      zkw.debug("Unable to get data of znode " + znode + " " +
           "because node does not exist (not an error)");
       return null;
     } catch (KeeperException e) {
-      zkw.warn("Unable to get data of znode (" + znode + ")", e);
+      zkw.warn("Unable to get data of znode " + znode, e);
       zkw.keeperException(e);
       return null;
     } catch (InterruptedException e) {
-      zkw.warn("Unable to get data of znode (" + znode + ")", e);
+      zkw.warn("Unable to get data of znode " + znode, e);
       zkw.interruptedException(e);
       return null;
     }
@@ -495,19 +495,18 @@ public class ZKUtil {
   throws KeeperException {
     try {
       byte [] data = zkw.getZooKeeper().getData(znode, zkw, stat);
-      zkw.debug("Retrieved " + data.length + " bytes of data from znode (" +
-          znode + ") and set a watcher");
+      zkw.debug("Retrieved " + data.length + " bytes of data from znode " + znode);
       return data;
     } catch (KeeperException.NoNodeException e) {
-      zkw.debug("Unable to get data of znode (" + znode + ") " +
+      zkw.debug("Unable to get data of znode " + znode + " " +
           "because node does not exist (not necessarily an error)");
       return null;
     } catch (KeeperException e) {
-      zkw.warn("Unable to get data of znode (" + znode + ")", e);
+      zkw.warn("Unable to get data of znode " + znode, e);
       zkw.keeperException(e);
       return null;
     } catch (InterruptedException e) {
-      zkw.warn("Unable to get data of znode (" + znode + ")", e);
+      zkw.warn("Unable to get data of znode " + znode, e);
       zkw.interruptedException(e);
       return null;
     }
@@ -534,7 +533,7 @@ public class ZKUtil {
       return null;
     }
     String addrString = Bytes.toString(data);
-    zkw.debug("Read server address from znode (" + znode + "): " + addrString);
+    zkw.debug("Read server address from znode " + znode + ": " + addrString);
     return new HServerAddress(addrString);
   }
 

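
For context on the helper whose messages are reformatted at the top of this file: it leans on the fact that exists() both answers the question and, when handed a Watcher, leaves a watch behind. A tiny sketch with the raw client; the znode path is a placeholder:

  import org.apache.zookeeper.KeeperException;
  import org.apache.zookeeper.Watcher;
  import org.apache.zookeeper.ZooKeeper;

  // Sketch only: check for a znode and leave a watch in one call.
  class CheckExistsAndWatchSketch {
    static boolean checkExistsAndWatch(ZooKeeper zk, Watcher watcher, String znode)
        throws KeeperException, InterruptedException {
      // A non-null Stat means the node exists; the watch fires on later
      // create, delete, or data-change events for this path.
      return zk.exists(znode, watcher) != null;
    }
  }
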
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java?rev=986524&r1=986523&r2=986524&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java Tue Aug 17 23:36:49 2010
@@ -165,9 +165,9 @@ public class ZooKeeperWatcher implements
   @Override
   public void process(WatchedEvent event) {
     LOG.debug("<" + name + "> Received ZooKeeper Event, " +
-        "type: " + event.getType() + ", " +
-        "state:" + event.getState() + ", " +
-        "path: " + event.getPath());
+        "type=" + event.getType() + ", " +
+        "state=" + event.getState() + ", " +
+        "path=" + event.getPath());
 
     // While we are still using both ZKWs, need to call parent process()
 //    super.process(event);

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=986524&r1=986523&r2=986524&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Tue Aug 17 23:36:49 2010
@@ -161,10 +161,7 @@ public class HBaseTestingUtility {
    * @throws IOException If a cluster -- zk, dfs, or hbase -- already running.
    */
   void isRunningCluster(String passedBuildPath) throws IOException {
-    if (this.clusterTestBuildDir == null ||
-        passedBuildPath != null) {
-      return;
-    }
+    if (this.clusterTestBuildDir == null || passedBuildPath != null) return;
     throw new IOException("Cluster already running at " +
       this.clusterTestBuildDir);
   }
@@ -252,6 +249,7 @@ public class HBaseTestingUtility {
   public void shutdownMiniZKCluster() throws IOException {
     if (this.zkCluster != null) {
       this.zkCluster.shutdown();
+      this.zkCluster = null;
     }
   }
 
@@ -282,10 +280,10 @@ public class HBaseTestingUtility {
   throws Exception {
     LOG.info("Starting up minicluster");
     // If we already put up a cluster, fail.
-    String testBuildPath = conf.get("hbase.test.build.dir", null);
+    String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null);
     isRunningCluster(testBuildPath);
     if(testBuildPath != null) {
-      LOG.info("\n\nUsing passed path: " + testBuildPath + "\n\n");
+      LOG.info("Using passed path: " + testBuildPath);
     }
     // Make a new random dir to home everything in.  Set it as system property.
     // minidfs reads home from system property.
@@ -373,6 +371,7 @@ public class HBaseTestingUtility {
           new Path(this.clusterTestBuildDir.toString()))) {
         LOG.warn("Failed delete of " + this.clusterTestBuildDir.toString());
       }
+      this.clusterTestBuildDir = null;
     }
     LOG.info("Minicluster is down");
   }
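
Nulling out zkCluster and clusterTestBuildDir above leaves the utility reusable after shutdown, so a later startMiniCluster() in the same JVM no longer trips the "Cluster already running" check. A hedged usage sketch, assuming JUnit 4 and the single-slave startMiniCluster overload:

  import org.apache.hadoop.hbase.HBaseTestingUtility;
  import org.junit.Test;

  // Sketch of the restart-in-one-test pattern these changes are meant to allow.
  public class RestartSketchTest {
    @Test
    public void restartMiniClusterTwice() throws Exception {
      HBaseTestingUtility util = new HBaseTestingUtility();
      util.startMiniCluster(1);
      util.shutdownMiniCluster();
      // Without the null-outs above, stale state made this second start think
      // a cluster was still running.
      util.startMiniCluster(1);
      util.shutdownMiniCluster();
    }
  }
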


