druid-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] leventov closed pull request #6041: Synchronize scheduled poll() calls in SQLMetadataSegmentManager
Date Wed, 25 Jul 2018 03:57:32 GMT
leventov closed pull request #6041: Synchronize scheduled poll() calls in SQLMetadataSegmentManager
URL: https://github.com/apache/incubator-druid/pull/6041
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java
index 6900dff018c..4bbb0fa9986 100644
--- a/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java
+++ b/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java
@@ -148,10 +148,12 @@ public Void withHandle(Handle handle) throws Exception
    * This field is used to implement a simple stamp mechanism instead of just a boolean "started" flag to prevent
    * the theoretical situation of two tasks scheduled in {@link #start()} calling {@link #poll()} concurrently, if
    * the sequence of {@link #start()} - {@link #stop()} - {@link #start()} actions occurs quickly.
+   *
+   * {@link SQLMetadataSegmentManager} also has a similar issue.
    */
   private long currentStartOrder = -1;
   private ScheduledExecutorService exec = null;
-  private long retryStartTime = 0;
+  private long failStartTimeMs = 0;
 
   @Inject
   public SQLMetadataRuleManager(
@@ -311,17 +313,17 @@ public void poll()
       log.info("Polled and found rules for %,d datasource(s)", newRules.size());
 
       rules.set(newRules);
-      retryStartTime = 0;
+      failStartTimeMs = 0;
     }
     catch (Exception e) {
-      if (retryStartTime == 0) {
-        retryStartTime = System.currentTimeMillis();
+      if (failStartTimeMs == 0) {
+        failStartTimeMs = System.currentTimeMillis();
       }
 
-      if (System.currentTimeMillis() - retryStartTime > config.getAlertThreshold().toStandardDuration().getMillis()) {
+      if (System.currentTimeMillis() - failStartTimeMs > config.getAlertThreshold().toStandardDuration().getMillis()) {
         log.makeAlert(e, "Exception while polling for rules")
            .emit();
-        retryStartTime = 0;
+        failStartTimeMs = 0;
       } else {
         log.error(e, "Exception while polling for rules");
       }
diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java
index 7d5a7e6c416..4a83125e1eb 100644
--- a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java
+++ b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java
@@ -29,9 +29,6 @@
 import com.google.common.collect.Interners;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 import io.druid.client.DruidDataSource;
 import io.druid.client.ImmutableDruidDataSource;
@@ -73,8 +70,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 /**
@@ -85,10 +85,16 @@
   private static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner();
   private static final EmittingLogger log = new EmittingLogger(SQLMetadataSegmentManager.class);
 
-  // Use to synchronize start() and stop(). These methods should be synchronized to prevent from being called at the
-  // same time if two different threads are calling them. This might be possible if a druid coordinator gets and drops
-  // leadership repeatedly in quick succession.
-  private final Object lock = new Object();
+  /**
+   * Used to synchronize {@link #start()}, {@link #stop()}, {@link #poll()}, and {@link #isStarted()}. These methods
+   * should be synchronized to prevent them from being called at the same time by two different threads.
+   * This might be possible if a druid coordinator gets and drops leadership repeatedly in quick succession.
+   */
+  private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  /** {@link #poll()} and {@link #isStarted()} use readLock. */
+  private final Lock readLock = readWriteLock.readLock();
+  /** {@link #start()} and {@link #stop()} use writeLock. */
+  private final Lock writeLock = readWriteLock.writeLock();
 
   private final ObjectMapper jsonMapper;
   private final Supplier<MetadataSegmentManagerConfig> config;
@@ -96,9 +102,21 @@
   private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSourcesRef;
   private final SQLMetadataConnector connector;
 
-  private volatile ListeningScheduledExecutorService exec = null;
-  private volatile ListenableFuture<?> future = null;
-  private volatile boolean started;
+  /** The number of times this SQLMetadataSegmentManager was started. */
+  private long startCount = 0;
+  /**
+   * Equal to the current {@link #startCount} value, if the SQLMetadataSegmentManager is currently started; -1 if
+   * currently stopped.
+   *
+   * This field is used to implement a simple stamp mechanism instead of just a boolean "started" flag to prevent
+   * the theoretical situation of two or more tasks scheduled in {@link #start()} calling {@link #isStarted()} and
+   * {@link #poll()} concurrently, if the sequence of {@link #start()} - {@link #stop()} - {@link #start()} actions
+   * occurs quickly.
+   *
+   * {@link SQLMetadataRuleManager} also has a similar issue.
+   */
+  private long currentStartOrder = -1;
+  private ScheduledExecutorService exec = null;
 
   @Inject
   public SQLMetadataSegmentManager(
@@ -119,34 +137,52 @@ public SQLMetadataSegmentManager(
   @LifecycleStart
   public void start()
   {
-    synchronized (lock) {
-      if (started) {
+    writeLock.lock();
+    try {
+      if (isStarted()) {
         return;
       }
 
-      exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d"));
+      startCount++;
+      currentStartOrder = startCount;
+      final long localStartOrder = currentStartOrder;
+
+      exec = Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d");
 
       final Duration delay = config.get().getPollDuration().toStandardDuration();
-      future = exec.scheduleWithFixedDelay(
+      exec.scheduleWithFixedDelay(
           new Runnable()
           {
             @Override
             public void run()
             {
+              // poll() is synchronized together with start(), stop() and isStarted() to ensure that when stop() exits,
+              // poll() won't actually run anymore after that (it could only enter the synchronized section and exit
+              // immediately because the localStartOrder doesn't match the new currentStartOrder). It's needed
+              // to avoid flakiness in SQLMetadataSegmentManagerTest.
+              // See https://github.com/apache/incubator-druid/issues/6028
+              readLock.lock();
               try {
-                poll();
+                if (localStartOrder == currentStartOrder) {
+                  poll();
+                }
               }
               catch (Exception e) {
                 log.makeAlert(e, "uncaught exception in segment manager polling thread").emit();
 
               }
+              finally {
+                readLock.unlock();
+              }
             }
           },
           0,
           delay.getMillis(),
           TimeUnit.MILLISECONDS
       );
-      started = true;
+    }
+    finally {
+      writeLock.unlock();
     }
   }
 
@@ -154,8 +190,9 @@ public void run()
   @LifecycleStop
   public void stop()
   {
-    synchronized (lock) {
-      if (!started) {
+    writeLock.lock();
+    try {
+      if (!isStarted()) {
         return;
       }
 
@@ -165,11 +202,12 @@ public void stop()
         current = dataSourcesRef.get();
       } while (!dataSourcesRef.compareAndSet(current, emptyMap));
 
-      future.cancel(false);
-      future = null;
+      currentStartOrder = -1;
       exec.shutdownNow();
       exec = null;
-      started = false;
+    }
+    finally {
+      writeLock.unlock();
     }
   }
 
@@ -340,7 +378,15 @@ public boolean removeSegment(String ds, final String segmentID)
   @Override
   public boolean isStarted()
   {
-    return started;
+    // isStarted() is synchronized together with start(), stop() and poll() to ensure that the latest currentStartOrder
+    // is always visible. readLock should be used to avoid unexpected performance degradation of DruidCoordinator.
+    readLock.lock();
+    try {
+      return currentStartOrder >= 0;
+    }
+    finally {
+      readLock.unlock();
+    }
   }
 
   @Override
@@ -394,10 +440,6 @@ public ImmutableDruidDataSource getInventoryValue(String key)
   public void poll()
   {
     try {
-      if (!started) {
-        return;
-      }
-
       ConcurrentHashMap<String, DruidDataSource> newDataSources = new ConcurrentHashMap<>();
 
       log.debug("Starting polling of segment table");
diff --git a/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java b/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java
index 281838f3ccf..7e630359db2 100644
--- a/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java
+++ b/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java
@@ -118,6 +118,7 @@ public void testPoll()
   {
     manager.start();
     manager.poll();
+    Assert.assertTrue(manager.isStarted());
     Assert.assertEquals(
         ImmutableList.of("wikipedia"),
         manager.getAllDatasourceNames()
@@ -149,6 +150,7 @@ public void testPollWithCurroptedSegment()
     EmittingLogger.registerEmitter(new NoopServiceEmitter());
     manager.start();
     manager.poll();
+    Assert.assertTrue(manager.isStarted());
 
     Assert.assertEquals(
         "wikipedia", Iterables.getOnlyElement(manager.getInventory()).getName()
@@ -160,6 +162,7 @@ public void testGetUnusedSegmentsForInterval()
   {
     manager.start();
     manager.poll();
+    Assert.assertTrue(manager.isStarted());
     Assert.assertTrue(manager.removeDatasource("wikipedia"));
 
     Assert.assertEquals(
@@ -178,6 +181,7 @@ public void testRemoveDataSource() throws IOException
   {
     manager.start();
     manager.poll();
+    Assert.assertTrue(manager.isStarted());
 
     final String newDataSource = "wikipedia2";
     final DataSegment newSegment = new DataSegment(
@@ -207,6 +211,7 @@ public void testRemoveDataSegment() throws IOException
   {
     manager.start();
     manager.poll();
+    Assert.assertTrue(manager.isStarted());
 
     final String newDataSource = "wikipedia2";
     final DataSegment newSegment = new DataSegment(

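For readers skimming the patch: the core of the change is the same "start order" stamp that SQLMetadataRuleManager already uses, combined with a read/write lock, so that a poll() task scheduled by an earlier start() becomes a no-op after stop(). The sketch below is a minimal, self-contained illustration of that pattern and is not Druid code: the class name StampedPollingManager, the fixed 60-second delay, and the empty poll() body are placeholders for the real manager, its configured poll duration, and the metadata-store query.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class StampedPollingManager
{
  private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  private final Lock readLock = readWriteLock.readLock();   // poll() and isStarted()
  private final Lock writeLock = readWriteLock.writeLock(); // start() and stop()

  private long startCount = 0;          // how many times start() has been called
  private long currentStartOrder = -1;  // equals startCount while started, -1 while stopped
  private ScheduledExecutorService exec = null;

  public void start()
  {
    writeLock.lock();
    try {
      if (isStarted()) {
        return;
      }
      startCount++;
      currentStartOrder = startCount;
      final long localStartOrder = currentStartOrder;

      exec = Executors.newSingleThreadScheduledExecutor();
      exec.scheduleWithFixedDelay(
          () -> {
            // The stamp check makes a task scheduled by an earlier start() a no-op once
            // stop() (and possibly a new start()) has run.
            readLock.lock();
            try {
              if (localStartOrder == currentStartOrder) {
                poll();
              }
            }
            finally {
              readLock.unlock();
            }
          },
          0, 60_000, TimeUnit.MILLISECONDS
      );
    }
    finally {
      writeLock.unlock();
    }
  }

  public void stop()
  {
    writeLock.lock();
    try {
      if (!isStarted()) {
        return;
      }
      currentStartOrder = -1;
      exec.shutdownNow();
      exec = null;
    }
    finally {
      writeLock.unlock();
    }
  }

  public boolean isStarted()
  {
    // Safe to call while holding the write lock in start()/stop(): a thread that owns the
    // write lock of a ReentrantReadWriteLock may also acquire the read lock.
    readLock.lock();
    try {
      return currentStartOrder >= 0;
    }
    finally {
      readLock.unlock();
    }
  }

  private void poll()
  {
    // Placeholder for the metadata-store query.
  }
}

The read lock (rather than a plain mutex) is what keeps isStarted() cheap for DruidCoordinator: many readers can hold it at once, and it only conflicts with the write lock taken by start() and stop().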

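The SQLMetadataRuleManager part of the patch is only a rename, retryStartTime to failStartTimeMs, but the timing pattern behind the field may be worth spelling out: it records when the current streak of poll failures began, and the manager only escalates from an error log to an alert once that streak has outlasted the configured alert threshold. A minimal sketch of the idea follows; AlertAfterThresholdPoller and doPoll() are illustrative stand-ins, and System.err replaces Druid's log.makeAlert(...).emit() to keep the example self-contained.

import java.util.concurrent.ThreadLocalRandom;

public class AlertAfterThresholdPoller
{
  private long failStartTimeMs = 0;

  public void pollOnce(long alertThresholdMs)
  {
    try {
      doPoll();
      failStartTimeMs = 0; // success ends the current failure streak
    }
    catch (Exception e) {
      if (failStartTimeMs == 0) {
        failStartTimeMs = System.currentTimeMillis(); // first failure of the streak
      }
      if (System.currentTimeMillis() - failStartTimeMs > alertThresholdMs) {
        // Failures have persisted past the threshold: escalate and reset the clock.
        System.err.println("ALERT: polling has been failing since " + failStartTimeMs + ": " + e);
        failStartTimeMs = 0;
      } else {
        System.err.println("Exception while polling, will retry: " + e);
      }
    }
  }

  private void doPoll() throws Exception
  {
    // Placeholder for the metadata-store query; fails randomly for demonstration.
    if (ThreadLocalRandom.current().nextBoolean()) {
      throw new Exception("simulated metadata store failure");
    }
  }
}

Resetting failStartTimeMs after the alert means a second alert requires another full threshold of continuous failures, so a persistent outage alerts periodically rather than on every poll.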
