hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jg...@apache.org
Subject svn commit: r953920 [1/2] - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/executor/ src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/hadoop/hbase/master/handler/ src/main/j...
Date Sat, 12 Jun 2010 01:17:46 GMT
Author: jgray
Date: Sat Jun 12 01:17:45 2010
New Revision: 953920

URL: http://svn.apache.org/viewvc?rev=953920&view=rev
Log:
HBASE-2694  Move RS to Master region open/close messaging into ZooKeeper

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/HBaseEventHandler.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/NamedThreadFactory.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionEventData.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedCloseRegion.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionClose.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionStatusChange.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=953920&r1=953919&r2=953920&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Sat Jun 12 01:17:45 2010
@@ -686,6 +686,7 @@ Release 0.21.0 - Unreleased
    HBASE-2618  Don't inherit from HConstants (Benoit Sigoure via Stack)
    HBASE-2208  TableServers # processBatchOfRows - converts from List to [ ]
                - Expensive copy 
+   HBASE-2694  Move RS to Master region open/close messaging into ZooKeeper
 
   NEW FEATURES
    HBASE-1961  HBase EC2 scripts

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=953920&r1=953919&r2=953920&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Sat Jun 12 01:17:45 2010
@@ -220,13 +220,16 @@ public class HConnectionManager {
     }
 
     /**
-     * Get this watcher's ZKW, instanciate it if necessary.
+     * Get this watcher's ZKW, instantiate it if necessary.
      * @return ZKW
      * @throws java.io.IOException if a remote or network exception occurs
      */
     public synchronized ZooKeeperWrapper getZooKeeperWrapper() throws IOException {
       if(zooKeeperWrapper == null) {
-        zooKeeperWrapper = new ZooKeeperWrapper(conf, this);
+        String zkWrapperName = HConnectionManager.class.getName() + "-" + 
+                               ZooKeeperWrapper.getZookeeperClusterKey(conf);
+        zooKeeperWrapper = ZooKeeperWrapper.createInstance(conf, zkWrapperName);
+        zooKeeperWrapper.registerListener(this);
       }
       return zooKeeperWrapper;
     }

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/HBaseEventHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/HBaseEventHandler.java?rev=953920&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/HBaseEventHandler.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/HBaseEventHandler.java Sat Jun 12 01:17:45 2010
@@ -0,0 +1,285 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.executor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.executor.HBaseExecutorService.HBaseExecutorServiceType;
+import org.apache.hadoop.hbase.master.ServerManager;
+
+
+/**
+ * Abstract base class for all HBase event handlers. Subclasses should 
+ * implement the process() method where the actual handling of the event 
+ * happens.
+ * 
+ * HBaseEventType is a list of ALL events (which also corresponds to messages - 
+ * either internal to one component or between components). The event type 
+ * names specify the component from which the event originated, and the 
+ * component which is supposed to handle it.
+ * 
+ * Listeners can listen to all the events by implementing the interface 
+ * HBaseEventHandlerListener, and by registering themselves as a listener. They 
+ * will be called back before and after the process of every event.
+ * 
+ * TODO: Rename HBaseEvent and HBaseEventType to EventHandler and EventType 
+ * after ZK refactor as it currently would clash with EventType from ZK and 
+ * make the code very confusing.
+ */
+public abstract class HBaseEventHandler implements Runnable
+{
+  private static final Log LOG = LogFactory.getLog(HBaseEventHandler.class);
+  // type of event this object represents
+  protected HBaseEventType eventType = HBaseEventType.NONE;
+  // is this a region server or master?
+  protected boolean isRegionServer;
+  // name of the server - this is needed for naming executors in case of tests 
+  // where region servers may be co-located.
+  protected String serverName;
+  // listeners that are called before and after an event is processed
+  protected static List<HBaseEventHandlerListener> eventHandlerListeners = 
+    Collections.synchronizedList(new ArrayList<HBaseEventHandlerListener>());  
+  // static instances needed by the handlers
+  protected static ServerManager serverManager;
+  
+  /**
+   * Note that this has to be called first BEFORE the subclass constructors.
+   * 
+   * TODO: take out after refactor
+   */
+  public static void init(ServerManager serverManager) {
+    HBaseEventHandler.serverManager = serverManager;
+  }
+  
+  /**
+   * This interface provides hooks to listen to various events received by the 
+   * queue. A class implementing this can listen to the updates by calling 
+   * registerListener and stop receiving updates by calling unregisterListener
+   */
+  public interface HBaseEventHandlerListener {
+    /**
+     * Called before any event is processed
+     */
+    public void beforeProcess(HBaseEventHandler event);
+    /**
+     * Called after any event is processed
+     */
+    public void afterProcess(HBaseEventHandler event);
+  }
+
+  /**
+   * These are a list of HBase events that can be handled by the various
+   * HBaseExecutorService's. All the events are serialized as byte values.
+   */
+  public enum HBaseEventType {
+    NONE (-1),
+    // Messages originating from RS (NOTE: there is NO direct communication from 
+    // RS to Master). These are a result of RS updates into ZK.
+    RS2ZK_REGION_CLOSING      (1),   // RS is in process of closing a region
+    RS2ZK_REGION_CLOSED       (2),   // RS has finished closing a region
+    RS2ZK_REGION_OPENING      (3),   // RS is in process of opening a region
+    RS2ZK_REGION_OPENED       (4),   // RS has finished opening a region
+    
+    // Updates from master to ZK. This is done by the master and there is 
+    // nothing to process by either Master or RS
+    M2ZK_REGION_OFFLINE       (50);  // Master adds this region as offline in ZK
+    
+    private final byte value;
+    
+    /**
+     * Called by the HMaster. Maps this event type to the master executor
+     * service that handles it: close events go to MASTER_CLOSEREGION, open
+     * events to MASTER_OPENREGION, and master-originated updates to NONE.
+     * @return type of the executor service; a RuntimeException is thrown if unmapped
+     */
+    public HBaseExecutorServiceType getMasterExecutorForEvent() {
+      HBaseExecutorServiceType executorServiceType = null;
+      switch(this) {
+      // region close transitions reported by the RS
+      case RS2ZK_REGION_CLOSING:
+      case RS2ZK_REGION_CLOSED:
+        executorServiceType = HBaseExecutorServiceType.MASTER_CLOSEREGION;
+        break;
+      // region open transitions reported by the RS
+      case RS2ZK_REGION_OPENING:
+      case RS2ZK_REGION_OPENED:
+        executorServiceType = HBaseExecutorServiceType.MASTER_OPENREGION;
+        break;
+      // written by the master itself, so there is nothing to execute
+      case M2ZK_REGION_OFFLINE:
+        executorServiceType = HBaseExecutorServiceType.NONE;
+        break;
+        
+      default:
+        throw new RuntimeException("Unhandled event type in the master.");
+      }
+      
+      return executorServiceType;
+    }
+
+    /**
+     * Called by the RegionServer. Returns a name of the executor service given an 
+     * event type. Every event type has an entry - if the event should not be 
+     * handled just return a null executor name.
+     * @return name of the event service
+     */
+    public static String getRSExecutorForEvent(String serverName) {
+      throw new RuntimeException("Unsupported operation.");
+    }
+    
+    /**
+     * Start the executor service that handles the passed in event type. The 
+     * server that starts these event executor services wants to handle these 
+     * event types.
+     */
+    public void startMasterExecutorService(String serverName) {
+      HBaseExecutorServiceType serviceType = getMasterExecutorForEvent();
+      if(serviceType == HBaseExecutorServiceType.NONE) {
+        throw new RuntimeException("Event type " + toString() + " not handled on master.");
+      }
+      serviceType.startExecutorService(serverName);
+    }
+
+    public static void startRSExecutorService() {
+      
+    }
+
+    HBaseEventType(int intValue) {
+      this.value = (byte)intValue;
+    }
+    
+    public byte getByteValue() {
+      return value;
+    }
+
+    public static HBaseEventType fromByte(byte value) {
+      switch(value) {
+        case  -1: return HBaseEventType.NONE;
+        case  1 : return HBaseEventType.RS2ZK_REGION_CLOSING;
+        case  2 : return HBaseEventType.RS2ZK_REGION_CLOSED;
+        case  3 : return HBaseEventType.RS2ZK_REGION_OPENING;
+        case  4 : return HBaseEventType.RS2ZK_REGION_OPENED;
+        case  50: return HBaseEventType.M2ZK_REGION_OFFLINE;
+
+        default:
+          throw new RuntimeException("Invalid byte value for conversion to HBaseEventType");
+      }
+    }
+  }
+  
+  /**
+   * Default base class constructor.
+   * 
+   * TODO: isRegionServer and serverName will go away once we do the HMaster 
+   * refactor. We will end up passing a ServerStatus which should tell us both 
+   * the name and if it is a RS or master.
+   */
+  public HBaseEventHandler(boolean isRegionServer, String serverName, HBaseEventType eventType) {
+    this.isRegionServer = isRegionServer;
+    this.eventType = eventType;
+    this.serverName = serverName;
+  }
+  
+  /**
+   * Wrapper around process() that notifies registered listeners before and
+   * after the event is processed. Listeners are iterated via a snapshot so
+   * concurrent register/unregister calls cannot break the iteration.
+   */
+  public void run() {
+    HBaseEventHandlerListener[] none = new HBaseEventHandlerListener[0];
+    // fire all beforeProcess listeners
+    for(HBaseEventHandlerListener listener : eventHandlerListeners.toArray(none)) {
+      listener.beforeProcess(this);
+    }
+    // call the main process function
+    process();
+    // fire all afterProcess listeners (fresh snapshot taken after process())
+    for(HBaseEventHandlerListener listener : eventHandlerListeners.toArray(none)) {
+      LOG.debug("Firing " + listener.getClass().getName() + 
+                ".afterProcess event listener for event " + eventType);
+      listener.afterProcess(this);
+    }
+  }
+  
+  /**
+   * This method is the main processing loop to be implemented by the various 
+   * subclasses.
+   */
+  public abstract void process();
+  
+  /**
+   * Subscribe to updates before and after processing events
+   */
+  public static void registerListener(HBaseEventHandlerListener listener) {
+    eventHandlerListeners.add(listener);
+  }
+  
+  /**
+   * Stop receiving updates before and after processing events
+   */
+  public static void unregisterListener(HBaseEventHandlerListener listener) {
+    eventHandlerListeners.remove(listener);
+  }
+  
+  public boolean isRegionServer() {
+    return isRegionServer;
+  }
+
+  /**
+   * Return the name for this event type.
+   * @return
+   */
+  public HBaseExecutorServiceType getEventHandlerName() {
+    // TODO: check for isRegionServer here
+    return eventType.getMasterExecutorForEvent();
+  }
+  
+  /**
+   * Return the event type
+   * @return
+   */
+  public HBaseEventType getHBEvent() {
+    return eventType;
+  }
+
+  /**
+   * Submits this event object to the executor service mapped to its event type.
+   * Throws a RuntimeException if the event type maps to no executor (NONE).
+   */
+  public void submit() {
+    HBaseExecutorServiceType serviceType = getEventHandlerName();
+    if(serviceType == null || serviceType == HBaseExecutorServiceType.NONE) {
+      throw new RuntimeException("Event " + eventType + " not handled on this server " + serverName);
+    }
+    serviceType.getExecutor(serverName).submit(this);
+  }
+  
+  /**
+   * Executes this event object in the caller's thread. This is a synchronous 
+   * way of executing the event.
+   */
+  public void execute() {
+    this.run();
+  }
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java?rev=953920&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java Sat Jun 12 01:17:45 2010
@@ -0,0 +1,171 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.executor;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This is a generic HBase executor service. This component abstracts a
+ * threadpool, a queue to which jobs can be submitted and a Runnable that
+ * handles the object that is added to the queue.
+ *
+ * In order to create a new HBaseExecutorService, you need to do:
+ *   HBaseExecutorService.startExecutorService("myService");
+ *
+ * In order to use the service created above, you need to override the
+ * HBaseEventHandler class and create an event type that submits to this service.
+ *
+ */
+public class HBaseExecutorService
+{
+  private static final Log LOG = LogFactory.getLog(HBaseExecutorService.class);
+  // default number of threads in the pool
+  private int corePoolSize = 1;
+  // max number of threads - maximum concurrency
+  private int maximumPoolSize = 5;
+  // how long to retain excess threads
+  private long keepAliveTimeInMillis = 1000;
+  // the thread pool executor that services the requests
+  ThreadPoolExecutor threadPoolExecutor;
+  // work queue to use - unbounded queue
+  BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
+  // name for this executor service
+  String name;
+  // holds all the executors created, in a map addressable by their names
+  static Map<String, HBaseExecutorService> executorServicesMap =
+    Collections.synchronizedMap(new HashMap<String, HBaseExecutorService>());
+
+  
+  /**
+   * The following is a list of names for the various executor services in both 
+   * the master and the region server.
+   */
+  public enum HBaseExecutorServiceType {
+    NONE                       (-1),
+    MASTER_CLOSEREGION         (1),
+    MASTER_OPENREGION          (2);
+    
+    private final int value;
+    
+    HBaseExecutorServiceType(int intValue) {
+      this.value = intValue;
+    }
+    
+    public void startExecutorService(String serverName) {
+      // if this is NONE then there is no executor to start
+      if(value == NONE.value) {
+        throw new RuntimeException("Cannot start NONE executor type.");
+      }
+      String name = getExecutorName(serverName);
+      if(HBaseExecutorService.isExecutorServiceRunning(name)) {
+        LOG.debug("Executor service " + toString() + " already running on " + serverName);
+        return;
+      }
+      HBaseExecutorService.startExecutorService(name);
+    }
+    
+    public HBaseExecutorService getExecutor(String serverName) {
+      // if this is NONE then there is no executor
+      if(value == NONE.value) {
+        return null;
+      }
+      return HBaseExecutorService.getExecutorService(getExecutorName(serverName));
+    }
+    
+    public String getExecutorName(String serverName) {
+      // if this is NONE then there is no executor
+      if(value == NONE.value) {
+        return null;
+      }
+      return (this.toString() + "-" + serverName);
+    }
+  }
+
+
+
+  /**
+   * Start an executor service with a given name. If there was a service already
+   * started with the same name, this throws a RuntimeException.
+   * @param name Name of the service to start.
+   */
+  public static void startExecutorService(String name) {
+    if(executorServicesMap.get(name) != null) {
+      throw new RuntimeException("An executor service with the name " + name + " is already running!");
+    }
+    HBaseExecutorService hbes = new HBaseExecutorService(name);
+    executorServicesMap.put(name, hbes);
+    LOG.debug("Starting executor service: " + name);
+  }
+  
+  public static boolean isExecutorServiceRunning(String name) {
+    return (executorServicesMap.containsKey(name));
+  }
+
+  /**
+   * This method is an accessor for all the HBaseExecutorServices running so far
+   * addressable by name. If there is no such service, then it returns null.
+   */
+  public static HBaseExecutorService getExecutorService(String name) {
+    HBaseExecutorService executor = executorServicesMap.get(name);
+    if(executor == null) {
+      LOG.debug("Executor service [" + name + "] not found.");
+    }
+    return executor;
+  }
+  
+  public static void shutdown() {
+    for(Entry<String, HBaseExecutorService> entry : executorServicesMap.entrySet()) {
+      entry.getValue().threadPoolExecutor.shutdown();
+    }
+    executorServicesMap.clear();
+  }
+
+  protected HBaseExecutorService(String name) {
+    this.name = name;
+    // create the thread pool executor
+    threadPoolExecutor = new ThreadPoolExecutor(
+                                corePoolSize,
+                                maximumPoolSize,
+                                keepAliveTimeInMillis,
+                                TimeUnit.MILLISECONDS,
+                                workQueue
+                                );
+    // name the threads for this threadpool
+    threadPoolExecutor.setThreadFactory(new NamedThreadFactory(name));
+  }
+
+  /**
+   * Submit the event to the queue for handling.
+   * @param event
+   */
+  public void submit(Runnable event) {
+    threadPoolExecutor.execute(event);
+  }
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/NamedThreadFactory.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/NamedThreadFactory.java?rev=953920&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/NamedThreadFactory.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/NamedThreadFactory.java Sat Jun 12 01:17:45 2010
@@ -0,0 +1,43 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.executor;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Returns a named thread with a specified prefix.
+ *
+ * TODO: Use guava (com.google.common.util.concurrent.NamingThreadFactory)
+ */
+public class NamedThreadFactory implements ThreadFactory
+{
+  private String threadPrefix;
+  private AtomicInteger threadId = new AtomicInteger(0);
+
+  public NamedThreadFactory(String threadPrefix) {
+    this.threadPrefix = threadPrefix;
+  }
+
+  @Override
+  public Thread newThread(Runnable r) {
+    return new Thread(r, threadPrefix + "-" + threadId.incrementAndGet());
+  }
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionEventData.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionEventData.java?rev=953920&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionEventData.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionEventData.java Sat Jun 12 01:17:45 2010
@@ -0,0 +1,92 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.executor;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HMsg;
+import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
+import org.apache.hadoop.io.Writable;
+
+public class RegionTransitionEventData implements Writable {
+  private HBaseEventType hbEvent;
+  private String rsName;
+  private long timeStamp;
+  private HMsg hmsg;
+  
+  public RegionTransitionEventData() {
+  }
+
+  public RegionTransitionEventData(HBaseEventType hbEvent, String rsName) {
+    this(hbEvent, rsName, null);
+  }
+
+  public RegionTransitionEventData(HBaseEventType hbEvent, String rsName, HMsg hmsg) {
+    this.hbEvent = hbEvent;
+    this.rsName = rsName;
+    this.timeStamp = System.currentTimeMillis();
+    this.hmsg = hmsg;
+  }
+  
+  public HBaseEventType getHbEvent() {
+    return hbEvent;
+  }
+
+  public String getRsName() {
+    return rsName;
+  }
+
+  public long getTimeStamp() {
+    return timeStamp;
+  }
+
+  public HMsg getHmsg() {
+    return hmsg;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // the event type byte
+    hbEvent = HBaseEventType.fromByte(in.readByte());
+    // the hostname of the RS sending the data
+    rsName = in.readUTF();
+    // the timestamp
+    timeStamp = in.readLong();
+    if(in.readBoolean()) {
+      // deserialize the HMsg from ZK
+      hmsg = new HMsg();
+      hmsg.readFields(in);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeByte(hbEvent.getByteValue());
+    out.writeUTF(rsName);
+    out.writeLong(System.currentTimeMillis());
+    out.writeBoolean((hmsg != null));
+    if(hmsg != null) {
+      hmsg.write(out);
+    }
+  }
+
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=953920&r1=953919&r2=953920&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Sat Jun 12 01:17:45 2010
@@ -19,6 +19,23 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.lang.reflect.Constructor;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -48,6 +65,9 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ServerConnection;
 import org.apache.hadoop.hbase.client.ServerConnectionManager;
+import org.apache.hadoop.hbase.executor.HBaseEventHandler;
+import org.apache.hadoop.hbase.executor.HBaseExecutorService;
+import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
@@ -77,23 +97,6 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.RuntimeMXBean;
-import java.lang.reflect.Constructor;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
 /**
  * HMaster is the "master server" for HBase. An HBase cluster has one active
  * master.  If many masters are started, all compete.  Whichever wins goes on to
@@ -198,14 +201,31 @@ public class HMaster extends Thread impl
     // We'll succeed if we are only  master or if we win the race when many
     // masters.  Otherwise we park here inside in writeAddressToZooKeeper.
     // TODO: Bring up the UI to redirect to active Master.
-    this.zooKeeperWrapper = new ZooKeeperWrapper(conf, this);
+    zooKeeperWrapper = ZooKeeperWrapper.createInstance(conf, HMaster.class.getName());
+    zooKeeperWrapper.registerListener(this);
     this.zkMasterAddressWatcher =
       new ZKMasterAddressWatcher(this.zooKeeperWrapper, this.shutdownRequested);
+    zooKeeperWrapper.registerListener(zkMasterAddressWatcher);
     this.zkMasterAddressWatcher.writeAddressToZooKeeper(this.address, true);
     this.regionServerOperationQueue =
       new RegionServerOperationQueue(this.conf, this.closed);
 
     serverManager = new ServerManager(this);
+
+    
+    // Start the unassigned watcher - which will create the unassigned region 
+    // in ZK. This is needed before RegionManager() constructor tries to assign 
+    // the root region.
+    ZKUnassignedWatcher.start();
+    // init the various event handlers
+    HBaseEventHandler.init(serverManager);
+    // start the "close region" executor service
+    HBaseEventType.RS2ZK_REGION_CLOSED.startMasterExecutorService(MASTER);
+    // start the "open region" executor service
+    HBaseEventType.RS2ZK_REGION_OPENED.startMasterExecutorService(MASTER);
+
+    
+    // start the region manager
     regionManager = new RegionManager(this);
 
     setName(MASTER);
@@ -411,7 +431,7 @@ public class HMaster extends Thread impl
     return this.serverManager.getAverageLoad();
   }
 
-  RegionServerOperationQueue getRegionServerOperationQueue () {
+  public RegionServerOperationQueue getRegionServerOperationQueue () {
     return this.regionServerOperationQueue;
   }
 
@@ -491,6 +511,7 @@ public class HMaster extends Thread impl
     this.rpcServer.stop();
     this.regionManager.stop();
     this.zooKeeperWrapper.close();
+    HBaseExecutorService.shutdown();
     LOG.info("HMaster main thread exiting");
   }
 
@@ -1118,7 +1139,9 @@ public class HMaster extends Thread impl
    */
   @Override
   public void process(WatchedEvent event) {
-    LOG.debug(("Event " + event.getType() +  " with path " + event.getPath()));
+    LOG.debug("Event " + event.getType() + 
+              " with state " + event.getState() +  
+              " with path " + event.getPath());
     // Master should kill itself if its session expired or if its
     // znode was deleted manually (usually for testing purposes)
     if(event.getState() == KeeperState.Expired ||
@@ -1132,7 +1155,8 @@ public class HMaster extends Thread impl
 
       zooKeeperWrapper.close();
       try {
-        zooKeeperWrapper = new ZooKeeperWrapper(conf, this);
+        zooKeeperWrapper = ZooKeeperWrapper.createInstance(conf, HMaster.class.getName());
+        zooKeeperWrapper.registerListener(this);
         this.zkMasterAddressWatcher.setZookeeper(zooKeeperWrapper);
         if(!this.zkMasterAddressWatcher.
             writeAddressToZooKeeper(this.address,false)) {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionClose.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionClose.java?rev=953920&r1=953919&r2=953920&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionClose.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionClose.java Sat Jun 12 01:17:45 2010
@@ -32,7 +32,7 @@ import java.io.IOException;
  * or deleted doesn't actually require post processing, it's no longer
  * necessary.
  */
-class ProcessRegionClose extends ProcessRegionStatusChange {
+public class ProcessRegionClose extends ProcessRegionStatusChange {
   protected final boolean offlineRegion;
   protected final boolean reassignRegion;
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java?rev=953920&r1=953919&r2=953920&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java Sat Jun 12 01:17:45 2010
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HServerIn
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
 
 import java.io.IOException;
 
@@ -34,7 +35,7 @@ import java.io.IOException;
  * serving a region. This applies to all meta and user regions except the
  * root region which is handled specially.
  */
-class ProcessRegionOpen extends ProcessRegionStatusChange {
+public class ProcessRegionOpen extends ProcessRegionStatusChange {
   protected final HServerInfo serverInfo;
 
   /**
@@ -114,6 +115,8 @@ class ProcessRegionOpen extends ProcessR
       } else {
         master.getRegionManager().removeRegion(regionInfo);
       }
+      ZooKeeperWrapper zkWrapper = ZooKeeperWrapper.getInstance(HMaster.class.getName());
+      zkWrapper.deleteUnassignedRegion(regionInfo.getEncodedName());
       return true;
     }
   }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionStatusChange.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionStatusChange.java?rev=953920&r1=953919&r2=953920&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionStatusChange.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionStatusChange.java Sat Jun 12 01:17:45 2010
@@ -78,4 +78,8 @@ abstract class ProcessRegionStatusChange
     }
     return this.metaRegion;
   }
+  
+  public HRegionInfo getRegionInfo() {
+    return regionInfo;
+  }
 }
\ No newline at end of file

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=953920&r1=953919&r2=953920&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Sat Jun 12 01:17:45 2010
@@ -32,6 +32,9 @@ import org.apache.hadoop.hbase.HServerAd
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HServerLoad;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
+import org.apache.hadoop.hbase.executor.HBaseEventHandler;
+import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -39,6 +42,8 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.io.WritableUtils;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -94,6 +99,9 @@ public class RegionManager {
    */
    final SortedMap<String, RegionState> regionsInTransition =
     Collections.synchronizedSortedMap(new TreeMap<String, RegionState>());
+   
+   // regions in transition are also recorded in ZK using the zk wrapper
+   final ZooKeeperWrapper zkWrapper;
 
   // How many regions to assign a server at a time.
   private final int maxAssignInOneGo;
@@ -124,10 +132,11 @@ public class RegionManager {
   private final int zooKeeperNumRetries;
   private final int zooKeeperPause;
 
-  RegionManager(HMaster master) {
+  RegionManager(HMaster master) throws IOException {
     Configuration conf = master.getConfiguration();
 
     this.master = master;
+    this.zkWrapper = ZooKeeperWrapper.getInstance(HMaster.class.getName());
     this.maxAssignInOneGo = conf.getInt("hbase.regions.percheckin", 10);
     this.loadBalancer = new LoadBalancer(conf);
 
@@ -165,10 +174,17 @@ public class RegionManager {
     unsetRootRegion();
     if (!master.getShutdownRequested().get()) {
       synchronized (regionsInTransition) {
-        RegionState s = new RegionState(HRegionInfo.ROOT_REGIONINFO,
-            RegionState.State.UNASSIGNED);
-        regionsInTransition.put(
-            HRegionInfo.ROOT_REGIONINFO.getRegionNameAsString(), s);
+        String regionName = HRegionInfo.ROOT_REGIONINFO.getRegionNameAsString();
+        byte[] data = null;
+        try {
+          data = Writables.getBytes(new RegionTransitionEventData(HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER));
+        } catch (IOException e) {
+          LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e);
+        }
+        zkWrapper.createUnassignedRegion(HRegionInfo.ROOT_REGIONINFO.getEncodedName(), data);
+        LOG.debug("Created UNASSIGNED zNode " + regionName + " in state " + HBaseEventType.M2ZK_REGION_OFFLINE);
+        RegionState s = new RegionState(HRegionInfo.ROOT_REGIONINFO, RegionState.State.UNASSIGNED);
+        regionsInTransition.put(regionName, s);
         LOG.info("ROOT inserted into regionsInTransition");
       }
     }
@@ -330,6 +346,14 @@ public class RegionManager {
     LOG.info("Assigning region " + regionName + " to " + sinfo.getServerName());
     rs.setPendingOpen(sinfo.getServerName());
     synchronized (this.regionsInTransition) {
+      byte[] data = null;
+      try {
+        data = Writables.getBytes(new RegionTransitionEventData(HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER));
+      } catch (IOException e) {
+        LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e);
+      }
+      zkWrapper.createUnassignedRegion(rs.getRegionInfo().getEncodedName(), data);
+      LOG.debug("Created UNASSIGNED zNode " + regionName + " in state " + HBaseEventType.M2ZK_REGION_OFFLINE);
       this.regionsInTransition.put(regionName, rs);
     }
 
@@ -969,6 +993,16 @@ public class RegionManager {
     synchronized(this.regionsInTransition) {
       s = regionsInTransition.get(info.getRegionNameAsString());
       if (s == null) {
+        byte[] data = null;
+        try {
+          data = Writables.getBytes(new RegionTransitionEventData(HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER));
+        } catch (IOException e) {
+          // TODO: Review what we should do here.  If Writables work this
+          //       should never happen
+          LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e);
+        }
+        zkWrapper.createUnassignedRegion(info.getEncodedName(), data);
+        LOG.debug("Created UNASSIGNED zNode " + info.getRegionNameAsString() + " in state " + HBaseEventType.M2ZK_REGION_OFFLINE);
         s = new RegionState(info, RegionState.State.UNASSIGNED);
         regionsInTransition.put(info.getRegionNameAsString(), s);
       }
@@ -1213,8 +1247,9 @@ public class RegionManager {
    */
   public void setRootRegionLocation(HServerAddress address) {
     writeRootRegionLocationToZooKeeper(address);
-
     synchronized (rootRegionLocation) {
+      // the root region has been assigned, remove it from transition in ZK
+      zkWrapper.deleteUnassignedRegion(HRegionInfo.ROOT_REGIONINFO.getEncodedName());
       rootRegionLocation.set(new HServerAddress(address));
       rootRegionLocation.notifyAll();
     }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=953920&r1=953919&r2=953920&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Sat Jun 12 01:17:45 2010
@@ -564,7 +564,7 @@ public class ServerManager {
    * @param region
    * @param returnMsgs
    */
-  private void processRegionOpen(HServerInfo serverInfo,
+  public void processRegionOpen(HServerInfo serverInfo,
       HRegionInfo region, ArrayList<HMsg> returnMsgs) {
     boolean duplicateAssignment = false;
     synchronized (master.getRegionManager()) {
@@ -633,7 +633,7 @@ public class ServerManager {
    * @param region
    * @throws Exception
    */
-  private void processRegionClose(HRegionInfo region) {
+  public void processRegionClose(HRegionInfo region) {
     synchronized (this.master.getRegionManager()) {
       if (region.isRootRegion()) {
         // Root region

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java?rev=953920&r1=953919&r2=953920&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java Sat Jun 12 01:17:45 2010
@@ -74,7 +74,7 @@ class ZKMasterAddressWatcher implements 
     } else if(type.equals(EventType.NodeCreated) &&
         event.getPath().equals(this.zookeeper.clusterStateZNode)) {
       LOG.debug("Resetting watch on cluster state node.");
-      this.zookeeper.setClusterStateWatch(this);
+      this.zookeeper.setClusterStateWatch();
     }
   }
 
@@ -87,7 +87,7 @@ class ZKMasterAddressWatcher implements 
       try {
         LOG.debug("Waiting for master address ZNode to be deleted " +
           "(Also watching cluster state node)");
-        this.zookeeper.setClusterStateWatch(this);
+        this.zookeeper.setClusterStateWatch();
         wait();
       } catch (InterruptedException e) {
       }
@@ -110,7 +110,7 @@ class ZKMasterAddressWatcher implements 
       }
       if(this.zookeeper.writeMasterAddress(address)) {
         this.zookeeper.setClusterState(true);
-        this.zookeeper.setClusterStateWatch(this);
+        this.zookeeper.setClusterStateWatch();
         // Watch our own node
         this.zookeeper.readMasterAddress(this);
         return true;

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java?rev=953920&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java Sat Jun 12 01:17:45 2010
@@ -0,0 +1,159 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
+import org.apache.hadoop.hbase.master.handler.MasterCloseRegionHandler;
+import org.apache.hadoop.hbase.master.handler.MasterOpenRegionHandler;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper.ZNodePathAndData;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+
+/**
+ * Watches the UNASSIGNED znode in ZK for the master, and handles all events 
+ * relating to region transitions.
+ */
+public class ZKUnassignedWatcher implements Watcher {
+  private static final Log LOG = LogFactory.getLog(ZKUnassignedWatcher.class);
+  
+  // TODO: Start move this to HConstants
+  static final String ROOT_TABLE_NAME_STR = "-ROOT-";
+  static final String META_TABLE_NAME_STR = ".META.";
+  // TODO: End move this to HConstants
+
+  private ZooKeeperWrapper zkWrapper = null;
+
+  public static void start() throws IOException {
+    new ZKUnassignedWatcher();
+    LOG.debug("Started ZKUnassigned watcher");
+  }
+
+  public ZKUnassignedWatcher() throws IOException {
+    zkWrapper = ZooKeeperWrapper.getInstance(HMaster.class.getName());
+    // If the UNASSIGNED ZNode does not exist, create it.
+    zkWrapper.createZNodeIfNotExists(zkWrapper.getRegionInTransitionZNode());
+    // TODO: get the outstanding changes in UNASSIGNED
+    
+    // Set a watch on Zookeeper's UNASSIGNED node if it exists.
+    zkWrapper.registerListener(this);
+  }
+
+  /**
+   * This is the processing loop that gets triggered from the ZooKeeperWrapper.
+   * This zookeeper events process function does the following:
+   *   - WATCHES the following events: NodeCreated, NodeDataChanged, NodeChildrenChanged
+   *   - IGNORES the following events: None, NodeDeleted
+   */
+  @Override
+  public synchronized void process(WatchedEvent event) {
+    EventType type = event.getType();
+    LOG.debug("ZK-EVENT-PROCESS: Got zkEvent " + type +
+              " state:" + event.getState() +
+              " path:" + event.getPath());
+
+    // Handle the ignored events
+    if(type.equals(EventType.None)       ||
+       type.equals(EventType.NodeDeleted)) {
+      return;
+    }
+
+    // check if the path is for the UNASSIGNED directory we care about
+    if(event.getPath() == null ||
+       !event.getPath().startsWith(zkWrapper.getZNodePathForHBase(zkWrapper.getRegionInTransitionZNode()))) {
+      return;
+    }
+
+    try
+    {
+      /*
+       * If a node is created in the UNASSIGNED directory in zookeeper, then:
+       *   1. watch its updates (this is an unassigned region).
+       *   2. read to see what its state is and handle as needed (state may have
+       *      changed before we started watching it)
+       */
+      if(type.equals(EventType.NodeCreated)) {
+        zkWrapper.watchZNode(event.getPath());
+        handleRegionStateInZK(event.getPath());
+      }
+      /*
+       * Data on some node has changed. Read to see what the state is and handle
+       * as needed.
+       */
+      else if(type.equals(EventType.NodeDataChanged)) {
+        handleRegionStateInZK(event.getPath());
+      }
+      /*
+       * If there were some nodes created then watch those nodes
+       */
+      else if(type.equals(EventType.NodeChildrenChanged)) {
+        List<ZNodePathAndData> newZNodes = zkWrapper.watchAndGetNewChildren(event.getPath());
+        for(ZNodePathAndData zNodePathAndData : newZNodes) {
+          LOG.debug("Handling updates for znode: " + zNodePathAndData.getzNodePath());
+          handleRegionStateInZK(zNodePathAndData.getzNodePath(), zNodePathAndData.getData());
+        }
+      }
+    }
+    catch (IOException e)
+    {
+      LOG.error("Could not process event from ZooKeeper", e);
+    }
+  }
+
+  /**
+   * Read the state of a node in ZK and handle it as needed. We want to do the
+   * following:
+   *   1. If region's state is updated as CLOSED, invoke the MasterCloseRegionHandler.
+   *   2. If region's state is updated as OPENING or OPENED, invoke the MasterOpenRegionHandler.
+   * @param zNodePath
+   * @throws IOException
+   */
+  private void handleRegionStateInZK(String zNodePath) throws IOException {
+    byte[] data = zkWrapper.readZNode(zNodePath, null);
+    handleRegionStateInZK(zNodePath, data);
+  }
+  
+  private void handleRegionStateInZK(String zNodePath, byte[] data) {
+    // a null value is set when a node is created, we don't need to handle this
+    if(data == null) {
+      return;
+    }
+    String rgnInTransitNode = zkWrapper.getRegionInTransitionZNode();
+    String region = zNodePath.substring(zNodePath.indexOf(rgnInTransitNode) + rgnInTransitNode.length() + 1);
+    HBaseEventType rsEvent = HBaseEventType.fromByte(data[0]);
+
+    // if the node was CLOSED then handle it
+    if(rsEvent == HBaseEventType.RS2ZK_REGION_CLOSED) {
+      new MasterCloseRegionHandler(rsEvent, region, data).submit();
+    }
+    // if the region was OPENED then handle that
+    else if(rsEvent == HBaseEventType.RS2ZK_REGION_OPENED || 
+            rsEvent == HBaseEventType.RS2ZK_REGION_OPENING) {
+      new MasterOpenRegionHandler(rsEvent, region, data).submit();
+    }
+  }
+}
+

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java?rev=953920&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java Sat Jun 12 01:17:45 2010
@@ -0,0 +1,87 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.handler;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
+import org.apache.hadoop.hbase.executor.HBaseEventHandler;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.util.Writables;
+
+/**
+ * This is the event handler for all events relating to closing regions on the
+ * HMaster. The following event types map to this handler:
+ *   - RS_REGION_CLOSING
+ *   - RS_REGION_CLOSED
+ */
+public class MasterCloseRegionHandler extends HBaseEventHandler
+{
+  private static final Log LOG = LogFactory.getLog(MasterCloseRegionHandler.class);
+  
+  private String regionName;
+  protected byte[] serializedData;
+  RegionTransitionEventData hbEventData;
+  
+  public MasterCloseRegionHandler(HBaseEventType eventType, String regionName, byte[] serializedData) {
+    super(false, HMaster.MASTER, eventType);
+    this.regionName = regionName;
+    this.serializedData = serializedData;
+  }
+
+  /**
+   * Handle the various events relating to closing regions. We can get the 
+   * following events here:
+   *   - RS_REGION_CLOSING : No-op
+   *   - RS_REGION_CLOSED  : The region is closed. If we are not in a shutdown 
+   *                         state, find the RS to open this region. This could 
+   *                         be a part of a region move, or just that the RS has 
+   *                         died. Should result in a M_REQUEST_OPENREGION event 
+   *                         getting created.
+   */
+  @Override
+  public void process()
+  {
+    LOG.debug("Event = " + getHBEvent() + ", region = " + regionName);
+    // handle RS_REGION_CLOSED events
+    handleRegionClosedEvent();
+  }
+  
+  private void handleRegionClosedEvent() {
+    try {
+      if(hbEventData == null) {
+        hbEventData = new RegionTransitionEventData();
+        Writables.getWritable(serializedData, hbEventData);
+      }
+    } catch (IOException e) {
+      LOG.error("Could not deserialize additional args for Close region", e);
+    }
+    // process the region close - this will cause the reopening of the 
+    // region as a part of the heartbeat of some RS
+    serverManager.processRegionClose(hbEventData.getHmsg().getRegionInfo());
+    LOG.info("Processed close of region " + hbEventData.getHmsg().getRegionInfo().getRegionNameAsString());
+  }
+  
+  public String getRegionName() {
+    return regionName;
+  }
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java?rev=953920&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java Sat Jun 12 01:17:45 2010
@@ -0,0 +1,105 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.handler;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HMsg;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
+import org.apache.hadoop.hbase.executor.HBaseEventHandler;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.util.Writables;
+
+/**
+ * This is the event handler for all events relating to opening regions on the
+ * HMaster. This could be one of the following:
+ *   - notification that a region server is "OPENING" a region
+ *   - notification that a region server has "OPENED" a region
+ * The following event types map to this handler:
+ *   - RS_REGION_OPENING
+ *   - RS_REGION_OPENED
+ */
+public class MasterOpenRegionHandler extends HBaseEventHandler {
+  private static final Log LOG = LogFactory.getLog(MasterOpenRegionHandler.class);
+  // other args passed in a byte array form
+  protected byte[] serializedData;
+  private String regionName;
+  private RegionTransitionEventData hbEventData;
+
+  public MasterOpenRegionHandler(HBaseEventType eventType, String regionName, byte[] serData) {
+    super(false, HMaster.MASTER, eventType);
+    this.regionName = regionName;
+    this.serializedData = serData;
+  }
+
+  /**
+   * Handle the various events relating to opening regions. We can get the 
+   * following events here:
+   *   - RS_REGION_OPENING : Keep track to see how long the region open takes. 
+   *                         If the RS is taking too long, then revert the 
+   *                         region back to closed state so that it can be 
+   *                         re-assigned.
+   *   - RS_REGION_OPENED  : The region is opened. Add an entry into META for  
+   *                         the RS having opened this region. Then delete this 
+   *                         entry in ZK.
+   */
+  @Override
+  public void process()
+  {
+    LOG.debug("Event = " + getHBEvent() + ", region = " + regionName);
+    if(this.getHBEvent() == HBaseEventType.RS2ZK_REGION_OPENING) {
+      handleRegionOpeningEvent();
+    }
+    else if(this.getHBEvent() == HBaseEventType.RS2ZK_REGION_OPENED) {
+      handleRegionOpenedEvent();
+    }
+  }
+  
+  private void handleRegionOpeningEvent() {
+    // TODO: not implemented. 
+    LOG.debug("NO-OP call to handling region opening event");
+    // Keep track to see how long the region open takes. If the RS is taking too 
+    // long, then revert the region back to closed state so that it can be 
+    // re-assigned.
+  }
+
+  private void handleRegionOpenedEvent() {
+    try {
+      if(hbEventData == null) {
+        hbEventData = new RegionTransitionEventData();
+        Writables.getWritable(serializedData, hbEventData);
+      }
+    } catch (IOException e) {
+      LOG.error("Could not deserialize additional args for Open region", e);
+    }
+    LOG.debug("RS " + hbEventData.getRsName() + " has opened region " + regionName);
+    HServerInfo serverInfo = serverManager.getServerInfo(hbEventData.getRsName());
+    ArrayList<HMsg> returnMsgs = new ArrayList<HMsg>();
+    serverManager.processRegionOpen(serverInfo, hbEventData.getHmsg().getRegionInfo(), returnMsgs);
+    if(returnMsgs.size() > 0) {
+      LOG.error("Open region tried to send message: " + returnMsgs.get(0).getType() + 
+                " about " + returnMsgs.get(0).getRegionInfo().getRegionNameAsString());
+    }
+  }
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=953920&r1=953919&r2=953920&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat Jun 12 01:17:45 2010
@@ -319,7 +319,8 @@ public class HRegionServer implements HR
   }
 
   private void reinitializeZooKeeper() throws IOException {
-    zooKeeperWrapper = new ZooKeeperWrapper(conf, this);
+    zooKeeperWrapper = ZooKeeperWrapper.createInstance(conf, serverInfo.getServerName());
+    zooKeeperWrapper.registerListener(this);
     watchMasterAddress();
   }
 
@@ -1217,14 +1218,7 @@ public class HRegionServer implements HR
         if (LOG.isDebugEnabled())
           LOG.debug("sending initial server load: " + hsl);
         lastMsg = System.currentTimeMillis();
-        boolean startCodeOk = false;
-        while(!startCodeOk) {
-          this.serverInfo = createServerInfoWithNewStartCode(this.serverInfo);
-          startCodeOk = zooKeeperWrapper.writeRSLocation(this.serverInfo);
-          if(!startCodeOk) {
-           LOG.debug("Start code already taken, trying another one");
-          }
-        }
+        zooKeeperWrapper.writeRSLocation(this.serverInfo);
         result = this.hbaseMaster.regionServerStartup(this.serverInfo);
         break;
       } catch (IOException e) {
@@ -1419,8 +1413,11 @@ public class HRegionServer implements HR
   void openRegion(final HRegionInfo regionInfo) {
     Integer mapKey = Bytes.mapKey(regionInfo.getRegionName());
     HRegion region = this.onlineRegions.get(mapKey);
+    RSZookeeperUpdater zkUpdater = 
+      new RSZookeeperUpdater(serverInfo.getServerName(), regionInfo.getEncodedName());
     if (region == null) {
       try {
+        zkUpdater.startRegionOpenEvent(null, true);
         region = instantiateRegion(regionInfo);
         // Startup a compaction early if one is needed, if region has references
         // or if a store has too many store files
@@ -1435,7 +1432,15 @@ public class HRegionServer implements HR
         // TODO: add an extra field in HRegionInfo to indicate that there is
         // an error. We can't do that now because that would be an incompatible
         // change that would require a migration
-        reportClose(regionInfo, StringUtils.stringifyException(t).getBytes());
+        try {
+          HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_CLOSE, 
+                               regionInfo, 
+                               StringUtils.stringifyException(t).getBytes());
+          zkUpdater.abortOpenRegion(hmsg);
+        } catch (IOException e1) {
+          // TODO: Can we recover? Should we throw RTE?
+          LOG.error("Failed to abort open region " + regionInfo.getRegionNameAsString(), e1);
+        }
         return;
       }
       this.lock.writeLock().lock();
@@ -1446,7 +1451,12 @@ public class HRegionServer implements HR
         this.lock.writeLock().unlock();
       }
     }
-    reportOpen(regionInfo);
+    try {
+      HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_OPEN, regionInfo);
+      zkUpdater.finishRegionOpenEvent(hmsg);
+    } catch (IOException e) {
+      LOG.error("Failed to mark region " + regionInfo.getRegionNameAsString() + " as opened", e);
+    }
   }
 
   protected HRegion instantiateRegion(final HRegionInfo regionInfo)
@@ -1475,11 +1485,19 @@ public class HRegionServer implements HR
 
   protected void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)
   throws IOException {
+    RSZookeeperUpdater zkUpdater = null;
+    if(reportWhenCompleted) {
+      zkUpdater = new RSZookeeperUpdater(serverInfo.getServerName(), hri.getEncodedName());
+      zkUpdater.startRegionCloseEvent(null, false);
+    }
     HRegion region = this.removeFromOnlineRegions(hri);
     if (region != null) {
       region.close();
       if(reportWhenCompleted) {
-        reportClose(hri);
+        if(zkUpdater != null) {
+          HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_CLOSE, hri, null);
+          zkUpdater.finishRegionCloseEvent(hmsg);
+        }
       }
     }
   }

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java?rev=953920&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java Sat Jun 12 01:17:45 2010
@@ -0,0 +1,160 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HMsg;
+import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
+import org.apache.hadoop.hbase.executor.HBaseEventHandler;
+import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * This is a helper class for region servers to update various states in 
+ * Zookeeper. The various updates are abstracted out here. 
+ * 
+ * The "startRegionXXX" methods are to be called first, followed by the 
+ * "finishRegionXXX" methods. Supports updating zookeeper periodically as a 
+ * part of the "startRegionXXX". Currently handles the following state updates:
+ *   - Close region
+ *   - Open region
+ */
+// TODO: make this thread local, in which case it is re-usable per thread
+public class RSZookeeperUpdater {
+  private static final Log LOG = LogFactory.getLog(RSZookeeperUpdater.class);
+  private final String regionServerName;
+  private String regionName = null;
+  private String regionZNode = null;
+  private ZooKeeperWrapper zkWrapper = null;
+  private int zkVersion = 0;
+  HBaseEventType lastUpdatedState;
+
+  public RSZookeeperUpdater(String regionServerName, String regionName) {
+    this(regionServerName, regionName, 0);
+  }
+  
+  public RSZookeeperUpdater(String regionServerName, String regionName, int zkVersion) {
+    this.zkWrapper = ZooKeeperWrapper.getInstance(regionServerName);
+    this.regionServerName = regionServerName;
+    this.regionName = regionName;
+    // get the region ZNode we have to create
+    this.regionZNode = zkWrapper.getZNode(zkWrapper.getRegionInTransitionZNode(), regionName);
+    this.zkVersion = zkVersion;
+  }
+  
+  /**
+   * This method updates the various states in ZK to inform the master that the 
+   * region server has started closing the region.
+   * @param updatePeriodically - if true, periodically update the state in ZK (TODO: not yet implemented; currently ignored)
+   */
+  public void startRegionCloseEvent(HMsg hmsg, boolean updatePeriodically) throws IOException {
+    // if this ZNode already exists, something is wrong
+    if(zkWrapper.exists(regionZNode, true)) {
+      String msg = "ZNode " + regionZNode + " already exists in ZooKeeper, will NOT close region.";
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+    
+    // create the region node in the unassigned directory first
+    zkWrapper.createZNodeIfNotExists(regionZNode, null, CreateMode.PERSISTENT, true);
+
+    // update the data for "regionName" ZNode in unassigned to CLOSING
+    updateZKWithEventData(HBaseEventType.RS2ZK_REGION_CLOSING, hmsg);
+    
+    // TODO: implement the updatePeriodically logic here
+  }
+
+  /**
+   * This method updates the states in ZK to signal that the region has been 
+   * closed. This will stop the periodic updater thread if one was started.
+   * @throws IOException
+   */
+  public void finishRegionCloseEvent(HMsg hmsg) throws IOException {    
+    // TODO: stop the updatePeriodically here
+
+    // update the data for "regionName" ZNode in unassigned to CLOSED
+    updateZKWithEventData(HBaseEventType.RS2ZK_REGION_CLOSED, hmsg);
+  }
+  
+  /**
+   * This method updates the various states in ZK to inform the master that the 
+   * region server has started opening the region.
+   * @param updatePeriodically - if true, periodically update the state in ZK (TODO: not yet implemented; currently ignored)
+   */
+  public void startRegionOpenEvent(HMsg hmsg, boolean updatePeriodically) throws IOException {
+    Stat stat = new Stat();
+    byte[] data = zkWrapper.readZNode(regionZNode, stat);
+    // if there is no ZNode for this region, something is wrong
+    if(data == null) {
+      String msg = "ZNode " + regionZNode + " does not exist in ZooKeeper, will NOT open region.";
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+    // if the ZNode is in neither the CLOSED nor the OFFLINE state, something is wrong
+    HBaseEventType rsEvent = HBaseEventType.fromByte(data[0]);
+    if(rsEvent != HBaseEventType.RS2ZK_REGION_CLOSED && rsEvent != HBaseEventType.M2ZK_REGION_OFFLINE) {
+      String msg = "ZNode " + regionZNode + " is not in CLOSED/OFFLINE state (state = " + rsEvent + "), will NOT open region.";
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+
+    // get the version to update from ZK
+    zkVersion = stat.getVersion();
+
+    // update the data for "regionName" ZNode in unassigned to OPENING
+    updateZKWithEventData(HBaseEventType.RS2ZK_REGION_OPENING, hmsg);
+    
+    // TODO: implement the updatePeriodically logic here
+  }
+  
+  /**
+   * This method updates the states in ZK to signal that the region has been 
+   * opened. This will stop the periodic updater thread if one was started.
+   * @throws IOException
+   */
+  public void finishRegionOpenEvent(HMsg hmsg) throws IOException {
+    // TODO: stop the updatePeriodically here
+
+    // update the data for "regionName" ZNode in unassigned to OPENED
+    updateZKWithEventData(HBaseEventType.RS2ZK_REGION_OPENED, hmsg);
+  }
+  
+  public boolean isClosingRegion() {
+    return (lastUpdatedState == HBaseEventType.RS2ZK_REGION_CLOSING);
+  }
+
+  public boolean isOpeningRegion() {
+    return (lastUpdatedState == HBaseEventType.RS2ZK_REGION_OPENING);
+  }
+
+  public void abortOpenRegion(HMsg hmsg) throws IOException {
+    LOG.error("Aborting open of region " + regionName);
+
+    // TODO: stop the updatePeriodically for start open region here
+
+    // update the data for "regionName" ZNode in unassigned to CLOSED
+    updateZKWithEventData(HBaseEventType.RS2ZK_REGION_CLOSED, hmsg);
+  }
+
+  private void updateZKWithEventData(HBaseEventType hbEventType, HMsg hmsg) throws IOException {
+    // update the data for "regionName" ZNode in unassigned to "hbEventType"
+    byte[] data = null;
+    try {
+      data = Writables.getBytes(new RegionTransitionEventData(hbEventType, regionServerName, hmsg));
+    } catch (IOException e) {
+      LOG.error("Error creating event data for " + hbEventType, e);
+    }
+    LOG.debug("Updating ZNode " + regionZNode + 
+              " with [" + hbEventType + "]" +
+              " expected version = " + zkVersion);
+    lastUpdatedState = hbEventType;
+    zkWrapper.writeZNode(regionZNode, data, zkVersion, true);
+    zkVersion++;
+  }
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java?rev=953920&r1=953919&r2=953920&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java Sat Jun 12 01:17:45 2010
@@ -220,6 +220,68 @@ public class HQuorumPeer {
 
     return zkProperties;
   }
+  
+  /**
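+   * Return the ZK quorum servers string given the ZK properties returned by
+   * makeZKProps.
+   * @param properties ZooKeeper properties, including clientPort and server.X entries
+   * @return comma-separated host:clientPort list, or null if no valid server or no clientPort is found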
+   */
+  public static String getZKQuorumServersString(Properties properties) {
+    String clientPort = null;
+    List<String> servers = new ArrayList<String>();
+
+    // The clientPort option may come after the server.X hosts, so we need to
+    // grab everything and then create the final host:port comma separated list.
+    boolean anyValid = false;
+    for (Entry<Object,Object> property : properties.entrySet()) {
+      String key = property.getKey().toString().trim();
+      String value = property.getValue().toString().trim();
+      if (key.equals("clientPort")) {
+        clientPort = value;
+      }
+      else if (key.startsWith("server.")) {
+        String host = value.substring(0, value.indexOf(':'));
+        servers.add(host);
+        try {
+          //noinspection ResultOfMethodCallIgnored
+          InetAddress.getByName(host);
+          anyValid = true;
+        } catch (UnknownHostException e) {
+          LOG.warn(StringUtils.stringifyException(e));
+        }
+      }
+    }
+
+    if (!anyValid) {
+      LOG.error("no valid quorum servers found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
+      return null;
+    }
+
+    if (clientPort == null) {
+      LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
+      return null;
+    }
+
+    if (servers.isEmpty()) {
+      LOG.fatal("No server.X lines found in conf/zoo.cfg. HBase must have a " +
+                "ZooKeeper cluster configured for its operation.");
+      return null;
+    }
+
+    StringBuilder hostPortBuilder = new StringBuilder();
+    for (int i = 0; i < servers.size(); ++i) {
+      String host = servers.get(i);
+      if (i > 0) {
+        hostPortBuilder.append(',');
+      }
+      hostPortBuilder.append(host);
+      hostPortBuilder.append(':');
+      hostPortBuilder.append(clientPort);
+    }
+
+    return hostPortBuilder.toString();
+  }
 
   /**
    * Parse ZooKeeper's zoo.cfg, injecting HBase Configuration variables in.



Mime
View raw message