From commits-return-67142-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Mon Feb 5 05:09:07 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 1B43D180677 for ; Mon, 5 Feb 2018 05:09:07 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 0B022160C60; Mon, 5 Feb 2018 04:09:07 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D3522160C5A for ; Mon, 5 Feb 2018 05:09:05 +0100 (CET) Received: (qmail 19011 invoked by uid 500); 5 Feb 2018 04:09:04 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 15094 invoked by uid 99); 5 Feb 2018 04:09:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 05 Feb 2018 04:09:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 91023F4DD0; Mon, 5 Feb 2018 04:09:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhangduo@apache.org To: commits@hbase.apache.org Date: Mon, 05 Feb 2018 04:09:24 -0000 Message-Id: In-Reply-To: <4232331453054485a2f3d7ec576cb6f2@git.apache.org> References: <4232331453054485a2f3d7ec576cb6f2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [25/42] hbase git commit: HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface http://git-wip-us.apache.org/repos/asf/hbase/blob/4b458ac4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 222ff6c..27e9d40 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -167,7 +167,6 @@ public class ReplicationSourceManager implements ReplicationListener { this.clusterId = clusterId; this.walFileLengthProvider = walFileLengthProvider; this.replicationTracker.registerListener(this); - this.replicationPeers.getAllPeerIds(); // It's preferable to failover 1 RS at a time, but with good zk servers // more could be processed at the same time. int nbWorkers = conf.getInt("replication.executor.workers", 1); @@ -271,8 +270,8 @@ public class ReplicationSourceManager implements ReplicationListener { } List otherRegionServers = replicationTracker.getListOfRegionServers().stream() .map(ServerName::valueOf).collect(Collectors.toList()); - LOG.info( - "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers); + LOG.info("Current list of replicators: " + currentReplicators + " other RSs: " + + otherRegionServers); // Look if there's anything to process after a restart for (ServerName rs : currentReplicators) { @@ -289,7 +288,7 @@ public class ReplicationSourceManager implements ReplicationListener { * The returned future is for adoptAbandonedQueues task. */ Future init() throws IOException, ReplicationException { - for (String id : this.replicationPeers.getConnectedPeerIds()) { + for (String id : this.replicationPeers.getAllPeerIds()) { addSource(id); if (replicationForBulkLoadDataEnabled) { // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case @@ -308,8 +307,8 @@ public class ReplicationSourceManager implements ReplicationListener { */ @VisibleForTesting ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException { - ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id); - ReplicationPeer peer = replicationPeers.getConnectedPeer(id); + ReplicationPeerConfig peerConfig = replicationPeers.getPeerConfig(id); + ReplicationPeer peer = replicationPeers.getPeer(id); ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer); synchronized (this.walsById) { this.sources.add(src); @@ -355,7 +354,7 @@ public class ReplicationSourceManager implements ReplicationListener { public void deleteSource(String peerId, boolean closeConnection) { abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId)); if (closeConnection) { - this.replicationPeers.peerDisconnected(peerId); + this.replicationPeers.removePeer(peerId); } } @@ -448,12 +447,12 @@ public class ReplicationSourceManager implements ReplicationListener { // update replication queues on ZK // synchronize on replicationPeers to avoid adding source for the to-be-removed peer synchronized (replicationPeers) { - for (String id : replicationPeers.getConnectedPeerIds()) { + for (String id : replicationPeers.getAllPeerIds()) { try { this.queueStorage.addWAL(server.getServerName(), id, logName); } catch (ReplicationException e) { - throw new IOException("Cannot add log to replication queue" + - " when creating a new source, queueId=" + id + ", filename=" + logName, e); + throw new IOException("Cannot add log to replication queue" + + " when creating a new source, queueId=" + id + ", filename=" + logName, e); } } } @@ -598,7 +597,7 @@ public class ReplicationSourceManager implements ReplicationListener { public void addPeer(String id) throws ReplicationException, IOException { LOG.info("Trying to add peer, peerId: " + id); - boolean added = this.replicationPeers.peerConnected(id); + boolean added = this.replicationPeers.addPeer(id); if (added) { LOG.info("Peer " + id + " connected success, trying to start the replication source thread."); addSource(id); @@ -734,16 +733,21 @@ public class ReplicationSourceManager implements ReplicationListener { // there is not an actual peer defined corresponding to peerId for the failover. ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); String actualPeerId = replicationQueueInfo.getPeerId(); - ReplicationPeer peer = replicationPeers.getConnectedPeer(actualPeerId); + + ReplicationPeer peer = replicationPeers.getPeer(actualPeerId); + if (peer == null) { + LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS + + ", peer is null"); + abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId)); + continue; + } + ReplicationPeerConfig peerConfig = null; try { - peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId); - } catch (ReplicationException ex) { - LOG.warn("Received exception while getting replication peer config, skipping replay" - + ex); - } - if (peer == null || peerConfig == null) { - LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS); + peerConfig = replicationPeers.getPeerConfig(actualPeerId); + } catch (Exception e) { + LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS + + ", failed to read peer config", e); abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId)); continue; } @@ -772,7 +776,7 @@ public class ReplicationSourceManager implements ReplicationListener { // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer // see removePeer synchronized (oldsources) { - if (!replicationPeers.getConnectedPeerIds().contains(src.getPeerId())) { + if (!replicationPeers.getAllPeerIds().contains(src.getPeerId())) { src.terminate("Recovered queue doesn't belong to any current peer"); closeRecoveredQueue(src); continue; http://git-wip-us.apache.org/repos/asf/hbase/blob/4b458ac4/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java index e1eb822..08dd428 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -94,7 +94,7 @@ public class TestReplicationHFileCleaner { server = new DummyServer(); conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); HMaster.decorateMasterConfiguration(conf); - rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server); + rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf); rp.init(); rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); fs = FileSystem.get(conf); @@ -108,7 +108,8 @@ public class TestReplicationHFileCleaner { @Before public void setup() throws ReplicationException, IOException { root = TEST_UTIL.getDataTestDirOnTestFS(); - rp.registerPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey())); + rp.getPeerStorage().addPeer(peerId, + ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL.getClusterKey()).build(), true); rq.addPeerToHFileRefs(peerId); } @@ -119,7 +120,7 @@ public class TestReplicationHFileCleaner { } catch (IOException e) { LOG.warn("Failed to delete files recursively from path " + root); } - rp.unregisterPeer(peerId); + rp.getPeerStorage().removePeer(peerId); } @Test http://git-wip-us.apache.org/repos/asf/hbase/blob/4b458ac4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java index 7b2e73f..53b8ba3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java @@ -19,9 +19,7 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/hbase/blob/4b458ac4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java index de7e768..9f0c670 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java @@ -21,8 +21,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -43,6 +41,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -77,10 +76,6 @@ public class TestReplicationTrackerZKImpl { private ReplicationTracker rt; private AtomicInteger rsRemovedCount; private String rsRemovedData; - private AtomicInteger plChangedCount; - private List plChangedData; - private AtomicInteger peerRemovedCount; - private String peerRemovedData; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -97,7 +92,7 @@ public class TestReplicationTrackerZKImpl { String fakeRs1 = ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname1.example.org:1234"); try { ZKClusterId.setClusterId(zkw, new ClusterId()); - rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); + rp = ReplicationFactory.getReplicationPeers(zkw, conf); rp.init(); rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1)); } catch (Exception e) { @@ -105,10 +100,6 @@ public class TestReplicationTrackerZKImpl { } rsRemovedCount = new AtomicInteger(0); rsRemovedData = ""; - plChangedCount = new AtomicInteger(0); - plChangedData = new ArrayList<>(); - peerRemovedCount = new AtomicInteger(0); - peerRemovedData = ""; } @AfterClass @@ -161,25 +152,22 @@ public class TestReplicationTrackerZKImpl { @Test(timeout = 30000) public void testPeerNameControl() throws Exception { int exists = 0; - int hyphen = 0; - rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); + rp.getPeerStorage().addPeer("6", + ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true); - try{ - rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); - }catch(IllegalArgumentException e){ - exists++; + try { + rp.getPeerStorage().addPeer("6", + ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true); + } catch (ReplicationException e) { + if (e.getCause() instanceof KeeperException.NodeExistsException) { + exists++; + } } - try{ - rp.registerPeer("6-ec2", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); - }catch(IllegalArgumentException e){ - hyphen++; - } assertEquals(1, exists); - assertEquals(1, hyphen); // clean up - rp.unregisterPeer("6"); + rp.getPeerStorage().removePeer("6"); } private class DummyReplicationListener implements ReplicationListener { http://git-wip-us.apache.org/repos/asf/hbase/blob/4b458ac4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 1d3b6ea..77b2fb2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -389,7 +389,7 @@ public abstract class TestReplicationSourceManager { } Server s1 = new DummyServer("dummyserver1.example.org"); ReplicationPeers rp1 = - ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1); + ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration()); rp1.init(); NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName()); @@ -585,7 +585,7 @@ public abstract class TestReplicationSourceManager { private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig, final boolean waitForSource) throws Exception { final ReplicationPeers rp = manager.getReplicationPeers(); - rp.registerPeer(peerId, peerConfig); + rp.getPeerStorage().addPeer(peerId, peerConfig, true); try { manager.addPeer(peerId); } catch (Exception e) { @@ -612,7 +612,7 @@ public abstract class TestReplicationSourceManager { } return true; } else { - return (rp.getConnectedPeer(peerId) != null); + return (rp.getPeer(peerId) != null); } }); } @@ -624,8 +624,8 @@ public abstract class TestReplicationSourceManager { */ private void removePeerAndWait(final String peerId) throws Exception { final ReplicationPeers rp = manager.getReplicationPeers(); - if (rp.getAllPeerIds().contains(peerId)) { - rp.unregisterPeer(peerId); + if (rp.getPeerStorage().listPeerIds().contains(peerId)) { + rp.getPeerStorage().removePeer(peerId); try { manager.removePeer(peerId); } catch (Exception e) { @@ -635,10 +635,9 @@ public abstract class TestReplicationSourceManager { Waiter.waitFor(conf, 20000, new Waiter.Predicate() { @Override public boolean evaluate() throws Exception { - List peers = rp.getAllPeerIds(); - return (!manager.getAllQueues().contains(peerId)) && - (rp.getConnectedPeer(peerId) == null) && (!peers.contains(peerId)) && - manager.getSource(peerId) == null; + Collection peers = rp.getPeerStorage().listPeerIds(); + return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null) + && (!peers.contains(peerId)) && manager.getSource(peerId) == null; } }); } http://git-wip-us.apache.org/repos/asf/hbase/blob/4b458ac4/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java index fc31c37..b755c32 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java @@ -182,8 +182,7 @@ public class HBaseZKTestingUtility extends HBaseCommonTestingUtility { /** * Gets a ZKWatcher. */ - public static ZKWatcher getZooKeeperWatcher(HBaseZKTestingUtility testUtil) - throws ZooKeeperConnectionException, IOException { + public static ZKWatcher getZooKeeperWatcher(HBaseZKTestingUtility testUtil) throws IOException { ZKWatcher zkw = new ZKWatcher(testUtil.getConfiguration(), "unittest", new Abortable() { boolean aborted = false;