bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-850: Use nanoseconds to calculate poll timeout when doing group commit (Matteo Merli via sijie)
Date Tue, 21 Apr 2015 07:24:00 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 6622b46d4 -> fe6259c7e


BOOKKEEPER-850: Use nanoseconds to calculate poll timeout when doing group commit (Matteo
Merli via sijie)


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/fe6259c7
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/fe6259c7
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/fe6259c7

Branch: refs/heads/master
Commit: fe6259c7eade644c8d2a5e96aba61c1792d64843
Parents: 6622b46
Author: Sijie Guo <sijie@apache.org>
Authored: Tue Apr 21 00:23:48 2015 -0700
Committer: Sijie Guo <sijie@apache.org>
Committed: Tue Apr 21 00:23:48 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../org/apache/bookkeeper/bookie/Journal.java   | 20 +++++++++++---------
 2 files changed, 13 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/fe6259c7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 308a0a4..c36151d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -69,6 +69,8 @@ Trunk (unreleased changes)
 
       BOOKKEEPER-849: Collect stats with sub-milliseconds precision (Matteo Merli via sijie)
 
+      BOOKKEEPER-850: Use nanoseconds to calculate poll timeout when doing group commit (Matteo
Merli via sijie)
+
       bookkeeper-client:
 
         BOOKKEEPER-810: Allow to configure TCP connect timeout (Charles Xie via sijie)

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/fe6259c7/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index cc61aa5..48e5f55 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -494,7 +494,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource
{
     final ServerConfiguration conf;
     final ForceWriteThread forceWriteThread;
     // Time after which we will stop grouping and issue the flush
-    private final long maxGroupWaitInMSec;
+    private final long maxGroupWaitInNanos;
     // Threshold after which we flush any buffered journal entries
     private final long bufferedEntriesThreshold;
     // Threshold after which we flush any buffered journal writes
@@ -546,7 +546,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource
{
         this.journalWriteBufferSize = conf.getJournalWriteBufferSizeKB() * KB;
         this.maxBackupJournals = conf.getMaxBackupJournals();
         this.forceWriteThread = new ForceWriteThread(this, conf.getJournalAdaptiveGroupWrites());
-        this.maxGroupWaitInMSec = conf.getJournalMaxGroupWaitMSec();
+        this.maxGroupWaitInNanos = TimeUnit.MILLISECONDS.toNanos(conf.getJournalMaxGroupWaitMSec());
         this.bufferedWritesThreshold = conf.getJournalBufferedWritesThreshold();
         this.bufferedEntriesThreshold = conf.getJournalBufferedEntriesThreshold();
         this.cbThreadPool = Executors.newFixedThreadPool(conf.getNumJournalCallbackThreads(),
@@ -554,7 +554,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource
{
 
         // Unless there is a cap on the max wait (which requires group force writes)
         // we cannot skip flushing for queue empty
-        this.flushWhenQueueEmpty = maxGroupWaitInMSec <= 0 || conf.getJournalFlushWhenQueueEmpty();
+        this.flushWhenQueueEmpty = maxGroupWaitInNanos <= 0 || conf.getJournalFlushWhenQueueEmpty();
 
         this.removePagesFromCache = conf.getJournalRemovePagesFromCache();
         // read last log mark
@@ -822,17 +822,19 @@ class Journal extends BookieCriticalThread implements CheckpointSource
{
                     if (toFlush.isEmpty()) {
                         qe = queue.take();
                     } else {
-                        long pollWaitTime = maxGroupWaitInMSec - MathUtils.elapsedMSec(toFlush.getFirst().enqueueTime);
-                        if (flushWhenQueueEmpty || pollWaitTime < 0) {
-                            pollWaitTime = 0;
+                        long pollWaitTimeNanos = maxGroupWaitInNanos - MathUtils.elapsedNanos(toFlush.get(0).enqueueTime);
+                        if (flushWhenQueueEmpty || pollWaitTimeNanos < 0) {
+                            pollWaitTimeNanos = 0;
                         }
-                        qe = queue.poll(pollWaitTime, TimeUnit.MILLISECONDS);
+                        qe = queue.poll(pollWaitTimeNanos, TimeUnit.NANOSECONDS);
                         boolean shouldFlush = false;
                         // We should issue a forceWrite if any of the three conditions below
holds good
                         // 1. If the oldest pending entry has been pending for longer than
the max wait time
-                        if (maxGroupWaitInMSec > 0 && !groupWhenTimeout &&
(MathUtils.elapsedMSec(toFlush.getFirst().enqueueTime) > maxGroupWaitInMSec)) {
+                        if (maxGroupWaitInNanos > 0 && !groupWhenTimeout
+                                && (MathUtils.elapsedNanos(toFlush.get(0).enqueueTime)
> maxGroupWaitInNanos)) {
                             groupWhenTimeout = true;
-                        } else if (maxGroupWaitInMSec > 0 && groupWhenTimeout
&& qe != null && MathUtils.elapsedMSec(qe.enqueueTime) < maxGroupWaitInMSec)
{
+                        } else if (maxGroupWaitInNanos > 0 && groupWhenTimeout
&& qe != null
+                                && MathUtils.elapsedNanos(qe.enqueueTime) < maxGroupWaitInNanos)
{
                             // when group timeout, it would be better to look forward, as
there might be lots of entries already timeout
                             // due to a previous slow write (writing to filesystem which
impacted by force write).
                             // Group those entries in the queue


Mime
View raw message