hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject [1/6] hbase git commit: HBASE-17328 Properly dispose of looped replication peers
Date Wed, 21 Dec 2016 02:39:41 GMT
Repository: hbase
Updated Branches:
  refs/heads/0.98 f63b5a0db -> 5ea953b01
  refs/heads/branch-1 33002bd8e -> e79afbf0c
  refs/heads/branch-1.1 aa47da890 -> d3ffdf6e9
  refs/heads/branch-1.2 b8822633b -> f276edfad
  refs/heads/branch-1.3 c43d759d9 -> 7b3187c1a
  refs/heads/master 06b67a632 -> f8474c8d4


HBASE-17328 Properly dispose of looped replication peers

Signed-off-by: Andrew Purtell <apurtell@apache.org>

Conflicts:
	hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e79afbf0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e79afbf0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e79afbf0

Branch: refs/heads/branch-1
Commit: e79afbf0cbca95ed4dad67ef83d9755c86629a85
Parents: 33002bd
Author: Vincent <vincent.poon@salesforce.com>
Authored: Tue Dec 20 16:29:40 2016 -0800
Committer: Andrew Purtell <apurtell@apache.org>
Committed: Tue Dec 20 18:01:33 2016 -0800

----------------------------------------------------------------------
 .../regionserver/ReplicationSource.java         |  2 +
 .../regionserver/ReplicationSourceManager.java  | 14 ++++++
 .../replication/TestMasterReplication.java      | 48 ++++++++++++++++++--
 3 files changed, 61 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e79afbf0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 63549d0..81b39a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -327,6 +327,8 @@ public class ReplicationSource extends Thread
       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId
"
           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
           + replicationEndpoint.getClass().getName(), null, false);
+      this.manager.closeQueue(this);
+      return;
     }
     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
     // start workers

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79afbf0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index b31cc54..cdc6fce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -550,6 +550,20 @@ public class ReplicationSourceManager implements ReplicationListener
{
   }
 
   /**
+   * Clear the references to the specified old source
+   * @param src source to clear
+   */
+  public void closeQueue(ReplicationSourceInterface src) {
+    LOG.info("Done with the queue " + src.getPeerClusterZnode());
+    if (src instanceof ReplicationSource) {
+      ((ReplicationSource) src).getSourceMetrics().clear();
+    }
+    this.sources.remove(src);
+    deleteSource(src.getPeerClusterZnode(), true);
+    this.walsById.remove(src.getPeerClusterZnode());
+  }
+
+  /**
    * Thie method first deletes all the recovered sources for the specified
    * id, then deletes the normal source (deleting all related data in ZK).
    * @param id The id of the peer cluster

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79afbf0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 2a1ef6a..a8af946 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -42,8 +43,10 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -65,9 +68,11 @@ import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.After;
 import org.junit.Before;
@@ -171,6 +176,43 @@ public class TestMasterReplication {
   }
 
   /**
+   * Tests the replication scenario 0 -> 0. By default
+   * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
+   * ReplicationSource should terminate, and no further logs should get enqueued
+   */
+  @Test(timeout = 300000)
+  public void testLoopedReplication() throws Exception {
+    LOG.info("testLoopedReplication");
+    startMiniClusters(1);
+    createTableOnClusters(table);
+    addPeer("1", 0, 0);
+    Thread.sleep(SLEEP_TIME);
+
+    // wait for source to terminate
+    final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
+    Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        ClusterStatus clusterStatus = utilities[0].getHBaseAdmin().getClusterStatus();
+        ServerLoad serverLoad = clusterStatus.getLoad(rsName);
+        List<ReplicationLoadSource> replicationLoadSourceList =
+            serverLoad.getReplicationLoadSourceList();
+        return replicationLoadSourceList.size() == 0;
+      }
+    });
+
+    Table[] htables = getHTablesOnClusters(tableName);
+    putAndWait(row, famName, htables[0], htables[0]);
+    rollWALAndWait(utilities[0], table.getTableName(), row);
+    ZooKeeperWatcher zkw = utilities[0].getZooKeeperWatcher();
+    String queuesZnode =
+        ZKUtil.joinZNode(zkw.baseZNode, ZKUtil.joinZNode("replication", "rs"));
+    List<String> listChildrenNoWatch =
+        ZKUtil.listChildrenNoWatch(zkw, ZKUtil.joinZNode(queuesZnode, rsName.toString()));
+    assertEquals(0, listChildrenNoWatch.size());
+  }
+
+  /**
    * It tests the replication scenario involving 0 -> 1 -> 0. It does it by bulk loading
a set of
    * HFiles to a table in each cluster, checking if it's replicated.
    */
@@ -334,7 +376,7 @@ public class TestMasterReplication {
       shutDownMiniClusters();
     }
   }
-  
+
   /**
    * It tests the bulk loaded hfile replication scenario to only explicitly specified table
column
    * families. It does it by bulk loading a set of HFiles belonging to both the CFs of table
and set
@@ -515,7 +557,7 @@ public class TestMasterReplication {
       close(replicationAdmin);
     }
   }
-  
+
   private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String
tableCfs)
       throws Exception {
     ReplicationAdmin replicationAdmin = null;


Mime
View raw message