Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 10EB1200B0F for ; Fri, 17 Jun 2016 22:06:59 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 0C128160A61; Fri, 17 Jun 2016 20:06:59 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id ACE37160A4C for ; Fri, 17 Jun 2016 22:06:57 +0200 (CEST) Received: (qmail 25650 invoked by uid 500); 17 Jun 2016 20:06:56 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 25641 invoked by uid 99); 17 Jun 2016 20:06:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Jun 2016 20:06:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B726DE049D; Fri, 17 Jun 2016 20:06:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: eclark@apache.org To: commits@hbase.apache.org Message-Id: <83ccebee06084cf0b4bf1edc7fdf4c77@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hbase git commit: HBASE-16018 Refactored the ReplicationPeers interface to clear up what some methods do and move away from a ZooKeeper-specific implementation. 
Date: Fri, 17 Jun 2016 20:06:56 +0000 (UTC) archived-at: Fri, 17 Jun 2016 20:06:59 -0000 Repository: hbase Updated Branches: refs/heads/master 5147fb12a -> 61ff6ced5 HBASE-16018 Refactored the ReplicationPeers interface to clear up what some methods do and move away from a ZooKeeper-specific implementation. Also added some documentation for undocumented methods. Signed-off-by: Elliott Clark Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/61ff6ced Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/61ff6ced Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/61ff6ced Branch: refs/heads/master Commit: 61ff6ced5bea3586129fb0844bbf64b122775b42 Parents: 5147fb1 Author: Joseph Hwang Authored: Tue Jun 14 08:58:49 2016 -0700 Committer: Elliott Clark Committed: Fri Jun 17 13:04:21 2016 -0700 ---------------------------------------------------------------------- .../client/replication/ReplicationAdmin.java | 8 ++-- .../hbase/replication/ReplicationPeers.java | 41 +++++++++++++++----- .../replication/ReplicationPeersZKImpl.java | 17 ++++---- .../regionserver/ReplicationSource.java | 2 +- .../regionserver/ReplicationSourceManager.java | 14 +++---- .../cleaner/TestReplicationHFileCleaner.java | 4 +- .../replication/TestReplicationStateBasic.java | 40 +++++++++---------- .../TestReplicationStateHBaseImpl.java | 6 +-- .../TestReplicationTrackerZKImpl.java | 18 ++++----- 9 files changed, 85 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/61ff6ced/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java 
index e0985bd..b04d317 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -183,7 +183,7 @@ public class ReplicationAdmin implements Closeable { if (tableCfs != null) { peerConfig.setTableCFsMap(tableCfs); } - this.replicationPeers.addPeer(id, peerConfig); + this.replicationPeers.registerPeer(id, peerConfig); } /** @@ -192,7 +192,7 @@ public class ReplicationAdmin implements Closeable { * @param peerConfig configuration for the replication slave cluster */ public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException { - this.replicationPeers.addPeer(id, peerConfig); + this.replicationPeers.registerPeer(id, peerConfig); } /** @@ -212,7 +212,7 @@ public class ReplicationAdmin implements Closeable { * @param id a short name that identifies the cluster */ public void removePeer(String id) throws ReplicationException { - this.replicationPeers.removePeer(id); + this.replicationPeers.unregisterPeer(id); } /** @@ -556,7 +556,7 @@ public class ReplicationAdmin implements Closeable { @VisibleForTesting public void peerAdded(String id) throws ReplicationException { - this.replicationPeers.peerAdded(id); + this.replicationPeers.peerConnected(id); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/hbase/blob/61ff6ced/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index 9f70d95..2a7963a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -51,18 +51,30 @@ public 
interface ReplicationPeers { * @param peerId a short that identifies the cluster * @param peerConfig configuration for the replication slave cluster */ - void addPeer(String peerId, ReplicationPeerConfig peerConfig) + void registerPeer(String peerId, ReplicationPeerConfig peerConfig) throws ReplicationException; /** * Removes a remote slave cluster and stops the replication to it. * @param peerId a short that identifies the cluster */ - void removePeer(String peerId) throws ReplicationException; + void unregisterPeer(String peerId) throws ReplicationException; - boolean peerAdded(String peerId) throws ReplicationException; + /** + * Method called after a peer has been connected. It will create a ReplicationPeer to track the + * newly connected cluster. + * @param peerId a short that identifies the cluster + * @return whether a ReplicationPeer was successfully created + * @throws ReplicationException + */ + boolean peerConnected(String peerId) throws ReplicationException; - void peerRemoved(String peerId); + /** + * Method called after a peer has been disconnected. It will remove the ReplicationPeer that + * tracked the disconnected cluster. + * @param peerId a short that identifies the cluster + */ + void peerDisconnected(String peerId); /** * Restart the replication to the specified remote slave cluster. @@ -77,14 +89,14 @@ public interface ReplicationPeers { void disablePeer(String peerId) throws ReplicationException; /** - * Get the table and column-family list string of the peer from ZK. + * Get the table and column-family list string of the peer from the underlying storage. * @param peerId a short that identifies the cluster */ public Map> getPeerTableCFsConfig(String peerId) throws ReplicationException; /** - * Set the table and column-family list string of the peer to ZK. + * Set the table and column-family list string of the peer to the underlying storage. 
* @param peerId a short that identifies the cluster * @param tableCFs the table and column-family list which will be replicated for this peer */ @@ -93,17 +105,20 @@ public interface ReplicationPeers { throws ReplicationException; /** - * Returns the ReplicationPeer + * Returns the ReplicationPeer for the specified connected peer. This ReplicationPeer will + * continue to track changes to the Peer's state and config. This method returns null if no + * peer has been connected with the given peerId. * @param peerId id for the peer * @return ReplicationPeer object */ - ReplicationPeer getPeer(String peerId); + ReplicationPeer getConnectedPeer(String peerId); /** - * Returns the set of peerIds defined + * Returns the set of peerIds of the clusters that have been connected and have an underlying + * ReplicationPeer. * @return a Set of Strings for peerIds */ - public Set getPeerIds(); + public Set getConnectedPeerIds(); /** * Get the replication status for the specified connected remote slave cluster. 
@@ -152,5 +167,11 @@ public interface ReplicationPeers { */ Pair getPeerConf(String peerId) throws ReplicationException; + /** + * Update the peerConfig for a given peer cluster + * @param id a short that identifies the cluster + * @param peerConfig new config for the peer cluster + * @throws ReplicationException + */ void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException; } http://git-wip-us.apache.org/repos/asf/hbase/blob/61ff6ced/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 5af97c2..54c2dac 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NoNodeException; /** * This class provides an implementation of the ReplicationPeers interface using ZooKeeper. 
The @@ -105,7 +104,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } @Override - public void addPeer(String id, ReplicationPeerConfig peerConfig) + public void registerPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException { try { if (peerExists(id)) { @@ -148,7 +147,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } @Override - public void removePeer(String id) throws ReplicationException { + public void unregisterPeer(String id) throws ReplicationException { try { if (!peerExists(id)) { throw new IllegalArgumentException("Cannot remove peer with id=" + id @@ -219,7 +218,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re public boolean getStatusOfPeer(String id) { ReplicationPeer replicationPeer = this.peerClusters.get(id); if (replicationPeer == null) { - throw new IllegalArgumentException("Peer with id= " + id + " is not connected"); + throw new IllegalArgumentException("Peer with id= " + id + " is not cached"); } return replicationPeer.getPeerState() == PeerState.ENABLED; } @@ -270,12 +269,12 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } @Override - public ReplicationPeer getPeer(String peerId) { + public ReplicationPeer getConnectedPeer(String peerId) { return peerClusters.get(peerId); } @Override - public Set getPeerIds() { + public Set getConnectedPeerIds() { return peerClusters.keySet(); // this is not thread-safe } @@ -342,7 +341,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re @Override public void updatePeerConfig(String id, ReplicationPeerConfig newConfig) throws ReplicationException { - ReplicationPeer peer = getPeer(id); + ReplicationPeer peer = getConnectedPeer(id); if (peer == null){ throw new ReplicationException("Could not find peer Id " + id); } @@ -411,12 +410,12 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase 
implements Re } @Override - public boolean peerAdded(String peerId) throws ReplicationException { + public boolean peerConnected(String peerId) throws ReplicationException { return createAndAddPeer(peerId); } @Override - public void peerRemoved(String peerId) { + public void peerDisconnected(String peerId) { ReplicationPeer rp = this.peerClusters.get(peerId); if (rp != null) { ((ConcurrentMap) peerClusters).remove(peerId, rp); http://git-wip-us.apache.org/repos/asf/hbase/blob/61ff6ced/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 84e0787..2f3b2a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -237,7 +237,7 @@ public class ReplicationSource extends Thread // A peerId will not have "-" in its name, see HBASE-11394 peerId = peerClusterZnode.split("-")[0]; } - Map> tableCFMap = replicationPeers.getPeer(peerId).getTableCFs(); + Map> tableCFMap = replicationPeers.getConnectedPeer(peerId).getTableCFs(); if (tableCFMap != null) { List tableCfs = tableCFMap.get(tableName); if (tableCFMap.containsKey(tableName) http://git-wip-us.apache.org/repos/asf/hbase/blob/61ff6ced/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java 
index 433f9c5..7532c64 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -230,7 +230,7 @@ public class ReplicationSourceManager implements ReplicationListener { * old region server wal queues */ protected void init() throws IOException, ReplicationException { - for (String id : this.replicationPeers.getPeerIds()) { + for (String id : this.replicationPeers.getConnectedPeerIds()) { addSource(id); if (replicationForBulkLoadDataEnabled) { // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case @@ -264,7 +264,7 @@ public class ReplicationSourceManager implements ReplicationListener { protected ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException { ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id); - ReplicationPeer peer = replicationPeers.getPeer(id); + ReplicationPeer peer = replicationPeers.getConnectedPeer(id); ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer); @@ -306,7 +306,7 @@ public class ReplicationSourceManager implements ReplicationListener { public void deleteSource(String peerId, boolean closeConnection) { this.replicationQueues.removeQueue(peerId); if (closeConnection) { - this.replicationPeers.peerRemoved(peerId); + this.replicationPeers.peerDisconnected(peerId); } } @@ -381,7 +381,7 @@ public class ReplicationSourceManager implements ReplicationListener { // update replication queues on ZK synchronized (replicationPeers) {// synchronize on replicationPeers to avoid adding source for // the to-be-removed peer - for (String id : replicationPeers.getPeerIds()) { + for (String id : replicationPeers.getConnectedPeerIds()) { try { 
this.replicationQueues.addLog(id, logName); } catch (ReplicationException e) { @@ -586,7 +586,7 @@ public class ReplicationSourceManager implements ReplicationListener { public void peerListChanged(List peerIds) { for (String id : peerIds) { try { - boolean added = this.replicationPeers.peerAdded(id); + boolean added = this.replicationPeers.peerConnected(id); if (added) { addSource(id); if (replicationForBulkLoadDataEnabled) { @@ -659,7 +659,7 @@ public class ReplicationSourceManager implements ReplicationListener { // there is not an actual peer defined corresponding to peerId for the failover. ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); String actualPeerId = replicationQueueInfo.getPeerId(); - ReplicationPeer peer = replicationPeers.getPeer(actualPeerId); + ReplicationPeer peer = replicationPeers.getConnectedPeer(actualPeerId); ReplicationPeerConfig peerConfig = null; try { peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId); @@ -688,7 +688,7 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationSourceInterface src = getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp, server, peerId, this.clusterId, peerConfig, peer); - if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) { + if (!this.rp.getConnectedPeerIds().contains((src.getPeerClusterId()))) { src.terminate("Recovered queue doesn't belong to any current peer"); break; } http://git-wip-us.apache.org/repos/asf/hbase/blob/61ff6ced/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java index e5f1e69..fc3e516 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -110,7 +110,7 @@ public class TestReplicationHFileCleaner { @Before public void setup() throws ReplicationException, IOException { root = TEST_UTIL.getDataTestDirOnTestFS(); - rp.addPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey())); + rp.registerPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey())); rq.addPeerToHFileRefs(peerId); } @@ -121,7 +121,7 @@ public class TestReplicationHFileCleaner { } catch (IOException e) { LOG.warn("Failed to delete files recursively from path " + root); } - rp.removePeer(peerId); + rp.unregisterPeer(peerId); } @Test http://git-wip-us.apache.org/repos/asf/hbase/blob/61ff6ced/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index b4451f2..933c621 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -165,7 +165,7 @@ public abstract class TestReplicationStateBasic { rp.init(); try { - rp.addPeer(ID_ONE, + rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase")); fail("Should throw an IllegalArgumentException because " + "zookeeper.znode.parent is missing leading '/'."); @@ -174,7 +174,7 @@ public abstract class TestReplicationStateBasic { } try { - rp.addPeer(ID_ONE, + rp.registerPeer(ID_ONE, new 
ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/")); fail("Should throw an IllegalArgumentException because zookeeper.znode.parent is missing."); } catch (IllegalArgumentException e) { @@ -182,7 +182,7 @@ public abstract class TestReplicationStateBasic { } try { - rp.addPeer(ID_ONE, + rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase")); fail("Should throw an IllegalArgumentException because " + "hbase.zookeeper.property.clientPort is missing."); @@ -203,7 +203,7 @@ public abstract class TestReplicationStateBasic { files1.add("file_3"); assertNull(rqc.getReplicableHFiles(ID_ONE)); assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size()); - rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); + rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); rq1.addPeerToHFileRefs(ID_ONE); rq1.addHFileRefs(ID_ONE, files1); assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size()); @@ -216,7 +216,7 @@ public abstract class TestReplicationStateBasic { files2.add(removedString); rq1.removeHFileRefs(ID_ONE, files2); assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size()); - rp.removePeer(ID_ONE); + rp.unregisterPeer(ID_ONE); } @Test @@ -225,9 +225,9 @@ public abstract class TestReplicationStateBasic { rqc.init(); rp.init(); - rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); + rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); rq1.addPeerToHFileRefs(ID_ONE); - rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); + rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); rq1.addPeerToHFileRefs(ID_TWO); List files1 = new ArrayList(3); @@ -240,13 +240,13 @@ public abstract class TestReplicationStateBasic { assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size()); assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size()); - rp.removePeer(ID_ONE); + rp.unregisterPeer(ID_ONE); 
rq1.removePeerFromHFileRefs(ID_ONE); assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size()); assertNull(rqc.getReplicableHFiles(ID_ONE)); assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size()); - rp.removePeer(ID_TWO); + rp.unregisterPeer(ID_TWO); rq1.removePeerFromHFileRefs(ID_TWO); assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size()); assertNull(rqc.getReplicableHFiles(ID_TWO)); @@ -258,7 +258,7 @@ public abstract class TestReplicationStateBasic { // Test methods with non-existent peer ids try { - rp.removePeer("bogus"); + rp.unregisterPeer("bogus"); fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); } catch (IllegalArgumentException e) { } @@ -277,16 +277,16 @@ public abstract class TestReplicationStateBasic { fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); } catch (IllegalArgumentException e) { } - assertFalse(rp.peerAdded("bogus")); - rp.peerRemoved("bogus"); + assertFalse(rp.peerConnected("bogus")); + rp.peerDisconnected("bogus"); assertNull(rp.getPeerConf("bogus")); assertNumberOfPeers(0); // Add some peers - rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); + rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); assertNumberOfPeers(1); - rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); + rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); assertNumberOfPeers(2); // Test methods with a peer that is added but not connected @@ -296,13 +296,13 @@ public abstract class TestReplicationStateBasic { } catch (IllegalArgumentException e) { } assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond())); - rp.removePeer(ID_ONE); - rp.peerRemoved(ID_ONE); + rp.unregisterPeer(ID_ONE); + rp.peerDisconnected(ID_ONE); assertNumberOfPeers(1); // Add one peer - rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); - rp.peerAdded(ID_ONE); + 
rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); + rp.peerConnected(ID_ONE); assertNumberOfPeers(2); assertTrue(rp.getStatusOfPeer(ID_ONE)); rp.disablePeer(ID_ONE); @@ -311,7 +311,7 @@ public abstract class TestReplicationStateBasic { assertConnectedPeerStatus(true, ID_ONE); // Disconnect peer - rp.peerRemoved(ID_ONE); + rp.peerDisconnected(ID_ONE); assertNumberOfPeers(2); try { rp.getStatusOfPeer(ID_ONE); @@ -361,7 +361,7 @@ public abstract class TestReplicationStateBasic { rq3.addLog("qId" + i, "filename" + j); } //Add peers for the corresponding queues so they are not orphans - rp.addPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i)); + rp.registerPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i)); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/61ff6ced/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java index 346ff37..3a9a5a5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java @@ -204,9 +204,9 @@ public class TestReplicationStateHBaseImpl { @Test public void TestMultipleReplicationQueuesHBaseImpl () { try { - rp.addPeer("Queue1", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1")); - rp.addPeer("Queue2", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus2")); - rp.addPeer("Queue3", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus3")); + rp.registerPeer("Queue1", new 
ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1")); + rp.registerPeer("Queue2", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus2")); + rp.registerPeer("Queue3", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus3")); } catch (ReplicationException e) { fail("Failed to add peers to ReplicationPeers"); } http://git-wip-us.apache.org/repos/asf/hbase/blob/61ff6ced/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java index 671b7fd..3b19660 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java @@ -147,9 +147,9 @@ public class TestReplicationTrackerZKImpl { @Test(timeout = 30000) public void testPeerRemovedEvent() throws Exception { - rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); + rp.registerPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); rt.registerListener(new DummyReplicationListener()); - rp.removePeer("5"); + rp.unregisterPeer("5"); // wait for event while (peerRemovedCount.get() < 1) { Thread.sleep(5); @@ -160,7 +160,7 @@ public class TestReplicationTrackerZKImpl { @Test(timeout = 30000) public void testPeerListChangedEvent() throws Exception { // add a peer - rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); + rp.registerPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true); rt.registerListener(new DummyReplicationListener()); 
rp.disablePeer("5"); @@ -177,23 +177,23 @@ public class TestReplicationTrackerZKImpl { // clean up //ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5"); - rp.removePeer("5"); + rp.unregisterPeer("5"); } @Test(timeout = 30000) public void testPeerNameControl() throws Exception { int exists = 0; int hyphen = 0; - rp.addPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); + rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); try{ - rp.addPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); + rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); }catch(IllegalArgumentException e){ exists++; } try{ - rp.addPeer("6-ec2", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); + rp.registerPeer("6-ec2", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); }catch(IllegalArgumentException e){ hyphen++; } @@ -201,7 +201,7 @@ public class TestReplicationTrackerZKImpl { assertEquals(1, hyphen); // clean up - rp.removePeer("6"); + rp.unregisterPeer("6"); } private class DummyReplicationListener implements ReplicationListener { @@ -217,7 +217,7 @@ public class TestReplicationTrackerZKImpl { public void peerRemoved(String peerId) { peerRemovedData = peerId; peerRemovedCount.getAndIncrement(); - LOG.debug("Received peerRemoved event: " + peerId); + LOG.debug("Received peerDisconnected event: " + peerId); } @Override