distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [01/31] incubator-distributedlog git commit: DL-115: fix force get log segment logic
Date Fri, 30 Dec 2016 00:07:15 GMT
Repository: incubator-distributedlog
Updated Branches:
  refs/heads/master 0ed872353 -> f607a48ff


DL-115: fix force get log segment logic

Summary:

We should remove the ledger closed check:

- force to get log segments if the reader has been idle for a while after open the ledger
- idleReaderWarnThreshold should be 2x larger than readLACLongPollTimeout


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/00be3e58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/00be3e58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/00be3e58

Branch: refs/heads/master
Commit: 00be3e58d9a4a4e48989c8e45825a0c4911f9cdc
Parents: 0ed8723
Author: Yiming Zang <yzang@twitter.com>
Authored: Wed Nov 16 14:14:52 2016 -0800
Committer: Sijie Guo <sijieg@twitter.com>
Committed: Thu Dec 29 02:06:55 2016 -0800

----------------------------------------------------------------------
 .../DistributedLogConfiguration.java            |  4 +-
 .../readahead/ReadAheadWorker.java              | 46 +++++++++-----------
 2 files changed, 23 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/00be3e58/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
index ebb7ae2..46a056b 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
@@ -3467,8 +3467,10 @@ public class DistributedLogConfiguration extends CompositeConfiguration
{
      */
     public void validate() {
         Preconditions.checkArgument(getBKClientReadTimeout() * 1000 >= getReadLACLongPollTimeout(),
-            "Invalid timeout configuration : bkcReadTimeoutSeconds ("+getBKClientReadTimeout()+
+            "Invalid timeout configuration: bkcReadTimeoutSeconds ("+getBKClientReadTimeout()+
                 ") should be longer than readLACLongPollTimeout ("+getReadLACLongPollTimeout()+")");
+        Preconditions.checkArgument(getReaderIdleWarnThresholdMillis() > 2 * getReadLACLongPollTimeout(),
+            "Invalid configuration: ReaderIdleWarnThreshold should be 2x larget than readLACLongPollTimeout");
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/00be3e58/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
index 9a1911e..0217560 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
@@ -180,7 +180,7 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncClosea
     protected final long metadataLatencyWarnThresholdMillis;
     final ReadAheadTracker tracker;
     final Stopwatch resumeStopWatch;
-    final Stopwatch lastLedgerCloseDetected = Stopwatch.createUnstarted();
+    final Stopwatch LACNotAdvancedStopWatch = Stopwatch.createUnstarted();
     // Misc
     private final boolean readAheadSkipBrokenEntries;
     // Stats
@@ -901,9 +901,7 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncClosea
                             });
                 } else {
                     final long lastAddConfirmed;
-                    boolean isClosed;
                     try {
-                        isClosed = handleCache.isLedgerHandleClosed(currentLH);
                         lastAddConfirmed = handleCache.getLastAddConfirmed(currentLH);
                     } catch (BKException ie) {
                         // Exception is thrown due to no ledger handle
@@ -912,30 +910,26 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable,
AsyncClosea
                     }
 
                     if (lastAddConfirmed < nextReadAheadPosition.getEntryId()) {
-                        if (isClosed) {
-                            // This indicates that the currentMetadata is still marked in
-                            // progress while the ledger has been closed. This specific ledger
-                            // is not going to produce any more entries - so we should
-                            // be reading metadata entries to mark the current metadata
-                            // as complete
-                            if (lastLedgerCloseDetected.isRunning()) {
-                                if (lastLedgerCloseDetected.elapsed(TimeUnit.MILLISECONDS)
-                                    > conf.getReaderIdleWarnThresholdMillis()) {
-                                    idleReaderWarn.inc();
-                                    LOG.info("{} Ledger {} for inprogress segment {} closed
for idle reader warn threshold",
-                                        new Object[] { fullyQualifiedName, currentMetadata,
currentLH });
-                                    reInitializeMetadata = true;
-                                    forceReadLogSegments = true;
-                                }
-                            } else {
-                                lastLedgerCloseDetected.reset().start();
-                                if (conf.getTraceReadAheadMetadataChanges()) {
-                                    LOG.info("{} Ledger {} for inprogress segment {} closed",
-                                        new Object[] { fullyQualifiedName, currentMetadata,
currentLH });
-                                }
+                        // This indicates that the currentMetadata is still marked in
+                        // progress while we have already read all the entries. It might
+                        // indicate a failure to detect metadata change. So we
+                        // should probably try force read log segments if the reader has
+                        // been idle for after a while.
+                        if (LACNotAdvancedStopWatch.isRunning()) {
+                            if (LACNotAdvancedStopWatch.elapsed(TimeUnit.MILLISECONDS)
+                                > conf.getReaderIdleWarnThresholdMillis()) {
+                                idleReaderWarn.inc();
+                                LOG.info("{} Ledger {} for inprogress segment {}, reader
has been idle for warn threshold {}",
+                                    new Object[] { fullyQualifiedName, currentMetadata, currentLH,
conf.getReaderIdleWarnThresholdMillis() });
+                                reInitializeMetadata = true;
+                                forceReadLogSegments = true;
                             }
                         } else {
-                            lastLedgerCloseDetected.reset();
+                            LACNotAdvancedStopWatch.reset().start();
+                            if (conf.getTraceReadAheadMetadataChanges()) {
+                                LOG.info("{} Ledger {} for inprogress segment {} closed",
+                                    new Object[] { fullyQualifiedName, currentMetadata, currentLH
});
+                            }
                         }
 
                         tracker.enterPhase(ReadAheadPhase.READ_LAST_CONFIRMED);
@@ -966,7 +960,7 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncClosea
                     }
                 }
             } else {
-                lastLedgerCloseDetected.reset();
+                LACNotAdvancedStopWatch.reset();
                 if (null != currentLH) {
                     try {
                         if (inProgressChanged) {


Mime
View raw message