From commits-return-65966-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Tue Jan 23 11:24:49 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id D6ABC180799 for ; Tue, 23 Jan 2018 11:24:48 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id C6AA4160C4D; Tue, 23 Jan 2018 10:24:48 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 37B30160C51 for ; Tue, 23 Jan 2018 11:24:47 +0100 (CET) Received: (qmail 67861 invoked by uid 500); 23 Jan 2018 10:24:41 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 66914 invoked by uid 99); 23 Jan 2018 10:24:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Jan 2018 10:24:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AEC81F4DF8; Tue, 23 Jan 2018 10:24:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhangduo@apache.org To: commits@hbase.apache.org Date: Tue, 23 Jan 2018 10:25:19 -0000 Message-Id: In-Reply-To: <70a6e3b73d3e458b9cd86de44b81fbec@git.apache.org> References: <70a6e3b73d3e458b9cd86de44b81fbec@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [44/50] [abbrv] hbase git commit: HBASE-19636 All rs should already start work with the new peer change when replication peer procedure is finished HBASE-19636 All rs should already start work with the new peer change when replication peer procedure is finished Signed-off-by: 
zhangduo Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/aab18b45 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/aab18b45 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/aab18b45 Branch: refs/heads/HBASE-19397-branch-2 Commit: aab18b45ba20cea51df861914c12b87755008f7d Parents: 6bd7fd8 Author: Guanghao Zhang Authored: Thu Jan 4 16:58:01 2018 +0800 Committer: zhangduo Committed: Tue Jan 23 18:19:45 2018 +0800 ---------------------------------------------------------------------- .../replication/ReplicationPeerConfig.java | 1 - .../hbase/replication/ReplicationPeerImpl.java | 4 +- .../hbase/replication/ReplicationQueueInfo.java | 23 +- .../hbase/replication/ReplicationUtils.java | 56 ++ .../replication/TestReplicationStateZKImpl.java | 22 - .../regionserver/ReplicationSourceService.java | 3 +- .../regionserver/PeerProcedureHandler.java | 3 + .../regionserver/PeerProcedureHandlerImpl.java | 50 +- .../RecoveredReplicationSource.java | 6 +- .../RecoveredReplicationSourceShipper.java | 8 +- .../replication/regionserver/Replication.java | 15 +- .../regionserver/ReplicationSource.java | 34 +- .../regionserver/ReplicationSourceFactory.java | 4 +- .../ReplicationSourceInterface.java | 8 +- .../regionserver/ReplicationSourceManager.java | 895 ++++++++++--------- .../regionserver/ReplicationSourceShipper.java | 6 +- .../ReplicationSourceWALReader.java | 2 +- .../replication/ReplicationSourceDummy.java | 2 +- .../replication/TestNamespaceReplication.java | 57 +- .../TestReplicationSourceManager.java | 11 +- .../TestReplicationSourceManagerZkImpl.java | 1 - 21 files changed, 659 insertions(+), 552 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/aab18b45/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java 
---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index fdae288..bf8d030 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; - import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; http://git-wip-us.apache.org/repos/asf/hbase/blob/aab18b45/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java index 3e17025..604e0bb 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -28,6 +27,7 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public class ReplicationPeerImpl implements ReplicationPeer { + private final Configuration conf; private final String id; http://git-wip-us.apache.org/repos/asf/hbase/blob/aab18b45/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java index ecd888f..cd65f9b 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.ServerName; /** - * This class is responsible for the parsing logic for a znode representing a queue. + * This class is responsible for the parsing logic for a queue id representing a queue. * It will extract the peerId if it's recovered as well as the dead region servers * that were part of the queue's history. 
*/ @@ -38,21 +38,20 @@ public class ReplicationQueueInfo { private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueueInfo.class); private final String peerId; - private final String peerClusterZnode; + private final String queueId; private boolean queueRecovered; // List of all the dead region servers that had this queue (if recovered) private List<ServerName> deadRegionServers = new ArrayList<>(); /** - * The passed znode will be either the id of the peer cluster or - * the handling story of that queue in the form of id-servername-* + * The passed queueId will be either the id of the peer or the handling story of that queue + * in the form of id-servername-* */ - public ReplicationQueueInfo(String znode) { - this.peerClusterZnode = znode; - String[] parts = znode.split("-", 2); + public ReplicationQueueInfo(String queueId) { + this.queueId = queueId; + String[] parts = queueId.split("-", 2); this.queueRecovered = parts.length != 1; - this.peerId = this.queueRecovered ? - parts[0] : peerClusterZnode; + this.peerId = this.queueRecovered ? parts[0] : queueId; if (parts.length >= 2) { // extract dead servers extractDeadServersFromZNodeString(parts[1], this.deadRegionServers); @@ -60,7 +59,7 @@ public class ReplicationQueueInfo { } /** - * Parse dead server names from znode string servername can contain "-" such as + * Parse dead server names from queue id. servername can contain "-" such as * "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following * cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-<server name>-... 
*/ @@ -119,8 +118,8 @@ public class ReplicationQueueInfo { return this.peerId; } - public String getPeerClusterZnode() { - return this.peerClusterZnode; + public String getQueueId() { + return this.queueId; } public boolean isQueueRecovered() { http://git-wip-us.apache.org/repos/asf/hbase/blob/aab18b45/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java index 7b676ca..ebe68a7 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java @@ -18,11 +18,15 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; +import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; /** @@ -66,4 +70,56 @@ public final class ReplicationUtils { queueStorage.removeReplicatorIfQueueIsEmpty(replicator); } } + + private static boolean isCollectionEqual(Collection c1, Collection c2) { + if (c1 == null) { + return c2 == null; + } + if (c2 == null) { + return false; + } + return c1.size() == c2.size() && c1.containsAll(c2); + } + + private static boolean isNamespacesEqual(Set ns1, Set ns2) { + return isCollectionEqual(ns1, ns2); + } + + private static boolean isTableCFsEqual(Map> tableCFs1, + Map> tableCFs2) { + if (tableCFs1 == null) { + return tableCFs2 == null; + } + if (tableCFs2 == null) { + return false; + } + 
if (tableCFs1.size() != tableCFs2.size()) { + return false; + } + for (Map.Entry<TableName, List<String>> entry1 : tableCFs1.entrySet()) { + TableName table = entry1.getKey(); + if (!tableCFs2.containsKey(table)) { + return false; + } + List<String> cfs1 = entry1.getValue(); + List<String> cfs2 = tableCFs2.get(table); + if (!isCollectionEqual(cfs1, cfs2)) { + return false; + } + } + return true; + } + + public static boolean isKeyConfigEqual(ReplicationPeerConfig rpc1, ReplicationPeerConfig rpc2) { + if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) { + return false; + } + if (rpc1.replicateAllUserTables()) { + return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces()) && + isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap()); + } else { + return isNamespacesEqual(rpc1.getNamespaces(), rpc2.getNamespaces()) && + isTableCFsEqual(rpc1.getTableCFsMap(), rpc2.getTableCFsMap()); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/aab18b45/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java index 6825c36..2790bd0 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java @@ -18,9 +18,7 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.ClusterId; import org.apache.hadoop.hbase.HBaseZKTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -37,14 +35,10 @@ import org.junit.AfterClass; import 
org.junit.Before; import org.junit.BeforeClass; import org.junit.experimental.categories.Category; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @Category({ ReplicationTests.class, MediumTests.class }) public class TestReplicationStateZKImpl extends TestReplicationStateBasic { - private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateZKImpl.class); - private static Configuration conf; private static HBaseZKTestingUtility utility; private static ZKWatcher zkw; @@ -92,20 +86,4 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { public static void tearDownAfterClass() throws Exception { utility.shutdownMiniZKCluster(); } - - private static class WarnOnlyAbortable implements Abortable { - - @Override - public void abort(String why, Throwable e) { - LOG.warn("TestReplicationStateZKImpl received abort, ignoring. Reason: " + why); - if (LOG.isDebugEnabled()) { - LOG.debug(e.toString(), e); - } - } - - @Override - public boolean isAborted() { - return false; - } - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/aab18b45/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java index a82fa3d..2aef0a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/hbase/blob/aab18b45/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java index b392985..65da9af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java @@ -23,6 +23,9 @@ import java.io.IOException; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; +/** + * A handler for modifying replication peer in peer procedures. + */ @InterfaceAudience.Private public interface PeerProcedureHandler { http://git-wip-us.apache.org/repos/asf/hbase/blob/aab18b45/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java index c09c6a0..ce8fdae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java @@ -15,21 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.concurrent.locks.Lock; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @InterfaceAudience.Private public class PeerProcedureHandlerImpl implements PeerProcedureHandler { - private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class); private final ReplicationSourceManager replicationSourceManager; private final KeyLocker peersLock = new KeyLocker<>(); @@ -39,7 +38,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { } @Override - public void addPeer(String peerId) throws ReplicationException, IOException { + public void addPeer(String peerId) throws IOException { Lock peerLock = peersLock.acquireLock(peerId); try { replicationSourceManager.addPeer(peerId); @@ -49,7 +48,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { } @Override - public void removePeer(String peerId) throws ReplicationException, IOException { + public void removePeer(String peerId) throws IOException { Lock peerLock = peersLock.acquireLock(peerId); try { if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) { @@ -60,35 +59,50 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { } } - @Override - public void disablePeer(String peerId) throws ReplicationException, IOException { + private void refreshPeerState(String peerId) throws ReplicationException, IOException { PeerState newState; Lock peerLock = peersLock.acquireLock(peerId); try { + 
ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId); + if (peer == null) { + throw new ReplicationException("Peer with id=" + peerId + " is not cached."); + } + PeerState oldState = peer.getPeerState(); newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId); + // RS need to start work with the new replication state change + if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) { + replicationSourceManager.refreshSources(peerId); + } } finally { peerLock.unlock(); } - LOG.info("disable replication peer, id: {}, new state: {}", peerId, newState); } @Override public void enablePeer(String peerId) throws ReplicationException, IOException { - PeerState newState; - Lock peerLock = peersLock.acquireLock(peerId); - try { - newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId); - } finally { - peerLock.unlock(); - } - LOG.info("enable replication peer, id: {}, new state: {}", peerId, newState); + refreshPeerState(peerId); + } + + @Override + public void disablePeer(String peerId) throws ReplicationException, IOException { + refreshPeerState(peerId); } @Override public void updatePeerConfig(String peerId) throws ReplicationException, IOException { Lock peerLock = peersLock.acquireLock(peerId); try { - replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId); + ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId); + if (peer == null) { + throw new ReplicationException("Peer with id=" + peerId + " is not cached."); + } + ReplicationPeerConfig oldConfig = peer.getPeerConfig(); + ReplicationPeerConfig newConfig = + replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId); + // RS need to start work with the new replication config change + if (!ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig)) { + replicationSourceManager.refreshSources(peerId); + } } finally { peerLock.unlock(); } 
http://git-wip-us.apache.org/repos/asf/hbase/blob/aab18b45/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index 7bceb78..1be9a88 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -81,7 +81,7 @@ public class RecoveredReplicationSource extends ReplicationSource { ReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); Threads.setDaemonThreadRunning(walReader, threadName - + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode, + + ".replicationSource.replicationWALReaderThread." 
+ walGroupId + "," + queueId, getUncaughtExceptionHandler()); return walReader; } @@ -178,8 +178,8 @@ public class RecoveredReplicationSource extends ReplicationSource { } } if (allTasksDone) { - manager.closeRecoveredQueue(this); - LOG.info("Finished recovering queue " + peerClusterZnode + " with the following stats: " + manager.removeRecoveredSource(this); + LOG.info("Finished recovering queue " + queueId + " with the following stats: " + getStats()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/aab18b45/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index fb365bc..1e45496 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -77,7 +77,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper if (entryBatch.getWalEntries().isEmpty() && entryBatch.getLastSeqIds().isEmpty()) { LOG.debug("Finished recovering queue for group " + walGroupId + " of peer " - + source.getPeerClusterZnode()); + + source.getQueueId()); source.getSourceMetrics().incrCompletedRecoveryQueue(); setWorkerState(WorkerState.FINISHED); continue; @@ -114,7 +114,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper // normally has a position (unless the RS failed between 2 logs) private long getRecoveredQueueStartPos() { long startPosition = 0; - String peerClusterZnode = source.getPeerClusterZnode(); + String peerClusterZnode = source.getQueueId(); try { 
startPosition = this.replicationQueues.getWALPosition(source.getServerWALsBelongTo(), peerClusterZnode, this.queue.peek().getName()); @@ -130,8 +130,8 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper @Override protected void updateLogPosition(long lastReadPosition) { - source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(), - lastReadPosition, true, false); + source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(), + lastReadPosition, true); lastLoggedPosition = lastReadPosition; } http://git-wip-us.apache.org/repos/asf/hbase/blob/aab18b45/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index ce056a1..2fa5a9b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; @@ -59,10 +58,10 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 
+ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; - -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. */ @@ -223,11 +222,7 @@ public class Replication implements */ @Override public void startReplicationService() throws IOException { - try { - this.replicationManager.init(); - } catch (ReplicationException e) { - throw new IOException(e); - } + this.replicationManager.init(); this.replicationSink = new ReplicationSink(this.conf, this.server); this.scheduleThreadPool.scheduleAtFixedRate( new ReplicationStatisticsThread(this.replicationSink, this.replicationManager), @@ -285,9 +280,9 @@ public class Replication implements throws IOException { try { this.replicationManager.addHFileRefs(tableName, family, pairs); - } catch (ReplicationException e) { + } catch (IOException e) { LOG.error("Failed to add hfile references in the replication queue.", e); - throw new IOException(e); + throw e; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/aab18b45/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index ffed88d..0092251 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -105,7 +105,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf // total number of edits we replicated private AtomicLong 
totalReplicatedEdits = new AtomicLong(0); // The znode we currently play with - protected String peerClusterZnode; + protected String queueId; // Maximum number of retries before taking bold actions private int maxRetriesMultiplier; // Indicates if this particular source is running @@ -141,14 +141,14 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf * @param fs file system to use * @param manager replication manager to ping to * @param server the server for this region server - * @param peerClusterZnode the name of our znode + * @param queueId the id of our replication queue * @param clusterId unique UUID for the cluster * @param metrics metrics for replication source */ @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, - String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, + String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { this.server = server; this.conf = HBaseConfiguration.create(conf); @@ -167,8 +167,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf this.metrics = metrics; this.clusterId = clusterId; - this.peerClusterZnode = peerClusterZnode; - this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); + this.queueId = queueId; + this.replicationQueueInfo = new ReplicationQueueInfo(queueId); // ReplicationQueueInfo parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); @@ -178,7 +178,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); this.totalBufferUsed = manager.getTotalBufferUsed(); this.walFileLengthProvider 
= walFileLengthProvider; - LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId + LOG.info("queueId=" + queueId + ", ReplicationSource : " + peerId + ", currentBandwidth=" + this.currentBandwidth); } @@ -216,12 +216,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf @Override public void addHFileRefs(TableName tableName, byte[] family, List> pairs) throws ReplicationException { - String peerId = peerClusterZnode; - if (peerId.contains("-")) { - // peerClusterZnode will be in the form peerId + "-" + rsZNode. - // A peerId will not have "-" in its name, see HBASE-11394 - peerId = peerClusterZnode.split("-")[0]; - } Map> tableCFMap = replicationPeer.getTableCFs(); if (tableCFMap != null) { List tableCfs = tableCFMap.get(tableName); @@ -310,7 +304,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId " + peerClusterId + " which is not allowed by ReplicationEndpoint:" + replicationEndpoint.getClass().getName(), null, false); - this.manager.closeQueue(this); + this.manager.removeSource(this); return; } LOG.info("Replicating " + clusterId + " -> " + peerClusterId); @@ -355,7 +349,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf ReplicationSourceWALReader walReader = new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader, - threadName + ".replicationSource.wal-reader." + walGroupId + "," + peerClusterZnode, + threadName + ".replicationSource.wal-reader." 
+ walGroupId + "," + queueId, getUncaughtExceptionHandler()); } @@ -449,7 +443,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf LOG.error("Unexpected exception in ReplicationSource", e); } }; - Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, + Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.queueId, handler); } @@ -465,9 +459,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf public void terminate(String reason, Exception cause, boolean join) { if (cause == null) { - LOG.info("Closing source " + this.peerClusterZnode + " because: " + reason); + LOG.info("Closing source " + this.queueId + " because: " + reason); } else { - LOG.error("Closing source " + this.peerClusterZnode + " because an error occurred: " + reason, + LOG.error("Closing source " + this.queueId + " because an error occurred: " + reason, cause); } this.sourceRunning = false; @@ -491,7 +485,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf .awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS); } catch (TimeoutException te) { LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" - + this.peerClusterZnode, + + this.queueId, te); } } @@ -499,8 +493,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } @Override - public String getPeerClusterZnode() { - return this.peerClusterZnode; + public String getQueueId() { + return this.queueId; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/aab18b45/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java index 865a202..93e8331 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java @@ -32,8 +32,8 @@ public class ReplicationSourceFactory { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceFactory.class); - static ReplicationSourceInterface create(Configuration conf, String peerId) { - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); + static ReplicationSourceInterface create(Configuration conf, String queueId) { + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId); boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered(); ReplicationSourceInterface src; try { http://git-wip-us.apache.org/repos/asf/hbase/blob/aab18b45/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 4f10c73..d7cf9a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -51,7 +51,7 @@ public interface ReplicationSourceInterface { */ void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, - String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, + String 
queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException; /** @@ -96,11 +96,11 @@ public interface ReplicationSourceInterface { Path getCurrentPath(); /** - * Get the id that the source is replicating to + * Get the queue id that the source is replicating to * - * @return peer cluster id + * @return queue id */ - String getPeerClusterZnode(); + String getQueueId(); /** * Get the id that the source is replicating to.