From commits-return-66539-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Tue Jan 30 02:41:38 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id A1AF3180654 for ; Tue, 30 Jan 2018 02:41:37 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 91DA1160C5B; Tue, 30 Jan 2018 01:41:37 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 600A3160C55 for ; Tue, 30 Jan 2018 02:41:35 +0100 (CET) Received: (qmail 29718 invoked by uid 500); 30 Jan 2018 01:41:32 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 26485 invoked by uid 99); 30 Jan 2018 01:41:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 30 Jan 2018 01:41:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4A73CF334C; Tue, 30 Jan 2018 01:41:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: zhangduo@apache.org To: commits@hbase.apache.org Date: Tue, 30 Jan 2018 01:42:15 -0000 Message-Id: In-Reply-To: <70ac662ee89e49cda8e5bef4333ba1af@git.apache.org> References: <70ac662ee89e49cda8e5bef4333ba1af@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [50/68] [abbrv] hbase git commit: HBASE-19617 Remove ReplicationQueues, use ReplicationQueueStorage directly HBASE-19617 Remove ReplicationQueues, use ReplicationQueueStorage directly Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: 
http://git-wip-us.apache.org/repos/asf/hbase/commit/e807fd57 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e807fd57 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e807fd57 Branch: refs/heads/HBASE-19397-branch-2 Commit: e807fd5759aabfbd71971e8c94ccbf1c9abef940 Parents: 3fdd304 Author: zhangduo Authored: Wed Dec 27 22:03:51 2017 +0800 Committer: zhangduo Committed: Tue Jan 30 09:24:47 2018 +0800 ---------------------------------------------------------------------- .../hbase/replication/ReplicationFactory.java | 9 +- .../hbase/replication/ReplicationQueues.java | 160 ------- .../replication/ReplicationQueuesArguments.java | 70 --- .../replication/ReplicationQueuesZKImpl.java | 407 ----------------- .../hbase/replication/ReplicationTableBase.java | 442 ------------------- .../replication/ReplicationTrackerZKImpl.java | 21 +- .../replication/ZKReplicationQueueStorage.java | 22 + .../replication/TestReplicationStateBasic.java | 131 +++--- .../replication/TestReplicationStateZKImpl.java | 41 +- .../regionserver/DumpReplicationQueues.java | 15 +- .../RecoveredReplicationSource.java | 17 +- .../RecoveredReplicationSourceShipper.java | 22 +- .../replication/regionserver/Replication.java | 41 +- .../regionserver/ReplicationSource.java | 16 +- .../ReplicationSourceInterface.java | 11 +- .../regionserver/ReplicationSourceManager.java | 261 ++++++----- .../regionserver/ReplicationSyncUp.java | 29 +- .../hbase/master/cleaner/TestLogsCleaner.java | 12 +- .../cleaner/TestReplicationHFileCleaner.java | 23 +- .../cleaner/TestReplicationZKNodeCleaner.java | 22 +- .../replication/ReplicationSourceDummy.java | 6 +- .../replication/TestReplicationSyncUpTool.java | 6 +- .../TestReplicationSourceManager.java | 104 ++--- .../TestReplicationSourceManagerZkImpl.java | 57 +-- 24 files changed, 379 insertions(+), 1566 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hbase/blob/e807fd57/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java index 6c1c213..5e70e57 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java @@ -17,12 +17,11 @@ */ package org.apache.hadoop.hbase.replication; -import org.apache.commons.lang3.reflect.ConstructorUtils; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; /** * A factory class for instantiating replication objects that deal with replication state. 
@@ -30,12 +29,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher; @InterfaceAudience.Private public class ReplicationFactory { - public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args) - throws Exception { - return (ReplicationQueues) ConstructorUtils.invokeConstructor(ReplicationQueuesZKImpl.class, - args); - } - public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf, Abortable abortable) { return getReplicationPeers(zk, conf, null, abortable); http://git-wip-us.apache.org/repos/asf/hbase/blob/e807fd57/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java deleted file mode 100644 index 7f440b1..0000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.replication; - -import java.util.List; -import java.util.SortedSet; - -import org.apache.hadoop.fs.Path; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.util.Pair; - -/** - * This provides an interface for maintaining a region server's replication queues. These queues - * keep track of the WALs and HFile references (if hbase.replication.bulkload.enabled is enabled) - * that still need to be replicated to remote clusters. - */ -@InterfaceAudience.Private -public interface ReplicationQueues { - - /** - * Initialize the region server replication queue interface. - * @param serverName The server name of the region server that owns the replication queues this - * interface manages. - */ - void init(String serverName) throws ReplicationException; - - /** - * Remove a replication queue. - * @param queueId a String that identifies the queue. - */ - void removeQueue(String queueId); - - /** - * Add a new WAL file to the given queue. If the queue does not exist it is created. - * @param queueId a String that identifies the queue. - * @param filename name of the WAL - */ - void addLog(String queueId, String filename) throws ReplicationException; - - /** - * Remove an WAL file from the given queue. - * @param queueId a String that identifies the queue. - * @param filename name of the WAL - */ - void removeLog(String queueId, String filename); - - /** - * Set the current position for a specific WAL in a given queue. - * @param queueId a String that identifies the queue - * @param filename name of the WAL - * @param position the current position in the file - */ - void setLogPosition(String queueId, String filename, long position); - - /** - * Get the current position for a specific WAL in a given queue. 
- * @param queueId a String that identifies the queue - * @param filename name of the WAL - * @return the current position in the file - */ - long getLogPosition(String queueId, String filename) throws ReplicationException; - - /** - * Remove all replication queues for this region server. - */ - void removeAllQueues(); - - /** - * Get a list of all WALs in the given queue. - * @param queueId a String that identifies the queue - * @return a list of WALs, null if no such queue exists for this server - */ - List getLogsInQueue(String queueId); - - /** - * Get a list of all queues for this region server. - * @return a list of queueIds, an empty list if this region server is dead and has no outstanding queues - */ - List getAllQueues(); - - /** - * Get queueIds from a dead region server, whose queues has not been claimed by other region - * servers. - * @return empty if the queue exists but no children, null if the queue does not exist. - */ - List getUnClaimedQueueIds(String regionserver); - - /** - * Take ownership for the queue identified by queueId and belongs to a dead region server. - * @param regionserver the id of the dead region server - * @param queueId the id of the queue - * @return the new PeerId and A SortedSet of WALs in its queue, and null if no unclaimed queue. - */ - Pair> claimQueue(String regionserver, String queueId); - - /** - * Remove the znode of region server if the queue is empty. - * @param regionserver - */ - void removeReplicatorIfQueueIsEmpty(String regionserver); - - /** - * Get a list of all region servers that have outstanding replication queues. These servers could - * be alive, dead or from a previous run of the cluster. 
- * @return a list of server names - */ - List getListOfReplicators(); - - /** - * Checks if the provided znode is the same as this region server's - * @param regionserver the id of the region server - * @return if this is this rs's znode - */ - boolean isThisOurRegionServer(String regionserver); - - /** - * Add a peer to hfile reference queue if peer does not exist. - * @param peerId peer cluster id to be added - * @throws ReplicationException if fails to add a peer id to hfile reference queue - */ - void addPeerToHFileRefs(String peerId) throws ReplicationException; - - /** - * Remove a peer from hfile reference queue. - * @param peerId peer cluster id to be removed - */ - void removePeerFromHFileRefs(String peerId); - - /** - * Add new hfile references to the queue. - * @param peerId peer cluster id to which the hfiles need to be replicated - * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which - * will be added in the queue } - * @throws ReplicationException if fails to add a hfile reference - */ - void addHFileRefs(String peerId, List> pairs) throws ReplicationException; - - /** - * Remove hfile references from the queue. 
- * @param peerId peer cluster id from which this hfile references needs to be removed - * @param files list of hfile references to be removed - */ - void removeHFileRefs(String peerId, List files); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/e807fd57/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java deleted file mode 100644 index c2a5df3..0000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.yetus.audience.InterfaceAudience; - -/** - * Wrapper around common arguments used to construct ReplicationQueues. 
Used to construct various - * ReplicationQueues Implementations with different constructor arguments by reflection. - */ -@InterfaceAudience.Private -public class ReplicationQueuesArguments { - - private ZKWatcher zk; - private Configuration conf; - private Abortable abort; - - public ReplicationQueuesArguments(Configuration conf, Abortable abort) { - this.conf = conf; - this.abort = abort; - } - - public ReplicationQueuesArguments(Configuration conf, Abortable abort, ZKWatcher zk) { - this(conf, abort); - setZk(zk); - } - - public ZKWatcher getZk() { - return zk; - } - - public void setZk(ZKWatcher zk) { - this.zk = zk; - } - - public Configuration getConf() { - return conf; - } - - public void setConf(Configuration conf) { - this.conf = conf; - } - - public Abortable getAbortable() { - return abort; - } - - public void setAbortable(Abortable abort) { - this.abort = abort; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/e807fd57/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java deleted file mode 100644 index 7551cb7..0000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ /dev/null @@ -1,407 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import java.util.ArrayList; -import java.util.List; -import java.util.SortedSet; -import java.util.TreeSet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class provides an implementation of the - * interface using ZooKeeper. The - * base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of - * all outstanding WAL files on this region server that need to be replicated. The myQueuesZnode is - * the regionserver name (a concatenation of the region server’s hostname, client port and start - * code). For example: - * - * /hbase/replication/rs/hostname.example.org,6020,1234 - * - * Within this znode, the region server maintains a set of WAL replication queues. These queues are - * represented by child znodes named using there give queue id. 
For example: - * - * /hbase/replication/rs/hostname.example.org,6020,1234/1 - * /hbase/replication/rs/hostname.example.org,6020,1234/2 - * - * Each queue has one child znode for every WAL that still needs to be replicated. The value of - * these WAL child znodes is the latest position that has been replicated. This position is updated - * every time a WAL entry is replicated. For example: - * - * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254] - */ -@InterfaceAudience.Private -public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues { - - /** Znode containing all replication queues for this region server. */ - private String myQueuesZnode; - - private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueuesZKImpl.class); - - public ReplicationQueuesZKImpl(ReplicationQueuesArguments args) { - this(args.getZk(), args.getConf(), args.getAbortable()); - } - - public ReplicationQueuesZKImpl(final ZKWatcher zk, Configuration conf, - Abortable abortable) { - super(zk, conf, abortable); - } - - @Override - public void init(String serverName) throws ReplicationException { - this.myQueuesZnode = ZNodePaths.joinZNode(this.queuesZNode, serverName); - try { - if (ZKUtil.checkExists(this.zookeeper, this.myQueuesZnode) < 0) { - ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode); - } - } catch (KeeperException e) { - throw new ReplicationException("Could not initialize replication queues.", e); - } - if (conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, - HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) { - try { - if (ZKUtil.checkExists(this.zookeeper, this.hfileRefsZNode) < 0) { - ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode); - } - } catch (KeeperException e) { - throw new ReplicationException("Could not initialize hfile references replication queue.", - e); - } - } - } - - @Override - public void removeQueue(String queueId) { - try { - 
ZKUtil.deleteNodeRecursively(this.zookeeper, - ZNodePaths.joinZNode(this.myQueuesZnode, queueId)); - } catch (KeeperException e) { - this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e); - } - } - - @Override - public void addLog(String queueId, String filename) throws ReplicationException { - String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId); - znode = ZNodePaths.joinZNode(znode, filename); - try { - ZKUtil.createWithParents(this.zookeeper, znode); - } catch (KeeperException e) { - throw new ReplicationException( - "Could not add log because znode could not be created. queueId=" + queueId - + ", filename=" + filename); - } - } - - @Override - public void removeLog(String queueId, String filename) { - try { - String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId); - znode = ZNodePaths.joinZNode(znode, filename); - ZKUtil.deleteNode(this.zookeeper, znode); - } catch (KeeperException e) { - this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename=" - + filename + ")", e); - } - } - - @Override - public void setLogPosition(String queueId, String filename, long position) { - try { - String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId); - znode = ZNodePaths.joinZNode(znode, filename); - // Why serialize String of Long and not Long as bytes? 
- ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position)); - } catch (KeeperException e) { - this.abortable.abort("Failed to write replication wal position (filename=" + filename - + ", position=" + position + ")", e); - } - } - - @Override - public long getLogPosition(String queueId, String filename) throws ReplicationException { - String clusterZnode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId); - String znode = ZNodePaths.joinZNode(clusterZnode, filename); - byte[] bytes = null; - try { - bytes = ZKUtil.getData(this.zookeeper, znode); - } catch (KeeperException e) { - throw new ReplicationException("Internal Error: could not get position in log for queueId=" - + queueId + ", filename=" + filename, e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return 0; - } - try { - return ZKUtil.parseWALPositionFrom(bytes); - } catch (DeserializationException de) { - LOG.warn("Failed to parse WALPosition for queueId=" + queueId + " and wal=" + filename - + " znode content, continuing."); - } - // if we can not parse the position, start at the beginning of the wal file - // again - return 0; - } - - @Override - public boolean isThisOurRegionServer(String regionserver) { - return ZNodePaths.joinZNode(this.queuesZNode, regionserver).equals(this.myQueuesZnode); - } - - @Override - public List getUnClaimedQueueIds(String regionserver) { - if (isThisOurRegionServer(regionserver)) { - return null; - } - String rsZnodePath = ZNodePaths.joinZNode(this.queuesZNode, regionserver); - List queues = null; - try { - queues = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZnodePath); - } catch (KeeperException e) { - this.abortable.abort("Failed to getUnClaimedQueueIds for RS" + regionserver, e); - } - return queues; - } - - @Override - public Pair> claimQueue(String regionserver, String queueId) { - LOG.info("Atomically moving " + regionserver + "/" + queueId + "'s WALs to my queue"); - return moveQueueUsingMulti(regionserver, 
queueId); - } - - @Override - public void removeReplicatorIfQueueIsEmpty(String regionserver) { - String rsPath = ZNodePaths.joinZNode(this.queuesZNode, regionserver); - try { - List list = ZKUtil.listChildrenNoWatch(this.zookeeper, rsPath); - if (list != null && list.isEmpty()){ - ZKUtil.deleteNode(this.zookeeper, rsPath); - } - } catch (KeeperException e) { - LOG.warn("Got error while removing replicator", e); - } - } - - @Override - public void removeAllQueues() { - try { - ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode); - } catch (KeeperException e) { - // if the znode is already expired, don't bother going further - if (e instanceof KeeperException.SessionExpiredException) { - return; - } - this.abortable.abort("Failed to delete replication queues for region server: " - + this.myQueuesZnode, e); - } - } - - @Override - public List getLogsInQueue(String queueId) { - String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId); - List result = null; - try { - result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); - } catch (KeeperException e) { - this.abortable.abort("Failed to get list of wals for queueId=" + queueId, e); - } - return result; - } - - @Override - public List getAllQueues() { - List listOfQueues = null; - try { - listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode); - } catch (KeeperException e) { - this.abortable.abort("Failed to get a list of queues for region server: " - + this.myQueuesZnode, e); - } - return listOfQueues == null ? new ArrayList<>() : listOfQueues; - } - - /** - * It "atomically" copies one peer's wals queue from another dead region server and returns them - * all sorted. The new peer id is equal to the old peer id appended with the dead server's znode. 
- * @param znode pertaining to the region server to copy the queues from - * @peerId peerId pertaining to the queue need to be copied - */ - private Pair> moveQueueUsingMulti(String znode, String peerId) { - try { - // hbase/replication/rs/deadrs - String deadRSZnodePath = ZNodePaths.joinZNode(this.queuesZNode, znode); - List listOfOps = new ArrayList<>(); - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); - - String newPeerId = peerId + "-" + znode; - String newPeerZnode = ZNodePaths.joinZNode(this.myQueuesZnode, newPeerId); - // check the logs queue for the old peer cluster - String oldClusterZnode = ZNodePaths.joinZNode(deadRSZnodePath, peerId); - List wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode); - - if (!peerExists(replicationQueueInfo.getPeerId())) { - LOG.warn("Peer " + replicationQueueInfo.getPeerId() + - " didn't exist, will move its queue to avoid the failure of multi op"); - for (String wal : wals) { - String oldWalZnode = ZNodePaths.joinZNode(oldClusterZnode, wal); - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode)); - } - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); - ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); - return null; - } - - SortedSet logQueue = new TreeSet<>(); - if (wals == null || wals.isEmpty()) { - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); - } else { - // create the new cluster znode - ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY); - listOfOps.add(op); - // get the offset of the logs and set it to new znodes - for (String wal : wals) { - String oldWalZnode = ZNodePaths.joinZNode(oldClusterZnode, wal); - byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode); - LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset)); - String newLogZnode = ZNodePaths.joinZNode(newPeerZnode, wal); - listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset)); - 
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode)); - logQueue.add(wal); - } - // add delete op for peer - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); - - if (LOG.isTraceEnabled()) - LOG.trace(" The multi list size is: " + listOfOps.size()); - } - ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); - - LOG.info("Atomically moved " + znode + "/" + peerId + "'s WALs to my queue"); - return new Pair<>(newPeerId, logQueue); - } catch (KeeperException e) { - // Multi call failed; it looks like some other regionserver took away the logs. - LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e); - } catch (InterruptedException e) { - LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e); - Thread.currentThread().interrupt(); - } - return null; - } - - @Override - public void addHFileRefs(String peerId, List> pairs) - throws ReplicationException { - String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId); - boolean debugEnabled = LOG.isDebugEnabled(); - if (debugEnabled) { - LOG.debug("Adding hfile references " + pairs + " in queue " + peerZnode); - } - - int size = pairs.size(); - List listOfOps = new ArrayList<>(size); - - for (int i = 0; i < size; i++) { - listOfOps.add(ZKUtilOp.createAndFailSilent( - ZNodePaths.joinZNode(peerZnode, pairs.get(i).getSecond().getName()), - HConstants.EMPTY_BYTE_ARRAY)); - } - if (debugEnabled) { - LOG.debug(" The multi list size for adding hfile references in zk for node " + peerZnode - + " is " + listOfOps.size()); - } - try { - ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); - } catch (KeeperException e) { - throw new ReplicationException("Failed to create hfile reference znode=" + e.getPath(), e); - } - } - - @Override - public void removeHFileRefs(String peerId, List files) { - String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId); - boolean debugEnabled = LOG.isDebugEnabled(); - if (debugEnabled) { - LOG.debug("Removing hfile references " + files + 
" from queue " + peerZnode); - } - - int size = files.size(); - List listOfOps = new ArrayList<>(size); - - for (int i = 0; i < size; i++) { - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZNodePaths.joinZNode(peerZnode, files.get(i)))); - } - if (debugEnabled) { - LOG.debug(" The multi list size for removing hfile references in zk for node " + peerZnode - + " is " + listOfOps.size()); - } - try { - ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); - } catch (KeeperException e) { - LOG.error("Failed to remove hfile reference znode=" + e.getPath(), e); - } - } - - @Override - public void addPeerToHFileRefs(String peerId) throws ReplicationException { - String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId); - try { - if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) { - LOG.info("Adding peer " + peerId + " to hfile reference queue."); - ZKUtil.createWithParents(this.zookeeper, peerZnode); - } - } catch (KeeperException e) { - throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.", - e); - } - } - - @Override - public void removePeerFromHFileRefs(String peerId) { - final String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId); - try { - if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) { - if (LOG.isDebugEnabled()) { - LOG.debug("Peer " + peerZnode + " not found in hfile reference queue."); - } - return; - } else { - LOG.info("Removing peer " + peerZnode + " from hfile reference queue."); - ZKUtil.deleteNodeRecursively(this.zookeeper, peerZnode); - } - } catch (KeeperException e) { - LOG.error("Ignoring the exception to remove peer " + peerId + " from hfile reference queue.", - e); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/e807fd57/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java ---------------------------------------------------------------------- diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java deleted file mode 100644 index 0d8427c..0000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java +++ /dev/null @@ -1,442 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ -package org.apache.hadoop.hbase.replication; - -import org.apache.hadoop.hbase.CompareOperator; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.NamespaceDescriptor; -import org.apache.hadoop.hbase.TableExistsException; -import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.filter.CompareFilter; -import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; -import org.apache.hadoop.hbase.regionserver.BloomType; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.RetryCounter; -import org.apache.hadoop.hbase.util.RetryCounterFactory; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/* - * Abstract class that provides an interface to the Replication Table. Which is currently - * being used for WAL offset tracking. 
- * The basic schema of this table will store each individual queue as a - * seperate row. The row key will be a unique identifier of the creating server's name and the - * queueId. Each queue must have the following two columns: - * COL_QUEUE_OWNER: tracks which server is currently responsible for tracking the queue - * COL_QUEUE_OWNER_HISTORY: a "|" delimited list of the previous server's that have owned this - * queue. The most recent previous owner is the leftmost entry. - * They will also have columns mapping [WAL filename : offset] - * The most flexible method of interacting with the Replication Table is by calling - * getOrBlockOnReplicationTable() which will return a new copy of the Replication Table. It is up - * to the caller to close the returned table. - */ -@InterfaceAudience.Private -abstract class ReplicationTableBase { - - /** Name of the HBase Table used for tracking replication*/ - public static final TableName REPLICATION_TABLE_NAME = - TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"); - - // Column family and column names for Queues in the Replication Table - public static final byte[] CF_QUEUE = Bytes.toBytes("q"); - public static final byte[] COL_QUEUE_OWNER = Bytes.toBytes("o"); - public static final byte[] COL_QUEUE_OWNER_HISTORY = Bytes.toBytes("h"); - - // Column Descriptor for the Replication Table - private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR = - new HColumnDescriptor(CF_QUEUE).setMaxVersions(1) - .setInMemory(true) - .setScope(HConstants.REPLICATION_SCOPE_LOCAL) - // TODO: Figure out which bloom filter to use - .setBloomFilterType(BloomType.NONE); - - // The value used to delimit the queueId and server name inside of a queue's row key. Currently a - // hyphen, because it is guaranteed that queueId (which is a cluster id) cannot contain hyphens. - // See HBASE-11394. 
- public static final String ROW_KEY_DELIMITER = "-"; - - // The value used to delimit server names in the queue history list - public static final String QUEUE_HISTORY_DELIMITER = "|"; - - /* - * Make sure that HBase table operations for replication have a high number of retries. This is - * because the server is aborted if any HBase table operation fails. Each RPC will be attempted - * 3600 times before exiting. This provides each operation with 2 hours of retries - * before the server is aborted. - */ - private static final int CLIENT_RETRIES = 3600; - private static final int RPC_TIMEOUT = 2000; - private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT; - - // We only need a single thread to initialize the Replication Table - private static final int NUM_INITIALIZE_WORKERS = 1; - - protected final Configuration conf; - protected final Abortable abortable; - private final Connection connection; - private final Executor executor; - private volatile CountDownLatch replicationTableInitialized; - - public ReplicationTableBase(Configuration conf, Abortable abort) throws IOException { - this.conf = new Configuration(conf); - this.abortable = abort; - decorateConf(); - this.connection = ConnectionFactory.createConnection(this.conf); - this.executor = setUpExecutor(); - this.replicationTableInitialized = new CountDownLatch(1); - createReplicationTableInBackground(); - } - - /** - * Modify the connection's config so that operations run on the Replication Table have longer and - * a larger number of retries - */ - private void decorateConf() { - this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES); - } - - /** - * Sets up the thread pool executor used to build the Replication Table in the background - * @return the configured executor - */ - private Executor setUpExecutor() { - ThreadPoolExecutor tempExecutor = new ThreadPoolExecutor(NUM_INITIALIZE_WORKERS, - NUM_INITIALIZE_WORKERS, 100, TimeUnit.MILLISECONDS, new 
LinkedBlockingQueue<>()); - ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); - tfb.setNameFormat("ReplicationTableExecutor-%d"); - tfb.setDaemon(true); - tempExecutor.setThreadFactory(tfb.build()); - return tempExecutor; - } - - /** - * Get whether the Replication Table has been successfully initialized yet - * @return whether the Replication Table is initialized - */ - public boolean getInitializationStatus() { - return replicationTableInitialized.getCount() == 0; - } - - /** - * Increases the RPC and operations timeouts for the Replication Table - */ - private Table setReplicationTableTimeOuts(Table replicationTable) { - replicationTable.setRpcTimeout(RPC_TIMEOUT); - replicationTable.setOperationTimeout(OPERATION_TIMEOUT); - return replicationTable; - } - - /** - * Build the row key for the given queueId. This will uniquely identify it from all other queues - * in the cluster. - * @param serverName The owner of the queue - * @param queueId String identifier of the queue - * @return String representation of the queue's row key - */ - protected String buildQueueRowKey(String serverName, String queueId) { - return queueId + ROW_KEY_DELIMITER + serverName; - } - - /** - * Parse the original queueId from a row key - * @param rowKey String representation of a queue's row key - * @return the original queueId - */ - protected String getRawQueueIdFromRowKey(String rowKey) { - return rowKey.split(ROW_KEY_DELIMITER)[0]; - } - - /** - * Returns a queue's row key given either its raw or reclaimed queueId - * - * @param queueId queueId of the queue - * @return byte representation of the queue's row key - */ - protected byte[] queueIdToRowKey(String serverName, String queueId) { - // Cluster id's are guaranteed to have no hyphens, so if the passed in queueId has no hyphen - // then this is not a reclaimed queue. 
- if (!queueId.contains(ROW_KEY_DELIMITER)) { - return Bytes.toBytes(buildQueueRowKey(serverName, queueId)); - // If the queueId contained some hyphen it was reclaimed. In this case, the queueId is the - // queue's row key - } else { - return Bytes.toBytes(queueId); - } - } - - /** - * Creates a "|" delimited record of the queue's past region server owners. - * - * @param originalHistory the queue's original owner history - * @param oldServer the name of the server that used to own the queue - * @return the queue's new owner history - */ - protected String buildClaimedQueueHistory(String originalHistory, String oldServer) { - return oldServer + QUEUE_HISTORY_DELIMITER + originalHistory; - } - - /** - * Get a list of all region servers that have outstanding replication queues. These servers could - * be alive, dead or from a previous run of the cluster. - * @return a list of server names - */ - protected List getListOfReplicators() { - // scan all of the queues and return a list of all unique OWNER values - Set peerServers = new HashSet<>(); - ResultScanner allQueuesInCluster = null; - try (Table replicationTable = getOrBlockOnReplicationTable()){ - Scan scan = new Scan(); - scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); - allQueuesInCluster = replicationTable.getScanner(scan); - for (Result queue : allQueuesInCluster) { - peerServers.add(Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER))); - } - } catch (IOException e) { - String errMsg = "Failed getting list of replicators"; - abortable.abort(errMsg, e); - } finally { - if (allQueuesInCluster != null) { - allQueuesInCluster.close(); - } - } - return new ArrayList<>(peerServers); - } - - protected List getAllQueues(String serverName) { - List allQueues = new ArrayList<>(); - ResultScanner queueScanner = null; - try { - queueScanner = getQueuesBelongingToServer(serverName); - for (Result queue : queueScanner) { - String rowKey = Bytes.toString(queue.getRow()); - // If the queue does not have a Owner History, then 
we must be its original owner. So we - // want to return its queueId in raw form - if (Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER_HISTORY)).length() == 0) { - allQueues.add(getRawQueueIdFromRowKey(rowKey)); - } else { - allQueues.add(rowKey); - } - } - return allQueues; - } catch (IOException e) { - String errMsg = "Failed getting list of all replication queues for serverName=" + serverName; - abortable.abort(errMsg, e); - return null; - } finally { - if (queueScanner != null) { - queueScanner.close(); - } - } - } - - protected List getLogsInQueue(String serverName, String queueId) { - String rowKey = queueId; - if (!queueId.contains(ROW_KEY_DELIMITER)) { - rowKey = buildQueueRowKey(serverName, queueId); - } - return getLogsInQueue(Bytes.toBytes(rowKey)); - } - - protected List getLogsInQueue(byte[] rowKey) { - String errMsg = "Failed getting logs in queue queueId=" + Bytes.toString(rowKey); - try (Table replicationTable = getOrBlockOnReplicationTable()) { - Get getQueue = new Get(rowKey); - Result queue = replicationTable.get(getQueue); - if (queue == null || queue.isEmpty()) { - abortable.abort(errMsg, new ReplicationException(errMsg)); - return null; - } - return readWALsFromResult(queue); - } catch (IOException e) { - abortable.abort(errMsg, e); - return null; - } - } - - /** - * Read all of the WAL's from a queue into a list - * - * @param queue HBase query result containing the queue - * @return a list of all the WAL filenames - */ - protected List readWALsFromResult(Result queue) { - List wals = new ArrayList<>(); - Map familyMap = queue.getFamilyMap(CF_QUEUE); - for (byte[] cQualifier : familyMap.keySet()) { - // Ignore the meta data fields of the queue - if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier, - COL_QUEUE_OWNER_HISTORY)) { - continue; - } - wals.add(Bytes.toString(cQualifier)); - } - return wals; - } - - /** - * Get the queue id's and meta data (Owner and History) for the queues belonging to the named - * 
server - * - * @param server name of the server - * @return a ResultScanner over the QueueIds belonging to the server - * @throws IOException - */ - protected ResultScanner getQueuesBelongingToServer(String server) throws IOException { - Scan scan = new Scan(); - SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER, - CompareOperator.EQUAL, Bytes.toBytes(server)); - scan.setFilter(filterMyQueues); - scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); - scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY); - try (Table replicationTable = getOrBlockOnReplicationTable()) { - ResultScanner results = replicationTable.getScanner(scan); - return results; - } - } - - /** - * Attempts to acquire the Replication Table. This operation will block until it is assigned by - * the CreateReplicationWorker thread. It is up to the caller of this method to close the - * returned Table - * @return the Replication Table when it is created - * @throws IOException - */ - protected Table getOrBlockOnReplicationTable() throws IOException { - // Sleep until the Replication Table becomes available - try { - replicationTableInitialized.await(); - } catch (InterruptedException e) { - String errMsg = "Unable to acquire the Replication Table due to InterruptedException: " + - e.getMessage(); - throw new InterruptedIOException(errMsg); - } - return getAndSetUpReplicationTable(); - } - - /** - * Creates a new copy of the Replication Table and sets up the proper Table time outs for it - * - * @return the Replication Table - * @throws IOException - */ - private Table getAndSetUpReplicationTable() throws IOException { - Table replicationTable = connection.getTable(REPLICATION_TABLE_NAME); - setReplicationTableTimeOuts(replicationTable); - return replicationTable; - } - - /** - * Builds the Replication Table in a background thread. 
Any method accessing the Replication Table - * should do so through getOrBlockOnReplicationTable() - * - * @return the Replication Table - * @throws IOException if the Replication Table takes too long to build - */ - private void createReplicationTableInBackground() throws IOException { - executor.execute(new CreateReplicationTableWorker()); - } - - /** - * Attempts to build the Replication Table. Will continue blocking until we have a valid - * Table for the Replication Table. - */ - private class CreateReplicationTableWorker implements Runnable { - - private Admin admin; - - @Override - public void run() { - try { - admin = connection.getAdmin(); - if (!replicationTableExists()) { - createReplicationTable(); - } - int maxRetries = conf.getInt("hbase.replication.queues.createtable.retries.number", - CLIENT_RETRIES); - RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, RPC_TIMEOUT); - RetryCounter retryCounter = counterFactory.create(); - while (!replicationTableExists()) { - retryCounter.sleepUntilNextRetry(); - if (!retryCounter.shouldRetry()) { - throw new IOException("Unable to acquire the Replication Table"); - } - } - replicationTableInitialized.countDown(); - } catch (IOException | InterruptedException e) { - abortable.abort("Failed building Replication Table", e); - } - } - - /** - * Create the replication table with the provided HColumnDescriptor REPLICATION_COL_DESCRIPTOR - * in TableBasedReplicationQueuesImpl - * - * @throws IOException - */ - private void createReplicationTable() throws IOException { - HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME); - replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR); - try { - admin.createTable(replicationTableDescriptor); - } catch (TableExistsException e) { - // In this case we can just continue as normal - } - } - - /** - * Checks whether the Replication Table exists yet - * - * @return whether the Replication Table exists - * @throws 
IOException - */ - private boolean replicationTableExists() { - try { - return admin.tableExists(REPLICATION_TABLE_NAME); - } catch (IOException e) { - return false; - } - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/e807fd57/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java index 2c522f6..5659e4b 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -54,6 +53,8 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements super(zookeeper, conf, abortable); this.stopper = stopper; this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper)); + // watch the changes + refreshOtherRegionServersList(true); } @Override @@ -71,7 +72,7 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements */ @Override public List getListOfRegionServers() { - refreshOtherRegionServersList(); + refreshOtherRegionServersList(false); List list = null; synchronized (otherRegionServers) { @@ -137,7 +138,7 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements if (!path.startsWith(this.watcher.znodePaths.rsZNode)) { return false; } - return refreshOtherRegionServersList(); + return refreshOtherRegionServersList(true); } } @@ -157,8 +158,8 @@ public class ReplicationTrackerZKImpl extends 
ReplicationStateZKBase implements * @return true if the local list of the other region servers was updated with the ZK data (even * if it was empty), false if the data was missing in ZK */ - private boolean refreshOtherRegionServersList() { - List newRsList = getRegisteredRegionServers(); + private boolean refreshOtherRegionServersList(boolean watch) { + List newRsList = getRegisteredRegionServers(watch); if (newRsList == null) { return false; } else { @@ -174,10 +175,14 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements * Get a list of all the other region servers in this cluster and set a watch * @return a list of server nanes */ - private List getRegisteredRegionServers() { + private List getRegisteredRegionServers(boolean watch) { List result = null; try { - result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.znodePaths.rsZNode); + if (watch) { + result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.znodePaths.rsZNode); + } else { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.zookeeper.znodePaths.rsZNode); + } } catch (KeeperException e) { this.abortable.abort("Get list of registered region servers", e); } http://git-wip-us.apache.org/repos/asf/hbase/blob/e807fd57/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java index 0275d52..41f50d8 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -54,6 +54,28 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe /** * 
ZK based replication queue storage. + *

+ * The base znode for each regionserver is the regionserver name. For example: + * + *

+ * /hbase/replication/rs/hostname.example.org,6020,1234
+ * 
+ * + * Within this znode, the region server maintains a set of WAL replication queues. These queues are + * represented by child znodes named using their given queue id. For example: + * + *
+ * /hbase/replication/rs/hostname.example.org,6020,1234/1
+ * /hbase/replication/rs/hostname.example.org,6020,1234/2
+ * 
+ * + * Each queue has one child znode for every WAL that still needs to be replicated. The value of + * these WAL child znodes is the latest position that has been replicated. This position is updated + * every time a WAL entry is replicated. For example: + * + *
+ * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
+ * 
*/ @InterfaceAudience.Private class ZKReplicationQueueStorage extends ZKReplicationStorageBase http://git-wip-us.apache.org/repos/asf/hbase/blob/e807fd57/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index 8905d43..4afda5d 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -42,9 +42,8 @@ import org.slf4j.LoggerFactory; */ public abstract class TestReplicationStateBasic { - protected ReplicationQueues rq1; - protected ReplicationQueues rq2; - protected ReplicationQueues rq3; + private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class); + protected ReplicationQueueStorage rqs; protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345); protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345); @@ -63,8 +62,6 @@ public abstract class TestReplicationStateBasic { protected static final int ZK_MAX_COUNT = 300; protected static final int ZK_SLEEP_INTERVAL = 100; // millis - private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class); - @Test public void testReplicationQueueStorage() throws ReplicationException { // Test methods with empty state @@ -76,15 +73,13 @@ public abstract class TestReplicationStateBasic { * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each -- * server2: zero queues */ - rq1.init(server1.getServerName()); - rq2.init(server2.getServerName()); - rq1.addLog("qId1", "trash"); - rq1.removeLog("qId1", 
"trash"); - rq1.addLog("qId2", "filename1"); - rq1.addLog("qId3", "filename2"); - rq1.addLog("qId3", "filename3"); - rq2.addLog("trash", "trash"); - rq2.removeQueue("trash"); + rqs.addWAL(server1, "qId1", "trash"); + rqs.removeWAL(server1, "qId1", "trash"); + rqs.addWAL(server1,"qId2", "filename1"); + rqs.addWAL(server1,"qId3", "filename2"); + rqs.addWAL(server1,"qId3", "filename3"); + rqs.addWAL(server2,"trash", "trash"); + rqs.removeQueue(server2,"trash"); List reps = rqs.getListOfReplicators(); assertEquals(2, reps.size()); @@ -105,62 +100,55 @@ public abstract class TestReplicationStateBasic { assertTrue(list.contains("qId3")); } + private void removeAllQueues(ServerName serverName) throws ReplicationException { + for (String queue: rqs.getAllQueues(serverName)) { + rqs.removeQueue(serverName, queue); + } + } @Test public void testReplicationQueues() throws ReplicationException { - rq1.init(server1.getServerName()); - rq2.init(server2.getServerName()); - rq3.init(server3.getServerName()); // Initialize ReplicationPeer so we can add peers (we don't transfer lone queues) rp.init(); - // 3 replicators should exist - assertEquals(3, rq1.getListOfReplicators().size()); - rq1.removeQueue("bogus"); - rq1.removeLog("bogus", "bogus"); - rq1.removeAllQueues(); - assertEquals(0, rq1.getAllQueues().size()); - assertEquals(0, rq1.getLogPosition("bogus", "bogus")); - assertNull(rq1.getLogsInQueue("bogus")); - assertNull(rq1.getUnClaimedQueueIds(ServerName.valueOf("bogus", 1234, -1L).toString())); - - rq1.setLogPosition("bogus", "bogus", 5L); + rqs.removeQueue(server1, "bogus"); + rqs.removeWAL(server1, "bogus", "bogus"); + removeAllQueues(server1); + assertEquals(0, rqs.getAllQueues(server1).size()); + assertEquals(0, rqs.getWALPosition(server1, "bogus", "bogus")); + assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty()); + assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 1234, 12345)).isEmpty()); populateQueues(); - assertEquals(3, 
rq1.getListOfReplicators().size()); - assertEquals(0, rq2.getLogsInQueue("qId1").size()); - assertEquals(5, rq3.getLogsInQueue("qId5").size()); - assertEquals(0, rq3.getLogPosition("qId1", "filename0")); - rq3.setLogPosition("qId5", "filename4", 354L); - assertEquals(354L, rq3.getLogPosition("qId5", "filename4")); + assertEquals(3, rqs.getListOfReplicators().size()); + assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size()); + assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size()); + assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0")); + rqs.setWALPosition(server3, "qId5", "filename4", 354L); + assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4")); - assertEquals(5, rq3.getLogsInQueue("qId5").size()); - assertEquals(0, rq2.getLogsInQueue("qId1").size()); - assertEquals(0, rq1.getAllQueues().size()); - assertEquals(1, rq2.getAllQueues().size()); - assertEquals(5, rq3.getAllQueues().size()); + assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size()); + assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size()); + assertEquals(0, rqs.getAllQueues(server1).size()); + assertEquals(1, rqs.getAllQueues(server2).size()); + assertEquals(5, rqs.getAllQueues(server3).size()); - assertEquals(0, rq3.getUnClaimedQueueIds(server1.getServerName()).size()); - rq3.removeReplicatorIfQueueIsEmpty(server1.getServerName()); - assertEquals(2, rq3.getListOfReplicators().size()); + assertEquals(0, rqs.getAllQueues(server1).size()); + rqs.removeReplicatorIfQueueIsEmpty(server1); + assertEquals(2, rqs.getListOfReplicators().size()); - List queues = rq2.getUnClaimedQueueIds(server3.getServerName()); + List queues = rqs.getAllQueues(server3); assertEquals(5, queues.size()); for (String queue : queues) { - rq2.claimQueue(server3.getServerName(), queue); + rqs.claimQueue(server3, queue, server2); } - rq2.removeReplicatorIfQueueIsEmpty(server3.getServerName()); - assertEquals(1, rq2.getListOfReplicators().size()); - - // Try to claim our own queues - 
assertNull(rq2.getUnClaimedQueueIds(server2.getServerName())); - rq2.removeReplicatorIfQueueIsEmpty(server2.getServerName()); - - assertEquals(6, rq2.getAllQueues().size()); + rqs.removeReplicatorIfQueueIsEmpty(server3); + assertEquals(1, rqs.getListOfReplicators().size()); - rq2.removeAllQueues(); - - assertEquals(0, rq2.getListOfReplicators().size()); + assertEquals(6, rqs.getAllQueues(server2).size()); + removeAllQueues(server2); + rqs.removeReplicatorIfQueueIsEmpty(server2); + assertEquals(0, rqs.getListOfReplicators().size()); } @Test @@ -197,7 +185,6 @@ public abstract class TestReplicationStateBasic { @Test public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException { rp.init(); - rq1.init(server1.getServerName()); List> files1 = new ArrayList<>(3); files1.add(new Pair<>(null, new Path("file_1"))); @@ -206,8 +193,8 @@ public abstract class TestReplicationStateBasic { assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); - rq1.addPeerToHFileRefs(ID_ONE); - rq1.addHFileRefs(ID_ONE, files1); + rqs.addPeerToHFileRefs(ID_ONE); + rqs.addHFileRefs(ID_ONE, files1); assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size()); List hfiles2 = new ArrayList<>(files1.size()); @@ -215,43 +202,41 @@ public abstract class TestReplicationStateBasic { hfiles2.add(p.getSecond().getName()); } String removedString = hfiles2.remove(0); - rq1.removeHFileRefs(ID_ONE, hfiles2); + rqs.removeHFileRefs(ID_ONE, hfiles2); assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size()); hfiles2 = new ArrayList<>(1); hfiles2.add(removedString); - rq1.removeHFileRefs(ID_ONE, hfiles2); + rqs.removeHFileRefs(ID_ONE, hfiles2); assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size()); rp.unregisterPeer(ID_ONE); } @Test public void testRemovePeerForHFileRefs() throws 
ReplicationException, KeeperException { - rq1.init(server1.getServerName()); - rp.init(); rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); - rq1.addPeerToHFileRefs(ID_ONE); + rqs.addPeerToHFileRefs(ID_ONE); rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); - rq1.addPeerToHFileRefs(ID_TWO); + rqs.addPeerToHFileRefs(ID_TWO); List> files1 = new ArrayList<>(3); files1.add(new Pair<>(null, new Path("file_1"))); files1.add(new Pair<>(null, new Path("file_2"))); files1.add(new Pair<>(null, new Path("file_3"))); - rq1.addHFileRefs(ID_ONE, files1); - rq1.addHFileRefs(ID_TWO, files1); + rqs.addHFileRefs(ID_ONE, files1); + rqs.addHFileRefs(ID_TWO, files1); assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size()); assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size()); assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); rp.unregisterPeer(ID_ONE); - rq1.removePeerFromHFileRefs(ID_ONE); + rqs.removePeerFromHFileRefs(ID_ONE); assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); rp.unregisterPeer(ID_TWO); - rq1.removePeerFromHFileRefs(ID_TWO); + rqs.removePeerFromHFileRefs(ID_TWO); assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty()); } @@ -363,15 +348,15 @@ public abstract class TestReplicationStateBasic { * 3, 4, 5 log files respectively */ protected void populateQueues() throws ReplicationException { - rq1.addLog("trash", "trash"); - rq1.removeQueue("trash"); + rqs.addWAL(server1, "trash", "trash"); + rqs.removeQueue(server1, "trash"); - rq2.addLog("qId1", "trash"); - rq2.removeLog("qId1", "trash"); + rqs.addWAL(server2, "qId1", "trash"); + rqs.removeWAL(server2, "qId1", "trash"); for (int i = 1; i < 6; i++) { for (int j = 0; j < i; j++) { - rq3.addLog("qId" + i, "filename" + j); + rqs.addWAL(server3, "qId" + i, "filename" + j); } // 
Add peers for the corresponding queues so they are not orphans rp.registerPeer("qId" + i, http://git-wip-us.apache.org/repos/asf/hbase/blob/e807fd57/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java index 6abe3f8..e7c8b3b 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java @@ -17,10 +17,6 @@ */ package org.apache.hadoop.hbase.replication; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; @@ -41,7 +37,6 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Test; import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +54,6 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { private static HBaseZKTestingUtility utility; private static ZKWatcher zkw; private static String replicationZNode; - private ReplicationQueuesZKImpl rqZK; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -89,23 +83,9 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { @Before public void setUp() { zkTimeoutCount = 0; - WarnOnlyAbortable abortable = new WarnOnlyAbortable(); - try { - rq1 = ReplicationFactory - .getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw)); - rq2 = 
ReplicationFactory - .getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw)); - rq3 = ReplicationFactory - .getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw)); - rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); - } catch (Exception e) { - // This should not occur, because getReplicationQueues() only throws for - // TableBasedReplicationQueuesImpl - fail("ReplicationFactory.getReplicationQueues() threw an IO Exception"); - } - rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); + rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); + rp = ReplicationFactory.getReplicationPeers(zkw, conf, new WarnOnlyAbortable()); OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); - rqZK = new ReplicationQueuesZKImpl(zkw, conf, abortable); } @After @@ -118,23 +98,6 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { utility.shutdownMiniZKCluster(); } - @Test - public void testIsPeerPath_PathToParentOfPeerNode() { - assertFalse(rqZK.isPeerPath(rqZK.peersZNode)); - } - - @Test - public void testIsPeerPath_PathToChildOfPeerNode() { - String peerChild = ZNodePaths.joinZNode(ZNodePaths.joinZNode(rqZK.peersZNode, "1"), "child"); - assertFalse(rqZK.isPeerPath(peerChild)); - } - - @Test - public void testIsPeerPath_ActualPeerPath() { - String peerPath = ZNodePaths.joinZNode(rqZK.peersZNode, "1"); - assertTrue(rqZK.isPeerPath(peerPath)); - } - private static class WarnOnlyAbortable implements Abortable { @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/e807fd57/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java index 
d8f9625..73e600e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java @@ -49,8 +49,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -307,14 +305,10 @@ public class DumpReplicationQueues extends Configured implements Tool { boolean hdfs) throws Exception { ReplicationQueueStorage queueStorage; ReplicationPeers replicationPeers; - ReplicationQueues replicationQueues; ReplicationTracker replicationTracker; - ReplicationQueuesArguments replicationArgs = - new ReplicationQueuesArguments(getConf(), new WarnOnlyAbortable(), zkw); StringBuilder sb = new StringBuilder(); queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); - replicationQueues = ReplicationFactory.getReplicationQueues(replicationArgs); replicationPeers = ReplicationFactory.getReplicationPeers(zkw, getConf(), queueStorage, connection); replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(), @@ -328,7 +322,6 @@ public class DumpReplicationQueues extends Configured implements Tool { } for (ServerName regionserver : regionservers) { List queueIds = queueStorage.getAllQueues(regionserver); - replicationQueues.init(regionserver.getServerName()); if (!liveRegionServers.contains(regionserver.getServerName())) { 
deadRegionServers.add(regionserver.getServerName()); } @@ -338,17 +331,17 @@ public class DumpReplicationQueues extends Configured implements Tool { if (!peerIds.contains(queueInfo.getPeerId())) { deletedQueues.add(regionserver + "/" + queueId); sb.append( - formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true, hdfs)); + formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs)); } else { sb.append( - formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false, hdfs)); + formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs)); } } } return sb.toString(); } - private String formatQueue(ServerName regionserver, ReplicationQueues replicationQueues, + private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage, ReplicationQueueInfo queueInfo, String queueId, List wals, boolean isDeleted, boolean hdfs) throws Exception { StringBuilder sb = new StringBuilder(); @@ -370,7 +363,7 @@ public class DumpReplicationQueues extends Configured implements Tool { peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size()); for (String wal : wals) { - long position = replicationQueues.getLogPosition(queueInfo.getPeerId(), wal); + long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal); sb.append(" Replication position for " + wal + ": " + (position > 0 ? 
position : "0" + " (not started or nothing to replicate)") + "\n"); } http://git-wip-us.apache.org/repos/asf/hbase/blob/e807fd57/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index bd191e3..e0c45d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -29,15 +28,15 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeers; -import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class that handles the recovered source of a replication stream, which is transfered from @@ -52,10 +51,10 @@ public class RecoveredReplicationSource extends ReplicationSource { 
@Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, - ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server, + ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server, String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { - super.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerClusterZnode, + super.init(conf, fs, manager, queueStorage, replicationPeers, server, peerClusterZnode, clusterId, replicationEndpoint, walFileLengthProvider, metrics); this.actualPeerId = this.replicationQueueInfo.getPeerId(); } @@ -64,7 +63,7 @@ public class RecoveredReplicationSource extends ReplicationSource { protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { final RecoveredReplicationSourceShipper worker = new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, - this.replicationQueues); + this.queueStorage); ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); if (extant != null) { LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); http://git-wip-us.apache.org/repos/asf/hbase/blob/e807fd57/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index 630b90b..fb365bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -23,13 +23,13 @@ import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch; import org.apache.hadoop.hbase.util.Threads; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Used by a {@link RecoveredReplicationSource}. @@ -40,14 +40,14 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper LoggerFactory.getLogger(RecoveredReplicationSourceShipper.class); protected final RecoveredReplicationSource source; - private final ReplicationQueues replicationQueues; + private final ReplicationQueueStorage replicationQueues; public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId, PriorityBlockingQueue queue, RecoveredReplicationSource source, - ReplicationQueues replicationQueues) { + ReplicationQueueStorage queueStorage) { super(conf, walGroupId, queue, source); this.source = source; - this.replicationQueues = replicationQueues; + this.replicationQueues = queueStorage; } @Override @@ -116,11 +116,11 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper long startPosition = 0; String peerClusterZnode = source.getPeerClusterZnode(); try { - startPosition = this.replicationQueues.getLogPosition(peerClusterZnode, - this.queue.peek().getName()); + startPosition = this.replicationQueues.getWALPosition(source.getServerWALsBelongTo(), + 
peerClusterZnode, this.queue.peek().getName()); if (LOG.isTraceEnabled()) { - LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " - + startPosition); + LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " + + startPosition); } } catch (ReplicationException e) { terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e); http://git-wip-us.apache.org/repos/asf/hbase/blob/e807fd57/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 47f08f9..d555c6e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -28,12 +27,6 @@ import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.wal.WALKeyImpl; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -43,27 +36,33 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; -import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; -import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationTracker; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.wal.WALEdit; import 
org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; + +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. */ @@ -74,7 +73,7 @@ public class Replication implements LoggerFactory.getLogger(Replication.class); private boolean replicationForBulkLoadData; private ReplicationSourceManager replicationManager; - private ReplicationQueues replicationQueues; + private ReplicationQueueStorage queueStorage; private ReplicationPeers replicationPeers; private ReplicationTracker replicationTracker; private Configuration conf; @@ -128,10 +127,8 @@ public class Replication implements } try { - this.replicationQueues = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, this.server, - server.getZooKeeper())); - this.replicationQueues.init(this.server.getServerName().toString()); + this.queueStorage = + ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); this.replicationPeers = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server); this.replicationPeers.init(); @@ -148,7 +145,7 @@ public class Replication implements throw new IOException("Could not read 
cluster id", ke); } this.replicationManager = - new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker, conf, + new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walFileLengthProvider); this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); http://git-wip-us.apache.org/repos/asf/hbase/blob/e807fd57/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index a2eb202..58ea6ee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -31,7 +30,6 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -49,7 +47,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; -import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Bytes; @@ -83,7 +81,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf private Map> queues = new HashMap<>(); // per group queue size, keep no more than this number of logs in each wal group protected int queueSizePerGroup; - protected ReplicationQueues replicationQueues; + protected ReplicationQueueStorage queueStorage; private ReplicationPeers replicationPeers; protected Configuration conf; @@ -148,7 +146,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf */ @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, - ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server, + ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server, String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { this.server = server; @@ -161,7 +159,7 @@ 
public class ReplicationSource extends Thread implements ReplicationSourceInterf this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32); - this.replicationQueues = replicationQueues; + this.queueStorage = queueStorage; this.replicationPeers = replicationPeers; this.manager = manager; this.fs = fs; @@ -230,7 +228,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf List tableCfs = tableCFMap.get(tableName); if (tableCFMap.containsKey(tableName) && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) { - this.replicationQueues.addHFileRefs(peerId, pairs); + this.queueStorage.addHFileRefs(peerId, pairs); metrics.incrSizeOfHFileRefsQueue(pairs.size()); } else { LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family " @@ -239,7 +237,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } else { // user has explicitly not defined any table cfs for replication, means replicate all the // data - this.replicationQueues.addHFileRefs(peerId, pairs); + this.queueStorage.addHFileRefs(peerId, pairs); metrics.incrSizeOfHFileRefsQueue(pairs.size()); } }