From commits-return-64598-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Sat Jan 6 04:35:19 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id CA59A180647 for ; Sat, 6 Jan 2018 04:35:19 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id BA215160C3B; Sat, 6 Jan 2018 03:35:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 87C61160C27 for ; Sat, 6 Jan 2018 04:35:18 +0100 (CET) Received: (qmail 4032 invoked by uid 500); 6 Jan 2018 03:35:16 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 1875 invoked by uid 99); 6 Jan 2018 03:35:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 06 Jan 2018 03:35:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8D5F2F1820; Sat, 6 Jan 2018 03:35:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhangduo@apache.org To: commits@hbase.apache.org Date: Sat, 06 Jan 2018 03:35:25 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [16/48] hbase git commit: HBASE-19707 Race in start and terminate of a replication source after we async start replication endpoint HBASE-19707 Race in start and terminate of a replication source after we async start replication endpoint Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ad2148d2 Tree: 
http://git-wip-us.apache.org/repos/asf/hbase/tree/ad2148d2 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ad2148d2 Branch: refs/heads/HBASE-19397 Commit: ad2148d26b5ae395ecb636d39bfb822cf4215485 Parents: 3269147 Author: zhangduo Authored: Fri Jan 5 18:28:44 2018 +0800 Committer: zhangduo Committed: Sat Jan 6 11:34:00 2018 +0800 ---------------------------------------------------------------------- .../RecoveredReplicationSource.java | 16 +- .../regionserver/ReplicationSource.java | 202 ++++++++++--------- .../replication/TestReplicationAdmin.java | 1 - 3 files changed, 116 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/ad2148d2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index 1be9a88..3cae0f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -68,7 +68,7 @@ public class RecoveredReplicationSource extends ReplicationSource { LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); } else { LOG.debug("Starting up worker for wal group " + walGroupId); - worker.startup(getUncaughtExceptionHandler()); + worker.startup(this::uncaughtException); worker.setWALReader( startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition())); workerThreads.put(walGroupId, worker); @@ -76,13 +76,13 @@ public class RecoveredReplicationSource extends ReplicationSource { } @Override - 
protected ReplicationSourceWALReader startNewWALReader(String threadName, - String walGroupId, PriorityBlockingQueue queue, long startPosition) { - ReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(fs, - conf, queue, startPosition, walEntryFilter, this); - Threads.setDaemonThreadRunning(walReader, threadName - + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + queueId, - getUncaughtExceptionHandler()); + protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId, + PriorityBlockingQueue queue, long startPosition) { + ReplicationSourceWALReader walReader = + new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); + Threads.setDaemonThreadRunning(walReader, + threadName + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + queueId, + this::uncaughtException); return walReader; } http://git-wip-us.apache.org/repos/asf/hbase/blob/ad2148d2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 6b622ee..923d893 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -76,7 +76,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists; *

*/ @InterfaceAudience.Private -public class ReplicationSource extends Thread implements ReplicationSourceInterface { +public class ReplicationSource implements ReplicationSourceInterface { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class); // Queues of logs to process, entry in format of walGroupId->queue, @@ -115,10 +115,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf private MetricsSource metrics; // WARN threshold for the number of queued logs, defaults to 2 private int logQueueWarnThreshold; - // whether the replication endpoint has been initialized - private volatile boolean endpointInitialized = false; // ReplicationEndpoint which will handle the actual replication - private ReplicationEndpoint replicationEndpoint; + private volatile ReplicationEndpoint replicationEndpoint; // A filter (or a chain of filters) for the WAL entries. protected WALEntryFilter walEntryFilter; // throttler @@ -136,6 +134,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30; private int waitOnEndpointSeconds = -1; + private Thread initThread; + /** * Instantiation method used by region servers * @param conf configuration to use @@ -197,7 +197,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf if (queue == null) { queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator()); queues.put(logPrefix, queue); - if (this.isSourceActive() && this.endpointInitialized) { + if (this.isSourceActive() && this.replicationEndpoint != null) { // new wal group observed after source startup, start a new worker thread to track it // notice: it's possible that log enqueued when this.running is set but worker thread // still not launched, so it's necessary to check workerThreads before start the worker @@ -236,28 +236,36 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf } } - private void initAndStartReplicationEndpoint() throws Exception { + private ReplicationEndpoint createReplicationEndpoint() + throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException { RegionServerCoprocessorHost rsServerHost = null; - TableDescriptors tableDescriptors = null; if (server instanceof HRegionServer) { rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost(); - tableDescriptors = ((HRegionServer) server).getTableDescriptors(); } String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl(); if (replicationEndpointImpl == null) { // Default to HBase inter-cluster replication endpoint replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName(); } - replicationEndpoint = - Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class).newInstance(); + ReplicationEndpoint replicationEndpoint = + Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class).newInstance(); if (rsServerHost != null) { ReplicationEndpoint newReplicationEndPoint = - rsServerHost.postCreateReplicationEndPoint(replicationEndpoint); + rsServerHost.postCreateReplicationEndPoint(replicationEndpoint); if (newReplicationEndPoint != null) { // Override the newly created endpoint from the hook with configured end point replicationEndpoint = newReplicationEndPoint; } } + return replicationEndpoint; + } + + private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint) + throws IOException, TimeoutException { + TableDescriptors tableDescriptors = null; + if (server instanceof HRegionServer) { + tableDescriptors = ((HRegionServer) server).getTableDescriptors(); + } replicationEndpoint .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server)); @@ -265,60 +273,6 @@ public class ReplicationSource extends Thread 
implements ReplicationSourceInterf replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS); } - @Override - public void run() { - // mark we are running now - this.sourceRunning = true; - - int sleepMultiplier = 1; - while (this.isSourceActive()) { - try { - initAndStartReplicationEndpoint(); - break; - } catch (Exception e) { - LOG.warn("Error starting ReplicationEndpoint, retrying", e); - if (replicationEndpoint != null) { - replicationEndpoint.stop(); - replicationEndpoint = null; - } - if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) { - sleepMultiplier++; - } - } - } - this.endpointInitialized = true; - - sleepMultiplier = 1; - // delay this until we are in an asynchronous thread - while (this.isSourceActive() && this.peerClusterId == null) { - this.peerClusterId = replicationEndpoint.getPeerUUID(); - if (this.isSourceActive() && this.peerClusterId == null) { - if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) { - sleepMultiplier++; - } - } - } - - // In rare case, zookeeper setting may be messed up. 
That leads to the incorrect - // peerClusterId value, which is the same as the source clusterId - if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) { - this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId " - + peerClusterId + " which is not allowed by ReplicationEndpoint:" - + replicationEndpoint.getClass().getName(), null, false); - this.manager.removeSource(this); - return; - } - LOG.info("Replicating " + clusterId + " -> " + peerClusterId); - - initializeWALEntryFilter(); - // start workers - for (Map.Entry> entry : queues.entrySet()) { - String walGroupId = entry.getKey(); - PriorityBlockingQueue queue = entry.getValue(); - tryStartNewShipper(walGroupId, queue); - } - } - private void initializeWALEntryFilter() { // get the WALEntryFilter from ReplicationEndpoint and add it to default filters ArrayList filters = @@ -332,37 +286,31 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { - final ReplicationSourceShipper worker = new ReplicationSourceShipper(conf, - walGroupId, queue, this); + ReplicationSourceShipper worker = new ReplicationSourceShipper(conf, walGroupId, queue, this); ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); if (extant != null) { LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); } else { LOG.debug("Starting up worker for wal group " + walGroupId); - worker.startup(getUncaughtExceptionHandler()); - worker.setWALReader(startNewWALReader(worker.getName(), walGroupId, queue, - worker.getStartPosition())); + worker.startup(this::uncaughtException); + worker.setWALReader( + startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition())); } } protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId, PriorityBlockingQueue queue, long 
startPosition) { ReplicationSourceWALReader walReader = - new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); + new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader, threadName + ".replicationSource.wal-reader." + walGroupId + "," + queueId, - getUncaughtExceptionHandler()); + this::uncaughtException); } - public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { - return new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(final Thread t, final Throwable e) { - RSRpcServices.exitIfOOME(e); - LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e); - server.stop("Unexpected exception in " + t.getName()); - } - }; + protected final void uncaughtException(Thread t, Throwable e) { + RSRpcServices.exitIfOOME(e); + LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e); + server.abort("Unexpected exception in " + t.getName(), e); } @Override @@ -435,17 +383,76 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf return replicationPeer.isPeerEnabled(); } + private void initialize() { + int sleepMultiplier = 1; + while (this.isSourceActive()) { + ReplicationEndpoint replicationEndpoint; + try { + replicationEndpoint = createReplicationEndpoint(); + } catch (Exception e) { + LOG.warn("error creating ReplicationEndpoint, retry", e); + if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) { + sleepMultiplier++; + } + continue; + } + + try { + initAndStartReplicationEndpoint(replicationEndpoint); + this.replicationEndpoint = replicationEndpoint; + break; + } catch (Exception e) { + LOG.warn("Error starting ReplicationEndpoint, retry", e); + replicationEndpoint.stop(); + if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) { + sleepMultiplier++; 
+ } + } + } + + if (!this.isSourceActive()) { + return; + } + + sleepMultiplier = 1; + // delay this until we are in an asynchronous thread + while (this.isSourceActive() && this.peerClusterId == null) { + this.peerClusterId = replicationEndpoint.getPeerUUID(); + if (this.isSourceActive() && this.peerClusterId == null) { + if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) { + sleepMultiplier++; + } + } + } + + // In rare case, zookeeper setting may be messed up. That leads to the incorrect + // peerClusterId value, which is the same as the source clusterId + if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) { + this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId " + + peerClusterId + " which is not allowed by ReplicationEndpoint:" + + replicationEndpoint.getClass().getName(), null, false); + this.manager.removeSource(this); + return; + } + LOG.info("Replicating " + clusterId + " -> " + peerClusterId); + + initializeWALEntryFilter(); + // start workers + for (Map.Entry> entry : queues.entrySet()) { + String walGroupId = entry.getKey(); + PriorityBlockingQueue queue = entry.getValue(); + tryStartNewShipper(walGroupId, queue); + } + } + @Override public void startup() { - String n = Thread.currentThread().getName(); - Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(final Thread t, final Throwable e) { - LOG.error("Unexpected exception in ReplicationSource", e); - } - }; - Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.queueId, - handler); + // mark we are running now + this.sourceRunning = true; + initThread = new Thread(this::initialize); + Threads.setDaemonThreadRunning(initThread, + Thread.currentThread().getName() + ".replicationSource," + this.queueId, + this::uncaughtException); } @Override @@ -466,6 +473,13 @@ public class ReplicationSource extends Thread 
implements ReplicationSourceInterf cause); } this.sourceRunning = false; + if (initThread != null && Thread.currentThread() != initThread) { + // This usually won't happen but anyway, let's wait until the initialization thread exits. + // And notice that we may call terminate directly from the initThread so here we need to + // avoid join on ourselves. + initThread.interrupt(); + Threads.shutdown(initThread, this.sleepForRetries); + } Collection workers = workerThreads.values(); for (ReplicationSourceShipper worker : workers) { worker.stopWorker(); @@ -482,11 +496,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } if (this.replicationEndpoint != null) { try { - this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS); + this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier, + TimeUnit.MILLISECONDS); } catch (TimeoutException te) { - LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" - + this.queueId, - te); + LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" + + this.queueId, te); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ad2148d2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index 89cf393..a6091e1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -33,7 +33,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; - 
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants;