Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AE2AA200CB1 for ; Sat, 10 Jun 2017 04:54:25 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id ACBE7160BD4; Sat, 10 Jun 2017 02:54:25 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7EBA2160BCA for ; Sat, 10 Jun 2017 04:54:24 +0200 (CEST) Received: (qmail 88483 invoked by uid 500); 10 Jun 2017 02:54:23 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 88474 invoked by uid 99); 10 Jun 2017 02:54:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 10 Jun 2017 02:54:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7879FDFF81; Sat, 10 Jun 2017 02:54:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tedyu@apache.org To: commits@hbase.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: hbase git commit: HBASE-18192: Replication drops recovered queues on region server shutdown Date: Sat, 10 Jun 2017 02:54:23 +0000 (UTC) archived-at: Sat, 10 Jun 2017 02:54:25 -0000 Repository: hbase Updated Branches: refs/heads/branch-1 961337aad -> 6e3da5a39 HBASE-18192: Replication drops recovered queues on region server shutdown Signed-off-by: tedyu Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6e3da5a3 Tree: 
http://git-wip-us.apache.org/repos/asf/hbase/tree/6e3da5a3 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6e3da5a3 Branch: refs/heads/branch-1 Commit: 6e3da5a39a21c75de5d0dff9edbe767232a20310 Parents: 961337a Author: Ashu Pachauri Authored: Fri Jun 9 14:36:45 2017 -0700 Committer: tedyu Committed: Fri Jun 9 19:54:17 2017 -0700 ---------------------------------------------------------------------- .../regionserver/ReplicationSource.java | 47 +++++-- .../replication/TestReplicationSource.java | 126 ++++++++++++++++++- 2 files changed, 162 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/6e3da5a3/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 65ea422..6954ea2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -143,6 +143,13 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf private ConcurrentHashMap workerThreads = new ConcurrentHashMap(); + // Hold the state of a replication worker thread + public enum WorkerState { + RUNNING, + STOPPED, + FINISHED // The worker is done processing a recovered queue + } + private AtomicLong totalBufferUsed; /** @@ -399,7 +406,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf this.sourceRunning = false; Collection workers = workerThreads.values(); for (ReplicationSourceShipperThread worker : workers) { - worker.setWorkerRunning(false); + 
worker.setWorkerState(WorkerState.STOPPED); worker.entryReader.interrupt(); worker.interrupt(); } @@ -513,8 +520,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf private long lastLoggedPosition = -1; // Path of the current log private volatile Path currentPath; - // Indicates whether this particular worker is running - private boolean workerRunning = true; + // Current state of the worker thread + private WorkerState state; ReplicationSourceWALReaderThread entryReader; // Use guava cache to set ttl for each key private LoadingCache canSkipWaitingSet = CacheBuilder.newBuilder() @@ -538,6 +545,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf @Override public void run() { + setWorkerState(WorkerState.RUNNING); // Loop until we close down while (isWorkerActive()) { int sleepMultiplier = 1; @@ -570,7 +578,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf LOG.debug("Finished recovering queue for group " + walGroupId + " of peer " + peerClusterZnode); metrics.incrCompletedRecoveryQueue(); - setWorkerRunning(false); + setWorkerState(WorkerState.FINISHED); continue; } } catch (InterruptedException e) { @@ -579,13 +587,13 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } } - if (replicationQueueInfo.isQueueRecovered()) { + if (replicationQueueInfo.isQueueRecovered() && getWorkerState() == WorkerState.FINISHED) { // use synchronize to make sure one last thread will clean the queue synchronized (workerThreads) { Threads.sleep(100);// wait a short while for other worker thread to fully exit boolean allOtherTaskDone = true; for (ReplicationSourceShipperThread worker : workerThreads.values()) { - if (!worker.equals(this) && worker.isAlive()) { + if (!worker.equals(this) && worker.getWorkerState() != WorkerState.FINISHED) { allOtherTaskDone = false; break; } @@ -597,6 +605,10 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf } } } + // If the worker exits run loop without finishing its task, mark it as stopped. + if (state != WorkerState.FINISHED) { + setWorkerState(WorkerState.STOPPED); + } } private void waitingUntilCanPush(Map.Entry entry) { @@ -927,7 +939,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } private boolean isWorkerActive() { - return !stopper.isStopped() && workerRunning && !isInterrupted(); + return !stopper.isStopped() && state == WorkerState.RUNNING && !isInterrupted(); } private void terminate(String reason, Exception cause) { @@ -940,14 +952,29 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } entryReader.interrupt(); Threads.shutdown(entryReader, sleepForRetries); + setWorkerState(WorkerState.STOPPED); this.interrupt(); Threads.shutdown(this, sleepForRetries); LOG.info("ReplicationSourceWorker " + this.getName() + " terminated"); } - public void setWorkerRunning(boolean workerRunning) { - entryReader.setReaderRunning(workerRunning); - this.workerRunning = workerRunning; + /** + * Set the worker state + * @param state + */ + public void setWorkerState(WorkerState state) { + this.state = state; + if (entryReader != null) { + entryReader.setReaderRunning(state == WorkerState.RUNNING); + } + } + + /** + * Get the current state of this worker. 
+ * @return WorkerState + */ + public WorkerState getWorkerState() { + return state; } private void releaseBufferQuota(int size) { http://git-wip-us.apache.org/repos/asf/hbase/blob/6e3da5a3/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index f7e644f..6e6fe9a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -18,9 +18,11 @@ */ package org.apache.hadoop.hbase.replication; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -31,6 +33,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter.Predicate; @@ -38,6 +42,9 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.replication.regionserver.Replication; import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.wal.WAL; @@ -49,6 +56,7 @@ import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplica import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -63,10 +71,12 @@ public class TestReplicationSource { LogFactory.getLog(TestReplicationSource.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static HBaseTestingUtility TEST_UTIL_PEER = + new HBaseTestingUtility(); private static FileSystem FS; private static Path oldLogDir; private static Path logDir; - private static Configuration conf = HBaseConfiguration.create(); + private static Configuration conf = TEST_UTIL.getConfiguration(); /** * @throws java.lang.Exception @@ -82,6 +92,13 @@ public class TestReplicationSource { if (FS.exists(logDir)) FS.delete(logDir, true); } + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL_PEER.shutdownMiniHBaseCluster(); + TEST_UTIL.shutdownMiniHBaseCluster(); + TEST_UTIL.shutdownMiniDFSCluster(); + } + /** * Sanity check that we can move logs around while we are reading * from them. Should this test fail, ReplicationSource would have a hard @@ -172,5 +189,112 @@ public class TestReplicationSource { } + /** + * Tests that recovered queues are preserved on a regionserver shutdown. 
+ * See HBASE-18192 + * @throws Exception + */ + @Test + public void testServerShutdownRecoveredQueue() throws Exception { + try { + // Ensure single-threaded WAL + conf.set("hbase.wal.provider", "defaultProvider"); + conf.setInt("replication.sleep.before.failover", 2000); + // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in. + conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName()); + MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2); + TEST_UTIL_PEER.startMiniCluster(1); + + HRegionServer serverA = cluster.getRegionServer(0); + final ReplicationSourceManager managerA = + ((Replication) serverA.getReplicationSourceService()).getReplicationManager(); + HRegionServer serverB = cluster.getRegionServer(1); + final ReplicationSourceManager managerB = + ((Replication) serverB.getReplicationSourceService()).getReplicationManager(); + final ReplicationAdmin replicationAdmin = new ReplicationAdmin(TEST_UTIL.getConfiguration()); + + final String peerId = "TestPeer"; + replicationAdmin.addPeer(peerId, + new ReplicationPeerConfig().setClusterKey(TEST_UTIL_PEER.getClusterKey()), null); + // Wait for replication sources to come up + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() throws Exception { + return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty()); + } + }); + // Disabling peer makes sure there is at least one log to claim when the server dies + // The recovered queue will also stay there until the peer is disabled even if the + // WALs it contains have no data. + replicationAdmin.disablePeer(peerId); + + // Stopping serverA + // Its queues should be claimed by the only other alive server i.e. 
serverB + cluster.stopRegionServer(serverA.getServerName()); + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() throws Exception { + return managerB.getOldSources().size() == 1; + } + }); + + final HRegionServer serverC = cluster.startRegionServer().getRegionServer(); + serverC.waitForServerOnline(); + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() throws Exception { + return serverC.getReplicationSourceService() != null; + } + }); + final ReplicationSourceManager managerC = + ((Replication) serverC.getReplicationSourceService()).getReplicationManager(); + // Sanity check + assertEquals(0, managerC.getOldSources().size()); + + // Stopping serverB + // Now serverC should have two recovered queues: + // 1. The serverB's normal queue + // 2. serverA's recovered queue on serverB + cluster.stopRegionServer(serverB.getServerName()); + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() throws Exception { + return managerC.getOldSources().size() == 2; + } + }); + replicationAdmin.enablePeer(peerId); + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() throws Exception { + return managerC.getOldSources().size() == 0; + } + }); + } finally { + conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName()); + } + } + + /** + * Regionserver implementation that adds a delay on the graceful shutdown. + */ + public static class ShutdownDelayRegionServer extends HRegionServer { + public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException { + super(conf); + } + + public ShutdownDelayRegionServer(Configuration conf, CoordinatedStateManager csm) + throws IOException, InterruptedException { + super(conf, csm); + } + + @Override + protected void stopServiceThreads() { + // Add a delay before service threads are shutdown. 
+ // This will keep the zookeeper connection alive for the duration of the delay. + LOG.info("Adding a delay to the regionserver shutdown"); + try { + Thread.sleep(2000); + } catch (InterruptedException ex) { + LOG.error("Interrupted while sleeping"); + } + super.stopServiceThreads(); + } + } + }