Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id CA1A2200ACA for ; Thu, 9 Jun 2016 15:15:22 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id C88CF160A58; Thu, 9 Jun 2016 13:15:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C130E160A2B for ; Thu, 9 Jun 2016 15:15:21 +0200 (CEST) Received: (qmail 91725 invoked by uid 500); 9 Jun 2016 13:15:20 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 91716 invoked by uid 99); 9 Jun 2016 13:15:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Jun 2016 13:15:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8911ADFE59; Thu, 9 Jun 2016 13:15:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ashishsinghi@apache.org To: commits@hbase.apache.org Message-Id: <9b1a428668684a3897cf59eb10c144a3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hbase git commit: HBASE-15952 Bulk load data replication is not working when RS user does not have permission on hfile-refs node Date: Thu, 9 Jun 2016 13:15:20 +0000 (UTC) archived-at: Thu, 09 Jun 2016 13:15:23 -0000 Repository: hbase Updated Branches: refs/heads/master 41cc21554 -> 9012a0b12 HBASE-15952 Bulk load data replication is not working when RS user does not have permission on hfile-refs node Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9012a0b1 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9012a0b1 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9012a0b1 Branch: refs/heads/master Commit: 9012a0b123b3eea8b08c8687cef812e83e9b491d Parents: 41cc215 Author: Ashish Singhi Authored: Thu Jun 9 18:44:29 2016 +0530 Committer: Ashish Singhi Committed: Thu Jun 9 18:44:29 2016 +0530 ---------------------------------------------------------------------- .../replication/ReplicationPeersZKImpl.java | 21 ------------- .../hbase/replication/ReplicationQueues.java | 6 ++++ .../replication/ReplicationQueuesHBaseImpl.java | 6 ++++ .../replication/ReplicationQueuesZKImpl.java | 33 ++++++++++++++++---- .../regionserver/ReplicationSourceManager.java | 11 +++++-- .../cleaner/TestReplicationHFileCleaner.java | 1 + .../replication/TestReplicationStateBasic.java | 5 +++ 7 files changed, 53 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 15265d9..5af97c2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -129,17 +129,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re ZKUtil.createWithParents(this.zookeeper, this.peersZNode); - // Irrespective of bulk load hfile replication is enabled or not we add peerId node to - // hfile-refs node -- HBASE-15397 - try { - String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id); - LOG.info("Adding peer " + peerId + " to hfile reference queue."); - ZKUtil.createWithParents(this.zookeeper, peerId); - } catch (KeeperException e) { - throw new ReplicationException("Failed to add peer with id=" + id - + ", node under hfile references node.", e); - } - List listOfOps = new ArrayList(); ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id), ReplicationSerDeHelper.toByteArray(peerConfig)); @@ -166,16 +155,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re + " because that id does not exist."); } ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)); - // Delete peerId node from hfile-refs node irrespective of whether bulk loaded hfile - // replication is enabled or not - - String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id); - try { - LOG.info("Removing peer " + peerId + " from hfile reference queue."); - ZKUtil.deleteNodeRecursively(this.zookeeper, peerId); - } catch (NoNodeException e) { - LOG.info("Did not find node " + peerId + " to delete.", e); - } } catch (KeeperException e) { throw new ReplicationException("Could not remove peer with id=" + id, e); } http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java index db6da91..809b122 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java @@ -123,6 +123,12 @@ public interface ReplicationQueues { void addPeerToHFileRefs(String peerId) throws ReplicationException; /** + * Remove a peer from hfile reference queue. + * @param peerId peer cluster id to be removed + */ + void removePeerFromHFileRefs(String peerId); + + /** * Add new hfile references to the queue. * @param peerId peer cluster id to which the hfiles need to be replicated * @param files list of hfile references to be added http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java index bbc9e32..29f0632 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java @@ -302,6 +302,12 @@ public class ReplicationQueuesHBaseImpl implements ReplicationQueues { } @Override + public void removePeerFromHFileRefs(String peerId) { + // TODO + throw new NotImplementedException(); + } + + @Override public void addHFileRefs(String peerId, List files) throws ReplicationException { // TODO throw new NotImplementedException(); http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index 32d0883..f03efff 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -89,12 +89,14 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R } catch (KeeperException e) { throw new ReplicationException("Could not initialize replication queues.", e); } - // Irrespective of bulk load hfile replication is enabled or not we add peerId node to - // hfile-refs node -- HBASE-15397 - try { - ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode); - } catch (KeeperException e) { - throw new ReplicationException("Could not initialize hfile references replication queue.", e); + if (conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, + HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) { + try { + ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode); + } catch (KeeperException e) { + throw new ReplicationException("Could not initialize hfile references replication queue.", + e); + } } } @@ -504,4 +506,23 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R e); } } + + @Override + public void removePeerFromHFileRefs(String peerId) { + final String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId); + try { + if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) { + if (LOG.isDebugEnabled()) { + LOG.debug("Peer " + peerZnode + " not found in hfile reference queue."); + } + return; + } else { + LOG.info("Removing peer " + peerZnode + " from hfile reference queue."); + ZKUtil.deleteNodeRecursively(this.zookeeper, peerZnode); + } + } catch (KeeperException e) { + LOG.error("Ignoring the exception to remove peer " + peerId + " from hfile reference queue.", + e); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index ed2eecc..e9330f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -115,6 +115,7 @@ public class ReplicationSourceManager implements ReplicationListener { private final ThreadPoolExecutor executor; private final Random rand; + private final boolean replicationForBulkLoadDataEnabled; /** @@ -166,6 +167,9 @@ public class ReplicationSourceManager implements ReplicationListener { this.executor.setThreadFactory(tfb.build()); this.rand = new Random(); this.latestPaths = Collections.synchronizedSet(new HashSet()); + replicationForBulkLoadDataEnabled = + conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, + HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); } /** @@ -227,9 +231,6 @@ public class ReplicationSourceManager implements ReplicationListener { * old region server wal queues */ protected void init() throws IOException, ReplicationException { - boolean replicationForBulkLoadDataEnabled = - conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, - HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); for (String id : this.replicationPeers.getPeerIds()) { addSource(id); if (replicationForBulkLoadDataEnabled) { @@ -579,6 +580,7 @@ public class ReplicationSourceManager implements ReplicationListener { @Override public void peerRemoved(String peerId) { removePeer(peerId); + this.replicationQueues.removePeerFromHFileRefs(peerId); } @Override @@ -588,6 +590,9 @@ public class ReplicationSourceManager implements ReplicationListener { boolean added = this.replicationPeers.peerAdded(id); if (added) { addSource(id); + if (replicationForBulkLoadDataEnabled) { + this.replicationQueues.addPeerToHFileRefs(id); + } } } catch (Exception e) { LOG.error("Error while adding a new peer", e); http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java index 1778e73..e5f1e69 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -111,6 +111,7 @@ public class TestReplicationHFileCleaner { public void setup() throws ReplicationException, IOException { root = TEST_UTIL.getDataTestDirOnTestFS(); rp.addPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey())); + rq.addPeerToHFileRefs(peerId); } @After http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index 5ab26ab..de5cc31 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -204,6 +204,7 @@ public abstract class TestReplicationStateBasic { assertNull(rqc.getReplicableHFiles(ID_ONE)); assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size()); rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); + rq1.addPeerToHFileRefs(ID_ONE); rq1.addHFileRefs(ID_ONE, files1); assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size()); assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size()); @@ -225,7 +226,9 @@ public abstract class TestReplicationStateBasic { rp.init(); rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); + rq1.addPeerToHFileRefs(ID_ONE); rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); + rq1.addPeerToHFileRefs(ID_TWO); List files1 = new ArrayList(3); files1.add("file_1"); @@ -238,11 +241,13 @@ public abstract class TestReplicationStateBasic { assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size()); rp.removePeer(ID_ONE); + rq1.removePeerFromHFileRefs(ID_ONE); assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size()); assertNull(rqc.getReplicableHFiles(ID_ONE)); assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size()); rp.removePeer(ID_TWO); + rq1.removePeerFromHFileRefs(ID_TWO); assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size()); assertNull(rqc.getReplicableHFiles(ID_TWO)); }