From commits-return-17-archive-asf-public=cust-asf.ponee.io@druid.apache.org Wed Jul 25 05:57:34 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id B7D09180626 for ; Wed, 25 Jul 2018 05:57:33 +0200 (CEST) Received: (qmail 55305 invoked by uid 500); 25 Jul 2018 03:57:32 -0000 Mailing-List: contact commits-help@druid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@druid.apache.org Delivered-To: mailing list commits@druid.apache.org Received: (qmail 55296 invoked by uid 99); 25 Jul 2018 03:57:32 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Jul 2018 03:57:32 +0000 From: GitBox To: commits@druid.apache.org Subject: [GitHub] leventov closed pull request #6041: Synchronize scheduled poll() calls in SQLMetadataSegmentManager Message-ID: <153249105211.1467.2103300265872260848.gitbox@gitbox.apache.org> Date: Wed, 25 Jul 2018 03:57:32 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit leventov closed pull request #6041: Synchronize scheduled poll() calls in SQLMetadataSegmentManager URL: https://github.com/apache/incubator-druid/pull/6041 This is a PR merged from a forked repository. 
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java index 6900dff018c..4bbb0fa9986 100644 --- a/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java +++ b/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java @@ -148,10 +148,12 @@ public Void withHandle(Handle handle) throws Exception * This field is used to implement a simple stamp mechanism instead of just a boolean "started" flag to prevent * the theoretical situation of two tasks scheduled in {@link #start()} calling {@link #poll()} concurrently, if * the sequence of {@link #start()} - {@link #stop()} - {@link #start()} actions occurs quickly. + * + * {@link SQLMetadataSegmentManager} also have a similar issue. 
*/ private long currentStartOrder = -1; private ScheduledExecutorService exec = null; - private long retryStartTime = 0; + private long failStartTimeMs = 0; @Inject public SQLMetadataRuleManager( @@ -311,17 +313,17 @@ public void poll() log.info("Polled and found rules for %,d datasource(s)", newRules.size()); rules.set(newRules); - retryStartTime = 0; + failStartTimeMs = 0; } catch (Exception e) { - if (retryStartTime == 0) { - retryStartTime = System.currentTimeMillis(); + if (failStartTimeMs == 0) { + failStartTimeMs = System.currentTimeMillis(); } - if (System.currentTimeMillis() - retryStartTime > config.getAlertThreshold().toStandardDuration().getMillis()) { + if (System.currentTimeMillis() - failStartTimeMs > config.getAlertThreshold().toStandardDuration().getMillis()) { log.makeAlert(e, "Exception while polling for rules") .emit(); - retryStartTime = 0; + failStartTimeMs = 0; } else { log.error(e, "Exception while polling for rules"); } diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java index 7d5a7e6c416..4a83125e1eb 100644 --- a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java @@ -29,9 +29,6 @@ import com.google.common.collect.Interners; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningScheduledExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import io.druid.client.DruidDataSource; import io.druid.client.ImmutableDruidDataSource; @@ -73,8 +70,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; /** @@ -85,10 +85,16 @@ private static final Interner DATA_SEGMENT_INTERNER = Interners.newWeakInterner(); private static final EmittingLogger log = new EmittingLogger(SQLMetadataSegmentManager.class); - // Use to synchronize start() and stop(). These methods should be synchronized to prevent from being called at the - // same time if two different threads are calling them. This might be possible if a druid coordinator gets and drops - // leadership repeatedly in quick succession. - private final Object lock = new Object(); + /** + * Use to synchronize {@link #start()}, {@link #stop()}, {@link #poll()}, and {@link #isStarted()}. These methods + * should be synchronized to prevent from being called at the same time if two different threads are calling them. + * This might be possible if a druid coordinator gets and drops leadership repeatedly in quick succession. + */ + private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + /** {@link #poll()} and {@link #isStarted()} use readLock. */ + private final Lock readLock = readWriteLock.readLock(); + /** {@link #start()} and {@link #stop()} use writeLock. */ + private final Lock writeLock = readWriteLock.writeLock(); private final ObjectMapper jsonMapper; private final Supplier config; @@ -96,9 +102,21 @@ private final AtomicReference> dataSourcesRef; private final SQLMetadataConnector connector; - private volatile ListeningScheduledExecutorService exec = null; - private volatile ListenableFuture future = null; - private volatile boolean started; + /** The number of times this SQLMetadataSegmentManager was started. */ + private long startCount = 0; + /** + * Equal to the current {@link #startCount} value, if the SQLMetadataSegmentManager is currently started; -1 if + * currently stopped. 
+ * + * This field is used to implement a simple stamp mechanism instead of just a boolean "started" flag to prevent + * the theoretical situation of two or more tasks scheduled in {@link #start()} calling {@link #isStarted()} and + * {@link #poll()} concurrently, if the sequence of {@link #start()} - {@link #stop()} - {@link #start()} actions + * occurs quickly. + * + * {@link SQLMetadataRuleManager} also have a similar issue. + */ + private long currentStartOrder = -1; + private ScheduledExecutorService exec = null; @Inject public SQLMetadataSegmentManager( @@ -119,34 +137,52 @@ public SQLMetadataSegmentManager( @LifecycleStart public void start() { - synchronized (lock) { - if (started) { + writeLock.lock(); + try { + if (isStarted()) { return; } - exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d")); + startCount++; + currentStartOrder = startCount; + final long localStartOrder = currentStartOrder; + + exec = Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d"); final Duration delay = config.get().getPollDuration().toStandardDuration(); - future = exec.scheduleWithFixedDelay( + exec.scheduleWithFixedDelay( new Runnable() { @Override public void run() { + // poll() is synchronized together with start(), stop() and isStarted() to ensure that when stop() exists, + // poll() won't actually run anymore after that (it could only enter the syncrhonized section and exit + // immediately because the localStartedOrder doesn't match the new currentStartOrder). It's needed + // to avoid flakiness in SQLMetadataSegmentManagerTest. 
+ // See https://github.com/apache/incubator-druid/issues/6028 + readLock.lock(); try { - poll(); + if (localStartOrder == currentStartOrder) { + poll(); + } } catch (Exception e) { log.makeAlert(e, "uncaught exception in segment manager polling thread").emit(); } + finally { + readLock.unlock(); + } } }, 0, delay.getMillis(), TimeUnit.MILLISECONDS ); - started = true; + } + finally { + writeLock.unlock(); } } @@ -154,8 +190,9 @@ public void run() @LifecycleStop public void stop() { - synchronized (lock) { - if (!started) { + writeLock.lock(); + try { + if (!isStarted()) { return; } @@ -165,11 +202,12 @@ public void stop() current = dataSourcesRef.get(); } while (!dataSourcesRef.compareAndSet(current, emptyMap)); - future.cancel(false); - future = null; + currentStartOrder = -1; exec.shutdownNow(); exec = null; - started = false; + } + finally { + writeLock.unlock(); } } @@ -340,7 +378,15 @@ public boolean removeSegment(String ds, final String segmentID) @Override public boolean isStarted() { - return started; + // isStarted() is synchronized together with start(), stop() and poll() to ensure that the latest currentStartOrder + // is always visible. readLock should be used to avoid unexpected performance degradation of DruidCoordinator. 
+ readLock.lock(); + try { + return currentStartOrder >= 0; + } + finally { + readLock.unlock(); + } } @Override @@ -394,10 +440,6 @@ public ImmutableDruidDataSource getInventoryValue(String key) public void poll() { try { - if (!started) { - return; - } - ConcurrentHashMap newDataSources = new ConcurrentHashMap<>(); log.debug("Starting polling of segment table"); diff --git a/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java b/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java index 281838f3ccf..7e630359db2 100644 --- a/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java +++ b/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java @@ -118,6 +118,7 @@ public void testPoll() { manager.start(); manager.poll(); + Assert.assertTrue(manager.isStarted()); Assert.assertEquals( ImmutableList.of("wikipedia"), manager.getAllDatasourceNames() @@ -149,6 +150,7 @@ public void testPollWithCurroptedSegment() EmittingLogger.registerEmitter(new NoopServiceEmitter()); manager.start(); manager.poll(); + Assert.assertTrue(manager.isStarted()); Assert.assertEquals( "wikipedia", Iterables.getOnlyElement(manager.getInventory()).getName() @@ -160,6 +162,7 @@ public void testGetUnusedSegmentsForInterval() { manager.start(); manager.poll(); + Assert.assertTrue(manager.isStarted()); Assert.assertTrue(manager.removeDatasource("wikipedia")); Assert.assertEquals( @@ -178,6 +181,7 @@ public void testRemoveDataSource() throws IOException { manager.start(); manager.poll(); + Assert.assertTrue(manager.isStarted()); final String newDataSource = "wikipedia2"; final DataSegment newSegment = new DataSegment( @@ -207,6 +211,7 @@ public void testRemoveDataSegment() throws IOException { manager.start(); manager.poll(); + Assert.assertTrue(manager.isStarted()); final String newDataSource = "wikipedia2"; final DataSegment newSegment = new DataSegment( ---------------------------------------------------------------- This 
is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org For additional commands, e-mail: commits-help@druid.apache.org