hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject [27/41] hbase git commit: HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface
Date Thu, 11 Jan 2018 05:52:17 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/38950974/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 853bafb..24a4f30 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -166,7 +166,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.clusterId = clusterId;
     this.walFileLengthProvider = walFileLengthProvider;
     this.replicationTracker.registerListener(this);
-    this.replicationPeers.getAllPeerIds();
     // It's preferable to failover 1 RS at a time, but with good zk servers
     // more could be processed at the same time.
     int nbWorkers = conf.getInt("replication.executor.workers", 1);
@@ -270,8 +269,8 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
     List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
         .map(ServerName::valueOf).collect(Collectors.toList());
-    LOG.info(
-      "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);
+    LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
+        + otherRegionServers);
 
     // Look if there's anything to process after a restart
     for (ServerName rs : currentReplicators) {
@@ -288,7 +287,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * The returned future is for adoptAbandonedQueues task.
    */
   Future<?> init() throws IOException, ReplicationException {
-    for (String id : this.replicationPeers.getConnectedPeerIds()) {
+    for (String id : this.replicationPeers.getAllPeerIds()) {
       addSource(id);
       if (replicationForBulkLoadDataEnabled) {
         // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
@@ -307,8 +306,8 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   @VisibleForTesting
   ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
-    ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
-    ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
+    ReplicationPeerConfig peerConfig = replicationPeers.getPeerConfig(id);
+    ReplicationPeer peer = replicationPeers.getPeer(id);
     ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer);
     synchronized (this.walsById) {
       this.sources.add(src);
@@ -354,7 +353,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   public void deleteSource(String peerId, boolean closeConnection) {
     abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId));
     if (closeConnection) {
-      this.replicationPeers.peerDisconnected(peerId);
+      this.replicationPeers.removePeer(peerId);
     }
   }
 
@@ -445,12 +444,12 @@ public class ReplicationSourceManager implements ReplicationListener {
     // update replication queues on ZK
     // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
     synchronized (replicationPeers) {
-      for (String id : replicationPeers.getConnectedPeerIds()) {
+      for (String id : replicationPeers.getAllPeerIds()) {
         try {
           this.queueStorage.addWAL(server.getServerName(), id, logName);
         } catch (ReplicationException e) {
-          throw new IOException("Cannot add log to replication queue" +
-            " when creating a new source, queueId=" + id + ", filename=" + logName, e);
+          throw new IOException("Cannot add log to replication queue"
+              + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
         }
       }
     }
@@ -593,7 +592,7 @@ public class ReplicationSourceManager implements ReplicationListener {
 
   public void addPeer(String id) throws ReplicationException, IOException {
     LOG.info("Trying to add peer, peerId: " + id);
-    boolean added = this.replicationPeers.peerConnected(id);
+    boolean added = this.replicationPeers.addPeer(id);
     if (added) {
       LOG.info("Peer " + id + " connected success, trying to start the replication source thread.");
       addSource(id);
@@ -729,19 +728,25 @@ public class ReplicationSourceManager implements ReplicationListener {
           // there is not an actual peer defined corresponding to peerId for the failover.
           ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
           String actualPeerId = replicationQueueInfo.getPeerId();
-          ReplicationPeer peer = replicationPeers.getConnectedPeer(actualPeerId);
+
+          ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
+          if (peer == null) {
+            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS
+                + ", peer is null");
+            abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
+            continue;
+          }
+
           ReplicationPeerConfig peerConfig = null;
           try {
-            peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
-          } catch (ReplicationException ex) {
-            LOG.warn("Received exception while getting replication peer config, skipping replay"
-                + ex);
-          }
-          if (peer == null || peerConfig == null) {
-            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS);
+            peerConfig = replicationPeers.getPeerConfig(actualPeerId);
+          } catch (Exception e) {
+            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS
+                + ", failed to read peer config", e);
             abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
             continue;
           }
+
           // track sources in walsByIdRecoveredQueues
           Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
           walsByIdRecoveredQueues.put(peerId, walsByGroup);
@@ -760,7 +765,7 @@ public class ReplicationSourceManager implements ReplicationListener {
           // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
           // see removePeer
           synchronized (oldsources) {
-            if (!replicationPeers.getConnectedPeerIds().contains(src.getPeerId())) {
+            if (!replicationPeers.getAllPeerIds().contains(src.getPeerId())) {
               src.terminate("Recovered queue doesn't belong to any current peer");
               closeRecoveredQueue(src);
               continue;

http://git-wip-us.apache.org/repos/asf/hbase/blob/38950974/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index 8802e36..905ade4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -87,7 +87,7 @@ public class TestReplicationHFileCleaner {
     server = new DummyServer();
     conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
     Replication.decorateMasterConfiguration(conf);
-    rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server);
+    rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf);
     rp.init();
     rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
     fs = FileSystem.get(conf);
@@ -101,7 +101,8 @@ public class TestReplicationHFileCleaner {
   @Before
   public void setup() throws ReplicationException, IOException {
     root = TEST_UTIL.getDataTestDirOnTestFS();
-    rp.registerPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()));
+    rp.getPeerStorage().addPeer(peerId,
+      ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL.getClusterKey()).build(), true);
     rq.addPeerToHFileRefs(peerId);
   }
 
@@ -112,7 +113,7 @@ public class TestReplicationHFileCleaner {
     } catch (IOException e) {
       LOG.warn("Failed to delete files recursively from path " + root);
     }
-    rp.unregisterPeer(peerId);
+    rp.getPeerStorage().removePeer(peerId);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/38950974/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
index c57d9bb..ca4369e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
@@ -21,9 +21,7 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/hbase/blob/38950974/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
index f118ca3..fdfa6b7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
@@ -22,8 +22,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
@@ -44,6 +42,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -73,10 +72,6 @@ public class TestReplicationTrackerZKImpl {
   private ReplicationTracker rt;
   private AtomicInteger rsRemovedCount;
   private String rsRemovedData;
-  private AtomicInteger plChangedCount;
-  private List<String> plChangedData;
-  private AtomicInteger peerRemovedCount;
-  private String peerRemovedData;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -93,7 +88,7 @@ public class TestReplicationTrackerZKImpl {
     String fakeRs1 = ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname1.example.org:1234");
     try {
       ZKClusterId.setClusterId(zkw, new ClusterId());
-      rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
+      rp = ReplicationFactory.getReplicationPeers(zkw, conf);
       rp.init();
       rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1));
     } catch (Exception e) {
@@ -101,10 +96,6 @@ public class TestReplicationTrackerZKImpl {
     }
     rsRemovedCount = new AtomicInteger(0);
     rsRemovedData = "";
-    plChangedCount = new AtomicInteger(0);
-    plChangedData = new ArrayList<>();
-    peerRemovedCount = new AtomicInteger(0);
-    peerRemovedData = "";
   }
 
   @AfterClass
@@ -157,25 +148,22 @@ public class TestReplicationTrackerZKImpl {
   @Test(timeout = 30000)
   public void testPeerNameControl() throws Exception {
     int exists = 0;
-    int hyphen = 0;
-    rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
+    rp.getPeerStorage().addPeer("6",
+      ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
 
-    try{
-      rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
-    }catch(IllegalArgumentException e){
-      exists++;
+    try {
+      rp.getPeerStorage().addPeer("6",
+        ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
+    } catch (ReplicationException e) {
+      if (e.getCause() instanceof KeeperException.NodeExistsException) {
+        exists++;
+      }
     }
 
-    try{
-      rp.registerPeer("6-ec2", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
-    }catch(IllegalArgumentException e){
-      hyphen++;
-    }
     assertEquals(1, exists);
-    assertEquals(1, hyphen);
 
     // clean up
-    rp.unregisterPeer("6");
+    rp.getPeerStorage().removePeer("6");
   }
 
   private class DummyReplicationListener implements ReplicationListener {

http://git-wip-us.apache.org/repos/asf/hbase/blob/38950974/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index f6724df..4c737e2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -380,7 +380,7 @@ public abstract class TestReplicationSourceManager {
     }
     Server s1 = new DummyServer("dummyserver1.example.org");
     ReplicationPeers rp1 =
-        ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
+        ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration());
     rp1.init();
     NodeFailoverWorker w1 =
         manager.new NodeFailoverWorker(server.getServerName());
@@ -561,7 +561,7 @@ public abstract class TestReplicationSourceManager {
   private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
       final boolean waitForSource) throws Exception {
     final ReplicationPeers rp = manager.getReplicationPeers();
-    rp.registerPeer(peerId, peerConfig);
+    rp.getPeerStorage().addPeer(peerId, peerConfig, true);
     try {
       manager.addPeer(peerId);
     } catch (Exception e) {
@@ -588,7 +588,7 @@ public abstract class TestReplicationSourceManager {
         }
         return true;
       } else {
-        return (rp.getConnectedPeer(peerId) != null);
+        return (rp.getPeer(peerId) != null);
       }
     });
   }
@@ -600,8 +600,8 @@ public abstract class TestReplicationSourceManager {
    */
   private void removePeerAndWait(final String peerId) throws Exception {
     final ReplicationPeers rp = manager.getReplicationPeers();
-    if (rp.getAllPeerIds().contains(peerId)) {
-      rp.unregisterPeer(peerId);
+    if (rp.getPeerStorage().listPeerIds().contains(peerId)) {
+      rp.getPeerStorage().removePeer(peerId);
       try {
         manager.removePeer(peerId);
       } catch (Exception e) {
@@ -611,10 +611,9 @@ public abstract class TestReplicationSourceManager {
     Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
-        List<String> peers = rp.getAllPeerIds();
-        return (!manager.getAllQueues().contains(peerId)) &&
-          (rp.getConnectedPeer(peerId) == null) && (!peers.contains(peerId)) &&
-          manager.getSource(peerId) == null;
+        Collection<String> peers = rp.getPeerStorage().listPeerIds();
+        return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null)
+            && (!peers.contains(peerId)) && manager.getSource(peerId) == null;
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/38950974/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java
index fc31c37..b755c32 100644
--- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java
+++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java
@@ -182,8 +182,7 @@ public class HBaseZKTestingUtility extends HBaseCommonTestingUtility {
   /**
    * Gets a ZKWatcher.
    */
-  public static ZKWatcher getZooKeeperWatcher(HBaseZKTestingUtility testUtil)
-      throws ZooKeeperConnectionException, IOException {
+  public static ZKWatcher getZooKeeperWatcher(HBaseZKTestingUtility testUtil) throws IOException {
     ZKWatcher zkw = new ZKWatcher(testUtil.getConfiguration(), "unittest", new Abortable() {
       boolean aborted = false;
 


Mime
View raw message