hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject hbase git commit: HBASE-16135 PeerClusterZnode under rs of removed peer may never be deleted
Date Mon, 04 Jul 2016 11:59:40 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 f30e95314 -> a9b7f3f02


HBASE-16135 PeerClusterZnode under rs of removed peer may never be deleted


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a9b7f3f0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a9b7f3f0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a9b7f3f0

Branch: refs/heads/branch-1.3
Commit: a9b7f3f0219556fa8023fe4684aef3724e624597
Parents: f30e953
Author: zhangduo <zhangduo@apache.org>
Authored: Mon Jul 4 19:51:45 2016 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Mon Jul 4 19:53:49 2016 +0800

----------------------------------------------------------------------
 .../replication/ReplicationQueuesZKImpl.java    |  8 ++-
 .../regionserver/ReplicationSourceManager.java  | 63 +++++++++++++-------
 .../TestReplicationSourceManager.java           | 44 ++++++++++++--
 3 files changed, 84 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a9b7f3f0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index c2d3b1f..d582f28 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -311,9 +311,11 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements
R
       for (String peerId : peerIdsToProcess) {
         ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
         if (!peerExists(replicationQueueInfo.getPeerId())) {
-          LOG.warn("Peer " + peerId + " didn't exist, skipping the replay");
-          // Protection against moving orphaned queues
-          continue;
+          // the orphaned queues must be moved, otherwise the delete op of dead rs will fail,
+          // this will cause the whole multi op fail.
+          // NodeFailoverWorker will skip the orphaned queues.
+          LOG.warn("Peer " + peerId
+              + " didn't exist, will move its queue to avoid the failure of multi op");
         }
         String newPeerId = peerId + "-" + znode;
         String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);

http://git-wip-us.apache.org/repos/asf/hbase/blob/a9b7f3f0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 17ccc87..6135328 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -19,6 +19,9 @@
 
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -63,8 +66,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 /**
  * This class is responsible to manage all the replication
  * sources. There are two classes of sources:
@@ -356,6 +357,11 @@ public class ReplicationSourceManager implements ReplicationListener
{
     return this.oldsources;
   }
 
+  @VisibleForTesting
+  List<String> getAllQueues() {
+    return replicationQueues.getAllQueues();
+  }
+
   void preLogRoll(Path newLog) throws IOException {
     recordLog(newLog);
     String logName = newLog.getName();
@@ -540,24 +546,28 @@ public class ReplicationSourceManager implements ReplicationListener
{
         + sources.size() + " and another "
         + oldsources.size() + " that were recovered");
     String terminateMessage = "Replication stream was removed by a user";
-    List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
     List<ReplicationSourceInterface> oldSourcesToDelete =
         new ArrayList<ReplicationSourceInterface>();
-    // First close all the recovered sources for this peer
-    for (ReplicationSourceInterface src : oldsources) {
-      if (id.equals(src.getPeerClusterId())) {
-        oldSourcesToDelete.add(src);
+    // synchronized on oldsources to avoid adding recovered source for the to-be-removed
peer
+    // see NodeFailoverWorker.run
+    synchronized (oldsources) {
+      // First close all the recovered sources for this peer
+      for (ReplicationSourceInterface src : oldsources) {
+        if (id.equals(src.getPeerClusterId())) {
+          oldSourcesToDelete.add(src);
+        }
+      }
+      for (ReplicationSourceInterface src : oldSourcesToDelete) {
+        src.terminate(terminateMessage);
+        closeRecoveredQueue(src);
       }
-    }
-    for (ReplicationSourceInterface src : oldSourcesToDelete) {
-      src.terminate(terminateMessage);
-      closeRecoveredQueue((src));
     }
     LOG.info("Number of deleted recovered sources for " + id + ": "
         + oldSourcesToDelete.size());
     // Now look for the one on this cluster
-    synchronized (this.replicationPeers) {// synchronize on replicationPeers to avoid adding
source
-                                          // for the to-be-removed peer
+    List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
+    // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
+    synchronized (this.replicationPeers) {
       for (ReplicationSourceInterface src : this.sources) {
         if (id.equals(src.getPeerClusterId())) {
           srcToRemove.add(src);
@@ -615,9 +625,12 @@ public class ReplicationSourceManager implements ReplicationListener
{
     private final UUID clusterId;
 
     /**
-     *
      * @param rsZnode
      */
+    public NodeFailoverWorker(String rsZnode) {
+      this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
+    }
+
     public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
         final ReplicationPeers replicationPeers, final UUID clusterId) {
       super("Failover-for-"+rsZnode);
@@ -673,6 +686,7 @@ public class ReplicationSourceManager implements ReplicationListener {
           }
           if (peer == null || peerConfig == null) {
             LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
+            replicationQueues.removeQueue(peerId);
             continue;
           }
           // track sources in walsByIdRecoveredQueues
@@ -692,15 +706,20 @@ public class ReplicationSourceManager implements ReplicationListener
{
           ReplicationSourceInterface src =
               getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
                 server, peerId, this.clusterId, peerConfig, peer);
-          if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
-            src.terminate("Recovered queue doesn't belong to any current peer");
-            break;
-          }
-          oldsources.add(src);
-          for (String wal : walsSet) {
-            src.enqueueLog(new Path(oldLogDir, wal));
+          // synchronized on oldsources to avoid adding recovered source for the to-be-removed
peer
+          // see removePeer
+          synchronized (oldsources) {
+            if (!this.rp.getPeerIds().contains(src.getPeerClusterId())) {
+              src.terminate("Recovered queue doesn't belong to any current peer");
+              closeRecoveredQueue(src);
+              continue;
+            }
+            oldsources.add(src);
+            for (String wal : walsSet) {
+              src.enqueueLog(new Path(oldLogDir, wal));
+            }
+            src.startup();
           }
-          src.startup();
         } catch (IOException e) {
           // TODO manage it
           LOG.error("Failed creating a source", e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/a9b7f3f0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 0bd2208..f11e1e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -23,6 +23,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import com.google.common.collect.Sets;
+
 import java.io.IOException;
 import java.net.URLEncoder;
 import java.util.ArrayList;
@@ -84,10 +86,10 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-
-import com.google.common.collect.Sets;
+import org.junit.rules.TestName;
 
 @Category(MediumTests.class)
 public class TestReplicationSourceManager {
@@ -186,15 +188,24 @@ public class TestReplicationSourceManager {
     utility.shutdownMiniCluster();
   }
 
-  @Before
-  public void setUp() throws Exception {
+  @Rule
+  public TestName testName = new TestName();
+
+  private void cleanLogDir() throws IOException {
     fs.delete(logDir, true);
     fs.delete(oldLogDir, true);
   }
 
+  @Before
+  public void setUp() throws Exception {
+    LOG.info("Start " + testName.getMethodName());
+    cleanLogDir();
+  }
+
   @After
   public void tearDown() throws Exception {
-    setUp();
+    LOG.info("End " + testName.getMethodName());
+    cleanLogDir();
   }
 
   @Test
@@ -268,7 +279,6 @@ public class TestReplicationSourceManager {
 
   @Test
   public void testClaimQueues() throws Exception {
-    LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti");
     conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
     final Server server = new DummyServer("hostname0.example.org");
     ReplicationQueues rq =
@@ -426,6 +436,28 @@ public class TestReplicationSourceManager {
   }
 
   @Test
+  public void testCleanupUnknownPeerZNode() throws Exception {
+    final Server server = new DummyServer("hostname2.example.org");
+    ReplicationQueues rq =
+        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
+          server);
+    rq.init(server.getServerName().toString());
+    // populate some znodes in the peer znode
+    // add log to an unknown peer
+    String group = "testgroup";
+    rq.addLog("2", group + ".log1");
+    rq.addLog("2", group + ".log2");
+
+    NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName());
+    w1.run();
+
+    // The log of the unknown peer should be removed from zk
+    for (String peer : manager.getAllQueues()) {
+      assertTrue(peer.startsWith("1"));
+    }
+  }
+
+  @Test
   public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
     // 1. Create wal key
     WALKey logKey = new WALKey();


Mime
View raw message