hbase-commits mailing list archives

From apurt...@apache.org
Subject [2/6] hbase git commit: HBASE-17328 Properly dispose of looped replication peers
Date Wed, 21 Dec 2016 02:39:42 GMT
HBASE-17328 Properly dispose of looped replication peers

Signed-off-by: Andrew Purtell <apurtell@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f8474c8d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f8474c8d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f8474c8d

Branch: refs/heads/master
Commit: f8474c8d4d3e722aa0129e085f6a5287c5e2be89
Parents: 06b67a6
Author: Vincent <vincent.poon@salesforce.com>
Authored: Tue Dec 20 16:29:40 2016 -0800
Committer: Andrew Purtell <apurtell@apache.org>
Committed: Tue Dec 20 18:03:11 2016 -0800

----------------------------------------------------------------------
 .../regionserver/ReplicationSource.java         |  2 +
 .../regionserver/ReplicationSourceManager.java  | 14 ++++++
 .../replication/TestMasterReplication.java      | 46 +++++++++++++++++++-
 3 files changed, 60 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f8474c8d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index a6fe0fb..c988f87 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -327,6 +327,8 @@ public class ReplicationSource extends Thread
       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
           + replicationEndpoint.getClass().getName(), null, false);
+      this.manager.closeQueue(this);
+      return;
     }
     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
     // start workers

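For readers skimming the hunk above: before this change, a source whose peer cluster turned out to be the cluster itself called terminate() but then fell through, so its queue znode, metrics, and accumulating WAL references were never cleaned up. The two added lines hand the queue back to the manager and return before any worker threads start. A simplified sketch of the patched region follows; the guard condition is an assumption based on the test's reference to BaseReplicationEndpoint#canReplicateToSameCluster(), and this is not a verbatim copy of ReplicationSource:

    // Sketch only, not the actual ReplicationSource code. Fields such as
    // clusterId, peerClusterId, replicationEndpoint and manager are the ones
    // visible in the hunk above.
    if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
      this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
          + replicationEndpoint.getClass().getName(), null, false);
      this.manager.closeQueue(this); // new: drop the queue znode, metrics and WAL refs
      return;                        // new: never start the shipper/reader workers
    }
    LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
    // start workers ...
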
http://git-wip-us.apache.org/repos/asf/hbase/blob/f8474c8d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index fa6f894..2c9fdcc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -539,6 +539,20 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
+   * Clear the references to the specified old source
+   * @param src source to clear
+   */
+  public void closeQueue(ReplicationSourceInterface src) {
+    LOG.info("Done with the queue " + src.getPeerClusterZnode());
+    if (src instanceof ReplicationSource) {
+      ((ReplicationSource) src).getSourceMetrics().clear();
+    }
+    this.sources.remove(src);
+    deleteSource(src.getPeerClusterZnode(), true);
+    this.walsById.remove(src.getPeerClusterZnode());
+  }
+
+  /**
   * This method first deletes all the recovered sources for the specified
    * id, then deletes the normal source (deleting all related data in ZK).
    * @param id The id of the peer cluster

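One effect of the metrics clear inside closeQueue() is that the region server stops reporting a ReplicationLoadSource for the dead peer in its ServerLoad, which is exactly what the new test below waits for. A hedged sketch of that observation path, using only client APIs the test file already imports; the helper name and the Configuration argument are illustrative, not part of the patch:

    // Sketch: returns true once no region server reports a live ReplicationSource.
    // 'conf' is whatever client Configuration points at the cluster under test.
    static boolean replicationSourcesCleared(Configuration conf) throws IOException {
      try (Connection connection = ConnectionFactory.createConnection(conf);
           Admin admin = connection.getAdmin()) {
        ClusterStatus status = admin.getClusterStatus();
        for (ServerName server : status.getServers()) {
          ServerLoad load = status.getLoad(server);
          if (!load.getReplicationLoadSourceList().isEmpty()) {
            return false; // this region server still reports a replication source
          }
        }
        return true;
      }
    }
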
http://git-wip-us.apache.org/repos/asf/hbase/blob/f8474c8d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 5b8538b..7ac5e94 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -42,7 +43,10 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
@@ -69,6 +73,7 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.After;
 import org.junit.Before;
@@ -169,6 +174,43 @@ public class TestMasterReplication {
   }
 
   /**
+   * Tests the replication scenario 0 -> 0. By default
+   * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
+   * ReplicationSource should terminate, and no further logs should get enqueued
+   */
+  @Test(timeout = 300000)
+  public void testLoopedReplication() throws Exception {
+    LOG.info("testLoopedReplication");
+    startMiniClusters(1);
+    createTableOnClusters(table);
+    addPeer("1", 0, 0);
+    Thread.sleep(SLEEP_TIME);
+
+    // wait for source to terminate
+    final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
+    Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        ClusterStatus clusterStatus = utilities[0].getAdmin().getClusterStatus();
+        ServerLoad serverLoad = clusterStatus.getLoad(rsName);
+        List<ReplicationLoadSource> replicationLoadSourceList =
+            serverLoad.getReplicationLoadSourceList();
+        return replicationLoadSourceList.size() == 0;
+      }
+    });
+
+    Table[] htables = getHTablesOnClusters(tableName);
+    putAndWait(row, famName, htables[0], htables[0]);
+    rollWALAndWait(utilities[0], table.getTableName(), row);
+    ZooKeeperWatcher zkw = utilities[0].getZooKeeperWatcher();
+    String queuesZnode =
+        ZKUtil.joinZNode(zkw.getZNodePaths().baseZNode, ZKUtil.joinZNode("replication", "rs"));
+    List<String> listChildrenNoWatch =
+        ZKUtil.listChildrenNoWatch(zkw, ZKUtil.joinZNode(queuesZnode, rsName.toString()));
+    assertEquals(0, listChildrenNoWatch.size());
+  }
+
+  /**
   * It tests the replication scenario involving 0 -> 1 -> 0. It does it by bulk loading a set of
    * HFiles to a table in each cluster, checking if it's replicated.
    */
@@ -332,7 +374,7 @@ public class TestMasterReplication {
       shutDownMiniClusters();
     }
   }
-  
+
   /**
   * It tests the bulk loaded hfile replication scenario to only explicitly specified table column
   * families. It does it by bulk loading a set of HFiles belonging to both the CFs of table and set
@@ -483,7 +525,7 @@ public class TestMasterReplication {
       close(replicationAdmin);
     }
   }
-  
+
  private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
       throws Exception {
     ReplicationAdmin replicationAdmin = null;


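For context on the addPeer("1", 0, 0) call in the new test: it points cluster 0 at itself as a replication peer, the looped setup this patch now disposes of cleanly. Outside the test harness, an equivalent setup with the ReplicationAdmin API used elsewhere in this test class would look roughly like the sketch below; the cluster-key construction is an assumption about what the test's own addPeer helper does internally:

    // Sketch: register the cluster as its own replication peer ("0 -> 0").
    // 'conf' is the client Configuration of that same cluster.
    ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf);
    try {
      ReplicationPeerConfig rpc = new ReplicationPeerConfig();
      // Cluster key of the *same* cluster: quorum:clientPort:znodeParent
      rpc.setClusterKey(conf.get(HConstants.ZOOKEEPER_QUORUM)
          + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
          + ":" + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
      replicationAdmin.addPeer("1", rpc, null); // null tableCfs => all tables
    } finally {
      replicationAdmin.close();
    }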