hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From syuanji...@apache.org
Subject [45/50] hbase git commit: HBASE-15952 Bulk load data replication is not working when RS user does not have permission on hfile-refs node
Date Sat, 11 Jun 2016 04:56:31 GMT
HBASE-15952 Bulk load data replication is not working when RS user does not have permission on hfile-refs node


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9012a0b1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9012a0b1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9012a0b1

Branch: refs/heads/hbase-12439
Commit: 9012a0b123b3eea8b08c8687cef812e83e9b491d
Parents: 41cc215
Author: Ashish Singhi <ashishsinghi@apache.org>
Authored: Thu Jun 9 18:44:29 2016 +0530
Committer: Ashish Singhi <ashishsinghi@apache.org>
Committed: Thu Jun 9 18:44:29 2016 +0530

----------------------------------------------------------------------
 .../replication/ReplicationPeersZKImpl.java     | 21 -------------
 .../hbase/replication/ReplicationQueues.java    |  6 ++++
 .../replication/ReplicationQueuesHBaseImpl.java |  6 ++++
 .../replication/ReplicationQueuesZKImpl.java    | 33 ++++++++++++++++----
 .../regionserver/ReplicationSourceManager.java  | 11 +++++--
 .../cleaner/TestReplicationHFileCleaner.java    |  1 +
 .../replication/TestReplicationStateBasic.java  |  5 +++
 7 files changed, 53 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 15265d9..5af97c2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -129,17 +129,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
 
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
 
-      // Irrespective of bulk load hfile replication is enabled or not we add peerId node to
-      // hfile-refs node -- HBASE-15397
-      try {
-        String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id);
-        LOG.info("Adding peer " + peerId + " to hfile reference queue.");
-        ZKUtil.createWithParents(this.zookeeper, peerId);
-      } catch (KeeperException e) {
-        throw new ReplicationException("Failed to add peer with id=" + id
-            + ", node under hfile references node.", e);
-      }
-
       List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
       ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id),
         ReplicationSerDeHelper.toByteArray(peerConfig));
@@ -166,16 +155,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
             + " because that id does not exist.");
       }
       ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
-      // Delete peerId node from hfile-refs node irrespective of whether bulk loaded hfile
-      // replication is enabled or not
-
-      String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id);
-      try {
-        LOG.info("Removing peer " + peerId + " from hfile reference queue.");
-        ZKUtil.deleteNodeRecursively(this.zookeeper, peerId);
-      } catch (NoNodeException e) {
-        LOG.info("Did not find node " + peerId + " to delete.", e);
-      }
     } catch (KeeperException e) {
       throw new ReplicationException("Could not remove peer with id=" + id, e);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
index db6da91..809b122 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -123,6 +123,12 @@ public interface ReplicationQueues {
   void addPeerToHFileRefs(String peerId) throws ReplicationException;
 
   /**
+   * Remove a peer from hfile reference queue.
+   * @param peerId peer cluster id to be removed
+   */
+  void removePeerFromHFileRefs(String peerId);
+
+  /**
    * Add new hfile references to the queue.
    * @param peerId peer cluster id to which the hfiles need to be replicated
    * @param files list of hfile references to be added

http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java
index bbc9e32..29f0632 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java
@@ -302,6 +302,12 @@ public class ReplicationQueuesHBaseImpl implements ReplicationQueues {
   }
 
   @Override
+  public void removePeerFromHFileRefs(String peerId) {
+    // TODO
+    throw new NotImplementedException();
+  }
+
+  @Override
   public void addHFileRefs(String peerId, List<String> files) throws ReplicationException {
     // TODO
     throw new NotImplementedException();

http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 32d0883..f03efff 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -89,12 +89,14 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
     } catch (KeeperException e) {
       throw new ReplicationException("Could not initialize replication queues.", e);
     }
-    // Irrespective of bulk load hfile replication is enabled or not we add peerId node to
-    // hfile-refs node -- HBASE-15397
-    try {
-      ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
-    } catch (KeeperException e) {
-      throw new ReplicationException("Could not initialize hfile references replication queue.", e);
+    if (conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
+      try {
+        ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
+      } catch (KeeperException e) {
+        throw new ReplicationException("Could not initialize hfile references replication queue.",
+            e);
+      }
     }
   }
 
@@ -504,4 +506,23 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
           e);
     }
   }
+
+  @Override
+  public void removePeerFromHFileRefs(String peerId) {
+    final String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+    try {
+      if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Peer " + peerZnode + " not found in hfile reference queue.");
+        }
+        return;
+      } else {
+        LOG.info("Removing peer " + peerZnode + " from hfile reference queue.");
+        ZKUtil.deleteNodeRecursively(this.zookeeper, peerZnode);
+      }
+    } catch (KeeperException e) {
+      LOG.error("Ignoring the exception to remove peer " + peerId + " from hfile reference queue.",
+        e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index ed2eecc..e9330f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -115,6 +115,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final ThreadPoolExecutor executor;
 
   private final Random rand;
+  private final boolean replicationForBulkLoadDataEnabled;
 
 
   /**
@@ -166,6 +167,9 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.executor.setThreadFactory(tfb.build());
     this.rand = new Random();
     this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
+    replicationForBulkLoadDataEnabled =
+        conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+          HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
   }
 
   /**
@@ -227,9 +231,6 @@ public class ReplicationSourceManager implements ReplicationListener {
    * old region server wal queues
    */
   protected void init() throws IOException, ReplicationException {
-    boolean replicationForBulkLoadDataEnabled =
-        conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
-          HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
     for (String id : this.replicationPeers.getPeerIds()) {
       addSource(id);
       if (replicationForBulkLoadDataEnabled) {
@@ -579,6 +580,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   @Override
   public void peerRemoved(String peerId) {
     removePeer(peerId);
+    this.replicationQueues.removePeerFromHFileRefs(peerId);
   }
 
   @Override
@@ -588,6 +590,9 @@ public class ReplicationSourceManager implements ReplicationListener {
         boolean added = this.replicationPeers.peerAdded(id);
         if (added) {
           addSource(id);
+          if (replicationForBulkLoadDataEnabled) {
+            this.replicationQueues.addPeerToHFileRefs(id);
+          }
         }
       } catch (Exception e) {
         LOG.error("Error while adding a new peer", e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index 1778e73..e5f1e69 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -111,6 +111,7 @@ public class TestReplicationHFileCleaner {
   public void setup() throws ReplicationException, IOException {
     root = TEST_UTIL.getDataTestDirOnTestFS();
     rp.addPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()));
+    rq.addPeerToHFileRefs(peerId);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 5ab26ab..de5cc31 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -204,6 +204,7 @@ public abstract class TestReplicationStateBasic {
     assertNull(rqc.getReplicableHFiles(ID_ONE));
     assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
     rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
+    rq1.addPeerToHFileRefs(ID_ONE);
     rq1.addHFileRefs(ID_ONE, files1);
     assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
     assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
@@ -225,7 +226,9 @@ public abstract class TestReplicationStateBasic {
 
     rp.init();
     rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
+    rq1.addPeerToHFileRefs(ID_ONE);
     rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
+    rq1.addPeerToHFileRefs(ID_TWO);
 
     List<String> files1 = new ArrayList<String>(3);
     files1.add("file_1");
@@ -238,11 +241,13 @@ public abstract class TestReplicationStateBasic {
     assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
 
     rp.removePeer(ID_ONE);
+    rq1.removePeerFromHFileRefs(ID_ONE);
     assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
     assertNull(rqc.getReplicableHFiles(ID_ONE));
     assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
 
     rp.removePeer(ID_TWO);
+    rq1.removePeerFromHFileRefs(ID_TWO);
     assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
     assertNull(rqc.getReplicableHFiles(ID_TWO));
   }


Mime
View raw message