bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [3/3] bookkeeper git commit: BOOKKEEPER-874: Explict LAC from Writer to Bookies
Date Tue, 31 Jan 2017 03:02:02 GMT
BOOKKEEPER-874: Explict LAC from Writer to Bookies

Introduce a new feature for sending explicit LAC to bookies.
A writable LedgerHandle creates a timer thread to send explicit LACs
at the intervals specified through configuration paramenter,
explicitLacInterval. If this is set to zero, this feature is disabled,
no timer thread is created.

Explicit LAC is sent only if the client did not get a chance to send
LAC through piggyback method for "explicitLacInterval" time.
To implement this, introduced two new protocol messages to the
Bookkeeper protocol -  WRITE_LAC and READ_LAC, in addition to its
current READ_ENTRY and ADD_ENTRY.

Reviewed-by: Charan Reddy Guttapalem <cguttapalemsalesforce.com>
Signed-off-by: Venkateswararao Jujjuri (JV) <vjujjurisalesforce.com>
Co-Author : Charan Reddy Guttapalem <cguttapalemsalesforce.com>

Author: JV <vjujjuri@salesforce.com>

Reviewers: Sijie Guo <sijie@apache.org>

Closes #89 from reddycharan/explicitlacsinglecommit


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/c813b3d3
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/c813b3d3
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/c813b3d3

Branch: refs/heads/master
Commit: c813b3d3298586ded02032a57a99d5fc6c974581
Parents: 42e8f12
Author: JV <vjujjuri@salesforce.com>
Authored: Mon Jan 30 19:01:48 2017 -0800
Committer: Sijie Guo <sijie@apache.org>
Committed: Mon Jan 30 19:01:48 2017 -0800

----------------------------------------------------------------------
 bookkeeper-server/conf/bk_server.conf           |    3 +
 .../bookie/BookKeeperServerStats.java           |    2 +
 .../org/apache/bookkeeper/bookie/Bookie.java    |   26 +
 .../org/apache/bookkeeper/bookie/FileInfo.java  |   33 +
 .../bookkeeper/bookie/IndexPersistenceMgr.java  |   28 +
 .../bookie/InterleavedLedgerStorage.java        |    8 +
 .../apache/bookkeeper/bookie/LedgerCache.java   |    3 +
 .../bookkeeper/bookie/LedgerCacheImpl.java      |    9 +
 .../bookkeeper/bookie/LedgerDescriptor.java     |    5 +
 .../bookkeeper/bookie/LedgerDescriptorImpl.java |    9 +
 .../apache/bookkeeper/bookie/LedgerStorage.java |    4 +
 .../apache/bookkeeper/client/AsyncCallback.java |   14 +
 .../apache/bookkeeper/client/BKException.java   |    2 +-
 .../apache/bookkeeper/client/BookKeeper.java    |   21 +-
 .../client/BookKeeperClientStats.java           |    6 +
 .../apache/bookkeeper/client/DigestManager.java |   55 +
 .../client/ExplicitLacFlushPolicy.java          |  153 +
 .../apache/bookkeeper/client/LedgerHandle.java  |  140 +-
 .../bookkeeper/client/PendingReadLacOp.java     |  145 +
 .../bookkeeper/client/PendingWriteLacOp.java    |  114 +
 .../bookkeeper/client/ReadOnlyLedgerHandle.java |    8 +
 .../bookkeeper/conf/ClientConfiguration.java    |   26 +-
 .../apache/bookkeeper/proto/BookieClient.java   |   71 +
 .../proto/BookieRequestProcessor.java           |   31 +
 .../proto/BookkeeperInternalCallbacks.java      |    8 +
 .../bookkeeper/proto/BookkeeperProtocol.java    | 6115 +++++++++++++-----
 .../proto/PerChannelBookieClient.java           |  293 +-
 .../bookkeeper/proto/ReadLacProcessorV3.java    |  108 +
 .../bookkeeper/proto/WriteLacProcessorV3.java   |  113 +
 .../bookkeeper/util/OrderedSafeExecutor.java    |  118 +-
 .../src/main/proto/BookkeeperProtocol.proto     |   31 +-
 .../bookkeeper/bookie/TestSyncThread.java       |    9 +
 .../bookkeeper/client/BookKeeperTest.java       |  124 +
 .../bookkeeper/meta/LedgerManagerTestCase.java  |   12 +
 34 files changed, 6354 insertions(+), 1493 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/conf/bk_server.conf
----------------------------------------------------------------------
diff --git a/bookkeeper-server/conf/bk_server.conf b/bookkeeper-server/conf/bk_server.conf
index e2a2be6..c7fd2ca 100644
--- a/bookkeeper-server/conf/bk_server.conf
+++ b/bookkeeper-server/conf/bk_server.conf
@@ -91,6 +91,9 @@ ledgerDirectories=/tmp/bk-data
 # If it is set to less than zero, the minor compaction is disabled. 
 # minorCompactionInterval=3600
 
+# Interval between sending an explicit LAC in seconds
+explicitLacInterval = 1
+
 # Threshold of major compaction
 # For those entry log files whose remaining size percentage reaches below
 # this threshold will be compacted in a major compaction.

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 239f923..9f1dbbb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -35,6 +35,8 @@ public interface BookKeeperServerStats {
     public final static String READ_ENTRY_FENCE_REQUEST = "READ_ENTRY_FENCE_REQUEST";
     public final static String READ_ENTRY_FENCE_WAIT = "READ_ENTRY_FENCE_WAIT";
     public final static String READ_ENTRY_FENCE_READ = "READ_ENTRY_FENCE_READ";
+    public final static String WRITE_LAC = "WRITE_LAC";
+    public final static String READ_LAC = "READ_LAC";
 
     // Bookie Operations
     public final static String BOOKIE_ADD_ENTRY_BYTES = "BOOKIE_ADD_ENTRY_BYTES";

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 82db3b0..bbbfa51 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -57,6 +57,7 @@ import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNS;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -1325,6 +1326,30 @@ public class Bookie extends BookieCriticalThread {
         }
     }
 
+    public void setExplicitLac(ByteBuffer entry, Object ctx, byte[] masterKey)
+            throws IOException, BookieException {
+        try {
+            long ledgerId = entry.getLong();
+            LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
+            entry.rewind();
+            synchronized (handle) {
+                handle.setExplicitLac(entry);
+            }
+        } catch (NoWritableLedgerDirException e) {
+            transitionToReadOnlyMode();
+            throw new IOException(e);
+        }
+    }
+
+    public ByteBuffer getExplicitLac(long ledgerId) throws IOException, Bookie.NoLedgerException {
+        ByteBuffer lac;
+        LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
+        synchronized (handle) {
+            lac = handle.getExplicitLac();
+        }
+        return lac;
+    }
+
     /**
      * Add entry to a ledger.
      * @throws BookieException.LedgerFencedException if the ledger is fenced
@@ -1566,4 +1591,5 @@ public class Bookie extends BookieCriticalThread {
     public int getExitCode() {
         return exitCode;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index 38ff0d9..307b46b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -62,6 +62,8 @@ class FileInfo {
 
     private FileChannel fc;
     private File lf;
+    private ByteBuffer explicitLac = null;
+
     byte[] masterKey;
 
     /**
@@ -112,6 +114,37 @@ class FileInfo {
         return sizeSinceLastwrite;
     }
 
+    public ByteBuffer getExplicitLac() {
+        LOG.debug("fileInfo:GetLac: {}", explicitLac);
+        ByteBuffer retLac = null;
+        synchronized(this) {
+            if (explicitLac != null) {
+                retLac = ByteBuffer.allocate(explicitLac.capacity());
+                explicitLac.rewind();//copy from the beginning
+                retLac.put(explicitLac);
+                explicitLac.rewind();
+                retLac.flip();
+            }
+        }
+        return retLac;
+    }
+
+    public void setExplicitLac(ByteBuffer lac) {
+        synchronized(this) {
+            if (explicitLac == null) {
+                explicitLac = ByteBuffer.allocate(lac.capacity());
+            }
+            explicitLac.put(lac);
+            explicitLac.rewind();
+            
+            long ledgerId = explicitLac.getLong();            
+            long explicitLacValue = explicitLac.getLong();
+            setLastAddConfirmed(explicitLacValue);
+            explicitLac.rewind();
+        }
+        LOG.debug("fileInfo:SetLac: {}", explicitLac);
+    }
+
     synchronized public void readHeader() throws IOException {
         if (lf.exists()) {
             if (fc != null) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index f1ffcde..1ea000c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -389,6 +389,34 @@ public class IndexPersistenceMgr {
         }
     }
 
+    void setExplicitLac(long ledgerId, ByteBuffer lac) throws IOException {
+        FileInfo fi = null;
+        try {
+            fi = getFileInfo(ledgerId, null);
+            fi.setExplicitLac(lac);
+            return;
+        } finally {
+            if (null != fi) {
+                fi.release();
+            }
+        }
+    }
+
+    public ByteBuffer getExplicitLac(long ledgerId) {
+        FileInfo fi = null;
+        try {
+            fi = getFileInfo(ledgerId, null);
+            return fi.getExplicitLac();
+        } catch (IOException e) {
+            LOG.error("Exception during getLastAddConfirmed: {}", e);
+            return null;
+        } finally {
+            if (null != fi) {
+                fi.release();
+            }
+        }
+    }
+
     int getOpenFileLimit() {
         return openFileLimit;
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index ee6edd5..308110b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -215,6 +215,14 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
         return ledgerCache.isFenced(ledgerId);
     }
 
+    public void setExplicitlac(long ledgerId, ByteBuffer lac) throws IOException {
+        ledgerCache.setExplicitLac(ledgerId, lac);
+    }
+
+    public ByteBuffer getExplicitLac(long ledgerId) {
+        return ledgerCache.getExplicitLac(ledgerId);
+    }
+
     @Override
     public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
         ledgerCache.setMasterKey(ledgerId, masterKey);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
index 4e0fdc1..e004cb6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.bookie;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 /**
  * This class maps a ledger entry number into a location (entrylogid, offset) in
@@ -50,4 +51,6 @@ interface LedgerCache extends Closeable {
     void deleteLedger(long ledgerId) throws IOException;
 
     LedgerCacheBean getJMXBean();
+    void setExplicitLac(long ledgerId, ByteBuffer lac) throws IOException;
+    ByteBuffer getExplicitLac(long ledgerId);
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
index db84268..cece79f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
@@ -22,6 +22,7 @@
 package org.apache.bookkeeper.bookie;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -136,6 +137,14 @@ public class LedgerCacheImpl implements LedgerCache {
         return indexPersistenceManager.isFenced(ledgerId);
     }
 
+    public void setExplicitLac(long ledgerId, ByteBuffer lac) throws IOException {
+        indexPersistenceManager.setExplicitLac(ledgerId, lac);
+    }
+
+    public ByteBuffer getExplicitLac(long ledgerId) {
+        return indexPersistenceManager.getExplicitLac(ledgerId);
+    }
+
     @Override
     public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
         indexPersistenceManager.setMasterKey(ledgerId, masterKey);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
index c7a8c97..bcb0c30 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
@@ -59,5 +59,10 @@ public abstract class LedgerDescriptor {
 
     abstract long addEntry(ByteBuffer entry) throws IOException;
     abstract ByteBuffer readEntry(long entryId) throws IOException;
+
     abstract long getLastAddConfirmed() throws IOException;
+
+    abstract void setExplicitLac(ByteBuffer entry) throws IOException;
+
+    abstract  ByteBuffer getExplicitLac();
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index 266236d..bf1c129 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -69,6 +69,15 @@ public class LedgerDescriptorImpl extends LedgerDescriptor {
     }
 
     @Override
+    void setExplicitLac(ByteBuffer lac) throws IOException {
+        ledgerStorage.setExplicitlac(ledgerId, lac);
+    }
+
+    @Override
+    ByteBuffer getExplicitLac() {
+        return ledgerStorage.getExplicitLac(ledgerId);
+    }
+    @Override
     long addEntry(ByteBuffer entry) throws IOException {
         long ledgerId = entry.getLong();
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index b0015bd..84a309f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -149,4 +149,8 @@ public interface LedgerStorage {
      * Get the JMX management bean for this LedgerStorage
      */
     BKMBeanInfo getJMXBean();
+
+    void setExplicitlac(long ledgerId, ByteBuffer lac) throws IOException;
+
+    ByteBuffer getExplicitLac(long ledgerId);
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
index d3f1728..05067d0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
@@ -36,6 +36,20 @@ public interface AsyncCallback {
         void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx);
     }
 
+    public interface AddLacCallback {
+        /**
+         * Callback declaration
+         *
+         * @param rc
+         *          return code
+         * @param lh
+         *          ledger handle
+         * @param ctx
+         *          context object
+         */
+        void addLacComplete(int rc, LedgerHandle lh, Object ctx);
+    }
+
     public interface CloseCallback {
         /**
          * Callback definition

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index 349709d..2377c1c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -40,7 +40,7 @@ public abstract class BKException extends Exception {
     /**
      * Create an exception from an error code
      * @param code return error code
-     * @return correponding exception
+     * @return corresponding exception
      */
     public static BKException create(int code) {
         switch (code) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 2f8a0b8..8959462 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -95,6 +95,9 @@ public class BookKeeper implements AutoCloseable {
     private OpStatsLogger deleteOpLogger;
     private OpStatsLogger readOpLogger;
     private OpStatsLogger addOpLogger;
+    private OpStatsLogger writeLacOpLogger;
+    private OpStatsLogger readLacOpLogger;
+
 
     // whether the socket factory is one we created, or is owned by whoever
     // instantiated us
@@ -121,6 +124,7 @@ public class BookKeeper implements AutoCloseable {
     final EnsemblePlacementPolicy placementPolicy;
 
     final ClientConfiguration conf;
+    final int explicitLacInterval;
 
     // Close State
     boolean closed = false;
@@ -275,7 +279,7 @@ public class BookKeeper implements AutoCloseable {
     }
 
     /**
-     * Contructor for use with the builder. Other constructors also use it.
+     * Constructor for use with the builder. Other constructors also use it.
      */
     private BookKeeper(ClientConfiguration conf,
                        ZooKeeper zkc,
@@ -369,10 +373,16 @@ public class BookKeeper implements AutoCloseable {
         this.ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
         this.ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager());
         this.ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator();
+        this.explicitLacInterval = conf.getExplictLacInterval();
+        LOG.debug("Explicit LAC Interval : {}", this.explicitLacInterval);
 
         scheduleBookieHealthCheckIfEnabled();
     }
 
+    public int getExplicitLacInterval() {
+        return explicitLacInterval;
+    }
+
     private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
                                                                       DNSToSwitchMapping dnsResolver,
                                                                       HashedWheelTimer timer,
@@ -906,8 +916,11 @@ public class BookKeeper implements AutoCloseable {
      * to add entries to the ledger. Any attempt to add entries will throw an
      * exception.
      *
-     * Reads from the returned ledger will only be able to read entries up until
+     * Reads from the returned ledger will be able to read entries up until
      * the lastConfirmedEntry at the point in time at which the ledger was opened.
+     * If an attempt is made to read beyond the ledger handle's LAC, an attempt is made
+     * to get the latest LAC from bookies or metadata, and if the entry_id of the read request
+     * is less than or equal to the new LAC, read will be allowed to proceed.
      *
      * @param lId
      *          ledger identifier
@@ -1199,6 +1212,8 @@ public class BookKeeper implements AutoCloseable {
         openOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.OPEN_OP);
         readOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_OP);
         addOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.ADD_OP);
+        writeLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.WRITE_LAC_OP);
+        readLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_LAC_OP);
     }
 
     OpStatsLogger getCreateOpLogger() { return createOpLogger; }
@@ -1206,4 +1221,6 @@ public class BookKeeper implements AutoCloseable {
     OpStatsLogger getDeleteOpLogger() { return deleteOpLogger; }
     OpStatsLogger getReadOpLogger() { return readOpLogger; }
     OpStatsLogger getAddOpLogger() { return addOpLogger; }
+    OpStatsLogger getWriteLacOpLogger() { return writeLacOpLogger; }
+    OpStatsLogger getReadLacOpLogger() { return readLacOpLogger; }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index e245ea3..a020425 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -28,6 +28,8 @@ public interface BookKeeperClientStats {
     public final static String OPEN_OP = "LEDGER_OPEN";
     public final static String ADD_OP = "ADD_ENTRY";
     public final static String READ_OP = "READ_ENTRY";
+    public final static String WRITE_LAC_OP = "WRITE_LAC";
+    public final static String READ_LAC_OP = "READ_LAC";
     public final static String PENDING_ADDS = "NUM_PENDING_ADD";
     public final static String ENSEMBLE_CHANGES = "NUM_ENSEMBLE_CHANGE";
     public final static String LAC_UPDATE_HITS = "LAC_UPDATE_HITS";
@@ -40,4 +42,8 @@ public interface BookKeeperClientStats {
     public final static String CHANNEL_TIMEOUT_READ = "TIMEOUT_READ_ENTRY";
     public final static String CHANNEL_ADD_OP = "ADD_ENTRY";
     public final static String CHANNEL_TIMEOUT_ADD = "TIMEOUT_ADD_ENTRY";
+    public final static String CHANNEL_WRITE_LAC_OP = "WRITE_LAC";
+    public final static String CHANNEL_TIMEOUT_WRITE_LAC = "TIMEOUT_WRITE_LAC";
+    public final static String CHANNEL_READ_LAC_OP = "READ_LAC";
+    public final static String CHANNEL_TIMEOUT_READ_LAC = "TIMEOUT_READ_LAC";
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
index 2753680..c72f31a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
@@ -40,6 +40,7 @@ abstract class DigestManager {
     static final Logger logger = LoggerFactory.getLogger(DigestManager.class);
 
     static final int METADATA_LENGTH = 32;
+    static final int LAC_METADATA_LENGTH = 16;
 
     long ledgerId;
 
@@ -102,6 +103,32 @@ abstract class DigestManager {
         return ChannelBuffers.wrappedBuffer(ChannelBuffers.wrappedBuffer(buffer), ChannelBuffers.wrappedBuffer(data, doffset, dlength));
     }
 
+    /**
+     * Computes the digest for writeLac for sending.
+     *
+     * @param lac
+     * @return
+     */
+
+    public ChannelBuffer computeDigestAndPackageForSendingLac(long lac) {
+
+        byte[] bufferArray = new byte[LAC_METADATA_LENGTH + macCodeLength];
+        ByteBuffer buffer = ByteBuffer.wrap(bufferArray);
+        buffer.putLong(ledgerId);
+        buffer.putLong(lac);
+        buffer.flip();
+
+        update(buffer.array(), 0, LAC_METADATA_LENGTH);
+        byte[] digest = getValueAndReset();
+
+        buffer.limit(buffer.capacity());
+        buffer.position(LAC_METADATA_LENGTH);
+        buffer.put(digest);
+        buffer.flip();
+
+        return ChannelBuffers.wrappedBuffer(ChannelBuffers.wrappedBuffer(buffer));
+    }
+
     private void verifyDigest(ChannelBuffer dataReceived) throws BKDigestMatchException {
         verifyDigest(LedgerHandle.INVALID_ENTRY_ID, dataReceived, true);
     }
@@ -153,6 +180,34 @@ abstract class DigestManager {
 
     }
 
+    long verifyDigestAndReturnLac(ChannelBuffer dataReceived) throws BKDigestMatchException{
+        ByteBuffer dataReceivedBuffer = dataReceived.toByteBuffer();
+        byte[] digest;
+        if ((LAC_METADATA_LENGTH + macCodeLength) > dataReceived.readableBytes()) {
+            logger.error("Data received is smaller than the minimum for this digest type."
+                    + " Either the packet it corrupt, or the wrong digest is configured. "
+                    + " Digest type: {}, Packet Length: {}",
+                    this.getClass().getName(), dataReceived.readableBytes());
+            throw new BKDigestMatchException();
+        }
+        update(dataReceivedBuffer.array(), dataReceivedBuffer.position(), LAC_METADATA_LENGTH);
+        digest = getValueAndReset();
+        for (int i = 0; i < digest.length; i++) {
+            if (digest[i] != dataReceived.getByte(LAC_METADATA_LENGTH + i)) {
+                logger.error("Mac mismatch for ledger-id LAC: " + ledgerId);
+                throw new BKDigestMatchException();
+            }
+        }
+        long actualLedgerId = dataReceived.readLong();
+        long lac = dataReceived.readLong();
+        if (actualLedgerId != ledgerId) {
+            logger.error("Ledger-id mismatch in authenticated message, expected: " + ledgerId + " , actual: "
+                         + actualLedgerId);
+            throw new BKDigestMatchException();
+        }
+        return lac;
+    }
+
     /**
      * Verify that the digest matches and returns the data in the entry.
      *

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
new file mode 100644
index 0000000..65ef8af
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
@@ -0,0 +1,153 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
+
+import org.apache.bookkeeper.client.LedgerHandle.LastAddConfirmedCallback;
+import org.apache.bookkeeper.util.SafeRunnable;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+interface ExplicitLacFlushPolicy {
+    void stopExplicitLacFlush();
+
+    void updatePiggyBackedLac(long piggyBackedLac);
+
+    static final ExplicitLacFlushPolicy VOID_EXPLICITLAC_FLUSH_POLICY = new ExplicitLacFlushPolicy() {
+        @Override
+        public void stopExplicitLacFlush() {
+            // void method
+        }
+
+        @Override
+        public void updatePiggyBackedLac(long piggyBackedLac) {
+            // void method
+        }
+    };
+
+    class ExplicitLacFlushPolicyImpl implements ExplicitLacFlushPolicy {
+        final static Logger LOG = LoggerFactory.getLogger(ExplicitLacFlushPolicyImpl.class);
+
+        volatile long piggyBackedLac = LedgerHandle.INVALID_ENTRY_ID;
+        volatile long explicitLac = LedgerHandle.INVALID_ENTRY_ID;
+        final LedgerHandle lh;
+        ScheduledFuture<?> scheduledFuture;
+
+        ExplicitLacFlushPolicyImpl(LedgerHandle lh) {
+            this.lh = lh;
+            scheduleExplictLacFlush();
+            LOG.debug("Scheduled Explicit Last Add Confirmed Update");
+        }
+
+        private long getExplicitLac() {
+            return explicitLac;
+        }
+
+        private void setExplicitLac(long explicitLac) {
+            this.explicitLac = explicitLac;
+        }
+
+        private long getPiggyBackedLac() {
+            return piggyBackedLac;
+        }
+
+        public void setPiggyBackedLac(long piggyBackedLac) {
+            this.piggyBackedLac = piggyBackedLac;
+        }
+
+        private void scheduleExplictLacFlush() {
+            int explicitLacIntervalInSec = lh.bk.getExplicitLacInterval();
+            final SafeRunnable updateLacTask = new SafeRunnable() {
+                @Override
+                public void safeRun() {
+                    // Made progress since previous explicitLAC through
+                    // Piggyback, so no need to send an explicit LAC update to
+                    // bookies.
+                    if (getExplicitLac() < getPiggyBackedLac()) {
+                        LOG.debug("ledgerid: {}", lh.getId());
+                        LOG.debug("explicitLac:{} piggybackLac:{}", getExplicitLac(),
+                                getPiggyBackedLac());
+                        setExplicitLac(getPiggyBackedLac());
+                        return;
+                    }
+
+                    if (lh.getLastAddConfirmed() > getExplicitLac()) {
+                        // Send Explicit LAC
+                        LOG.debug("ledgerid: {}", lh.getId());
+                        asyncExplicitLacFlush(lh.getLastAddConfirmed());
+                        setExplicitLac(lh.getLastAddConfirmed());
+                        LOG.debug("After sending explict LAC lac: {}  explicitLac:{}", lh.getLastAddConfirmed(),
+                                getExplicitLac());
+                    }
+                }
+
+                @Override
+                public String toString() {
+                    return String.format("UpdateLacTask ledgerId - (%d)", lh.getId());
+                }
+            };
+            try {
+                scheduledFuture = lh.bk.mainWorkerPool.scheduleAtFixedRateOrdered(lh.getId(), updateLacTask,
+                        explicitLacIntervalInSec, explicitLacIntervalInSec, SECONDS);
+            } catch (RejectedExecutionException re) {
+                LOG.error("Scheduling of ExplictLastAddConfirmedFlush for ledger: {} has failed because of {}",
+                        lh.getId(), re);
+            }
+        }
+
+        /**
+         * Make a LastAddUpdate request.
+         */
+        void asyncExplicitLacFlush(final long explicitLac) {
+            final LastAddConfirmedCallback cb = LastAddConfirmedCallback.INSTANCE;
+            final PendingWriteLacOp op = new PendingWriteLacOp(lh, cb, null);
+            op.setLac(explicitLac);
+            try {
+                LOG.debug("Sending Explicit LAC: {}", explicitLac);
+                lh.bk.mainWorkerPool.submit(new SafeRunnable() {
+                    @Override
+                    public void safeRun() {
+                        ChannelBuffer toSend = lh.macManager
+                                .computeDigestAndPackageForSendingLac(lh.getLastAddConfirmed());
+                        op.initiate(toSend);
+                    }
+                });
+            } catch (RejectedExecutionException e) {
+                cb.addLacComplete(lh.bk.getReturnRc(BKException.Code.InterruptedException), lh, null);
+            }
+        }
+
+        @Override
+        public void stopExplicitLacFlush() {
+            scheduledFuture.cancel(true);
+        }
+
+        @Override
+        public void updatePiggyBackedLac(long piggyBackedLac) {
+            setPiggyBackedLac(piggyBackedLac);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 11212a7..290caa9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -31,12 +31,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.AddLacCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
@@ -55,8 +57,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 
 /**
  * Ledger handle contains ledger metadata and is used to access the read and
@@ -71,6 +71,7 @@ public class LedgerHandle implements AutoCloseable {
     final long ledgerId;
     long lastAddPushed;
     volatile long lastAddConfirmed;
+
     long length;
     final DigestManager macManager;
     final DistributionSchedule distributionSchedule;
@@ -85,6 +86,7 @@ public class LedgerHandle implements AutoCloseable {
 
     final AtomicInteger blockAddCompletions = new AtomicInteger(0);
     Queue<PendingAddOp> pendingAddOps;
+    ExplicitLacFlushPolicy explicitLacFlushPolicy;
 
     final Counter ensembleChangeCounter;
     final Counter lacUpdateHitsCounter;
@@ -97,6 +99,7 @@ public class LedgerHandle implements AutoCloseable {
         this.metadata = metadata;
         this.pendingAddOps = new ConcurrentLinkedQueue<PendingAddOp>();
 
+
         if (metadata.isClosed()) {
             lastAddConfirmed = lastAddPushed = metadata.getLastEntryId();
             length = metadata.getLength();
@@ -130,6 +133,15 @@ public class LedgerHandle implements AutoCloseable {
                                                   return pendingAddOps.size();
                                               }
                                           });
+        initializeExplicitLacFlushPolicy();
+    }
+
+    protected void initializeExplicitLacFlushPolicy() {
+        if (!metadata.isClosed() && bk.getExplicitLacInterval() > 0) {
+            explicitLacFlushPolicy = new ExplicitLacFlushPolicy.ExplicitLacFlushPolicyImpl(this);
+        } else {
+            explicitLacFlushPolicy = ExplicitLacFlushPolicy.VOID_EXPLICITLAC_FLUSH_POLICY;
+        }
     }
 
     /**
@@ -272,6 +284,8 @@ public class LedgerHandle implements AutoCloseable {
 
         asyncClose(new SyncCloseCallback(), counter);
 
+        explicitLacFlushPolicy.stopExplicitLacFlush();
+        
         SynchCallbackUtils.waitForResult(counter);
     }
 
@@ -478,11 +492,18 @@ public class LedgerHandle implements AutoCloseable {
      * @param ctx
      *          control object
      */
-    public void asyncReadEntries(long firstEntry, long lastEntry,
-                                 ReadCallback cb, Object ctx) {
+    public void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, Object ctx) {
         // Little sanity check
-        if (firstEntry < 0 || lastEntry > lastAddConfirmed
-                || firstEntry > lastEntry) {
+        if (firstEntry < 0 || firstEntry > lastEntry) {
+            LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} lastEntry:{}",
+                    new Object[] { ledgerId, firstEntry, lastEntry });
+            cb.readComplete(BKException.Code.IncorrectParameterException, this, null, ctx);
+            return;
+        }
+
+        if (lastEntry > lastAddConfirmed) {
+            LOG.error("ReadException on ledgerId:{} firstEntry:{} lastEntry:{}",
+                    new Object[] { ledgerId, firstEntry, lastEntry });
             cb.readComplete(BKException.Code.ReadException, this, null, ctx);
             return;
         }
@@ -929,6 +950,86 @@ public class LedgerHandle implements AutoCloseable {
         return ctx.getlastConfirmed();
     }
 
+    /**
+     * Obtains asynchronously the explicit last add confirmed from a quorum of
+     * bookies. This call obtains the the explicit last add confirmed each
+     * bookie has received for this ledger and returns the maximum. If in the
+     * write LedgerHandle, explicitLAC feature is not enabled then this will
+     * return {@link #INVALID_ENTRY_ID INVALID_ENTRY_ID}. If the read explicit
+     * lastaddconfirmed is greater than getLastAddConfirmed, then it updates the
+     * lastAddConfirmed of this ledgerhandle. If the ledger has been closed, it
+     * returns the value of the last add confirmed from the metadata.
+     *
+     * @see #getLastAddConfirmed()
+     * 
+     * @param cb
+     *          callback to return read explicit last confirmed
+     * @param ctx
+     *          callback context
+     */
+    public void asyncReadExplicitLastConfirmed(final ReadLastConfirmedCallback cb, final Object ctx) {
+        boolean isClosed;
+        synchronized (this) {
+            isClosed = metadata.isClosed();
+            if (isClosed) {
+                lastAddConfirmed = metadata.getLastEntryId();
+                length = metadata.getLength();
+            }
+        }
+        if (isClosed) {
+            cb.readLastConfirmedComplete(BKException.Code.OK, lastAddConfirmed, ctx);
+            return;
+        }
+
+        PendingReadLacOp.LacCallback innercb = new PendingReadLacOp.LacCallback() {
+
+            @Override
+            public void getLacComplete(int rc, long lac) {
+                if (rc == BKException.Code.OK) {
+                    // here we are trying to update lac only but not length 
+                    updateLastConfirmed(lac, 0);
+                    cb.readLastConfirmedComplete(rc, lac, ctx);
+                } else {
+                    cb.readLastConfirmedComplete(rc, INVALID_ENTRY_ID, ctx);
+                }
+            }
+        };
+        new PendingReadLacOp(this, innercb).initiate();
+    }
+
+    /**
+     * Obtains synchronously the explicit last add confirmed from a quorum of
+     * bookies. This call obtains the the explicit last add confirmed each
+     * bookie has received for this ledger and returns the maximum. If in the
+     * write LedgerHandle, explicitLAC feature is not enabled then this will
+     * return {@link #INVALID_ENTRY_ID INVALID_ENTRY_ID}. If the read explicit
+     * lastaddconfirmed is greater than getLastAddConfirmed, then it updates the
+     * lastAddConfirmed of this ledgerhandle. If the ledger has been closed, it
+     * returns the value of the last add confirmed from the metadata.
+     *
+     * @see #getLastAddConfirmed()
+     *
+     * @return The entry id of the explicit last confirmed write or
+     *         {@link #INVALID_ENTRY_ID INVALID_ENTRY_ID} if no entry has been
+     *         confirmed or if explicitLAC feature is not enabled in write
+     *         LedgerHandle.
+     * @throws InterruptedException
+     * @throws BKException
+     */
+    public long readExplicitLastConfirmed() throws InterruptedException, BKException {
+        LastConfirmedCtx ctx = new LastConfirmedCtx();
+        asyncReadExplicitLastConfirmed(new SyncReadLastConfirmedCallback(), ctx);
+        synchronized (ctx) {
+            while (!ctx.ready()) {
+                ctx.wait();
+            }
+        }
+        if (ctx.getRC() != BKException.Code.OK) {
+            throw BKException.create(ctx.getRC());
+        }
+        return ctx.getlastConfirmed();
+    }
+
     // close the ledger and send fails to all the adds in the pipeline
     void handleUnrecoverableErrorDuringAdd(int rc) {
         if (metadata.isInRecovery()) {
@@ -976,8 +1077,11 @@ public class LedgerHandle implements AutoCloseable {
                 LOG.debug("Head of the queue entryId: {} is not lac: {} + 1", pendingAddOp.entryId, lastAddConfirmed);
                 return;
             }
+
             pendingAddOps.remove();
+            explicitLacFlushPolicy.updatePiggyBackedLac(lastAddConfirmed);
             lastAddConfirmed = pendingAddOp.entryId;
+
             pendingAddOp.submitCallback(BKException.Code.OK);
         }
 
@@ -1327,6 +1431,30 @@ public class LedgerHandle implements AutoCloseable {
         }
     }
 
+    static class LastAddConfirmedCallback implements AddLacCallback {
+        static final LastAddConfirmedCallback INSTANCE = new LastAddConfirmedCallback();
+        /**
+         * Implementation of callback interface for synchronous read method.
+         *
+         * @param rc
+         *          return code
+         * @param leder
+         *          ledger identifier
+         * @param entry
+         *          entry identifier
+         * @param ctx
+         *          control object
+         */
+        @Override
+        public void addLacComplete(int rc, LedgerHandle lh, Object ctx) {
+            if (rc != BKException.Code.OK) {
+                LOG.warn("LastAddConfirmedUpdate failed: {} ", BKException.getMessage(rc));
+            } else {
+                LOG.debug("Callback LAC Updated for: {} ", lh.getId());
+            }
+        }
+    }
+
     static class SyncReadCallback implements ReadCallback {
         /**
          * Implementation of callback interface for synchronous read method.

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
new file mode 100644
index 0000000..64e266f
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.client.DigestManager.RecoveryData;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * This represents a pending ReadLac operation.
+ *
+ * LAC is stored in two places on bookies.
+ * 1. WriteLac operation sends Explicit LAC and is stored in memory on each bookie.
+ * 2. Each AddEntry operation piggy-backs LAC which is stored on bookie's disk.
+ *
+ * This operation returns both of those entries and we pick the latest LAC out of
+ * available answers.
+ *
+ * This is an optional protocol operations to facilitate tailing readers
+ * to be up to date with the writer. This is best effort to get latest LAC
+ * from bookies, and doesn't affect the correctness of the protocol.
+ */
+
+class PendingReadLacOp implements ReadLacCallback {
+    static final Logger LOG = LoggerFactory.getLogger(PendingReadLacOp.class);
+    LedgerHandle lh;
+    LacCallback cb;
+    int numResponsesPending;
+    volatile boolean completed = false;
+    int lastSeenError = BKException.Code.ReadException;
+    final DistributionSchedule.QuorumCoverageSet coverageSet;
+    long maxLac = LedgerHandle.INVALID_ENTRY_ID;
+
+    /*
+     * Wrapper to get Lac from the request
+     */
+    interface LacCallback {
+        public void getLacComplete(int rc, long lac);
+    }
+
+    PendingReadLacOp(LedgerHandle lh, LacCallback cb) {
+        this.lh = lh;
+        this.cb = cb;
+        this.numResponsesPending = lh.metadata.getEnsembleSize();
+        this.coverageSet = lh.distributionSchedule.getCoverageSet();
+    }
+
+    public void initiate() {
+        for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
+            lh.bk.bookieClient.readLac(lh.metadata.currentEnsemble.get(i),
+                    lh.ledgerId, this, i);
+        }
+    }
+
+    @Override
+    public void readLacComplete(int rc, long ledgerId, final ChannelBuffer lacBuffer, final ChannelBuffer lastEntryBuffer, Object ctx) {
+        int bookieIndex = (Integer) ctx;
+        numResponsesPending--;
+        boolean heardValidResponse = false;
+
+        if (completed) {
+            return;
+        }
+
+        if (rc == BKException.Code.OK) {
+            try {
+                // Each bookie may have two store LAC in two places.
+                // One is in-memory copy in FileInfo and other is
+                // piggy-backed LAC on the last entry.
+                // This routine picks both of them and compares to return
+                // the latest Lac.
+
+                // Extract lac from FileInfo on the ledger.
+                long lac = lh.macManager.verifyDigestAndReturnLac(lacBuffer);
+                if (lac > maxLac) {
+                    maxLac = lac;
+                }
+
+                // Extract lac from last entry on the disk
+                RecoveryData recoveryData = lh.macManager.verifyDigestAndReturnLastConfirmed(lastEntryBuffer);
+                if (recoveryData.lastAddConfirmed > maxLac) {
+                    maxLac = recoveryData.lastAddConfirmed;
+                }
+                heardValidResponse = true;
+            } catch (BKDigestMatchException e) {
+                // Too bad, this bookie did not give us a valid answer, we
+                // still might be able to recover. So, continue
+                LOG.error("Mac mismatch while reading  ledger: " + ledgerId + " LAC from bookie: "
+                        + lh.metadata.currentEnsemble.get(bookieIndex));
+                rc = BKException.Code.DigestMatchException;
+            }
+        }
+
+        if (rc == BKException.Code.NoSuchLedgerExistsException || rc == BKException.Code.NoSuchEntryException) {
+            heardValidResponse = true;
+        }
+
+        if (rc == BKException.Code.UnauthorizedAccessException && !completed) {
+            cb.getLacComplete(rc, maxLac);
+            completed = true;
+            return;
+        }
+
+        if (!heardValidResponse && BKException.Code.OK != rc) {
+            lastSeenError = rc;
+        }
+
+        // We don't consider a success until we have coverage set responses.
+        if (heardValidResponse
+                && coverageSet.addBookieAndCheckCovered(bookieIndex)
+                && !completed) {
+            completed = true;
+            LOG.debug("Read LAC complete with enough validResponse for ledger: {} LAC: {}",
+                    ledgerId, maxLac);
+            cb.getLacComplete(BKException.Code.OK, maxLac);
+            return;
+        }
+
+        if (numResponsesPending == 0 && !completed) {
+            LOG.info("While readLac ledger: " + ledgerId + " did not hear success responses from all of ensemble");
+            cb.getLacComplete(lastSeenError, maxLac);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
new file mode 100644
index 0000000..dc7368b
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.bookkeeper.client.AsyncCallback.AddLacCallback;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This represents a pending WriteLac operation. When it has got
+ * success from Ack Quorum bookies, sends success back to the application,
+ * otherwise failure is sent back to the caller.
+ *
+ * This is an optional protocol operations to facilitate tailing readers
+ * to be up to date with the writer. This is best effort to get latest LAC
+ * from bookies, and doesn't affect the correctness of the protocol.
+ */
+class PendingWriteLacOp implements WriteLacCallback {
+    private final static Logger LOG = LoggerFactory.getLogger(PendingWriteLacOp.class);
+    ChannelBuffer toSend;
+    AddLacCallback cb;
+    long lac;
+    Object ctx;
+    Set<Integer> writeSet;
+    Set<Integer> receivedResponseSet;
+
+    DistributionSchedule.AckSet ackSet;
+    boolean completed = false;
+    int lastSeenError = BKException.Code.WriteException;
+
+    LedgerHandle lh;
+    OpStatsLogger putLacOpLogger;
+
+    PendingWriteLacOp(LedgerHandle lh, AddLacCallback cb, Object ctx) {
+        this.lh = lh;
+        this.cb = cb;
+        this.ctx = ctx;
+        this.lac = LedgerHandle.INVALID_ENTRY_ID;
+        ackSet = lh.distributionSchedule.getAckSet();
+        putLacOpLogger = lh.bk.getWriteLacOpLogger();
+    }
+
+    void setLac(long lac) {
+        this.lac = lac;
+        this.writeSet = new HashSet<Integer>(lh.distributionSchedule.getWriteSet(lac));
+        this.receivedResponseSet = new HashSet<Integer>(writeSet);
+    }
+
+    void sendWriteLacRequest(int bookieIndex) {
+        lh.bk.bookieClient.writeLac(lh.metadata.currentEnsemble.get(bookieIndex), lh.ledgerId, lh.ledgerKey,
+                lac, toSend, this, bookieIndex);
+    }
+
+    void initiate(ChannelBuffer toSend) {
+        this.toSend = toSend;
+        for (int bookieIndex: writeSet) {
+            sendWriteLacRequest(bookieIndex);
+        }
+    }
+
+    @Override
+    public void writeLacComplete(int rc, long ledgerId, BookieSocketAddress addr, Object ctx) {
+        int bookieIndex = (Integer) ctx;
+
+        if (completed) {
+            return;
+        }
+
+        if (BKException.Code.OK != rc) {
+            lastSeenError = rc;
+        }
+
+        // We got response.
+        receivedResponseSet.remove(bookieIndex);
+
+        if (rc == BKException.Code.OK) {
+            if (ackSet.addBookieAndCheck(bookieIndex) && !completed) {
+                completed = true;
+                cb.addLacComplete(rc, lh, ctx);
+                return;
+            }
+        } else {
+            LOG.warn("WriteLac did not succeed: Ledger {} on {}", new Object[] { ledgerId, addr });
+        }
+        
+        if(receivedResponseSet.isEmpty()){
+            completed = true;
+            cb.addLacComplete(lastSeenError, lh, ctx);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index 711f209..1834eff 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -54,6 +54,10 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements LedgerMetadataListene
                     ReadOnlyLedgerHandle.this.metadata.getVersion().compare(this.m.getVersion());
             if (Version.Occurred.BEFORE == occurred) {
                 LOG.info("Updated ledger metadata for ledger {} to {}.", ledgerId, this.m);
+                if (this.m.isClosed()) {
+                        ReadOnlyLedgerHandle.this.lastAddConfirmed = this.m.getLastEntryId();
+                        ReadOnlyLedgerHandle.this.length = this.m.getLength();
+                }
                 ReadOnlyLedgerHandle.this.metadata = this.m;
             }
         }
@@ -170,4 +174,8 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements LedgerMetadataListene
         return String.format("ReadOnlyLedgerHandle(lid = %d, id = %d)", ledgerId, super.hashCode());
     }
 
+    @Override
+    protected void initializeExplicitLacFlushPolicy() {
+        explicitLacFlushPolicy = ExplicitLacFlushPolicy.VOID_EXPLICITLAC_FLUSH_POLICY;
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 7353c3f..fa42dc9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -65,6 +65,7 @@ public class ClientConfiguration extends AbstractConfiguration {
     protected final static String ADD_ENTRY_QUORUM_TIMEOUT_SEC = "addEntryQuorumTimeoutSec";
     protected final static String READ_ENTRY_TIMEOUT_SEC = "readEntryTimeoutSec";
     protected final static String TIMEOUT_TASK_INTERVAL_MILLIS = "timeoutTaskIntervalMillis";
+    protected final static String EXPLICIT_LAC_INTERVAL = "explicitLacInterval";
     protected final static String PCBC_TIMEOUT_TIMER_TICK_DURATION_MS = "pcbcTimeoutTimerTickDurationMs";
     protected final static String PCBC_TIMEOUT_TIMER_NUM_TICKS = "pcbcTimeoutTimerNumTicks";
     protected final static String TIMEOUT_TIMER_TICK_DURATION_MS = "timeoutTimerTickDurationMs";
@@ -76,7 +77,7 @@ public class ClientConfiguration extends AbstractConfiguration {
     protected final static String BOOKIE_ERROR_THRESHOLD_PER_INTERVAL = "bookieErrorThresholdPerInterval";
     protected final static String BOOKIE_QUARANTINE_TIME_SECONDS = "bookieQuarantineTimeSeconds";
 
-    // Number Woker Threads
+    // Number Worker Threads
     protected final static String NUM_WORKER_THREADS = "numWorkerThreads";
 
     // Ensemble Placement Policy
@@ -595,6 +596,29 @@ public class ClientConfiguration extends AbstractConfiguration {
     }
 
     /**
+     * Get the configured interval between  explicit LACs to bookies.
+     * Generally LACs are piggy-backed on writes, and user can configure
+     * the interval between these protocol messages. A value of '0' disables
+     * sending any explicit LACs.
+     *
+     * @return interval between explicit LACs
+     */
+    public int getExplictLacInterval() {
+        return getInt(EXPLICIT_LAC_INTERVAL, 0);
+    }
+
+    /**
+     * Set the interval to check the need for sending an explicit LAC.
+     * @param interval
+     *        Number of seconds between checking the need for sending an explict LAC.
+     * @return Client configuration.
+     */
+    public ClientConfiguration setExplictLacInterval(int interval) {
+        setProperty(EXPLICIT_LAC_INTERVAL, interval);
+        return this;
+    }
+
+    /**
      * Get the tick duration in milliseconds that used for the
      * {@link org.jboss.netty.util.HashedWheelTimer} that used by PCBC to timeout
      * requests.

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 9b0865a..4a742da 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -33,10 +33,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.ClientAuthProvider;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -166,6 +169,41 @@ public class BookieClient implements PerChannelBookieClientFactory {
         return clientPool;
     }
 
+    public void writeLac(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
+            final long lac, final ChannelBuffer toSend, final WriteLacCallback cb, final Object ctx) {
+        closeLock.readLock().lock();
+        try {
+            final PerChannelBookieClientPool client = lookupClient(addr, lac);
+            if (client == null) {
+                cb.writeLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
+                                  ledgerId, addr, ctx);
+                return;
+            }
+
+            client.obtain(new GenericCallback<PerChannelBookieClient>() {
+                @Override
+                public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
+                    if (rc != BKException.Code.OK) {
+                        try {
+                            executor.submitOrdered(ledgerId, new SafeRunnable() {
+                                @Override
+                                public void safeRun() {
+                                    cb.writeLacComplete(rc, ledgerId, addr, ctx);
+                                }
+                            });
+                        } catch (RejectedExecutionException re) {
+                            cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
+                        }
+                        return;
+                    }
+                    pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
+                }
+            });
+        } finally {
+            closeLock.readLock().unlock();
+        }
+    }
+
     public void addEntry(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
             final long entryId,
             final ChannelBuffer toSend, final WriteCallback cb, final Object ctx, final int options) {
@@ -243,6 +281,39 @@ public class BookieClient implements PerChannelBookieClientFactory {
         }
     }
 
+    public void readLac(final BookieSocketAddress addr, final long ledgerId, final ReadLacCallback cb, final Object ctx) {
+        closeLock.readLock().lock();
+        try {
+            final PerChannelBookieClientPool client = lookupClient(addr, BookieProtocol.LAST_ADD_CONFIRMED);
+            if (client == null) {
+                cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), ledgerId, null, null, ctx);
+                return;
+            }
+            client.obtain(new GenericCallback<PerChannelBookieClient>() {
+                @Override
+                public void operationComplete(final int rc,PerChannelBookieClient pcbc) {
+                    if (rc != BKException.Code.OK) {
+                        try {
+                            executor.submitOrdered(ledgerId, new SafeRunnable() {
+                                @Override
+                                public void safeRun() {
+                                    cb.readLacComplete(rc, ledgerId, null, null, ctx);
+                                }
+                            });
+                        } catch (RejectedExecutionException re) {
+                            cb.readLacComplete(getRc(BKException.Code.InterruptedException),
+                                    ledgerId, null, null, ctx);
+                        }
+                        return;
+                    }
+                    pcbc.readLac(ledgerId, cb, ctx);
+                }
+            });
+        } finally {
+            closeLock.readLock().unlock();
+        }
+    }
+
     public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId,
                           final ReadEntryCallback cb, final Object ctx) {
         closeLock.readLock().lock();

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 7b227fa..210bc72 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -41,6 +41,9 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC;
+
 
 public class BookieRequestProcessor implements RequestProcessor {
 
@@ -73,6 +76,8 @@ public class BookieRequestProcessor implements RequestProcessor {
     final OpStatsLogger addEntryStats;
     final OpStatsLogger readRequestStats;
     final OpStatsLogger readEntryStats;
+    final OpStatsLogger writeLacStats;
+    final OpStatsLogger readLacStats;
 
     public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,
                                   StatsLogger statsLogger) {
@@ -86,6 +91,8 @@ public class BookieRequestProcessor implements RequestProcessor {
         this.addRequestStats = statsLogger.getOpStatsLogger(ADD_ENTRY_REQUEST);
         this.readEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY);
         this.readRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_REQUEST);
+        this.writeLacStats = statsLogger.getOpStatsLogger(WRITE_LAC);
+        this.readLacStats = statsLogger.getOpStatsLogger(READ_LAC);
     }
 
     @Override
@@ -135,6 +142,12 @@ public class BookieRequestProcessor implements RequestProcessor {
                             .setAuthResponse(message);
                     c.write(authResponse.build());
                     break;
+                case WRITE_LAC:
+                    processWriteLacRequestV3(r,c);
+                    break;
+                case READ_LAC:
+                    processReadLacRequestV3(r,c);
+                    break;
                 default:
                     LOG.info("Unknown operation type {}", header.getOperation());
                     BookkeeperProtocol.Response.Builder response =
@@ -185,6 +198,24 @@ public class BookieRequestProcessor implements RequestProcessor {
         }
     }
 
+    private void processWriteLacRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
+        WriteLacProcessorV3 writeLac = new WriteLacProcessorV3(r, c, this);
+        if (null == writeThreadPool) {
+            writeLac.run();
+        } else {
+            writeThreadPool.submit(writeLac);
+        }
+    }
+
+    private void processReadLacRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
+        ReadLacProcessorV3 readLac = new ReadLacProcessorV3(r, c, this);
+        if (null == readThreadPool) {
+            readLac.run();
+        } else {
+            readThreadPool.submit(readLac);
+        }
+    }
+
     private void processAddRequest(final BookieProtocol.Request r, final Channel c) {
         WriteEntryProcessor write = new WriteEntryProcessor(r, c, this);
         if (null == writeThreadPool) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
index e6e7802..261c93d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
@@ -65,6 +65,14 @@ public class BookkeeperInternalCallbacks {
         void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx);
     }
 
+    public interface ReadLacCallback {
+        void readLacComplete(int rc, long ledgerId, ChannelBuffer lac, ChannelBuffer buffer, Object ctx);
+    }
+
+    public interface WriteLacCallback {
+        void writeLacComplete(int rc, long ledgerId, BookieSocketAddress addr, Object ctx);
+    }
+
     public interface GenericCallback<T> {
         void operationComplete(int rc, T result);
     }


Mime
View raw message