bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-748: Move fence requests out of read threads
Date Mon, 05 Jun 2017 20:25:21 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 5fe86525a -> 58b92de8b


BOOKKEEPER-748: Move fence requests out of read threads

This change moves fence requests out of the current read threads and uses a callback to trigger scheduling of the entry read. It also refactors ReadEntryProcessorV3 to support long poll in subsequent requests.

Major changes:

- change fence request to use SettableFuture
- refactor ReadEntryProcessorV3 to run the fence request in a callback and to support long poll (in subsequent requests)
- fix stats issue: request stats measure the latency from the time the bookie receives the request until it sends the response.
- remove "public static final" from the stats fields. These modifiers are not needed for fields declared in an interface, where they are implicit.

Author: Sijie Guo <sijie@apache.org>
Author: Robin Dhamankar <rdhamankar@twitter.com>

Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Jia Zhai <None>

Closes #174 from sijie/longpoll/part1_server_side_change


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/58b92de8
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/58b92de8
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/58b92de8

Branch: refs/heads/master
Commit: 58b92de8b5feee37f8bc9a99d8c98c2a9d9df383
Parents: 5fe8652
Author: Sijie Guo <sijie@apache.org>
Authored: Mon Jun 5 13:25:17 2017 -0700
Committer: Sijie Guo <sijie@apache.org>
Committed: Mon Jun 5 13:25:17 2017 -0700

----------------------------------------------------------------------
 .../bookie/BookKeeperServerStats.java           | 141 ++++----
 .../org/apache/bookkeeper/bookie/Bookie.java    | 100 ++----
 .../bookkeeper/bookie/LedgerDescriptorImpl.java |   1 +
 .../bookkeeper/proto/BookieProtoEncoding.java   |   5 +-
 .../apache/bookkeeper/proto/BookieProtocol.java |   7 +
 .../proto/BookieRequestProcessor.java           |  47 ++-
 .../proto/BookkeeperInternalCallbacks.java      |  91 ++++-
 .../proto/GetBookieInfoProcessorV3.java         |   2 +-
 .../bookkeeper/proto/PacketProcessorBaseV3.java |  27 +-
 .../bookkeeper/proto/ReadEntryProcessorV3.java  | 349 ++++++++++++++-----
 .../bookkeeper/proto/ReadLacProcessorV3.java    |   2 +-
 .../apache/bookkeeper/proto/RequestUtils.java   |  38 ++
 .../bookkeeper/proto/WriteLacProcessorV3.java   |   2 +-
 13 files changed, 553 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/58b92de8/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 99a2db1..dc16b52 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -22,78 +22,101 @@ package org.apache.bookkeeper.bookie;
 
 public interface BookKeeperServerStats {
 
-    public final static String SERVER_SCOPE = "bookkeeper_server";
-    public final static String BOOKIE_SCOPE = "bookie";
+    String SERVER_SCOPE = "bookkeeper_server";
+    String BOOKIE_SCOPE = "bookie";
 
-    public final static String SERVER_STATUS = "SERVER_STATUS";
+    String SERVER_STATUS = "SERVER_STATUS";
 
+    //
+    // Network Stats (scoped under SERVER_SCOPE)
+    //
+
+    // Stats
+    String CHANNEL_WRITE = "CHANNEL_WRITE";
+
+    //
     // Server Operations
-    public final static String ADD_ENTRY_REQUEST = "ADD_ENTRY_REQUEST";
-    public final static String ADD_ENTRY = "ADD_ENTRY";
-    public final static String READ_ENTRY_REQUEST = "READ_ENTRY_REQUEST";
-    public final static String READ_ENTRY = "READ_ENTRY";
-    public final static String READ_ENTRY_FENCE_REQUEST = "READ_ENTRY_FENCE_REQUEST";
-    public final static String READ_ENTRY_FENCE_WAIT = "READ_ENTRY_FENCE_WAIT";
-    public final static String READ_ENTRY_FENCE_READ = "READ_ENTRY_FENCE_READ";
-    public final static String WRITE_LAC = "WRITE_LAC";
-    public final static String READ_LAC = "READ_LAC";
-    public final static String GET_BOOKIE_INFO = "GET_BOOKIE_INFO";
+    //
+
+    // Stats
+    String ADD_ENTRY_REQUEST = "ADD_ENTRY_REQUEST";
+    String ADD_ENTRY = "ADD_ENTRY";
+    String READ_ENTRY_REQUEST = "READ_ENTRY_REQUEST";
+    String READ_ENTRY = "READ_ENTRY";
+    String READ_ENTRY_SCHEDULING_DELAY = "READ_ENTRY_SCHEDULING_DELAY";
+    String READ_ENTRY_FENCE_REQUEST = "READ_ENTRY_FENCE_REQUEST";
+    String READ_ENTRY_FENCE_WAIT = "READ_ENTRY_FENCE_WAIT";
+    String READ_ENTRY_FENCE_READ = "READ_ENTRY_FENCE_READ";
+    String READ_ENTRY_LONG_POLL_REQUEST = "READ_ENTRY_LONG_POLL_REQUEST";
+    String READ_ENTRY_LONG_POLL_PRE_WAIT = "READ_ENTRY_LONG_POLL_PRE_WAIT";
+    String READ_ENTRY_LONG_POLL_WAIT = "READ_ENTRY_LONG_POLL_WAIT";
+    String READ_ENTRY_LONG_POLL_READ = "READ_ENTRY_LONG_POLL_READ";
+    String WRITE_LAC_REQUEST = "WRITE_LAC_REQUEST";
+    String WRITE_LAC = "WRITE_LAC";
+    String READ_LAC_REQUEST = "READ_LAC_REQUEST";
+    String READ_LAC = "READ_LAC";
+    String GET_BOOKIE_INFO_REQUEST = "GET_BOOKIE_INFO_REQUEST";
+    String GET_BOOKIE_INFO = "GET_BOOKIE_INFO";
 
     // Bookie Operations
-    public final static String BOOKIE_ADD_ENTRY_BYTES = "BOOKIE_ADD_ENTRY_BYTES";
-    public final static String BOOKIE_READ_ENTRY_BYTES = "BOOKIE_READ_ENTRY_BYTES";
-    public final static String BOOKIE_ADD_ENTRY = "BOOKIE_ADD_ENTRY";
-    public final static String BOOKIE_RECOVERY_ADD_ENTRY = "BOOKIE_RECOVERY_ADD_ENTRY";
-    public final static String BOOKIE_READ_ENTRY = "BOOKIE_READ_ENTRY";
+    String BOOKIE_ADD_ENTRY = "BOOKIE_ADD_ENTRY";
+    String BOOKIE_RECOVERY_ADD_ENTRY = "BOOKIE_RECOVERY_ADD_ENTRY";
+    String BOOKIE_READ_ENTRY = "BOOKIE_READ_ENTRY";
+    String BOOKIE_READ_LAST_CONFIRMED = "BOOKIE_READ_LAST_CONFIRMED";
+    String BOOKIE_ADD_ENTRY_BYTES = "BOOKIE_ADD_ENTRY_BYTES";
+    String BOOKIE_READ_ENTRY_BYTES = "BOOKIE_READ_ENTRY_BYTES";
+
+    //
+    // Journal Stats (scoped under SERVER_SCOPE)
+    //
 
-    // Journal Stats
-    public final static String JOURNAL_SCOPE = "journal";
-    public final static String JOURNAL_ADD_ENTRY = "JOURNAL_ADD_ENTRY";
-    public final static String JOURNAL_SYNC = "JOURNAL_SYNC";
-    public final static String JOURNAL_MEM_ADD_ENTRY = "JOURNAL_MEM_ADD_ENTRY";
-    public final static String JOURNAL_PREALLOCATION = "JOURNAL_PREALLOCATION";
-    public final static String JOURNAL_FORCE_WRITE_LATENCY = "JOURNAL_FORCE_WRITE_LATENCY";
-    public final static String JOURNAL_FORCE_WRITE_BATCH_ENTRIES = "JOURNAL_FORCE_WRITE_BATCH_ENTRIES";
-    public final static String JOURNAL_FORCE_WRITE_BATCH_BYTES = "JOURNAL_FORCE_WRITE_BATCH_BYTES";
-    public final static String JOURNAL_FLUSH_LATENCY = "JOURNAL_FLUSH_LATENCY";
-    public final static String JOURNAL_QUEUE_LATENCY = "JOURNAL_QUEUE_LATENCY";
-    public final static String JOURNAL_PROCESS_TIME_LATENCY = "JOURNAL_PROCESS_TIME_LATENCY";
-    public final static String JOURNAL_CREATION_LATENCY = "JOURNAL_CREATION_LATENCY";
+    String JOURNAL_SCOPE = "journal";
+    String JOURNAL_ADD_ENTRY = "JOURNAL_ADD_ENTRY";
+    String JOURNAL_SYNC = "JOURNAL_SYNC";
+    String JOURNAL_MEM_ADD_ENTRY = "JOURNAL_MEM_ADD_ENTRY";
+    String JOURNAL_PREALLOCATION = "JOURNAL_PREALLOCATION";
+    String JOURNAL_FORCE_WRITE_LATENCY = "JOURNAL_FORCE_WRITE_LATENCY";
+    String JOURNAL_FORCE_WRITE_BATCH_ENTRIES = "JOURNAL_FORCE_WRITE_BATCH_ENTRIES";
+    String JOURNAL_FORCE_WRITE_BATCH_BYTES = "JOURNAL_FORCE_WRITE_BATCH_BYTES";
+    String JOURNAL_FLUSH_LATENCY = "JOURNAL_FLUSH_LATENCY";
+    String JOURNAL_QUEUE_LATENCY = "JOURNAL_QUEUE_LATENCY";
+    String JOURNAL_PROCESS_TIME_LATENCY = "JOURNAL_PROCESS_TIME_LATENCY";
+    String JOURNAL_CREATION_LATENCY = "JOURNAL_CREATION_LATENCY";
 
     // Ledger Storage Stats
-    public final static String STORAGE_GET_OFFSET = "STORAGE_GET_OFFSET";
-    public final static String STORAGE_GET_ENTRY = "STORAGE_GET_ENTRY";
-    public final static String SKIP_LIST_GET_ENTRY = "SKIP_LIST_GET_ENTRY";
-    public final static String SKIP_LIST_PUT_ENTRY = "SKIP_LIST_PUT_ENTRY";
-    public final static String SKIP_LIST_SNAPSHOT = "SKIP_LIST_SNAPSHOT";
+    String STORAGE_GET_OFFSET = "STORAGE_GET_OFFSET";
+    String STORAGE_GET_ENTRY = "STORAGE_GET_ENTRY";
+    String SKIP_LIST_GET_ENTRY = "SKIP_LIST_GET_ENTRY";
+    String SKIP_LIST_PUT_ENTRY = "SKIP_LIST_PUT_ENTRY";
+    String SKIP_LIST_SNAPSHOT = "SKIP_LIST_SNAPSHOT";
 
     // Counters
-    public final static String JOURNAL_WRITE_BYTES = "JOURNAL_WRITE_BYTES";
-    public final static String JOURNAL_QUEUE_SIZE = "JOURNAL_QUEUE_SIZE";
-    public final static String READ_BYTES = "READ_BYTES";
-    public final static String WRITE_BYTES = "WRITE_BYTES";
-    public final static String NUM_MINOR_COMP = "NUM_MINOR_COMP";
-    public final static String NUM_MAJOR_COMP = "NUM_MAJOR_COMP";
-    public final static String JOURNAL_FORCE_WRITE_QUEUE_SIZE = "JOURNAL_FORCE_WRITE_QUEUE_SIZE";
-    public final static String JOURNAL_NUM_FORCE_WRITES = "JOURNAL_NUM_FORCE_WRITES";
-    public final static String JOURNAL_NUM_FLUSH_EMPTY_QUEUE = "JOURNAL_NUM_FLUSH_EMPTY_QUEUE";
-    public final static String JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES = "JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES";
-    public final static String JOURNAL_NUM_FLUSH_MAX_WAIT = "JOURNAL_NUM_FLUSH_MAX_WAIT";
-    public final static String SKIP_LIST_FLUSH_BYTES = "SKIP_LIST_FLUSH_BYTES";
-    public final static String SKIP_LIST_THROTTLING = "SKIP_LIST_THROTTLING";
-    public final static String READ_LAST_ENTRY_NOENTRY_ERROR = "READ_LAST_ENTRY_NOENTRY_ERROR";
-    public final static String LEDGER_CACHE_NUM_EVICTED_LEDGERS = "LEDGER_CACHE_NUM_EVICTED_LEDGERS";
+    String JOURNAL_WRITE_BYTES = "JOURNAL_WRITE_BYTES";
+    String JOURNAL_QUEUE_SIZE = "JOURNAL_QUEUE_SIZE";
+    String READ_BYTES = "READ_BYTES";
+    String WRITE_BYTES = "WRITE_BYTES";
+    String NUM_MINOR_COMP = "NUM_MINOR_COMP";
+    String NUM_MAJOR_COMP = "NUM_MAJOR_COMP";
+    String JOURNAL_FORCE_WRITE_QUEUE_SIZE = "JOURNAL_FORCE_WRITE_QUEUE_SIZE";
+    String JOURNAL_NUM_FORCE_WRITES = "JOURNAL_NUM_FORCE_WRITES";
+    String JOURNAL_NUM_FLUSH_EMPTY_QUEUE = "JOURNAL_NUM_FLUSH_EMPTY_QUEUE";
+    String JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES = "JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES";
+    String JOURNAL_NUM_FLUSH_MAX_WAIT = "JOURNAL_NUM_FLUSH_MAX_WAIT";
+    String SKIP_LIST_FLUSH_BYTES = "SKIP_LIST_FLUSH_BYTES";
+    String SKIP_LIST_THROTTLING = "SKIP_LIST_THROTTLING";
+    String READ_LAST_ENTRY_NOENTRY_ERROR = "READ_LAST_ENTRY_NOENTRY_ERROR";
+    String LEDGER_CACHE_NUM_EVICTED_LEDGERS = "LEDGER_CACHE_NUM_EVICTED_LEDGERS";
 
     // Gauge
-    public final static String NUM_INDEX_PAGES = "NUM_INDEX_PAGES";
-    public final static String NUM_OPEN_LEDGERS = "NUM_OPEN_LEDGERS";
-    public final static String JOURNAL_FORCE_WRITE_GROUPING_COUNT = "JOURNAL_FORCE_WRITE_GROUPING_COUNT";
-    public final static String NUM_PENDING_READ = "NUM_PENDING_READ";
-    public final static String NUM_PENDING_ADD = "NUM_PENDING_ADD";
+    String NUM_INDEX_PAGES = "NUM_INDEX_PAGES";
+    String NUM_OPEN_LEDGERS = "NUM_OPEN_LEDGERS";
+    String JOURNAL_FORCE_WRITE_GROUPING_COUNT = "JOURNAL_FORCE_WRITE_GROUPING_COUNT";
+    String NUM_PENDING_READ = "NUM_PENDING_READ";
+    String NUM_PENDING_ADD = "NUM_PENDING_ADD";
 
     // LedgerDirs Stats
-    public final static String LD_LEDGER_SCOPE = "ledger";
-    public final static String LD_INDEX_SCOPE = "index";
-    public final static String LD_WRITABLE_DIRS = "writable_dirs";
+    String LD_LEDGER_SCOPE = "ledger";
+    String LD_INDEX_SCOPE = "index";
+    String LD_WRITABLE_DIRS = "writable_dirs";
 
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/58b92de8/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index f32626a..12f491d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -22,6 +22,8 @@
 package org.apache.bookkeeper.bookie;
 
 import static com.google.common.base.Charsets.UTF_8;
+
+import com.google.common.util.concurrent.SettableFuture;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
@@ -207,78 +209,6 @@ public class Bookie extends BookieCriticalThread {
         }
     }
 
-    final static Future<Boolean> SUCCESS_FUTURE = new Future<Boolean>() {
-        @Override
-        public boolean cancel(boolean mayInterruptIfRunning) { return false; }
-        @Override
-        public Boolean get() { return true; }
-        @Override
-        public Boolean get(long timeout, TimeUnit unit) { return true; }
-        @Override
-        public boolean isCancelled() { return false; }
-        @Override
-        public boolean isDone() {
-            return true;
-        }
-    };
-
-    static class CountDownLatchFuture<T> implements Future<T> {
-
-        T value = null;
-        volatile boolean done = false;
-        CountDownLatch latch = new CountDownLatch(1);
-
-        @Override
-        public boolean cancel(boolean mayInterruptIfRunning) { return false; }
-        @Override
-        public T get() throws InterruptedException {
-            latch.await();
-            return value;
-        }
-        @Override
-        public T get(long timeout, TimeUnit unit)
-            throws InterruptedException, TimeoutException {
-            if (!latch.await(timeout, unit)) {
-                throw new TimeoutException("Timed out waiting for latch");
-            }
-            return value;
-        }
-
-        @Override
-        public boolean isCancelled() { return false; }
-
-        @Override
-        public boolean isDone() {
-            return done;
-        }
-
-        void setDone(T value) {
-            this.value = value;
-            done = true;
-            latch.countDown();
-        }
-    }
-
-    static class FutureWriteCallback implements WriteCallback {
-
-        CountDownLatchFuture<Boolean> result =
-            new CountDownLatchFuture<Boolean>();
-
-        @Override
-        public void writeComplete(int rc, long ledgerId, long entryId,
-                                  BookieSocketAddress addr, Object ctx) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Finished writing entry {} @ ledger {} for {} : {}",
-                          new Object[] { entryId, ledgerId, addr, rc });
-            }
-            result.setDone(0 == rc);
-        }
-
-        public Future<Boolean> getResult() {
-            return result;
-        }
-    }
-
     public static void checkDirectoryStructure(File dir) throws IOException {
         if (!dir.exists()) {
             File parent = dir.getParentFile();
@@ -1454,6 +1384,26 @@ public class Bookie extends BookieCriticalThread {
             entry.release();
         }
     }
+    
+    static class FutureWriteCallback implements WriteCallback {
+
+        SettableFuture<Boolean> result = SettableFuture.create();
+
+        @Override
+        public void writeComplete(int rc, long ledgerId, long entryId,
+                                  BookieSocketAddress addr, Object ctx) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Finished writing entry {} @ ledger {} for {} : {}",
+                          new Object[] { entryId, ledgerId, addr, rc });
+            }
+
+            result.set(0 == rc);
+        }
+
+        public SettableFuture<Boolean> getResult() {
+            return result;
+        }
+    }
 
     /**
      * Fences a ledger. From this point on, clients will be unable to
@@ -1462,7 +1412,7 @@ public class Bookie extends BookieCriticalThread {
      * This method is idempotent. Once a ledger is fenced, it can
      * never be unfenced. Fencing a fenced ledger has no effect.
      */
-    public Future<Boolean> fenceLedger(long ledgerId, byte[] masterKey) throws IOException, BookieException {
+    public SettableFuture<Boolean> fenceLedger(long ledgerId, byte[] masterKey) throws IOException, BookieException {
         LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
         boolean success;
         synchronized (handle) {
@@ -1483,7 +1433,9 @@ public class Bookie extends BookieCriticalThread {
             return fwc.getResult();
         } else {
             // already fenced
-            return SUCCESS_FUTURE;
+            SettableFuture<Boolean> successFuture = SettableFuture.create();
+            successFuture.set(true);
+            return successFuture;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/58b92de8/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index f80bbc8..a1e0fc0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -81,6 +81,7 @@ public class LedgerDescriptorImpl extends LedgerDescriptor {
         return ledgerStorage.getExplicitLac(ledgerId);
     }
 
+    @Override
     long addEntry(ByteBuf entry) throws IOException {
         long ledgerId = entry.getLong(entry.readerIndex());
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/58b92de8/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 8de4323..e1ccabc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -46,10 +46,9 @@ import io.netty.handler.codec.MessageToMessageDecoder;
 import io.netty.handler.codec.MessageToMessageEncoder;
 
 public class BookieProtoEncoding {
-    private final static Logger LOG = LoggerFactory.getLogger(BookieProtoEncoding.class);
-
-    static interface EnDecoder {
+    private static final Logger LOG = LoggerFactory.getLogger(BookieProtoEncoding.class);
 
+    public static interface EnDecoder {
         /**
          * Encode a <i>object</i> into channel buffer.
          *

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/58b92de8/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index d744d09..4df4044 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -355,6 +355,13 @@ public interface BookieProtocol {
             super(protocolVersion, ADDENTRY, errorCode, ledgerId, entryId);
         }
     }
+    
+    static class ErrorResponse extends Response {
+        ErrorResponse(byte protocolVersion, byte opCode, int errorCode,
+                      long ledgerId, long entryId) {
+            super(protocolVersion, opCode, errorCode, ledgerId, entryId);
+        }
+    }
 
     static class AuthResponse extends Response {
         final AuthMessage authMessage;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/58b92de8/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 0746686..ce5972e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -21,12 +21,10 @@
 package org.apache.bookkeeper.proto;
 
 import com.google.protobuf.ByteString;
-
 import io.netty.channel.Channel;
-
+import java.util.concurrent.ExecutorService;
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.AuthToken;
-
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.processor.RequestProcessor;
@@ -38,15 +36,28 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CHANNEL_WRITE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO_REQUEST;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_READ;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_WAIT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_PRE_WAIT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_READ;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_WAIT;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_SCHEDULING_DELAY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC_REQUEST;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC_REQUEST;
 
 public class BookieRequestProcessor implements RequestProcessor {
 
-    private final static Logger LOG = LoggerFactory.getLogger(BookieRequestProcessor.class);
+    private static final Logger LOG = LoggerFactory.getLogger(BookieRequestProcessor.class);
+
     /**
      * The server configuration. We use this for getting the number of add and read
      * worker threads.
@@ -75,9 +86,21 @@ public class BookieRequestProcessor implements RequestProcessor {
     final OpStatsLogger addEntryStats;
     final OpStatsLogger readRequestStats;
     final OpStatsLogger readEntryStats;
+    final OpStatsLogger fenceReadRequestStats;
+    final OpStatsLogger fenceReadEntryStats;
+    final OpStatsLogger fenceReadWaitStats;
+    final OpStatsLogger readEntrySchedulingDelayStats;
+    final OpStatsLogger longPollPreWaitStats;
+    final OpStatsLogger longPollWaitStats;
+    final OpStatsLogger longPollReadStats;
+    final OpStatsLogger longPollReadRequestStats;
+    final OpStatsLogger writeLacRequestStats;
     final OpStatsLogger writeLacStats;
+    final OpStatsLogger readLacRequestStats;
     final OpStatsLogger readLacStats;
+    final OpStatsLogger getBookieInfoRequestStats;
     final OpStatsLogger getBookieInfoStats;
+    final OpStatsLogger channelWriteStats;
 
     public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,
                                   StatsLogger statsLogger) {
@@ -91,9 +114,21 @@ public class BookieRequestProcessor implements RequestProcessor {
         this.addRequestStats = statsLogger.getOpStatsLogger(ADD_ENTRY_REQUEST);
         this.readEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY);
         this.readRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_REQUEST);
+        this.fenceReadEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_READ);
+        this.fenceReadRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_REQUEST);
+        this.fenceReadWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_WAIT);
+        this.readEntrySchedulingDelayStats = statsLogger.getOpStatsLogger(READ_ENTRY_SCHEDULING_DELAY);
+        this.longPollPreWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_PRE_WAIT);
+        this.longPollWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_WAIT);
+        this.longPollReadStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_READ);
+        this.longPollReadRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_REQUEST);
         this.writeLacStats = statsLogger.getOpStatsLogger(WRITE_LAC);
+        this.writeLacRequestStats = statsLogger.getOpStatsLogger(WRITE_LAC_REQUEST);
         this.readLacStats = statsLogger.getOpStatsLogger(READ_LAC);
+        this.readLacRequestStats = statsLogger.getOpStatsLogger(READ_LAC_REQUEST);
         this.getBookieInfoStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO);
+        this.getBookieInfoRequestStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO_REQUEST);
+        this.channelWriteStats = statsLogger.getOpStatsLogger(CHANNEL_WRITE);
     }
 
     @Override
@@ -194,7 +229,9 @@ public class BookieRequestProcessor implements RequestProcessor {
     }
 
     private void processReadRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
-        ReadEntryProcessorV3 read = new ReadEntryProcessorV3(r, c, this);
+        ExecutorService fenceThreadPool =
+          null == readThreadPool ? null : readThreadPool.chooseThread(c);
+        ReadEntryProcessorV3 read = new ReadEntryProcessorV3(r, c, this, fenceThreadPool);
         if (null == readThreadPool) {
             read.run();
         } else {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/58b92de8/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
index ddea8b7..8de9783 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
@@ -22,15 +22,18 @@
 package org.apache.bookkeeper.proto;
 
 import io.netty.buffer.ByteBuf;
-
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.zookeeper.AsyncCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,7 +42,6 @@ import org.slf4j.LoggerFactory;
  * Declaration of a callback interfaces used in bookkeeper client library but
  * not exposed to the client application.
  */
-
 public class BookkeeperInternalCallbacks {
 
     static final Logger LOG = LoggerFactory.getLogger(BookkeeperInternalCallbacks.class);
@@ -80,6 +82,36 @@ public class BookkeeperInternalCallbacks {
     public interface GenericCallback<T> {
         void operationComplete(int rc, T result);
     }
+    
+        public static class TimedGenericCallback<T> implements GenericCallback<T> {
+
+        final GenericCallback<T> cb;
+        final int successRc;
+        final OpStatsLogger statsLogger;
+        final long startTime;
+
+        public TimedGenericCallback(GenericCallback<T> cb, int successRc, OpStatsLogger statsLogger) {
+            this.cb = cb;
+            this.successRc = successRc;
+            this.statsLogger = statsLogger;
+            this.startTime = MathUtils.nowInNano();
+        }
+
+        @Override
+        public void operationComplete(int rc, T result) {
+            if (successRc == rc) {
+                statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+            } else {
+                statsLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+            }
+            cb.operationComplete(rc, result);
+        }
+    }
+    
+    public interface ReadEntryCallbackCtx {
+        void setLastAddConfirmed(long lac);
+        long getLastAddConfirmed();
+    }
 
     /**
      * Declaration of a callback implementation for calls from BookieClient objects.
@@ -87,15 +119,10 @@ public class BookkeeperInternalCallbacks {
      * from a ledger).
      *
      */
-
     public interface ReadEntryCallback {
         void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffer, Object ctx);
     }
 
-    public interface GetBookieInfoCallback {
-        void getBookieInfoComplete(int rc, BookieInfo bInfo, Object ctx);
-    }
-
     /**
      * Listener on entries responded.
      */
@@ -114,6 +141,10 @@ public class BookkeeperInternalCallbacks {
          */
         void onEntryComplete(int rc, LedgerHandle lh, LedgerEntry entry, Object ctx);
     }
+    
+    public interface GetBookieInfoCallback {
+        void getBookieInfoComplete(int rc, BookieInfo bInfo, Object ctx);
+    }
 
     /**
      * This is a multi callback object that waits for all of
@@ -128,29 +159,59 @@ public class BookkeeperInternalCallbacks {
         // Final callback and the corresponding context to invoke
         final AsyncCallback.VoidCallback cb;
         final Object context;
+        final ExecutorService callbackExecutor;
         // This keeps track of how many operations have completed
         final AtomicInteger done = new AtomicInteger();
         // List of the exceptions from operations that completed unsuccessfully
         final LinkedBlockingQueue<Integer> exceptions = new LinkedBlockingQueue<Integer>();
 
-        public MultiCallback(int expected, AsyncCallback.VoidCallback cb, Object context, int successRc, int failureRc) {
+        public MultiCallback(int expected, AsyncCallback.VoidCallback cb, Object context,
+                             int successRc, int failureRc) {
+            this(expected, cb, context, successRc, failureRc, null);
+        }
+
+        public MultiCallback(int expected, AsyncCallback.VoidCallback cb, Object context,
+                             int successRc, int failureRc, ExecutorService callbackExecutor) {
             this.expected = expected;
             this.cb = cb;
             this.context = context;
             this.failureRc = failureRc;
             this.successRc = successRc;
+            this.callbackExecutor = callbackExecutor;
             if (expected == 0) {
-                cb.processResult(successRc, null, context);
+                callback();
             }
         }
 
         private void tick() {
             if (done.incrementAndGet() == expected) {
-                if (exceptions.isEmpty()) {
-                    cb.processResult(successRc, null, context);
-                } else {
-                    cb.processResult(failureRc, null, context);
+                callback();
+            }
+        }
+
+        private void callback() {
+            if (null != callbackExecutor) {
+                try {
+                    callbackExecutor.submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            doCallback();
+                        }
+                    });
+                } catch (RejectedExecutionException ree) {
+                    // if the callback executor is shut down, run the callback in the same thread
+                    doCallback();
                 }
+            } else {
+                doCallback();
+            }
+        }
+
+        private void doCallback() {
+            if (exceptions.isEmpty()) {
+                cb.processResult(successRc, null, context);
+            } else {
+                cb.processResult(failureRc, null, context);
             }
         }
 
@@ -174,7 +235,7 @@ public class BookkeeperInternalCallbacks {
          *
          * @param data
          *          data to process
-         * @param iterationCallback
+         * @param cb
          *          Callback to invoke when process has been done.
          */
         public void process(T data, AsyncCallback.VoidCallback cb);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/58b92de8/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
index 4f7b8c0..6b825d9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
@@ -91,6 +91,6 @@ public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements R
                 .setGetBookieInfoResponse(getBookieInfoResponse);
         sendResponse(response.getStatus(),
                      response.build(),
-                     requestProcessor.getBookieInfoStats);
+                     requestProcessor.getBookieInfoRequestStats);
     }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/58b92de8/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
index 873ef30..e398c4a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
@@ -21,7 +21,8 @@
 package org.apache.bookkeeper.proto;
 
 import io.netty.channel.Channel;
-
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
@@ -48,12 +49,24 @@ public abstract class PacketProcessorBaseV3 extends SafeRunnable {
     }
 
     protected void sendResponse(StatusCode code, Object response, OpStatsLogger statsLogger) {
-        channel.writeAndFlush(response);
-        if (StatusCode.EOK == code) {
-            statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
-        } else {
-            statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
-        }
+        final long writeNanos = MathUtils.nowInNano();
+        channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                long writeElapsedNanos = MathUtils.elapsedNanos(writeNanos);
+                if (!future.isSuccess()) {
+                    requestProcessor.channelWriteStats.registerFailedEvent(writeElapsedNanos, TimeUnit.NANOSECONDS);
+                } else {
+                    requestProcessor.channelWriteStats.registerSuccessfulEvent(writeElapsedNanos, TimeUnit.NANOSECONDS);
+                }
+                if (StatusCode.EOK == code) {
+                    statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
+                } else {
+                    statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
+                }
+            }
+        });
+
     }
 
     protected boolean isVersionCompatible() {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/58b92de8/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index b921b46..e01a339 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -17,16 +17,17 @@
  */
 package org.apache.bookkeeper.proto;
 
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.ByteString;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.util.ReferenceCountUtil;
-
 import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
@@ -34,139 +35,301 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.protobuf.ByteString;
-
 class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
 
     private final static Logger LOG = LoggerFactory.getLogger(ReadEntryProcessorV3.class);
 
-    public ReadEntryProcessorV3(Request request, Channel channel,
-                                BookieRequestProcessor requestProcessor) {
+    protected Stopwatch lastPhaseStartTime;
+    private final ExecutorService fenceThreadPool;
+
+    private SettableFuture<Boolean> fenceResult = null;
+    
+    protected final ReadRequest readRequest;
+    protected final long ledgerId;
+    protected final long entryId;
+    
+    // Stats
+    protected final OpStatsLogger readStats;
+    protected final OpStatsLogger reqStats;
+
+    public ReadEntryProcessorV3(Request request,
+                                Channel channel,
+                                BookieRequestProcessor requestProcessor,
+                                ExecutorService fenceThreadPool) {
         super(request, channel, requestProcessor);
+        this.readRequest = request.getReadRequest();
+        this.ledgerId = readRequest.getLedgerId();
+        this.entryId = readRequest.getEntryId();
+        if (RequestUtils.isFenceRequest(this.readRequest)) {
+            this.readStats = requestProcessor.fenceReadEntryStats;
+            this.reqStats = requestProcessor.fenceReadRequestStats;
+        } else if (readRequest.hasPreviousLAC()) {
+            this.readStats = requestProcessor.longPollReadStats;
+            this.reqStats = requestProcessor.longPollReadRequestStats;
+        } else {
+            this.readStats = requestProcessor.readEntryStats;
+            this.reqStats = requestProcessor.readRequestStats;
+        }
+        
+        this.fenceThreadPool = fenceThreadPool;
+        lastPhaseStartTime = Stopwatch.createStarted();
     }
 
-    private ReadResponse getReadResponse() {
-        long startTimeNanos = MathUtils.nowInNano();
-        ReadRequest readRequest = request.getReadRequest();
-        long ledgerId = readRequest.getLedgerId();
-        long entryId = readRequest.getEntryId();
+    protected Long getPreviousLAC() {
+        if (readRequest.hasPreviousLAC()) {
+            return readRequest.getPreviousLAC();
+        } else {
+            return null;
+        }
+    }
 
-        ReadResponse.Builder readResponse = ReadResponse.newBuilder()
-                .setLedgerId(ledgerId)
-                .setEntryId(entryId);
+    /**
+     * Handle read result for fence read.
+     *
+     * @param entryBody
+     *          read result
+     * @param readResponseBuilder
+     *          read response builder
+     * @param entryId
+     *          entry id
+     * @param startTimeSw
+     *          timer for the read request
+     */
+    protected void handleReadResultForFenceRead(
+        final ByteBuf entryBody,
+        final ReadResponse.Builder readResponseBuilder,
+        final long entryId,
+        final Stopwatch startTimeSw) {
+        // reset last phase start time to measure fence result waiting time
+        lastPhaseStartTime.reset().start();
+        if (null != fenceThreadPool) {
+            Futures.addCallback(fenceResult, new FutureCallback<Boolean>() {
+                @Override
+                public void onSuccess(Boolean result) {
+                    sendFenceResponse(readResponseBuilder, entryBody, result, startTimeSw);
+                }
 
-        if (!isVersionCompatible()) {
-            readResponse.setStatus(StatusCode.EBADVERSION);
-            return readResponse.build();
+                @Override
+                public void onFailure(Throwable t) {
+                    LOG.error("Fence request for ledgerId {} entryId {} encountered exception",
+                        new Object[] { ledgerId, entryId, t });
+                    sendFenceResponse(readResponseBuilder, entryBody, false, startTimeSw);
+                }
+            }, fenceThreadPool);
+        } else {
+            boolean success = false;
+            try {
+                success = fenceResult.get(1000, TimeUnit.MILLISECONDS);
+            } catch (Throwable t) {
+                LOG.error("Fence request for ledgerId {} entryId {} encountered exception : ",
+                    new Object[]{ readRequest.getLedgerId(), readRequest.getEntryId(), t });
+            }
+            sendFenceResponse(readResponseBuilder, entryBody, success, startTimeSw);
         }
+    }
+
+    /**
+     * Read a specific entry.
+     *
+     * @param readResponseBuilder
+     *          read response builder.
+     * @param entryId
+     *          entry to read
+     * @param startTimeSw
+     *          stop watch to measure the read operation.
+     * @return read response or null if it is a fence read operation.
+     * @throws IOException
+     */
+    protected ReadResponse readEntry(ReadResponse.Builder readResponseBuilder,
+                                     long entryId,
+                                     Stopwatch startTimeSw)
+        throws IOException {
+        return readEntry(readResponseBuilder, entryId, false, startTimeSw);
+    }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Received new read request: {}", request);
+    /**
+     * Read a specific entry.
+     *
+     * @param readResponseBuilder
+     *          read response builder.
+     * @param entryId
+     *          entry to read
+     * @param startTimeSw
+     *          stop watch to measure the read operation.
+     * @return read response or null if it is a fence read operation.
+     * @throws IOException
+     */
+    protected ReadResponse readEntry(ReadResponse.Builder readResponseBuilder,
+                                     long entryId,
+                                     boolean readLACPiggyBack,
+                                     Stopwatch startTimeSw)
+        throws IOException {
+        ByteBuf entryBody = requestProcessor.bookie.readEntry(ledgerId, entryId);
+        if (null != fenceResult) {
+            handleReadResultForFenceRead(entryBody, readResponseBuilder, entryId, startTimeSw);
+            return null;
+        } else {
+            try {
+                readResponseBuilder.setBody(ByteString.copyFrom(entryBody.nioBuffer()));
+                if (readLACPiggyBack) {
+                    readResponseBuilder.setEntryId(entryId);
+                } else {
+                    long knownLAC = requestProcessor.bookie.readLastAddConfirmed(ledgerId);
+                    readResponseBuilder.setMaxLAC(knownLAC);
+                }
+                registerSuccessfulEvent(readStats, startTimeSw);
+                readResponseBuilder.setStatus(StatusCode.EOK);
+                return readResponseBuilder.build();
+            } finally {
+                ReferenceCountUtil.release(entryBody);
+            }
         }
-        StatusCode status;
-        ByteBuf entryBody = null;
-        try {
-            Future<Boolean> fenceResult = null;
-            if (readRequest.hasFlag() && readRequest.getFlag().equals(ReadRequest.Flag.FENCE_LEDGER)) {
-                LOG.warn("Ledger fence request received for ledger: {} from address: {}", ledgerId,
-                         channel.remoteAddress());
+    }
+
+    protected ReadResponse getReadResponse() {
+        final Stopwatch startTimeSw = Stopwatch.createStarted();
 
-                if (readRequest.hasMasterKey()) {
+        final ReadResponse.Builder readResponse = ReadResponse.newBuilder()
+            .setLedgerId(ledgerId)
+            .setEntryId(entryId);
+        try {
+            // handle fence request
+            if (RequestUtils.isFenceRequest(readRequest)) {
+                LOG.info("Ledger fence request received for ledger: {} from address: {}", ledgerId,
+                    channel.remoteAddress());
+                if (!readRequest.hasMasterKey()) {
+                    LOG.error(
+                        "Fence ledger request received without master key for ledger:{} from address: {}",
+                        ledgerId, channel.remoteAddress());
+                    throw BookieException.create(BookieException.Code.UnauthorizedAccessException);
+                } else {
                     byte[] masterKey = readRequest.getMasterKey().toByteArray();
                     fenceResult = requestProcessor.bookie.fenceLedger(ledgerId, masterKey);
-                } else {
-                    LOG.error("Fence ledger request received without master key for ledger:{} from address: {}",
-                              ledgerId, channel.remoteAddress());
-                    throw BookieException.create(BookieException.Code.UnauthorizedAccessException);
                 }
             }
-            entryBody = requestProcessor.bookie.readEntry(ledgerId, entryId);
-            if (null != fenceResult) {
-                // TODO:
-                // currently we don't have readCallback to run in separated read
-                // threads. after BOOKKEEPER-429 is complete, we could improve
-                // following code to make it not wait here
-                //
-                // For now, since we only try to wait after read entry. so writing
-                // to journal and read entry are executed in different thread
-                // it would be fine.
-                try {
-                    Boolean fenced = fenceResult.get(1000, TimeUnit.MILLISECONDS);
-                    if (null == fenced || !fenced) {
-                        // if failed to fence, fail the read request to make it retry.
-                        status = StatusCode.EIO;
-                    } else {
-                        status = StatusCode.EOK;
-                        readResponse.setBody(ByteString.copyFrom(entryBody.nioBuffer()));
-                    }
-                } catch (InterruptedException ie) {
-                    LOG.error("Interrupting fence read entry (lid: {}, eid: {})",
-                              new Object[] { ledgerId, entryId, ie });
-                    status = StatusCode.EIO;
-                } catch (ExecutionException ee) {
-                    LOG.error("Failed to fence read entry (lid: {}, eid: {})",
-                              new Object[] { ledgerId, entryId, ee });
-                    status = StatusCode.EIO;
-                } catch (TimeoutException te) {
-                    LOG.error("Timeout to fence read entry (lid: {}, eid: {})",
-                              new Object[] { ledgerId, entryId, te });
-                    status = StatusCode.EIO;
-                }
+            return readEntry(readResponse, entryId, startTimeSw);
+        } catch (Bookie.NoLedgerException e) {
+            if (RequestUtils.isFenceRequest(readRequest)) {
+                LOG.info("No ledger found reading entry {} when fencing ledger {}", entryId, ledgerId);
             } else {
-                readResponse.setBody(ByteString.copyFrom(entryBody.nioBuffer()));
-                status = StatusCode.EOK;
+                LOG.info("No ledger found while reading entry: {} from ledger: {}", entryId, ledgerId);
             }
-        } catch (Bookie.NoLedgerException e) {
-            status = StatusCode.ENOLEDGER;
-            LOG.error("No ledger found while reading entry:{} from ledger: {}", entryId, ledgerId);
+            return buildResponse(readResponse, StatusCode.ENOLEDGER, startTimeSw);
         } catch (Bookie.NoEntryException e) {
-            status = StatusCode.ENOENTRY;
             if (LOG.isDebugEnabled()) {
-                LOG.debug("No entry found while reading entry:{} from ledger:{}", entryId, ledgerId);
+                LOG.debug("No entry found while reading entry: {} from ledger: {}", entryId, ledgerId);
             }
+            return buildResponse(readResponse, StatusCode.ENOENTRY, startTimeSw);
         } catch (IOException e) {
-            status = StatusCode.EIO;
-            LOG.error("IOException while reading entry:{} from ledger:{}", entryId, ledgerId);
+            LOG.error("IOException while reading entry: {} from ledger {} ", new Object[] { entryId, ledgerId, e });
+            return buildResponse(readResponse, StatusCode.EIO, startTimeSw);
         } catch (BookieException e) {
-            LOG.error("Unauthorized access to ledger:{} while reading entry:{} in request from address: {}",
-                    new Object[]{ledgerId, entryId, channel.remoteAddress()});
-            status = StatusCode.EUA;
+            LOG.error(
+                "Unauthorized access to ledger:{} while reading entry:{} in request from address: {}",
+                new Object[] { ledgerId, entryId, channel.remoteAddress() });
+            return buildResponse(readResponse, StatusCode.EUA, startTimeSw);
         }
+    }
+
+    @Override
+    public void safeRun() {
+        requestProcessor.readEntrySchedulingDelayStats.registerSuccessfulEvent(
+            MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
+
+        if (!isVersionCompatible()) {
+            ReadResponse readResponse = ReadResponse.newBuilder()
+                .setLedgerId(ledgerId)
+                .setEntryId(entryId)
+                .setStatus(StatusCode.EBADVERSION)
+                .build();
+            sendResponse(readResponse);
+            return;
+        }
+
+        executeOp();
+    }
+
+    protected void executeOp() {
+        ReadResponse readResponse = getReadResponse();
+        if (null != readResponse) {
+            sendResponse(readResponse);
+        }
+    }
 
-        if (status == StatusCode.EOK) {
-            requestProcessor.readEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
-                    TimeUnit.NANOSECONDS);
+    private void getFenceResponse(ReadResponse.Builder readResponse,
+                                  ByteBuf entryBody,
+                                  boolean fenceResult) {
+        StatusCode status;
+        if (!fenceResult) {
+            status = StatusCode.EIO;
+            registerFailedEvent(requestProcessor.fenceReadWaitStats, lastPhaseStartTime);
         } else {
-            requestProcessor.readEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
-                    TimeUnit.NANOSECONDS);
+            status = StatusCode.EOK;
+            readResponse.setBody(ByteString.copyFrom(entryBody.nioBuffer()));
+            registerSuccessfulEvent(requestProcessor.fenceReadWaitStats, lastPhaseStartTime);
         }
 
-        ReferenceCountUtil.release(entryBody);
+        if (null != entryBody) {
+            ReferenceCountUtil.release(entryBody);
+        }
 
-        // Finally set status and return. The body would have been updated if
-        // a read went through.
         readResponse.setStatus(status);
-        return readResponse.build();
     }
 
-    @Override
-    public void safeRun() {
-        ReadResponse readResponse = getReadResponse();
-        sendResponse(readResponse);
+    private void sendFenceResponse(ReadResponse.Builder readResponse,
+                                   ByteBuf entryBody,
+                                   boolean fenceResult,
+                                   Stopwatch startTimeSw) {
+        // build the fence read response
+        getFenceResponse(readResponse, entryBody, fenceResult);
+        // register fence read stat
+        registerEvent(!fenceResult, requestProcessor.fenceReadEntryStats, startTimeSw);
+        // send the fence read response
+        sendResponse(readResponse.build());
+    }
+
+    protected ReadResponse buildResponse(
+            ReadResponse.Builder readResponseBuilder,
+            StatusCode statusCode,
+            Stopwatch startTimeSw) {
+        registerEvent(!statusCode.equals(StatusCode.EOK), readStats, startTimeSw);
+        readResponseBuilder.setStatus(statusCode);
+        return readResponseBuilder.build();
     }
 
-    private void sendResponse(ReadResponse readResponse) {
+    protected void sendResponse(ReadResponse readResponse) {
         Response.Builder response = Response.newBuilder()
                 .setHeader(getHeader())
                 .setStatus(readResponse.getStatus())
                 .setReadResponse(readResponse);
         sendResponse(response.getStatus(),
                      response.build(),
-                     requestProcessor.readRequestStats);
+                     reqStats);
+    }
+
+    //
+    // Stats Methods
+    //
+
+    protected void registerSuccessfulEvent(OpStatsLogger statsLogger, Stopwatch startTime) {
+        registerEvent(false, statsLogger, startTime);
+    }
+
+    protected void registerFailedEvent(OpStatsLogger statsLogger, Stopwatch startTime) {
+        registerEvent(true, statsLogger, startTime);
+    }
+
+    protected void registerEvent(boolean failed, OpStatsLogger statsLogger, Stopwatch startTime) {
+        if (failed) {
+            statsLogger.registerFailedEvent(startTime.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+        } else {
+            statsLogger.registerSuccessfulEvent(startTime.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+        }
     }
 }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/58b92de8/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
index e9cc1cb..01ebead 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
@@ -109,6 +109,6 @@ class ReadLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
             .setReadLacResponse(readLacResponse);
         sendResponse(response.getStatus(),
                 response.build(),
-                requestProcessor.readRequestStats);
+                requestProcessor.readLacRequestStats);
     }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/58b92de8/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
new file mode 100644
index 0000000..c735bef
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
+
+/**
+ * Utilities for requests.
+ */
+class RequestUtils {
+
+    public static boolean isFenceRequest(ReadRequest readRequest) {
+        return readRequest.hasFlag() && readRequest.getFlag().equals(ReadRequest.Flag.FENCE_LEDGER);
+    }
+
+    public static boolean isLongPollReadRequest(ReadRequest readRequest) {
+        return !isFenceRequest(readRequest) && readRequest.hasPreviousLAC();
+    }
+
+    public static boolean shouldPiggybackEntry(ReadRequest readRequest) {
+        return readRequest.hasFlag() && readRequest.getFlag().equals(ReadRequest.Flag.ENTRY_PIGGYBACK);
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/58b92de8/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
index e8ffb34..d710102 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
@@ -107,7 +107,7 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
                     .setStatus(writeLacResponse.getStatus())
                     .setWriteLacResponse(writeLacResponse);
             Response resp = response.build();
-            sendResponse(writeLacResponse.getStatus(), resp, requestProcessor.writeLacStats);
+            sendResponse(writeLacResponse.getStatus(), resp, requestProcessor.writeLacRequestStats);
         }
     }
 }


Mime
View raw message