hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject hbase git commit: HBASE-16144 Replication queue's lock will live forever if RS acquiring the lock has died prematurely
Date Mon, 18 Jul 2016 01:55:34 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1.1 c8ddb32a8 -> 4f5774e8d


HBASE-16144 Replication queue's lock will live forever if RS acquiring the lock has died prematurely


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4f5774e8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4f5774e8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4f5774e8

Branch: refs/heads/branch-1.1
Commit: 4f5774e8ddfd177059afa67a1088afbf8b95c8dc
Parents: c8ddb32
Author: Phil Yang <ud1937@gmail.com>
Authored: Wed Jul 6 17:26:57 2016 +0800
Committer: Phil Yang <ud1937@gmail.com>
Committed: Mon Jul 18 09:45:19 2016 +0800

----------------------------------------------------------------------
 .../replication/ReplicationQueuesZKImpl.java    |  13 ++-
 .../org/apache/hadoop/hbase/master/HMaster.java |  12 ++
 .../cleaner/ReplicationZKLockCleanerChore.java  | 112 +++++++++++++++++++
 .../replication/TestMultiSlaveReplication.java  |  46 ++++++++
 4 files changed, 182 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4f5774e8/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 382bb69..92a67f8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -25,6 +25,7 @@ import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -224,7 +225,8 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements
R
    * @param znode the server names of the other server
    * @return true if the lock was acquired, false in every other cases
    */
-  private boolean lockOtherRS(String znode) {
+  @VisibleForTesting
+  public boolean lockOtherRS(String znode) {
     try {
       String parent = ZKUtil.joinZNode(this.queuesZNode, znode);
       if (parent.equals(this.myQueuesZnode)) {
@@ -251,6 +253,15 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements
R
     return true;
   }
 
+  public String getLockZNode(String znode) {
+    return this.queuesZNode + "/" + znode + "/" + RS_LOCK_ZNODE;
+  }
+
+  @VisibleForTesting
+  public boolean checkLockExists(String znode) throws KeeperException {
+    return ZKUtil.checkExists(zookeeper, getLockZNode(znode)) >= 0;
+  }
+
   /**
    * Delete all the replication queues for a given region server.
    * @param regionserverZnode The znode of the region server to delete.

http://git-wip-us.apache.org/repos/asf/hbase/blob/4f5774e8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index ee9c052..4a748b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
+import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
 import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
 import org.apache.hadoop.hbase.master.handler.DeleteTableHandler;
 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
@@ -287,6 +288,7 @@ public class HMaster extends HRegionServer implements MasterServices,
Server {
   private ClusterStatusPublisher clusterStatusPublisherChore = null;
 
   CatalogJanitor catalogJanitorChore;
+  private ReplicationZKLockCleanerChore replicationZKLockCleanerChore;
   private LogCleaner logCleaner;
   private HFileCleaner hfileCleaner;
 
@@ -1109,6 +1111,15 @@ public class HMaster extends HRegionServer implements MasterServices,
Server {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Started service threads");
     }
+    if (!conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
+      try {
+        replicationZKLockCleanerChore = new ReplicationZKLockCleanerChore(this, this,
+            cleanerInterval, this.getZooKeeper(), this.conf);
+        getChoreService().scheduleChore(replicationZKLockCleanerChore);
+      } catch (Exception e) {
+        LOG.error("start replicationZKLockCleanerChore failed", e);
+      }
+    }
   }
 
   @Override
@@ -1142,6 +1153,7 @@ public class HMaster extends HRegionServer implements MasterServices,
Server {
     // Clean up and close up shop
     if (this.logCleaner != null) this.logCleaner.cancel(true);
     if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
+    if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true);
     if (this.quotaManager != null) this.quotaManager.stop();
     if (this.activeMasterManager != null) this.activeMasterManager.stop();
     if (this.serverManager != null) this.serverManager.stop();

http://git-wip-us.apache.org/repos/asf/hbase/blob/4f5774e8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java
new file mode 100644
index 0000000..dc5338e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java
@@ -0,0 +1,112 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
+import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * A cleaner that cleans replication locks on zk which is locked by dead region servers
+ */
+@InterfaceAudience.Private
+public class ReplicationZKLockCleanerChore extends ScheduledChore {
+  private static final Log LOG = LogFactory.getLog(ReplicationZKLockCleanerChore.class);
+  private ZooKeeperWatcher zk;
+  private ReplicationTracker tracker;
+  private long ttl;
+  private ReplicationQueuesZKImpl queues;
+
+  // Wait some times before delete lock to prevent a session expired RS not dead fully.
+  private static final long DEFAULT_TTL = 60 * 10 * 1000;//10 min
+
+  @VisibleForTesting
+  public static final String TTL_CONFIG_KEY = "hbase.replication.zk.deadrs.lock.ttl";
+
+  public ReplicationZKLockCleanerChore(Stoppable stopper, Abortable abortable, int period,
+      ZooKeeperWatcher zk, Configuration conf) throws Exception {
+    super("ReplicationZKLockCleanerChore", stopper, period);
+
+    this.zk = zk;
+    this.ttl = conf.getLong(TTL_CONFIG_KEY, DEFAULT_TTL);
+    tracker = ReplicationFactory.getReplicationTracker(zk,
+        ReplicationFactory.getReplicationPeers(zk, conf, abortable), conf, abortable, stopper);
+    queues = new ReplicationQueuesZKImpl(zk, conf, abortable);
+  }
+
+  @Override protected void chore() {
+    try {
+      List<String> regionServers = tracker.getListOfRegionServers();
+      if (regionServers == null) {
+        return;
+      }
+      Set<String> rsSet = new HashSet<String>(regionServers);
+      List<String> replicators = queues.getListOfReplicators();
+
+      for (String replicator: replicators) {
+        try {
+          String lockNode = queues.getLockZNode(replicator);
+          byte[] data = ZKUtil.getData(zk, lockNode);
+          if (data == null) {
+            continue;
+          }
+          String rsServerNameZnode = Bytes.toString(data);
+          String[] array = rsServerNameZnode.split("/");
+          String znode = array[array.length - 1];
+          if (!rsSet.contains(znode)) {
+            Stat s = zk.getRecoverableZooKeeper().exists(lockNode, false);
+            if (s != null && EnvironmentEdgeManager.currentTime() - s.getMtime()
> this.ttl) {
+              // server is dead, but lock is still there, we have to delete the lock.
+              ZKUtil.deleteNode(zk, lockNode);
+              LOG.info("Remove lock acquired by dead RS: " + lockNode + " by " + znode);
+            }
+            continue;
+          }
+          LOG.info("Skip lock acquired by live RS: " + lockNode + " by " + znode);
+
+        } catch (KeeperException.NoNodeException ignore) {
+        } catch (InterruptedException e) {
+          LOG.warn("zk operation interrupted", e);
+          Thread.currentThread().interrupt();
+        }
+      }
+    } catch (KeeperException e) {
+      LOG.warn("zk operation interrupted", e);
+    }
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/4f5774e8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
index ae38ec6..6cb946b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
@@ -21,11 +21,14 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
@@ -43,12 +46,15 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -94,10 +100,17 @@ public class TestMultiSlaveReplication {
     conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
     conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
         "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
+    conf1.setBoolean(HConstants.ZOOKEEPER_USEMULTI , false);// for testZKLockCleaner
+    conf1.setInt("hbase.master.cleaner.interval", 5 * 1000);
+    conf1.setClass("hbase.region.replica.replication.replicationQueues.class",
+        ReplicationQueuesZKImpl.class, ReplicationQueues.class);
+    conf1.setLong(ReplicationZKLockCleanerChore.TTL_CONFIG_KEY, 0L);
+
 
     utility1 = new HBaseTestingUtility(conf1);
     utility1.startMiniZKCluster();
     MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+    utility1.setZkCluster(miniZK);
     new ZooKeeperWatcher(conf1, "cluster1", null, true);
 
     conf2 = new Configuration(conf1);
@@ -196,6 +209,39 @@ public class TestMultiSlaveReplication {
     utility1.shutdownMiniCluster();
   }
 
+  @Test
+  public void testZKLockCleaner() throws Exception {
+    MiniHBaseCluster cluster = utility1.startMiniCluster(1, 2);
+    HTableDescriptor table = new HTableDescriptor(TableName.valueOf(Bytes.toBytes("zk")));
+    HColumnDescriptor fam = new HColumnDescriptor(famName);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    table.addFamily(fam);
+    new HBaseAdmin(conf1).createTable(table);
+    ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf1);
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(utility2.getClusterKey());
+    replicationAdmin.addPeer("cluster2", rpc, null);
+    HRegionServer rs = cluster.getRegionServer(0);
+    ReplicationQueuesZKImpl zk = new ReplicationQueuesZKImpl(rs.getZooKeeper(), conf1, rs);
+    zk.init(rs.getServerName().toString());
+    List<String> replicators = zk.getListOfReplicators();
+    assertEquals(2, replicators.size());
+    String zNode = cluster.getRegionServer(1).getServerName().toString();
+
+    assertTrue(zk.lockOtherRS(zNode));
+    assertTrue(zk.checkLockExists(zNode));
+    Thread.sleep(10000);
+    assertTrue(zk.checkLockExists(zNode));
+    cluster.abortRegionServer(0);
+    Thread.sleep(10000);
+    HRegionServer rs1 = cluster.getRegionServer(1);
+    zk = new ReplicationQueuesZKImpl(rs1.getZooKeeper(), conf1, rs1);
+    zk.init(rs1.getServerName().toString());
+    assertFalse(zk.checkLockExists(zNode));
+
+    utility1.shutdownMiniCluster();
+  }
+
   private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
       final byte[] row) throws IOException {
     final Admin admin = utility.getHBaseAdmin();


Mime
View raw message