bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [1/3] bookkeeper git commit: BOOKKEEPER-874: Explict LAC from Writer to Bookies
Date Tue, 31 Jan 2017 03:02:00 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 42e8f1294 -> c813b3d32


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index a4fb761..3fb73e4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.bookkeeper.auth.ClientAuthProvider;
+import com.google.protobuf.ByteString;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeperClientStats;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -39,13 +40,22 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -124,7 +134,11 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     private final OpStatsLogger readEntryOpLogger;
     private final OpStatsLogger readTimeoutOpLogger;
     private final OpStatsLogger addEntryOpLogger;
+    private final OpStatsLogger writeLacOpLogger;
+    private final OpStatsLogger readLacOpLogger;
     private final OpStatsLogger addTimeoutOpLogger;
+    private final OpStatsLogger writeLacTimeoutOpLogger;
+    private final OpStatsLogger readLacTimeoutOpLogger;
 
     /**
      * The following member variables do not need to be concurrent, or volatile
@@ -192,8 +206,12 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
 
         readEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_OP);
         addEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_ADD_OP);
+        writeLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_WRITE_LAC_OP);
+        readLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_LAC_OP);
         readTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ);
         addTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_ADD);
+        writeLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_WRITE_LAC);
+        readLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ_LAC);
 
         this.pcbcPool = pcbcPool;
 
@@ -238,6 +256,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     }
 
     private void completeOperation(GenericCallback<PerChannelBookieClient> op, int rc) {
+        //Thread.dumpStack();
         closeLock.readLock().lock();
         try {
             if (ConnectionState.CLOSED == state) {
@@ -365,6 +384,60 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
 
     }
 
+    /**
+     * Sends an explicit LAC for {@code ledgerId} to this bookie; {@code cb} fires on response, timeout or local failure.
+     */
+    void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ChannelBuffer toSend, WriteLacCallback cb, Object ctx) {
+        final long txnId = getTxnId();
+        final CompletionKey completionKey = new CompletionKey(txnId, OperationType.WRITE_LAC);
+        // writeLac is mostly like addEntry hence uses addEntryTimeout
+        // The completion must carry the ledger id (not the LAC): the error path orders and reports on it.
+        completionObjects.put(completionKey,
+                new WriteLacCompletion(writeLacOpLogger, cb, ctx, ledgerId, scheduleTimeout(completionKey, addEntryTimeout)));
+        // Build the request
+        BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+                .setVersion(ProtocolVersion.VERSION_THREE)
+                .setOperation(OperationType.WRITE_LAC)
+                .setTxnId(txnId);
+        WriteLacRequest.Builder writeLacBuilder = WriteLacRequest.newBuilder()
+                .setLedgerId(ledgerId)
+                .setLac(lac)
+                .setMasterKey(ByteString.copyFrom(masterKey))
+                .setBody(ByteString.copyFrom(toSend.toByteBuffer()));
+        final Request writeLacRequest = Request.newBuilder()
+                .setHeader(headerBuilder)
+                .setWriteLacRequest(writeLacBuilder)
+                .build();
+        final Channel c = channel;
+        if (c == null) {
+            errorOutWriteLacKey(completionKey);
+            return;
+        }
+        try {
+            ChannelFuture future = c.write(writeLacRequest);
+            future.addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    if (future.isSuccess()) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Successfully wrote request for writeLac LedgerId: {} bookie: {}",
+                                    ledgerId, c.getRemoteAddress());
+                        }
+                    } else {
+                        if (!(future.getCause() instanceof ClosedChannelException)) {
+                            LOG.warn("Writing Lac(lid={}) to channel {} failed : ",
+                                    new Object[] { ledgerId, c, future.getCause() });
+                        }
+                        errorOutWriteLacKey(completionKey);
+                    }
+                }
+            });
+        } catch (Throwable e) {
+            LOG.warn("writeLac operation failed", e);
+            errorOutWriteLacKey(completionKey);
+        }
+    }
+
     /**
      * This method should be called only after connection has been checked for
      * {@link #connectIfNeededAndDoOp(GenericCallback)}
@@ -502,6 +575,52 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         }
     }
 
+    /** Sends a READ_LAC request for {@code ledgerId} to this bookie; {@code cb} fires on response, timeout or local failure. */
+    public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {
+        final long txnId = getTxnId();
+        final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_LAC);
+        completionObjects.put(completionKey,
+                new ReadLacCompletion(readLacOpLogger, cb, ctx, ledgerId,
+                        scheduleTimeout(completionKey, readEntryTimeout)));
+        // Build the request
+        BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+                .setVersion(ProtocolVersion.VERSION_THREE)
+                .setOperation(OperationType.READ_LAC)
+                .setTxnId(txnId);
+        ReadLacRequest.Builder readLacBuilder = ReadLacRequest.newBuilder()
+                .setLedgerId(ledgerId);
+        final Request readLacRequest = Request.newBuilder()
+                .setHeader(headerBuilder)
+                .setReadLacRequest(readLacBuilder)
+                .build();
+        final Channel c = channel;
+        if (c == null) {
+            errorOutReadLacKey(completionKey);
+            return;
+        }
+        try {
+            ChannelFuture future = c.write(readLacRequest);
+            future.addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    if (future.isSuccess()) {
+                        LOG.debug("Successfully wrote request {} to {}",
+                                    readLacRequest, c.getRemoteAddress());
+                    } else {
+                        if (!(future.getCause() instanceof ClosedChannelException)) {
+                            LOG.warn("Writing readLac(lid = {}) to channel {} failed : ",
+                                    new Object[] { ledgerId, c, future.getCause() });
+                        }
+                        errorOutReadLacKey(completionKey);
+                    }
+                }
+            });
+        } catch(Throwable e) {
+            LOG.warn("Read LAC operation {} failed", readLacRequest, e);
+            errorOutReadLacKey(completionKey);
+        }
+    }
+
     public void readEntry(final long ledgerId, final long entryId, ReadEntryCallback cb, Object ctx) {
         final long txnId = getTxnId();
         final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY);
@@ -649,6 +768,54 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         });
     }
 
+    /** Fails the pending writeLac for {@code key} with BookieHandleNotAvailableException. */
+    void errorOutWriteLacKey(final CompletionKey key) {
+        errorOutWriteLacKey(key, BKException.Code.BookieHandleNotAvailableException);
+    }
+
+    /** Removes the pending writeLac for {@code key}, if any, and reports {@code rc} to its callback via submitOrdered. */
+    void errorOutWriteLacKey(final CompletionKey key, final int rc) {
+        final WriteLacCompletion pending = (WriteLacCompletion)completionObjects.remove(key);
+        if (pending == null) {
+            return;
+        }
+        executor.submitOrdered(pending.ledgerId, new SafeRunnable() {
+            @Override
+            public void safeRun() {
+                // The channel's remote address is only used for the log line; the callback reports addr.
+                final Channel current = channel;
+                final String bookie = (current == null) ? "null" : current.getRemoteAddress().toString();
+                LOG.debug("Could not write request writeLac for ledgerId: {} bookie: {}",
+                          pending.ledgerId, bookie);
+                pending.cb.writeLacComplete(rc, pending.ledgerId, addr, pending.ctx);
+            }
+        });
+    }
+
+    /** Fails the pending readLac for {@code key} with BookieHandleNotAvailableException. */
+    void errorOutReadLacKey(final CompletionKey key) {
+        errorOutReadLacKey(key, BKException.Code.BookieHandleNotAvailableException);
+    }
+
+    /** Removes the pending readLac for {@code key}, if any, and reports {@code rc} (with null buffers) via submitOrdered. */
+    void errorOutReadLacKey(final CompletionKey key, final int rc) {
+        final ReadLacCompletion pending = (ReadLacCompletion)completionObjects.remove(key);
+        if (pending == null) {
+            return;
+        }
+        executor.submitOrdered(pending.ledgerId, new SafeRunnable() {
+            @Override
+            public void safeRun() {
+                // The channel's remote address is only used for the log line.
+                final Channel current = channel;
+                final String bookie = (current == null) ? "null" : current.getRemoteAddress().toString();
+                LOG.debug("Could not write request readLac for ledgerId: {} bookie: {}",
+                          pending.ledgerId, bookie);
+                pending.cb.readLacComplete(rc, pending.ledgerId, null, null, pending.ctx);
+            }
+        });
+    }
+
     void errorOutAddKey(final CompletionKey key) {
         errorOutAddKey(key, BKException.Code.BookieHandleNotAvailableException);
     }
@@ -836,6 +1003,12 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
                         case READ_ENTRY:
                             handleReadResponse(response, completionValue);
                             break;
+                        case WRITE_LAC:
+                            handleWriteLacResponse(response.getWriteLacResponse(), completionValue);
+                            break;
+                        case READ_LAC:
+                            handleReadLacResponse(response.getReadLacResponse(), completionValue);
+                            break;
                         default:
                             LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring",
                                       type, addr);
@@ -853,7 +1026,26 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         }
     }
 
-    void handleAddResponse(Response response, CompletionValue completionValue) {
+    /** Converts a WRITE_LAC response status to a BKException code and hands it to the pending completion's callback. */
+    void handleWriteLacResponse(WriteLacResponse writeLacResponse, CompletionValue completionValue) {
+        // The completion value should always be an instance of a WriteLacCompletion object when we reach here.
+        WriteLacCompletion plc = (WriteLacCompletion)completionValue;
+
+        long ledgerId = writeLacResponse.getLedgerId();
+        StatusCode status = writeLacResponse.getStatus();
+        LOG.debug("Got response for writeLac request from bookie: {} for ledger: {} rc: {}",
+                new Object[] { addr, ledgerId, status });
+        // convert to BKException code; an unmapped status is reported as a generic write failure
+        Integer rcToRet = statusCodeToExceptionCode(status);
+        if (null == rcToRet) {
+            LOG.error("writeLac for ledger: {} failed on bookie: {} with code:{}",
+                    new Object[] { ledgerId, addr, status });
+            rcToRet = BKException.Code.WriteException;
+        }
+        plc.cb.writeLacComplete(rcToRet, ledgerId, addr, plc.ctx);
+    }
+
+ void handleAddResponse(Response response, CompletionValue completionValue) {
         // The completion value should always be an instance of an AddCompletion object when we reach here.
         AddCompletion ac = (AddCompletion)completionValue;
         AddResponse addResponse = response.getAddResponse();
@@ -866,7 +1058,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
             LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
                     + entryId + " rc: " + status);
         }
-        // convert to BKException code because thats what the uppper
+        // convert to BKException code because thats what the upper
         // layers expect. This is UGLY, there should just be one set of
         // error codes.
         Integer rcToRet = statusCodeToExceptionCode(status);
@@ -880,6 +1072,36 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx);
     }
 
+    /** Copies the LAC and last-entry bodies (empty buffers when absent) and completes the pending readLac callback. */
+    void handleReadLacResponse(ReadLacResponse readLacResponse, CompletionValue completionValue) {
+        // The completion value should always be an instance of a ReadLacCompletion object when we reach here.
+        ReadLacCompletion glac = (ReadLacCompletion)completionValue;
+
+        long ledgerId = readLacResponse.getLedgerId();
+        StatusCode status = readLacResponse.getStatus();
+        ChannelBuffer lacBuffer = ChannelBuffers.buffer(0);
+        ChannelBuffer lastEntryBuffer = ChannelBuffers.buffer(0);
+
+        if (readLacResponse.hasLacBody()) {
+            lacBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
+        }
+
+        if (readLacResponse.hasLastEntryBody()) {
+            lastEntryBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
+        }
+
+        LOG.debug("Got response for readLac request from bookie: {} for ledger: {} rc: {}",
+                new Object[] { addr, ledgerId, status });
+        // convert to BKException code; an unmapped status is reported as a generic read failure
+        Integer rcToRet = statusCodeToExceptionCode(status);
+        if (null == rcToRet) {
+            LOG.debug("readLac for ledger: {} failed on bookie: {} with code:{}",
+                    new Object[] { ledgerId, addr, status });
+            rcToRet = BKException.Code.ReadException;
+        }
+        glac.cb.readLacComplete(rcToRet, ledgerId, lacBuffer.slice(), lastEntryBuffer.slice(), glac.ctx);
+    }
+
     void handleReadResponse(Response response, CompletionValue completionValue) {
         // The completion value should always be an instance of a ReadCompletion object when we reach here.
         ReadCompletion rc = (ReadCompletion)completionValue;
@@ -940,6 +1162,63 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     }
 
     // visible for testing
+    /** Pending WRITE_LAC state; given a stats logger, the callback is wrapped to cancel the timeout and record latency. */
+    static class WriteLacCompletion extends CompletionValue {
+        final WriteLacCallback cb;
+
+        public WriteLacCompletion(WriteLacCallback cb, Object ctx, long ledgerId) {
+            this(null, cb, ctx, ledgerId, null);
+        }
+
+        public WriteLacCompletion(final OpStatsLogger writeLacOpLogger, final WriteLacCallback originalCallback,
+                final Object originalCtx, final long ledgerId, final Timeout timeout) {
+            super(originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, timeout);
+            final long startNanos = MathUtils.nowInNano();
+            this.cb = (writeLacOpLogger != null) ? new WriteLacCallback() {
+                @Override
+                public void writeLacComplete(int rc, long ledgerId, BookieSocketAddress addr, Object ctx) {
+                    cancelTimeout();
+                    final long elapsedNanos = MathUtils.elapsedNanos(startNanos);
+                    if (rc == BKException.Code.OK) {
+                        writeLacOpLogger.registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS);
+                    } else {
+                        writeLacOpLogger.registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
+                    }
+                    originalCallback.writeLacComplete(rc, ledgerId, addr, originalCtx);
+                }
+            } : originalCallback;
+        }
+    }
+
+    /** Pending READ_LAC state (visible for testing); given a stats logger, the callback also cancels the timeout and records latency. */
+    static class ReadLacCompletion extends CompletionValue {
+        final ReadLacCallback cb;
+
+        public ReadLacCompletion(ReadLacCallback cb, Object ctx, long ledgerId) {
+            this (null, cb, ctx, ledgerId, null);
+        }
+
+        public ReadLacCompletion(final OpStatsLogger readLacOpLogger, final ReadLacCallback originalCallback,
+                final Object ctx, final long ledgerId, final Timeout timeout) {
+            super(ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, timeout);
+            final long startNanos = MathUtils.nowInNano();
+            this.cb = (readLacOpLogger != null) ? new ReadLacCallback() {
+                @Override
+                public void readLacComplete(int rc, long ledgerId, ChannelBuffer lacBuffer, ChannelBuffer lastEntryBuffer, Object ctx) {
+                    cancelTimeout();
+                    final long elapsedNanos = MathUtils.elapsedNanos(startNanos);
+                    if (rc == BKException.Code.OK) {
+                        readLacOpLogger.registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS);
+                    } else {
+                        readLacOpLogger.registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
+                    }
+                    originalCallback.readLacComplete(rc, ledgerId, lacBuffer, lastEntryBuffer, ctx);
+                }
+            } : originalCallback;
+        }
+    }
+
+    // visible for testing
     static class ReadCompletion extends CompletionValue {
         final ReadEntryCallback cb;
 
@@ -1070,11 +1349,17 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
             if (OperationType.ADD_ENTRY == operationType) {
                 errorOutAddKey(this, BKException.Code.TimeoutException);
                 addTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
-            } else {
+            } else if (OperationType.READ_ENTRY == operationType) {
                 errorOutReadKey(this, BKException.Code.TimeoutException);
                 readTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
+            } else if (OperationType.WRITE_LAC == operationType) {
+                errorOutWriteLacKey(this, BKException.Code.TimeoutException);
+                writeLacTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
+            } else {
+                errorOutReadLacKey(this, BKException.Code.TimeoutException);
+                readLacTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
             }
-        }
+	}
     }
 
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
new file mode 100644
index 0000000..e9a4c13
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.util.MathUtils;
+import org.jboss.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+/** Handles a protocol-V3 READ_LAC request: reads the last-add-confirmed entry and the explicit LAC, then replies. */
+class ReadLacProcessorV3 extends PacketProcessorBaseV3 {
+    private final static Logger logger = LoggerFactory.getLogger(ReadLacProcessorV3.class);
+
+    public ReadLacProcessorV3(Request request, Channel channel,
+                             BookieRequestProcessor requestProcessor) {
+        super(request, channel, requestProcessor);
+    }
+
+    // Always returns a response (never null); failures are reported through its status code.
+    private ReadLacResponse getReadLacResponse() {
+        final long startTimeNanos = MathUtils.nowInNano();
+        ReadLacRequest readLacRequest = request.getReadLacRequest();
+        long ledgerId = readLacRequest.getLedgerId();
+
+        final ReadLacResponse.Builder readLacResponse = ReadLacResponse.newBuilder().setLedgerId(ledgerId);
+
+        if (!isVersionCompatible()) {
+            readLacResponse.setStatus(StatusCode.EBADVERSION);
+            return readLacResponse.build();
+        }
+
+        logger.debug("Received ReadLac request: {}", request);
+        StatusCode status = StatusCode.EOK;
+        try {
+            ByteBuffer lastEntry = requestProcessor.bookie.readEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
+            ByteBuffer lac = requestProcessor.bookie.getExplicitLac(ledgerId);
+            if (lac != null) {
+                readLacResponse.setLacBody(ByteString.copyFrom(lac));
+                readLacResponse.setLastEntryBody(ByteString.copyFrom(lastEntry));
+            } else {
+                status = StatusCode.ENOENTRY;
+            }
+        } catch (Bookie.NoLedgerException e) {
+            status = StatusCode.ENOLEDGER;
+            logger.error("No ledger found while performing readLac from ledger: {}", ledgerId);
+        } catch (IOException e) {
+            status = StatusCode.EIO;
+            logger.error("IOException while performing readLac from ledger: {}", ledgerId, e);
+        }
+        if (status == StatusCode.EOK) {
+            requestProcessor.readLacStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
+                    TimeUnit.NANOSECONDS);
+        } else {
+            requestProcessor.readLacStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
+                    TimeUnit.NANOSECONDS);
+        }
+        // Finally set the status and return
+        readLacResponse.setStatus(status);
+        return readLacResponse.build();
+    }
+
+    @Override
+    public void safeRun() {
+        ReadLacResponse readLacResponse = getReadLacResponse();
+        sendResponse(readLacResponse);
+    }
+
+    private void sendResponse(ReadLacResponse readLacResponse) {
+        Response.Builder response = Response.newBuilder()
+            .setHeader(getHeader())
+            .setStatus(readLacResponse.getStatus())
+            .setReadLacResponse(readLacResponse);
+        // Account the reply under readLacStats (as WriteLacProcessorV3 does with writeLacStats), not readRequestStats.
+        sendResponse(response.getStatus(),
+                response.build(),
+                requestProcessor.readLacStats);
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
new file mode 100644
index 0000000..104f561
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.util.MathUtils;
+import org.jboss.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Handles a protocol-V3 WRITE_LAC request: stores the explicit LAC through the bookie and replies with a status. */
+class WriteLacProcessorV3 extends PacketProcessorBaseV3 {
+    private final static Logger logger = LoggerFactory.getLogger(WriteLacProcessorV3.class);
+
+    public WriteLacProcessorV3(Request request, Channel channel,
+                             BookieRequestProcessor requestProcessor) {
+        super(request, channel, requestProcessor);
+    }
+
+    // Always returns a response (never null); failures are reported through its status code.
+    private WriteLacResponse getWriteLacResponse() {
+        final long startTimeNanos = MathUtils.nowInNano();
+        WriteLacRequest writeLacRequest = request.getWriteLacRequest();
+        long lac = writeLacRequest.getLac();
+        long ledgerId = writeLacRequest.getLedgerId();
+
+        final WriteLacResponse.Builder writeLacResponse = WriteLacResponse.newBuilder().setLedgerId(ledgerId);
+
+        if (!isVersionCompatible()) {
+            writeLacResponse.setStatus(StatusCode.EBADVERSION);
+            return writeLacResponse.build();
+        }
+
+        if (requestProcessor.bookie.isReadOnly()) {
+            logger.warn("BookieServer is running as readonly mode, so rejecting the request from the client!");
+            writeLacResponse.setStatus(StatusCode.EREADONLY);
+            return writeLacResponse.build();
+        }
+
+        StatusCode status = null;
+        ByteBuffer lacToAdd = writeLacRequest.getBody().asReadOnlyByteBuffer();
+        byte[] masterKey = writeLacRequest.getMasterKey().toByteArray();
+
+        try {
+            requestProcessor.bookie.setExplicitLac(lacToAdd, channel, masterKey);
+            status = StatusCode.EOK;
+        } catch (IOException e) {
+            logger.error("Error saving lac:{} for ledger:{}",
+                          new Object[] { lac, ledgerId, e });
+            status = StatusCode.EIO;
+        } catch (BookieException e) {
+            logger.error("Unauthorized access to ledger:{} while adding lac:{}",
+                                                  ledgerId, lac);
+            status = StatusCode.EUA;
+        } catch (Throwable t) {
+            logger.error("Unexpected exception while writing {}@{} : ",
+                    new Object[] { lac, ledgerId, t });
+            // some bad request which cause unexpected exception
+            status = StatusCode.EBADREQ;
+        }
+
+        // Record the processing latency as success or failure, then return the response with its final status.
+        if (status.equals(StatusCode.EOK)) {
+            requestProcessor.writeLacStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
+        } else {
+            requestProcessor.writeLacStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
+        }
+        writeLacResponse.setStatus(status);
+        return writeLacResponse.build();
+    }
+
+    @Override
+    public void safeRun() {
+        WriteLacResponse writeLacResponse = getWriteLacResponse();
+        if (null != writeLacResponse) {
+            Response.Builder response = Response.newBuilder()
+                    .setHeader(getHeader())
+                    .setStatus(writeLacResponse.getStatus())
+                    .setWriteLacResponse(writeLacResponse);
+            Response resp = response.build();
+            sendResponse(writeLacResponse.getStatus(), resp, requestProcessor.writeLacStats);
+        }
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
index f1d0e9f..64e524d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
@@ -21,12 +21,15 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Random;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -58,9 +61,8 @@ import org.slf4j.LoggerFactory;
 public class OrderedSafeExecutor {
     final static long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1);
     final String name;
-    final ThreadPoolExecutor threads[];
+    final ScheduledThreadPoolExecutor threads[];
     final long threadIds[];
-    final BlockingQueue<Runnable> queues[];
     final Random rand = new Random();
     final OpStatsLogger taskExecutionStats;
     final OpStatsLogger taskPendingStats;
@@ -173,17 +175,15 @@ public class OrderedSafeExecutor {
 
         this.warnTimeMicroSec = warnTimeMicroSec;
         name = baseName;
-        threads = new ThreadPoolExecutor[numThreads];
+        threads = new ScheduledThreadPoolExecutor[numThreads];
         threadIds = new long[numThreads];
-        queues = new BlockingQueue[numThreads];
         for (int i = 0; i < numThreads; i++) {
-            queues[i] = new LinkedBlockingQueue<Runnable>();
-            threads[i] =  new ThreadPoolExecutor(1, 1,
-                    0L, TimeUnit.MILLISECONDS, queues[i],
+            threads[i] =  new ScheduledThreadPoolExecutor(1,
                     new ThreadFactoryBuilder()
                         .setNameFormat(name + "-orderedsafeexecutor-" + i + "-%d")
                         .setThreadFactory(threadFactory)
                         .build());
+            threads[i].setMaximumPoolSize(1);
 
             // Save thread ids
             final int idx = i;
@@ -209,7 +209,7 @@ public class OrderedSafeExecutor {
 
                 @Override
                 public Number getSample() {
-                    return queues[idx].size();
+                    return threads[idx].getQueue().size();
                 }
             });
             statsLogger.registerGauge(String.format("%s-completed-tasks-%d", name, idx), new Gauge<Number>() {
@@ -242,7 +242,7 @@ public class OrderedSafeExecutor {
         this.traceTaskExecution = traceTaskExecution;
     }
 
-    ExecutorService chooseThread() {
+    ScheduledExecutorService chooseThread() {
         // skip random # generation in this special case
         if (threads.length == 1) {
             return threads[0];
@@ -252,7 +252,7 @@ public class OrderedSafeExecutor {
 
     }
 
-    ExecutorService chooseThread(Object orderingKey) {
+    ScheduledExecutorService chooseThread(Object orderingKey) {
         // skip hashcode generation in this special case
         if (threads.length == 1) {
             return threads[0];
@@ -286,6 +286,104 @@ public class OrderedSafeExecutor {
         chooseThread(orderingKey).submit(timedRunnable(r));
     }
 
+    /**
+     * Schedules a one-shot action, on a thread picked by chooseThread(), that becomes enabled after the given delay.
+     * @param command - the SafeRunnable to execute
+     * @param delay - the time from now to delay execution
+     * @param unit - the time unit of the delay parameter
+     * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion
+     */
+    public ScheduledFuture<?> schedule(SafeRunnable command, long delay, TimeUnit unit) {
+        final ScheduledExecutorService worker = chooseThread();
+        return worker.schedule(command, delay, unit);
+    }
+
+    /**
+     * Schedules a one-shot action, on the thread chooseThread(orderingKey) selects, that becomes enabled after the given delay.
+     * @param orderingKey - the key handed to chooseThread(Object) to select the executing thread
+     * @param command - the SafeRunnable to execute
+     * @param delay - the time from now to delay execution
+     * @param unit - the time unit of the delay parameter
+     * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion
+     */
+    public ScheduledFuture<?> scheduleOrdered(Object orderingKey, SafeRunnable command, long delay, TimeUnit unit) {
+        final ScheduledExecutorService worker = chooseThread(orderingKey);
+        return worker.schedule(command, delay, unit);
+    }
+
+    /**
+     * Schedules a periodic action, on a thread picked by chooseThread(), that becomes enabled first
+     * after the given initial delay, and subsequently with the given period.
+     *
+     * For more details check scheduleAtFixedRate in interface ScheduledExecutorService
+     * @param command - the SafeRunnable to execute
+     * @param initialDelay - the time to delay first execution
+     * @param period - the period between successive executions
+     * @param unit - the time unit of the initialDelay and period parameters
+     * @return a ScheduledFuture representing pending completion of the task, and whose get()
+     * method will throw an exception upon cancellation
+     */
+    public ScheduledFuture<?> scheduleAtFixedRate(SafeRunnable command, long initialDelay, long period, TimeUnit unit) {
+        final ScheduledExecutorService worker = chooseThread();
+        return worker.scheduleAtFixedRate(command, initialDelay, period, unit);
+    }
+
+    /**
+     * Schedules a periodic action, on the thread chooseThread(orderingKey) selects, that becomes enabled
+     * first after the given initial delay, and subsequently with the given period.
+     *
+     * For more details check scheduleAtFixedRate in interface ScheduledExecutorService
+     * @param orderingKey - the key handed to chooseThread(Object) to select the executing thread
+     * @param command - the SafeRunnable to execute
+     * @param initialDelay - the time to delay first execution
+     * @param period - the period between successive executions
+     * @param unit - the time unit of the initialDelay and period parameters
+     * @return a ScheduledFuture representing pending completion of the task, and whose get() method
+     * will throw an exception upon cancellation
+     */
+    public ScheduledFuture<?> scheduleAtFixedRateOrdered(Object orderingKey, SafeRunnable command, long initialDelay,
+            long period, TimeUnit unit) {
+        final ScheduledExecutorService worker = chooseThread(orderingKey);
+        return worker.scheduleAtFixedRate(command, initialDelay, period, unit);
+    }
+
+    /**
+     * Schedules a periodic action, on a thread picked by chooseThread(), that becomes enabled first after the given
+     * initial delay, and subsequently with the given delay between the end of one execution and the start of the next.
+     *
+     * For more details check scheduleWithFixedDelay in interface ScheduledExecutorService
+     * @param command - the SafeRunnable to execute
+     * @param initialDelay - the time to delay first execution
+     * @param delay - the delay between the termination of one execution and the commencement of the next
+     * @param unit - the time unit of the initialDelay and delay parameters
+     * @return a ScheduledFuture representing pending completion of the task, and whose get() method
+     * will throw an exception upon cancellation
+     */
+    public ScheduledFuture<?> scheduleWithFixedDelay(SafeRunnable command, long initialDelay, long delay,
+            TimeUnit unit) {
+        final ScheduledExecutorService worker = chooseThread();
+        return worker.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+    }
+
+    /**
+     * Schedules a periodic action, on the thread chooseThread(orderingKey) selects, that becomes enabled first after the
+     * given initial delay, and subsequently with the given delay between the end of one execution and the start of the next.
+     *
+     * For more details check scheduleWithFixedDelay in interface ScheduledExecutorService
+     * @param orderingKey - the key handed to chooseThread(Object) to select the executing thread
+     * @param command - the SafeRunnable to execute
+     * @param initialDelay - the time to delay first execution
+     * @param delay - the delay between the termination of one execution and the commencement of the next
+     * @param unit - the time unit of the initialDelay and delay parameters
+     * @return a ScheduledFuture representing pending completion of the task, and whose get() method
+     * will throw an exception upon cancellation
+     */
+    public ScheduledFuture<?> scheduleWithFixedDelayOrdered(Object orderingKey, SafeRunnable command, long initialDelay,
+            long delay, TimeUnit unit) {
+        final ScheduledExecutorService worker = chooseThread(orderingKey);
+        return worker.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+    }
+
     private long getThreadID(Object orderingKey) {
         // skip hashcode generation in this special case
         if (threadIds.length == 1) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
index aabf80b..9ce9baf 100644
--- a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
@@ -57,6 +57,8 @@ enum OperationType {
     RANGE_ADD_ENTRY = 4;
 
     AUTH = 5;
+    WRITE_LAC = 6;
+    READ_LAC = 7;
 }
 
 /**
@@ -74,6 +76,8 @@ message Request {
     optional ReadRequest readRequest = 100;
     optional AddRequest addRequest = 101;
     optional AuthMessage authRequest = 102;
+    optional WriteLacRequest writeLacRequest = 103;
+    optional ReadLacRequest readLacRequest = 104;
 }
 
 message ReadRequest {
@@ -99,6 +103,17 @@ message AddRequest {
     required bytes body = 4;
 }
 
+message WriteLacRequest { // payload of Request.writeLacRequest
+    required int64 ledgerId = 1; // id of the ledger the LAC belongs to
+    required int64 lac = 2; // the LAC value being written (LAC = last add confirmed entry id)
+    required bytes masterKey = 3; // NOTE(review): presumably the ledger's master key, as in AddRequest - confirm
+    required bytes body = 4; // NOTE(review): opaque bytes; presumably the encoded LAC kept by the bookie - confirm
+}
+
+message ReadLacRequest { // payload of Request.readLacRequest
+    required int64 ledgerId = 1; // id of the ledger whose LAC is being read
+}
+
 message Response {
 
     required BKPacketHeader header = 1;
@@ -109,6 +124,8 @@ message Response {
     optional ReadResponse readResponse = 100;
     optional AddResponse addResponse = 101;
     optional AuthMessage authResponse = 102;
+    optional WriteLacResponse writeLacResponse = 103;
+    optional ReadLacResponse readLacResponse = 104;
 }
 
 message ReadResponse {
@@ -127,4 +144,16 @@ message AddResponse {
 message AuthMessage {
     required string authPluginName = 1;
     required bytes payload = 2;
-}
\ No newline at end of file
+}
+
+message WriteLacResponse { // payload of Response.writeLacResponse
+    required StatusCode status = 1; // outcome of the write-LAC request
+    required int64 ledgerId = 2; // NOTE(review): presumably echoes the request's ledgerId - confirm
+}
+
+message ReadLacResponse { // payload of Response.readLacResponse
+    required StatusCode status = 1; // outcome of the read-LAC request
+    required int64 ledgerId = 2; // NOTE(review): presumably echoes the request's ledgerId - confirm
+    optional bytes lacBody = 3; // lac sent by WriteLacRequest (no PutLacRequest message is defined in this diff)
+    optional bytes lastEntryBody = 4; // Actual last entry on the disk
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index 988c2a2..1ce30e9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -334,6 +334,15 @@ public class TestSyncThread {
         }
 
         @Override
+        public void setExplicitlac(long ledgerId, ByteBuffer lac) { // test stub: does nothing with the given LAC
+        }
+
+        @Override
+        public ByteBuffer getExplicitLac(long ledgerId) {
+            return null; // test stub: always returns null, whatever the ledger id
+        }
+
+        @Override
         public Checkpoint checkpoint(Checkpoint checkpoint)
                 throws IOException {
             return checkpoint;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index e87fdc0..a2532c9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -309,4 +309,128 @@ public class BookKeeperTest extends BaseTestCase {
         }
         Assert.assertTrue("BookKeeper should be closed!", _bkc.closed);
     }
+
+    /** With explicit LAC disabled (interval 0), a no-recovery read handle must stay at the LAC it saw on open. */
+    @Test(timeout = 60000)
+    public void testReadHandleWithNoExplicitLAC() throws Exception {
+        ClientConfiguration confWithNoExplicitLAC = new ClientConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString());
+        confWithNoExplicitLAC.setExplictLacInterval(0);
+        BookKeeper bkcWithNoExplicitLAC = new BookKeeper(confWithNoExplicitLAC);
+
+        LedgerHandle wlh = bkcWithNoExplicitLAC.createLedger(digestType, "testPasswd".getBytes());
+        long ledgerId = wlh.getId();
+        int numOfEntries = 5;
+        for (int i = 0; i < numOfEntries; i++) {
+            wlh.addEntry(("foobar" + i).getBytes());
+        }
+
+        LedgerHandle rlh = bkcWithNoExplicitLAC.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes());
+        Assert.assertTrue(
+                "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
+                (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+        Enumeration<LedgerEntry> entries = rlh.readEntries(0, numOfEntries - 2);
+        int entryId = 0;
+        while (entries.hasMoreElements()) {
+            LedgerEntry entry = entries.nextElement();
+            String entryString = new String(entry.getEntry());
+            Assert.assertTrue("Expected entry String: " + ("foobar" + entryId) + " actual entry String: " + entryString,
+                    entryString.equals("foobar" + entryId));
+            entryId++;
+        }
+
+        for (int i = numOfEntries; i < 2 * numOfEntries; i++) {
+            wlh.addEntry(("foobar" + i).getBytes());
+        }
+
+        Thread.sleep(3000);
+        // Since the explicit LAC flush policy is not enabled for the write ledger handle,
+        // reading the explicit LAC through rlh yields LedgerHandle.INVALID_ENTRY_ID;
+        // it must not throw an exception.
+        long explicitlac = rlh.readExplicitLastConfirmed();
+        Assert.assertTrue(
+                "Expected Explicit LAC of rlh: " + LedgerHandle.INVALID_ENTRY_ID + " actual ExplicitLAC of rlh: " + explicitlac,
+                (explicitlac == LedgerHandle.INVALID_ENTRY_ID));
+        Assert.assertTrue(
+                "Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual LAC of wlh: " + wlh.getLastAddConfirmed(),
+                (wlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
+        Assert.assertTrue(
+                "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
+                (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+        try {
+            rlh.readEntries(numOfEntries - 1, numOfEntries - 1);
+            fail("rlh readEntries beyond " + (numOfEntries - 2) + " should fail with ReadException");
+        } catch (BKException.BKReadException expected) {
+            // Expected: entry (numOfEntries - 1) lies beyond the LAC known to rlh.
+        }
+        rlh.close();
+        wlh.close();
+        bkcWithNoExplicitLAC.close();
+    }
+
+    /** Verifies that readExplicitLastConfirmed() brings a no-recovery read handle up to the writer's LAC. */
+    @Test(timeout = 60000)
+    public void testReadHandleWithExplicitLAC() throws Exception {
+        final int explictLacInterval = 1;
+        final int numOfEntries = 5;
+        final int lacAtOpen = numOfEntries - 2;
+        final int lastEntryId = 2 * numOfEntries - 1;
+
+        ClientConfiguration confWithExplicitLAC = new ClientConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString());
+        confWithExplicitLAC.setExplictLacInterval(explictLacInterval);
+        BookKeeper bkcWithExplicitLAC = new BookKeeper(confWithExplicitLAC);
+
+        LedgerHandle writer = bkcWithExplicitLAC.createLedger(digestType, "testPasswd".getBytes());
+        for (int i = 0; i < numOfEntries; i++) {
+            writer.addEntry(("foobar" + i).getBytes());
+        }
+
+        // Open a second, no-recovery handle on the same ledger while the writer is still open.
+        LedgerHandle reader = bkcWithExplicitLAC.openLedgerNoRecovery(writer.getId(), digestType, "testPasswd".getBytes());
+        long readerLac = reader.getLastAddConfirmed();
+        Assert.assertTrue("Expected LAC of rlh: " + lacAtOpen + " actual LAC of rlh: " + readerLac,
+                readerLac == lacAtOpen);
+
+        for (int i = numOfEntries; i <= lastEntryId; i++) {
+            writer.addEntry(("foobar" + i).getBytes());
+        }
+
+        // Sleep for more than two explicit-LAC intervals: on the first run in the write handle
+        // lh.getExplicitLastAddConfirmed() is < lh.getPiggyBackedLastAddConfirmed(), so it
+        // does not make an explicit writeLac yet.
+        Thread.sleep((2 * explictLacInterval + 1) * 1000);
+        long writerLac = writer.getLastAddConfirmed();
+        Assert.assertTrue("Expected LAC of wlh: " + lastEntryId + " actual LAC of wlh: " + writerLac,
+                writerLac == lastEntryId);
+
+        // The read handle's LAC is not updated until readExplicitLastConfirmed() is called.
+        readerLac = reader.getLastAddConfirmed();
+        Assert.assertTrue("Expected LAC of rlh: " + lacAtOpen + " actual LAC of rlh: " + readerLac,
+                readerLac == lacAtOpen);
+
+        long explicitLac = reader.readExplicitLastConfirmed();
+        Assert.assertTrue(
+                "Expected Explicit LAC of rlh: " + lastEntryId + " actual ExplicitLAC of rlh: " + explicitLac,
+                explicitLac == lastEntryId);
+
+        // readExplicitLastConfirmed() also updates the read handle's own LAC.
+        readerLac = reader.getLastAddConfirmed();
+        Assert.assertTrue("Expected LAC of rlh: " + lastEntryId + " actual LAC of rlh: " + readerLac,
+                readerLac == lastEntryId);
+
+        Enumeration<LedgerEntry> entries = reader.readEntries(numOfEntries, lastEntryId);
+        for (int entryId = numOfEntries; entries.hasMoreElements(); entryId++) {
+            String expected = "foobar" + entryId;
+            String entryString = new String(entries.nextElement().getEntry());
+            Assert.assertTrue("Expected entry String: " + expected + " actual entry String: " + entryString,
+                    entryString.equals(expected));
+        }
+
+        reader.close();
+        writer.close();
+        bkcWithExplicitLAC.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index 5387424..4c2ddaa 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -206,5 +206,17 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
         @Override
         public void flushEntriesLocationsIndex() throws IOException {
         }
+
+        @Override
+        public void setExplicitlac(long ledgerId, ByteBuffer lac) throws IOException {
+            // No-op: this test stub does nothing with the explicit LAC it is given.
+
+        }
+
+        @Override
+        public ByteBuffer getExplicitLac(long ledgerId) {
+            // Test stub: always returns null, whatever the ledger id.
+            return null;
+        }
     }
 }


Mime
View raw message