distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [21/31] incubator-distributedlog git commit: DL-162: Use log segment entry store interface
Date Fri, 30 Dec 2016 00:07:35 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadUtils.java
index 97f694f..f481561 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadUtils.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadUtils.java
@@ -18,29 +18,25 @@
 package com.twitter.distributedlog;
 
 import java.io.IOException;
-import java.util.Enumeration;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
+import com.twitter.distributedlog.logsegment.LogSegmentRandomAccessEntryReader;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.twitter.distributedlog.selector.FirstDLSNNotLessThanSelector;
 import com.twitter.distributedlog.selector.FirstTxIdNotLessThanSelector;
 import com.twitter.distributedlog.selector.LastRecordSelector;
 import com.twitter.distributedlog.selector.LogRecordSelector;
-import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.FutureUtils.FutureEventListenerRunnable;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
 import com.twitter.util.Promise;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.runtime.AbstractFunction0;
 import scala.runtime.BoxedUnit;
 
@@ -58,14 +54,14 @@ public class ReadUtils {
     //
 
     /**
-     * Read last record from a ledger.
+     * Read last record from a log segment.
      *
      * @param streamName
      *          fully qualified stream name (used for logging)
      * @param l
-     *          ledger descriptor.
+     *          log segment metadata.
      * @param fence
-     *          whether to fence the ledger.
+     *          whether to fence the log segment.
      * @param includeControl
      *          whether to include control record.
      * @param includeEndOfStream
@@ -78,8 +74,8 @@ public class ReadUtils {
      *          num of records scanned to get last record
      * @param executorService
      *          executor service used for processing entries
-     * @param handleCache
-     *          ledger handle cache
+     * @param entryStore
+     *          log segment entry store
      * @return a future with last record.
      */
     public static Future<LogRecordWithDLSN> asyncReadLastRecord(
@@ -92,20 +88,20 @@ public class ReadUtils {
             final int scanMaxBatchSize,
             final AtomicInteger numRecordsScanned,
             final ExecutorService executorService,
-            final LedgerHandleCache handleCache) {
+            final LogSegmentEntryStore entryStore) {
         final LogRecordSelector selector = new LastRecordSelector();
         return asyncReadRecord(streamName, l, fence, includeControl, includeEndOfStream, scanStartBatchSize,
-                               scanMaxBatchSize, numRecordsScanned, executorService, handleCache,
+                               scanMaxBatchSize, numRecordsScanned, executorService, entryStore,
                                selector, true /* backward */, 0L);
     }
 
     /**
-     * Read first record from a ledger with a DLSN larger than that given.
+     * Read first record from a log segment with a DLSN larger than that given.
      *
      * @param streamName
      *          fully qualified stream name (used for logging)
      * @param l
-     *          ledger descriptor.
+     *          log segment metadata.
      * @param scanStartBatchSize
      *          first num entries used for read last record scan
      * @param scanMaxBatchSize
@@ -114,6 +110,8 @@ public class ReadUtils {
      *          num of records scanned to get last record
      * @param executorService
      *          executor service used for processing entries
+     * @param entryStore
+     *          log segment entry store
      * @param dlsn
      *          threshold dlsn
      * @return a future with last record.
@@ -125,7 +123,7 @@ public class ReadUtils {
             final int scanMaxBatchSize,
             final AtomicInteger numRecordsScanned,
             final ExecutorService executorService,
-            final LedgerHandleCache handleCache,
+            final LogSegmentEntryStore entryStore,
             final DLSN dlsn) {
         long startEntryId = 0L;
         if (l.getLogSegmentSequenceNumber() == dlsn.getLogSegmentSequenceNo()) {
@@ -133,7 +131,7 @@ public class ReadUtils {
         }
         final LogRecordSelector selector = new FirstDLSNNotLessThanSelector(dlsn);
         return asyncReadRecord(streamName, l, false, false, false, scanStartBatchSize,
-                               scanMaxBatchSize, numRecordsScanned, executorService, handleCache,
+                               scanMaxBatchSize, numRecordsScanned, executorService, entryStore,
                                selector, false /* backward */, startEntryId);
     }
 
@@ -233,14 +231,12 @@ public class ReadUtils {
     }
 
     /**
-     * Read record from a given range of ledger entries.
+     * Read record from a given range of log segment entries.
      *
      * @param streamName
      *          fully qualified stream name (used for logging)
-     * @param ledgerDescriptor
-     *          ledger descriptor.
-     * @param handleCache
-     *          ledger handle cache.
+     * @param reader
+     *          log segment random access reader
      * @param executorService
      *          executor service used for processing entries
      * @param context
@@ -249,8 +245,7 @@ public class ReadUtils {
      */
     private static Future<LogRecordWithDLSN> asyncReadRecordFromEntries(
             final String streamName,
-            final LedgerDescriptor ledgerDescriptor,
-            LedgerHandleCache handleCache,
+            final LogSegmentRandomAccessEntryReader reader,
             final LogSegmentMetadata metadata,
             final ExecutorService executorService,
             final ScanContext context,
@@ -260,22 +255,19 @@ public class ReadUtils {
         final long endEntryId = context.curEndEntryId.get();
         if (LOG.isDebugEnabled()) {
             LOG.debug("{} reading entries [{} - {}] from {}.",
-                    new Object[] { streamName, startEntryId, endEntryId, ledgerDescriptor });
+                    new Object[] { streamName, startEntryId, endEntryId, metadata});
         }
-        FutureEventListener<Enumeration<LedgerEntry>> readEntriesListener =
-            new FutureEventListener<Enumeration<LedgerEntry>>() {
+        FutureEventListener<List<Entry.Reader>> readEntriesListener =
+            new FutureEventListener<List<Entry.Reader>>() {
                 @Override
-                public void onSuccess(final Enumeration<LedgerEntry> entries) {
+                public void onSuccess(final List<Entry.Reader> entries) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("{} finished reading entries [{} - {}] from {}",
-                                new Object[]{ streamName, startEntryId, endEntryId, ledgerDescriptor });
+                                new Object[]{ streamName, startEntryId, endEntryId, metadata});
                     }
-                    LogRecordWithDLSN record = null;
-                    while (entries.hasMoreElements()) {
-                        LedgerEntry entry = entries.nextElement();
+                    for (Entry.Reader entry : entries) {
                         try {
-                            visitEntryRecords(
-                                    streamName, metadata, ledgerDescriptor.getLogSegmentSequenceNo(), entry, context, selector);
+                            visitEntryRecords(entry, context, selector);
                         } catch (IOException ioe) {
                             // exception is only thrown due to bad ledger entry, so it might be corrupted
                             // we shouldn't do anything beyond this point. throw the exception to application
@@ -284,24 +276,21 @@ public class ReadUtils {
                         }
                     }
 
-                    record = selector.result();
+                    LogRecordWithDLSN record = selector.result();
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("{} got record from entries [{} - {}] of {} : {}",
                                 new Object[]{streamName, startEntryId, endEntryId,
-                                        ledgerDescriptor, record});
+                                        metadata, record});
                     }
                     promise.setValue(record);
                 }
 
                 @Override
                 public void onFailure(final Throwable cause) {
-                    String errMsg = "Error reading entries [" + startEntryId + "-" + endEntryId
-                                + "] for reading record of " + streamName;
-                    promise.setException(new IOException(errMsg,
-                            BKException.create(FutureUtils.bkResultCode(cause))));
+                    promise.setException(cause);
                 }
             };
-        handleCache.asyncReadEntries(ledgerDescriptor, startEntryId, endEntryId)
+        reader.readEntries(startEntryId, endEntryId)
                 .addEventListener(FutureEventListenerRunnable.of(readEntriesListener, executorService));
         return promise;
     }
@@ -309,10 +298,6 @@ public class ReadUtils {
     /**
      * Process each record using LogRecordSelector.
      *
-     * @param streamName
-     *          fully qualified stream name (used for logging)
-     * @param logSegmentSeqNo
-     *          ledger sequence number
      * @param entry
      *          ledger entry
      * @param context
@@ -321,22 +306,13 @@ public class ReadUtils {
      * @throws IOException
      */
     private static void visitEntryRecords(
-            String streamName,
-            LogSegmentMetadata metadata,
-            long logSegmentSeqNo,
-            LedgerEntry entry,
+            Entry.Reader entry,
             ScanContext context,
             LogRecordSelector selector) throws IOException {
-        Entry.Reader reader = Entry.newBuilder()
-                .setLogSegmentInfo(logSegmentSeqNo, metadata.getStartSequenceId())
-                .setEntryId(entry.getEntryId())
-                .setEnvelopeEntry(metadata.getEnvelopeEntries())
-                .setInputStream(entry.getEntryInputStream())
-                .buildReader();
-        LogRecordWithDLSN nextRecord = reader.nextRecord();
+        LogRecordWithDLSN nextRecord = entry.nextRecord();
         while (nextRecord != null) {
             LogRecordWithDLSN record = nextRecord;
-            nextRecord = reader.nextRecord();
+            nextRecord = entry.nextRecord();
             context.numRecordsScanned.incrementAndGet();
             if (!context.includeControl && record.isControl()) {
                 continue;
@@ -353,10 +329,8 @@ public class ReadUtils {
      *
      * @param streamName
      *          fully qualified stream name (used for logging)
-     * @param ledgerDescriptor
-     *          ledger descriptor.
-     * @param handleCache
-     *          ledger handle cache.
+     * @param reader
+     *          log segment random access reader
      * @param executorService
      *          executor service used for processing entries
      * @param promise
@@ -366,8 +340,7 @@ public class ReadUtils {
      */
     private static void asyncReadRecordFromEntries(
             final String streamName,
-            final LedgerDescriptor ledgerDescriptor,
-            final LedgerHandleCache handleCache,
+            final LogSegmentRandomAccessEntryReader reader,
             final LogSegmentMetadata metadata,
             final ExecutorService executorService,
             final Promise<LogRecordWithDLSN> promise,
@@ -380,7 +353,7 @@ public class ReadUtils {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("{} read record from [{} - {}] of {} : {}",
                                 new Object[]{streamName, context.curStartEntryId.get(), context.curEndEntryId.get(),
-                                        ledgerDescriptor, value});
+                                        metadata, value});
                     }
                     if (null != value) {
                         promise.setValue(value);
@@ -393,8 +366,7 @@ public class ReadUtils {
                     }
                     // scan next range
                     asyncReadRecordFromEntries(streamName,
-                            ledgerDescriptor,
-                            handleCache,
+                            reader,
                             metadata,
                             executorService,
                             promise,
@@ -407,14 +379,13 @@ public class ReadUtils {
                     promise.setException(cause);
                 }
             };
-        asyncReadRecordFromEntries(streamName, ledgerDescriptor, handleCache, metadata, executorService, context, selector)
+        asyncReadRecordFromEntries(streamName, reader, metadata, executorService, context, selector)
                 .addEventListener(FutureEventListenerRunnable.of(readEntriesListener, executorService));
     }
 
     private static void asyncReadRecordFromLogSegment(
             final String streamName,
-            final LedgerDescriptor ledgerDescriptor,
-            final LedgerHandleCache handleCache,
+            final LogSegmentRandomAccessEntryReader reader,
             final LogSegmentMetadata metadata,
             final ExecutorService executorService,
             final int scanStartBatchSize,
@@ -426,16 +397,10 @@ public class ReadUtils {
             final LogRecordSelector selector,
             final boolean backward,
             final long startEntryId) {
-        final long lastAddConfirmed;
-        try {
-            lastAddConfirmed = handleCache.getLastAddConfirmed(ledgerDescriptor);
-        } catch (BKException e) {
-            promise.setException(e);
-            return;
-        }
+        final long lastAddConfirmed = reader.getLastAddConfirmed();
         if (lastAddConfirmed < 0) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Ledger {} is empty for {}.", new Object[] { ledgerDescriptor, streamName });
+                LOG.debug("Log segment {} is empty for {}.", new Object[] { metadata, streamName });
             }
             promise.setValue(null);
             return;
@@ -444,7 +409,7 @@ public class ReadUtils {
                 startEntryId, lastAddConfirmed,
                 scanStartBatchSize, scanMaxBatchSize,
                 includeControl, includeEndOfStream, backward, numRecordsScanned);
-        asyncReadRecordFromEntries(streamName, ledgerDescriptor, handleCache, metadata, executorService,
+        asyncReadRecordFromEntries(streamName, reader, metadata, executorService,
                                    promise, context, selector);
     }
 
@@ -458,25 +423,25 @@ public class ReadUtils {
             final int scanMaxBatchSize,
             final AtomicInteger numRecordsScanned,
             final ExecutorService executorService,
-            final LedgerHandleCache handleCache,
+            final LogSegmentEntryStore entryStore,
             final LogRecordSelector selector,
             final boolean backward,
             final long startEntryId) {
 
         final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>();
 
-        FutureEventListener<LedgerDescriptor> openLedgerListener =
-            new FutureEventListener<LedgerDescriptor>() {
+        FutureEventListener<LogSegmentRandomAccessEntryReader> openReaderListener =
+            new FutureEventListener<LogSegmentRandomAccessEntryReader>() {
                 @Override
-                public void onSuccess(final LedgerDescriptor ledgerDescriptor) {
+                public void onSuccess(final LogSegmentRandomAccessEntryReader reader) {
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("{} Opened logsegment {} for reading record",
+                        LOG.debug("{} Opened log segment {} for reading record",
                                 streamName, l);
                     }
                     promise.ensure(new AbstractFunction0<BoxedUnit>() {
                         @Override
                         public BoxedUnit apply() {
-                            handleCache.asyncCloseLedger(ledgerDescriptor);
+                            reader.asyncClose();
                             return BoxedUnit.UNIT;
                         }
                     });
@@ -485,7 +450,7 @@ public class ReadUtils {
                                 (backward ? "backward" : "forward"), streamName, l});
                     }
                     asyncReadRecordFromLogSegment(
-                            streamName, ledgerDescriptor, handleCache, l, executorService,
+                            streamName, reader, l, executorService,
                             scanStartBatchSize, scanMaxBatchSize,
                             includeControl, includeEndOfStream,
                             promise, numRecordsScanned, selector, backward, startEntryId);
@@ -493,13 +458,11 @@ public class ReadUtils {
 
                 @Override
                 public void onFailure(final Throwable cause) {
-                    String errMsg = "Error opening log segment [" + l + "] for reading record of " + streamName;
-                    promise.setException(new IOException(errMsg,
-                            BKException.create(FutureUtils.bkResultCode(cause))));
+                    promise.setException(cause);
                 }
             };
-        handleCache.asyncOpenLedger(l, fence)
-                .addEventListener(FutureEventListenerRunnable.of(openLedgerListener, executorService));
+        entryStore.openRandomAccessReader(l, fence)
+                .addEventListener(FutureEventListenerRunnable.of(openReaderListener, executorService));
         return promise;
     }
 
@@ -530,8 +493,8 @@ public class ReadUtils {
      *          transaction id
      * @param executorService
      *          executor service used for processing entries
-     * @param handleCache
-     *          ledger handle cache
+     * @param entryStore
+     *          log segment entry store
      * @param nWays
      *          how many number of entries to search in parallel
      * @return found log record. none if all transaction ids are less than provided <code>transactionId</code>.
@@ -541,7 +504,7 @@ public class ReadUtils {
             final LogSegmentMetadata segment,
             final long transactionId,
             final ExecutorService executorService,
-            final LedgerHandleCache handleCache,
+            final LogSegmentEntryStore entryStore,
             final int nWays) {
         if (!segment.isInProgress()) {
             if (segment.getLastTxId() < transactionId) {
@@ -554,25 +517,19 @@ public class ReadUtils {
 
         final Promise<Optional<LogRecordWithDLSN>> promise =
                 new Promise<Optional<LogRecordWithDLSN>>();
-        final FutureEventListener<LedgerDescriptor> openLedgerListener =
-            new FutureEventListener<LedgerDescriptor>() {
+        final FutureEventListener<LogSegmentRandomAccessEntryReader> openReaderListener =
+            new FutureEventListener<LogSegmentRandomAccessEntryReader>() {
                 @Override
-                public void onSuccess(final LedgerDescriptor ld) {
+                public void onSuccess(final LogSegmentRandomAccessEntryReader reader) {
                     promise.ensure(new AbstractFunction0<BoxedUnit>() {
                         @Override
                         public BoxedUnit apply() {
-                            handleCache.asyncCloseLedger(ld);
+                            reader.asyncClose();
                             return BoxedUnit.UNIT;
                         }
 
                     });
-                    long lastEntryId;
-                    try {
-                        lastEntryId = handleCache.getLastAddConfirmed(ld);
-                    } catch (BKException e) {
-                        promise.setException(e);
-                        return;
-                    }
+                    long lastEntryId = reader.getLastAddConfirmed();
                     if (lastEntryId < 0) {
                         // it means that the log segment is created but not written yet or an empty log segment.
                         // it is equivalent to 'all log records whose transaction id is less than provided transactionId'
@@ -586,8 +543,7 @@ public class ReadUtils {
                                 new FirstTxIdNotLessThanSelector(transactionId);
                         asyncReadRecordFromEntries(
                                 logName,
-                                ld,
-                                handleCache,
+                                reader,
                                 segment,
                                 executorService,
                                 new SingleEntryScanContext(0L),
@@ -608,11 +564,10 @@ public class ReadUtils {
                     }
                     getLogRecordNotLessThanTxIdFromEntries(
                             logName,
-                            ld,
                             segment,
                             transactionId,
                             executorService,
-                            handleCache,
+                            reader,
                             Lists.newArrayList(0L, lastEntryId),
                             nWays,
                             Optional.<LogRecordWithDLSN>absent(),
@@ -621,14 +576,12 @@ public class ReadUtils {
 
                 @Override
                 public void onFailure(final Throwable cause) {
-                    String errMsg = "Error opening log segment [" + segment
-                            + "] for find record from " + logName;
-                    promise.setException(new IOException(errMsg,
-                            BKException.create(FutureUtils.bkResultCode(cause))));
+                    promise.setException(cause);
                 }
             };
-        handleCache.asyncOpenLedger(segment, false)
-                .addEventListener(FutureEventListenerRunnable.of(openLedgerListener, executorService));
+
+        entryStore.openRandomAccessReader(segment, false)
+                .addEventListener(FutureEventListenerRunnable.of(openReaderListener, executorService));
         return promise;
     }
 
@@ -644,8 +597,8 @@ public class ReadUtils {
      *          provided transaction id to search
      * @param executorService
      *          executor service
-     * @param handleCache
-     *          handle cache
+     * @param reader
+     *          log segment random access reader
      * @param entriesToSearch
      *          list of entries to search
      * @param nWays
@@ -657,11 +610,10 @@ public class ReadUtils {
      */
     private static void getLogRecordNotLessThanTxIdFromEntries(
             final String logName,
-            final LedgerDescriptor ld,
             final LogSegmentMetadata segment,
             final long transactionId,
             final ExecutorService executorService,
-            final LedgerHandleCache handleCache,
+            final LogSegmentRandomAccessEntryReader reader,
             final List<Long> entriesToSearch,
             final int nWays,
             final Optional<LogRecordWithDLSN> prevFoundRecord,
@@ -672,8 +624,7 @@ public class ReadUtils {
             LogRecordSelector selector = new FirstTxIdNotLessThanSelector(transactionId);
             Future<LogRecordWithDLSN> searchResult = asyncReadRecordFromEntries(
                     logName,
-                    ld,
-                    handleCache,
+                    reader,
                     segment,
                     executorService,
                     new SingleEntryScanContext(entryId),
@@ -686,11 +637,10 @@ public class ReadUtils {
                     public void onSuccess(List<LogRecordWithDLSN> resultList) {
                         processSearchResults(
                                 logName,
-                                ld,
                                 segment,
                                 transactionId,
                                 executorService,
-                                handleCache,
+                                reader,
                                 resultList,
                                 nWays,
                                 prevFoundRecord,
@@ -711,11 +661,10 @@ public class ReadUtils {
      */
     static void processSearchResults(
             final String logName,
-            final LedgerDescriptor ld,
             final LogSegmentMetadata segment,
             final long transactionId,
             final ExecutorService executorService,
-            final LedgerHandleCache handleCache,
+            final LogSegmentRandomAccessEntryReader reader,
             final List<LogRecordWithDLSN> searchResults,
             final int nWays,
             final Optional<LogRecordWithDLSN> prevFoundRecord,
@@ -758,11 +707,10 @@ public class ReadUtils {
         }
         getLogRecordNotLessThanTxIdFromEntries(
                 logName,
-                ld,
                 segment,
                 transactionId,
                 executorService,
-                handleCache,
+                reader,
                 nextSearchBatch,
                 nWays,
                 Optional.of(foundRecord),

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java
index 22b81a1..0a3fdb0 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java
@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
 import com.twitter.distributedlog.BookKeeperClient;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.DistributedLogManager;
-import com.twitter.distributedlog.LedgerHandleCache;
 import com.twitter.distributedlog.LogRecordWithDLSN;
 import com.twitter.distributedlog.LogSegmentMetadata;
 import com.twitter.distributedlog.ReadUtils;
@@ -30,6 +29,9 @@ import com.twitter.distributedlog.ZooKeeperClientBuilder;
 import com.twitter.distributedlog.acl.ZKAccessControl;
 import com.twitter.distributedlog.exceptions.DLIllegalStateException;
 import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore;
+import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryStore;
+import com.twitter.distributedlog.injector.AsyncFailureInjector;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
 import com.twitter.distributedlog.metadata.BKDLConfig;
 import com.twitter.distributedlog.metadata.DLMetadata;
 import com.twitter.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
@@ -39,10 +41,12 @@ import com.twitter.distributedlog.thrift.AccessControlEntry;
 import com.twitter.distributedlog.tools.DistributedLogTool;
 import com.twitter.distributedlog.util.DLUtils;
 import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.SchedulerUtils;
 import com.twitter.util.Await;
 import com.twitter.util.Function;
 import com.twitter.util.Future;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
@@ -50,8 +54,6 @@ import org.apache.commons.cli.ParseException;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction0;
-import scala.runtime.BoxedUnit;
 
 import java.io.IOException;
 import java.net.URI;
@@ -194,18 +196,18 @@ public class DistributedLogAdmin extends DistributedLogTool {
     public static void checkAndRepairDLNamespace(final URI uri,
                                                  final com.twitter.distributedlog.DistributedLogManagerFactory factory,
                                                  final MetadataUpdater metadataUpdater,
-                                                 final ExecutorService executorService,
+                                                 final OrderedScheduler scheduler,
                                                  final BookKeeperClient bkc,
                                                  final String digestpw,
                                                  final boolean verbose,
                                                  final boolean interactive) throws IOException {
-        checkAndRepairDLNamespace(uri, factory, metadataUpdater, executorService, bkc, digestpw, verbose, interactive, 1);
+        checkAndRepairDLNamespace(uri, factory, metadataUpdater, scheduler, bkc, digestpw, verbose, interactive, 1);
     }
 
     public static void checkAndRepairDLNamespace(final URI uri,
                                                  final com.twitter.distributedlog.DistributedLogManagerFactory factory,
                                                  final MetadataUpdater metadataUpdater,
-                                                 final ExecutorService executorService,
+                                                 final OrderedScheduler scheduler,
                                                  final BookKeeperClient bkc,
                                                  final String digestpw,
                                                  final boolean verbose,
@@ -222,7 +224,7 @@ public class DistributedLogAdmin extends DistributedLogTool {
             return;
         }
         Map<String, StreamCandidate> streamCandidates =
-                checkStreams(factory, streams, executorService, bkc, digestpw, concurrency);
+                checkStreams(factory, streams, scheduler, bkc, digestpw, concurrency);
         if (verbose) {
             System.out.println("+ 0. " + streamCandidates.size() + " corrupted streams found.");
         }
@@ -248,7 +250,7 @@ public class DistributedLogAdmin extends DistributedLogTool {
     private static Map<String, StreamCandidate> checkStreams(
             final com.twitter.distributedlog.DistributedLogManagerFactory factory,
             final Collection<String> streams,
-            final ExecutorService executorService,
+            final OrderedScheduler scheduler,
             final BookKeeperClient bkc,
             final String digestpw,
             final int concurrency) throws IOException {
@@ -273,7 +275,7 @@ public class DistributedLogAdmin extends DistributedLogTool {
                     StreamCandidate candidate;
                     try {
                         LOG.info("Checking stream {}.", stream);
-                        candidate = checkStream(factory, stream, executorService, bkc, digestpw);
+                        candidate = checkStream(factory, stream, scheduler, bkc, digestpw);
                         LOG.info("Checked stream {} - {}.", stream, candidate);
                     } catch (IOException e) {
                         LOG.error("Error on checking stream {} : ", stream, e);
@@ -316,7 +318,7 @@ public class DistributedLogAdmin extends DistributedLogTool {
     private static StreamCandidate checkStream(
             final com.twitter.distributedlog.DistributedLogManagerFactory factory,
             final String streamName,
-            final ExecutorService executorService,
+            final OrderedScheduler scheduler,
             final BookKeeperClient bkc,
             String digestpw) throws IOException {
         DistributedLogManager dlm = factory.createDistributedLogManagerWithSharedClients(streamName);
@@ -328,7 +330,7 @@ public class DistributedLogAdmin extends DistributedLogTool {
             List<Future<LogSegmentCandidate>> futures =
                     new ArrayList<Future<LogSegmentCandidate>>(segments.size());
             for (LogSegmentMetadata segment : segments) {
-                futures.add(checkLogSegment(streamName, segment, executorService, bkc, digestpw));
+                futures.add(checkLogSegment(streamName, segment, scheduler, bkc, digestpw));
             }
             List<LogSegmentCandidate> segmentCandidates;
             try {
@@ -354,17 +356,19 @@ public class DistributedLogAdmin extends DistributedLogTool {
     private static Future<LogSegmentCandidate> checkLogSegment(
             final String streamName,
             final LogSegmentMetadata metadata,
-            final ExecutorService executorService,
+            final OrderedScheduler scheduler,
             final BookKeeperClient bkc,
             final String digestpw) {
         if (metadata.isInProgress()) {
             return Future.value(null);
         }
 
-        final LedgerHandleCache handleCache = LedgerHandleCache.newBuilder()
-                .bkc(bkc)
-                .conf(new DistributedLogConfiguration().setBKDigestPW(digestpw))
-                .build();
+        final LogSegmentEntryStore entryStore = new BKLogSegmentEntryStore(
+                new DistributedLogConfiguration().setBKDigestPW(digestpw),
+                bkc,
+                scheduler,
+                NullStatsLogger.INSTANCE,
+                AsyncFailureInjector.NULL);
         return ReadUtils.asyncReadLastRecord(
                 streamName,
                 metadata,
@@ -374,8 +378,8 @@ public class DistributedLogAdmin extends DistributedLogTool {
                 4,
                 16,
                 new AtomicInteger(0),
-                executorService,
-                handleCache
+                scheduler,
+                entryStore
         ).map(new Function<LogRecordWithDLSN, LogSegmentCandidate>() {
             @Override
             public LogSegmentCandidate apply(LogRecordWithDLSN record) {
@@ -388,12 +392,6 @@ public class DistributedLogAdmin extends DistributedLogTool {
                     return null;
                 }
             }
-        }).ensure(new AbstractFunction0<BoxedUnit>() {
-            @Override
-            public BoxedUnit apply() {
-                handleCache.clear();
-                return BoxedUnit.UNIT;
-            }
         });
     }
 
@@ -736,10 +734,14 @@ public class DistributedLogAdmin extends DistributedLogTool {
                             getLogSegmentMetadataStore()) :
                     LogSegmentMetadataStoreUpdater.createMetadataUpdater(getConf(),
                             getLogSegmentMetadataStore());
+            OrderedScheduler scheduler = OrderedScheduler.newBuilder()
+                    .name("dlck-scheduler")
+                    .corePoolSize(Runtime.getRuntime().availableProcessors())
+                    .build();
             ExecutorService executorService = Executors.newCachedThreadPool();
             BookKeeperClient bkc = getBookKeeperClient();
             try {
-                checkAndRepairDLNamespace(getUri(), getFactory(), metadataUpdater, executorService,
+                checkAndRepairDLNamespace(getUri(), getFactory(), metadataUpdater, scheduler,
                                           bkc, getConf().getBKDigestPW(), verbose, !getForce(), concurrency);
             } finally {
                 SchedulerUtils.shutdownScheduler(executorService, 5, TimeUnit.MINUTES);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
index 4650949..f7f4acf 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
@@ -17,6 +17,7 @@
  */
 package com.twitter.distributedlog.impl.logsegment;
 
+import com.twitter.distributedlog.BookKeeperClient;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.LogSegmentMetadata;
 import com.twitter.distributedlog.exceptions.BKTransmitException;
@@ -24,6 +25,7 @@ import com.twitter.distributedlog.injector.AsyncFailureInjector;
 import com.twitter.distributedlog.logsegment.LogSegmentEntryReader;
 import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
 import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter;
+import com.twitter.distributedlog.logsegment.LogSegmentRandomAccessEntryReader;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.util.Future;
@@ -33,13 +35,22 @@ import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
 
 import static com.google.common.base.Charsets.UTF_8;
 
 /**
  * BookKeeper Based Entry Store
  */
-public class BKLogSegmentEntryStore implements LogSegmentEntryStore, AsyncCallback.OpenCallback {
+public class BKLogSegmentEntryStore implements
+        LogSegmentEntryStore,
+        AsyncCallback.OpenCallback,
+        AsyncCallback.DeleteCallback {
+
+    private static final Logger logger = LoggerFactory.getLogger(BKLogSegmentEntryStore.class);
 
     private static class OpenReaderRequest {
 
@@ -56,20 +67,32 @@ public class BKLogSegmentEntryStore implements LogSegmentEntryStore, AsyncCallba
 
     }
 
+    private static class DeleteLogSegmentRequest {
+        // Context object handed to asyncDeleteLedger and read back in deleteComplete.
+        private final LogSegmentMetadata segment;
+        private final Promise<LogSegmentMetadata> deletePromise; // returned by deleteLogSegment
+
+        DeleteLogSegmentRequest(LogSegmentMetadata segment) {
+            this.segment = segment;
+            this.deletePromise = new Promise<LogSegmentMetadata>();
+        }
+
+    }
+
     private final byte[] passwd;
-    private final BookKeeper bk;
+    private final BookKeeperClient bkc;
     private final OrderedScheduler scheduler;
     private final DistributedLogConfiguration conf;
     private final StatsLogger statsLogger;
     private final AsyncFailureInjector failureInjector;
 
     public BKLogSegmentEntryStore(DistributedLogConfiguration conf,
-                                  BookKeeper bk,
+                                  BookKeeperClient bkc,
                                   OrderedScheduler scheduler,
                                   StatsLogger statsLogger,
                                   AsyncFailureInjector failureInjector) {
         this.conf = conf;
-        this.bk = bk;
+        this.bkc = bkc;
         this.passwd = conf.getBKDigestPW().getBytes(UTF_8);
         this.scheduler = scheduler;
         this.statsLogger = statsLogger;
@@ -77,6 +100,36 @@ public class BKLogSegmentEntryStore implements LogSegmentEntryStore, AsyncCallba
     }
 
     @Override
+    public Future<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment) {
+        DeleteLogSegmentRequest request = new DeleteLogSegmentRequest(segment);
+        BookKeeper bk;
+        try {
+            bk = this.bkc.get();
+        } catch (IOException e) {
+            return Future.exception(e); // no client: fail without issuing the delete
+        }
+        bk.asyncDeleteLedger(segment.getLogSegmentId(), this, request); // this.deleteComplete gets request as ctx
+        return request.deletePromise;
+    }
+
+    @Override
+    public void deleteComplete(int rc, Object ctx) {
+        DeleteLogSegmentRequest deleteRequest = (DeleteLogSegmentRequest) ctx; // ctx set by deleteLogSegment
+        if (BKException.Code.NoSuchLedgerExistsException == rc) { // missing ledger: warn, then succeed below
+            logger.warn("No ledger {} found to delete for {}.",
+                    deleteRequest.segment.getLogSegmentId(), deleteRequest.segment);
+        } else if (BKException.Code.OK != rc) {
+            logger.error("Couldn't delete ledger {} from bookkeeper for {} : {}",
+                    new Object[]{ deleteRequest.segment.getLogSegmentId(), deleteRequest.segment,
+                            BKException.getMessage(rc) });
+            FutureUtils.setException(deleteRequest.deletePromise,
+                    new BKTransmitException("Couldn't delete log segment " + deleteRequest.segment, rc));
+            return; // failure: must not fall through to the success path
+        }
+        FutureUtils.setValue(deleteRequest.deletePromise, deleteRequest.segment);
+    }
+
+    @Override
     public Future<LogSegmentEntryWriter> openWriter(LogSegmentMetadata segment) {
         throw new UnsupportedOperationException("Not supported yet");
     }
@@ -84,6 +137,12 @@ public class BKLogSegmentEntryStore implements LogSegmentEntryStore, AsyncCallba
     @Override
     public Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment,
                                                     long startEntryId) {
+        BookKeeper bk;
+        try {
+            bk = this.bkc.get();
+        } catch (IOException e) {
+            return Future.exception(e);
+        }
         OpenReaderRequest request = new OpenReaderRequest(segment, startEntryId);
         if (segment.isInProgress()) {
             bk.asyncOpenLedgerNoRecovery(
@@ -113,15 +172,64 @@ public class BKLogSegmentEntryStore implements LogSegmentEntryStore, AsyncCallba
             return;
         }
         // successfully open a ledger
-        LogSegmentEntryReader reader = new BKLogSegmentEntryReader(
-                request.segment,
-                lh,
-                request.startEntryId,
-                bk,
-                scheduler,
-                conf,
-                statsLogger,
-                failureInjector);
-        FutureUtils.setValue(request.openPromise, reader);
+        try {
+            LogSegmentEntryReader reader = new BKLogSegmentEntryReader(
+                    request.segment,
+                    lh,
+                    request.startEntryId,
+                    bkc.get(),
+                    scheduler,
+                    conf,
+                    statsLogger,
+                    failureInjector);
+            FutureUtils.setValue(request.openPromise, reader);
+        } catch (IOException e) {
+            FutureUtils.setException(request.openPromise, e);
+        }
+
+    }
+
+    @Override
+    public Future<LogSegmentRandomAccessEntryReader> openRandomAccessReader(final LogSegmentMetadata segment,
+                                                                            final boolean fence) {
+        final BookKeeper bk;
+        try {
+            bk = this.bkc.get();
+        } catch (IOException e) {
+            return Future.exception(e);
+        }
+        final Promise<LogSegmentRandomAccessEntryReader> openPromise = new Promise<LogSegmentRandomAccessEntryReader>();
+        AsyncCallback.OpenCallback openCallback = new AsyncCallback.OpenCallback() {
+            @Override
+            public void openComplete(int rc, LedgerHandle lh, Object ctx) {
+                if (BKException.Code.OK != rc) {
+                    FutureUtils.setException(
+                            openPromise,
+                            new BKTransmitException("Failed to open ledger handle for log segment " + segment, rc));
+                    return;
+                }
+                LogSegmentRandomAccessEntryReader reader =
+                        new BKLogSegmentRandomAccessEntryReader(segment, lh, conf);
+                FutureUtils.setValue(openPromise, reader);
+            }
+        };
+        // Register openCallback as the callback itself (not as ctx) so that it, rather than this
+        // store's openComplete, is invoked and completes openPromise; it needs no ctx object.
+        if (segment.isInProgress() && !fence) {
+            bk.asyncOpenLedgerNoRecovery(
+                    segment.getLogSegmentId(),
+                    BookKeeper.DigestType.CRC32,
+                    passwd,
+                    openCallback,
+                    null);
+        } else {
+            bk.asyncOpenLedger(
+                    segment.getLogSegmentId(),
+                    BookKeeper.DigestType.CRC32,
+                    passwd,
+                    openCallback,
+                    null);
+        }
+        return openPromise;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
new file mode 100644
index 0000000..9cec80c
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.logsegment;
+
+import com.google.common.collect.Lists;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.Entry;
+import com.twitter.distributedlog.LogSegmentMetadata;
+import com.twitter.distributedlog.exceptions.BKTransmitException;
+import com.twitter.distributedlog.logsegment.LogSegmentRandomAccessEntryReader;
+import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.util.Future;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.List;
+
+/**
+ * BookKeeper ledger based random access entry reader.
+ */
+class BKLogSegmentRandomAccessEntryReader implements
+        LogSegmentRandomAccessEntryReader,
+        ReadCallback {
+    // Per-segment read settings, fixed in the constructor and applied to each entry in processReadEntry.
+    private final long lssn;
+    private final long startSequenceId;
+    private final boolean envelopeEntries;
+    private final boolean deserializeRecordSet;
+    // state
+    private final LogSegmentMetadata metadata;
+    private final LedgerHandle lh;
+    private Promise<Void> closePromise = null; // created by the first asyncClose(); accessed under synchronized (this)
+
+    BKLogSegmentRandomAccessEntryReader(LogSegmentMetadata metadata,
+                                        LedgerHandle lh,
+                                        DistributedLogConfiguration conf) {
+        this.metadata = metadata;
+        this.lssn = metadata.getLogSegmentSequenceNumber();
+        this.startSequenceId = metadata.getStartSequenceId();
+        this.envelopeEntries = metadata.getEnvelopeEntries();
+        this.deserializeRecordSet = conf.getDeserializeRecordSetOnReads();
+        this.lh = lh;
+    }
+
+    @Override
+    public long getLastAddConfirmed() {
+        return lh.getLastAddConfirmed();
+    }
+
+    @Override
+    public Future<List<Entry.Reader>> readEntries(long startEntryId, long endEntryId) {
+        Promise<List<Entry.Reader>> promise = new Promise<List<Entry.Reader>>();
+        lh.asyncReadEntries(startEntryId, endEntryId, this, promise); // promise is the ctx completed in readComplete
+        return promise;
+    }
+
+    Entry.Reader processReadEntry(LedgerEntry entry) throws IOException {
+        return Entry.newBuilder()
+                .setLogSegmentInfo(lssn, startSequenceId)
+                .setEntryId(entry.getEntryId())
+                .setEnvelopeEntry(envelopeEntries)
+                .deserializeRecordSet(deserializeRecordSet)
+                .setInputStream(entry.getEntryInputStream())
+                .buildReader();
+    }
+
+    @Override
+    public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) {
+        Promise<List<Entry.Reader>> promise = (Promise<List<Entry.Reader>>) ctx; // ctx was set by readEntries
+        if (BKException.Code.OK == rc) {
+            List<Entry.Reader> entryList = Lists.newArrayList();
+            while (entries.hasMoreElements()) {
+                try {
+                    entryList.add(processReadEntry(entries.nextElement()));
+                } catch (IOException ioe) {
+                    FutureUtils.setException(promise, ioe); // first entry that fails to build fails the whole read
+                    return;
+                }
+            }
+            FutureUtils.setValue(promise, entryList);
+        } else {
+            FutureUtils.setException(promise,
+                    new BKTransmitException("Failed to read entries :", rc));
+        }
+    }
+
+    @Override
+    public Future<Void> asyncClose() {
+        final Promise<Void> closeFuture;
+        synchronized (this) {
+            if (null != closePromise) {
+                return closePromise; // close already requested: hand back the same promise
+            }
+            closeFuture = closePromise = new Promise<Void>();
+        }
+        BKUtils.closeLedgers(lh).proxyTo(closeFuture); // issued outside the lock, by the first caller only
+        return closeFuture;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
index d8611f9..850f9c8 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
@@ -28,6 +28,14 @@ import com.twitter.util.Future;
 public interface LogSegmentEntryStore {
 
     /**
+     * Delete the actual log segment from the entry store.
+     *
+     * @param segment log segment metadata
+     * @return future represent the delete result
+     */
+    Future<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment);
+
+    /**
      * Open the writer for writing data to the log <i>segment</i>.
      *
      * @param segment the log <i>segment</i> to write data to
@@ -45,4 +53,13 @@ public interface LogSegmentEntryStore {
     Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment,
                                              long startEntryId);
 
+    /**
+     * Open the reader for reading entries from a random access log <i>segment</i>.
+     *
+     * @param segment the log <i>segment</i> to read entries from
+     * @param fence the flag to fence log segment
+     * @return future represent the opened random access reader
+     */
+    Future<LogSegmentRandomAccessEntryReader> openRandomAccessReader(LogSegmentMetadata segment,
+                                                                     boolean fence);
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java
new file mode 100644
index 0000000..70472ca
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.logsegment;
+
+import com.twitter.distributedlog.Entry;
+import com.twitter.distributedlog.io.AsyncCloseable;
+import com.twitter.util.Future;
+
+import java.util.List;
+
+/**
+ * An interface to read {@link com.twitter.distributedlog.Entry} entries
+ * from a random access log segment.
+ */
+public interface LogSegmentRandomAccessEntryReader extends AsyncCloseable {
+
+    /**
+     * Read entries [startEntryId, endEntryId] from a random access log segment.
+     *
+     * @param startEntryId id of the first entry to read (inclusive)
+     * @param endEntryId id of the last entry to read (inclusive)
+     * @return a future that, when satisfied, holds the entries in [startEntryId, endEntryId].
+     */
+    Future<List<Entry.Reader>> readEntries(long startEntryId, long endEntryId);
+
+    /**
+     * Return the last add confirmed entry id (LAC).
+     *
+     * @return the last add confirmed entry id.
+     */
+    long getLastAddConfirmed();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadPhase.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadPhase.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadPhase.java
deleted file mode 100644
index 1f15221..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadPhase.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.readahead;
-
-/**
- * Enum code represents readahead phases
- */
-public enum ReadAheadPhase {
-    ERROR(-5),
-    TRUNCATED(-4),
-    INTERRUPTED(-3),
-    STOPPED(-2),
-    EXCEPTION_HANDLING(-1),
-    SCHEDULE_READAHEAD(0),
-    GET_LEDGERS(1),
-    OPEN_LEDGER(2),
-    CLOSE_LEDGER(3),
-    READ_LAST_CONFIRMED(4),
-    READ_ENTRIES(5);
-
-    int code;
-
-    ReadAheadPhase(int code) {
-        this.code = code;
-    }
-
-    int getCode() {
-        return this.code;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java
deleted file mode 100644
index a58218b..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadTracker.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.readahead;
-
-import com.twitter.distributedlog.ReadAheadCache;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * ReadAheadTracker is tracking the progress of readahead worker. so we could use it to investigate where
- * the readahead worker is.
- */
-public class ReadAheadTracker {
-    // ticks is used to differentiate that the worker enter same phase in different time.
-    final AtomicLong ticks = new AtomicLong(0);
-    // which phase that the worker is in.
-    ReadAheadPhase phase;
-    private final StatsLogger statsLogger;
-    // Gauges and their labels
-    private static final String phaseGaugeLabel = "phase";
-    private final Gauge<Number> phaseGauge;
-    private static final String ticksGaugeLabel = "ticks";
-    private final Gauge<Number> ticksGauge;
-    private static final String cachEntriesGaugeLabel = "cache_entries";
-    private final Gauge<Number> cacheEntriesGauge;
-
-    ReadAheadTracker(String streamName,
-                     final ReadAheadCache cache,
-                     ReadAheadPhase initialPhase,
-                     StatsLogger statsLogger) {
-        this.statsLogger = statsLogger;
-        this.phase = initialPhase;
-        phaseGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return ReadAheadPhase.SCHEDULE_READAHEAD.getCode();
-            }
-
-            @Override
-            public Number getSample() {
-                return phase.getCode();
-            }
-        };
-        this.statsLogger.registerGauge(phaseGaugeLabel, phaseGauge);
-
-        ticksGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return ticks.get();
-            }
-        };
-        this.statsLogger.registerGauge(ticksGaugeLabel, ticksGauge);
-
-        cacheEntriesGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return cache.getNumCachedEntries();
-            }
-        };
-        this.statsLogger.registerGauge(cachEntriesGaugeLabel, cacheEntriesGauge);
-    }
-
-    ReadAheadPhase getPhase() {
-        return this.phase;
-    }
-
-    public void enterPhase(ReadAheadPhase readAheadPhase) {
-        this.ticks.incrementAndGet();
-        this.phase = readAheadPhase;
-    }
-
-    public void unregisterGauge() {
-        this.statsLogger.unregisterGauge(phaseGaugeLabel, phaseGauge);
-        this.statsLogger.unregisterGauge(ticksGaugeLabel, ticksGauge);
-        this.statsLogger.unregisterGauge(cachEntriesGaugeLabel, cacheEntriesGauge);
-    }
-}


Mime
View raw message