hbase-commits mailing list archives

From st...@apache.org
Subject svn commit: r991397 [12/15] - in /hbase/trunk: ./ bin/ conf/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/avro/ src/main/java/org/apache/hadoop/hbase/catalog/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/o...
Date Tue, 31 Aug 2010 23:51:50 GMT
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,999 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.executor.RegionTransitionData;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Internal HBase utility class for ZooKeeper.
+ *
+ * <p>Contains only static methods and constants.
+ *
+ * <p>Methods all throw {@link KeeperException} if there is an unexpected
+ * zookeeper exception, so callers of these methods must handle it appropriately.
+ * If ZK is required for the operation, the server will need to be aborted.
+ */
+public class ZKUtil {
+  private static final Log LOG = LogFactory.getLog(ZKUtil.class);
+
+  // TODO: Replace this with ZooKeeper constant when ZOOKEEPER-277 is resolved.
+  private static final char ZNODE_PATH_SEPARATOR = '/';
+
+  /**
+   * Creates a new connection to ZooKeeper, pulling settings and quorum config
+   * from the specified configuration object using methods from {@link ZKConfig}.
+   *
+   * Sets the connection status monitoring watcher to the specified watcher.
+   *
+   * @param conf configuration to pull quorum and other settings from
+   * @param watcher watcher to monitor connection changes
+   * @return connection to zookeeper
+   * @throws IOException if unable to connect to zk or there is a config problem
+   */
+  public static ZooKeeper connect(Configuration conf, Watcher watcher)
+  throws IOException {
+    Properties properties = ZKConfig.makeZKProps(conf);
+    String quorum = ZKConfig.getZKQuorumServersString(properties);
+    return connect(conf, quorum, watcher);
+  }
+
+  public static ZooKeeper connect(Configuration conf, String quorum,
+      Watcher watcher)
+  throws IOException {
+    if(quorum == null) {
+      throw new IOException("Unable to determine ZooKeeper quorum");
+    }
+    int timeout = conf.getInt("zookeeper.session.timeout", 60 * 1000);
+    LOG.debug("Opening connection to ZooKeeper with quorum (" + quorum + ")");
+    return new ZooKeeper(quorum, timeout, watcher);
+  }
+
+  //
+  // Helper methods
+  //
+
+  /**
+   * Join the prefix znode name with the suffix znode name to generate a proper
+   * full znode name.
+   *
+   * Assumes prefix does not end with slash and suffix does not begin with it.
+   *
+   * @param prefix beginning of znode name
+   * @param suffix ending of znode name
+   * @return result of properly joining prefix with suffix
+   */
+  public static String joinZNode(String prefix, String suffix) {
+    return prefix + ZNODE_PATH_SEPARATOR + suffix;
+  }
+
+  /**
+   * Returns the full path of the immediate parent of the specified node.
+   * @param node path to get parent of
+   * @return parent of path, null if passed the root node or an invalid node
+   */
+  public static String getParent(String node) {
+    int idx = node.lastIndexOf(ZNODE_PATH_SEPARATOR);
+    return idx <= 0 ? null : node.substring(0, idx);
+  }
+
+  /**
+   * Get the unique node-name for the specified regionserver.
+   *
+   * Used when a server puts up an ephemeral node for itself and needs to use
+   * a unique name.
+   *
+   * @param serverInfo server information
+   * @return unique, zookeeper-safe znode path for the server instance
+   */
+  public static String getNodeName(HServerInfo serverInfo) {
+    return serverInfo.getServerName();
+  }
+
+  /**
+   * Get the name of the current node from the specified fully-qualified path.
+   * @param path fully-qualified path
+   * @return name of the current node
+   */
+  public static String getNodeName(String path) {
+    return path.substring(path.lastIndexOf("/")+1);
+  }
+
+  /**
+   * Get the key to the ZK ensemble for this configuration without
+   * adding a name at the end
+   * @param conf Configuration to use to build the key
+   * @return ensemble key without a name
+   */
+  public static String getZooKeeperClusterKey(Configuration conf) {
+    return getZooKeeperClusterKey(conf, null);
+  }
+
+  /**
+   * Get the key to the ZK ensemble for this configuration and append
+   * a name at the end
+   * @param conf Configuration to use to build the key
+   * @param name Name that should be appended at the end if not empty or null
+   * @return ensemble key with a name (if any)
+   */
+  public static String getZooKeeperClusterKey(Configuration conf, String name) {
+    String quorum = conf.get(HConstants.ZOOKEEPER_QUORUM).replaceAll(
+        "[\\t\\n\\x0B\\f\\r]", "");
+    StringBuilder builder = new StringBuilder(quorum);
+    builder.append(":");
+    builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+    if (name != null && !name.isEmpty()) {
+      builder.append(",");
+      builder.append(name);
+    }
+    return builder.toString();
+  }
+
+  //
+  // Existence checks and watches
+  //
+
+  /**
+   * Watch the specified znode for delete/create/change events.  The watcher is
+   * set whether or not the node exists.  If the node already exists, the method
+   * returns true.  If the node does not exist, the method returns false.
+   *
+   * @param zkw zk reference
+   * @param znode path of node to watch
+   * @return true if znode exists, false if does not exist or error
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode)
+  throws KeeperException {
+    try {
+      Stat s = zkw.getZooKeeper().exists(znode, zkw);
+      zkw.debug("Set watcher on existing znode " + znode);
+      return s != null;
+    } catch (KeeperException e) {
+      zkw.warn("Unable to set watcher on znode " + znode, e);
+      zkw.keeperException(e);
+      return false;
+    } catch (InterruptedException e) {
+      zkw.warn("Unable to set watcher on znode " + znode, e);
+      zkw.interruptedException(e);
+      return false;
+    }
+  }
+
+  /**
+   * Check if the specified node exists.  Sets no watches.
+   *
+   * Returns the version of the node if it exists, -1 if it does not.  Throws
+   * an exception if there is an unexpected zookeeper exception.
+   *
+   * @param zkw zk reference
+   * @param znode path of node to check
+   * @return version of the node if it exists, -1 if does not exist
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static int checkExists(ZooKeeperWatcher zkw, String znode)
+  throws KeeperException {
+    try {
+      Stat s = zkw.getZooKeeper().exists(znode, null);
+      return s != null ? s.getVersion() : -1;
+    } catch (KeeperException e) {
+      zkw.warn("Unable to check existence of znode (" + znode + ")", e);
+      zkw.keeperException(e);
+      return -1;
+    } catch (InterruptedException e) {
+      zkw.warn("Unable to check existence of znode (" + znode + ")", e);
+      zkw.interruptedException(e);
+      return -1;
+    }
+  }
+
+  //
+  // Znode listings
+  //
+
+  /**
+   * Lists the children znodes of the specified znode.  Also sets a watch on
+   * the specified znode which will capture a NodeDeleted event on the specified
+   * znode as well as NodeChildrenChanged if any children of the specified znode
+   * are created or deleted.
+   *
+   * Returns null if the specified node does not exist.  Otherwise returns a
+   * list of children of the specified node.  If the node exists but it has no
+   * children, an empty list will be returned.
+   *
+   * @param zkw zk reference
+   * @param znode path of node to list and watch children of
+   * @return list of children of the specified node, an empty list if the node
+   *          exists but has no children, and null if the node does not exist
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static List<String> listChildrenAndWatchForNewChildren(
+      ZooKeeperWatcher zkw, String znode)
+  throws KeeperException {
+    try {
+      List<String> children = zkw.getZooKeeper().getChildren(znode, zkw);
+      return children;
+    } catch(KeeperException.NoNodeException ke) {
+      zkw.debug("Unable to list children of znode " + znode + " " +
+          "because node does not exist (not an error)");
+      return null;
+    } catch (KeeperException e) {
+      zkw.warn("Unable to list children of znode " + znode + " ", e);
+      zkw.keeperException(e);
+      return null;
+    } catch (InterruptedException e) {
+      zkw.warn("Unable to list children of znode " + znode + " ", e);
+      zkw.interruptedException(e);
+      return null;
+    }
+  }
+
+  /**
+   * Lists the children of the specified znode, retrieving the data of each
+   * child as a server address.
+   *
+   * Used to list the currently online regionservers and their addresses.
+   *
+   * Sets no watches at all; this method is best effort.
+   *
+   * Returns an empty list if the node has no children.  Returns null if the
+   * parent node itself does not exist.
+   *
+   * @param zkw zookeeper reference
+   * @param znode node to get children of as addresses
+   * @return list of data of children of specified znode, empty if no children,
+   *         null if parent does not exist
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static List<HServerAddress> listChildrenAndGetAsAddresses(
+      ZooKeeperWatcher zkw, String znode)
+  throws KeeperException {
+    List<String> children = listChildrenNoWatch(zkw, znode);
+    if(children == null) {
+      return null;
+    }
+    List<HServerAddress> addresses =
+      new ArrayList<HServerAddress>(children.size());
+    for(String child : children) {
+      addresses.add(getDataAsAddress(zkw, joinZNode(znode, child)));
+    }
+    return addresses;
+  }
+
+  /**
+   * Lists the children of the specified znode without setting any watches.
+   *
+   * Used, for example, to list the znodes of the currently online regionservers.
+   *
+   * Sets no watches at all; this method is best effort.
+   *
+   * Returns an empty list if the node has no children.  Returns null if the
+   * parent node itself does not exist.
+   *
+   * @param zkw zookeeper reference
+   * @param znode node to get children of
+   * @return list of names of children of the specified znode, empty if no
+   *         children, null if the parent does not exist
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static List<String> listChildrenNoWatch(
+      ZooKeeperWatcher zkw, String znode)
+  throws KeeperException {
+    List<String> children = null;
+    try {
+      // List the children without watching
+      children = zkw.getZooKeeper().getChildren(znode, null);
+    } catch(KeeperException.NoNodeException nne) {
+      return null;
+    } catch(InterruptedException ie) {
+      zkw.interruptedException(ie);
+    }
+    return children;
+  }
+
+  /**
+   * Atomically add watches and read data from all unwatched unassigned nodes.
+   *
+   * <p>This works because the master is the only process deleting nodes.
+   */
+  public static List<NodeAndData> watchAndGetNewChildren(ZooKeeperWatcher zkw,
+      String baseNode)
+  throws KeeperException {
+    List<NodeAndData> newNodes = new ArrayList<NodeAndData>();
+    synchronized(zkw.getNodes()) {
+      List<String> nodes =
+        ZKUtil.listChildrenAndWatchForNewChildren(zkw, baseNode);
+      for(String node : nodes) {
+        String nodePath = ZKUtil.joinZNode(baseNode, node);
+        if(!zkw.getNodes().contains(nodePath)) {
+          byte [] data = ZKUtil.getDataAndWatch(zkw, nodePath);
+          newNodes.add(new NodeAndData(nodePath, data));
+          zkw.getNodes().add(nodePath);
+        }
+      }
+    }
+    return newNodes;
+  }
+
+  /**
+   * Simple class to hold a node path and node data.
+   */
+  public static class NodeAndData {
+    private String node;
+    private byte [] data;
+    public NodeAndData(String node, byte [] data) {
+      this.node = node;
+      this.data = data;
+    }
+    public String getNode() {
+      return node;
+    }
+    public byte [] getData() {
+      return data;
+    }
+    @Override
+    public String toString() {
+      return node + " (" + RegionTransitionData.fromBytes(data) + ")";
+    }
+  }
+
+  /**
+   * Checks if the specified znode has any children.  Sets no watches.
+   *
+   * Returns true if the node exists and has children.  Returns false if the
+   * node does not exist or if the node does not have any children.
+   *
+   * Used during master initialization to determine whether the master is a
+   * failed-over-to master or the first master during initial cluster startup.
+   * If the directory of regionserver ephemeral nodes is empty, this is a fresh
+   * cluster startup; otherwise it is a failover.
+   *
+   * @param zkw zk reference
+   * @param znode path of node to check for children of
+   * @return true if node has children, false if not or node does not exist
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static boolean nodeHasChildren(ZooKeeperWatcher zkw, String znode)
+  throws KeeperException {
+    try {
+      return !zkw.getZooKeeper().getChildren(znode, null).isEmpty();
+    } catch(KeeperException.NoNodeException ke) {
+      zkw.debug("Unable to list children of znode " + znode + " " +
+      "because node does not exist (not an error)");
+      return false;
+    } catch (KeeperException e) {
+      zkw.warn("Unable to list children of znode " + znode, e);
+      zkw.keeperException(e);
+      return false;
+    } catch (InterruptedException e) {
+      zkw.warn("Unable to list children of znode " + znode, e);
+      zkw.interruptedException(e);
+      return false;
+    }
+  }
+
+  /**
+   * Get the number of children of the specified node.
+   *
+   * If the node does not exist or has no children, returns 0.
+   *
+   * Sets no watches at all.
+   *
+   * @param zkw zk reference
+   * @param znode path of node to count children of
+   * @return number of children of specified node, 0 if none or parent does not
+   *         exist
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static int getNumberOfChildren(ZooKeeperWatcher zkw, String znode)
+  throws KeeperException {
+    try {
+      Stat stat = zkw.getZooKeeper().exists(znode, null);
+      return stat == null ? 0 : stat.getNumChildren();
+    } catch(KeeperException e) {
+      zkw.warn("Unable to get children of node " + znode, e);
+      zkw.keeperException(e);
+    } catch(InterruptedException e) {
+      zkw.interruptedException(e);
+    }
+    return 0;
+  }
+
+  //
+  // Data retrieval
+  //
+
+  /**
+   * Get znode data. Does not set a watcher.
+   * @return ZNode data
+   */
+  public static byte [] getData(ZooKeeperWatcher zkw, String znode)
+  throws KeeperException {
+    try {
+      byte [] data = zkw.getZooKeeper().getData(znode, null, null);
+      zkw.debug("Retrieved " + data.length + " bytes of data from znode " + znode);
+      return data;
+    } catch (KeeperException.NoNodeException e) {
+      zkw.debug("Unable to get data of znode " + znode + " " +
+          "because node does not exist (not an error)");
+      return null;
+    } catch (KeeperException e) {
+      zkw.warn("Unable to get data of znode " + znode, e);
+      zkw.keeperException(e);
+      return null;
+    } catch (InterruptedException e) {
+      zkw.warn("Unable to get data of znode " + znode, e);
+      zkw.interruptedException(e);
+      return null;
+    }
+  }
+
+  /**
+   * Get the data at the specified znode and set a watch.
+   *
+   * Returns the data and sets a watch if the node exists.  Returns null and no
+   * watch is set if the node does not exist or there is an exception.
+   *
+   * @param zkw zk reference
+   * @param znode path of node
+   * @return data of the specified znode, or null
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static byte [] getDataAndWatch(ZooKeeperWatcher zkw, String znode)
+  throws KeeperException {
+    try {
+      byte [] data = zkw.getZooKeeper().getData(znode, zkw, null);
+      zkw.debug("Retrieved " + data.length + " bytes of data from znode " +
+          znode + " and set a watcher");
+      return data;
+    } catch (KeeperException.NoNodeException e) {
+      zkw.debug("Unable to get data of znode " + znode + " " +
+          "because node does not exist (not an error)");
+      return null;
+    } catch (KeeperException e) {
+      zkw.warn("Unable to get data of znode " + znode, e);
+      zkw.keeperException(e);
+      return null;
+    } catch (InterruptedException e) {
+      zkw.warn("Unable to get data of znode " + znode, e);
+      zkw.interruptedException(e);
+      return null;
+    }
+  }
+
+  /**
+   * Get the data at the specified znode without setting a watch.
+   *
+   * Returns the data if the node exists.  Returns null if the node does not
+   * exist.
+   *
+   * Sets the stats of the node in the passed Stat object.  Pass a null stat if
+   * not interested.
+   *
+   * @param zkw zk reference
+   * @param znode path of node
+   * @param stat node status to set if node exists
+   * @return data of the specified znode, or null if does not exist
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static byte [] getDataNoWatch(ZooKeeperWatcher zkw, String znode,
+      Stat stat)
+  throws KeeperException {
+    try {
+      byte [] data = zkw.getZooKeeper().getData(znode, null, stat);
+      zkw.debug("Retrieved " + data.length + " bytes of data from znode " + znode);
+      return data;
+    } catch (KeeperException.NoNodeException e) {
+      zkw.debug("Unable to get data of znode " + znode + " " +
+          "because node does not exist (not necessarily an error)");
+      return null;
+    } catch (KeeperException e) {
+      zkw.warn("Unable to get data of znode " + znode, e);
+      zkw.keeperException(e);
+      return null;
+    } catch (InterruptedException e) {
+      zkw.warn("Unable to get data of znode " + znode, e);
+      zkw.interruptedException(e);
+      return null;
+    }
+  }
+
+  /**
+   * Get the data at the specified znode, deserialize it as an HServerAddress,
+   * and set a watch.
+   *
+   * Returns the data as a server address and sets a watch if the node exists.
+   * Returns null and no watch is set if the node does not exist or there is an
+   * exception.
+   *
+   * @param zkw zk reference
+   * @param znode path of node
+   * @return data of the specified node as a server address, or null
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static HServerAddress getDataAsAddress(ZooKeeperWatcher zkw,
+      String znode)
+  throws KeeperException {
+    byte [] data = getDataAndWatch(zkw, znode);
+    if(data == null) {
+      return null;
+    }
+    String addrString = Bytes.toString(data);
+    zkw.debug("Read server address from znode " + znode + ": " + addrString);
+    return new HServerAddress(addrString);
+  }
+
+  /**
+   * Update the data of an existing node with the expected version to have the
+   * specified data.
+   *
+   * Throws an exception if there is a version mismatch or some other problem.
+   *
+   * Sets no watches under any conditions.
+   *
+   * @param zkw zk reference
+   * @param znode path of node
+   * @param data data to set for node
+   * @param expectedVersion version expected when setting data
+   * @throws KeeperException if unexpected zookeeper exception
+   * @throws KeeperException.BadVersionException if version mismatch
+   */
+  public static void updateExistingNodeData(ZooKeeperWatcher zkw, String znode,
+      byte [] data, int expectedVersion)
+  throws KeeperException {
+    try {
+      zkw.getZooKeeper().setData(znode, data, expectedVersion);
+    } catch(InterruptedException ie) {
+      zkw.interruptedException(ie);
+    }
+  }
+
+  //
+  // Data setting
+  //
+
+  /**
+   * Set the specified znode to be an ephemeral node carrying the specified
+   * server address.  Used by both masters and regionservers for their
+   * ephemeral nodes.
+   *
+   * If the node is created successfully, a watcher is also set on the node.
+   *
+   * If the node is not created successfully because it already exists, this
+   * method will also set a watcher on the node.
+   *
+   * If there is another problem, a KeeperException will be thrown.
+   *
+   * @param zkw zk reference
+   * @param znode path of node
+   * @param address server address
+   * @return true if address set, false if not, watch set in both cases
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static boolean setAddressAndWatch(ZooKeeperWatcher zkw,
+      String znode, HServerAddress address)
+  throws KeeperException {
+    return createEphemeralNodeAndWatch(zkw, znode,
+        Bytes.toBytes(address.toString()));
+  }
+
+  /**
+   * Sets the data of the existing znode to be the specified data.  Ensures that
+   * the current data has the specified expected version.
+   *
+   * <p>If the node does not exist, a {@link NoNodeException} will be thrown.
+   *
+   * <p>If there is a version mismatch, this method returns false.
+   *
+   * <p>No watches are set but setting data will trigger other watchers of this
+   * node.
+   *
+   * <p>If there is another problem, a KeeperException will be thrown.
+   *
+   * @param zkw zk reference
+   * @param znode path of node
+   * @param data data to set for node
+   * @param expectedVersion version expected when setting data
+   * @return true if data set, false if version mismatch
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static boolean setData(ZooKeeperWatcher zkw, String znode,
+      byte [] data, int expectedVersion)
+  throws KeeperException, KeeperException.NoNodeException {
+    try {
+      return zkw.getZooKeeper().setData(znode, data, expectedVersion) != null;
+    } catch (InterruptedException e) {
+      zkw.interruptedException(e);
+      return false;
+    }
+  }
+
+  /**
+   * Sets the data of the existing znode to be the specified data.  The node
+   * must exist but no checks are done on the existing data or version.
+   *
+   * <p>If the node does not exist, a {@link NoNodeException} will be thrown.
+   *
+   * <p>No watches are set but setting data will trigger other watchers of this
+   * node.
+   *
+   * <p>If there is another problem, a KeeperException will be thrown.
+   *
+   * @param zkw zk reference
+   * @param znode path of node
+   * @param data data to set for node
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static void setData(ZooKeeperWatcher zkw, String znode,
+      byte [] data)
+  throws KeeperException, KeeperException.NoNodeException {
+    setData(zkw, znode, data, -1);
+  }
+
+  //
+  // Node creation
+  //
+
+  /**
+   * Set the specified znode to be an ephemeral node carrying the specified
+   * data.
+   *
+   * If the node is created successfully, a watcher is also set on the node.
+   *
+   * If the node is not created successfully because it already exists, this
+   * method will also set a watcher on the node.
+   *
+   * If there is another problem, a KeeperException will be thrown.
+   *
+   * @param zkw zk reference
+   * @param znode path of node
+   * @param data data of node
+   * @return true if node created, false if not, watch set in both cases
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static boolean createEphemeralNodeAndWatch(ZooKeeperWatcher zkw,
+      String znode, byte [] data)
+  throws KeeperException {
+    try {
+      zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
+          CreateMode.EPHEMERAL);
+    } catch (KeeperException.NodeExistsException nee) {
+      if(!watchAndCheckExists(zkw, znode)) {
+        // It did exist but now it doesn't, try again
+        return createEphemeralNodeAndWatch(zkw, znode, data);
+      }
+      return false;
+    } catch (InterruptedException e) {
+      LOG.info("Interrupted", e);
+    }
+    return true;
+  }
+
+  /**
+   * Creates the specified znode to be a persistent node carrying the specified
+   * data.
+   *
+   * Returns true if the node was successfully created, false if the node
+   * already existed.
+   *
+   * If the node is created successfully, a watcher is also set on the node.
+   *
+   * If the node is not created successfully because it already exists, this
+   * method will also set a watcher on the node but return false.
+   *
+   * If there is another problem, a KeeperException will be thrown.
+   *
+   * @param zkw zk reference
+   * @param znode path of node
+   * @param data data of node
+   * @return true if node created, false if not, watch set in both cases
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static boolean createNodeIfNotExistsAndWatch(
+      ZooKeeperWatcher zkw, String znode, byte [] data)
+  throws KeeperException {
+    try {
+      zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT);
+      // Set a watch on the newly created node, per the javadoc above
+      zkw.getZooKeeper().exists(znode, zkw);
+    } catch (KeeperException.NodeExistsException nee) {
+      try {
+        zkw.getZooKeeper().exists(znode, zkw);
+      } catch (InterruptedException e) {
+        zkw.interruptedException(e);
+        return false;
+      }
+      return false;
+    } catch (InterruptedException e) {
+      zkw.interruptedException(e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Creates the specified node with the specified data and watches it.
+   *
+   * <p>Throws an exception if the node already exists.
+   *
+   * <p>The node created is persistent and open access.
+   *
+   * <p>Returns the version number of the created node if successful.
+   *
+   * @param zkw zk reference
+   * @param znode path of node to create
+   * @param data data of node to create
+   * @return version of node created
+   * @throws KeeperException if unexpected zookeeper exception
+   * @throws KeeperException.NodeExistsException if node already exists
+   */
+  public static int createAndWatch(ZooKeeperWatcher zkw,
+      String znode, byte [] data)
+  throws KeeperException, KeeperException.NodeExistsException {
+    try {
+      zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT);
+      return zkw.getZooKeeper().exists(znode, zkw).getVersion();
+    } catch (InterruptedException e) {
+      zkw.interruptedException(e);
+      return -1;
+    }
+  }
+
+  /**
+   * Creates the specified node, if the node does not exist.  Does not set a
+   * watch and fails silently if the node already exists.
+   *
+   * The node created is persistent and open access.
+   *
+   * @param zkw zk reference
+   * @param znode path of node
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static void createAndFailSilent(ZooKeeperWatcher zkw,
+      String znode)
+  throws KeeperException {
+    try {
+      zkw.getZooKeeper().create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT);
+    } catch(KeeperException.NodeExistsException nee) {
+      // node already exists, fail silently per the javadoc
+    } catch(InterruptedException ie) {
+      zkw.interruptedException(ie);
+    }
+  }
+
+  /**
+   * Creates the specified node and all parent nodes required for it to exist.
+   *
+   * No watches are set and no errors are thrown if the node already exists.
+   *
+   * The nodes created are persistent and open access.
+   *
+   * @param zkw zk reference
+   * @param znode path of node
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static void createWithParents(ZooKeeperWatcher zkw,
+      String znode)
+  throws KeeperException {
+    try {
+      if(znode == null) {
+        return;
+      }
+      zkw.getZooKeeper().create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT);
+    } catch(KeeperException.NodeExistsException nee) {
+      return;
+    } catch(KeeperException.NoNodeException nne) {
+      createWithParents(zkw, getParent(znode));
+      createWithParents(zkw, znode);
+    } catch(InterruptedException ie) {
+      zkw.interruptedException(ie);
+    }
+  }
+
+  //
+  // Deletes
+  //
+
+  /**
+   * Delete the specified node.  Sets no watches.  Throws all exceptions.
+   */
+  public static void deleteNode(ZooKeeperWatcher zkw, String node)
+  throws KeeperException {
+    deleteNode(zkw, node, -1);
+  }
+
+  /**
+   * Delete the specified node with the specified version.  Sets no watches.
+   * Throws all exceptions.
+   */
+  public static boolean deleteNode(ZooKeeperWatcher zkw, String node,
+      int version)
+  throws KeeperException {
+    try {
+      zkw.getZooKeeper().delete(node, version);
+      return true;
+    } catch(KeeperException.BadVersionException bve) {
+      return false;
+    } catch(InterruptedException ie) {
+      zkw.interruptedException(ie);
+      return false;
+    }
+  }
+
+  /**
+   * Deletes the specified node.  Fails silently if the node does not exist.
+   * @param zkw zk reference
+   * @param node path of node to delete
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static void deleteNodeFailSilent(ZooKeeperWatcher zkw, String node)
+  throws KeeperException {
+    try {
+      zkw.getZooKeeper().delete(node, -1);
+    } catch(KeeperException.NoNodeException nne) {
+      // node does not exist, fail silently per the javadoc
+    } catch(InterruptedException ie) {
+      zkw.interruptedException(ie);
+    }
+  }
+
+  /**
+   * Delete the specified node and all of its children.
+   *
+   * Sets no watches.  Throws all exceptions besides dealing with deletion of
+   * children.
+   */
+  public static void deleteNodeRecursively(ZooKeeperWatcher zkw, String node)
+  throws KeeperException {
+    try {
+      List<String> children = ZKUtil.listChildrenNoWatch(zkw, node);
+      if(children != null && !children.isEmpty()) {
+        for(String child : children) {
+          deleteNodeRecursively(zkw, joinZNode(node, child));
+        }
+      }
+      zkw.getZooKeeper().delete(node, -1);
+    } catch(InterruptedException ie) {
+      zkw.interruptedException(ie);
+    }
+  }
+
+  /**
+   * Delete all the children of the specified node but not the node itself.
+   *
+   * Sets no watches.  Throws all exceptions besides dealing with deletion of
+   * children.
+   */
+  public static void deleteChildrenRecursively(ZooKeeperWatcher zkw, String node)
+  throws KeeperException {
+    List<String> children = ZKUtil.listChildrenNoWatch(zkw, node);
+    if(children != null && !children.isEmpty()) {
+      for(String child : children) {
+        deleteNodeRecursively(zkw, joinZNode(node, child));
+      }
+    }
+  }
+
+  //
+  // ZooKeeper cluster information
+  //
+
+  /** @return String dump of everything in ZooKeeper. */
+  public static String dump(ZooKeeperWatcher zkw) {
+    StringBuilder sb = new StringBuilder();
+    try {
+      sb.append("\nHBase tree in ZooKeeper is rooted at ").append(zkw.baseZNode);
+      sb.append("\n  Cluster up? ").append(
+          checkExists(zkw, zkw.clusterStateZNode) != -1);
+      sb.append("\n  Master address: ").append(
+          getDataAsAddress(zkw, zkw.masterAddressZNode));
+      sb.append("\n  Region server holding ROOT: ").append(
+          getDataAsAddress(zkw, zkw.rootServerZNode));
+      sb.append("\n  Region servers:");
+      for (HServerAddress address : listChildrenAndGetAsAddresses(zkw,
+          zkw.rsZNode)) {
+        sb.append("\n    - ").append(address);
+      }
+      sb.append("\n  Quorum Server Statistics:");
+      String[] servers = zkw.getQuorum().split(",");
+      for (String server : servers) {
+        sb.append("\n    - ").append(server);
+        try {
+          String[] stat = getServerStats(server);
+          for (String s : stat) {
+            sb.append("\n        ").append(s);
+          }
+        } catch (Exception e) {
+          sb.append("\n        ERROR: ").append(e.getMessage());
+        }
+      }
+    } catch(KeeperException ke) {
+      sb.append("\n  FATAL ZooKeeper Exception!\n");
+      sb.append("\n  " + ke.getMessage());
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Gets the statistics from the given server. Uses a 1 minute timeout.
+   *
+   * @param server  The server to get the statistics from.
+   * @return The array of response strings.
+   * @throws IOException When the socket communication fails.
+   */
+  public static String[] getServerStats(String server)
+  throws IOException {
+    return getServerStats(server, 60 * 1000);
+  }
+
+  /**
+   * Gets the statistics from the given server.
+   *
+   * @param server  The server to get the statistics from.
+   * @param timeout  The socket timeout to use.
+   * @return The array of response strings.
+   * @throws IOException When the socket communication fails.
+   */
+  public static String[] getServerStats(String server, int timeout)
+  throws IOException {
+    String[] sp = server.split(":");
+    Socket socket = new Socket(sp[0],
+      sp.length > 1 ? Integer.parseInt(sp[1]) : 2181);
+    socket.setSoTimeout(timeout);
+    PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
+    BufferedReader in = new BufferedReader(new InputStreamReader(
+      socket.getInputStream()));
+    out.println("stat");
+    out.flush();
+    ArrayList<String> res = new ArrayList<String>();
+    String line;
+    while ((line = in.readLine()) != null) {
+      res.add(line);
+    }
+    socket.close();
+    return res.toArray(new String[res.size()]);
+  }
+}
\ No newline at end of file
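
For orientation, here is a rough usage sketch of the ZKUtil API added above, exercised through the ZooKeeperWatcher class that appears later in this commit. The class name, the znode names, and the throwaway Abortable are invented for illustration, and the sketch assumes Abortable declares only abort(String, Throwable) at this revision.

// Hypothetical sketch: exercise a few ZKUtil calls against a live quorum.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

public class ZKUtilUsageSketch {
  public static void demo(Configuration conf) throws Exception {
    // Throwaway abort handler; a real server would shut itself down here.
    Abortable abortable = new Abortable() {
      public void abort(String why, Throwable e) {
        throw new RuntimeException(why, e);
      }
    };
    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "zkutil-sketch", abortable);
    try {
      // Persistent parent znode; silently succeeds if it already exists.
      String parent = ZKUtil.joinZNode(zkw.baseZNode, "demo");
      ZKUtil.createWithParents(zkw, parent);

      // Ephemeral child carrying data, with a watch set either way.
      String member = ZKUtil.joinZNode(parent, "member-1");
      ZKUtil.createEphemeralNodeAndWatch(zkw, member, Bytes.toBytes("hello"));

      // Read it back and leave a watch for future changes.
      byte [] data = ZKUtil.getDataAndWatch(zkw, member);
      System.out.println("read: " + (data == null ? "null" : Bytes.toString(data)));

      // Clean up; no error if the node is already gone.
      ZKUtil.deleteNodeFailSilent(zkw, member);
    } catch (KeeperException e) {
      abortable.abort("ZooKeeper trouble in sketch", e);
    } finally {
      zkw.close();
    }
  }
}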

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,78 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+
+/**
+ * Base class for internal listeners of ZooKeeper events.
+ *
+ * The {@link ZooKeeperWatcher} for a process will execute the appropriate
+ * methods of implementations of this class.  In order to receive events from
+ * the watcher, every listener must register itself via {@link ZooKeeperWatcher#registerListener}.
+ *
+ * Subclasses need only override those methods in which they are interested.
+ *
+ * Note that the watcher will be blocked when invoking methods in listeners so
+ * they must not be long-running.
+ */
+public class ZooKeeperListener {
+
+  // Reference to the zk watcher which also contains configuration and constants
+  protected ZooKeeperWatcher watcher;
+
+  /**
+   * Construct a ZooKeeper event listener.
+   */
+  public ZooKeeperListener(ZooKeeperWatcher watcher) {
+    this.watcher = watcher;
+  }
+
+  /**
+   * Called when a new node has been created.
+   * @param path full path of the new node
+   */
+  public void nodeCreated(String path) {
+    // no-op
+  }
+
+  /**
+   * Called when a node has been deleted.
+   * @param path full path of the deleted node
+   */
+  public void nodeDeleted(String path) {
+    // no-op
+  }
+
+  /**
+   * Called when an existing node has changed data.
+   * @param path full path of the updated node
+   */
+  public void nodeDataChanged(String path) {
+    // no-op
+  }
+
+  /**
+   * Called when an existing node has a child node added or removed.
+   * @param path full path of the node whose children have changed
+   */
+  public void nodeChildrenChanged(String path) {
+    // no-op
+  }
+}
\ No newline at end of file
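
To make the registration flow concrete, here is a possible listener implementation. The class name and prefix filtering are invented for illustration; only the nodeCreated/nodeDeleted hooks, the protected watcher field, and ZooKeeperWatcher#registerListener come from the code in this commit.

// Hypothetical listener: log create/delete events under one znode prefix.
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

public class LoggingListenerSketch extends ZooKeeperListener {
  private final String prefix;

  public LoggingListenerSketch(ZooKeeperWatcher watcher, String prefix) {
    super(watcher);
    this.prefix = prefix;
  }

  @Override
  public void nodeCreated(String path) {
    // Keep this cheap: the watcher thread blocks while listeners run.
    if (path.startsWith(prefix)) watcher.debug("created: " + path);
  }

  @Override
  public void nodeDeleted(String path) {
    if (path.startsWith(prefix)) watcher.debug("deleted: " + path);
  }
}

Wiring would look something like: zkw.registerListener(new LoggingListenerSketch(zkw, zkw.rsZNode));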

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,173 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Tracks the availability and value of a single ZooKeeper node.
+ *
+ * <p>Utilizes the {@link ZooKeeperListener} interface to get the necessary
+ * ZooKeeper events related to the node.
+ *
+ * <p>This is the base class used by trackers in both the Master and
+ * RegionServers.
+ */
+public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
+  /**
+   * Pass this if you do not want a timeout.
+   */
+  public final static long NO_TIMEOUT = -1;
+
+  /** Path of node being tracked */
+  protected final String node;
+
+  /** Data of the node being tracked */
+  private byte [] data;
+
+  /** Used to abort if a fatal error occurs */
+  protected final Abortable abortable;
+
+  /**
+   * Constructs a new ZK node tracker.
+   *
+   * <p>After construction, use {@link #start} to kick off tracking.
+   *
+   * @param watcher zk watcher to use for this tracker
+   * @param node path of the node to track
+   * @param abortable used to abort if a fatal error occurs
+   */
+  public ZooKeeperNodeTracker(ZooKeeperWatcher watcher, String node,
+      Abortable abortable) {
+    super(watcher);
+    this.node = node;
+    this.abortable = abortable;
+    this.data = null;
+  }
+
+  /**
+   * Starts the tracking of the node in ZooKeeper.
+   *
+   * <p>Use {@link #blockUntilAvailable()} to block until the node is available
+   * or {@link #getData()} to get the data of the node if it is available.
+   */
+  public synchronized void start() {
+    this.watcher.registerListener(this);
+    try {
+      if(ZKUtil.watchAndCheckExists(watcher, node)) {
+        byte [] data = ZKUtil.getDataAndWatch(watcher, node);
+        if(data != null) {
+          this.data = data;
+        } else {
+          // It existed but now does not, try again to ensure a watch is set
+          start();
+        }
+      }
+    } catch (KeeperException e) {
+      abortable.abort("Unexpected exception during initialization, aborting", e);
+    }
+  }
+
+  /**
+   * Gets the data of the node, blocking until the node is available.
+   *
+   * @return data of the node
+   * @throws InterruptedException if the waiting thread is interrupted
+   */
+  public synchronized byte [] blockUntilAvailable()
+  throws InterruptedException {
+    return blockUntilAvailable(NO_TIMEOUT);
+  }
+
+  /**
+   * Gets the data of the node, blocking until the node is available or the
+   * specified timeout has elapsed.
+   *
+   * @param timeout maximum time to wait for the node data to be available,
+   *                in milliseconds.  Pass {@link #NO_TIMEOUT} for no timeout.
+   * @return data of the node
+   * @throws InterruptedException if the waiting thread is interrupted
+   */
+  public synchronized byte [] blockUntilAvailable(long timeout)
+  throws InterruptedException {
+    if (timeout != NO_TIMEOUT && timeout < 0) throw new IllegalArgumentException();
+    long startTime = System.currentTimeMillis();
+    long remaining = timeout;
+    while ((timeout == NO_TIMEOUT || remaining > 0) && this.data == null) {
+      if (timeout == NO_TIMEOUT) wait();
+      else wait(remaining);
+      remaining = timeout - (System.currentTimeMillis() - startTime);
+    }
+    return data;
+  }
+
+  /**
+   * Gets the data of the node.
+   *
+   * <p>If the node is currently available, the most up-to-date known version of
+   * the data is returned.  If the node is not currently available, null is
+   * returned.
+   *
+   * @return data of the node, null if unavailable
+   */
+  public synchronized byte [] getData() {
+    return data;
+  }
+
+  @Override
+  public synchronized void nodeCreated(String path) {
+    if(path.equals(node)) {
+      try {
+        byte [] data = ZKUtil.getDataAndWatch(watcher, node);
+        if(data != null) {
+          this.data = data;
+          notifyAll();
+        } else {
+          nodeDeleted(path);
+        }
+      } catch(KeeperException e) {
+        abortable.abort("Unexpected exception handling nodeCreated event", e);
+      }
+    }
+  }
+
+  @Override
+  public synchronized void nodeDeleted(String path) {
+    if(path.equals(node)) {
+      try {
+        if(ZKUtil.watchAndCheckExists(watcher, node)) {
+          nodeCreated(path);
+        } else {
+          this.data = null;
+        }
+      } catch(KeeperException e) {
+        abortable.abort("Unexpected exception handling nodeDeleted event", e);
+      }
+    }
+  }
+
+  @Override
+  public synchronized void nodeDataChanged(String path) {
+    if(path.equals(node)) {
+      nodeCreated(path);
+    }
+  }
+}
\ No newline at end of file
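
A sketch of a concrete tracker may help here: it watches the masterAddressZNode field defined on ZooKeeperWatcher below and lets a caller block until the master has published its address. The class name and helper method are invented for illustration.

// Hypothetical tracker: block a caller until the master address znode exists.
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

public class MasterAddressTrackerSketch extends ZooKeeperNodeTracker {
  public MasterAddressTrackerSketch(ZooKeeperWatcher watcher, Abortable abortable) {
    super(watcher, watcher.masterAddressZNode, abortable);
  }

  /** Wait up to timeoutMs for a master address to be published, null on timeout. */
  public String waitForMaster(long timeoutMs) throws InterruptedException {
    byte [] data = blockUntilAvailable(timeoutMs);
    return data == null ? null : Bytes.toString(data);
  }
}

Typical wiring: construct the tracker, call start() to register it with the watcher and prime the cached data, then call waitForMaster(30 * 1000) from whatever thread needs the value.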

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,361 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Acts as the single ZooKeeper Watcher.  One instance of this is instantiated
+ * for each Master, RegionServer, and client process.
+ *
+ * <p>This is the only class that implements {@link Watcher}.  Other internal
+ * classes which need to be notified of ZooKeeper events must register with
+ * the local instance of this watcher via {@link #registerListener}.
+ *
+ * <p>This class also holds and manages the connection to ZooKeeper.  Connection
+ * related events and exceptions are handled here.
+ */
+public class ZooKeeperWatcher implements Watcher {
+  private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class);
+
+  // name of this watcher (for logging only)
+  private String name;
+
+  // zookeeper quorum
+  private String quorum;
+
+  // zookeeper connection
+  private ZooKeeper zooKeeper;
+
+  // abortable in case of zk failure
+  private Abortable abortable;
+
+  // listeners to be notified
+  private final Set<ZooKeeperListener> listeners =
+    new CopyOnWriteArraySet<ZooKeeperListener>();
+
+  // set of unassigned nodes watched
+  private Set<String> unassignedNodes = new HashSet<String>();
+
+  // node names
+
+  // base znode for this cluster
+  public String baseZNode;
+  // znode containing location of server hosting root region
+  public String rootServerZNode;
+  // znode containing ephemeral nodes of the regionservers
+  public String rsZNode;
+  // znode of currently active master
+  public String masterAddressZNode;
+  // znode containing the current cluster state
+  public String clusterStateZNode;
+  // znode used for region transitioning and assignment
+  public String assignmentZNode;
+  // znode used for table disabling/enabling
+  public String tableZNode;
+
+  /**
+   * Instantiate a ZooKeeper connection and watcher.
+   * @param conf configuration to pull settings and quorum from
+   * @param name name of this watcher, for logging/debug purposes only
+   * @param abortable used to abort on fatal ZooKeeper errors
+   * @throws IOException if unable to connect to ZooKeeper
+   */
+  public ZooKeeperWatcher(Configuration conf, String name,
+      Abortable abortable)
+  throws IOException {
+    this.name = name;
+    this.quorum = ZKConfig.getZKQuorumServersString(conf);
+    this.zooKeeper = ZKUtil.connect(conf, quorum, this);
+    this.abortable = abortable;
+    info("Connected to ZooKeeper");
+    setNodeNames(conf);
+    try {
+      // Create all the necessary "directories" of znodes
+      // TODO: Move this to an init method somewhere so not everyone calls it?
+      ZKUtil.createAndFailSilent(this, baseZNode);
+      ZKUtil.createAndFailSilent(this, assignmentZNode);
+      ZKUtil.createAndFailSilent(this, rsZNode);
+      ZKUtil.createAndFailSilent(this, tableZNode);
+    } catch (KeeperException e) {
+      error("Unexpected KeeperException creating base node", e);
+      error("Message: " + e.getMessage());
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Set the local variable node names using the specified configuration.
+   */
+  private void setNodeNames(Configuration conf) {
+    baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
+        HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
+    rootServerZNode = ZKUtil.joinZNode(baseZNode,
+        conf.get("zookeeper.znode.rootserver", "root-region-server"));
+    rsZNode = ZKUtil.joinZNode(baseZNode,
+        conf.get("zookeeper.znode.rs", "rs"));
+    masterAddressZNode = ZKUtil.joinZNode(baseZNode,
+        conf.get("zookeeper.znode.master", "master"));
+    clusterStateZNode = ZKUtil.joinZNode(baseZNode,
+        conf.get("zookeeper.znode.state", "shutdown"));
+    assignmentZNode = ZKUtil.joinZNode(baseZNode,
+        conf.get("zookeeper.znode.unassigned", "unassigned"));
+    tableZNode = ZKUtil.joinZNode(baseZNode,
+        conf.get("zookeeper.znode.tableEnableDisable", "table"));
+  }
+
+  /**
+   * Register the specified listener to receive ZooKeeper events.
+   * @param listener
+   */
+  public void registerListener(ZooKeeperListener listener) {
+    listeners.add(listener);
+  }
+
+  /**
+   * Get the connection to ZooKeeper.
+   * @return connection reference to zookeeper
+   */
+  public ZooKeeper getZooKeeper() {
+    return zooKeeper;
+  }
+
+  /**
+   * Get the quorum address of this instance.
+   * @return quorum string of this zookeeper connection instance
+   */
+  public String getQuorum() {
+    return quorum;
+  }
+
+  /**
+   * Method called from ZooKeeper for events and connection status.
+   *
+   * Valid events are passed along to listeners.  Connection status changes
+   * are dealt with locally.
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    LOG.debug("<" + name + "> Received ZooKeeper Event, " +
+        "type=" + event.getType() + ", " +
+        "state=" + event.getState() + ", " +
+        "path=" + event.getPath());
+
+    // While we are still using both ZKWs, need to call parent process()
+//    super.process(event);
+
+    switch(event.getType()) {
+
+      // If event type is NONE, this is a connection status change
+      case None: {
+        connectionEvent(event);
+        break;
+      }
+
+      // Otherwise pass along to the listeners
+
+      case NodeCreated: {
+        for(ZooKeeperListener listener : listeners) {
+          listener.nodeCreated(event.getPath());
+        }
+        break;
+      }
+
+      case NodeDeleted: {
+        for(ZooKeeperListener listener : listeners) {
+          listener.nodeDeleted(event.getPath());
+        }
+        break;
+      }
+
+      case NodeDataChanged: {
+        for(ZooKeeperListener listener : listeners) {
+          listener.nodeDataChanged(event.getPath());
+        }
+        break;
+      }
+
+      case NodeChildrenChanged: {
+        for(ZooKeeperListener listener : listeners) {
+          listener.nodeChildrenChanged(event.getPath());
+        }
+        break;
+      }
+    }
+  }
+
+  // Connection management
+
+  /**
+   * Called when there is a connection-related event via the Watcher callback.
+   *
+   * If Expired, this aborts the server; Disconnected is currently ignored.
+   *
+   * @param event
+   */
+  private void connectionEvent(WatchedEvent event) {
+    switch(event.getState()) {
+      // SyncConnected is normal, ignore
+      case SyncConnected:
+        break;
+
+      // Log and ignore Disconnected; abort the server on Expired
+      // TODO: Any reason to handle these two differently?
+      case Disconnected:
+        info("Received Disconnected from ZooKeeper, ignoring");
+        break;
+      case Expired:
+        error("Received Expired from ZooKeeper, aborting server");
+        if(abortable != null) {
+          abortable.abort("Received Expired from ZooKeeper, aborting server", null);
+        }
+        break;
+    }
+  }
+
+  /**
+   * Get the set of already watched unassigned nodes.
+   * @return set of paths of watched unassigned nodes
+   */
+  public Set<String> getNodes() {
+    return unassignedNodes;
+  }
+
+  /**
+   * Handles KeeperExceptions in client calls.
+   *
+   * This may be temporary but for now this gives one place to deal with these.
+   *
+   * TODO: Currently this method rethrows the exception to let the caller handle it
+   *
+   * @param ke
+   * @throws KeeperException
+   */
+  public void keeperException(KeeperException ke)
+  throws KeeperException {
+    error("Received unexpected KeeperException, re-throwing exception", ke);
+    throw ke;
+  }
+
+  /**
+   * Handles InterruptedExceptions in client calls.
+   *
+   * This may be temporary but for now this gives one place to deal with these.
+   *
+   * TODO: Currently, this method does nothing.
+   *       Is this ever expected to happen?  Do we abort or can we let it run?
+   *       Maybe this should be logged as WARN?  It shouldn't happen?
+   *
+   * @param ie
+   */
+  public void interruptedException(InterruptedException ie) {
+    debug("Received InterruptedException, doing nothing here", ie);
+    // no-op
+  }
+
+  // Logging methods
+
+  /**
+   * Exposed info logging method so our zookeeper output is named.
+   * @param string log line
+   */
+  public void info(String string) {
+    LOG.info("<" + name + "> " + string);
+  }
+
+  /**
+   * Exposed debug logging method so our zookeeper output is named.
+   * @param string log line
+   */
+  public void debug(String string) {
+    LOG.debug("<" + name + "> " + string);
+  }
+
+  /**
+   * Exposed debug logging method so our zookeeper output is named.
+   * @param string log line
+   */
+  public void debug(String string, Throwable t) {
+    LOG.debug("<" + name + "> " + string, t);
+  }
+
+  /**
+   * Exposed warn logging method so our zookeeper output is named.
+   * @param string log line
+   */
+  public void warn(String string) {
+    LOG.warn("<" + name + "> " + string);
+  }
+
+  /**
+   * Exposed warn logging method so our zookeeper output is named.
+   * @param string log line
+   * @param t exception
+   */
+  public void warn(String string, Throwable t) {
+    LOG.warn("<" + name + "> " + string, t);
+  }
+
+  /**
+   * Exposed error logging method so our zookeeper output is named.
+   * @param string log line
+   */
+  public void error(String string) {
+    LOG.error("<" + name + "> " + string);
+  }
+
+  /**
+   * Exposed error logging method so our zookeeper output is named.
+   * @param string log line
+   * @param t exception
+   */
+  public void error(String string, Throwable t) {
+    LOG.error("<" + name + "> " + string, t);
+  }
+
+  public boolean isDebugEnabled() {
+    return LOG.isDebugEnabled();
+  }
+
+  /**
+   * Close the connection to ZooKeeper.
+   */
+  public void close() {
+    try {
+      if(zooKeeper != null) {
+        zooKeeper.close();
+//        super.close();
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while closing ZooKeeper connection", e);
+    }
+  }
+}
\ No newline at end of file
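
Tying the pieces together, a small diagnostic sketch that opens a watcher, prints the output of ZKUtil.dump() (the znode layout plus the per-quorum-member "stat" results shown earlier), and closes the connection. The class name and the stub Abortable are illustrative, again assuming Abortable declares only abort(String, Throwable) at this revision.

// Hypothetical diagnostic: dump the HBase-related ZooKeeper state and exit.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

public class ZKDumpSketch {
  public static void printDump(Configuration conf) throws Exception {
    Abortable abortable = new Abortable() {
      public void abort(String why, Throwable e) {
        throw new RuntimeException(why, e);
      }
    };
    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "zk-dump-sketch", abortable);
    try {
      System.out.println(ZKUtil.dump(zkw));
    } finally {
      zkw.close();
    }
  }
}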

Modified: hbase/trunk/src/main/resources/hbase-webapps/master/master.jsp
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/resources/hbase-webapps/master/master.jsp?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/resources/hbase-webapps/master/master.jsp (original)
+++ hbase/trunk/src/main/resources/hbase-webapps/master/master.jsp Tue Aug 31 23:51:44 2010
@@ -6,17 +6,16 @@
   import="org.apache.hadoop.hbase.util.FSUtils"
   import="org.apache.hadoop.hbase.master.HMaster"
   import="org.apache.hadoop.hbase.HConstants"
-  import="org.apache.hadoop.hbase.master.MetaRegion"
   import="org.apache.hadoop.hbase.client.HBaseAdmin"
   import="org.apache.hadoop.hbase.HServerInfo"
   import="org.apache.hadoop.hbase.HServerAddress"
   import="org.apache.hadoop.hbase.HTableDescriptor" %><%
   HMaster master = (HMaster)getServletContext().getAttribute(HMaster.MASTER);
   Configuration conf = master.getConfiguration();
-  HServerAddress rootLocation = master.getRegionManager().getRootRegionLocation();
-  Map<byte [], MetaRegion> onlineRegions = master.getRegionManager().getOnlineMetaRegions();
+  HServerAddress rootLocation = master.getCatalogTracker().getRootLocation();
+  boolean metaOnline = master.getCatalogTracker().getMetaLocation() != null;
   Map<String, HServerInfo> serverToServerInfos =
-    master.getServerManager().getServersToServerInfo();
+    master.getServerManager().getOnlineServers();
   int interval = conf.getInt("hbase.regionserver.msginterval", 1000)/1000;
   if (interval == 0) {
       interval = 1;
@@ -24,7 +23,7 @@
   boolean showFragmentation = conf.getBoolean("hbase.master.ui.fragmentation.enabled", false);
   Map<String, Integer> frags = null;
   if (showFragmentation) {
-      frags = master.getTableFragmentation();
+      frags = FSUtils.getTableFragmentation(master);
   }
 %><?xml version="1.0" encoding="UTF-8" ?>
 <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" 
@@ -66,13 +65,12 @@
 <tr><td>HBase Compiled</td><td><%= org.apache.hadoop.hbase.util.VersionInfo.getDate() %>, <%= org.apache.hadoop.hbase.util.VersionInfo.getUser() %></td><td>When HBase version was compiled and by whom</td></tr>
 <tr><td>Hadoop Version</td><td><%= org.apache.hadoop.util.VersionInfo.getVersion() %>, r<%= org.apache.hadoop.util.VersionInfo.getRevision() %></td><td>Hadoop version and svn revision</td></tr>
 <tr><td>Hadoop Compiled</td><td><%= org.apache.hadoop.util.VersionInfo.getDate() %>, <%= org.apache.hadoop.util.VersionInfo.getUser() %></td><td>When Hadoop version was compiled and by whom</td></tr>
-<tr><td>HBase Root Directory</td><td><%= master.getRootDir().toString() %></td><td>Location of HBase home directory</td></tr>
+<tr><td>HBase Root Directory</td><td><%= FSUtils.getRootDir(master.getConfiguration()).toString() %></td><td>Location of HBase home directory</td></tr>
 <tr><td>Load average</td><td><%= master.getServerManager().getAverageLoad() %></td><td>Average number of regions per regionserver. Naive computation.</td></tr>
-<tr><td>Regions On FS</td><td><%= master.getRegionManager().countRegionsOnFS() %></td><td>Number of regions on FileSystem. Rough count.</td></tr>
 <%  if (showFragmentation) { %>
         <tr><td>Fragmentation</td><td><%= frags.get("-TOTAL-") != null ? frags.get("-TOTAL-").intValue() + "%" : "n/a" %></td><td>Overall fragmentation of all tables, including .META. and -ROOT-.</td></tr>
 <%  } %>
-<tr><td>Zookeeper Quorum</td><td><%= master.getZooKeeperWrapper().getQuorumServers() %></td><td>Addresses of all registered ZK servers. For more, see <a href="/zk.jsp">zk dump</a>.</td></tr>
+<tr><td>Zookeeper Quorum</td><td><%= master.getZooKeeperWatcher().getQuorum() %></td><td>Addresses of all registered ZK servers. For more, see <a href="/zk.jsp">zk dump</a>.</td></tr>
 </table>
 
 <h2>Catalog Tables</h2>
@@ -94,7 +92,7 @@
     <td>The -ROOT- table holds references to all .META. regions.</td>
 </tr>
 <%
-    if (onlineRegions != null && onlineRegions.size() > 0) { %>
+    if (metaOnline) { %>
 <tr>
     <td><a href="table.jsp?name=<%= Bytes.toString(HConstants.META_TABLE_NAME) %>"><%= Bytes.toString(HConstants.META_TABLE_NAME) %></a></td>
 <%  if (showFragmentation) { %>
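
Both master.jsp above and table.jsp below switch from the old RegionManager calls to the new CatalogTracker for locating -ROOT- and .META.. Below is a hedged sketch of the same lookups in plain Java with explicit null handling (getMetaLocation appears to return null while .META. is unassigned, judging by the metaOnline check above); the helper method names are made up for illustration and are not part of the commit.

  // Illustrative helpers only, not part of the commit.
  static boolean isMetaOnline(HMaster master) {
    // null while .META. has no known location
    return master.getCatalogTracker().getMetaLocation() != null;
  }

  static String describeRootLocation(HMaster master) {
    HServerAddress root = master.getCatalogTracker().getRootLocation();
    return root == null
        ? "-ROOT- not deployed"
        : root.getHostname() + ":" + root.getPort();
  }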

Modified: hbase/trunk/src/main/resources/hbase-webapps/master/table.jsp
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/resources/hbase-webapps/master/table.jsp?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/resources/hbase-webapps/master/table.jsp (original)
+++ hbase/trunk/src/main/resources/hbase-webapps/master/table.jsp Tue Aug 31 23:51:44 2010
@@ -9,8 +9,8 @@
   import="org.apache.hadoop.hbase.HServerInfo"
   import="org.apache.hadoop.hbase.io.ImmutableBytesWritable"
   import="org.apache.hadoop.hbase.master.HMaster" 
-  import="org.apache.hadoop.hbase.master.MetaRegion"
   import="org.apache.hadoop.hbase.util.Bytes"
+  import="org.apache.hadoop.hbase.util.FSUtils"
   import="java.util.Map"
   import="org.apache.hadoop.hbase.HConstants"%><%
   HMaster master = (HMaster)getServletContext().getAttribute(HMaster.MASTER);
@@ -19,11 +19,11 @@
   String tableName = request.getParameter("name");
   HTable table = new HTable(conf, tableName);
   String tableHeader = "<h2>Table Regions</h2><table><tr><th>Name</th><th>Region Server</th><th>Start Key</th><th>End Key</th></tr>";
-  HServerAddress rootLocation = master.getRegionManager().getRootRegionLocation();
+  HServerAddress rl = master.getCatalogTracker().getRootLocation();
   boolean showFragmentation = conf.getBoolean("hbase.master.ui.fragmentation.enabled", false);
   Map<String, Integer> frags = null;
   if (showFragmentation) {
-      frags = master.getTableFragmentation();
+      frags = FSUtils.getTableFragmentation(master);
   }
 %>
 
@@ -46,15 +46,19 @@
 <p><hr><p>
 <%
   if (action.equals("split")) {
+  /*
     if (key != null && key.length() > 0) {
       Writable[] arr = new Writable[1];
       arr[0] = new ImmutableBytesWritable(Bytes.toBytes(key));
       master.modifyTable(Bytes.toBytes(tableName), HConstants.Modify.TABLE_SPLIT, arr);
     } else {
-      master.modifyTable(Bytes.toBytes(tableName), HConstants.Modify.TABLE_SPLIT, null);
+      master.modifyTable(Bytes.toBytes(tableName), HConstants.Modify.TABLE_SPLIT);
     }
-    %> Split request accepted. <%
+    */
+    
+    %> Split request accepted -- BUT CURRENTLY A NOOP -- FIX! <%
   } else if (action.equals("compact")) {
+  /*
     if (key != null && key.length() > 0) {
       Writable[] arr = new Writable[1];
       arr[0] = new ImmutableBytesWritable(Bytes.toBytes(key));
@@ -62,7 +66,8 @@
     } else {
       master.modifyTable(Bytes.toBytes(tableName), HConstants.Modify.TABLE_COMPACT, null);
     }
-    %> Compact request accepted. <%
+    */
+    %> Compact request accepted -- BUT CURRENTLY A NOOP -- FIX! <%
   }
 %>
 <p>Reload.
@@ -84,12 +89,12 @@
 %>
 <%= tableHeader %>
 <%
-  int infoPort = master.getServerManager().getHServerInfo(rootLocation).getInfoPort();
-  String url = "http://" + rootLocation.getHostname() + ":" + infoPort + "/";
+  int infoPort = master.getServerManager().getHServerInfo(rl).getInfoPort();
+  String url = "http://" + rl.getHostname() + ":" + infoPort + "/";
 %>
 <tr>
   <td><%= tableName %></td>
-  <td><a href="<%= url %>"><%= rootLocation.getHostname() %>:<%= rootLocation.getPort() %></a></td>
+  <td><a href="<%= url %>"><%= rl.getHostname() %>:<%= rl.getPort() %></a></td>
   <td>-</td>
   <td></td>
   <td>-</td>
@@ -100,14 +105,16 @@
 %>
 <%= tableHeader %>
 <%
-  Map<byte [], MetaRegion> onlineRegions = master.getRegionManager().getOnlineMetaRegions();
-  for (MetaRegion meta: onlineRegions.values()) {
-    int infoPort = master.getServerManager().getHServerInfo(meta.getServer()).getInfoPort();
-    String url = "http://" + meta.getServer().getHostname() + ":" + infoPort + "/";
+  // NOTE: Presumes one meta region only.
+  HRegionInfo meta = HRegionInfo.FIRST_META_REGIONINFO;
+  HServerAddress metaLocation = master.getCatalogTracker().getMetaLocation();
+  for (int i = 0; i <= 1; i++) {
+    int infoPort = master.getServerManager().getHServerInfo(metaLocation).getInfoPort();
+    String url = "http://" + metaLocation.getHostname() + ":" + infoPort + "/";
 %>
 <tr>
-  <td><%= Bytes.toString(meta.getRegionName()) %></td>
-    <td><a href="<%= url %>"><%= meta.getServer().getHostname().toString() + ":" + infoPort %></a></td>
+  <td><%= meta.getRegionNameAsString() %></td>
+    <td><a href="<%= url %>"><%= metaLocation.getHostname().toString() + ":" + infoPort %></a></td>
     <td>-</td><td><%= Bytes.toString(meta.getStartKey()) %></td><td><%= Bytes.toString(meta.getEndKey()) %></td>
 </tr>
 <%  } %>

Modified: hbase/trunk/src/main/resources/hbase-webapps/master/zk.jsp
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/resources/hbase-webapps/master/zk.jsp?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/resources/hbase-webapps/master/zk.jsp (original)
+++ hbase/trunk/src/main/resources/hbase-webapps/master/zk.jsp Tue Aug 31 23:51:44 2010
@@ -4,7 +4,8 @@
   import="org.apache.hadoop.hbase.client.HBaseAdmin"
   import="org.apache.hadoop.hbase.client.HConnection"
   import="org.apache.hadoop.hbase.HRegionInfo"
-  import="org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper"
+  import="org.apache.hadoop.hbase.zookeeper.ZKUtil"
+  import="org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher"
   import="org.apache.hadoop.hbase.HBaseConfiguration"
   import="org.apache.hadoop.hbase.master.HMaster" 
   import="org.apache.hadoop.hbase.HConstants"%><%
@@ -12,7 +13,7 @@
   Configuration conf = master.getConfiguration();
   HBaseAdmin hbadmin = new HBaseAdmin(conf);
   HConnection connection = hbadmin.getConnection();
-  ZooKeeperWrapper wrapper = connection.getZooKeeperWrapper();
+  ZooKeeperWatcher watcher = connection.getZooKeeperWatcher();
 %>
 
 <?xml version="1.0" encoding="UTF-8" ?>
@@ -29,7 +30,7 @@
 <p id="links_menu"><a href="/master.jsp">Master</a>, <a href="/logs/">Local logs</a>, <a href="/stacks">Thread Dump</a>, <a href="/logLevel">Log Level</a></p>
 <hr id="head_rule" />
 <pre>
-<%= wrapper.dump() %>
+<%= ZKUtil.dump(watcher) %>
 </pre>
 
 </body>

Modified: hbase/trunk/src/main/resources/hbase-webapps/regionserver/regionserver.jsp
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/resources/hbase-webapps/regionserver/regionserver.jsp?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/resources/hbase-webapps/regionserver/regionserver.jsp (original)
+++ hbase/trunk/src/main/resources/hbase-webapps/regionserver/regionserver.jsp Tue Aug 31 23:51:44 2010
@@ -18,7 +18,7 @@
     e.printStackTrace();
   }
   RegionServerMetrics metrics = regionServer.getMetrics();
-  Collection<HRegionInfo> onlineRegions = regionServer.getSortedOnlineRegionInfos();
+  Collection<HRegionInfo> onlineRegions = regionServer.getOnlineRegions();
   int interval = regionServer.getConfiguration().getInt("hbase.regionserver.msginterval", 3000)/1000;
 
 %><?xml version="1.0" encoding="UTF-8" ?>
@@ -42,7 +42,7 @@
 <tr><td>HBase Version</td><td><%= org.apache.hadoop.hbase.util.VersionInfo.getVersion() %>, r<%= org.apache.hadoop.hbase.util.VersionInfo.getRevision() %></td><td>HBase version and svn revision</td></tr>
 <tr><td>HBase Compiled</td><td><%= org.apache.hadoop.hbase.util.VersionInfo.getDate() %>, <%= org.apache.hadoop.hbase.util.VersionInfo.getUser() %></td><td>When HBase version was compiled and by whom</td></tr>
 <tr><td>Metrics</td><td><%= metrics.toString() %></td><td>RegionServer Metrics; file and heap sizes are in megabytes</td></tr>
-<tr><td>Zookeeper Quorum</td><td><%= regionServer.getZooKeeperWrapper().getQuorumServers() %></td><td>Addresses of all registered ZK servers</td></tr>
+<tr><td>Zookeeper Quorum</td><td><%= regionServer.getZooKeeper().getQuorum() %></td><td>Addresses of all registered ZK servers</td></tr>
 </table>
 
 <h2>Online Regions</h2>
@@ -50,7 +50,7 @@
 <table>
 <tr><th>Region Name</th><th>Start Key</th><th>End Key</th><th>Metrics</th></tr>
 <%   for (HRegionInfo r: onlineRegions) { 
-        HServerLoad.RegionLoad load = regionServer.createRegionLoad(r.getRegionName());
+        HServerLoad.RegionLoad load = regionServer.createRegionLoad(r.getEncodedName());
  %>
 <tr><td><%= r.getRegionNameAsString() %></td>
     <td><%= Bytes.toStringBinary(r.getStartKey()) %></td><td><%= Bytes.toStringBinary(r.getEndKey()) %></td>

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/BROKE_TODO_FIX_TestAcidGuarantees.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/BROKE_TODO_FIX_TestAcidGuarantees.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/BROKE_TODO_FIX_TestAcidGuarantees.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/BROKE_TODO_FIX_TestAcidGuarantees.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,330 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Test case that uses multiple threads to read and write multifamily rows
+ * into a table, verifying that reads never see partially-complete writes.
+ * 
+ * This can run as a junit test, or with a main() function which runs against
+ * a real cluster (e.g. for testing with failures, region movement, etc.).
+ */
+public class BROKE_TODO_FIX_TestAcidGuarantees {
+  protected static final Log LOG = LogFactory.getLog(BROKE_TODO_FIX_TestAcidGuarantees.class);
+  public static final byte [] TABLE_NAME = Bytes.toBytes("TestAcidGuarantees");
+  public static final byte [] FAMILY_A = Bytes.toBytes("A");
+  public static final byte [] FAMILY_B = Bytes.toBytes("B");
+  public static final byte [] FAMILY_C = Bytes.toBytes("C");
+  public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
+
+  public static final byte[][] FAMILIES = new byte[][] {
+    FAMILY_A, FAMILY_B, FAMILY_C };
+
+  private HBaseTestingUtility util;
+
+  public static int NUM_COLS_TO_CHECK = 50;
+
+  private void createTableIfMissing()
+    throws IOException {
+    try {
+      util.createTable(TABLE_NAME, FAMILIES);
+    } catch (TableExistsException tee) {
+    }
+  }
+
+  public BROKE_TODO_FIX_TestAcidGuarantees() {
+    // Set small flush size for minicluster so we exercise reseeking scanners
+    Configuration conf = HBaseConfiguration.create();
+    conf.set("hbase.hregion.memstore.flush.size", String.valueOf(128*1024));
+    util = new HBaseTestingUtility(conf);
+  }
+  
+  /**
+   * Thread that does random full-row writes into a table.
+   */
+  public static class AtomicityWriter extends RepeatingTestThread {
+    Random rand = new Random();
+    byte data[] = new byte[10];
+    byte targetRows[][];
+    byte targetFamilies[][];
+    HTable table;
+    AtomicLong numWritten = new AtomicLong();
+    
+    public AtomicityWriter(TestContext ctx, byte targetRows[][],
+                           byte targetFamilies[][]) throws IOException {
+      super(ctx);
+      this.targetRows = targetRows;
+      this.targetFamilies = targetFamilies;
+      table = new HTable(ctx.getConf(), TABLE_NAME);
+    }
+    public void doAnAction() throws Exception {
+      // Pick a random row to write into
+      byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
+      Put p = new Put(targetRow); 
+      rand.nextBytes(data);
+
+      for (byte[] family : targetFamilies) {
+        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+          byte qualifier[] = Bytes.toBytes("col" + i);
+          p.add(family, qualifier, data);
+        }
+      }
+      table.put(p);
+      numWritten.getAndIncrement();
+    }
+  }
+  
+  /**
+   * Thread that does single-row reads in a table, looking for partially
+   * completed rows.
+   */
+  public static class AtomicGetReader extends RepeatingTestThread {
+    byte targetRow[];
+    byte targetFamilies[][];
+    HTable table;
+    int numVerified = 0;
+    AtomicLong numRead = new AtomicLong();
+
+    public AtomicGetReader(TestContext ctx, byte targetRow[],
+                           byte targetFamilies[][]) throws IOException {
+      super(ctx);
+      this.targetRow = targetRow;
+      this.targetFamilies = targetFamilies;
+      table = new HTable(ctx.getConf(), TABLE_NAME);
+    }
+
+    public void doAnAction() throws Exception {
+      Get g = new Get(targetRow);
+      Result res = table.get(g);
+      byte[] gotValue = null;
+      if (res.getRow() == null) {
+        // Trying to verify but we didn't find the row - the writing
+        // thread probably just hasn't started writing yet, so we can
+        // ignore this action
+        return;
+      }
+      
+      for (byte[] family : targetFamilies) {
+        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+          byte qualifier[] = Bytes.toBytes("col" + i);
+          byte thisValue[] = res.getValue(family, qualifier);
+          if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
+            gotFailure(gotValue, res);
+          }
+          numVerified++;
+          gotValue = thisValue;
+        }
+      }
+      numRead.getAndIncrement();
+    }
+
+    private void gotFailure(byte[] expected, Result res) {
+      StringBuilder msg = new StringBuilder();
+      msg.append("Failed after ").append(numVerified).append("!");
+      msg.append("Expected=").append(Bytes.toStringBinary(expected));
+      msg.append("Got:\n");
+      for (KeyValue kv : res.list()) {
+        msg.append(kv.toString());
+        msg.append(" val= ");
+        msg.append(Bytes.toStringBinary(kv.getValue()));
+        msg.append("\n");
+      }
+      throw new RuntimeException(msg.toString());
+    }
+  }
+  
+  /**
+   * Thread that does full scans of the table looking for any partially completed
+   * rows.
+   */
+  public static class AtomicScanReader extends RepeatingTestThread {
+    byte targetFamilies[][];
+    HTable table;
+    AtomicLong numScans = new AtomicLong();
+    AtomicLong numRowsScanned = new AtomicLong();
+
+    public AtomicScanReader(TestContext ctx,
+                           byte targetFamilies[][]) throws IOException {
+      super(ctx);
+      this.targetFamilies = targetFamilies;
+      table = new HTable(ctx.getConf(), TABLE_NAME);
+    }
+
+    public void doAnAction() throws Exception {
+      Scan s = new Scan();
+      for (byte[] family : targetFamilies) {
+        s.addFamily(family);
+      }
+      ResultScanner scanner = table.getScanner(s);
+      
+      for (Result res : scanner) {
+        byte[] gotValue = null;
+  
+        for (byte[] family : targetFamilies) {
+          for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+            byte qualifier[] = Bytes.toBytes("col" + i);
+            byte thisValue[] = res.getValue(family, qualifier);
+            if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
+              gotFailure(gotValue, res);
+            }
+            gotValue = thisValue;
+          }
+        }
+        numRowsScanned.getAndIncrement();
+      }
+      numScans.getAndIncrement();
+    }
+
+    private void gotFailure(byte[] expected, Result res) {
+      StringBuilder msg = new StringBuilder();
+      msg.append("Failed after ").append(numRowsScanned).append("!");
+      msg.append("Expected=").append(Bytes.toStringBinary(expected));
+      msg.append("Got:\n");
+      for (KeyValue kv : res.list()) {
+        msg.append(kv.toString());
+        msg.append(" val= ");
+        msg.append(Bytes.toStringBinary(kv.getValue()));
+        msg.append("\n");
+      }
+      throw new RuntimeException(msg.toString());
+    }
+  }
+
+
+  public void runTestAtomicity(long millisToRun,
+      int numWriters,
+      int numGetters,
+      int numScanners,
+      int numUniqueRows) throws Exception {
+    createTableIfMissing();
+    TestContext ctx = new TestContext(util.getConfiguration());
+    
+    byte rows[][] = new byte[numUniqueRows][];
+    for (int i = 0; i < numUniqueRows; i++) {
+      rows[i] = Bytes.toBytes("test_row_" + i);
+    }
+    
+    List<AtomicityWriter> writers = Lists.newArrayList();
+    for (int i = 0; i < numWriters; i++) {
+      AtomicityWriter writer = new AtomicityWriter(
+          ctx, rows, FAMILIES);
+      writers.add(writer);
+      ctx.addThread(writer);
+    }
+
+    List<AtomicGetReader> getters = Lists.newArrayList();
+    for (int i = 0; i < numGetters; i++) {
+      AtomicGetReader getter = new AtomicGetReader(
+          ctx, rows[i % numUniqueRows], FAMILIES);
+      getters.add(getter);
+      ctx.addThread(getter);
+    }
+    
+    List<AtomicScanReader> scanners = Lists.newArrayList();
+    for (int i = 0; i < numScanners; i++) {
+      AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES);
+      scanners.add(scanner);
+      ctx.addThread(scanner);
+    }
+    
+    ctx.startThreads();
+    ctx.waitFor(millisToRun);
+    ctx.stop();
+    
+    LOG.info("Finished test. Writers:");
+    for (AtomicityWriter writer : writers) {
+      LOG.info("  wrote " + writer.numWritten.get());
+    }
+    LOG.info("Readers:");
+    for (AtomicGetReader reader : getters) {
+      LOG.info("  read " + reader.numRead.get());
+    }
+    LOG.info("Scanners:");
+    for (AtomicScanReader scanner : scanners) {
+      LOG.info("  scanned " + scanner.numScans.get());
+      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
+    }
+  }
+
+  @Test
+  public void testGetAtomicity() throws Exception {
+    util.startMiniCluster(1);
+    try {
+      runTestAtomicity(20000, 5, 5, 0, 3);
+    } finally {
+      util.shutdownMiniCluster();
+    }    
+  }
+
+  @Test
+  @Ignore("Currently not passing - see HBASE-2670")
+  public void testScanAtomicity() throws Exception {
+    util.startMiniCluster(1);
+    try {
+      runTestAtomicity(20000, 5, 0, 5, 3);
+    } finally {
+      util.shutdownMiniCluster();
+    }    
+  }
+
+  @Test
+  @Ignore("Currently not passing - see HBASE-2670")
+  public void testMixedAtomicity() throws Exception {
+    util.startMiniCluster(1);
+    try {
+      runTestAtomicity(20000, 5, 2, 2, 3);
+    } finally {
+      util.shutdownMiniCluster();
+    }    
+  }
+
+  public static void main(String args[]) throws Exception {
+    Configuration c = HBaseConfiguration.create();
+    BROKE_TODO_FIX_TestAcidGuarantees test = new BROKE_TODO_FIX_TestAcidGuarantees();
+    test.setConf(c);
+    test.runTestAtomicity(5*60*1000, 5, 2, 2, 3);
+  }
+
+  private void setConf(Configuration c) {
+    util = new HBaseTestingUtility(c);
+  }
+}
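
The class javadoc above says the test can also be run through main() against a real cluster. As a sketch of that second mode, a helper like the following could live inside the class (it must, since setConf is private); the method name, duration, and thread counts are illustrative and not part of this commit.

  // Hypothetical helper inside BROKE_TODO_FIX_TestAcidGuarantees, not part of
  // the commit: soak whatever cluster the Configuration points at for an hour.
  public static void soak(Configuration c) throws Exception {
    BROKE_TODO_FIX_TestAcidGuarantees test = new BROKE_TODO_FIX_TestAcidGuarantees();
    test.setConf(c);
    // millisToRun, numWriters, numGetters, numScanners, numUniqueRows
    test.runTestAtomicity(60 * 60 * 1000, 10, 5, 5, 3);
  }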

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseClusterTestCase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseClusterTestCase.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseClusterTestCase.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseClusterTestCase.java Tue Aug 31 23:51:44 2010
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.util.ReflectionUtils;
 


