distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [03/20] incubator-distributedlog git commit: DL-101: Improve session expire handling on fetching log segments for readers
Date Wed, 28 Dec 2016 01:05:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
index 6a8f90e..a1e29a2 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
@@ -18,16 +18,25 @@
 package com.twitter.distributedlog;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Ticker;
+import com.twitter.distributedlog.callback.LogSegmentListener;
+import com.twitter.distributedlog.callback.LogSegmentNamesListener;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.DLIllegalStateException;
 import com.twitter.distributedlog.exceptions.DLInterruptedException;
 import com.twitter.distributedlog.exceptions.LockCancelledException;
 import com.twitter.distributedlog.exceptions.LockingException;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
+import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
+import com.twitter.distributedlog.exceptions.UnexpectedException;
 import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
 import com.twitter.distributedlog.injector.AsyncFailureInjector;
 import com.twitter.distributedlog.lock.DistributedLock;
@@ -35,6 +44,7 @@ import com.twitter.distributedlog.lock.SessionLockFactory;
 import com.twitter.distributedlog.lock.ZKDistributedLock;
 import com.twitter.distributedlog.lock.ZKSessionLockFactory;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
+import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.readahead.ReadAheadWorker;
 import com.twitter.distributedlog.stats.BroadCastStatsLogger;
@@ -55,6 +65,8 @@ import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -102,11 +114,9 @@ import scala.runtime.BoxedUnit;
  * All read lock related stats are exposed under scope `read_lock`. See {@link ZKDistributedLock}
  * for detail stats.
  */
-class BKLogReadHandler extends BKLogHandler {
+class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener {
     static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class);
 
-    private static final int LAYOUT_VERSION = -1;
-
     protected final ZKLogMetadataForReader logMetadataForReader;
     protected final ReadAheadCache readAheadCache;
     protected final LedgerHandleCache handleCache;
@@ -123,6 +133,16 @@ class BKLogReadHandler extends BKLogHandler {
     private DistributedLock readLock;
     private Future<Void> lockAcquireFuture;
 
+    // used to notify the reader of state changes (e.g. errors) in this read handler
+    protected final AsyncNotification readerStateNotification;
+
+    // log segments listener
+    protected boolean logSegmentsNotificationDisabled = false;
+    protected final CopyOnWriteArraySet<LogSegmentListener> listeners =
+            new CopyOnWriteArraySet<LogSegmentListener>();
+    protected Versioned<List<LogSegmentMetadata>> lastNotifiedLogSegments =
+            new Versioned<List<LogSegmentMetadata>>(null, Version.NEW);
+
     // stats
     private final AlertStatsLogger alertStatsLogger;
     private final StatsLogger handlerStatsLogger;
@@ -132,26 +152,35 @@ class BKLogReadHandler extends BKLogHandler {
     /**
      * Construct a Bookkeeper journal manager.
      */
-    public BKLogReadHandler(ZKLogMetadataForReader logMetadata,
-                            Optional<String> subscriberId,
-                            DistributedLogConfiguration conf,
-                            DynamicDistributedLogConfiguration dynConf,
-                            ZooKeeperClientBuilder zkcBuilder,
-                            BookKeeperClientBuilder bkcBuilder,
-                            LogSegmentMetadataStore metadataStore,
-                            OrderedScheduler scheduler,
-                            OrderedScheduler lockStateExecutor,
-                            OrderedScheduler readAheadExecutor,
-                            AlertStatsLogger alertStatsLogger,
-                            ReadAheadExceptionsLogger readAheadExceptionsLogger,
-                            StatsLogger statsLogger,
-                            StatsLogger perLogStatsLogger,
-                            String clientId,
-                            AsyncNotification notification,
-                            boolean isHandleForReading,
-                            boolean deserializeRecordSet) {
-        super(logMetadata, conf, zkcBuilder, bkcBuilder, metadataStore, scheduler,
-              statsLogger, alertStatsLogger, notification, LogSegmentFilter.DEFAULT_FILTER, clientId);
+    BKLogReadHandler(ZKLogMetadataForReader logMetadata,
+                     Optional<String> subscriberId,
+                     DistributedLogConfiguration conf,
+                     DynamicDistributedLogConfiguration dynConf,
+                     ZooKeeperClientBuilder zkcBuilder,
+                     BookKeeperClientBuilder bkcBuilder,
+                     LogSegmentMetadataStore metadataStore,
+                     LogSegmentMetadataCache metadataCache,
+                     OrderedScheduler scheduler,
+                     OrderedScheduler lockStateExecutor,
+                     OrderedScheduler readAheadExecutor,
+                     AlertStatsLogger alertStatsLogger,
+                     ReadAheadExceptionsLogger readAheadExceptionsLogger,
+                     StatsLogger statsLogger,
+                     StatsLogger perLogStatsLogger,
+                     String clientId,
+                     AsyncNotification readerStateNotification,
+                     boolean isHandleForReading,
+                     boolean deserializeRecordSet) {
+        super(logMetadata,
+                conf,
+                zkcBuilder,
+                bkcBuilder,
+                metadataStore,
+                metadataCache,
+                scheduler,
+                statsLogger,
+                alertStatsLogger,
+                clientId);
         this.logMetadataForReader = logMetadata;
         this.dynConf = dynConf;
         this.readAheadExecutor = readAheadExecutor;
@@ -161,6 +190,7 @@ class BKLogReadHandler extends BKLogHandler {
         this.handlerStatsLogger =
                 BroadCastStatsLogger.masterslave(this.perLogStatsLogger, statsLogger);
         this.readAheadExceptionsLogger = readAheadExceptionsLogger;
+        this.readerStateNotification = readerStateNotification;
 
         handleCache = LedgerHandleCache.newBuilder()
                 .bkc(this.bookKeeperClient)
@@ -171,7 +201,7 @@ class BKLogReadHandler extends BKLogHandler {
                 getFullyQualifiedName(),
                 handlerStatsLogger,
                 alertStatsLogger,
-                notification,
+                readerStateNotification,
                 dynConf.getReadAheadMaxRecords(),
                 deserializeRecordSet,
                 conf.getTraceReadAheadDeliveryLatency(),
@@ -308,14 +338,14 @@ class BKLogReadHandler extends BKLogHandler {
                 if (null != readAheadCache) {
                     readAheadCache.clear();
                 }
+                if (null != readAheadWorker) {
+                    unregisterListener(readAheadWorker);
+                }
                 if (null != handleCache) {
                     handleCache.clear();
                 }
-                // No-op
-                zooKeeperClient.getWatcherManager().unregisterChildWatcher(
-                        logMetadata.getLogSegmentsPath(),
-                        BKLogReadHandler.this,
-                        true);
+                // unregister the log segment listener
+                metadataStore.unregisterLogSegmentListener(logMetadata.getLogSegmentsPath(), BKLogReadHandler.this);
                 return Future.Void();
             }
         });
@@ -326,6 +356,52 @@ class BKLogReadHandler extends BKLogHandler {
         return asyncClose();
     }
 
+    /**
+     * Start fetching the log segments and register this handler as the
+     * {@link LogSegmentNamesListener} (it is passed as the listener argument of
+     * readLogSegmentsFromStore).
+     * The returned future is satisfied only on a successful fetch or on a fatal failure;
+     * non-fatal failures are retried internally.
+     *
+     * @return future representing the result of the first successful fetch
+     */
+    Future<Versioned<List<LogSegmentMetadata>>> asyncStartFetchLogSegments() {
+        Promise<Versioned<List<LogSegmentMetadata>>> promise =
+                new Promise<Versioned<List<LogSegmentMetadata>>>();
+        asyncStartFetchLogSegments(promise);
+        return promise;
+    }
+
+    /**
+     * Fetch all log segments (unfiltered, ordered by {@link LogSegmentMetadata#COMPARATOR})
+     * and complete <i>promise</i> with the versioned result.
+     *
+     * <p>{@link LogNotFoundException}, {@link LogSegmentNotFoundException} and
+     * {@link UnexpectedException} are treated as fatal: the cause is recorded in
+     * <code>metadataException</code> (only if nothing was recorded before), the reader is
+     * notified and the promise fails. Any other failure is retried with the same promise after
+     * <code>conf.getZKRetryBackoffMaxMillis()</code> milliseconds.
+     *
+     * @param promise promise satisfied with the fetched, versioned log segment list
+     */
+    void asyncStartFetchLogSegments(final Promise<Versioned<List<LogSegmentMetadata>>> promise) {
+        readLogSegmentsFromStore(
+                LogSegmentMetadata.COMPARATOR,
+                LogSegmentFilter.DEFAULT_FILTER,
+                this).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+            @Override
+            public void onFailure(Throwable cause) {
+                if (cause instanceof LogNotFoundException ||
+                        cause instanceof LogSegmentNotFoundException ||
+                        cause instanceof UnexpectedException) {
+                    // indicate some inconsistent behavior, abort
+                    // NOTE(review): this cast assumes all three fatal types are IOExceptions - confirm.
+                    metadataException.compareAndSet(null, (IOException) cause);
+                    // notify the reader that read handler is in error state
+                    notifyReaderOnError(cause);
+                    FutureUtils.setException(promise, cause);
+                    return;
+                }
+                // non-fatal failure: retry with the same promise after a backoff.
+                // NOTE(review): nothing visible here stops this retry loop once the handler is
+                // closed, and there is no retry limit; confirm that close cancels it.
+                scheduler.schedule(new Runnable() {
+                    @Override
+                    public void run() {
+                        asyncStartFetchLogSegments(promise);
+                    }
+                }, conf.getZKRetryBackoffMaxMillis(), TimeUnit.MILLISECONDS);
+            }
+
+            @Override
+            public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) {
+                // successful fetch: hand the versioned list to the caller
+                FutureUtils.setValue(promise, segments);
+            }
+        });
+    }
+
     public void startReadAhead(LedgerReadPosition startPosition,
                                AsyncFailureInjector failureInjector) {
         if (null == readAheadWorker) {
@@ -334,7 +410,6 @@ class BKLogReadHandler extends BKLogHandler {
                     dynConf,
                     logMetadataForReader,
                     this,
-                    zooKeeperClient,
                     readAheadExecutor,
                     handleCache,
                     startPosition,
@@ -345,8 +420,16 @@ class BKLogReadHandler extends BKLogHandler {
                     perLogStatsLogger,
                     alertStatsLogger,
                     failureInjector,
-                    notification);
-            readAheadWorker.start();
+                    readerStateNotification);
+            registerListener(readAheadWorker);
+            // start the readahead worker after the log segments are fetched
+            asyncStartFetchLogSegments().map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
+                @Override
+                public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
+                    readAheadWorker.start(logSegments.getValue());
+                    return BoxedUnit.UNIT;
+                }
+            });
         }
     }
 
@@ -407,10 +490,110 @@ class BKLogReadHandler extends BKLogHandler {
     }
 
+    /**
+     * Test hook: stop delivering log segment updates and stream-deleted events to the
+     * registered listeners (the flag is checked by notifyUpdatedLogSegments and
+     * notifyLogStreamDeleted). Listener registration itself is not changed.
+     * NOTE(review): logSegmentsNotificationDisabled is a plain, non-volatile field; confirm the
+     * thread setting it is ordered with the threads delivering notifications.
+     */
     @VisibleForTesting
-    void disableReadAheadZKNotification() {
-        if (null != readAheadWorker) {
-            readAheadWorker.disableZKNotification();
+    void disableReadAheadLogSegmentsNotification() {
+        logSegmentsNotificationDisabled = true;
+    }
+
+    /**
+     * Called with the latest versioned list of log segment names.
+     *
+     * <p>An update whose version does not come strictly after the last notified list is ignored.
+     * Otherwise the metadata of the named segments is read from the store. On success the
+     * listeners are notified, but only if the result is still newer than the last notified list.
+     * Fatal read failures put the handler in an error state; other failures are retried.
+     *
+     * @param segments versioned list of log segment names
+     */
+    @Override
+    public void onSegmentsUpdated(final Versioned<List<String>> segments) {
+        synchronized (this) {
+            // NOTE(review): Version.NEW is compared by reference here; confirm it is a singleton.
+            if (lastNotifiedLogSegments.getVersion() != Version.NEW &&
+                    lastNotifiedLogSegments.getVersion().compare(segments.getVersion()) != Version.Occurred.BEFORE) {
+                // the log segments have already been read; this is possibly a retry of an older update
+                return;
+            }
+        }
+
+        Promise<Versioned<List<LogSegmentMetadata>>> readLogSegmentsPromise =
+                new Promise<Versioned<List<LogSegmentMetadata>>>();
+        readLogSegmentsPromise.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+            @Override
+            public void onFailure(Throwable cause) {
+                if (cause instanceof LogNotFoundException ||
+                        cause instanceof LogSegmentNotFoundException ||
+                        cause instanceof UnexpectedException) {
+                    // indicate some inconsistent behavior, abort
+                    metadataException.compareAndSet(null, (IOException) cause);
+                    // notify the reader that read handler is in error state
+                    notifyReaderOnError(cause);
+                    return;
+                }
+                // Retry the whole update after a backoff. The retry re-runs the version check
+                // above, so it becomes a no-op if a newer list was notified in the meantime.
+                // NOTE(review): no retry limit and no visible check for a closed handler.
+                scheduler.schedule(new Runnable() {
+                    @Override
+                    public void run() {
+                        onSegmentsUpdated(segments);
+                    }
+                }, conf.getZKRetryBackoffMaxMillis(), TimeUnit.MILLISECONDS);
+            }
+
+            @Override
+            public void onSuccess(Versioned<List<LogSegmentMetadata>> logSegments) {
+                List<LogSegmentMetadata> segmentsToNotify = null;
+                // Re-check under the lock: another read may have completed while this one was
+                // in flight, and only strictly newer results may replace lastNotifiedLogSegments.
+                synchronized (BKLogReadHandler.this) {
+                    Versioned<List<LogSegmentMetadata>> lastLogSegments = lastNotifiedLogSegments;
+                    if (lastLogSegments.getVersion() == Version.NEW ||
+                            lastLogSegments.getVersion().compare(logSegments.getVersion()) == Version.Occurred.BEFORE) {
+                        lastNotifiedLogSegments = logSegments;
+                        segmentsToNotify = logSegments.getValue();
+                    }
+                }
+                // listeners are called outside the lock
+                if (null != segmentsToNotify) {
+                    notifyUpdatedLogSegments(segmentsToNotify);
+                }
+            }
+        });
+        // log segments list is updated, read their metadata
+        readLogSegmentsFromStore(
+                segments,
+                LogSegmentMetadata.COMPARATOR,
+                LogSegmentFilter.DEFAULT_FILTER,
+                readLogSegmentsPromise);
+    }
+
+    // LogSegmentNamesListener callback: forward the deletion to the registered LogSegmentListeners.
+    @Override
+    public void onLogStreamDeleted() {
+        notifyLogStreamDeleted();
+    }
+
+    //
+    // Listener for log segments
+    //
+
+    /**
+     * Register a listener for log segment updates and stream-deleted events.
+     * Registering the same listener twice has no extra effect (set semantics).
+     */
+    protected void registerListener(LogSegmentListener listener) {
+        listeners.add(listener);
+    }
+
+    /** Unregister a listener; does nothing if it was not registered. */
+    protected void unregisterListener(LogSegmentListener listener) {
+        listeners.remove(listener);
+    }
+
+    /**
+     * Deliver an updated log segment list to every registered listener, unless notifications
+     * were disabled via disableReadAheadLogSegmentsNotification().
+     *
+     * @param segments log segments to deliver
+     */
+    protected void notifyUpdatedLogSegments(List<LogSegmentMetadata> segments) {
+        if (logSegmentsNotificationDisabled) {
+            return;
+        }
+
+        // CopyOnWriteArraySet iterates over a snapshot, so listeners may be (un)registered concurrently.
+        for (LogSegmentListener listener : listeners) {
+            // Each listener gets its own sorted copy, presumably so that one listener cannot
+            // see another listener's changes to the list.
+            List<LogSegmentMetadata> listToReturn =
+                    new ArrayList<LogSegmentMetadata>(segments);
+            Collections.sort(listToReturn, LogSegmentMetadata.COMPARATOR);
+            listener.onSegmentsUpdated(listToReturn);
+        }
+    }
+
+    /**
+     * Tell every registered listener that the log stream was deleted, unless notifications
+     * were disabled via disableReadAheadLogSegmentsNotification().
+     */
+    protected void notifyLogStreamDeleted() {
+        if (logSegmentsNotificationDisabled) {
+            return;
+        }
+
+        for (LogSegmentListener listener : listeners) {
+            listener.onLogStreamDeleted();
         }
     }
 
+    // Report an error to the reader through readerStateNotification, if one was supplied
+    // to the constructor (it may be null).
+    protected void notifyReaderOnError(Throwable cause) {
+        if (null != readerStateNotification) {
+            readerStateNotification.notifyOnError(cause);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
index 4665ed5..5d3be7d 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
@@ -33,6 +33,8 @@ import com.twitter.distributedlog.function.GetLastTxIdFunction;
 import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter;
 import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
 import com.twitter.distributedlog.lock.DistributedLock;
+import com.twitter.distributedlog.logsegment.LogSegmentFilter;
+import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.logsegment.RollingPolicy;
 import com.twitter.distributedlog.logsegment.SizeBasedRollingPolicy;
@@ -63,6 +65,7 @@ import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
@@ -76,6 +79,7 @@ import scala.runtime.AbstractFunction1;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -99,9 +103,6 @@ class BKLogWriteHandler extends BKLogHandler {
     static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class);
 
     protected final DistributedLock lock;
-    protected final int ensembleSize;
-    protected final int writeQuorumSize;
-    protected final int ackQuorumSize;
     protected final LedgerAllocator ledgerAllocator;
     protected final MaxTxId maxTxId;
     protected final MaxLogSegmentSequenceNo maxLogSegmentSequenceNo;
@@ -117,6 +118,10 @@ class BKLogWriteHandler extends BKLogHandler {
     // tracking the inprogress log segments
     protected final LinkedList<Long> inprogressLSSNs;
 
+    // Fetch LogSegments State: writes can proceed with the write-filtered list, while truncation needs the full list
+    private final Future<Versioned<List<LogSegmentMetadata>>> fetchForWrite;
+    private Future<Versioned<List<LogSegmentMetadata>>> fetchForTruncation;
+
     // Recover Functions
     private final RecoverLogSegmentFunction recoverLogSegmentFunction =
             new RecoverLogSegmentFunction();
@@ -162,6 +167,7 @@ class BKLogWriteHandler extends BKLogHandler {
                       ZooKeeperClientBuilder zkcBuilder,
                       BookKeeperClientBuilder bkcBuilder,
                       LogSegmentMetadataStore metadataStore,
+                      LogSegmentMetadataCache metadataCache,
                       OrderedScheduler scheduler,
                       LedgerAllocator allocator,
                       StatsLogger statsLogger,
@@ -173,8 +179,16 @@ class BKLogWriteHandler extends BKLogHandler {
                       FeatureProvider featureProvider,
                       DynamicDistributedLogConfiguration dynConf,
                       DistributedLock lock /** owned by handler **/) {
-        super(logMetadata, conf, zkcBuilder, bkcBuilder, metadataStore,
-              scheduler, statsLogger, alertStatsLogger, null, WRITE_HANDLE_FILTER, clientId);
+        super(logMetadata,
+                conf,
+                zkcBuilder,
+                bkcBuilder,
+                metadataStore,
+                metadataCache,
+                scheduler,
+                statsLogger,
+                alertStatsLogger,
+                clientId);
         this.perLogStatsLogger = perLogStatsLogger;
         this.writeLimiter = writeLimiter;
         this.featureProvider = featureProvider;
@@ -183,23 +197,6 @@ class BKLogWriteHandler extends BKLogHandler {
         this.lock = lock;
         this.metadataUpdater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);
 
-        ensembleSize = conf.getEnsembleSize();
-
-        if (ensembleSize < conf.getWriteQuorumSize()) {
-            writeQuorumSize = ensembleSize;
-            LOG.warn("Setting write quorum size {} greater than ensemble size {}",
-                conf.getWriteQuorumSize(), ensembleSize);
-        } else {
-            writeQuorumSize = conf.getWriteQuorumSize();
-        }
-        if (writeQuorumSize < conf.getAckQuorumSize()) {
-            ackQuorumSize = writeQuorumSize;
-            LOG.warn("Setting write ack quorum size {} greater than write quorum size {}",
-                conf.getAckQuorumSize(), writeQuorumSize);
-        } else {
-            ackQuorumSize = conf.getAckQuorumSize();
-        }
-
         if (conf.getEncodeRegionIDInLogSegmentMetadata()) {
             this.regionId = regionId;
         } else {
@@ -215,9 +212,12 @@ class BKLogWriteHandler extends BKLogHandler {
         maxTxId = new MaxTxId(zooKeeperClient, logMetadata.getMaxTxIdPath(),
                 conf.getSanityCheckTxnID(), logMetadata.getMaxTxIdData());
 
-        // Schedule fetching ledgers list in background before we access it.
-        // We don't need to watch the ledgers list changes for writer, as it manages ledgers list.
-        scheduleGetLedgersTask(false, true);
+        // Schedule fetching log segment list in background before we access it.
+        // We don't need to watch the log segment list changes for writer, as it manages log segment list.
+        fetchForWrite = readLogSegmentsFromStore(
+                LogSegmentMetadata.COMPARATOR,
+                WRITE_HANDLE_FILTER,
+                null);
 
         // Initialize other parameters.
         setLastLedgerRollingTimeMillis(Utils.nowInMillis());
@@ -237,6 +237,59 @@ class BKLogWriteHandler extends BKLogHandler {
         deleteOpStats = segmentsStatsLogger.getOpStatsLogger("delete");
     }
 
+    /**
+     * Return the cached log segments, ordered by <i>comparator</i>, once the initial
+     * write-filtered fetch ({@code fetchForWrite}, started in the constructor) has completed.
+     * The fetched value itself is not used; the list is read via getCachedLogSegments.
+     *
+     * @param comparator ordering of the returned list
+     * @return future of the cached log segments; fails if the initial fetch failed or if
+     *         getCachedLogSegments throws {@link UnexpectedException}
+     */
+    private Future<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFetch(
+            final Comparator<LogSegmentMetadata> comparator) {
+        final Promise<List<LogSegmentMetadata>> promise = new Promise<List<LogSegmentMetadata>>();
+        fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+            @Override
+            public void onFailure(Throwable cause) {
+                FutureUtils.setException(promise, cause);
+            }
+
+            @Override
+            public void onSuccess(Versioned<List<LogSegmentMetadata>> result) {
+                try {
+                    FutureUtils.setValue(promise, getCachedLogSegments(comparator));
+                } catch (UnexpectedException e) {
+                    FutureUtils.setException(promise, e);
+                }
+            }
+        });
+        return promise;
+    }
+
+    /**
+     * Like getCachedLogSegmentsAfterFirstFetch, but waits for a full (unfiltered) fetch.
+     * The full fetch is started lazily on the first call (with a null listener, so nothing is
+     * watched), and the same future is reused by every later call.
+     * NOTE(review): fetchForTruncation is only ever checked for null, so a failed fetch is
+     * cached permanently and every later truncation fails with the same cause; consider
+     * resetting it on failure.
+     *
+     * @param comparator ordering of the returned list (the fetch itself always uses
+     *                   {@link LogSegmentMetadata#COMPARATOR})
+     * @return future of the cached log segments
+     */
+    private Future<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFullFetch(
+            final Comparator<LogSegmentMetadata> comparator) {
+        Future<Versioned<List<LogSegmentMetadata>>> result;
+        synchronized (this) {
+            if (null == fetchForTruncation) {
+                fetchForTruncation = readLogSegmentsFromStore(
+                        LogSegmentMetadata.COMPARATOR,
+                        LogSegmentFilter.DEFAULT_FILTER,
+                        null);
+            }
+            result = fetchForTruncation;
+        }
+
+        final Promise<List<LogSegmentMetadata>> promise = new Promise<List<LogSegmentMetadata>>();
+        result.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+            @Override
+            public void onFailure(Throwable cause) {
+                FutureUtils.setException(promise, cause);
+            }
+
+            // NOTE: this parameter shadows the enclosing local 'result'; it is unused.
+            @Override
+            public void onSuccess(Versioned<List<LogSegmentMetadata>> result) {
+                try {
+                    FutureUtils.setValue(promise, getCachedLogSegments(comparator));
+                } catch (UnexpectedException e) {
+                    FutureUtils.setException(promise, e);
+                }
+            }
+        });
+        return promise;
+    }
+
     // Transactional operations for MaxLogSegmentSequenceNo
     void storeMaxSequenceNumber(final Transaction txn,
                                 final MaxLogSegmentSequenceNo maxSeqNo,
@@ -413,7 +466,7 @@ class BKLogWriteHandler extends BKLogHandler {
         boolean logSegmentsFound = false;
 
         if (LogSegmentMetadata.supportsLogSegmentSequenceNo(conf.getDLLedgerMetadataLayoutVersion())) {
-            List<LogSegmentMetadata> ledgerListDesc = getFilteredLedgerListDesc(false, false);
+            List<LogSegmentMetadata> ledgerListDesc = getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR);
             Long nextLogSegmentSeqNo = DLUtils.nextLogSegmentSequenceNumber(ledgerListDesc);
 
             if (null == nextLogSegmentSeqNo) {
@@ -452,17 +505,27 @@ class BKLogWriteHandler extends BKLogHandler {
         return FutureUtils.result(asyncStartLogSegment(txId, bestEffort, allowMaxTxID));
     }
 
-    protected Future<BKLogSegmentWriter> asyncStartLogSegment(long txId,
-                                                              boolean bestEffort,
-                                                              boolean allowMaxTxID) {
-        Promise<BKLogSegmentWriter> promise = new Promise<BKLogSegmentWriter>();
+    /**
+     * Start a new log segment asynchronously.
+     *
+     * <p>lock.checkOwnershipAndReacquire() is called synchronously, and a LockingException fails
+     * the returned future immediately. Otherwise doStartLogSegment runs only after the initial
+     * write-filtered fetch ({@code fetchForWrite}) completes. If that fetch failed, the future
+     * fails with its cause.
+     * NOTE(review): fetchForWrite is final and assigned once in the constructor, so a failed
+     * initial fetch makes every later call fail.
+     *
+     * @param txId passed through to doStartLogSegment
+     * @param bestEffort passed through to doStartLogSegment
+     * @param allowMaxTxID passed through to doStartLogSegment
+     * @return future of the new segment writer
+     */
+    protected Future<BKLogSegmentWriter> asyncStartLogSegment(final long txId,
+                                                              final boolean bestEffort,
+                                                              final boolean allowMaxTxID) {
+        final Promise<BKLogSegmentWriter> promise = new Promise<BKLogSegmentWriter>();
         try {
             lock.checkOwnershipAndReacquire();
         } catch (LockingException e) {
             FutureUtils.setException(promise, e);
             return promise;
         }
-        doStartLogSegment(txId, bestEffort, allowMaxTxID, promise);
+        fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+            @Override
+            public void onFailure(Throwable cause) {
+                FutureUtils.setException(promise, cause);
+            }
+
+            @Override
+            public void onSuccess(Versioned<List<LogSegmentMetadata>> list) {
+                doStartLogSegment(txId, bestEffort, allowMaxTxID, promise);
+            }
+        });
         return promise;
     }
 
@@ -732,7 +795,8 @@ class BKLogWriteHandler extends BKLogHandler {
         // we only record sequence id when both write version and logsegment's version support sequence id
         if (LogSegmentMetadata.supportsSequenceId(conf.getDLLedgerMetadataLayoutVersion())
                 && segment.supportsSequenceId()) {
-            List<LogSegmentMetadata> logSegmentDescList = getFilteredLedgerListDesc(false, false);
+            List<LogSegmentMetadata> logSegmentDescList =
+                    getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR);
             startSequenceId = DLUtils.computeStartSequenceId(logSegmentDescList, segment);
         }
 
@@ -776,14 +840,46 @@ class BKLogWriteHandler extends BKLogHandler {
     }
 
+    /**
+     * Complete and close an inprogress log segment once the initial write-filtered fetch
+     * ({@code fetchForWrite}) has completed. The work is delegated to
+     * doCompleteAndCloseLogSegmentAfterLogSegmentListFetched; if the fetch failed,
+     * <i>promise</i> fails with its cause.
+     * The parameters are final so that the anonymous listener below can capture them.
+     */
     protected void doCompleteAndCloseLogSegment(final String inprogressZnodeName,
-                                                long logSegmentSeqNo,
-                                                long ledgerId,
-                                                long firstTxId,
-                                                long lastTxId,
-                                                int recordCount,
-                                                long lastEntryId,
-                                                long lastSlotId,
+                                                final long logSegmentSeqNo,
+                                                final long ledgerId,
+                                                final long firstTxId,
+                                                final long lastTxId,
+                                                final int recordCount,
+                                                final long lastEntryId,
+                                                final long lastSlotId,
                                                 final Promise<LogSegmentMetadata> promise) {
+        fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+            @Override
+            public void onFailure(Throwable cause) {
+                FutureUtils.setException(promise, cause);
+            }
+
+            @Override
+            public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) {
+                doCompleteAndCloseLogSegmentAfterLogSegmentListFetched(
+                        inprogressZnodeName,
+                        logSegmentSeqNo,
+                        ledgerId,
+                        firstTxId,
+                        lastTxId,
+                        recordCount,
+                        lastEntryId,
+                        lastSlotId,
+                        promise);
+            }
+        });
+    }
+
+    private void doCompleteAndCloseLogSegmentAfterLogSegmentListFetched(
+            final String inprogressZnodeName,
+            long logSegmentSeqNo,
+            long ledgerId,
+            long firstTxId,
+            long lastTxId,
+            int recordCount,
+            long lastEntryId,
+            long lastSlotId,
+            final Promise<LogSegmentMetadata> promise) {
         try {
             lock.checkOwnershipAndReacquire();
         } catch (IOException ioe) {
@@ -912,7 +1008,7 @@ class BKLogWriteHandler extends BKLogHandler {
         } catch (IOException ioe) {
             return Future.exception(ioe);
         }
-        return asyncGetFilteredLedgerList(false, false).flatMap(recoverLogSegmentsFunction);
+        return getCachedLogSegmentsAfterFirstFetch(LogSegmentMetadata.COMPARATOR).flatMap(recoverLogSegmentsFunction);
     }
 
     class RecoverLogSegmentFunction extends Function<LogSegmentMetadata, Future<LogSegmentMetadata>> {
@@ -1002,8 +1098,7 @@ class BKLogWriteHandler extends BKLogHandler {
             List<LogSegmentMetadata> emptyList = new ArrayList<LogSegmentMetadata>(0);
             return Future.value(emptyList);
         }
-        scheduleGetAllLedgersTaskIfNeeded();
-        return asyncGetFullLedgerList(false, false).flatMap(
+        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap(
                 new AbstractFunction1<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
                     @Override
                     public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
@@ -1068,7 +1163,7 @@ class BKLogWriteHandler extends BKLogHandler {
             return Future.exception(new IllegalArgumentException(
                     "Invalid timestamp " + minTimestampToKeep + " to purge logs for " + getFullyQualifiedName()));
         }
-        return asyncGetFullLedgerList(false, false).flatMap(
+        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap(
                 new Function<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
             @Override
             public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
@@ -1097,7 +1192,7 @@ class BKLogWriteHandler extends BKLogHandler {
     }
 
     Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTxnId(final long minTxIdToKeep) {
-        return asyncGetFullLedgerList(true, false).flatMap(
+        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap(
             new AbstractFunction1<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
                 @Override
                 public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
@@ -1260,16 +1355,7 @@ class BKLogWriteHandler extends BKLogHandler {
         return Utils.closeSequence(scheduler,
                 lock,
                 ledgerAllocator
-        ).flatMap(new AbstractFunction1<Void, Future<Void>>() {
-            @Override
-            public Future<Void> apply(Void result) {
-                zooKeeperClient.getWatcherManager().unregisterChildWatcher(
-                        logMetadata.getLogSegmentsPath(),
-                        BKLogWriteHandler.this,
-                        false);
-                return Future.Void();
-            }
-        });
+        );
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
index ac670c2..28e69b2 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
@@ -238,8 +238,8 @@ class BKSyncLogReaderDLSN implements LogReader, Runnable, FutureEventListener<Lo
     // Test Methods
     //
     @VisibleForTesting
-    void disableReadAheadZKNotification() {
-        reader.bkLedgerManager.disableReadAheadZKNotification();
+    void disableReadAheadLogSegmentsNotification() {
+        this.reader.bkLedgerManager.disableReadAheadLogSegmentsNotification();
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
index 5d0e59a..1f5427c 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
@@ -212,6 +212,14 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
     public static final String BKDL_UNPARTITIONED_STREAM_NAME = "unpartitionedStreamName";
     public static final String BKDL_UNPARTITIONED_STREAM_NAME_DEFAULT = "<default>";
 
+    // Log segment cache parameters
+    public static final String BKDL_LOGSEGMENT_CACHE_TTL_MS = "logSegmentCacheTTLMs";
+    public static final long BKDL_LOGSEGMENT_CACHE_TTL_MS_DEFAULT = 10 * 60 * 1000L; // 10 minutes
+    public static final String BKDL_LOGSEGMENT_CACHE_MAX_SIZE = "logSegmentCacheMaxSize";
+    public static final long BKDL_LOGSEGMENT_CACHE_MAX_SIZE_DEFAULT = 10000L;
+    public static final String BKDL_LOGSEGMENT_CACHE_ENABLED = "logSegmentCacheEnabled";
+    public static final boolean BKDL_LOGSEGMENT_CACHE_ENABLED_DEFAULT = true;
+
     //
     // DL Writer Settings
     //
@@ -1644,6 +1652,69 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
     }
 
     //
+    // LogSegment Cache Settings
+    //
+
+    /**
+     * Get the log segment cache entry TTL in milliseconds.
+     *
+     * @return log segment cache ttl in milliseconds (default {@link #BKDL_LOGSEGMENT_CACHE_TTL_MS_DEFAULT}).
+     */
+    public long getLogSegmentCacheTTLMs() {
+        return getLong(BKDL_LOGSEGMENT_CACHE_TTL_MS, BKDL_LOGSEGMENT_CACHE_TTL_MS_DEFAULT);
+    }
+
+    /**
+     * Override the TTL, in milliseconds, applied to log segment cache entries.
+     *
+     * @param ttlMs TTL in milliseconds
+     * @return this configuration instance, to allow chaining
+     */
+    public DistributedLogConfiguration setLogSegmentCacheTTLMs(long ttlMs) {
+        this.setProperty(BKDL_LOGSEGMENT_CACHE_TTL_MS, ttlMs);
+        return this;
+    }
+
+    /**
+     * Read the configured upper bound on the size of the log segment cache.
+     *
+     * @return maximum size of the log segment cache.
+     */
+    public long getLogSegmentCacheMaxSize() {
+        return this.getLong(BKDL_LOGSEGMENT_CACHE_MAX_SIZE, BKDL_LOGSEGMENT_CACHE_MAX_SIZE_DEFAULT);
+    }
+
+    /**
+     * Override the maximum size of the log segment cache.
+     *
+     * @param maxSize maximum size of the log segment cache.
+     * @return this configuration instance, to allow chaining
+     */
+    public DistributedLogConfiguration setLogSegmentCacheMaxSize(long maxSize) {
+        this.setProperty(BKDL_LOGSEGMENT_CACHE_MAX_SIZE, maxSize);
+        return this;
+    }
+
+    /**
+     * Check whether the log segment cache is turned on.
+     *
+     * @return true if log segment cache is enabled; otherwise false
+     */
+    public boolean isLogSegmentCacheEnabled() {
+        return this.getBoolean(BKDL_LOGSEGMENT_CACHE_ENABLED, BKDL_LOGSEGMENT_CACHE_ENABLED_DEFAULT);
+    }
+
+    /**
+     * Enable/disable log segment cache.
+     * @param enabled true to enable the log segment cache; false to disable it
+     * @return distributedlog configuration
+     */
+    public DistributedLogConfiguration setLogSegmentCacheEnabled(boolean enabled) {
+        setProperty(BKDL_LOGSEGMENT_CACHE_ENABLED, enabled);
+        return this;
+    }
+
+    //
     // DL Writer General Settings
     //
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java
index 994b141..2297579 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java
@@ -23,6 +23,8 @@ import java.util.Comparator;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
+import com.twitter.distributedlog.exceptions.ZKException;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.Utils;
 import com.twitter.util.Future;
@@ -600,12 +602,18 @@ public class LogSegmentMetadata {
                 @Override
                 public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                     if (KeeperException.Code.OK.intValue() != rc) {
-                        result.setException(KeeperException.create(KeeperException.Code.get(rc)));
+                        if (KeeperException.Code.NONODE.intValue() == rc) {
+                            FutureUtils.setException(result, new LogSegmentNotFoundException(path));
+                        } else {
+                            FutureUtils.setException(result,
+                                    new ZKException("Failed to read log segment metadata from " + path,
+                                            KeeperException.Code.get(rc)));
+                        }
                         return;
                     }
                     try {
                         LogSegmentMetadata metadata = parseData(path, data, skipMinVersionCheck);
-                        result.setValue(metadata);
+                        FutureUtils.setValue(result, metadata);
                     } catch (IOException ie) {
                         LOG.error("Error on parsing log segment metadata from {} : ", path, ie);
                         result.setException(ie);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentListener.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentListener.java
index bedd4dc..2196245 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentListener.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentListener.java
@@ -22,7 +22,7 @@ import com.twitter.distributedlog.LogSegmentMetadata;
 import java.util.List;
 
 /**
- * Listener on log segments changes for a given stream.
+ * Listener on log segment changes for a given stream, used by {@link com.twitter.distributedlog.BKLogReadHandler}.
  */
 public interface LogSegmentListener {
 
@@ -34,4 +34,9 @@ public interface LogSegmentListener {
      *          updated list of segments.
      */
     void onSegmentsUpdated(List<LogSegmentMetadata> segments);
+
+    /**
+     * Notified when the log stream whose log segments this listener is watching has been deleted.
+     */
+    void onLogStreamDeleted();
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentNamesListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentNamesListener.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentNamesListener.java
index 3e89431..e38f305 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentNamesListener.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentNamesListener.java
@@ -17,10 +17,13 @@
  */
 package com.twitter.distributedlog.callback;
 
+import org.apache.bookkeeper.versioning.Versioned;
+
 import java.util.List;
 
 /**
- * Listener on list of log segments changes for a given stream.
+ * Listener on changes to the list of log segment names of a given stream, used by
+ * {@link com.twitter.distributedlog.logsegment.LogSegmentMetadataStore}.
  */
 public interface LogSegmentNamesListener {
     /**
@@ -30,5 +33,10 @@ public interface LogSegmentNamesListener {
      * @param segments
      *          updated list of segments.
      */
-    void onSegmentsUpdated(List<String> segments);
+    void onSegmentsUpdated(Versioned<List<String>> segments);
+
+    /**
+     * Notified when the log stream is deleted, e.g. when its log segments path no longer exists in the store.
+     */
+    void onLogStreamDeleted();
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/function/CloseAsyncCloseableFunction.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/function/CloseAsyncCloseableFunction.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/function/CloseAsyncCloseableFunction.java
new file mode 100644
index 0000000..698a088
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/function/CloseAsyncCloseableFunction.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.function;
+
+import com.twitter.distributedlog.io.AsyncCloseable;
+import scala.Function0;
+import scala.runtime.AbstractFunction0;
+import scala.runtime.BoxedUnit;
+
+/**
+ * Scala {@link Function0} that, when applied, invokes {@link AsyncCloseable#asyncClose()} on a closeable.
+ */
+public class CloseAsyncCloseableFunction extends AbstractFunction0<BoxedUnit> {
+
+    private final AsyncCloseable closeable;
+
+    private CloseAsyncCloseableFunction(AsyncCloseable closeable) {
+        this.closeable = closeable;
+    }
+
+    /**
+     * Create a function that calls {@code asyncClose()} on {@code closeable} each time it is applied.
+     *
+     * @param closeable closeable to close
+     * @return function to close an {@link AsyncCloseable}
+     */
+    public static Function0<BoxedUnit> of(AsyncCloseable closeable) {
+        return new CloseAsyncCloseableFunction(closeable);
+    }
+
+    @Override
+    public BoxedUnit apply() {
+        this.closeable.asyncClose();
+        return BoxedUnit.UNIT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/function/GetVersionedValueFunction.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/function/GetVersionedValueFunction.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/function/GetVersionedValueFunction.java
new file mode 100644
index 0000000..4e7844c
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/function/GetVersionedValueFunction.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.function;
+
+import com.twitter.distributedlog.LogSegmentMetadata;
+import org.apache.bookkeeper.versioning.Versioned;
+import scala.Function1;
+import scala.runtime.AbstractFunction1;
+
+import java.util.List;
+
+/**
+ * Scala function that unwraps a {@link org.apache.bookkeeper.versioning.Versioned} into the value it holds.
+ */
+public class GetVersionedValueFunction<T> extends AbstractFunction1<Versioned<T>, T> {
+
+    public static final Function1<Versioned<List<LogSegmentMetadata>>, List<LogSegmentMetadata>>
+            GET_LOGSEGMENT_LIST_FUNC = new GetVersionedValueFunction<List<LogSegmentMetadata>>();
+
+    @Override
+    public T apply(Versioned<T> versioned) {
+        return versioned.getValue();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
index cb53b23..f0d2797 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
@@ -22,6 +22,8 @@ import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.LogSegmentMetadata;
 import com.twitter.distributedlog.ZooKeeperClient;
 import com.twitter.distributedlog.callback.LogSegmentNamesListener;
+import com.twitter.distributedlog.exceptions.LogNotFoundException;
+import com.twitter.distributedlog.exceptions.ZKException;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.util.DLUtils;
 import com.twitter.distributedlog.util.FutureUtils;
@@ -88,30 +90,21 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
         public void onSuccess(final Versioned<List<String>> segments) {
             // reset the back off after a successful operation
             currentZKBackOffMs = store.minZKBackoffMs;
-            final Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet =
-                    store.listeners.get(logSegmentsPath);
-            if (null != listenerSet) {
-                store.submitTask(logSegmentsPath, new Runnable() {
-                    @Override
-                    public void run() {
-                        for (VersionedLogSegmentNamesListener listener : listenerSet.values()) {
-                            listener.onSegmentsUpdated(segments);
-                        }
-                    }
-                });
-            }
+            store.notifyLogSegmentsUpdated(
+                    logSegmentsPath,
+                    store.listeners.get(logSegmentsPath),
+                    segments);
         }
 
         @Override
         public void onFailure(Throwable cause) {
-            int backoffMs = store.minZKBackoffMs;
-            if ((cause instanceof KeeperException)) {
-                KeeperException ke = (KeeperException) cause;
-                if (KeeperException.Code.NONODE == ke.code()) {
-                    // the log segment has been deleted, remove all the registered listeners
-                    store.listeners.remove(logSegmentsPath);
-                    return;
-                }
+            int backoffMs;
+            if (cause instanceof LogNotFoundException) {
+                // the log segment has been deleted, remove all the registered listeners
+                store.notifyLogStreamDeleted(logSegmentsPath,
+                        store.listeners.remove(logSegmentsPath));
+                return;
+            } else {
                 backoffMs = currentZKBackOffMs;
                 currentZKBackOffMs = Math.min(2 * currentZKBackOffMs, store.maxZKBackoffMs);
             }
@@ -121,7 +114,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
         @Override
         public void run() {
             if (null != store.listeners.get(logSegmentsPath)) {
-                store.getLogSegmentNames(logSegmentsPath, store).addEventListener(this);
+                store.zkGetLogSegmentNames(logSegmentsPath, store).addEventListener(this);
             } else {
                 logger.debug("Log segments listener for {} has been removed.", logSegmentsPath);
             }
@@ -146,7 +139,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
             if (lastNotifiedLogSegments.getVersion() == Version.NEW ||
                     lastNotifiedLogSegments.getVersion().compare(logSegments.getVersion()) == Version.Occurred.BEFORE) {
                 lastNotifiedLogSegments = logSegments;
-                listener.onSegmentsUpdated(logSegments.getValue());
+                listener.onSegmentsUpdated(logSegments);
             }
         }
 
@@ -309,7 +302,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
         }
         switch (event.getType()) {
             case NodeDeleted:
-                listeners.remove(path);
+                notifyLogStreamDeleted(path, listeners.remove(path));
                 break;
             case NodeChildrenChanged:
                 new ReadLogSegmentsTask(path, this).run();
@@ -324,17 +317,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
         return LogSegmentMetadata.read(zkc, logSegmentPath, skipMinVersionCheck);
     }
 
-    @Override
-    public Future<List<String>> getLogSegmentNames(String logSegmentsPath) {
-        return getLogSegmentNames(logSegmentsPath, null).map(new AbstractFunction1<Versioned<List<String>>, List<String>>() {
-            @Override
-            public List<String> apply(Versioned<List<String>> list) {
-                return list.getValue();
-            }
-        });
-    }
-
-    Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath, Watcher watcher) {
+    Future<Versioned<List<String>>> zkGetLogSegmentNames(String logSegmentsPath, Watcher watcher) {
         Promise<Versioned<List<String>>> result = new Promise<Versioned<List<String>>>();
         try {
             zkc.get().getChildren(logSegmentsPath, watcher, this, result);
@@ -354,46 +337,59 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
             /** cversion: the number of changes to the children of this znode **/
             ZkVersion zkVersion = new ZkVersion(stat.getCversion());
             result.setValue(new Versioned(children, zkVersion));
+        } else if (KeeperException.Code.NONODE.intValue() == rc) {
+            result.setException(new LogNotFoundException("Log " + path + " not found"));
         } else {
-            result.setException(KeeperException.create(KeeperException.Code.get(rc)));
+            result.setException(new ZKException("Failed to get log segments from " + path,
+                    KeeperException.Code.get(rc)));
         }
     }
 
     @Override
-    public void registerLogSegmentListener(String logSegmentsPath,
-                                           LogSegmentNamesListener listener) {
+    public Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath,
+                                                              LogSegmentNamesListener listener) {
+        Watcher zkWatcher;
         if (null == listener) {
-            return;
-        }
-        closeLock.readLock().lock();
-        try {
-            if (closed) {
-                return;
-            }
-            Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet =
-                    listeners.get(logSegmentsPath);
-            if (null == listenerSet) {
-                Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> newListenerSet =
-                        new HashMap<LogSegmentNamesListener, VersionedLogSegmentNamesListener>();
-                Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> oldListenerSet =
-                        listeners.putIfAbsent(logSegmentsPath, newListenerSet);
-                if (null != oldListenerSet) {
-                    listenerSet = oldListenerSet;
+            zkWatcher = null;
+        } else {
+            closeLock.readLock().lock();
+            try {
+                if (closed) {
+                    zkWatcher = null;
                 } else {
-                    listenerSet = newListenerSet;
-                }
-            }
-            synchronized (listenerSet) {
-                listenerSet.put(listener, new VersionedLogSegmentNamesListener(listener));
-                if (!listeners.containsKey(logSegmentsPath)) {
-                    // listener set has been removed, add it back
-                    listeners.put(logSegmentsPath, listenerSet);
+                    Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet =
+                            listeners.get(logSegmentsPath);
+                    if (null == listenerSet) {
+                        Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> newListenerSet =
+                                new HashMap<LogSegmentNamesListener, VersionedLogSegmentNamesListener>();
+                        Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> oldListenerSet =
+                                listeners.putIfAbsent(logSegmentsPath, newListenerSet);
+                        if (null != oldListenerSet) {
+                            listenerSet = oldListenerSet;
+                        } else {
+                            listenerSet = newListenerSet;
+                        }
+                    }
+                    synchronized (listenerSet) {
+                        listenerSet.put(listener, new VersionedLogSegmentNamesListener(listener));
+                        if (!listeners.containsKey(logSegmentsPath)) {
+                            // listener set has been removed, add it back
+                            if (null != listeners.putIfAbsent(logSegmentsPath, listenerSet)) {
+                                logger.debug("Listener set is already found for log segments path {}", logSegmentsPath);
+                            }
+                        }
+                    }
+                    zkWatcher = ZKLogSegmentMetadataStore.this;
                 }
+            } finally {
+                closeLock.readLock().unlock();
             }
-            new ReadLogSegmentsTask(logSegmentsPath, this).run();
-        } finally {
-            closeLock.readLock().unlock();
         }
+        Future<Versioned<List<String>>> getLogSegmentNamesResult = zkGetLogSegmentNames(logSegmentsPath, zkWatcher);
+        if (null != listener) {
+            getLogSegmentNamesResult.addEventListener(new ReadLogSegmentsTask(logSegmentsPath, this));
+        }
+        return getLogSegmentNamesResult; // same future the ReadLogSegmentsTask observes; no second getChildren call
     }
 
     @Override
@@ -433,4 +429,38 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
         }
     }
 
+    // Notifications
+
+    // Submit a task for logSegmentsPath that informs every listener in listenerSet that the log
+    // stream is gone. A null set (nothing registered for the path) means there is nobody to notify.
+    void notifyLogStreamDeleted(String logSegmentsPath,
+                                final Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet) {
+        if (null != listenerSet) {
+            this.submitTask(logSegmentsPath, new Runnable() {
+                @Override
+                public void run() {
+                    for (LogSegmentNamesListener namesListener : listenerSet.keySet()) {
+                        namesListener.onLogStreamDeleted();
+                    }
+                }
+            });
+        }
+    }
+
+    // Submit a task for logSegmentsPath that hands the versioned segment names to each registered
+    // listener wrapper. A null set (nothing registered for the path) means there is nobody to notify.
+    void notifyLogSegmentsUpdated(String logSegmentsPath,
+                                  final Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet,
+                                  final Versioned<List<String>> segments) {
+        if (null != listenerSet) {
+            this.submitTask(logSegmentsPath, new Runnable() {
+                @Override
+                public void run() {
+                    for (VersionedLogSegmentNamesListener versionedListener : listenerSet.values()) {
+                        versionedListener.onSegmentsUpdated(segments);
+                    }
+                }
+            });
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentCache.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentCache.java
deleted file mode 100644
index 9716f95..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentCache.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.google.common.collect.Sets;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Manages the metadata of a stream's log segments in a local cache.
- *
- * <p>
- * Caching of log segment metadata assumes that the data contained in the ZNodes for individual
- * log segments is never updated after creation i.e we never call setData. A log segment
- * is finalized by creating a new ZNode and deleting the in progress node. This code will have
- * to change if we change the behavior.
- */
-public class LogSegmentCache {
-
-    static final Logger LOG = LoggerFactory.getLogger(LogSegmentCache.class);
-
-    protected final String streamName;
-    protected final Map<String, LogSegmentMetadata> logSegments =
-            new HashMap<String, LogSegmentMetadata>(); // segment name -> metadata; guarded by synchronized (logSegments)
-    protected final ConcurrentMap<Long, LogSegmentMetadata> lid2LogSegments =
-            new ConcurrentHashMap<Long, LogSegmentMetadata>(); // ledger id -> metadata
-
-    public LogSegmentCache(String streamName) {
-        this.streamName = streamName;
-    }
-
-    /**
-     * Retrieve log segments from the cache.
-     *
-     * - first sort the log segments in ascending order
-     * - do validation and assign corresponding sequence id
-     * - apply comparator after validation
-     *
-     * @param comparator
-     *          comparator to sort the returned log segments.
-     * @return list of sorted log segments (the first in-progress one may carry an assigned start sequence id).
-     * @throws UnexpectedException if unexpected condition detected (e.g. ledger sequence number gap)
-     */
-    public List<LogSegmentMetadata> getLogSegments(Comparator<LogSegmentMetadata> comparator)
-        throws UnexpectedException {
-        List<LogSegmentMetadata> segmentsToReturn;
-        synchronized (logSegments) {
-            segmentsToReturn = new ArrayList<LogSegmentMetadata>(logSegments.size());
-            segmentsToReturn.addAll(logSegments.values());
-        }
-        Collections.sort(segmentsToReturn, LogSegmentMetadata.COMPARATOR);
-        long startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID;
-        LogSegmentMetadata prevSegment = null;
-        for (int i = 0; i < segmentsToReturn.size(); i++) {
-            LogSegmentMetadata segment = segmentsToReturn.get(i);
-
-            // validation on ledger sequence number
-            // - consecutive segments may share a sequence number: that is the same log segment in different
-            //   states (inprogress vs completed), seen while a log segment is completed non-transactionally
-            if (null != prevSegment
-                    && prevSegment.getVersion() >= LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value
-                    && segment.getVersion() >= LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value
-                    && prevSegment.getLogSegmentSequenceNumber() != segment.getLogSegmentSequenceNumber()
-                    && prevSegment.getLogSegmentSequenceNumber() + 1 != segment.getLogSegmentSequenceNumber()) {
-                LOG.error("{} found ledger sequence number gap between log segment {} and {}",
-                        new Object[] { streamName, prevSegment, segment });
-                throw new UnexpectedException(streamName + " found ledger sequence number gap between log segment "
-                        + prevSegment.getLogSegmentSequenceNumber() + " and " + segment.getLogSegmentSequenceNumber());
-            }
-
-            // assign sequence id: completed segments advance the next start id; the first in-progress one receives it
-            if (!segment.isInProgress()) {
-                if (segment.supportsSequenceId()) {
-                    startSequenceId = segment.getStartSequenceId() + segment.getRecordCount();
-                    if (null != prevSegment && prevSegment.supportsSequenceId()
-                            && prevSegment.getStartSequenceId() > segment.getStartSequenceId()) {
-                        LOG.warn("{} found decreasing start sequence id in log segment {}, previous is {}",
-                                new Object[] { streamName, segment, prevSegment });
-                    }
-                } else {
-                    startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID;
-                }
-            } else {
-                if (segment.supportsSequenceId()) {
-                    LogSegmentMetadata newSegment = segment.mutator()
-                            .setStartSequenceId(startSequenceId == DistributedLogConstants.UNASSIGNED_SEQUENCE_ID ? 0L : startSequenceId)
-                            .build();
-                    segmentsToReturn.set(i, newSegment);
-                }
-                break;
-            }
-            prevSegment = segment;
-        }
-        if (comparator != LogSegmentMetadata.COMPARATOR) {
-            Collections.sort(segmentsToReturn, comparator);
-        }
-        return segmentsToReturn;
-    }
-
-    /**
-     * Add the segment <i>metadata</i> for <i>name</i> to the cache, unless <i>name</i> is already cached.
-     *
-     * @param name
-     *          segment name.
-     * @param metadata
-     *          segment metadata.
-     */
-    public void add(String name, LogSegmentMetadata metadata) {
-        synchronized (logSegments) {
-            if (!logSegments.containsKey(name)) {
-                logSegments.put(name, metadata);
-                LOG.info("{} added log segment ({} : {}) to cache.",
-                        new Object[]{ streamName, name, metadata });
-            }
-            LogSegmentMetadata oldMetadata = lid2LogSegments.remove(metadata.getLedgerId());
-            if (null == oldMetadata) {
-                lid2LogSegments.put(metadata.getLedgerId(), metadata);
-            } else {
-                if (oldMetadata.isInProgress() && !metadata.isInProgress()) {
-                    lid2LogSegments.put(metadata.getLedgerId(), metadata);
-                } else {
-                    lid2LogSegments.put(oldMetadata.getLedgerId(), oldMetadata);
-                }
-            }
-        }
-    }
-
-    /**
-     * Retrieve log segment <code>name</code> from the cache.
-     *
-     * @param name
-     *          name of the log segment.
-     * @return log segment metadata, or null if <code>name</code> is not cached.
-     */
-    public LogSegmentMetadata get(String name) {
-        synchronized (logSegments) {
-            return logSegments.get(name);
-        }
-    }
-
-    /**
-     * Update the log segment cache with removed/added segments.
-     *
-     * @param segmentsRemoved
-     *          names of the segments that were removed
-     * @param segmentsAdded
-     *          segments that were added, keyed by segment name
-     */
-    public void update(Set<String> segmentsRemoved,
-                       Map<String, LogSegmentMetadata> segmentsAdded) {
-        synchronized (logSegments) {
-            for (Map.Entry<String, LogSegmentMetadata> entry : segmentsAdded.entrySet()) {
-                add(entry.getKey(), entry.getValue());
-            }
-            for (String segment : segmentsRemoved) {
-                remove(segment);
-            }
-        }
-    }
-
-    /**
-     * Diff the cached segment names against the newly received list <code>segmentsReceived</code>.
-     *
-     * @param segmentsReceived
-     *          newly received segment names
-     * @return segments added (left) and removed (right).
-     */
-    public Pair<Set<String>, Set<String>> diff(Set<String> segmentsReceived) {
-        Set<String> segmentsAdded;
-        Set<String> segmentsRemoved;
-        synchronized (logSegments) {
-            Set<String> segmentsCached = logSegments.keySet();
-            segmentsAdded = Sets.difference(segmentsReceived, segmentsCached).immutableCopy();
-            segmentsRemoved = Sets.difference(segmentsCached, segmentsReceived).immutableCopy();
-        }
-        return Pair.of(segmentsAdded, segmentsRemoved);
-    }
-
-    /**
-     * Remove log segment <code>name</code> from the cache.
-     *
-     * @param name
-     *          name of the log segment.
-     * @return the removed log segment metadata, or null if <code>name</code> was not cached.
-     */
-    public LogSegmentMetadata remove(String name) {
-        synchronized (logSegments) {
-            LogSegmentMetadata metadata = logSegments.remove(name);
-            if (null != metadata) {
-                lid2LogSegments.remove(metadata.getLedgerId(), metadata);
-                LOG.debug("Removed log segment ({} : {}) from cache.", name, metadata);
-            }
-            return metadata;
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataCache.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataCache.java
new file mode 100644
index 0000000..d4ca3ea
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataCache.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.logsegment;
+
+import com.google.common.base.Ticker;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.LogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Cache of log segment metadata, keyed by the log segment path.
+ *
+ * <p>Entries expire after {@link DistributedLogConfiguration#getLogSegmentCacheTTLMs()} milliseconds
+ * without access, and the cache holds at most {@link DistributedLogConfiguration#getLogSegmentCacheMaxSize()}
+ * entries. When {@link DistributedLogConfiguration#isLogSegmentCacheEnabled()} is false, {@link #put} and
+ * {@link #invalidate} do nothing and {@link #get} always returns {@code null}.
+ */
+public class LogSegmentMetadataCache implements RemovalListener<String, LogSegmentMetadata> {
+
+    private static final Logger logger = LoggerFactory.getLogger(LogSegmentMetadataCache.class);
+
+    // log segment path -> log segment metadata
+    private final Cache<String, LogSegmentMetadata> cache;
+    // when false, put/invalidate/get bypass the cache entirely
+    private final boolean isCacheEnabled;
+
+    /**
+     * Build the cache from the log segment cache settings in <i>conf</i>.
+     *
+     * @param conf configuration supplying the TTL, maximum size, enabled flag and concurrency level
+     * @param ticker time source the cache uses to evaluate access-based expiry
+     */
+    public LogSegmentMetadataCache(DistributedLogConfiguration conf,
+                                   Ticker ticker) {
+        cache = CacheBuilder.newBuilder()
+                .concurrencyLevel(conf.getNumWorkerThreads())
+                .initialCapacity(1024)
+                .expireAfterAccess(conf.getLogSegmentCacheTTLMs(), TimeUnit.MILLISECONDS)
+                .maximumSize(conf.getLogSegmentCacheMaxSize())
+                .removalListener(this)
+                .ticker(ticker)
+                .recordStats()
+                .build();
+        this.isCacheEnabled = conf.isLogSegmentCacheEnabled();
+        logger.info("Log segment cache is enabled = {}", this.isCacheEnabled);
+    }
+
+    /**
+     * Add the log <i>segment</i> of <i>path</i> to the cache. Does nothing if the cache is disabled.
+     *
+     * @param path the path of the log segment
+     * @param segment log segment metadata
+     */
+    public void put(String path, LogSegmentMetadata segment) {
+        if (isCacheEnabled) {
+            cache.put(path, segment);
+        }
+    }
+
+    /**
+     * Invalidate the cache entry associated with <i>path</i>. Does nothing if the cache is disabled.
+     *
+     * @param path the path of the log segment
+     */
+    public void invalidate(String path) {
+        if (isCacheEnabled) {
+            cache.invalidate(path);
+        }
+    }
+
+    /**
+     * Retrieve the log segment of <i>path</i> from the cache.
+     *
+     * @param path the path of the log segment.
+     * @return log segment metadata if cached, otherwise null (always null when the cache is disabled).
+     */
+    public LogSegmentMetadata get(String path) {
+        if (!isCacheEnabled) {
+            // nothing is ever put while disabled; short-circuit to keep put/invalidate/get consistent
+            return null;
+        }
+        return cache.getIfPresent(path);
+    }
+
+    /**
+     * Log (at debug level) entries removed by eviction. Explicit invalidations and replacements
+     * are not logged.
+     */
+    @Override
+    public void onRemoval(RemovalNotification<String, LogSegmentMetadata> notification) {
+        if (notification.wasEvicted()) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Log segment of {} was evicted.", notification.getKey());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
index 430e15f..2ea1671 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
@@ -130,24 +130,17 @@ public interface LogSegmentMetadataStore extends Closeable {
     Future<LogSegmentMetadata> getLogSegment(String logSegmentPath);
 
     /**
-     * Retrieve the list of log segments under <code>logSegmentsPath</code>.
+     * Retrieve the list of log segments under <code>logSegmentsPath</code> and register a <i>listener</i>
+     * for subsequent changes to the list of log segments.
      *
      * @param logSegmentsPath
      *          path to store list of log segments
-     * @return future of the retrieved list of log segment names
-     */
-    Future<List<String>> getLogSegmentNames(String logSegmentsPath);
-
-    /**
-     * Register a log segment <code>listener</code> on log segment changes under <code>logSegmentsPath</code>.
-     *
-     * @param logSegmentsPath
-     *          log segments path
      * @param listener
      *          log segment listener on log segment changes
+     * @return future of the retrieved list of log segment names, together with its version
      */
-    void registerLogSegmentListener(String logSegmentsPath,
-                                    LogSegmentNamesListener listener);
+    Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath,
+                                                       LogSegmentNamesListener listener);
 
     /**
      * Unregister a log segment <code>listener</code> on log segment changes under <code>logSegmentsPath</code>.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/PerStreamLogSegmentCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/PerStreamLogSegmentCache.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/PerStreamLogSegmentCache.java
new file mode 100644
index 0000000..f242941
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/PerStreamLogSegmentCache.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.logsegment;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import com.twitter.distributedlog.DistributedLogConstants;
+import com.twitter.distributedlog.LogSegmentMetadata;
+import com.twitter.distributedlog.exceptions.UnexpectedException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Manages the log segments of a single stream in a local cache.
+ *
+ * <p>
+ * Caching of log segment metadata assumes that the data contained in the ZNodes for individual
+ * log segments is never updated after creation i.e we never call setData. A log segment
+ * is finalized by creating a new ZNode and deleting the in progress node. This code will have
+ * to change if we change the behavior.
+ * </p>
+ *
+ * <p>All accesses to {@link #logSegments} in this class synchronize on that map.</p>
+ */
+public class PerStreamLogSegmentCache {
+
+    static final Logger LOG = LoggerFactory.getLogger(PerStreamLogSegmentCache.class);
+
+    protected final String streamName;
+    // when true, getLogSegments() rejects gaps/duplicates in consecutive log segment sequence numbers
+    protected final boolean validateLogSegmentSequenceNumber;
+    // segment name -> metadata; guarded by synchronized (logSegments)
+    protected final Map<String, LogSegmentMetadata> logSegments =
+            new HashMap<String, LogSegmentMetadata>();
+    // ledger id -> metadata; an existing entry is only replaced when a completed segment supersedes an in-progress one
+    protected final ConcurrentMap<Long, LogSegmentMetadata> lid2LogSegments =
+            new ConcurrentHashMap<Long, LogSegmentMetadata>();
+
+    /**
+     * Create a cache for <i>streamName</i> with log segment sequence number validation enabled.
+     *
+     * @param streamName name of the stream, used in log and error messages
+     */
+    @VisibleForTesting
+    PerStreamLogSegmentCache(String streamName) {
+        this(streamName, true);
+    }
+
+    /**
+     * Create a cache for <i>streamName</i>.
+     *
+     * @param streamName name of the stream, used in log and error messages
+     * @param validateLogSegmentSequenceNumber whether {@link #getLogSegments(Comparator)} should verify that
+     *          consecutive log segments have consecutive log segment sequence numbers
+     */
+    public PerStreamLogSegmentCache(String streamName,
+                                    boolean validateLogSegmentSequenceNumber) {
+        this.streamName = streamName;
+        this.validateLogSegmentSequenceNumber = validateLogSegmentSequenceNumber;
+    }
+
+    /**
+     * Retrieve log segments from the cache.
+     *
+     * <ul>
+     * <li>take a snapshot of the cached log segments and sort it in ascending order</li>
+     * <li>validate log segment sequence numbers (if enabled)</li>
+     * <li>assign the start sequence id of the first in-progress log segment</li>
+     * <li>re-sort with <i>comparator</i> if it differs from {@link LogSegmentMetadata#COMPARATOR}</li>
+     * </ul>
+     *
+     * @param comparator
+     *          comparator to sort the returned log segments.
+     * @return a new list of the cached log segments, sorted by <i>comparator</i>.
+     * @throws UnexpectedException if validation is enabled and two consecutive (V2+) log segments do not
+     *          have consecutive log segment sequence numbers (a gap or a duplicate).
+     */
+    public List<LogSegmentMetadata> getLogSegments(Comparator<LogSegmentMetadata> comparator)
+        throws UnexpectedException {
+        List<LogSegmentMetadata> segmentsToReturn;
+        synchronized (logSegments) {
+            segmentsToReturn = new ArrayList<LogSegmentMetadata>(logSegments.size());
+            segmentsToReturn.addAll(logSegments.values());
+        }
+        Collections.sort(segmentsToReturn, LogSegmentMetadata.COMPARATOR);
+
+        if (validateLogSegmentSequenceNumber) {
+            validateLogSegmentSequenceNumbers(segmentsToReturn);
+        }
+        assignSequenceIds(segmentsToReturn);
+
+        if (comparator != LogSegmentMetadata.COMPARATOR) {
+            Collections.sort(segmentsToReturn, comparator);
+        }
+        return segmentsToReturn;
+    }
+
+    /**
+     * Ensure consecutive log segments in the ascending list have consecutive log segment sequence numbers.
+     * Only pairs where both segments are at least VERSION_V2_LEDGER_SEQNO are checked.
+     */
+    private void validateLogSegmentSequenceNumbers(List<LogSegmentMetadata> sortedSegments)
+            throws UnexpectedException {
+        LogSegmentMetadata prevSegment = null;
+        for (LogSegmentMetadata segment : sortedSegments) {
+            if (null != prevSegment
+                    && prevSegment.getVersion() >= LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value
+                    && segment.getVersion() >= LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value
+                    && prevSegment.getLogSegmentSequenceNumber() + 1 != segment.getLogSegmentSequenceNumber()) {
+                LOG.error("{} found ledger sequence number gap between log segment {} and {}",
+                        new Object[] { streamName, prevSegment, segment });
+                throw new UnexpectedException(streamName + " found ledger sequence number gap between log segment "
+                        + prevSegment.getLogSegmentSequenceNumber() + " and " + segment.getLogSegmentSequenceNumber());
+            }
+            prevSegment = segment;
+        }
+    }
+
+    /**
+     * Walk the ascending list up to the first in-progress log segment and, if that segment supports
+     * sequence ids, replace it in place with a copy whose start sequence id follows the preceding
+     * completed segments (0 when none is known). Segments after it are left untouched.
+     */
+    private void assignSequenceIds(List<LogSegmentMetadata> sortedSegments) {
+        LogSegmentMetadata prevSegment = null;
+        long startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID;
+        for (int i = 0; i < sortedSegments.size(); i++) {
+            LogSegmentMetadata segment = sortedSegments.get(i);
+            if (segment.isInProgress()) {
+                if (segment.supportsSequenceId()) {
+                    LogSegmentMetadata newSegment = segment.mutator()
+                            .setStartSequenceId(startSequenceId == DistributedLogConstants.UNASSIGNED_SEQUENCE_ID ? 0L : startSequenceId)
+                            .build();
+                    sortedSegments.set(i, newSegment);
+                }
+                break;
+            }
+            if (segment.supportsSequenceId()) {
+                // the next segment is expected to start right after this completed one
+                startSequenceId = segment.getStartSequenceId() + segment.getRecordCount();
+                if (null != prevSegment && prevSegment.supportsSequenceId()
+                        && prevSegment.getStartSequenceId() > segment.getStartSequenceId()) {
+                    LOG.warn("{} found decreasing start sequence id in log segment {}, previous is {}",
+                            new Object[] { streamName, segment, prevSegment });
+                }
+            } else {
+                // a completed segment without sequence ids resets the running start sequence id
+                startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID;
+            }
+            prevSegment = segment;
+        }
+    }
+
+    /**
+     * Add the segment <i>metadata</i> for <i>name</i> to the cache. An already cached <i>name</i> keeps
+     * its existing metadata; the ledger-id index prefers a completed segment over an in-progress one.
+     *
+     * @param name
+     *          segment name.
+     * @param metadata
+     *          segment metadata.
+     */
+    public void add(String name, LogSegmentMetadata metadata) {
+        synchronized (logSegments) {
+            if (!logSegments.containsKey(name)) {
+                logSegments.put(name, metadata);
+                LOG.info("{} added log segment ({} : {}) to cache.",
+                        new Object[]{ streamName, name, metadata });
+            }
+            LogSegmentMetadata oldMetadata = lid2LogSegments.remove(metadata.getLedgerId());
+            if (null == oldMetadata) {
+                lid2LogSegments.put(metadata.getLedgerId(), metadata);
+            } else {
+                if (oldMetadata.isInProgress() && !metadata.isInProgress()) {
+                    lid2LogSegments.put(metadata.getLedgerId(), metadata);
+                } else {
+                    lid2LogSegments.put(oldMetadata.getLedgerId(), oldMetadata);
+                }
+            }
+        }
+    }
+
+    /**
+     * Retrieve log segment <code>name</code> from the cache.
+     *
+     * @param name
+     *          name of the log segment.
+     * @return log segment metadata, or null if <code>name</code> is not cached.
+     */
+    public LogSegmentMetadata get(String name) {
+        synchronized (logSegments) {
+            return logSegments.get(name);
+        }
+    }
+
+    /**
+     * Update the log segment cache with removed/added segments, atomically with respect to
+     * other methods that synchronize on the segment map.
+     *
+     * @param segmentsRemoved
+     *          names of the segments that were removed
+     * @param segmentsAdded
+     *          segments that were added, keyed by segment name
+     */
+    public void update(Set<String> segmentsRemoved,
+                       Map<String, LogSegmentMetadata> segmentsAdded) {
+        synchronized (logSegments) {
+            for (Map.Entry<String, LogSegmentMetadata> entry : segmentsAdded.entrySet()) {
+                add(entry.getKey(), entry.getValue());
+            }
+            for (String segment : segmentsRemoved) {
+                remove(segment);
+            }
+        }
+    }
+
+    /**
+     * Diff the cached segment names against the newly received list <code>segmentsReceived</code>.
+     *
+     * @param segmentsReceived
+     *          newly received segment names
+     * @return segments added (left) and removed (right), as immutable snapshots.
+     */
+    public Pair<Set<String>, Set<String>> diff(Set<String> segmentsReceived) {
+        Set<String> segmentsAdded;
+        Set<String> segmentsRemoved;
+        synchronized (logSegments) {
+            Set<String> segmentsCached = logSegments.keySet();
+            segmentsAdded = Sets.difference(segmentsReceived, segmentsCached).immutableCopy();
+            segmentsRemoved = Sets.difference(segmentsCached, segmentsReceived).immutableCopy();
+        }
+        return Pair.of(segmentsAdded, segmentsRemoved);
+    }
+
+    /**
+     * Remove log segment <code>name</code> from the cache. The ledger-id index entry is removed only if
+     * it still maps to the same metadata.
+     *
+     * @param name
+     *          name of the log segment.
+     * @return the removed log segment metadata, or null if <code>name</code> was not cached.
+     */
+    public LogSegmentMetadata remove(String name) {
+        synchronized (logSegments) {
+            LogSegmentMetadata metadata = logSegments.remove(name);
+            if (null != metadata) {
+                lid2LogSegments.remove(metadata.getLedgerId(), metadata);
+                LOG.debug("Removed log segment ({} : {}) from cache.", name, metadata);
+            }
+            return metadata;
+        }
+    }
+}


Mime
View raw message