Return-Path: Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: (qmail 49780 invoked from network); 28 Aug 2010 00:32:59 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 28 Aug 2010 00:32:59 -0000 Received: (qmail 29066 invoked by uid 500); 28 Aug 2010 00:32:59 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 28987 invoked by uid 500); 28 Aug 2010 00:32:59 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 28980 invoked by uid 99); 28 Aug 2010 00:32:59 -0000 Received: from Unknown (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 28 Aug 2010 00:32:59 +0000 X-ASF-Spam-Status: No, hits=-1996.4 required=10.0 tests=ALL_TRUSTED,FS_REPLICA X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 28 Aug 2010 00:32:38 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 6528323888FE; Sat, 28 Aug 2010 00:31:19 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r990310 - in /hbase/branches/0.90_master_rewrite/src: main/java/org/apache/hadoop/hbase/replication/ main/java/org/apache/hadoop/hbase/replication/master/ main/java/org/apache/hadoop/hbase/replication/regionserver/ main/java/org/apache/hado... Date: Sat, 28 Aug 2010 00:31:19 -0000 To: commits@hbase.apache.org From: stack@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100828003119.6528323888FE@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: stack Date: Sat Aug 28 00:31:18 2010 New Revision: 990310 URL: http://svn.apache.org/viewvc?rev=990310&view=rev Log: Bringing over ReplicationZooKeeperWatcher to use new ZK regime. I renamed RZW as RZ. M src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java M src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Renamed RZW as RZ. M src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java Javadoc. M src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Added getData. M src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java javadoc. M src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java M src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java M src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java M src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Renamed RZW as RZ. private float ratio; A src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java D src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java Renamed Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Removed: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=990310&view=auto ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (added) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Sat Aug 28 00:31:18 2010 @@ -0,0 +1,558 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.util.StringUtils; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; + +/** + * This class serves as a helper for all things related to zookeeper + * in replication. + *

+ * The layout looks something like this under zookeeper.znode.parent + * for the master cluster: + *

+ *

+ * replication/
+ *  master     {contains a full cluster address}
+ *  state      {contains true or false}
+ *  clusterId  {contains a byte}
+ *  peers/
+ *    1/   {contains a full cluster address}
+ *    2/
+ *    ...
+ *  rs/ {lists all RS that replicate}
+ *    startcode1/ {lists all peer clusters}
+ *      1/ {lists hlogs to process}
+ *        10.10.1.76%3A53488.123456789 {contains nothing or a position}
+ *        10.10.1.76%3A53488.123456790
+ *        ...
+ *      2/
+ *      ...
+ *    startcode2/
+ *    ...
+ * 
+ */ +public class ReplicationZookeeper { + private static final Log LOG = + LogFactory.getLog(ReplicationZookeeper.class); + // Name of znode we use to lock when failover + private final static String RS_LOCK_ZNODE = "lock"; + // Our handle on zookeeper + private final ZooKeeperWatcher zookeeper; + // Map of addresses of peer clusters with their ZKW + private final Map peerClusters; + // Path to the root replication znode + private final String replicationZNode; + // Path to the peer clusters znode + private final String peersZNode; + // Path to the znode that contains all RS that replicates + private final String rsZNode; + // Path to this region server's name under rsZNode + private final String rsServerNameZnode; + // Name node if the replicationState znode + private final String replicationStateNodeName; + // If this RS is part of a master cluster + private final boolean replicationMaster; + private final Configuration conf; + // Is this cluster replicating at the moment? + private final AtomicBoolean replicating; + // Byte (stored as string here) that identifies this cluster + private final String clusterId; + // Abortable + private final Abortable abortable; + + /** + * Constructor used by region servers, connects to the peer cluster right away. + * + * @param zookeeper + * @param conf conf to use + * @param replicating atomic boolean to start/stop replication + * @param rsName the name of this region server, null if + * using RZH only to use the helping methods + * @throws IOException + * @throws KeeperException + */ + public ReplicationZookeeper(final Server server, + final Configuration conf, final AtomicBoolean replicating, String rsName) + throws IOException, KeeperException { + this.abortable = server; + this.zookeeper = server.getZooKeeper(); + this.conf = conf; + String replicationZNodeName = + conf.get("zookeeper.znode.replication", "replication"); + String peersZNodeName = + conf.get("zookeeper.znode.replication.peers", "peers"); + String repMasterZNodeName = + conf.get("zookeeper.znode.replication.master", "master"); + this.replicationStateNodeName = + conf.get("zookeeper.znode.replication.state", "state"); + String clusterIdZNodeName = + conf.get("zookeeper.znode.replication.clusterId", "clusterId"); + String rsZNodeName = + conf.get("zookeeper.znode.replication.rs", "rs"); + String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + + this.conf.get("hbase.zookeeper.property.clientPort") + ":" + + this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT); + + this.peerClusters = new HashMap(); + this.replicationZNode = + ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName); + this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName); + this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName); + + this.replicating = replicating; + setReplicating(); + String znode = ZKUtil.joinZNode(this.replicationZNode, clusterIdZNodeName); + byte [] data = ZKUtil.getData(this.zookeeper, znode); + String idResult = Bytes.toString(data); + this.clusterId = idResult == null? + Byte.toString(HConstants.DEFAULT_CLUSTER_ID): idResult; + + znode = ZKUtil.joinZNode(this.replicationZNode, repMasterZNodeName); + data = ZKUtil.getData(this.zookeeper, znode); + String address = Bytes.toString(data); + this.replicationMaster = thisCluster.equals(address); + LOG.info("This cluster (" + thisCluster + ") is a " + + (this.replicationMaster ? "master" : "slave") + " for replication" + + ", compared with (" + address + ")"); + + if (rsName != null) { + this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, rsName); + // Set a tracker on replicationStateNodeNode + ReplicationStatusTracker tracker = + new ReplicationStatusTracker(this.zookeeper, getRepStateNode(), server); + tracker.start(); + + List znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode); + if (znodes != null) { + for (String z : znodes) { + connectToPeer(z); + } + } + } else { + this.rsServerNameZnode = null; + } + + } + + /** + * Returns all region servers from given peer + * + * @param peerClusterId (byte) the cluster to interrogate + * @return addresses of all region servers + */ + public List getPeersAddresses(String peerClusterId) { + if (this.peerClusters.size() == 0) { + return new ArrayList(0); + } + ReplicationZookeeper zkw = this.peerClusters.get(peerClusterId); + return zkw == null? + new ArrayList(0): + zkw.scanAddressDirectory(this.zookeeper.rsZNode); + } + + /** + * Scan a directory of address data. + * @param znode The parent node + * @return The directory contents as HServerAddresses + */ + public List scanAddressDirectory(String znode) { + List list = new ArrayList(); + List nodes = null; + try { + nodes = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); + } catch (KeeperException e) { + this.abortable.abort("Scanning " + znode, e); + } + if (nodes == null) { + return list; + } + for (String node : nodes) { + String path = ZKUtil.joinZNode(znode, node); + list.add(readAddress(path)); + } + return list; + } + + private HServerAddress readAddress(String znode) { + byte [] data = null; + try { + data = ZKUtil.getData(this.zookeeper, znode); + } catch (KeeperException e) { + this.abortable.abort("Getting address", e); + } + return new HServerAddress(Bytes.toString(data)); + } + + /** + * This method connects this cluster to another one and registers it + * in this region server's replication znode + * @param peerId id of the peer cluster + * @throws KeeperException + */ + private void connectToPeer(String peerId) throws IOException, KeeperException { + String znode = ZKUtil.joinZNode(this.peersZNode, peerId); + byte [] data = ZKUtil.getData(this.zookeeper, znode); + String [] ensemble = Bytes.toString(data).split(":"); + if (ensemble.length != 3) { + throw new IllegalArgumentException("Wrong format of cluster address: " + + Bytes.toStringBinary(data)); + } + Configuration otherConf = new Configuration(this.conf); + otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]); + otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]); + otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]); + // REENABLE -- FIX!!!! + /* + ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(otherConf, + "connection to cluster: " + peerId); + zkw.registerListener(new ReplicationStatusWatcher()); + this.peerClusters.put(peerId, zkw); + this.zookeeperWrapper.ensureExists(this.zookeeperWrapper.getZNode( + this.rsServerNameZnode, peerId)); + */ + LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble)); + } + + /** + * This reads the state znode for replication and sets the atomic boolean + */ + private void setReplicating() { + try { + byte [] data = ZKUtil.getDataAndWatch(this.zookeeper, getRepStateNode()); + String value = Bytes.toString(data); + if (value == null) LOG.info(getRepStateNode() + " data is null"); + else { + this.replicating.set(Boolean.parseBoolean(value)); + LOG.info("Replication is now " + (this.replicating.get()? + "started" : "stopped")); + } + } catch (KeeperException e) { + this.abortable.abort("Failed getting data on from " + getRepStateNode(), e); + } + } + + private String getRepStateNode() { + return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName); + } + + /** + * Add a new log to the list of hlogs in zookeeper + * @param filename name of the hlog's znode + * @param clusterId name of the cluster's znode + */ + public void addLogToList(String filename, String clusterId) { + try { + String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId); + znode = ZKUtil.joinZNode(znode, filename); + ZKUtil.createAndWatch(this.zookeeper, znode, Bytes.toBytes("")); + } catch (KeeperException e) { + this.abortable.abort("Failed add log to list", e); + } + } + + /** + * Remove a log from the list of hlogs in zookeeper + * @param filename name of the hlog's znode + * @param clusterId name of the cluster's znode + */ + public void removeLogFromList(String filename, String clusterId) { + try { + String znode = ZKUtil.joinZNode(rsServerNameZnode, clusterId); + znode = ZKUtil.joinZNode(znode, filename); + ZKUtil.deleteChildrenRecursively(this.zookeeper, znode); + } catch (KeeperException e) { + this.abortable.abort("Failed remove from list", e); + } + } + + /** + * Set the current position of the specified cluster in the current hlog + * @param filename filename name of the hlog's znode + * @param clusterId clusterId name of the cluster's znode + * @param position the position in the file + * @throws IOException + */ + public void writeReplicationStatus(String filename, String clusterId, + long position) { + try { + String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId); + znode = ZKUtil.joinZNode(znode, filename); + // Why serialize String of Long and note Long as bytes? + ZKUtil.createAndWatch(this.zookeeper, znode, + Bytes.toBytes(Long.toString(position))); + } catch (KeeperException e) { + this.abortable.abort("Writing replication status", e); + } + } + + /** + * Get a list of all the other region servers in this cluster + * and set a watch + * @param watch the watch to set + * @return a list of server nanes + */ + public List getRegisteredRegionServers(Watcher watch) { + List result = null; + try { + // TODO: This is rsZNode from zk which is like getListOfReplicators + // but maybe these are from different zk instances? + result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode); + } catch (KeeperException e) { + this.abortable.abort("Get list of registered region servers", e); + } + return result; + } + + /** + * Get the list of the replicators that have queues, they can be alive, dead + * or simply from a previous run + * @param watch the watche to set + * @return a list of server names + */ + public List getListOfReplicators() { + List result = null; + try { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode); + } catch (KeeperException e) { + this.abortable.abort("Get list of replicators", e); + } + return result; + } + + /** + * Get the list of peer clusters for the specified server names + * @param rs server names of the rs + * @param watch the watch to set + * @return a list of peer cluster + */ + public List getListPeersForRS(String rs) { + String znode = ZKUtil.joinZNode(rsZNode, rs); + List result = null; + try { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); + } catch (KeeperException e) { + this.abortable.abort("Get list of peers for rs", e); + } + return result; + } + + /** + * Get the list of hlogs for the specified region server and peer cluster + * @param rs server names of the rs + * @param id peer cluster + * @param watch the watch to set + * @return a list of hlogs + */ + public List getListHLogsForPeerForRS(String rs, String id) { + String znode = ZKUtil.joinZNode(rsZNode, rs); + znode = ZKUtil.joinZNode(znode, id); + List result = null; + try { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); + } catch (KeeperException e) { + this.abortable.abort("Get list of hlogs for peer", e); + } + return result; + } + + /** + * Try to set a lock in another server's znode. + * @param znode the server names of the other server + * @return true if the lock was acquired, false in every other cases + */ + public boolean lockOtherRS(String znode) { + try { + String parent = ZKUtil.joinZNode(this.rsZNode, znode); + String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE); + ZKUtil.createAndWatch(this.zookeeper, p, Bytes.toBytes(rsServerNameZnode)); + } catch (KeeperException e) { + this.abortable.abort("Failed lock other rs", e); + } + return true; + } + + /** + * This methods copies all the hlogs queues from another region server + * and returns them all sorted per peer cluster (appended with the dead + * server's znode) + * @param znode server names to copy + * @return all hlogs for all peers of that cluster, null if an error occurred + */ + public SortedMap> copyQueuesFromRS(String znode) { + // TODO this method isn't atomic enough, we could start copying and then + // TODO fail for some reason and we would end up with znodes we don't want. + SortedMap> queues = + new TreeMap>(); + try { + String nodePath = ZKUtil.joinZNode(rsZNode, znode); + List clusters = + ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath); + // We have a lock znode in there, it will count as one. + if (clusters == null || clusters.size() <= 1) { + return queues; + } + // The lock isn't a peer cluster, remove it + clusters.remove(RS_LOCK_ZNODE); + for (String cluster : clusters) { + // We add the name of the recovered RS to the new znode, we can even + // do that for queues that were recovered 10 times giving a znode like + // number-startcode-number-otherstartcode-number-anotherstartcode-etc + String newCluster = cluster+"-"+znode; + String newClusterZnode = ZKUtil.joinZNode(rsServerNameZnode, newCluster); + ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode, + HConstants.EMPTY_BYTE_ARRAY); + String clusterPath = ZKUtil.joinZNode(nodePath, cluster); + List hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath); + // That region server didn't have anything to replicate for this cluster + if (hlogs == null || hlogs.size() == 0) { + continue; + } + SortedSet logQueue = new TreeSet(); + queues.put(newCluster, logQueue); + for (String hlog : hlogs) { + String z = ZKUtil.joinZNode(clusterPath, hlog); + byte [] position = ZKUtil.getData(this.zookeeper, z); + LOG.debug("Creating " + hlog + " with data " + Bytes.toString(position)); + String child = ZKUtil.joinZNode(newClusterZnode, hlog); + ZKUtil.createAndWatch(this.zookeeper, child, position); + logQueue.add(hlog); + } + } + } catch (KeeperException e) { + this.abortable.abort("Copy queues from rs", e); + } + return queues; + } + + /** + * Delete a complete queue of hlogs + * @param peerZnode znode of the peer cluster queue of hlogs to delete + */ + public void deleteSource(String peerZnode) { + try { + ZKUtil.deleteChildrenRecursively(this.zookeeper, + ZKUtil.joinZNode(rsServerNameZnode, peerZnode)); + } catch (KeeperException e) { + this.abortable.abort("Failed delete of " + peerZnode, e); + } + } + + /** + * Recursive deletion of all znodes in specified rs' znode + * @param znode + */ + public void deleteRsQueues(String znode) { + try { + ZKUtil.deleteChildrenRecursively(this.zookeeper, + ZKUtil.joinZNode(rsZNode, znode)); + } catch (KeeperException e) { + this.abortable.abort("Failed delete of " + znode, e); + } + } + + /** + * Delete this cluster's queues + */ + public void deleteOwnRSZNode() { + deleteRsQueues(this.rsServerNameZnode); + } + + /** + * Get the position of the specified hlog in the specified peer znode + * @param peerId znode of the peer cluster + * @param hlog name of the hlog + * @return the position in that hlog + * @throws KeeperException + */ + public long getHLogRepPosition(String peerId, String hlog) + throws KeeperException { + String clusterZnode = ZKUtil.joinZNode(rsServerNameZnode, peerId); + String znode = ZKUtil.joinZNode(clusterZnode, hlog); + String data = Bytes.toString(ZKUtil.getData(this.zookeeper, znode)); + return data == null || data.length() == 0 ? 0 : Long.parseLong(data); + } + + /** + * Tells if this cluster replicates or not + * + * @return if this is a master + */ + public boolean isReplicationMaster() { + return this.replicationMaster; + } + + /** + * Get the identification of the cluster + * + * @return the id for the cluster + */ + public String getClusterId() { + return this.clusterId; + } + + /** + * Get a map of all peer clusters + * @return map of peer cluster, zk address to ZKW + */ + public Map getPeerClusters() { + return this.peerClusters; + } + + /** + * Tracker for status of the replication + */ + public class ReplicationStatusTracker extends ZooKeeperNodeTracker { + public ReplicationStatusTracker(ZooKeeperWatcher watcher, String node, + Abortable abortable) { + super(watcher, node, abortable); + } + + @Override + public synchronized void nodeDataChanged(String path) { + super.nodeDataChanged(path); + setReplicating(); + } + } +} \ No newline at end of file Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=990310&r1=990309&r2=990310&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Sat Aug 28 00:31:18 2010 @@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.LogCleanerDelegate; -import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper; +import org.apache.hadoop.hbase.replication.ReplicationZookeeper; // REENALBE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -45,7 +45,7 @@ public class ReplicationLogCleaner imple private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class); private Configuration conf; - private ReplicationZookeeperWrapper zkHelper; + private ReplicationZookeeper zkHelper; private Set hlogs = new HashSet(); /** Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=990310&r1=990309&r2=990310&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Sat Aug 28 00:31:18 2010 @@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.regionser import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALObserver; -import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper; +import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.util.Bytes; /** @@ -47,7 +47,7 @@ public class Replication implements WALO private final ReplicationSourceManager replicationManager; private boolean replicationMaster; private final AtomicBoolean replicating = new AtomicBoolean(true); - private final ReplicationZookeeperWrapper zkHelper; + private final ReplicationZookeeper zkHelper; private final Configuration conf; private ReplicationSink replicationSink; // Hosting server @@ -68,7 +68,7 @@ public class Replication implements WALO this.conf = this.server.getConfiguration(); this.replication = isReplication(this.conf); if (replication) { - this.zkHelper = new ReplicationZookeeperWrapper(server.getZooKeeper(), + this.zkHelper = new ReplicationZookeeper(server.getZooKeeper(), this.conf, this.replicating, this.server.getServerName()); this.replicationMaster = zkHelper.isReplicationMaster(); this.replicationManager = this.replicationMaster ? Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=990310&r1=990309&r2=990310&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Sat Aug 28 00:31:18 2010 @@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.ipc.HRegi import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper; +import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; @@ -76,7 +76,7 @@ public class ReplicationSource extends T private HLog.Entry[] entriesArray; private HConnection conn; // Helper class for zookeeper - private ReplicationZookeeperWrapper zkHelper; + private ReplicationZookeeper zkHelper; private Configuration conf; // ratio of region servers to chose from a slave cluster private float ratio; Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=990310&r1=990309&r2=990310&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Sat Aug 28 00:31:18 2010 @@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.Stoppable import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALObserver; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper; +import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -64,7 +64,7 @@ public class ReplicationSourceManager { // Indicates if we are currently replicating private final AtomicBoolean replicating; // Helper for zookeeper - private final ReplicationZookeeperWrapper zkHelper; + private final ReplicationZookeeper zkHelper; // All about stopping private final Stoppable stopper; // All logs we are currently trackign @@ -91,7 +91,7 @@ public class ReplicationSourceManager { * @param logDir the directory that contains all hlog directories of live RSs * @param oldLogDir the directory where old logs are archived */ - public ReplicationSourceManager(final ReplicationZookeeperWrapper zkHelper, + public ReplicationSourceManager(final ReplicationZookeeper zkHelper, final Configuration conf, final Stoppable stopper, final FileSystem fs, @@ -231,7 +231,7 @@ public class ReplicationSourceManager { * Get the ZK help of this manager * @return the helper */ - public ReplicationZookeeperWrapper getRepZkWrapper() { + public ReplicationZookeeper getRepZkWrapper() { return zkHelper; } Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java?rev=990310&r1=990309&r2=990310&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java Sat Aug 28 00:31:18 2010 @@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.hbase.HBaseConfiguration; /** - * Tool for reading ZooKeeper servers from HBase XML configuation and producing + * Tool for reading ZooKeeper servers from HBase XML configuration and producing * a line-by-line list for use by bash scripts. */ public class ZKServerTool { Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=990310&r1=990309&r2=990310&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Sat Aug 28 00:31:18 2010 @@ -443,6 +443,31 @@ public class ZKUtil { // /** + * Get znode data. Does not set a watcher. + * @return ZNode data + */ + public static byte [] getData(ZooKeeperWatcher zkw, String znode) + throws KeeperException { + try { + byte [] data = zkw.getZooKeeper().getData(znode, null, null); + zkw.debug("Retrieved " + data.length + " bytes of data from znode " + znode); + return data; + } catch (KeeperException.NoNodeException e) { + zkw.debug("Unable to get data of znode " + znode + " " + + "because node does not exist (not an error)"); + return null; + } catch (KeeperException e) { + zkw.warn("Unable to get data of znode " + znode, e); + zkw.keeperException(e); + return null; + } catch (InterruptedException e) { + zkw.warn("Unable to get data of znode " + znode, e); + zkw.interruptedException(e); + return null; + } + } + + /** * Get the data at the specified znode and set a watch. * * Returns the data and sets a watch if the node exists. Returns null and no Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java?rev=990310&r1=990309&r2=990310&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java Sat Aug 28 00:31:18 2010 @@ -38,11 +38,11 @@ import org.apache.zookeeper.ZooKeeper; * Acts as the single ZooKeeper Watcher. One instance of this is instantiated * for each Master, RegionServer, and client process. * - * This is the only class that implements {@link Watcher}. Other internal + *

This is the only class that implements {@link Watcher}. Other internal * classes which need to be notified of ZooKeeper events must register with * the local instance of this watcher via {@link #registerListener}. * - * This class also holds and manages the connection to ZooKeeper. Code to deal + *

This class also holds and manages the connection to ZooKeeper. Code to deal * with connection related events and exceptions are handled here. */ public class ZooKeeperWatcher implements Watcher { Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java?rev=990310&r1=990309&r2=990310&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java (original) +++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java Sat Aug 28 00:31:18 2010 @@ -30,7 +30,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper; +import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -41,7 +41,7 @@ public class TestLogsCleaner { private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private ReplicationZookeeperWrapper zkHelper; + private ReplicationZookeeper zkHelper; /** * @throws java.lang.Exception Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=990310&r1=990309&r2=990310&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original) +++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Sat Aug 28 00:31:18 2010 @@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.regionser import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; -import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper; +import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.util.Bytes; // REENABLE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; import org.junit.After;