hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1507498 - in /hbase/branches/0.95: hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ hbase-s...
Date Fri, 26 Jul 2013 22:44:01 GMT
Author: stack
Date: Fri Jul 26 22:44:00 2013
New Revision: 1507498

URL: http://svn.apache.org/r1507498
Log:
HBASE-8439 [replication] Remove ReplicationZookeeper class

Added:
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
Removed:
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java
Modified:
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java?rev=1507498&r1=1507497&r2=1507498&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java Fri Jul 26 22:44:00 2013
@@ -25,7 +25,9 @@ import org.apache.hadoop.hbase.Abortable
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
@@ -59,8 +61,9 @@ import java.util.Map;
 public class ReplicationAdmin implements Closeable {
   private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
 
-  private final ReplicationZookeeper replicationZk;
   private final HConnection connection;
+  private final ReplicationQueuesClient replicationQueuesClient;
+  private final ReplicationPeers replicationPeers;
 
   /**
    * Constructor that creates a connection to the local ZooKeeper ensemble.
@@ -76,7 +79,12 @@ public class ReplicationAdmin implements
     this.connection = HConnectionManager.getConnection(conf);
     ZooKeeperWatcher zkw = createZooKeeperWatcher();
     try {
-      this.replicationZk = new ReplicationZookeeper(this.connection, conf, zkw);
+      this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
+      this.replicationPeers.init();
+      this.replicationQueuesClient =
+          ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
+      this.replicationQueuesClient.init();
+
     } catch (KeeperException e) {
       throw new IOException("Unable setup the ZooKeeper connection", e);
     }
@@ -109,7 +117,7 @@ public class ReplicationAdmin implements
    * multi-slave isn't supported yet.
    */
   public void addPeer(String id, String clusterKey) throws IOException {
-    this.replicationZk.addPeer(id, clusterKey);
+    this.replicationPeers.addPeer(id, clusterKey);
   }
 
   /**
@@ -117,7 +125,7 @@ public class ReplicationAdmin implements
    * @param id a short that identifies the cluster
    */
   public void removePeer(String id) throws IOException {
-    this.replicationZk.removePeer(id);
+    this.replicationPeers.removePeer(id);
   }
 
   /**
@@ -125,7 +133,7 @@ public class ReplicationAdmin implements
    * @param id a short that identifies the cluster
    */
   public void enablePeer(String id) throws IOException {
-    this.replicationZk.enablePeer(id);
+    this.replicationPeers.enablePeer(id);
   }
 
   /**
@@ -133,7 +141,7 @@ public class ReplicationAdmin implements
    * @param id a short that identifies the cluster
    */
   public void disablePeer(String id) throws IOException {
-    this.replicationZk.disablePeer(id);
+    this.replicationPeers.disablePeer(id);
   }
 
   /**
@@ -141,7 +149,7 @@ public class ReplicationAdmin implements
    * @return number of slave clusters
    */
   public int getPeersCount() {
-    return this.replicationZk.listPeersIdsAndWatch().size();
+    return this.replicationPeers.getAllPeerIds().size();
   }
 
   /**
@@ -149,15 +157,7 @@ public class ReplicationAdmin implements
    * @return A map of peer ids to peer cluster keys
    */
   public Map<String, String> listPeers() {
-    return this.replicationZk.listPeers();
-  }
-
-  /**
-   * Get the ZK-support tool created and used by this object for replication.
-   * @return the ZK-support tool
-   */
-  ReplicationZookeeper getReplicationZk() {
-    return replicationZk;
+    return this.replicationPeers.getAllPeerClusterKeys();
   }
 
   @Override

Added: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java?rev=1507498&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java (added)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java Fri Jul 26 22:44:00 2013
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * Static factory for replication-state objects; each method returns a ZooKeeper-backed impl.
+ */
+public class ReplicationFactory {
+  /** Creates a {@link ReplicationQueuesZKImpl} backed by {@code zk}. */
+  public static ReplicationQueues getReplicationQueues(final ZooKeeperWatcher zk,
+      Configuration conf, Abortable abortable) {
+    return new ReplicationQueuesZKImpl(zk, conf, abortable);
+  }
+  /** Creates a ZK queues client; it touches ZooKeeper only once its init() is called. */
+  public static ReplicationQueuesClient getReplicationQueuesClient(final ZooKeeperWatcher zk,
+      Configuration conf, Abortable abortable) {
+    return new ReplicationQueuesClientZKImpl(zk, conf, abortable);
+  }
+  /** Creates a ZK peers object. NOTE(review): known callers call init() on it before use. */
+  public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
+      Abortable abortable) {
+    return new ReplicationPeersZKImpl(zk, conf, abortable);
+  }
+  /** Creates a ZK tracker; its constructor registers watchers on {@code zookeeper}. */
+  public static ReplicationTracker getReplicationTracker(ZooKeeperWatcher zookeeper,
+      final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable,
+      Stoppable stopper) {
+    return new ReplicationTrackerZKImpl(zookeeper, replicationPeers, conf, abortable, stopper);
+  }
+}
\ No newline at end of file

Added: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java?rev=1507498&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java (added)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java Fri Jul 26 22:44:00 2013
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * The replication listener interface can be implemented if a class needs to subscribe to events
+ * generated by the ReplicationTracker. These events include things like addition/deletion of peer
+ * clusters or removal of a local region server. To receive events, the class also needs to
+ * register itself with a ReplicationTracker (see ReplicationTracker#registerListener).
+ */
+@InterfaceAudience.Private
+public interface ReplicationListener {
+  // NOTE(review): callbacks presumably run on the ZooKeeper event thread; keep them short.
+  /**
+   * A region server has been removed from the local cluster.
+   * @param regionServer the removed region server (for the ZK tracker, its znode name)
+   */
+  public void regionServerRemoved(String regionServer);
+
+  /**
+   * A peer cluster has been removed (i.e. unregistered) from replication.
+   * @param peerId the id of the peer cluster that has been removed
+   */
+  public void peerRemoved(String peerId);
+
+  /**
+   * The list of registered peer clusters has changed.
+   * @param peerIds the ids of all currently registered peer clusters
+   */
+  public void peerListChanged(List<String> peerIds);
+}
\ No newline at end of file

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java?rev=1507498&r1=1507497&r2=1507498&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java Fri Jul 26 22:44:00 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.replicat
 
 import java.util.List;
 
+import org.apache.zookeeper.KeeperException;
+
 /**
  * This provides an interface for clients of replication to view replication queues. These queues
  * keep track of the HLogs that still need to be replicated to remote clusters.
@@ -27,6 +29,11 @@ import java.util.List;
 public interface ReplicationQueuesClient {
 
   /**
+   * Initialize the replication queue client interface.
+   */
+  public void init() throws KeeperException;
+
+  /**
    * Get a list of all region servers that have outstanding replication queues. These servers could
    * be alive, dead or from a previous run of the cluster.
    * @return a list of server names

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java?rev=1507498&r1=1507497&r2=1507498&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java Fri Jul 26 22:44:00 2013
@@ -30,8 +30,12 @@ public class ReplicationQueuesClientZKIm
     ReplicationQueuesClient {
 
   public ReplicationQueuesClientZKImpl(final ZooKeeperWatcher zk, Configuration conf,
-      Abortable abortable) throws KeeperException {
+      Abortable abortable) {
     super(zk, conf, abortable);
+  }
+
+  @Override
+  public void init() throws KeeperException {
     ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
   }
 

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java?rev=1507498&r1=1507497&r2=1507498&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java Fri Jul 26 22:44:00 2013
@@ -95,7 +95,16 @@ public abstract class ReplicationStateZK
     return ProtobufUtil.prependPBMagic(bytes);
   }
 
-  public boolean peerExists(String id) throws KeeperException {
+  protected boolean peerExists(String id) throws KeeperException {
     return ZKUtil.checkExists(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
   }
+
+  /**
+   * Determine if a ZK path points to a peer node.
+   * @param path path to be checked
+   * @return true if the path points to a peer node, otherwise false
+   */
+  protected boolean isPeerPath(String path) {
+    return path.split("/").length == peersZNode.split("/").length + 1;
+  }
 }
\ No newline at end of file

Added: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java?rev=1507498&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java (added)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java Fri Jul 26 22:44:00 2013
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.util.List;
+
+/**
+ * This is the interface for a Replication Tracker. A replication tracker provides the facility to
+ * subscribe and track events that reflect a change in replication state. These events are used by
+ * the ReplicationSourceManager to coordinate replication tasks such as addition/deletion of queues
+ * and queue failover. These events are defined in the ReplicationListener interface. If a class
+ * would like to listen to replication events it must implement the ReplicationListener interface
+ * and register itself with a Replication Tracker.
+ */
+@InterfaceAudience.Private
+public interface ReplicationTracker {
+
+  /**
+   * Register a replication listener to receive replication events.
+   * @param listener the listener to be notified of replication events
+   */
+  public void registerListener(ReplicationListener listener);
+  /** Unregister a listener previously passed to {@link #registerListener}. */
+  public void removeListener(ReplicationListener listener);
+
+  /**
+   * Returns a list of the live region servers registered in the cluster.
+   * @return a snapshot list of region server names; NOTE(review): may include this server
+   */
+  public List<String> getListOfRegionServers();
+}
\ No newline at end of file

Added: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java?rev=1507498&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java (added)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java Fri Jul 26 22:44:00 2013
@@ -0,0 +1,249 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This class is a Zookeeper implementation of the ReplicationTracker interface. This class is
+ * responsible for handling replication events that are defined in the ReplicationListener
+ * interface.
+ */
+public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements ReplicationTracker {
+
+  private static final Log LOG = LogFactory.getLog(ReplicationTrackerZKImpl.class);
+  // Consulted by the region server watcher (delete/children events) to ignore events once stopped
+  private final Stoppable stopper;
+  // Listeners to notify; copy-on-write so the notify loops iterate a snapshot without locking
+  private final List<ReplicationListener> listeners =
+      new CopyOnWriteArrayList<ReplicationListener>();
+  // Cached children of the region server znode (may include this server); guarded by itself
+  private final ArrayList<String> otherRegionServers = new ArrayList<String>();
+  private final ReplicationPeers replicationPeers;
+  /** Creates the tracker and registers its region server and peer watchers with ZooKeeper. */
+  public ReplicationTrackerZKImpl(ZooKeeperWatcher zookeeper,
+      final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable,
+      Stoppable stopper) {
+    super(zookeeper, conf, abortable);
+    this.replicationPeers = replicationPeers;
+    this.stopper = stopper;
+    this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
+    this.zookeeper.registerListener(new PeersWatcher(this.zookeeper));
+  }
+
+  @Override
+  public void registerListener(ReplicationListener listener) {
+    listeners.add(listener);
+  }
+
+  @Override
+  public void removeListener(ReplicationListener listener) {
+    listeners.remove(listener);
+  }
+
+  /**
+   * Re-reads the region servers from ZK (re-setting the watch), then returns a copy of the list.
+   */
+  @Override
+  public List<String> getListOfRegionServers() {
+    refreshOtherRegionServersList();
+    // If the refresh failed, the previously cached list is what gets copied and returned.
+    List<String> list = null;
+    synchronized (otherRegionServers) {
+      list = new ArrayList<String>(otherRegionServers);
+    }
+    return list;
+  }
+
+  /**
+   * Watcher that keeps the cached region server list current and, when a znode under the region
+   * server znode is deleted, notifies listeners via regionServerRemoved.
+   */
+  public class OtherRegionServerWatcher extends ZooKeeperListener {
+
+    /**
+     * Construct a ZooKeeper event listener.
+     */
+    public OtherRegionServerWatcher(ZooKeeperWatcher watcher) {
+      super(watcher);
+    }
+
+    /**
+     * Called when a new node has been created. NOTE(review): unlike the others, no stop check.
+     * @param path full path of the new node
+     */
+    public void nodeCreated(String path) {
+      refreshListIfRightPath(path);
+    }
+
+    /**
+     * Called when a node has been deleted. Notifies listeners only if the RS list was refreshed.
+     * @param path full path of the deleted node
+     */
+    public void nodeDeleted(String path) {
+      if (stopper.isStopped()) {
+        return;
+      }
+      boolean cont = refreshListIfRightPath(path);
+      if (!cont) {
+        return;
+      }
+      LOG.info(path + " znode expired, triggering replicatorRemoved event");
+      for (ReplicationListener rl : listeners) {
+        rl.regionServerRemoved(getZNodeName(path));
+      }
+    }
+
+    /**
+     * Called when an existing node has a child node added or removed.
+     * @param path full path of the node whose children have changed
+     */
+    public void nodeChildrenChanged(String path) {
+      if (stopper.isStopped()) {
+        return;
+      }
+      refreshListIfRightPath(path);
+    }
+    // Refreshes the RS list for paths under rsZNode; false if elsewhere or the ZK read failed.
+    private boolean refreshListIfRightPath(String path) {
+      if (!path.startsWith(this.watcher.rsZNode)) {
+        return false;
+      }
+      return refreshOtherRegionServersList();
+    }
+  }
+
+  /**
+   * Watcher used to follow the creation and deletion of peer clusters.
+   */
+  public class PeersWatcher extends ZooKeeperListener {
+
+    /**
+     * Construct a ZooKeeper event listener.
+     */
+    public PeersWatcher(ZooKeeperWatcher watcher) {
+      super(watcher);
+    }
+
+    /**
+     * Called when a node has been deleted; fires peerRemoved if it was a direct child of peersZNode.
+     * @param path full path of the deleted node
+     */
+    public void nodeDeleted(String path) {
+      List<String> peers = refreshPeersList(path);
+      if (peers == null) {
+        return;
+      }
+      if (isPeerPath(path)) {
+        String id = getZNodeName(path);
+        LOG.info(path + " znode expired, triggering peerRemoved event");
+        for (ReplicationListener rl : listeners) {
+          rl.peerRemoved(id);
+        }
+      }
+    }
+
+    /**
+     * Called when a node under peersZNode has children added/removed; fires peerListChanged.
+     * @param path full path of the node whose children have changed
+     */
+    public void nodeChildrenChanged(String path) {
+      List<String> peers = refreshPeersList(path);
+      if (peers == null) {
+        return;
+      }
+      LOG.info(path + " znode expired, triggering peerListChanged event");
+      for (ReplicationListener rl : listeners) {
+        rl.peerListChanged(peers);
+      }
+    }
+  }
+
+  /**
+   * Verify if this event is meant for us (path under peersZNode), and if so get the peer ids.
+   * NOTE(review): confirm ReplicationPeers#getAllPeerIds re-reads ZK and resets the watch.
+   * @param path path to check against
+   * @return A list of peers' identifiers if the event concerns this watcher, else null.
+   */
+  private List<String> refreshPeersList(String path) {
+    if (!path.startsWith(getPeersZNode())) {
+      return null;
+    }
+    return this.replicationPeers.getAllPeerIds();
+  }
+
+  private String getPeersZNode() {
+    return this.peersZNode;
+  }
+
+  /**
+   * Extracts the znode name (last path component) of a peer or region server from a ZK path.
+   * @param fullPath path to extract the name from
+   * @return the last component, or an empty string if the path has no components
+   */
+  private String getZNodeName(String fullPath) {
+    String[] parts = fullPath.split("/");
+    return parts.length > 0 ? parts[parts.length - 1] : "";
+  }
+
+  /**
+   * Reads the list of region servers from ZK and atomically clears our local view of it and
+   * replaces it with the updated list.
+   * @return true if the local list of the other region servers was updated with the ZK data (even
+   *         if it was empty), false if the ZK read returned null (data missing or KeeperException)
+   */
+  private boolean refreshOtherRegionServersList() {
+    List<String> newRsList = getRegisteredRegionServers();
+    if (newRsList == null) {
+      return false;
+    } else {
+      synchronized (otherRegionServers) {
+        otherRegionServers.clear();
+        otherRegionServers.addAll(newRsList);
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Get a list of all the region servers registered in ZK and set a watch on them
+   * @return a list of server names, or null on failure (a KeeperException also aborts abortable)
+   */
+  private List<String> getRegisteredRegionServers() {
+    List<String> result = null;
+    try {
+      result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.rsZNode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Get list of registered region servers", e);
+    }
+    return result;
+  }
+}
\ No newline at end of file

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1507498&r1=1507497&r2=1507498&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Fri Jul 26 22:44:00 2013
@@ -36,8 +36,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -45,8 +43,6 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.replication.ReplicationPeersZKImpl;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java?rev=1507498&r1=1507497&r2=1507498&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java Fri Jul 26 22:44:00 2013
@@ -36,10 +36,10 @@ import org.apache.hadoop.hbase.io.Immuta
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationPeersZKImpl;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.mapreduce.Job;
@@ -118,7 +118,8 @@ public class VerifyReplication {
                 @Override public void abort(String why, Throwable e) {}
                 @Override public boolean isAborted() {return false;}
               });
-              ReplicationPeers rp = new ReplicationPeersZKImpl(localZKW, conf, localZKW);
+              ReplicationPeers rp =
+                  ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
               rp.init();
               Configuration peerConf = rp.getPeerConf(peerId);
               if (peerConf == null) {

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=1507498&r1=1507497&r2=1507498&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Fri Jul 26 22:44:00 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.Abortable
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -123,7 +124,8 @@ public class ReplicationLogCleaner exten
     super.setConf(conf);
     try {
       this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
-      this.replicationQueues = new ReplicationQueuesClientZKImpl(zkw, conf, this);
+      this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this);
+      this.replicationQueues.init();
     } catch (KeeperException e) {
       LOG.error("Error while configuring " + this.getClass().getName(), e);
     } catch (IOException e) {

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1507498&r1=1507497&r2=1507498&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Fri Jul 26 22:44:00 2013
@@ -22,10 +22,10 @@ import java.io.IOException;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
@@ -45,11 +45,13 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
+import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.zookeeper.KeeperException;
 
 import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
@@ -66,8 +68,9 @@ public class Replication implements WALA
       LogFactory.getLog(Replication.class);
   private boolean replication;
   private ReplicationSourceManager replicationManager;
-  private ReplicationZookeeper zkHelper;
   private ReplicationQueues replicationQueues;
+  private ReplicationPeers replicationPeers;
+  private ReplicationTracker replicationTracker;
   private Configuration conf;
   private ReplicationSink replicationSink;
   // Hosting server
@@ -107,23 +110,35 @@ public class Replication implements WALA
         .build());
     if (replication) {
       try {
-        this.zkHelper = new ReplicationZookeeper(server);
         this.replicationQueues =
-            new ReplicationQueuesZKImpl(server.getZooKeeper(), this.conf, this.server);
+            ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server);
         this.replicationQueues.init(this.server.getServerName().toString());
+        this.replicationPeers =
+            ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
+        this.replicationPeers.init();
+        this.replicationTracker =
+            ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
+              this.conf, this.server, this.server);
       } catch (KeeperException ke) {
         throw new IOException("Failed replication handler create", ke);
       }
+      UUID clusterId = null;
+      try {
+        clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
+      } catch (KeeperException ke) {
+        throw new IOException("Could not read cluster id", ke);
+      }
       this.replicationManager =
-          new ReplicationSourceManager(zkHelper, replicationQueues, conf, this.server, fs, logDir,
-              oldLogDir);
+          new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
+              conf, this.server, fs, logDir, oldLogDir, clusterId);
       this.statsThreadPeriod =
           this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
       LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
     } else {
       this.replicationManager = null;
-      this.zkHelper = null;
       this.replicationQueues = null;
+      this.replicationPeers = null;
+      this.replicationTracker = null;
     }
   }
 

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1507498&r1=1507497&r2=1507498&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Fri Jul 26 22:44:00 2013
@@ -56,11 +56,11 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
 
@@ -86,8 +86,8 @@ public class ReplicationSource extends T
   // container of entries to replicate
   private HLog.Entry[] entriesArray;
   private HConnection conn;
-  // Helper class for zookeeper
-  private ReplicationZookeeper zkHelper;
+  private ReplicationQueues replicationQueues;
+  private ReplicationPeers replicationPeers;
   private Configuration conf;
   // ratio of region servers to chose from a slave cluster
   private float ratio;
@@ -151,12 +151,10 @@ public class ReplicationSource extends T
    * @param peerClusterZnode the name of our znode
    * @throws IOException
    */
-  public void init(final Configuration conf,
-                   final FileSystem fs,
-                   final ReplicationSourceManager manager,
-                   final Stoppable stopper,
-                   final String peerClusterZnode)
-      throws IOException {
+  public void init(final Configuration conf, final FileSystem fs,
+      final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
+      final ReplicationPeers replicationPeers, final Stoppable stopper,
+      final String peerClusterZnode, final UUID clusterId) throws IOException {
     this.stopper = stopper;
     this.conf = conf;
     this.replicationQueueSizeCapacity =
@@ -178,7 +176,8 @@ public class ReplicationSource extends T
     // replication and make replication specific settings such as compression or codec to use
     // passing Cells.
     this.conn = HConnectionManager.getConnection(conf);
-    this.zkHelper = manager.getRepZkWrapper();
+    this.replicationQueues = replicationQueues;
+    this.replicationPeers = replicationPeers;
     this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
     this.currentPeers = new ArrayList<ServerName>();
     this.random = new Random();
@@ -188,11 +187,8 @@ public class ReplicationSource extends T
     this.fs = fs;
     this.metrics = new MetricsSource(peerClusterZnode);
     this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
-    try {
-      this.clusterId = ZKClusterId.getUUIDForCluster(zkHelper.getZookeeperWatcher());
-    } catch (KeeperException ke) {
-      throw new IOException("Could not read cluster id", ke);
-    }
+    this.clusterId = clusterId;
+
     this.peerClusterZnode = peerClusterZnode;
     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
     // ReplicationQueueInfo parses the peerId out of the znode for us
@@ -204,7 +200,7 @@ public class ReplicationSource extends T
    */
   private void chooseSinks() {
     this.currentPeers.clear();
-    List<ServerName> addresses = this.zkHelper.getSlavesAddresses(this.peerId);
+    List<ServerName> addresses = this.replicationPeers.getRegionServersOfConnectedPeer(this.peerId);
     Set<ServerName> setOfAddr = new HashSet<ServerName>();
     int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
     LOG.debug("Getting " + nbPeers +
@@ -238,7 +234,7 @@ public class ReplicationSource extends T
     int sleepMultiplier = 1;
     // delay this until we are in an asynchronous thread
     while (this.peerClusterId == null) {
-      this.peerClusterId = zkHelper.getPeerUUID(this.peerId);
+      this.peerClusterId = replicationPeers.getPeerUUID(this.peerId);
       if (this.peerClusterId == null) {
         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
           sleepMultiplier++;
@@ -254,8 +250,8 @@ public class ReplicationSource extends T
     // normally has a position (unless the RS failed between 2 logs)
     if (this.replicationQueueInfo.isQueueRecovered()) {
       try {
-        this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition(
-            this.peerClusterZnode, this.queue.peek().getName()));
+        this.repLogReader.setPosition(this.replicationQueues.getLogPosition(this.peerClusterZnode,
+          this.queue.peek().getName()));
         if (LOG.isTraceEnabled()) {
           LOG.trace("Recovered queue started with log " + this.queue.peek() +
               " at position " + this.repLogReader.getPosition());
@@ -736,7 +732,7 @@ public class ReplicationSource extends T
    * @return true if the peer is enabled, otherwise false
    */
   protected boolean isPeerEnabled() {
-    return this.zkHelper.getPeerEnabled(this.peerId);
+    return this.replicationPeers.getStatusOfConnectedPeer(this.peerId);
   }
 
   /**

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java?rev=1507498&r1=1507497&r2=1507498&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java Fri Jul 26 22:44:00 2013
@@ -19,12 +19,15 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
+import java.util.UUID;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
 
 /**
  * Interface that defines a replication source
@@ -41,13 +44,10 @@ public interface ReplicationSourceInterf
    * @param peerClusterId the id of the peer cluster
    * @throws IOException
    */
-  void init(
-    final Configuration conf,
-    final FileSystem fs,
-    final ReplicationSourceManager manager,
-    final Stoppable stopper,
-    final String peerClusterId
-  ) throws IOException;
+  public void init(final Configuration conf, final FileSystem fs,
+      final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
+      final ReplicationPeers replicationPeers, final Stoppable stopper,
+      final String peerClusterZnode, final UUID clusterId) throws IOException;
 
   /**
    * Add a log to the list of logs to replicate

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1507498&r1=1507497&r2=1507498&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Fri Jul 26 22:44:00 2013
@@ -29,11 +29,11 @@ import java.util.Random;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,10 +42,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.replication.ReplicationListener;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.zookeeper.KeeperException;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -60,18 +60,23 @@ import com.google.common.util.concurrent
  * When a region server dies, this class uses a watcher to get notified and it
  * tries to grab a lock in order to transfer all the queues in a local
  * old source.
+ * 
+ * This class implements the ReplicationListener interface so that it can track changes in
+ * replication state.
  */
 @InterfaceAudience.Private
-public class ReplicationSourceManager {
+public class ReplicationSourceManager implements ReplicationListener {
   private static final Log LOG =
       LogFactory.getLog(ReplicationSourceManager.class);
   // List of all the sources that read this RS's logs
   private final List<ReplicationSourceInterface> sources;
   // List of all the sources we got from died RSs
   private final List<ReplicationSourceInterface> oldsources;
-  // Helper for zookeeper
-  private final ReplicationZookeeper zkHelper;
   private final ReplicationQueues replicationQueues;
+  private final ReplicationTracker replicationTracker;
+  private final ReplicationPeers replicationPeers;
+  // UUID for this cluster
+  private final UUID clusterId;
   // All about stopping
   private final Stoppable stopper;
   // All logs we are currently tracking
@@ -80,8 +85,6 @@ public class ReplicationSourceManager {
   private final FileSystem fs;
   // The path to the latest log we saw, for new coming sources
   private Path latestPath;
-  // List of all the other region servers in this cluster
-  private final List<String> otherRegionServers = new ArrayList<String>();
   // Path to the hlogs directories
   private final Path logDir;
   // Path to the hlog archive
@@ -104,12 +107,14 @@ public class ReplicationSourceManager {
    * @param logDir the directory that contains all hlog directories of live RSs
    * @param oldLogDir the directory where old logs are archived
    */
-  public ReplicationSourceManager(final ReplicationZookeeper zkHelper,
-      final ReplicationQueues replicationQueues, final Configuration conf, final Stoppable stopper,
-      final FileSystem fs, final Path logDir, final Path oldLogDir) {
+  public ReplicationSourceManager(final ReplicationQueues replicationQueues,
+      final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
+      final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir,
+      final Path oldLogDir, final UUID clusterId) {
     this.sources = new ArrayList<ReplicationSourceInterface>();
-    this.zkHelper = zkHelper;
     this.replicationQueues = replicationQueues;
+    this.replicationPeers = replicationPeers;
+    this.replicationTracker = replicationTracker;
     this.stopper = stopper;
     this.hlogsById = new HashMap<String, SortedSet<String>>();
     this.oldsources = new ArrayList<ReplicationSourceInterface>();
@@ -118,11 +123,9 @@ public class ReplicationSourceManager {
     this.logDir = logDir;
     this.oldLogDir = oldLogDir;
     this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000);
-    this.zkHelper.registerRegionServerListener(
-        new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher()));
-    this.zkHelper.registerRegionServerListener(
-        new PeersWatcher(this.zkHelper.getZookeeperWatcher()));
-    this.zkHelper.listPeersIdsAndWatch();
+    this.clusterId = clusterId;
+    this.replicationTracker.registerListener(this);
+    this.replicationPeers.getAllPeerIds();
     // It's preferable to failover 1 RS at a time, but with good zk servers
     // more could be processed at the same time.
     int nbWorkers = conf.getInt("replication.executor.workers", 1);
@@ -150,12 +153,12 @@ public class ReplicationSourceManager {
    */
   public void logPositionAndCleanOldLogs(Path log, String id, long position, 
       boolean queueRecovered, boolean holdLogInZK) {
-    String key = log.getName();
-    this.zkHelper.writeReplicationStatus(key, id, position);
+    String fileName = log.getName();
+    this.replicationQueues.setLogPosition(id, fileName, position);
     if (holdLogInZK) {
      return;
     }
-    cleanOldLogs(key, id, queueRecovered);
+    cleanOldLogs(fileName, id, queueRecovered);
   }
 
   /**
@@ -175,7 +178,7 @@ public class ReplicationSourceManager {
       }
       SortedSet<String> hlogSet = hlogs.headSet(key);
       for (String hlog : hlogSet) {
-        this.zkHelper.removeLogFromList(hlog, id);
+        this.replicationQueues.removeLog(id, hlog);
       }
       hlogSet.clear();
     }
@@ -186,24 +189,21 @@ public class ReplicationSourceManager {
    * old region server hlog queues
    */
   public void init() throws IOException {
-    for (String id : this.zkHelper.getPeerClusters()) {
+    for (String id : this.replicationPeers.getConnectedPeers()) {
       addSource(id);
     }
     List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
     if (currentReplicators == null || currentReplicators.size() == 0) {
       return;
     }
-    synchronized (otherRegionServers) {
-      refreshOtherRegionServersList();
-      LOG.info("Current list of replicators: " + currentReplicators
-          + " other RSs: " + otherRegionServers);
-    }
+    List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
+    LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
+        + otherRegionServers);
+
     // Look if there's anything to process after a restart
     for (String rs : currentReplicators) {
-      synchronized (otherRegionServers) {
-        if (!this.otherRegionServers.contains(rs)) {
-          transferQueues(rs);
-        }
+      if (!otherRegionServers.contains(rs)) {
+        transferQueues(rs);
       }
     }
   }
@@ -215,7 +215,9 @@ public class ReplicationSourceManager {
    * @throws IOException
    */
   public ReplicationSourceInterface addSource(String id) throws IOException {
-    ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, stopper, id);
+    ReplicationSourceInterface src =
+        getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
+          this.replicationPeers, stopper, id, this.clusterId);
     synchronized (this.hlogsById) {
       this.sources.add(src);
       this.hlogsById.put(id, new TreeSet<String>());
@@ -224,7 +226,7 @@ public class ReplicationSourceManager {
         String name = this.latestPath.getName();
         this.hlogsById.get(id).add(name);
         try {
-          this.zkHelper.addLogToList(name, src.getPeerClusterZnode());
+          this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
         } catch (KeeperException ke) {
           String message = "Cannot add log to zk for" +
             " replication when creating a new source";
@@ -239,12 +241,23 @@ public class ReplicationSourceManager {
   }
 
   /**
+   * Delete a complete queue of hlogs associated with a peer cluster
+   * @param peerId Id of the peer cluster queue of hlogs to delete
+   */
+  public void deleteSource(String peerId, boolean closeConnection) {
+    this.replicationQueues.removeQueue(peerId);
+    if (closeConnection) {
+      this.replicationPeers.disconnectFromPeer(peerId);
+    }
+  }
+
+  /**
    * Terminate the replication on this region server
    */
   public void join() {
     this.executor.shutdown();
     if (this.sources.size() == 0) {
-      this.zkHelper.deleteOwnRSZNode();
+      this.replicationQueues.removeAllQueues();
     }
     for (ReplicationSourceInterface source : this.sources) {
       source.terminate("Region server is closing");
@@ -273,7 +286,7 @@ public class ReplicationSourceManager {
       String name = newLog.getName();
       for (ReplicationSourceInterface source : this.sources) {
         try {
-          this.zkHelper.addLogToList(name, source.getPeerClusterZnode());
+          this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
         } catch (KeeperException ke) {
           throw new IOException("Cannot add log to zk for replication", ke);
         }
@@ -299,14 +312,6 @@ public class ReplicationSourceManager {
   }
 
   /**
-   * Get the ZK help of this manager
-   * @return the helper
-   */
-  public ReplicationZookeeper getRepZkWrapper() {
-    return zkHelper;
-  }
-
-  /**
    * Factory method to create a replication source
    * @param conf the configuration to use
    * @param fs the file system to use
@@ -316,12 +321,10 @@ public class ReplicationSourceManager {
    * @return the created source
    * @throws IOException
    */
-  public ReplicationSourceInterface getReplicationSource(
-      final Configuration conf,
-      final FileSystem fs,
-      final ReplicationSourceManager manager,
-      final Stoppable stopper,
-      final String peerId) throws IOException {
+  public ReplicationSourceInterface getReplicationSource(final Configuration conf,
+      final FileSystem fs, final ReplicationSourceManager manager,
+      final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
+      final Stoppable stopper, final String peerId, final UUID clusterId) throws IOException {
     ReplicationSourceInterface src;
     try {
       @SuppressWarnings("rawtypes")
@@ -334,7 +337,7 @@ public class ReplicationSourceManager {
       src = new ReplicationSource();
 
     }
-    src.init(conf, fs, manager, stopper, peerId);
+    src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, clusterId);
     return src;
   }
 
@@ -347,7 +350,9 @@ public class ReplicationSourceManager {
    * @param rsZnode
    */
   private void transferQueues(String rsZnode) {
-    NodeFailoverWorker transfer = new NodeFailoverWorker(rsZnode);
+    NodeFailoverWorker transfer =
+        new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
+            this.clusterId);
     try {
       this.executor.execute(transfer);
     } catch (RejectedExecutionException ex) {
@@ -362,7 +367,7 @@ public class ReplicationSourceManager {
   public void closeRecoveredQueue(ReplicationSourceInterface src) {
     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
     this.oldsources.remove(src);
-    this.zkHelper.deleteSource(src.getPeerClusterZnode(), false);
+    deleteSource(src.getPeerClusterZnode(), false);
   }
 
   /**
@@ -403,151 +408,34 @@ public class ReplicationSourceManager {
     }
     srcToRemove.terminate(terminateMessage);
     this.sources.remove(srcToRemove);
-    this.zkHelper.deleteSource(id, true);
+    deleteSource(id, true);
   }
 
-  /**
-   * Reads the list of region servers from ZK and atomically clears our
-   * local view of it and replaces it with the updated list.
-   * 
-   * @return true if the local list of the other region servers was updated
-   * with the ZK data (even if it was empty),
-   * false if the data was missing in ZK
-   */
-  private boolean refreshOtherRegionServersList() {
-    List<String> newRsList = zkHelper.getRegisteredRegionServers();
-    if (newRsList == null) {
-      return false;
-    } else {
-      synchronized (otherRegionServers) {
-        otherRegionServers.clear();
-        otherRegionServers.addAll(newRsList);
-      }
-    }
-    return true;
+  @Override
+  public void regionServerRemoved(String regionserver) {
+    transferQueues(regionserver);
   }
 
-  /**
-   * Watcher used to be notified of the other region server's death
-   * in the local cluster. It initiates the process to transfer the queues
-   * if it is able to grab the lock.
-   */
-  public class OtherRegionServerWatcher extends ZooKeeperListener {
-
-    /**
-     * Construct a ZooKeeper event listener.
-     */
-    public OtherRegionServerWatcher(ZooKeeperWatcher watcher) {
-      super(watcher);
-    }
-
-    /**
-     * Called when a new node has been created.
-     * @param path full path of the new node
-     */
-    public void nodeCreated(String path) {
-      refreshListIfRightPath(path);
-    }
-
-    /**
-     * Called when a node has been deleted
-     * @param path full path of the deleted node
-     */
-    public void nodeDeleted(String path) {
-      if (stopper.isStopped()) {
-        return;
-      }
-      boolean cont = refreshListIfRightPath(path);
-      if (!cont) {
-        return;
-      }
-      LOG.info(path + " znode expired, trying to lock it");
-      transferQueues(ReplicationZookeeper.getZNodeName(path));
-    }
-
-    /**
-     * Called when an existing node has a child node added or removed.
-     * @param path full path of the node whose children have changed
-     */
-    public void nodeChildrenChanged(String path) {
-      if (stopper.isStopped()) {
-        return;
-      }
-      refreshListIfRightPath(path);
-    }
-
-    private boolean refreshListIfRightPath(String path) {
-      if (!path.startsWith(zkHelper.getZookeeperWatcher().rsZNode)) {
-        return false;
-      }
-      return refreshOtherRegionServersList();
-    }
+  @Override
+  public void peerRemoved(String peerId) {
+    removePeer(peerId);
   }
 
-  /**
-   * Watcher used to follow the creation and deletion of peer clusters.
-   */
-  public class PeersWatcher extends ZooKeeperListener {
-
-    /**
-     * Construct a ZooKeeper event listener.
-     */
-    public PeersWatcher(ZooKeeperWatcher watcher) {
-      super(watcher);
-    }
-
-    /**
-     * Called when a node has been deleted
-     * @param path full path of the deleted node
-     */
-    public void nodeDeleted(String path) {
-      List<String> peers = refreshPeersList(path);
-      if (peers == null) {
-        return;
-      }
-      if (zkHelper.isPeerPath(path)) {
-        String id = ReplicationZookeeper.getZNodeName(path);
-        removePeer(id);
-      }
-    }
-
-    /**
-     * Called when an existing node has a child node added or removed.
-     * @param path full path of the node whose children have changed
-     */
-    public void nodeChildrenChanged(String path) {
-      List<String> peers = refreshPeersList(path);
-      if (peers == null) {
-        return;
-      }
-      for (String id : peers) {
-        try {
-          boolean added = zkHelper.connectToPeer(id);
-          if (added) {
-            addSource(id);
-          }
-        } catch (IOException e) {
-          // TODO manage better than that ?
-          LOG.error("Error while adding a new peer", e);
-        } catch (KeeperException e) {
-          LOG.error("Error while adding a new peer", e);
+  @Override
+  public void peerListChanged(List<String> peerIds) {
+    for (String id : peerIds) {
+      try {
+        boolean added = this.replicationPeers.connectToPeer(id);
+        if (added) {
+          addSource(id);
         }
+      } catch (IOException e) {
+        // TODO manage better than that ?
+        LOG.error("Error while adding a new peer", e);
+      } catch (KeeperException e) {
+        LOG.error("Error while adding a new peer", e);
       }
     }
-
-    /**
-     * Verify if this event is meant for us, and if so then get the latest
-     * peers' list from ZK. Also reset the watches.
-     * @param path path to check against
-     * @return A list of peers' identifiers if the event concerns this watcher,
-     * else null.
-     */
-    private List<String> refreshPeersList(String path) {
-      if (!path.startsWith(zkHelper.getPeersZNode())) {
-        return null;
-      }
-      return zkHelper.listPeersIdsAndWatch();
-    }
   }
 
   /**
@@ -557,14 +445,21 @@ public class ReplicationSourceManager {
   class NodeFailoverWorker extends Thread {
 
     private String rsZnode;
+    private final ReplicationQueues rq;
+    private final ReplicationPeers rp;
+    private final UUID clusterId;
 
     /**
      *
      * @param rsZnode
      */
-    public NodeFailoverWorker(String rsZnode) {
+    public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
+        final ReplicationPeers replicationPeers, final UUID clusterId) {
       super("Failover-for-"+rsZnode);
       this.rsZnode = rsZnode;
+      this.rq = replicationQueues;
+      this.rp = replicationPeers;
+      this.clusterId = clusterId;
     }
 
     @Override
@@ -584,7 +479,7 @@ public class ReplicationSourceManager {
       }
       SortedMap<String, SortedSet<String>> newQueues = null;
 
-      newQueues = zkHelper.claimQueues(rsZnode);
+      newQueues = this.rq.claimQueues(rsZnode);
 
       // Copying over the failed queue is completed.
       if (newQueues.isEmpty()) {
@@ -597,8 +492,9 @@ public class ReplicationSourceManager {
         String peerId = entry.getKey();
         try {
           ReplicationSourceInterface src =
-              getReplicationSource(conf, fs, ReplicationSourceManager.this, stopper, peerId);
-          if (!zkHelper.getPeerClusters().contains((src.getPeerClusterId()))) {
+              getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
+                stopper, peerId, this.clusterId);
+          if (!this.rp.getConnectedPeers().contains((src.getPeerClusterId()))) {
             src.terminate("Recovered queue doesn't belong to any current peer");
             break;
           }

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java?rev=1507498&r1=1507497&r2=1507498&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java Fri Jul 26 22:44:00 2013
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEqu
 
 import java.io.IOException;
 import java.net.URLEncoder;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -30,7 +29,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.AfterClass;
@@ -68,8 +68,9 @@ public class TestLogsCleaner {
     conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
     Replication.decorateMasterConfiguration(conf);
     Server server = new DummyServer();
-    ReplicationZookeeper zkHelper = new ReplicationZookeeper(server);
-
+    ReplicationQueues repQueues =
+        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
+    repQueues.init(server.getServerName().toString());
     Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(),
         HConstants.HREGION_OLDLOGDIR_NAME);
     String fakeMachineName =
@@ -98,7 +99,7 @@ public class TestLogsCleaner {
       // (TimeToLiveLogCleaner) but would be rejected by the second
       // (ReplicationLogCleaner)
       if (i % (30/3) == 1) {
-        zkHelper.addLogToList(fileName.getName(), fakeMachineName);
+        repQueues.addLog(fakeMachineName, fileName.getName());
         System.out.println("Replication log file: " + fileName);
       }
     }

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java?rev=1507498&r1=1507497&r2=1507498&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java Fri Jul 26 22:44:00 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.io.IOException;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
@@ -39,7 +40,9 @@ public class ReplicationSourceDummy impl
 
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      Stoppable stopper, String peerClusterId) throws IOException {
+      ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
+      UUID clusterId) throws IOException {
+    // This dummy keeps only the manager and peer cluster id; the other arguments are unused.
     this.manager = manager;
     this.peerClusterId = peerClusterId;
   }

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java?rev=1507498&r1=1507497&r2=1507498&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java Fri Jul 26 22:44:00 2013
@@ -69,6 +69,7 @@ public abstract class TestReplicationSta
 
   @Test
   public void testReplicationQueuesClient() throws KeeperException {
+    rqc.init();
     // Test methods with empty state
     assertEquals(0, rqc.getListOfReplicators().size());
     assertNull(rqc.getLogsInQueue(server1, "qId1"));

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java?rev=1507498&r1=1507497&r2=1507498&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java Fri Jul 26 22:44:00 2013
@@ -36,7 +36,10 @@ import org.apache.hadoop.hbase.zookeeper
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Test;
+
 import static org.junit.Assert.*;
+
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
@@ -50,6 +53,7 @@ public class TestReplicationStateZKImpl 
   private static HBaseTestingUtility utility;
   private static ZooKeeperWatcher zkw;
   private static String replicationZNode;
+  private ReplicationQueuesZKImpl rqZK;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -65,7 +69,7 @@ public class TestReplicationStateZKImpl 
 
   private static String initPeerClusterState(String baseZKNode)
       throws IOException, KeeperException {
-    // Set up state nodes of peer clusters
+    // Add a dummy region server and set up the cluster id
     Configuration testConf = new Configuration(conf);
     testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode);
     ZooKeeperWatcher zkw1 = new ZooKeeperWatcher(testConf, "test1", null);
@@ -82,16 +86,13 @@ public class TestReplicationStateZKImpl 
     DummyServer ds1 = new DummyServer(server1);
     DummyServer ds2 = new DummyServer(server2);
     DummyServer ds3 = new DummyServer(server3);
-    try {
-      rq1 = new ReplicationQueuesZKImpl(zkw, conf, ds1);
-      rq2 = new ReplicationQueuesZKImpl(zkw, conf, ds2);
-      rq3 = new ReplicationQueuesZKImpl(zkw, conf, ds3);
-      rqc = new ReplicationQueuesClientZKImpl(zkw, conf, ds1);
-      rp = new ReplicationPeersZKImpl(zkw, conf, zkw);
-      OUR_KEY = ZKUtil.getZooKeeperClusterKey(conf);
-    } catch (KeeperException e) {
-      fail("Exception thrown: " + e);
-    }
+    rq1 = ReplicationFactory.getReplicationQueues(zkw, conf, ds1);
+    rq2 = ReplicationFactory.getReplicationQueues(zkw, conf, ds2);
+    rq3 = ReplicationFactory.getReplicationQueues(zkw, conf, ds3);
+    rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, ds1);
+    rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
+    OUR_KEY = ZKUtil.getZooKeeperClusterKey(conf);
+    rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1);
   }
 
   @After
@@ -104,6 +105,23 @@ public class TestReplicationStateZKImpl 
     utility.shutdownMiniZKCluster();
   }
 
+  @Test
+  public void testIsPeerPath_PathToParentOfPeerNode() {
+    assertFalse(rqZK.isPeerPath(rqZK.peersZNode)); // the peers parent znode itself is not a peer path
+  }
+  // A znode nested below a peer (peers/1/child) must not be treated as a peer path.
+  @Test
+  public void testIsPeerPath_PathToChildOfPeerNode() {
+    String peerChild = ZKUtil.joinZNode(ZKUtil.joinZNode(rqZK.peersZNode, "1"), "child");
+    assertFalse(rqZK.isPeerPath(peerChild));
+  }
+  // A direct child of the peers znode (peers/1) is a peer path.
+  @Test
+  public void testIsPeerPath_ActualPeerPath() {
+    String peerPath = ZKUtil.joinZNode(rqZK.peersZNode, "1");
+    assertTrue(rqZK.isPeerPath(peerPath));
+  }
+
   static class DummyServer implements Server {
     private String serverName;
     private boolean isAborted = false;

Added: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java?rev=1507498&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java (added)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java Fri Jul 26 22:44:00 2013
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterId;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * This class tests the ReplicationTrackerZKImpl class and ReplicationListener interface. One
+ * MiniZKCluster is used throughout the entire class. The cluster is initialized with the creation
+ * of the rsZNode. All other znode creation/initialization is handled by the replication state
+ * interfaces (e.g. ReplicationPeers). Each test case in this class should ensure that the
+ * MiniZKCluster is cleaned and returned to its initial state (i.e. nothing but the rsZNode).
+ */
+@Category(MediumTests.class)
+public class TestReplicationTrackerZKImpl {
+
+  private static final Log LOG = LogFactory.getLog(TestReplicationTrackerZKImpl.class);
+
+  private static Configuration conf;
+  private static HBaseTestingUtility utility;
+
+  // Each of the variables below is reinitialized before every test case (see setUp)
+  private ZooKeeperWatcher zkw;
+  private ReplicationPeers rp;
+  private ReplicationTracker rt;
+  private AtomicInteger rsRemovedCount;
+  private String rsRemovedData;
+  private AtomicInteger plChangedCount;
+  private List<String> plChangedData;
+  private AtomicInteger peerRemovedCount;
+  private String peerRemovedData;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    utility = new HBaseTestingUtility();
+    utility.startMiniZKCluster();
+    conf = utility.getConfiguration();
+    ZooKeeperWatcher zk = HBaseTestingUtility.getZooKeeperWatcher(utility);
+    ZKUtil.createWithParents(zk, zk.rsZNode);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
+    String fakeRs1 = ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234");
+    try {
+      ZKClusterId.setClusterId(zkw, new ClusterId());
+      rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
+      rp.init();
+      rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1));
+    } catch (Exception e) {
+      fail("Exception during test setup: " + e); // NOTE(review): setUp throws Exception; this drops the stack trace
+    }
+    rsRemovedCount = new AtomicInteger(0);
+    rsRemovedData = "";
+    plChangedCount = new AtomicInteger(0);
+    plChangedData = new ArrayList<String>();
+    peerRemovedCount = new AtomicInteger(0);
+    peerRemovedData = "";
+  }
+  // NOTE(review): no @After closes the per-test zkw; if getZooKeeperWatcher opens a new one each call, close it.
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    utility.shutdownMiniZKCluster();
+  }
+  // The region server count must follow creation and deletion of znodes under rsZNode.
+  @Test
+  public void testGetListOfRegionServers() throws Exception {
+    // 0 region servers
+    assertEquals(0, rt.getListOfRegionServers().size());
+
+    // 1 region server
+    ZKUtil.createWithParents(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234"));
+    assertEquals(1, rt.getListOfRegionServers().size());
+
+    // 2 region servers
+    ZKUtil.createWithParents(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"));
+    assertEquals(2, rt.getListOfRegionServers().size());
+
+    // 1 region server
+    ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"));
+    assertEquals(1, rt.getListOfRegionServers().size());
+
+    // 0 region server
+    ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234"));
+    assertEquals(0, rt.getListOfRegionServers().size());
+  }
+  // Deleting a region server znode must fire regionServerRemoved with that znode's name.
+  @Test(timeout = 30000)
+  public void testRegionServerRemovedEvent() throws Exception {
+    ZKUtil.createAndWatch(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"),
+      HConstants.EMPTY_BYTE_ARRAY);
+    rt.registerListener(new DummyReplicationListener());
+    // delete one
+    ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"));
+    // wait for the listener callback; the @Test timeout bounds this polling loop
+    while (rsRemovedCount.get() < 1) {
+      Thread.sleep(5);
+    }
+    assertEquals("hostname2.example.org:1234", rsRemovedData);
+  }
+  // Removing a peer must fire peerRemoved with that peer's id.
+  @Test(timeout = 30000)
+  public void testPeerRemovedEvent() throws Exception {
+    rp.addPeer("5", utility.getClusterKey());
+    rt.registerListener(new DummyReplicationListener());
+    rp.removePeer("5");
+    // wait for the listener callback; the @Test timeout bounds this polling loop
+    while (peerRemovedCount.get() < 1) {
+      Thread.sleep(5);
+    }
+    assertEquals("5", peerRemovedData);
+  }
+  // Deleting peer 5's peer-state znode is expected to fire peerListChanged listing only peer "5".
+  @Test(timeout = 30000)
+  public void testPeerListChangedEvent() throws Exception {
+    // add a peer (NOTE(review): znode paths below are hard-coded to the default "/hbase" parent)
+    rp.addPeer("5", utility.getClusterKey());
+    zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true);
+    rt.registerListener(new DummyReplicationListener());
+    rp.disablePeer("5");
+    ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5/peer-state");
+    // wait for a new peerListChanged event (count must exceed the value sampled below)
+    int tmp = plChangedCount.get();
+    while (plChangedCount.get() <= tmp) {
+      Thread.sleep(5);
+    }
+    assertEquals(1, plChangedData.size());
+    assertTrue(plChangedData.contains("5"));
+
+    // clean up
+    ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5");
+  }
+  // Records each callback's argument, then bumps the matching counter that the tests poll.
+  private class DummyReplicationListener implements ReplicationListener {
+
+    @Override
+    public void regionServerRemoved(String regionServer) {
+      rsRemovedData = regionServer;
+      rsRemovedCount.getAndIncrement();
+      LOG.debug("Received regionServerRemoved event: " + regionServer);
+    }
+
+    @Override
+    public void peerRemoved(String peerId) {
+      peerRemovedData = peerId;
+      peerRemovedCount.getAndIncrement();
+      LOG.debug("Received peerRemoved event: " + peerId);
+    }
+
+    @Override
+    public void peerListChanged(List<String> peerIds) {
+      plChangedData.clear();
+      plChangedData.addAll(peerIds);
+      plChangedCount.getAndIncrement();
+      LOG.debug("Received peerListChanged event");
+    }
+  }
+  // Minimal Server stub handed to the ReplicationTracker; getCatalogTracker returns null.
+  private class DummyServer implements Server {
+    private String serverName;
+    private boolean isAborted = false;
+    private boolean isStopped = false;
+
+    public DummyServer(String serverName) {
+      this.serverName = serverName;
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return conf;
+    }
+
+    @Override
+    public ZooKeeperWatcher getZooKeeper() {
+      return zkw;
+    }
+
+    @Override
+    public CatalogTracker getCatalogTracker() {
+      return null;
+    }
+
+    @Override
+    public ServerName getServerName() {
+      return new ServerName(this.serverName);
+    }
+
+    @Override
+    public void abort(String why, Throwable e) {
+      LOG.info("Aborting " + serverName);
+      this.isAborted = true;
+    }
+
+    @Override
+    public boolean isAborted() {
+      return this.isAborted;
+    }
+
+    @Override
+    public void stop(String why) {
+      this.isStopped = true;
+    }
+
+    @Override
+    public boolean isStopped() {
+      return this.isStopped;
+    }
+  }
+}
+

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=1507498&r1=1507497&r2=1507498&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Fri Jul 26 22:44:00 2013
@@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ClusterId;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -50,11 +51,13 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.After;
@@ -128,6 +131,8 @@ public class TestReplicationSourceManage
     ZKUtil.createWithParents(zkw, "/hbase/replication/state");
     ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
 
+    ZKClusterId.setClusterId(zkw, new ClusterId());
+
     replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
     manager = replication.getReplicationManager();
     fs = FileSystem.get(conf);
@@ -226,12 +231,15 @@ public class TestReplicationSourceManage
     LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti");
     conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
     final Server server = new DummyServer("hostname0.example.org");
-    ReplicationZookeeper rz = new ReplicationZookeeper(server);
+    ReplicationQueues rq =
+        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
+          server);
+    rq.init(server.getServerName().toString());
     // populate some znodes in the peer znode
     files.add("log1");
     files.add("log2");
     for (String file : files) {
-      rz.addLogToList(file, "1");
+      rq.addLog("1", file);
     }
     // create 3 DummyServers
     Server s1 = new DummyServer("dummyserver1.example.org");
@@ -266,12 +274,14 @@ public class TestReplicationSourceManage
     LOG.debug("testNodeFailoverDeadServerParsing");
     conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
     final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
-    ReplicationZookeeper rz = new ReplicationZookeeper(server);
+    ReplicationQueues repQueues =
+        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
+    repQueues.init(server.getServerName().toString());
     // populate some znodes in the peer znode
     files.add("log1");
     files.add("log2");
     for (String file : files) {
-      rz.addLogToList(file, "1");
+      repQueues.addLog("1", file);
     }
 
     // create 3 DummyServers
@@ -280,13 +290,19 @@ public class TestReplicationSourceManage
     Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
 
     // simulate three servers fail sequentially
-    ReplicationZookeeper rz1 = new ReplicationZookeeper(s1);
+    ReplicationQueues rq1 =
+        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
+    rq1.init(s1.getServerName().toString());
     SortedMap<String, SortedSet<String>> testMap =
-        rz1.claimQueues(server.getServerName().getServerName());
-    ReplicationZookeeper rz2 = new ReplicationZookeeper(s2);
-    testMap = rz2.claimQueues(s1.getServerName().getServerName());
-    ReplicationZookeeper rz3 = new ReplicationZookeeper(s3);
-    testMap = rz3.claimQueues(s2.getServerName().getServerName());
+        rq1.claimQueues(server.getServerName().getServerName());
+    ReplicationQueues rq2 =
+        ReplicationFactory.getReplicationQueues(s2.getZooKeeper(), s2.getConfiguration(), s2);
+    rq2.init(s2.getServerName().toString());
+    testMap = rq2.claimQueues(s1.getServerName().getServerName());
+    ReplicationQueues rq3 =
+        ReplicationFactory.getReplicationQueues(s3.getZooKeeper(), s3.getConfiguration(), s3);
+    rq3.init(s3.getServerName().toString());
+    testMap = rq3.claimQueues(s2.getServerName().getServerName());
 
     ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey());
     List<String> result = replicationQueueInfo.getDeadRegionServers();
@@ -304,18 +320,21 @@ public class TestReplicationSourceManage
     private SortedMap<String, SortedSet<String>> logZnodesMap;
     Server server;
     private String deadRsZnode;
-    ReplicationZookeeper rz;
+    ReplicationQueues rq;
 
     public DummyNodeFailoverWorker(String znode, Server s) throws Exception {
       this.deadRsZnode = znode;
       this.server = s;
-      rz = new ReplicationZookeeper(server);
+      this.rq =
+          ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
+            server);
+      this.rq.init(this.server.getServerName().toString());
     }
 
     @Override
     public void run() {
       try {
-        logZnodesMap = rz.claimQueues(deadRsZnode);
+        logZnodesMap = rq.claimQueues(deadRsZnode);
         server.abort("Done with testing", null);
       } catch (Exception e) {
         LOG.error("Got exception while running NodeFailoverWorker", e);



Mime
View raw message