hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jg...@apache.org
Subject svn commit: r961028 [1/2] - in /hbase/branches/0.90_master_rewrite: ./ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/hadoop/hbase/regionserver...
Date Tue, 06 Jul 2010 21:56:28 GMT
Author: jgray
Date: Tue Jul  6 21:56:27 2010
New Revision: 961028

URL: http://svn.apache.org/viewvc?rev=961028&view=rev
Log:
HBASE-2696  [part1-v5-NewClasses_RS_Tested] ZooKeeper cleanup and refactor

Added:
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java
Modified:
    hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/MiniZooKeeperCluster.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ServerStatus.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
    hbase/branches/0.90_master_rewrite/src/main/resources/hbase-webapps/regionserver/regionserver.jsp
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/zookeeper/TestHQuorumPeer.java

Modified: hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt (original)
+++ hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt Tue Jul  6 21:56:27 2010
@@ -9,6 +9,8 @@ Branch 0.90.0 - Master Rewrite Branch
                 (Karthik R via jgray)
     HBASE-2695  [MasterStatus-part2.1] HMaster clean and refactor
                 (Karthik R via jgray)
+    HBASE-2696  [part1-v5-NewClasses_RS_Tested] ZooKeeper cleanup
+                and refactor
 
   NEW FEATURES
 

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/MiniZooKeeperCluster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/MiniZooKeeperCluster.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/MiniZooKeeperCluster.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/MiniZooKeeperCluster.java Tue Jul  6 21:56:27 2010
@@ -104,7 +104,7 @@ public class MiniZooKeeperCluster {
         standaloneServerFactory =
           new NIOServerCnxn.Factory(new InetSocketAddress(clientPort));
       } catch (BindException e) {
-        LOG.info("Faild binding ZK Server to client port: " + clientPort);
+        LOG.info("Failed binding ZK Server to client port: " + clientPort);
         //this port is already in use. try to use another
         clientPort++;
         continue;
@@ -118,7 +118,7 @@ public class MiniZooKeeperCluster {
     }
 
     started = true;
-
+    LOG.info("Started MiniZK Server on client port: " + clientPort);
     return clientPort;
   }
 

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ServerStatus.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ServerStatus.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ServerStatus.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ServerStatus.java Tue Jul  6 21:56:27 2010
@@ -19,9 +19,8 @@
  */
 package org.apache.hadoop.hbase;
 
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
 /**
  * Set of functions that are exposed by any HBase server (implemented by the 
@@ -37,4 +36,14 @@ public interface ServerStatus {
    * Get the configuration object for this server.
    */
   public Configuration getConfiguration();
+  
+  /**
+   * Get the ZooKeeper instance for this server.
+   */
+  public ZooKeeperWatcher getZooKeeper();
+  
+  /**
+   * Stub method added to ServerStatus to move forward with ZK cleanup.
+   */
+  public void abortServer();
 }

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Tue Jul  6 21:56:27 2010
@@ -19,6 +19,24 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -42,30 +60,13 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.MetaUtils;
 import org.apache.hadoop.hbase.util.SoftValueSortedMap;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.CopyOnWriteArraySet;
-
 /**
  * A non-instantiable class that manages connections to multiple tables in
  * multiple HBase instances.
@@ -147,6 +148,7 @@ public class HConnectionManager {
   /**
    * Delete information for all connections.
    * @param stopProxy stop the proxy as well
+   * @throws IOException 
    */
   public static void deleteAllConnections(boolean stopProxy) {
     synchronized (HBASE_INSTANCES) {
@@ -228,8 +230,8 @@ public class HConnectionManager {
      */
     public synchronized ZooKeeperWrapper getZooKeeperWrapper() throws IOException {
       if(zooKeeperWrapper == null) {
-        zooKeeperWrapper =
-            ZooKeeperWrapper.createInstance(conf, HConnectionManager.class.getName());
+        zooKeeperWrapper = new ZooKeeperWatcher(conf, 
+            HConnectionManager.class.getName(), null);
         zooKeeperWrapper.registerListener(this);
       }
       return zooKeeperWrapper;

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue Jul  6 21:56:27 2010
@@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.util.Pair
 import org.apache.hadoop.hbase.util.Sleeper;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
@@ -133,8 +134,8 @@ public class HMaster extends Thread impl
   // Metrics is set when we call run.
   private final MasterMetrics metrics;
 
-  // Our zk client.
-  private ZooKeeperWrapper zooKeeperWrapper;
+  // Our zk client. TODO: rename variable once we settle on naming
+  private ZooKeeperWatcher zooKeeperWrapper;
   // Watcher for master address and for cluster shutdown.
   private final ZKMasterAddressWatcher zkMasterAddressWatcher;
   // A Sleeper that sleeps for threadWakeFrequency; sleep if nothing todo.
@@ -182,7 +183,8 @@ public class HMaster extends Thread impl
     // number of RS ephemeral nodes. RS ephemeral nodes are created only after 
     // the primary master has written the address to ZK. So this has to be done 
     // before we race to write our address to zookeeper.
-    zooKeeperWrapper = ZooKeeperWrapper.createInstance(conf, getHServerAddress().toString());
+    zooKeeperWrapper = 
+      new ZooKeeperWatcher(conf, getHServerAddress().toString(), this);
     isClusterStartup = (zooKeeperWrapper.scanRSDirectory().size() == 0);
     
     // Create the filesystem manager, which in turn does the following:
@@ -919,8 +921,9 @@ public class HMaster extends Thread impl
 
       zooKeeperWrapper.close();
       try {
+        // TODO: this is broken, we should just shutdown now not restart
         zooKeeperWrapper =
-            ZooKeeperWrapper.createInstance(conf, HMaster.class.getName());
+          new ZooKeeperWatcher(conf, HMaster.class.getName(), this);
         zooKeeperWrapper.registerListener(this);
         this.zkMasterAddressWatcher.setZookeeper(zooKeeperWrapper);
         if(!this.zkMasterAddressWatcher.
@@ -1102,4 +1105,14 @@ public class HMaster extends Thread impl
   public static void main(String [] args) {
     doMain(args, HMaster.class);
   }
+
+  @Override
+  public void abortServer() {
+    this.startShutdown();
+  }
+
+  @Override
+  public ZooKeeperWatcher getZooKeeper() {
+    return zooKeeperWrapper;
+  }
 }

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java Tue Jul  6 21:56:27 2010
@@ -19,6 +19,8 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import java.io.IOException;
+
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerAddress;
@@ -26,9 +28,7 @@ import org.apache.hadoop.hbase.HServerIn
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-
-import java.io.IOException;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
 /**
  * ProcessRegionOpen is instantiated when a region server reports that it is
@@ -115,10 +115,8 @@ public class ProcessRegionOpen extends P
       } else {
         masterStatus.getRegionManager().removeRegion(regionInfo);
       }
-      ZooKeeperWrapper zkWrapper =
-          ZooKeeperWrapper.getInstance(masterStatus.getConfiguration(),
-              masterStatus.getHServerAddress().toString());
-      zkWrapper.deleteUnassignedRegion(regionInfo.getEncodedName());
+      masterStatus.getZooKeeper().deleteUnassignedRegion(
+          regionInfo.getEncodedName());
       return true;
     }
   }

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Tue Jul  6 21:56:27 2010
@@ -140,8 +140,7 @@ public class RegionManager {
       masterStatus.getConfiguration().getInt(
           HConstants.THREAD_WAKE_FREQUENCY, 
           HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
-    this.zkWrapper =
-        ZooKeeperWrapper.getInstance(conf, masterStatus.getHServerAddress().toString());
+    this.zkWrapper = masterStatus.getZooKeeper();
     this.maxAssignInOneGo = conf.getInt("hbase.regions.percheckin", 10);
     this.loadBalancer = new LoadBalancer(conf);
 
@@ -640,11 +639,12 @@ public class RegionManager {
     } catch(Exception iex) {
       LOG.warn("meta scanner", iex);
     }
-    ZooKeeperWrapper zkw = ZooKeeperWrapper.getInstance(
-                             masterStatus.getConfiguration(), 
-                             masterStatus.getHServerAddress().toString());
-    zkw.clearRSDirectory();
-    zkw.close();
+    // TODO: Why did we getInstance again?  We should have it local?
+//    ZooKeeperWrapper zkw = ZooKeeperWrapper.getInstance(
+//                             masterStatus.getConfiguration(), 
+//                             masterStatus.getHServerAddress().toString());
+    zkWrapper.clearRSDirectory();
+    zkWrapper.close();
   }
 
   /**
@@ -1233,10 +1233,7 @@ public class RegionManager {
 
   private void writeRootRegionLocationToZooKeeper(HServerAddress address) {
     for (int attempt = 0; attempt < zooKeeperNumRetries; ++attempt) {
-      ZooKeeperWrapper zkw = ZooKeeperWrapper.getInstance(
-                               masterStatus.getConfiguration(), 
-                               masterStatus.getHServerAddress().toString());
-      if (zkw.writeRootRegionLocation(address)) {
+      if (zkWrapper.writeRootRegionLocation(address)) {
         return;
       }
 

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Tue Jul  6 21:56:27 2010
@@ -246,10 +246,7 @@ public class ServerManager {
     // We must set this watcher here because it can be set on a fresh start
     // or on a failover
     Watcher watcher = new ServerExpirer(new HServerInfo(info));
-    ZooKeeperWrapper zkw = ZooKeeperWrapper.getInstance(
-        masterStatus.getConfiguration(), 
-        masterStatus.getHServerAddress().toString());
-    zkw.updateRSLocationGetWatch(info, watcher);
+    masterStatus.getZooKeeper().updateRSLocationGetWatch(info, watcher);
     this.serversToServerInfo.put(serverName, info);
     this.serversToLoad.put(serverName, load);
     synchronized (this.loadToServers) {

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java Tue Jul  6 21:56:27 2010
@@ -56,7 +56,7 @@ public class ZKUnassignedWatcher impleme
   throws IOException {
     this.serverName = masterStatus.getHServerAddress().toString();
     this.serverManager = masterStatus.getServerManager();
-    zkWrapper = ZooKeeperWrapper.getInstance(conf, masterStatus.getHServerAddress().toString());
+    zkWrapper = masterStatus.getZooKeeper();
     String unassignedZNode = zkWrapper.getRegionInTransitionZNode();
     
     // If the UNASSIGNED ZNode exists and this is a fresh cluster start, then 

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Jul  6 21:56:27 2010
@@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.Leases;
 import org.apache.hadoop.hbase.LocalHBaseCluster;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.ServerStatus;
 import org.apache.hadoop.hbase.UnknownRowLockException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.YouAreDeadException;
@@ -101,23 +102,20 @@ import org.apache.hadoop.hbase.util.Info
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Sleeper;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
 
 /**
  * HRegionServer makes a set of HRegions available to clients.  It checks in with
  * the HMaster. There are many HRegionServers in a single HBase deployment.
  */
 public class HRegionServer implements HRegionInterface,
-    HBaseRPCErrorHandler, Runnable, Watcher, Stoppable {
+    HBaseRPCErrorHandler, Runnable, Stoppable, ServerStatus {
   public static final Log LOG = LogFactory.getLog(HRegionServer.class);
   private static final HMsg REPORT_EXITING = new HMsg(Type.MSG_REPORT_EXITING);
   private static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED);
@@ -219,7 +217,11 @@ public class HRegionServer implements HR
   final Map<String, InternalScanner> scanners =
     new ConcurrentHashMap<String, InternalScanner>();
 
-  private ZooKeeperWrapper zooKeeperWrapper;
+  // zookeeper connection and watcher
+  private ZooKeeperWatcher zooKeeper;
+  
+  // master address manager and watcher
+  private MasterAddressManager masterAddressManager;
 
   // A sleeper that sleeps for msgInterval.
   private final Sleeper sleeper;
@@ -284,7 +286,7 @@ public class HRegionServer implements HR
       conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
           HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
 
-    reinitialize();
+    initialize();
   }
 
   /**
@@ -293,7 +295,7 @@ public class HRegionServer implements HR
    * Both call it.
    * @throws IOException
    */
-  private void reinitialize() throws IOException {
+  private void initialize() throws IOException {
     this.abortRequested = false;
     this.stopRequested.set(false);
 
@@ -312,22 +314,25 @@ public class HRegionServer implements HR
       throw new NullPointerException("Server address cannot be null; " +
         "hbase-958 debugging");
     }
-    reinitializeThreads();
-    reinitializeZooKeeper();
+    initializeThreads();
+    initializeZooKeeper();
     int nbBlocks = conf.getInt("hbase.regionserver.nbreservationblocks", 4);
     for(int i = 0; i < nbBlocks; i++)  {
       reservedSpace.add(new byte[HConstants.DEFAULT_SIZE_RESERVATION_BLOCK]);
     }
   }
 
-  private void reinitializeZooKeeper() throws IOException {
-    zooKeeperWrapper =
-        ZooKeeperWrapper.createInstance(conf, serverInfo.getServerName());
-    zooKeeperWrapper.registerListener(this);
-    watchMasterAddress();
+  private void initializeZooKeeper() throws IOException {
+    // open connection to zookeeper and set primary watcher
+    zooKeeper = new ZooKeeperWatcher(conf, serverInfo.getServerName(), this);
+    
+    // create the master address manager, register with zk, and start it
+    masterAddressManager = new MasterAddressManager(zooKeeper);
+    zooKeeper.registerListener(masterAddressManager);
+    masterAddressManager.monitorMaster();
   }
 
-  private void reinitializeThreads() {
+  private void initializeThreads() {
     this.workerThread = new Thread(worker);
 
     // Cache flushing thread.
@@ -353,74 +358,6 @@ public class HRegionServer implements HR
   }
 
   /**
-   * We register ourselves as a watcher on the master address ZNode. This is
-   * called by ZooKeeper when we get an event on that ZNode. When this method
-   * is called it means either our master has died, or a new one has come up.
-   * Either way we need to update our knowledge of the master.
-   * @param event WatchedEvent from ZooKeeper.
-   */
-  public void process(WatchedEvent event) {
-    EventType type = event.getType();
-    KeeperState state = event.getState();
-    LOG.info("Got ZooKeeper event, state: " + state + ", type: " +
-      type + ", path: " + event.getPath());
-
-    // Ignore events if we're shutting down.
-    if (this.stopRequested.get()) {
-      LOG.debug("Ignoring ZooKeeper event while shutting down");
-      return;
-    }
-
-    if (state == KeeperState.Expired) {
-      LOG.error("ZooKeeper session expired");
-      boolean restart =
-        this.conf.getBoolean("hbase.regionserver.restart.on.zk.expire", false);
-      if (restart) {
-        restart();
-      } else {
-        abort("ZooKeeper session expired");
-      }
-    } else if (type == EventType.NodeDeleted) {
-      watchMasterAddress();
-    } else if (type == EventType.NodeCreated) {
-      getMaster();
-
-      // ZooKeeper watches are one time only, so we need to re-register our watch.
-      watchMasterAddress();
-    }
-  }
-
-  private void watchMasterAddress() {
-    while (!stopRequested.get() && !zooKeeperWrapper.watchMasterAddress(this)) {
-      LOG.warn("Unable to set watcher on ZooKeeper master address. Retrying.");
-      sleeper.sleep();
-    }
-  }
-
-  private void restart() {
-    abort("Restarting region server");
-    Threads.shutdown(regionServerThread);
-    boolean done = false;
-    while (!done) {
-      try {
-        reinitialize();
-        done = true;
-      } catch (IOException e) {
-        LOG.debug("Error trying to reinitialize ZooKeeper", e);
-      }
-    }
-    Thread t = new Thread(this);
-    String name = regionServerThread.getName();
-    t.setName(name);
-    t.start();
-  }
-
-  /** @return ZooKeeperWrapper used by RegionServer. */
-  public ZooKeeperWrapper getZooKeeperWrapper() {
-    return zooKeeperWrapper;
-  }
-
-  /**
    * The HRegionServer sticks in this loop until closed. It repeatedly checks
    * in with the HMaster, sending heartbeats & reports, and receiving HRegion
    * load/unload instructions.
@@ -446,7 +383,8 @@ public class HRegionServer implements HR
       for (int tries = 0; !stopRequested.get() && isHealthy();) {
         // Try to get the root region location from the master.
         if (!haveRootRegion.get()) {
-          HServerAddress rootServer = zooKeeperWrapper.readRootRegionLocation();
+          HServerAddress rootServer = 
+            ZKUtil.getDataAsAddress(zooKeeper, zooKeeper.rootServerZNode);
           if (rootServer != null) {
             // By setting the root region location, we bypass the wait imposed on
             // HTable for all regions being assigned.
@@ -646,8 +584,9 @@ public class HRegionServer implements HR
       this.hbaseMaster = null;
     }
 
+    this.zooKeeper.close();
+    
     if (!killed) {
-      this.zooKeeperWrapper.close();
       join();
     }
     LOG.info(Thread.currentThread().getName() + " exiting");
@@ -1174,21 +1113,23 @@ public class HRegionServer implements HR
     Threads.shutdown(this.hlogRoller);
   }
 
+  /**
+   * Get the current master from ZooKeeper and open the RPC connection to it.
+   * 
+   * Method will block until a master is available.  You can break from this
+   * block by requesting the server stop.
+   * 
+   * @return false if a stop was requested before a master address became available
+   */
   private boolean getMaster() {
     HServerAddress masterAddress = null;
-    while (masterAddress == null) {
-      if (stopRequested.get()) {
+    while((masterAddress = masterAddressManager.getMasterAddress()) == null) {
+      if(stopRequested.get()) {
         return false;
       }
-      try {
-        masterAddress = zooKeeperWrapper.readMasterAddressOrThrow();
-      } catch (IOException e) {
-        LOG.warn("Unable to read master address from ZooKeeper. Retrying." +
-                 " Error was:", e);
-        sleeper.sleep();
-      }
+      LOG.debug("No master found, will retry");
+      sleeper.sleep();
     }
-
     LOG.info("Telling master at " + masterAddress + " that we are up");
     HMasterRegionInterface master = null;
     while (!stopRequested.get() && master == null) {
@@ -1230,7 +1171,7 @@ public class HRegionServer implements HR
         if (LOG.isDebugEnabled())
           LOG.debug("sending initial server load: " + hsl);
         lastMsg = System.currentTimeMillis();
-        zooKeeperWrapper.writeRSLocation(this.serverInfo);
+        zooKeeper.writeRSLocation(this.serverInfo);
         result = this.hbaseMaster.regionServerStartup(this.serverInfo);
         break;
       } catch (IOException e) {
@@ -1406,7 +1347,7 @@ public class HRegionServer implements HR
     Integer mapKey = Bytes.mapKey(regionInfo.getRegionName());
     HRegion region = this.onlineRegions.get(mapKey);
     RSZookeeperUpdater zkUpdater = 
-      new RSZookeeperUpdater(conf, serverInfo.getServerName(),
+      new RSZookeeperUpdater(zooKeeper, serverInfo.getServerName(),
           regionInfo.getEncodedName());
     if (region == null) {
       try {
@@ -1491,7 +1432,7 @@ public class HRegionServer implements HR
   throws IOException {
     RSZookeeperUpdater zkUpdater = null;
     if(reportWhenCompleted) {
-      zkUpdater = new RSZookeeperUpdater(conf,
+      zkUpdater = new RSZookeeperUpdater(zooKeeper,
           serverInfo.getServerName(), hri.getEncodedName());
       zkUpdater.startRegionCloseEvent(null, false);
     }
@@ -2380,6 +2321,23 @@ public class HRegionServer implements HR
     return threadWakeFrequency;
   }
 
+  // ServerStatus
+  
+  @Override
+  public void abortServer() {
+    this.abort("Received abortServer call");
+  }
+
+  @Override
+  public HServerAddress getHServerAddress() {
+    return this.address;
+  }
+
+  @Override
+  public ZooKeeperWatcher getZooKeeper() {
+    return zooKeeper;
+  }
+  
   //
   // Main program and support routines
   //

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java?rev=961028&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java Tue Jul  6 21:56:27 2010
@@ -0,0 +1,185 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * Manages the location of the current active Master for this RegionServer.
+ * 
+ * Listens for ZooKeeper events related to the master address. The node /master 
+ * will contain the address of the current master. This listener is interested 
+ * in NodeDeleted and NodeCreated events on /master.
+ * 
+ * This class is thread-safe and takes care of re-setting all watchers to
+ * ensure it always knows the up-to-date master.  To kick it off, instantiate
+ * the class and run the {@link #monitorMaster()} method.
+ * 
+ * You can get the current master via {@link #getMasterAddress()} or the
+ * blocking method {@link #waitForMaster()}.
+ */
+public class MasterAddressManager extends ZooKeeperListener {
+  private static final Log LOG = LogFactory.getLog(MasterAddressManager.class);
+
+  // Address of the current primary master, null if no primary master
+  private HServerAddress masterAddress;
+  
+  /**
+   * Construct a master address listener with the specified zookeeper reference.
+   * 
+   * This constructor does not trigger any actions, you must call methods
+   * explicitly.  Normally you will just want to execute {@link #monitorMaster()}
+   * to start watching for the master address.
+   * 
+   * @param watcher zk reference and watcher
+   */
+  public MasterAddressManager(ZooKeeperWatcher watcher) {
+    super(watcher);
+    masterAddress = null;
+  }
+  
+  /**
+   * Get the address of the current master if one is available.  Returns null
+   * if no current master.
+   * 
+   * Use {@link #waitForMaster()} if you want to block until the master is
+   * available.
+   * @return server address of current active master, or null if none available
+   */
+  public synchronized HServerAddress getMasterAddress() {
+    return masterAddress;
+  }
+  
+  /**
+   * Check if there is a master available.
+   * @return true if there is a master set, false if not.
+   */
+  public synchronized boolean hasMaster() {
+    return masterAddress != null;
+  }
+  
+  /**
+   * Get the address of the current master.  If no master is available, method
+   * will block until one is available, the thread is interrupted, or timeout
+   * has passed.
+   * 
+   * TODO: Make this work, currently unused, kept with existing retry semantics.
+   *       
+   * @return server address of current active master, null if timed out
+   * @throws InterruptedException if the thread is interrupted while waiting
+   */
+  public synchronized HServerAddress waitForMaster()
+  throws InterruptedException {
+    return masterAddress;
+  }
+  
+  /**
+   * Setup to watch for the primary master of the cluster.
+   * 
+   * If the master is already available in ZooKeeper, this method will ensure
+   * it gets set and that any further changes are also watched for.
+   * 
+   * If no master is available, this method ensures we become aware of it and
+   * will take care of setting it.
+   */
+  public void monitorMaster() {
+    if(ZKUtil.watchAndCheckExists(watcher, watcher.masterAddressZNode)) {
+      handleNewMaster();
+    }
+  }
+  
+  @Override
+  public void nodeCreated(String path) {
+    LOG.info("nodeCreated(" + path + ")");
+    if(path.equals(watcher.masterAddressZNode)) {
+      handleNewMaster();
+    }
+    monitorMaster();
+  }
+  
+  @Override
+  public void nodeDeleted(String path) {
+    if(path.equals(watcher.masterAddressZNode)) {
+      handleDeadMaster();
+    }
+    monitorMaster();
+  }
+  
+  /**
+   * Set the master address to the specified address.  This operation is
+   * idempotent, a master will only be set if there is currently no master set.
+   */
+  private synchronized void setMasterAddress(HServerAddress address) {
+    if(masterAddress == null) {
+      LOG.info("Found and set master address: " + address);
+      masterAddress = address;
+    }
+  }
+  
+  /**
+   * Unsets the master address.  Used when the master goes offline so none is
+   * available.
+   */
+  private synchronized void unsetMasterAddress() {
+    if(masterAddress != null) {
+      LOG.info("Master has been unset.  There is no current master available");
+      masterAddress = null;
+    }
+  }
+  
+  /**
+   * Handle a new master being set.
+   * 
+   * This method should be called to check if there is a new master.  If there
+   * is already a master set, this method returns immediately.  If none is set,
+   * this will attempt to grab the master location from ZooKeeper and will set
+   * it.
+   * 
+   * This method uses an atomic operation to ensure a new master is only set 
+   * once.
+   */
+  private void handleNewMaster() {
+    if(hasMaster()) {
+      return;
+    }
+    HServerAddress address = 
+      ZKUtil.getDataAsAddress(watcher, watcher.masterAddressZNode);
+    if(address != null) {
+      setMasterAddress(address);
+    }
+  }
+  
+  /**
+   * Handle a master failure.
+   * 
+   * Triggered when a master node is deleted.
+   * 
+   * TODO: Other ways we figure master is "dead"?  What do we do if set in ZK
+   *       but we can't communicate with TCP?
+   */
+  private void handleDeadMaster() {
+    unsetMasterAddress();
+  }
+}

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java Tue Jul  6 21:56:27 2010
@@ -1,19 +1,14 @@
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HMsg;
 import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
-import org.apache.hadoop.hbase.executor.HBaseEventHandler;
 import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
 import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.data.Stat;
 
@@ -33,22 +28,22 @@ public class RSZookeeperUpdater {
   private final String regionServerName;
   private String regionName = null;
   private String regionZNode = null;
-  private ZooKeeperWrapper zkWrapper = null;
+  private ZooKeeperWatcher zooKeeper = null;
   private int zkVersion = 0;
   HBaseEventType lastUpdatedState;
 
-  public RSZookeeperUpdater(Configuration conf,
-                            String regionServerName, String regionName) {
-    this(conf, regionServerName, regionName, 0);
+  public RSZookeeperUpdater(ZooKeeperWatcher zooKeeper, String regionServerName,
+      String regionName) {
+    this(zooKeeper, regionServerName, regionName, 0);
   }
   
-  public RSZookeeperUpdater(Configuration conf, String regionServerName,
-                            String regionName, int zkVersion) {
-    this.zkWrapper = ZooKeeperWrapper.getInstance(conf, regionServerName);
+  public RSZookeeperUpdater(ZooKeeperWatcher zooKeeper, String regionServerName,
+      String regionName, int zkVersion) {
+    this.zooKeeper = zooKeeper;
     this.regionServerName = regionServerName;
     this.regionName = regionName;
     // get the region ZNode we have to create
-    this.regionZNode = zkWrapper.getZNode(zkWrapper.getRegionInTransitionZNode(), regionName);
+    this.regionZNode = zooKeeper.getZNode(zooKeeper.assignmentZNode, regionName);
     this.zkVersion = zkVersion;
   }
   
@@ -59,14 +54,14 @@ public class RSZookeeperUpdater {
    */
   public void startRegionCloseEvent(HMsg hmsg, boolean updatePeriodically) throws IOException {
     // if this ZNode already exists, something is wrong
-    if(zkWrapper.exists(regionZNode, true)) {
+    if(zooKeeper.exists(regionZNode, true)) {
       String msg = "ZNode " + regionZNode + " already exists in ZooKeeper, will NOT close region.";
       LOG.error(msg);
       throw new IOException(msg);
     }
     
     // create the region node in the unassigned directory first
-    zkWrapper.createZNodeIfNotExists(regionZNode, null, CreateMode.PERSISTENT, true);
+    zooKeeper.createZNodeIfNotExists(regionZNode, null, CreateMode.PERSISTENT, true);
 
     // update the data for "regionName" ZNode in unassigned to CLOSING
     updateZKWithEventData(HBaseEventType.RS2ZK_REGION_CLOSING, hmsg);
@@ -93,7 +88,7 @@ public class RSZookeeperUpdater {
    */
   public void startRegionOpenEvent(HMsg hmsg, boolean updatePeriodically) throws IOException {
     Stat stat = new Stat();
-    byte[] data = zkWrapper.readZNode(regionZNode, stat);
+    byte[] data = zooKeeper.readZNode(regionZNode, stat);
     // if there is no ZNode for this region, something is wrong
     if(data == null) {
       String msg = "ZNode " + regionZNode + " does not exist in ZooKeeper, will NOT open region.";
@@ -158,7 +153,7 @@ public class RSZookeeperUpdater {
               " with [" + hbEventType + "]" +
               " expected version = " + zkVersion);
     lastUpdatedState = hbEventType;
-    zkWrapper.writeZNode(regionZNode, data, zkVersion, true);
+    zooKeeper.writeZNode(regionZNode, data, zkVersion, true);
     zkVersion++;
   }
 }

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java Tue Jul  6 21:56:27 2010
@@ -19,21 +19,8 @@
  */
 package org.apache.hadoop.hbase.zookeeper;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.net.DNS;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.zookeeper.server.ServerConfig;
-import org.apache.zookeeper.server.ZooKeeperServerMain;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.apache.zookeeper.server.quorum.QuorumPeerMain;
-
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.PrintWriter;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
@@ -41,29 +28,27 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
-import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
 
 /**
  * HBase's version of ZooKeeper's QuorumPeer. When HBase is set to manage
  * ZooKeeper, this class is used to start up QuorumPeer instances. By doing
  * things in here rather than directly calling to ZooKeeper, we have more
- * control over the process. Currently, this class allows us to parse the
+ * control over the process. This class uses {@link ZKConfig} to parse the
  * zoo.cfg and inject variables from HBase's site.xml configuration in.
  */
 public class HQuorumPeer {
-  private static final Log LOG = LogFactory.getLog(HQuorumPeer.class);
-
-  private static final String VARIABLE_START = "${";
-  private static final int VARIABLE_START_LENGTH = VARIABLE_START.length();
-  private static final String VARIABLE_END = "}";
-  private static final int VARIABLE_END_LENGTH = VARIABLE_END.length();
-
-  private static final String ZK_CFG_PROPERTY = "hbase.zookeeper.property.";
-  private static final int ZK_CFG_PROPERTY_SIZE = ZK_CFG_PROPERTY.length();
-  private static final String ZK_CLIENT_PORT_KEY = ZK_CFG_PROPERTY
-      + "clientPort";
-
+  
   /**
    * Parse ZooKeeper configuration from HBase XML config and run a QuorumPeer.
    * @param args String[] of command line arguments. Not used.
@@ -71,7 +56,7 @@ public class HQuorumPeer {
   public static void main(String[] args) {
     Configuration conf = HBaseConfiguration.create();
     try {
-      Properties zkProperties = makeZKProps(conf);
+      Properties zkProperties = ZKConfig.makeZKProps(conf);
       writeMyID(zkProperties);
       QuorumPeerConfig zkConfig = new QuorumPeerConfig();
       zkConfig.parseProperties(zkProperties);
@@ -158,195 +143,4 @@ public class HQuorumPeer {
     w.println(myId);
     w.close();
   }
-
-  /**
-   * Make a Properties object holding ZooKeeper config equivalent to zoo.cfg.
-   * If there is a zoo.cfg in the classpath, simply read it in. Otherwise parse
-   * the corresponding config options from the HBase XML configs and generate
-   * the appropriate ZooKeeper properties.
-   * @param conf Configuration to read from.
-   * @return Properties holding mappings representing ZooKeeper zoo.cfg file.
-   */
-  public static Properties makeZKProps(Configuration conf) {
-    // First check if there is a zoo.cfg in the CLASSPATH. If so, simply read
-    // it and grab its configuration properties.
-    ClassLoader cl = HQuorumPeer.class.getClassLoader();
-    final InputStream inputStream =
-      cl.getResourceAsStream(HConstants.ZOOKEEPER_CONFIG_NAME);
-    if (inputStream != null) {
-      try {
-        return parseZooCfg(conf, inputStream);
-      } catch (IOException e) {
-        LOG.warn("Cannot read " + HConstants.ZOOKEEPER_CONFIG_NAME +
-                 ", loading from XML files", e);
-      }
-    }
-
-    // Otherwise, use the configuration options from HBase's XML files.
-    Properties zkProperties = new Properties();
-
-    // Directly map all of the hbase.zookeeper.property.KEY properties.
-    for (Entry<String, String> entry : conf) {
-      String key = entry.getKey();
-      if (key.startsWith(ZK_CFG_PROPERTY)) {
-        String zkKey = key.substring(ZK_CFG_PROPERTY_SIZE);
-        String value = entry.getValue();
-        // If the value has variables substitutions, need to do a get.
-        if (value.contains(VARIABLE_START)) {
-          value = conf.get(key);
-        }
-        zkProperties.put(zkKey, value);
-      }
-    }
-
-    // If clientPort is not set, assign the default
-    if (zkProperties.getProperty(ZK_CLIENT_PORT_KEY) == null) {
-      zkProperties.put(ZK_CLIENT_PORT_KEY,
-                       HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
-    }
-
-    // Create the server.X properties.
-    int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888);
-    int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888);
-
-    final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM,
-                                                 "localhost");
-    for (int i = 0; i < serverHosts.length; ++i) {
-      String serverHost = serverHosts[i];
-      String address = serverHost + ":" + peerPort + ":" + leaderPort;
-      String key = "server." + i;
-      zkProperties.put(key, address);
-    }
-
-    return zkProperties;
-  }
-  
-  /**
-   * Return the ZK Quorum servers string given zk properties returned by 
-   * makeZKProps
-   * @param properties
-   * @return
-   */
-  public static String getZKQuorumServersString(Properties properties) {
-    String clientPort = null;
-    List<String> servers = new ArrayList<String>();
-
-    // The clientPort option may come after the server.X hosts, so we need to
-    // grab everything and then create the final host:port comma separated list.
-    boolean anyValid = false;
-    for (Entry<Object,Object> property : properties.entrySet()) {
-      String key = property.getKey().toString().trim();
-      String value = property.getValue().toString().trim();
-      if (key.equals("clientPort")) {
-        clientPort = value;
-      }
-      else if (key.startsWith("server.")) {
-        String host = value.substring(0, value.indexOf(':'));
-        servers.add(host);
-        try {
-          //noinspection ResultOfMethodCallIgnored
-          InetAddress.getByName(host);
-          anyValid = true;
-        } catch (UnknownHostException e) {
-          LOG.warn(StringUtils.stringifyException(e));
-        }
-      }
-    }
-
-    if (!anyValid) {
-      LOG.error("no valid quorum servers found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
-      return null;
-    }
-
-    if (clientPort == null) {
-      LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
-      return null;
-    }
-
-    if (servers.isEmpty()) {
-      LOG.fatal("No server.X lines found in conf/zoo.cfg. HBase must have a " +
-                "ZooKeeper cluster configured for its operation.");
-      return null;
-    }
-
-    StringBuilder hostPortBuilder = new StringBuilder();
-    for (int i = 0; i < servers.size(); ++i) {
-      String host = servers.get(i);
-      if (i > 0) {
-        hostPortBuilder.append(',');
-      }
-      hostPortBuilder.append(host);
-      hostPortBuilder.append(':');
-      hostPortBuilder.append(clientPort);
-    }
-
-    return hostPortBuilder.toString();
-  }
-
-  /**
-   * Parse ZooKeeper's zoo.cfg, injecting HBase Configuration variables in.
-   * This method is used for testing so we can pass our own InputStream.
-   * @param conf HBaseConfiguration to use for injecting variables.
-   * @param inputStream InputStream to read from.
-   * @return Properties parsed from config stream with variables substituted.
-   * @throws IOException if anything goes wrong parsing config
-   */
-  public static Properties parseZooCfg(Configuration conf,
-      InputStream inputStream) throws IOException {
-    Properties properties = new Properties();
-    try {
-      properties.load(inputStream);
-    } catch (IOException e) {
-      final String msg = "fail to read properties from "
-        + HConstants.ZOOKEEPER_CONFIG_NAME;
-      LOG.fatal(msg);
-      throw new IOException(msg, e);
-    }
-    for (Entry<Object, Object> entry : properties.entrySet()) {
-      String value = entry.getValue().toString().trim();
-      String key = entry.getKey().toString().trim();
-      StringBuilder newValue = new StringBuilder();
-      int varStart = value.indexOf(VARIABLE_START);
-      int varEnd = 0;
-      while (varStart != -1) {
-        varEnd = value.indexOf(VARIABLE_END, varStart);
-        if (varEnd == -1) {
-          String msg = "variable at " + varStart + " has no end marker";
-          LOG.fatal(msg);
-          throw new IOException(msg);
-        }
-        String variable = value.substring(varStart + VARIABLE_START_LENGTH, varEnd);
-
-        String substituteValue = System.getProperty(variable);
-        if (substituteValue == null) {
-          substituteValue = conf.get(variable);
-        }
-        if (substituteValue == null) {
-          String msg = "variable " + variable + " not set in system property "
-                     + "or hbase configs";
-          LOG.fatal(msg);
-          throw new IOException(msg);
-        }
-
-        newValue.append(substituteValue);
-
-        varEnd += VARIABLE_END_LENGTH;
-        varStart = value.indexOf(VARIABLE_START, varEnd);
-      }
-      // Special case for 'hbase.cluster.distributed' property being 'true'
-      if (key.startsWith("server.")) {
-        if (conf.get(HConstants.CLUSTER_DISTRIBUTED).equals(HConstants.CLUSTER_IS_DISTRIBUTED)
-            && value.startsWith("localhost")) {
-          String msg = "The server in zoo.cfg cannot be set to localhost " +
-              "in a fully-distributed setup because it won't be reachable. " +
-              "See \"Getting Started\" for more information.";
-          LOG.fatal(msg);
-          throw new IOException(msg);
-        }
-      }
-      newValue.append(value.substring(varEnd));
-      properties.setProperty(key, newValue.toString());
-    }
-    return properties;
-  }
 }

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java?rev=961028&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java Tue Jul  6 21:56:27 2010
@@ -0,0 +1,252 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Utility methods for reading, parsing, and building zookeeper configuration.
+ */
+public class ZKConfig {
+  private static final Log LOG = LogFactory.getLog(ZKConfig.class);
+  
+  private static final String VARIABLE_START = "${";
+  private static final int VARIABLE_START_LENGTH = VARIABLE_START.length();
+  private static final String VARIABLE_END = "}";
+  private static final int VARIABLE_END_LENGTH = VARIABLE_END.length();
+
+  private static final String ZK_CFG_PROPERTY = "hbase.zookeeper.property.";
+  private static final int ZK_CFG_PROPERTY_SIZE = ZK_CFG_PROPERTY.length();
+  private static final String ZK_CLIENT_PORT_KEY = ZK_CFG_PROPERTY
+      + "clientPort";
+  
+  /**
+   * Make a Properties object holding ZooKeeper config equivalent to zoo.cfg.
+   * If there is a zoo.cfg in the classpath, simply read it in. Otherwise parse
+   * the corresponding config options from the HBase XML configs and generate
+   * the appropriate ZooKeeper properties.
+   * @param conf Configuration to read from.
+   * @return Properties holding mappings representing ZooKeeper zoo.cfg file.
+   */
+  public static Properties makeZKProps(Configuration conf) {
+    // First check if there is a zoo.cfg in the CLASSPATH. If so, simply read
+    // it and grab its configuration properties.
+    ClassLoader cl = HQuorumPeer.class.getClassLoader();
+    final InputStream inputStream =
+      cl.getResourceAsStream(HConstants.ZOOKEEPER_CONFIG_NAME);
+    if (inputStream != null) {
+      try {
+        return parseZooCfg(conf, inputStream);
+      } catch (IOException e) {
+        LOG.warn("Cannot read " + HConstants.ZOOKEEPER_CONFIG_NAME +
+                 ", loading from XML files", e);
+      }
+    }
+
+    // Otherwise, use the configuration options from HBase's XML files.
+    Properties zkProperties = new Properties();
+
+    // Directly map all of the hbase.zookeeper.property.KEY properties.
+    for (Entry<String, String> entry : conf) {
+      String key = entry.getKey();
+      if (key.startsWith(ZK_CFG_PROPERTY)) {
+        String zkKey = key.substring(ZK_CFG_PROPERTY_SIZE);
+        String value = entry.getValue();
+        // If the value has variables substitutions, need to do a get.
+        if (value.contains(VARIABLE_START)) {
+          value = conf.get(key);
+        }
+        zkProperties.put(zkKey, value);
+      }
+    }
+
+    // If clientPort is not set, assign the default
+    if (zkProperties.getProperty(ZK_CLIENT_PORT_KEY) == null) {
+      zkProperties.put(ZK_CLIENT_PORT_KEY,
+                       HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+    }
+
+    // Create the server.X properties.
+    int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888);
+    int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888);
+
+    final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM,
+                                                 "localhost");
+    for (int i = 0; i < serverHosts.length; ++i) {
+      String serverHost = serverHosts[i];
+      String address = serverHost + ":" + peerPort + ":" + leaderPort;
+      String key = "server." + i;
+      zkProperties.put(key, address);
+    }
+
+    return zkProperties;
+  }
+
+  /**
+   * Parse ZooKeeper's zoo.cfg, injecting HBase Configuration variables in.
+   * This method is used for testing so we can pass our own InputStream.
+   * @param conf HBaseConfiguration to use for injecting variables.
+   * @param inputStream InputStream to read from.
+   * @return Properties parsed from config stream with variables substituted.
+   * @throws IOException if anything goes wrong parsing config
+   */
+  public static Properties parseZooCfg(Configuration conf,
+      InputStream inputStream) throws IOException {
+    Properties properties = new Properties();
+    try {
+      properties.load(inputStream);
+    } catch (IOException e) {
+      final String msg = "fail to read properties from "
+        + HConstants.ZOOKEEPER_CONFIG_NAME;
+      LOG.fatal(msg);
+      throw new IOException(msg, e);
+    }
+    for (Entry<Object, Object> entry : properties.entrySet()) {
+      String value = entry.getValue().toString().trim();
+      String key = entry.getKey().toString().trim();
+      StringBuilder newValue = new StringBuilder();
+      int varStart = value.indexOf(VARIABLE_START);
+      int varEnd = 0;
+      while (varStart != -1) {
+        varEnd = value.indexOf(VARIABLE_END, varStart);
+        if (varEnd == -1) {
+          String msg = "variable at " + varStart + " has no end marker";
+          LOG.fatal(msg);
+          throw new IOException(msg);
+        }
+        String variable = value.substring(varStart + VARIABLE_START_LENGTH, varEnd);
+
+        String substituteValue = System.getProperty(variable);
+        if (substituteValue == null) {
+          substituteValue = conf.get(variable);
+        }
+        if (substituteValue == null) {
+          String msg = "variable " + variable + " not set in system property "
+                     + "or hbase configs";
+          LOG.fatal(msg);
+          throw new IOException(msg);
+        }
+
+        newValue.append(substituteValue);
+
+        varEnd += VARIABLE_END_LENGTH;
+        varStart = value.indexOf(VARIABLE_START, varEnd);
+      }
+      // Special case for 'hbase.cluster.distributed' property being 'true'
+      if (key.startsWith("server.")) {
+        if (conf.get(HConstants.CLUSTER_DISTRIBUTED).equals(HConstants.CLUSTER_IS_DISTRIBUTED)
+            && value.startsWith("localhost")) {
+          String msg = "The server in zoo.cfg cannot be set to localhost " +
+              "in a fully-distributed setup because it won't be reachable. " +
+              "See \"Getting Started\" for more information.";
+          LOG.fatal(msg);
+          throw new IOException(msg);
+        }
+      }
+      newValue.append(value.substring(varEnd));
+      properties.setProperty(key, newValue.toString());
+    }
+    return properties;
+  }
+  
+  /**
+   * Return the ZK Quorum servers string given zk properties returned by 
+   * makeZKProps
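+   * @param properties ZooKeeper properties, as returned by makeZKProps
+   * @return comma-separated host:port list, or null if no resolvable server or no clientPort is found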
+   */
+  public static String getZKQuorumServersString(Properties properties) {
+    String clientPort = null;
+    List<String> servers = new ArrayList<String>();
+
+    // The clientPort option may come after the server.X hosts, so we need to
+    // grab everything and then create the final host:port comma separated list.
+    boolean anyValid = false;
+    for (Entry<Object,Object> property : properties.entrySet()) {
+      String key = property.getKey().toString().trim();
+      String value = property.getValue().toString().trim();
+      if (key.equals("clientPort")) {
+        clientPort = value;
+      }
+      else if (key.startsWith("server.")) {
+        String host = value.substring(0, value.indexOf(':'));
+        servers.add(host);
+        try {
+          //noinspection ResultOfMethodCallIgnored
+          InetAddress.getByName(host);
+          anyValid = true;
+        } catch (UnknownHostException e) {
+          LOG.warn(StringUtils.stringifyException(e));
+        }
+      }
+    }
+
+    if (!anyValid) {
+      LOG.error("no valid quorum servers found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
+      return null;
+    }
+
+    if (clientPort == null) {
+      LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
+      return null;
+    }
+
+    if (servers.isEmpty()) {
+      LOG.fatal("No server.X lines found in conf/zoo.cfg. HBase must have a " +
+                "ZooKeeper cluster configured for its operation.");
+      return null;
+    }
+
+    StringBuilder hostPortBuilder = new StringBuilder();
+    for (int i = 0; i < servers.size(); ++i) {
+      String host = servers.get(i);
+      if (i > 0) {
+        hostPortBuilder.append(',');
+      }
+      hostPortBuilder.append(host);
+      hostPortBuilder.append(':');
+      hostPortBuilder.append(clientPort);
+    }
+
+    return hostPortBuilder.toString();
+  }
+  
+  /**
+   * Return the ZK Quorum servers string given the specified configuration.
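+   * @param conf Configuration from which the ZooKeeper properties are built
+   * @return comma-separated host:port list, or null if none could be built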
+   */
+  public static String getZKQuorumServersString(Configuration conf) {
+    return getZKQuorumServersString(makeZKProps(conf));
+  }
+}
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java Tue Jul  6 21:56:27 2010
@@ -20,12 +20,11 @@
 
 package org.apache.hadoop.hbase.zookeeper;
 
+import java.util.Properties;
+import java.util.Map.Entry;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-
-import java.util.Map.Entry;
-import java.util.Properties;
 
 /**
  * Tool for reading ZooKeeper servers from HBase XML configuation and producing
@@ -41,7 +40,7 @@ public class ZKServerTool {
     // Note that we do not simply grab the property
     // HConstants.ZOOKEEPER_QUORUM from the HBaseConfiguration because the
     // user may be using a zoo.cfg file.
-    Properties zkProps = HQuorumPeer.makeZKProps(conf);
+    Properties zkProps = ZKConfig.makeZKProps(conf);
     for (Entry<Object, Object> entry : zkProps.entrySet()) {
       String key = entry.getKey().toString().trim();
       String value = entry.getValue().toString().trim();

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=961028&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Tue Jul  6 21:56:27 2010
@@ -0,0 +1,161 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Internal HBase utility class for ZooKeeper.
+ * 
+ * Contains only static methods and constants.
+ */
+public class ZKUtil {
+  private static final Log LOG = LogFactory.getLog(ZKUtil.class);
+
+  // TODO: Replace this with ZooKeeper constant when ZOOKEEPER-277 is resolved.
+  private static final char ZNODE_PATH_SEPARATOR = '/';
+  
+  /**
+   * Creates a new connection to ZooKeeper, pulling settings and quorum config
+   * from the specified configuration object using methods from {@link ZKConfig}.
+   * 
+   * Sets the connection status monitoring watcher to the specified watcher.
+   * 
+   * @param conf configuration to pull quorum and other settings from
+   * @param watcher watcher to monitor connection changes
+   * @return connection to zookeeper
+   * @throws IOException if unable to connect to zk or config problem
+   */
+  public static ZooKeeper connect(Configuration conf, Watcher watcher)
+  throws IOException {
+    Properties properties = ZKConfig.makeZKProps(conf);
+    String quorum = ZKConfig.getZKQuorumServersString(properties);
+    if(quorum == null) {
+      throw new IOException("Unable to determine ZooKeeper quorum");
+    }
+    int timeout = conf.getInt("zookeeper.session.timeout", 60 * 1000);
+    LOG.debug("Opening connection to ZooKeeper with quorum (" + quorum + ")");
+    return new ZooKeeper(quorum, timeout, watcher);
+  }
+
+  /**
+   * Join the prefix znode name with the suffix znode name to generate a proper
+   * full znode name.
+   * 
+   * Assumes prefix does not end with slash and suffix does not begin with it.
+   * 
+   * @param prefix beginning of znode name
+   * @param suffix ending of znode name
+   * @return result of properly joining prefix with suffix
+   */
+  public static String joinZNode(String prefix, String suffix) {
+    return prefix + ZNODE_PATH_SEPARATOR + suffix;
+  }
+
+  /**
+   * Watch the specified znode for delete/create/change events.  The watcher is
+   * set whether or not the node exists.  If the node already exists, the method
+   * returns true.  If the node does not exist, the method returns false.
+   * The debug log line records whether the node was found.
+   * @param zkw zk reference
+   * @param znode path of node to watch
+   * @return true if znode exists, false if does not exist or error
+   */
+  public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode) {
+    try {
+      Stat s = zkw.getZooKeeper().exists(znode, zkw);
+      zkw.debug("Set watcher on znode (" + znode + "), exists=" + (s != null));
+      return s != null;
+    } catch (KeeperException e) {
+      zkw.warn("Unable to set watcher on znode (" + znode + ")", e);
+      zkw.keeperException(e);
+      return false;
+    } catch (InterruptedException e) {
+      zkw.warn("Unable to set watcher on znode (" + znode + ")", e);
+      zkw.interruptedException(e);
+      return false;
+    }
+  }
+  
+  /**
+   * Get the data at the specified znode and set a watch.
+   * 
+   * Returns the data and sets a watch if the node exists.  Returns null and no
+   * watch is set if the node does not exist or there is an exception.  A node
+   * that exists but holds no data also yields null (with the watch set).
+   * @param zkw zk reference
+   * @param znode path of node
+   * @return data of the specified znode, or null
+   */
+  public static byte [] getDataAndWatch(ZooKeeperWatcher zkw, String znode) {
+    try {
+      byte [] data = zkw.getZooKeeper().getData(znode, zkw, null);
+      zkw.debug("Retrieved " + (data == null ? 0 : data.length) +
+          " bytes of data from znode (" + znode + ") and set a watcher");
+      return data;
+    } catch (KeeperException.NoNodeException e) {
+      zkw.debug("Unable to get data of znode (" + znode + ") " +
+          "because node does not exist (not an error)");
+      return null;
+    } catch (KeeperException e) {
+      zkw.warn("Unable to get data of znode (" + znode + ")", e);
+      zkw.keeperException(e);
+      return null;
+    } catch (InterruptedException e) {
+      zkw.warn("Unable to get data of znode (" + znode + ")", e);
+      zkw.interruptedException(e);
+      return null;
+    }
+  }
+  
+  /**
+   * Get the data at the specified znode, deserialize it as an HServerAddress,
+   * and set a watch.
+   * 
+   * Returns the data as a server address and sets a watch if the node exists.
+   * Returns null and no watch is set if the node does not exist or there is an
+   * exception.
+   * 
+   * @param zkw zk reference
+   * @param znode path of node
+   * @return data of the specified node as a server address, or null
+   */
+  public static HServerAddress getDataAsAddress(ZooKeeperWatcher zkw, 
+      String znode) {
+    byte [] data = getDataAndWatch(zkw, znode);
+    if(data == null) {
+      return null;
+    }
+    String addrString = Bytes.toString(data);
+    zkw.debug("Read server address from znode (" + znode + "): " + addrString);
+    return new HServerAddress(addrString);
+  }
+}
\ No newline at end of file

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java?rev=961028&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java Tue Jul  6 21:56:27 2010
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+
+/**
+ * Base class for internal listeners of ZooKeeper events.
+ * 
+ * The {@link ZooKeeperWatcher} for a process will execute the appropriate
+ * methods of implementations of this class.  In order to receive events from
+ * the watcher, every listener must register itself via {@link ZooKeeperWatcher#registerListener}.
+ * 
+ * Subclasses need only override those methods in which they are interested.
+ * 
+ * Note that the watcher will be blocked when invoking methods in listeners so
+ * they must not be long-running.
+ */
+public class ZooKeeperListener {
+
+  // Reference to the zk watcher which also contains configuration and constants
+  protected ZooKeeperWatcher watcher;
+  
+  /**
+   * Construct a ZooKeeper event listener.
+   * TODO: This should take ServerStatus which will contain ZKWatcher ref?
+   */
+  public ZooKeeperListener(ZooKeeperWatcher watcher) {
+    this.watcher = watcher;
+  }
+
+  /**
+   * Called when a new node has been created.
+   * @param path full path of the new node
+   */
+  public void nodeCreated(String path) {
+    // no-op
+  }
+
+  /**
+   * Called when a node has been deleted
+   * @param path full path of the deleted node
+   */
+  public void nodeDeleted(String path) {
+    // no-op
+  }
+
+  /**
+   * Called when an existing node has changed data.
+   * @param path full path of the updated node
+   */
+  public void nodeDataChanged(String path) {
+    // no-op
+  }
+
+  /**
+   * Called when an existing node has a child node added or removed.
+   * @param path full path of the node whose children have changed
+   */
+  public void nodeChildrenChanged(String path) {
+    // no-op
+  }
+}
\ No newline at end of file

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java?rev=961028&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java Tue Jul  6 21:56:27 2010
@@ -0,0 +1,313 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerStatus;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Acts as the single ZooKeeper Watcher.  One instance of this is instantiated
+ * for each Master, RegionServer, and client process.
+ * 
+ * This is the only class that implements {@link Watcher}.  Other internal
+ * classes which need to be notified of ZooKeeper events must register with
+ * the local instance of this watcher via {@link #registerListener}.
+ * 
+ * This class also holds and manages the connection to ZooKeeper.  Code to deal
+ * with connection related events and exceptions are handled here.
+ */
+public class ZooKeeperWatcher extends ZooKeeperWrapper implements Watcher {
+  private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class);
+  
+  // name of this watcher (for logging only)
+  private String name;
+  
+  // zookeeper connection
+  private ZooKeeper zooKeeper;
+  
+  // server controller
+  private ServerStatus server;
+  
+  // listeners to be notified
+  private final Set<ZooKeeperListener> listeners =
+    new CopyOnWriteArraySet<ZooKeeperListener>();
+
+  // node names
+  
+  // base znode for this cluster
+  public String baseZNode;
+  // znode containing location of server hosting root region
+  public String rootServerZNode;
+  // znode containing ephemeral nodes of the regionservers
+  public String rsZNode;
+  // znode of currently active master
+  public String masterAddressZNode;
+  // znode containing the current cluster state
+  public String clusterStateZNode;
+  // znode used for region transitioning and assignment
+  public String assignmentZNode;
+  
+  /**
+   * Instantiate a ZooKeeper connection and watcher.
+   * @param name name of this watcher, for logging/debug purposes only
+   * @throws IOException if the ZooKeeper connection cannot be set up
+   */
+  public ZooKeeperWatcher(Configuration conf, String name, ServerStatus server)
+  throws IOException {
+    super(conf, name);
+    this.name = name;
+    this.server = server; // must precede connect(): events can arrive at once
+    this.zooKeeper = ZKUtil.connect(conf, this);
+    info("Connected to ZooKeeper");
+    setNodeNames(conf);
+  }
+  
+  /**
+   * Set the local variable node names using the specified configuration.
+   */
+  private void setNodeNames(Configuration conf) {
+    baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, 
+        HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
+    rootServerZNode = ZKUtil.joinZNode(baseZNode,
+        conf.get("zookeeper.znode.rootserver", "root-region-server"));
+    rsZNode = ZKUtil.joinZNode(baseZNode,
+        conf.get("zookeeper.znode.rs", "rs"));
+    masterAddressZNode = ZKUtil.joinZNode(baseZNode,
+        conf.get("zookeeper.znode.master", "master"));
+    clusterStateZNode = ZKUtil.joinZNode(baseZNode,
+        conf.get("zookeeper.znode.state", "shutdown"));
+    assignmentZNode = ZKUtil.joinZNode(baseZNode,
+        conf.get("zookeeper.znode.regionInTransition", "unassigned"));
+  }
+  
+  /**
+   * Register the specified listener to receive ZooKeeper events.
+   * @param listener listener to add to the set notified by {@link #process}
+   */
+  public void registerListener(ZooKeeperListener listener) {
+    listeners.add(listener);
+  }
+  
+  /**
+   * Get the connection to ZooKeeper.
+   * @return connection reference to zookeeper
+   */
+  public ZooKeeper getZooKeeper() {
+    return zooKeeper;
+  }
+  
+  /**
+   * Method called from ZooKeeper for events and connection status.
+   * 
+   * Valid events are passed along to listeners.  Connection status changes
+   * are dealt with locally.
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    LOG.debug("<" + name + "> Received ZooKeeper Event, " +
+        "type: " + event.getType() + ", " +
+        "state:" + event.getState() + ", " +
+        "path: " + event.getPath());
+    
+    // While we are still using both ZKWs, need to call parent process()
+    super.process(event);
+    
+    switch(event.getType()) {
+
+      // If event type is NONE, this is a connection status change
+      case None: {
+        connectionEvent(event);
+        break;
+      }
+      
+      // Otherwise pass along to the listeners
+      
+      case NodeCreated: {
+        for(ZooKeeperListener listener : listeners) {
+          listener.nodeCreated(event.getPath());
+        }
+        break;
+      }
+      
+      case NodeDeleted: {
+        for(ZooKeeperListener listener : listeners) {
+          listener.nodeDeleted(event.getPath());
+        }
+        break;
+      }
+      
+      case NodeDataChanged: {
+        for(ZooKeeperListener listener : listeners) {
+          listener.nodeDataChanged(event.getPath());
+        }
+        break;
+      }
+      
+      case NodeChildrenChanged: {
+        for(ZooKeeperListener listener : listeners) {
+          listener.nodeChildrenChanged(event.getPath());
+        }
+        break;
+      }
+    }
+  }
+
+  // Connection management
+  
+  /**
+   * Called when there is a connection-related event via the Watcher callback.
+   * 
+   * If Disconnected or Expired, logs an error and aborts the server when one
+   * was supplied to this watcher; every other state is ignored.
+   * @param event connection event (type None) received from ZooKeeper
+   */
+  private void connectionEvent(WatchedEvent event) {
+    switch(event.getState()) {
+      // SyncConnected is normal, ignore
+      case SyncConnected:
+        break;
+        
+      // Abort the server if Disconnected or Expired
+      // TODO: Any reason to handle these two differently?
+      case Disconnected:
+      case Expired:
+        error("Received Disconnected/Expired [" + event.getState() + "] " +
+              "from ZooKeeper, aborting server");
+        if(server != null) {
+          server.abortServer();
+        }
+        break;
+    }
+  }
+  
+  /**
+   * Handles KeeperExceptions in client calls.
+   * 
+   * This may be temporary but for now this gives one place to deal with these.
+   * TODO: Currently this method aborts the server (when one was supplied).
+   * @param ke the exception raised by the ZooKeeper client call
+   */
+  public void keeperException(KeeperException ke) {
+    error("Received unexpected KeeperException, aborting server", ke);
+    if(server != null) {
+      server.abortServer();
+    }
+  }
+  
+  /**
+   * Handles InterruptedExceptions in client calls.
+   * 
+   * This may be temporary but for now this gives one place to deal with these.
+   * 
+   * TODO: Currently, this method does nothing.
+   *       Is this ever expected to happen?  Do we abort or can we let it run?
+   * 
+   * @param ie
+   */
+  public void interruptedException(InterruptedException ie) {
+    debug("Received InterruptedException, doing nothing here", ie);
+    // no-op
+  }
+
+  // Logging methods
+  
+  /**
+   * Exposed info logging method so our zookeeper output is named.
+   * @param string log line
+   */
+  public void info(String string) {
+    LOG.info("<" + name + "> " + string);
+  }
+
+  /**
+   * Exposed debug logging method so our zookeeper output is named.
+   * @param string log line
+   */
+  public void debug(String string) {
+    LOG.debug("<" + name + "> " + string);
+  }
+
+  /**
+   * Exposed debug logging method so our zookeeper output is named.
+   * @param string log line
+   */
+  public void debug(String string, Throwable t) {
+    LOG.debug("<" + name + "> " + string, t);
+  }
+
+  /**
+   * Exposed warn logging method so our zookeeper output is named.
+   * @param string log line
+   */
+  public void warn(String string) {
+    LOG.warn("<" + name + "> " + string);
+  }
+
+  /**
+   * Exposed warn logging method so our zookeeper output is named.
+   * @param string log line
+   * @param t exception
+   */
+  public void warn(String string, Throwable t) {
+    LOG.warn("<" + name + "> " + string, t);
+  }
+
+  /**
+   * Exposed error logging method so our zookeeper output is named.
+   * @param string log line
+   */
+  public void error(String string) {
+    LOG.error("<" + name + "> " + string);
+  }
+
+  /**
+   * Exposed error logging method so our zookeeper output is named.
+   * @param string log line
+   * @param t exception
+   */
+  public void error(String string, Throwable t) {
+    LOG.error("<" + name + "> " + string, t);
+  }
+
+  /**
+   * Close the connection to ZooKeeper; re-asserts the interrupt if interrupted.
+   */
+  public void close() {
+    try {
+      if(zooKeeper != null) {
+        zooKeeper.close();
+        super.close();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+}
\ No newline at end of file



Mime
View raw message