bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-888: Dispatch individual callbacks from journal in different threads
Date Tue, 09 Feb 2016 07:30:30 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 61390322e -> 63a2a91eb


BOOKKEEPER-888: Dispatch individual callbacks from journal in different threads

Currently the journal sends all the responses from a single thread, after the entries
in a batch are synced. Since a callback thread pool has already been configured, it is
better to spread the send-response tasks across all of its available threads.

Author: Matteo Merli <mmerli@yahoo-inc.com>

Reviewers: Sijie Guo <sijie@apache.org>

Closes #8 from merlimat/bk-888


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/63a2a91e
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/63a2a91e
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/63a2a91e

Branch: refs/heads/master
Commit: 63a2a91eba191e92a1902cf5400325dcf4d36089
Parents: 6139032
Author: Matteo Merli <mmerli@yahoo-inc.com>
Authored: Mon Feb 8 23:30:29 2016 -0800
Committer: Sijie Guo <sijie@apache.org>
Committed: Mon Feb 8 23:30:29 2016 -0800

----------------------------------------------------------------------
 .../org/apache/bookkeeper/bookie/Journal.java   | 21 ++++++++------------
 1 file changed, 8 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/63a2a91e/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 48e5f55..08394c1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -268,7 +268,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
     /**
      * Journal Entry to Record
      */
-    private static class QueueEntry {
+    private class QueueEntry implements Runnable {
         ByteBuffer entry;
         long ledgerId;
         long entryId;
@@ -286,15 +286,17 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
             this.enqueueTime = enqueueTime;
         }
 
-        public void callback() {
+        @Override
+        public void run() {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId);
             }
+            journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS);
             cb.writeComplete(0, ledgerId, entryId, null, ctx);
         }
     }
 
-    private class ForceWriteRequest implements Runnable {
+    private class ForceWriteRequest {
         private final JournalChannel logFile;
         private final LinkedList<QueueEntry> forceWriteWaiters;
         private boolean shouldClose;
@@ -330,7 +332,9 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
                 lastLogMark.setCurLogMark(this.logId, this.lastFlushedPosition);
 
                 // Notify the waiters that the force write succeeded
-                cbThreadPool.submit(this);
+                for (QueueEntry e : this.forceWriteWaiters) {
+                    cbThreadPool.submit(e);
+                }
 
                 return this.forceWriteWaiters.size();
             }
@@ -339,15 +343,6 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
             }
         }
 
-        @Override
-        public void run() {
-            for (QueueEntry e : this.forceWriteWaiters) {
-                journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(e.enqueueTime),
-                        TimeUnit.NANOSECONDS);
-                e.callback();    // Process cbs inline
-            }
-        }
-
         public void closeFileIfNecessary() {
             // Close if shouldClose is set
             if (shouldClose) {


Mime
View raw message