distributedlog-dev mailing list archives

From: si...@apache.org
Subject: [14/31] incubator-distributedlog git commit: DL-159: ReadAhead Improvement (part 2) - New ReadAhead Reader using the LogSegmentEntryReader interface
Date: Fri, 30 Dec 2016 00:07:28 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java
new file mode 100644
index 0000000..94e618a
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java
@@ -0,0 +1,966 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
+import com.google.common.collect.Lists;
+import com.twitter.distributedlog.callback.LogSegmentListener;
+import com.twitter.distributedlog.exceptions.AlreadyTruncatedTransactionException;
+import com.twitter.distributedlog.exceptions.DLIllegalStateException;
+import com.twitter.distributedlog.exceptions.DLInterruptedException;
+import com.twitter.distributedlog.exceptions.EndOfLogSegmentException;
+import com.twitter.distributedlog.exceptions.LogNotFoundException;
+import com.twitter.distributedlog.exceptions.UnexpectedException;
+import com.twitter.distributedlog.io.AsyncCloseable;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryReader;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
+import com.twitter.distributedlog.logsegment.LogSegmentFilter;
+import com.twitter.distributedlog.util.OrderedScheduler;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Futures;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.stats.AlertStatsLogger;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Function1;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * New ReadAhead Reader that uses {@link com.twitter.distributedlog.logsegment.LogSegmentEntryReader}.
+ *
+ * NOTE: all state changes happen in the same thread. All *unsafe* methods must be submitted to the ordered
+ * scheduler, using the stream name as the key.
+ */
+public class ReadAheadEntryReader implements
+        AsyncCloseable,
+        LogSegmentListener,
+        FutureEventListener<List<Entry.Reader>> {
+
+    private static final Logger logger = LoggerFactory.getLogger(ReadAheadEntryReader.class);
+
+    //
+    // Static Functions
+    //
+
+    private static AbstractFunction1<LogSegmentEntryReader, BoxedUnit> START_READER_FUNC = new AbstractFunction1<LogSegmentEntryReader, BoxedUnit>() {
+        @Override
+        public BoxedUnit apply(LogSegmentEntryReader reader) {
+            reader.start();
+            return BoxedUnit.UNIT;
+        }
+    };
+
+    //
+    // Internal Classes
+    //
+
+    class SegmentReader implements FutureEventListener<LogSegmentEntryReader> {
+
+        private LogSegmentMetadata metadata;
+        private final long startEntryId;
+        private Future<LogSegmentEntryReader> openFuture = null;
+        private LogSegmentEntryReader reader = null;
+        private boolean isStarted = false;
+        private boolean isClosed = false;
+
+        SegmentReader(LogSegmentMetadata metadata,
+                      long startEntryId) {
+            this.metadata = metadata;
+            this.startEntryId = startEntryId;
+        }
+
+        synchronized LogSegmentEntryReader getEntryReader() {
+            return reader;
+        }
+
+        synchronized boolean isBeyondLastAddConfirmed() {
+            return null != reader && reader.isBeyondLastAddConfirmed();
+        }
+
+        synchronized LogSegmentMetadata getSegment() {
+            return metadata;
+        }
+
+        synchronized boolean isReaderOpen() {
+            return null != openFuture;
+        }
+
+        synchronized void openReader() {
+            if (null != openFuture) {
+                return;
+            }
+            openFuture = entryStore.openReader(metadata, startEntryId).addEventListener(this);
+        }
+
+        synchronized boolean isReaderStarted() {
+            return isStarted;
+        }
+
+        synchronized void startRead() {
+            if (isStarted) {
+                return;
+            }
+            isStarted = true;
+            if (null != reader) {
+                reader.start();
+            } else {
+                openFuture.onSuccess(START_READER_FUNC);
+            }
+        }
+
+        synchronized Future<List<Entry.Reader>> readNext() {
+            if (null != reader) {
+                checkCatchingUpStatus(reader);
+                return reader.readNext(numReadAheadEntries);
+            } else {
+                return openFuture.flatMap(readFunc);
+            }
+        }
+
+        synchronized void updateLogSegmentMetadata(final LogSegmentMetadata segment) {
+            if (null != reader) {
+                reader.onLogSegmentMetadataUpdated(segment);
+                this.metadata = segment;
+            } else {
+                openFuture.onSuccess(new AbstractFunction1<LogSegmentEntryReader, BoxedUnit>() {
+                    @Override
+                    public BoxedUnit apply(LogSegmentEntryReader reader) {
+                        reader.onLogSegmentMetadataUpdated(segment);
+                        synchronized (SegmentReader.this) {
+                            SegmentReader.this.metadata = segment;
+                        }
+                        return BoxedUnit.UNIT;
+                    }
+                });
+            }
+        }
+
+        @Override
+        synchronized public void onSuccess(LogSegmentEntryReader reader) {
+            this.reader = reader;
+        }
+
+        @Override
+        public void onFailure(Throwable cause) {
+            // no-op, the failure will be propagated on first read.
+        }
+
+        synchronized boolean isClosed() {
+            return isClosed;
+        }
+
+        synchronized Future<Void> close() {
+            if (null == openFuture) {
+                return Future.Void();
+            }
+            return openFuture.flatMap(new AbstractFunction1<LogSegmentEntryReader, Future<Void>>() {
+                @Override
+                public Future<Void> apply(LogSegmentEntryReader reader) {
+                    return reader.asyncClose();
+                }
+            }).ensure(new Function0<BoxedUnit>() {
+                @Override
+                public BoxedUnit apply() {
+                    synchronized (SegmentReader.this) {
+                        isClosed = true;
+                    }
+                    return null;
+                }
+            });
+        }
+    }
+
+    private class ReadEntriesFunc extends AbstractFunction1<LogSegmentEntryReader, Future<List<Entry.Reader>>> {
+
+        private final int numEntries;
+
+        ReadEntriesFunc(int numEntries) {
+            this.numEntries = numEntries;
+        }
+
+        @Override
+        public Future<List<Entry.Reader>> apply(LogSegmentEntryReader reader) {
+            checkCatchingUpStatus(reader);
+            return reader.readNext(numEntries);
+        }
+    }
+
+    private abstract class CloseableRunnable implements Runnable {
+
+        @Override
+        public void run() {
+            synchronized (ReadAheadEntryReader.this) {
+                if (null != closePromise) {
+                    return;
+                }
+                safeRun();
+            }
+
+        }
+
+        abstract void safeRun();
+
+    }
+
+    //
+    // Functions
+    //
+    private final Function1<LogSegmentEntryReader, Future<List<Entry.Reader>>> readFunc;
+    private final Function0<BoxedUnit> removeClosedSegmentReadersFunc = new Function0<BoxedUnit>() {
+        @Override
+        public BoxedUnit apply() {
+            removeClosedSegmentReaders();
+            return BoxedUnit.UNIT;
+        }
+    };
+
+    //
+    // Resources
+    //
+    private final DistributedLogConfiguration conf;
+    private final BKLogReadHandler readHandler;
+    private final LogSegmentEntryStore entryStore;
+    private final OrderedScheduler scheduler;
+
+    //
+    // Parameters
+    //
+    private final String streamName;
+    private final DLSN fromDLSN;
+    private final int maxCachedEntries;
+    private final int numReadAheadEntries;
+    private final int idleWarnThresholdMillis;
+
+    //
+    // Cache
+    //
+    private final LinkedBlockingQueue<Entry.Reader> entryQueue;
+
+    //
+    // State of the reader
+    //
+
+    private boolean isInitialized;
+    private boolean readAheadPaused = false;
+    private Promise<Void> closePromise = null;
+    // segment readers
+    private long currentSegmentSequenceNumber;
+    private SegmentReader currentSegmentReader;
+    private SegmentReader nextSegmentReader;
+    private DLSN lastDLSN;
+    private final EntryPosition nextEntryPosition;
+    private volatile boolean isCatchingUp = true;
+    private final LinkedList<SegmentReader> segmentReaders;
+    private final LinkedList<SegmentReader> segmentReadersToClose;
+    // last exception that this reader encounters
+    private final AtomicReference<IOException> lastException = new AtomicReference<IOException>(null);
+    // last entry added time
+    private final Stopwatch lastEntryAddedTime;
+    // state change notification
+    private final CopyOnWriteArraySet<AsyncNotification> stateChangeNotifications =
+            new CopyOnWriteArraySet<AsyncNotification>();
+    // idle reader check task
+    private final ScheduledFuture<?> idleReaderCheckTask;
+
+    //
+    // Stats
+    //
+    private final AlertStatsLogger alertStatsLogger;
+
+    public ReadAheadEntryReader(String streamName,
+                                DLSN fromDLSN,
+                                DistributedLogConfiguration conf,
+                                BKLogReadHandler readHandler,
+                                LogSegmentEntryStore entryStore,
+                                OrderedScheduler scheduler,
+                                Ticker ticker,
+                                AlertStatsLogger alertStatsLogger) {
+        this.streamName = streamName;
+        this.fromDLSN = lastDLSN = fromDLSN;
+        this.nextEntryPosition = new EntryPosition(
+                fromDLSN.getLogSegmentSequenceNo(),
+                fromDLSN.getEntryId());
+        this.conf = conf;
+        this.maxCachedEntries = conf.getReadAheadMaxRecords();
+        this.numReadAheadEntries = conf.getReadAheadBatchSize();
+        this.idleWarnThresholdMillis = conf.getReaderIdleWarnThresholdMillis();
+        this.readHandler = readHandler;
+        this.entryStore = entryStore;
+        this.scheduler = scheduler;
+        this.readFunc = new ReadEntriesFunc(numReadAheadEntries);
+        this.alertStatsLogger = alertStatsLogger;
+
+        // create the segment reader list
+        this.segmentReaders = new LinkedList<SegmentReader>();
+        this.segmentReadersToClose = new LinkedList<SegmentReader>();
+        // create the readahead entry queue
+        this.entryQueue = new LinkedBlockingQueue<Entry.Reader>();
+
+        // start the idle reader detection
+        lastEntryAddedTime = Stopwatch.createStarted(ticker);
+        // start the idle reader check task
+        idleReaderCheckTask = scheduleIdleReaderTaskIfNecessary();
+    }
+
+    private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
+        if (idleWarnThresholdMillis < Integer.MAX_VALUE && idleWarnThresholdMillis > 0) {
+            return scheduler.scheduleAtFixedRate(streamName, new Runnable() {
+                @Override
+                public void run() {
+                    if (!isReaderIdle(idleWarnThresholdMillis, TimeUnit.MILLISECONDS)) {
+                        return;
+                    }
+                    // the readahead has been idle
+                    unsafeCheckIfReadAheadIsIdle();
+                }
+            }, idleWarnThresholdMillis, idleWarnThresholdMillis, TimeUnit.MILLISECONDS);
+        }
+        return null;
+    }
+
+    private void unsafeCheckIfReadAheadIsIdle() {
+        boolean forceReadLogSegments =
+                (null == currentSegmentReader) || currentSegmentReader.isBeyondLastAddConfirmed();
+        if (forceReadLogSegments) {
+            readHandler.readLogSegmentsFromStore(
+                    LogSegmentMetadata.COMPARATOR,
+                    LogSegmentFilter.DEFAULT_FILTER,
+                    null
+            ).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+                @Override
+                public void onFailure(Throwable cause) {
+                    // do nothing here since it will be retried on the next idle reader check
+                }
+
+                @Override
+                public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) {
+                    onSegmentsUpdated(segments.getValue());
+                }
+            });
+        }
+    }
+
+    private void cancelIdleReaderTask() {
+        if (null != idleReaderCheckTask) {
+            idleReaderCheckTask.cancel(true);
+        }
+    }
+
+    @VisibleForTesting
+    EntryPosition getNextEntryPosition() {
+        return nextEntryPosition;
+    }
+
+    @VisibleForTesting
+    SegmentReader getCurrentSegmentReader() {
+        return currentSegmentReader;
+    }
+
+    @VisibleForTesting
+    long getCurrentSegmentSequenceNumber() {
+        return currentSegmentSequenceNumber;
+    }
+
+    @VisibleForTesting
+    SegmentReader getNextSegmentReader() {
+        return nextSegmentReader;
+    }
+
+    @VisibleForTesting
+    LinkedList<SegmentReader> getSegmentReaders() {
+        return segmentReaders;
+    }
+
+    @VisibleForTesting
+    boolean isInitialized() {
+        return isInitialized;
+    }
+
+    private void orderedSubmit(Runnable runnable) {
+        synchronized (this) {
+            if (null != closePromise) {
+                return;
+            }
+        }
+        try {
+            scheduler.submit(streamName, runnable);
+        } catch (RejectedExecutionException ree) {
+            logger.debug("Failed to submit and execute an operation for readahead entry reader of {}",
+                    streamName, ree);
+        }
+    }
+
+    public void start(final List<LogSegmentMetadata> segmentList) {
+        logger.info("Starting the readahead entry reader for {} : segments = {}",
+                readHandler.getFullyQualifiedName(), segmentList);
+        processLogSegments(segmentList);
+    }
+
+    private void removeClosedSegmentReaders() {
+        orderedSubmit(new CloseableRunnable() {
+            @Override
+            void safeRun() {
+                unsafeRemoveClosedSegmentReaders();
+            }
+        });
+    }
+
+    private void unsafeRemoveClosedSegmentReaders() {
+        SegmentReader reader = segmentReadersToClose.peekFirst();
+        while (null != reader) {
+            if (reader.isClosed()) {
+                segmentReadersToClose.pollFirst();
+                reader = segmentReadersToClose.peekFirst();
+            } else {
+                break;
+            }
+        }
+    }
+
+    @Override
+    public Future<Void> asyncClose() {
+        final Promise<Void> closeFuture;
+        synchronized (this) {
+            if (null != closePromise) {
+                return closePromise;
+            }
+            closePromise = closeFuture = new Promise<Void>();
+        }
+
+        // cancel the idle reader task
+        cancelIdleReaderTask();
+
+        // use runnable here instead of CloseableRunnable,
+        // because we need this to be executed
+        try {
+            scheduler.submit(streamName, new Runnable() {
+                @Override
+                public void run() {
+                    unsafeAsyncClose(closeFuture);
+                }
+            });
+        } catch (RejectedExecutionException ree) {
+            logger.warn("Scheduler has been shut down before closing the readahead entry reader for stream {}",
+                    streamName, ree);
+            unsafeAsyncClose(closeFuture);
+        }
+
+        return closeFuture;
+    }
+
+    private void unsafeAsyncClose(Promise<Void> closePromise) {
+        List<Future<Void>> closeFutures = Lists.newArrayListWithExpectedSize(
+                segmentReaders.size() + segmentReadersToClose.size() + 1);
+        if (null != currentSegmentReader) {
+            segmentReadersToClose.add(currentSegmentReader);
+        }
+        if (null != nextSegmentReader) {
+            segmentReadersToClose.add(nextSegmentReader);
+        }
+        for (SegmentReader reader : segmentReaders) {
+            segmentReadersToClose.add(reader);
+        }
+        segmentReaders.clear();
+        for (SegmentReader reader : segmentReadersToClose) {
+            closeFutures.add(reader.close());
+        }
+        Futures.collect(closeFutures).proxyTo(closePromise);
+    }
+
+    //
+    // Reader State Changes
+    //
+
+    ReadAheadEntryReader addStateChangeNotification(AsyncNotification notification) {
+        this.stateChangeNotifications.add(notification);
+        return this;
+    }
+
+    ReadAheadEntryReader removeStateChangeNotification(AsyncNotification notification) {
+        this.stateChangeNotifications.remove(notification);
+        return this;
+    }
+
+    private void notifyStateChangeOnSuccess() {
+        for (AsyncNotification notification : stateChangeNotifications) {
+            notification.notifyOnOperationComplete();
+        }
+    }
+
+    private void notifyStateChangeOnFailure(Throwable cause) {
+        for (AsyncNotification notification : stateChangeNotifications) {
+            notification.notifyOnError(cause);
+        }
+    }
+
+    void setLastException(IOException cause) {
+        if (!lastException.compareAndSet(null, cause)) {
+            return;
+        }
+        // the exception was set; notify listeners of the state change
+        notifyStateChangeOnFailure(cause);
+    }
+
+    void checkLastException() throws IOException {
+        if (null != lastException.get()) {
+            throw lastException.get();
+        }
+    }
+
+    void checkCatchingUpStatus(LogSegmentEntryReader reader) {
+        if (reader.getSegment().isInProgress()
+                && isCatchingUp
+                && reader.hasCaughtUpOnInprogress()) {
+            logger.info("ReadAhead for {} is caught up at entry {} @ log segment {}.",
+                    new Object[] { readHandler.getFullyQualifiedName(),
+                            reader.getLastAddConfirmed(), reader.getSegment() });
+            isCatchingUp = false;
+        }
+    }
+
+    public boolean isReadAheadCaughtUp() {
+        return !isCatchingUp;
+    }
+
+    //
+    // ReadAhead State Machine
+    //
+
+    @Override
+    public void onSuccess(List<Entry.Reader> entries) {
+        lastEntryAddedTime.reset().start();
+        for (Entry.Reader entry : entries) {
+            entryQueue.add(entry);
+        }
+        if (!entries.isEmpty()) {
+            Entry.Reader lastEntry = entries.get(entries.size() - 1);
+            nextEntryPosition.advance(lastEntry.getLSSN(), lastEntry.getEntryId() + 1);
+        }
+        // notify on data available
+        notifyStateChangeOnSuccess();
+        if (entryQueue.size() >= maxCachedEntries) {
+            pauseReadAheadOnCacheFull();
+        } else {
+            scheduleReadNext();
+        }
+    }
+
+    @Override
+    public void onFailure(Throwable cause) {
+        if (cause instanceof EndOfLogSegmentException) {
+            // we reached the end of the log segment
+            moveToNextLogSegment();
+            return;
+        }
+        if (cause instanceof IOException) {
+            setLastException((IOException) cause);
+        } else {
+            setLastException(new UnexpectedException("Unexpected non I/O exception", cause));
+        }
+    }
+
+    private synchronized void invokeReadAhead() {
+        if (readAheadPaused) {
+            scheduleReadNext();
+            readAheadPaused = false;
+        }
+    }
+
+    private synchronized void pauseReadAheadOnCacheFull() {
+        this.readAheadPaused = true;
+        if (!isCacheFull()) {
+            invokeReadAhead();
+        }
+    }
+
+    private synchronized void pauseReadAheadOnNoMoreLogSegments() {
+        this.readAheadPaused = true;
+    }
+
+    //
+    // Cache Related Methods
+    //
+
+    public Entry.Reader getNextReadAheadEntry(long waitTime, TimeUnit waitTimeUnit) throws IOException {
+        if (null != lastException.get()) {
+            throw lastException.get();
+        }
+        Entry.Reader entry;
+        try {
+            entry = entryQueue.poll(waitTime, waitTimeUnit);
+        } catch (InterruptedException e) {
+            throw new DLInterruptedException("Interrupted on waiting next readahead entry : ", e);
+        }
+        try {
+            return entry;
+        } finally {
+            // resume readahead once the cache is no longer full
+            if (null != entry && !isCacheFull()) {
+                invokeReadAhead();
+            }
+        }
+    }
+
+    /**
+     * Return the number of cached entries.
+     *
+     * @return the number of cached entries.
+     */
+    public int getNumCachedEntries() {
+        return entryQueue.size();
+    }
+
+    /**
+     * Return whether the cache is full.
+     *
+     * @return true if the cache is full, otherwise false.
+     */
+    public boolean isCacheFull() {
+        return getNumCachedEntries() >= maxCachedEntries;
+    }
+
+    @VisibleForTesting
+    public boolean isCacheEmpty() {
+        return entryQueue.isEmpty();
+    }
+
+    /**
+     * Check whether the readahead has stalled.
+     *
+     * @param idleReaderErrorThreshold idle reader error threshold
+     * @param timeUnit time unit of the idle reader error threshold
+     * @return true if the readahead has stalled, otherwise false.
+     */
+    public boolean isReaderIdle(int idleReaderErrorThreshold, TimeUnit timeUnit) {
+        return (lastEntryAddedTime.elapsed(timeUnit) > idleReaderErrorThreshold);
+    }
+
+    //
+    // LogSegment Management
+    //
+
+    void processLogSegments(final List<LogSegmentMetadata> segments) {
+        orderedSubmit(new CloseableRunnable() {
+            @Override
+            void safeRun() {
+                unsafeProcessLogSegments(segments);
+            }
+        });
+    }
+
+    private void unsafeProcessLogSegments(List<LogSegmentMetadata> segments) {
+        if (isInitialized) {
+            unsafeReinitializeLogSegments(segments);
+        } else {
+            unsafeInitializeLogSegments(segments);
+        }
+    }
+
+    /**
+     * Update the log segment metadata.
+     *
+     * @param reader the reader whose log segment metadata should be updated
+     * @param newMetadata the new metadata received
+     * @return true on success, false if an inconsistent state was encountered
+     */
+    private boolean updateLogSegmentMetadata(SegmentReader reader,
+                                             LogSegmentMetadata newMetadata) {
+        if (reader.getSegment().getLogSegmentSequenceNumber() != newMetadata.getLogSegmentSequenceNumber()) {
+            setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for "
+                    + streamName + " : current segment = " + reader.getSegment() + ", new segment = " + newMetadata));
+            return false;
+        }
+        if (!reader.getSegment().isInProgress() && newMetadata.isInProgress()) {
+            setLastException(new DLIllegalStateException("An inprogress log segment " + newMetadata
+                    + " received after a closed log segment " + reader.getSegment() + " on reading segment "
+                    + newMetadata.getLogSegmentSequenceNumber() + " @ stream " + streamName));
+            return false;
+        }
+        if (reader.getSegment().isInProgress() && !newMetadata.isInProgress()) {
+            reader.updateLogSegmentMetadata(newMetadata);
+        }
+        return true;
+    }
+
+    /**
+     * Reinitialize the log segments
+     */
+    private void unsafeReinitializeLogSegments(List<LogSegmentMetadata> segments) {
+        logger.info("Reinitialize log segments with {}", segments);
+        int segmentIdx = 0;
+        for (; segmentIdx < segments.size(); segmentIdx++) {
+            LogSegmentMetadata segment = segments.get(segmentIdx);
+            if (segment.getLogSegmentSequenceNumber() < currentSegmentSequenceNumber) {
+                continue;
+            }
+            break;
+        }
+        if (segmentIdx >= segments.size()) {
+            return;
+        }
+        LogSegmentMetadata segment = segments.get(segmentIdx);
+        if (null != currentSegmentReader) {
+            if (!updateLogSegmentMetadata(currentSegmentReader, segment)) {
+                return;
+            }
+        } else {
+            if (currentSegmentSequenceNumber != segment.getLogSegmentSequenceNumber()) {
+                setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for "
+                        + streamName + " : current segment sn = " + currentSegmentSequenceNumber
+                        + ", new segment sn = " + segment.getLogSegmentSequenceNumber()));
+                return;
+            }
+        }
+        segmentIdx++;
+        if (segmentIdx >= segments.size()) {
+            return;
+        }
+        // check next segment
+        segment = segments.get(segmentIdx);
+        if (null != nextSegmentReader) {
+            if (!updateLogSegmentMetadata(nextSegmentReader, segment)) {
+                return;
+            }
+            segmentIdx++;
+        }
+        // check the segment readers in the queue
+        for (int readerIdx = 0;
+             readerIdx < segmentReaders.size() && segmentIdx < segments.size();
+             readerIdx++, segmentIdx++) {
+            SegmentReader reader = segmentReaders.get(readerIdx);
+            segment = segments.get(segmentIdx);
+            if (!updateLogSegmentMetadata(reader, segment)) {
+                return;
+            }
+        }
+        // add the remaining segments to the reader queue
+        for (; segmentIdx < segments.size(); segmentIdx++) {
+            segment = segments.get(segmentIdx);
+            SegmentReader reader = new SegmentReader(segment, 0L);
+            reader.openReader();
+            segmentReaders.add(reader);
+        }
+        if (null == currentSegmentReader) {
+            unsafeMoveToNextLogSegment();
+        }
+        // resume readahead if necessary
+        invokeReadAhead();
+    }
+
+    /**
+     * Initialize the reader with the log <i>segments</i>.
+     *
+     * @param segments list of log segments
+     */
+    private void unsafeInitializeLogSegments(List<LogSegmentMetadata> segments) {
+        if (segments.isEmpty()) {
+            // do not initialize the background reader until the first log segment is notified
+            return;
+        }
+        boolean skipTruncatedLogSegments = true;
+        DLSN dlsnToStart = fromDLSN;
+        // positioning the reader
+        for (int i = 0; i < segments.size(); i++) {
+            LogSegmentMetadata segment = segments.get(i);
+            // skip any log segments that have smaller log segment sequence numbers
+            if (segment.getLogSegmentSequenceNumber() < fromDLSN.getLogSegmentSequenceNo()) {
+                continue;
+            }
+            // if the log segment is truncated, skip it.
+            if (skipTruncatedLogSegments &&
+                    !conf.getIgnoreTruncationStatus() &&
+                    segment.isTruncated()) {
+                continue;
+            }
+            // if the log segment is partially truncated, move the start dlsn to the min active dlsn
+            if (skipTruncatedLogSegments &&
+                    !conf.getIgnoreTruncationStatus() &&
+                    segment.isPartiallyTruncated()) {
+                if (segment.getMinActiveDLSN().compareTo(fromDLSN) > 0) {
+                    dlsnToStart = segment.getMinActiveDLSN();
+                }
+            }
+            skipTruncatedLogSegments = false;
+            if (!isAllowedToPosition(segment, dlsnToStart)) {
+                return;
+            }
+
+            SegmentReader reader = new SegmentReader(segment,
+                    segment.getLogSegmentSequenceNumber() == dlsnToStart.getLogSegmentSequenceNo()
+                            ? dlsnToStart.getEntryId() : 0L);
+            segmentReaders.add(reader);
+        }
+        if (segmentReaders.isEmpty()) {
+            // do not initialize the background reader until the first log segment is available to read
+            return;
+        }
+        currentSegmentReader = segmentReaders.pollFirst();
+        currentSegmentReader.openReader();
+        currentSegmentReader.startRead();
+        currentSegmentSequenceNumber = currentSegmentReader.getSegment().getLogSegmentSequenceNumber();
+        unsafeReadNext(currentSegmentReader);
+        if (!segmentReaders.isEmpty()) {
+            for (SegmentReader reader : segmentReaders) {
+                reader.openReader();
+            }
+            unsafePrefetchNextSegment(true);
+        }
+        // mark the reader initialized
+        isInitialized = true;
+    }
+
+    private void unsafePrefetchNextSegment(boolean onlyInprogressLogSegment) {
+        SegmentReader nextReader = segmentReaders.peekFirst();
+        // start reading the next log segment (when onlyInprogressLogSegment is set, only if it is inprogress)
+        if (null != nextReader) {
+            if (onlyInprogressLogSegment && !nextReader.getSegment().isInProgress()) {
+                return;
+            }
+            nextReader.startRead();
+            nextSegmentReader = nextReader;
+            segmentReaders.pollFirst();
+        }
+    }
+
+    /**
+     * Check if we are allowed to position the reader at <i>fromDLSN</i>.
+     *
+     * @return true if it is allowed, otherwise false.
+     */
+    private boolean isAllowedToPosition(LogSegmentMetadata segment, DLSN fromDLSN) {
+        if (segment.isTruncated()
+                && segment.getLastDLSN().compareTo(fromDLSN) >= 0
+                && !conf.getIgnoreTruncationStatus()) {
+            setLastException(new AlreadyTruncatedTransactionException(streamName
+                    + " : trying to position read ahead at " + fromDLSN
+                    + " on a segment " + segment + " that is already marked as truncated"));
+            return false;
+        }
+        if (segment.isPartiallyTruncated() &&
+                segment.getMinActiveDLSN().compareTo(fromDLSN) > 0) {
+            if (conf.getAlertWhenPositioningOnTruncated()) {
+                alertStatsLogger.raise("Trying to position reader on {} when {} is marked partially truncated",
+                    fromDLSN, segment);
+            }
+            if (!conf.getIgnoreTruncationStatus()) {
+                logger.error("{}: Trying to position reader on {} when {} is marked partially truncated",
+                        new Object[]{ streamName, fromDLSN, segment });
+
+                setLastException(new AlreadyTruncatedTransactionException(streamName
+                        + " : trying to position read ahead at " + fromDLSN
+                        + " on a segment " + segment + " that is already marked as truncated"));
+                return false;
+            }
+        }
+        return true;
+    }
+
+    void moveToNextLogSegment() {
+        orderedSubmit(new CloseableRunnable() {
+            @Override
+            void safeRun() {
+                unsafeMoveToNextLogSegment();
+            }
+        });
+    }
+
+    private void unsafeMoveToNextLogSegment() {
+        if (null != currentSegmentReader) {
+            segmentReadersToClose.add(currentSegmentReader);
+            currentSegmentReader.close().ensure(removeClosedSegmentReadersFunc);
+            logger.debug("close current segment reader {}", currentSegmentReader.getSegment());
+            currentSegmentReader = null;
+        }
+        boolean hasSegmentToRead = false;
+        if (null != nextSegmentReader) {
+            currentSegmentReader = nextSegmentReader;
+            logger.debug("move to read segment {}", currentSegmentReader.getSegment());
+            currentSegmentSequenceNumber = currentSegmentReader.getSegment().getLogSegmentSequenceNumber();
+            nextSegmentReader = null;
+            // start reading
+            unsafeReadNext(currentSegmentReader);
+            unsafePrefetchNextSegment(true);
+            hasSegmentToRead = true;
+        } else {
+            unsafePrefetchNextSegment(false);
+            if (null != nextSegmentReader) {
+                currentSegmentReader = nextSegmentReader;
+                logger.debug("move to read segment {}", currentSegmentReader.getSegment());
+                currentSegmentSequenceNumber = currentSegmentReader.getSegment().getLogSegmentSequenceNumber();
+                nextSegmentReader = null;
+                unsafeReadNext(currentSegmentReader);
+                unsafePrefetchNextSegment(true);
+                hasSegmentToRead = true;
+            }
+        }
+        if (!hasSegmentToRead) { // no more segments to read; wait until a new log segment arrives
+            if (isCatchingUp) {
+                logger.info("ReadAhead for {} is caught up and no log segments to read now",
+                        readHandler.getFullyQualifiedName());
+                isCatchingUp = false;
+            }
+            pauseReadAheadOnNoMoreLogSegments();
+        }
+    }
+
+    void scheduleReadNext() {
+        orderedSubmit(new CloseableRunnable() {
+            @Override
+            void safeRun() {
+                if (null == currentSegmentReader) {
+                    pauseReadAheadOnNoMoreLogSegments();
+                    return;
+                }
+                unsafeReadNext(currentSegmentReader);
+            }
+        });
+    }
+
+    private void unsafeReadNext(SegmentReader reader) {
+        reader.readNext().addEventListener(this);
+    }
+
+    @Override
+    public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
+        logger.info("segments is updated with {}", segments);
+        processLogSegments(segments);
+    }
+
+    @Override
+    public void onLogStreamDeleted() {
+        setLastException(new LogNotFoundException("Log stream "
+                + streamName + " is deleted"));
+    }
+
+}
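
A minimal usage sketch of the reader above, as an illustration rather than code from this
commit: it assumes conf, readHandler, entryStore, scheduler, alertStatsLogger and
segmentList are already available from the read handler wiring.

    ReadAheadEntryReader readAhead = new ReadAheadEntryReader(
            "mystream",                      // stream name, also the ordered-scheduler key
            DLSN.InitialDLSN,                // position readahead at the head of the stream
            conf,
            readHandler,
            entryStore,
            scheduler,
            Ticker.systemTicker(),
            alertStatsLogger);
    // seed the reader with the current log segments; later updates arrive via onSegmentsUpdated()
    readAhead.start(segmentList);
    // consumers drain the readahead cache; readahead pauses once readAheadMaxRecords entries
    // are cached and resumes automatically as the cache drains
    Entry.Reader entry = readAhead.getNextReadAheadEntry(100, TimeUnit.MILLISECONDS);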

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
index fd3b63f..be8e1b5 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
@@ -27,6 +27,7 @@ import com.twitter.distributedlog.exceptions.DLIllegalStateException;
 import com.twitter.distributedlog.exceptions.DLInterruptedException;
 import com.twitter.distributedlog.exceptions.EndOfLogSegmentException;
 import com.twitter.distributedlog.exceptions.ReadCancelledException;
+import com.twitter.distributedlog.injector.AsyncFailureInjector;
 import com.twitter.distributedlog.logsegment.LogSegmentEntryReader;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
@@ -37,14 +38,16 @@ import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Enumeration;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -128,6 +131,16 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
                                  LedgerHandle lh,
                                  Enumeration<LedgerEntry> entries,
                                  Object ctx) {
+            if (failureInjector.shouldInjectCorruption(entryId, entryId)) {
+                rc = BKException.Code.DigestMatchException;
+            }
+            processReadEntries(rc, lh, entries, ctx);
+        }
+
+        void processReadEntries(int rc,
+                                LedgerHandle lh,
+                                Enumeration<LedgerEntry> entries,
+                                Object ctx) {
             if (isDone()) {
                 return;
             }
@@ -155,6 +168,16 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
                                                       long entryId,
                                                       LedgerEntry entry,
                                                       Object ctx) {
+            if (failureInjector.shouldInjectCorruption(this.entryId, this.entryId)) {
+                rc = BKException.Code.DigestMatchException;
+            }
+            processReadEntry(rc, entryId, entry, ctx);
+        }
+
+        void processReadEntry(int rc,
+                              long entryId,
+                              LedgerEntry entry,
+                              Object ctx) {
             if (isDone()) {
                 return;
             }
@@ -245,10 +268,10 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
     private final BookKeeper bk;
     private final DistributedLogConfiguration conf;
     private final OrderedScheduler scheduler;
-    private final long startEntryId;
     private final long lssn;
     private final long startSequenceId;
     private final boolean envelopeEntries;
+    private final boolean deserializeRecordSet;
     private final int numPrefetchEntries;
     private final int maxPrefetchEntries;
     // state
@@ -260,29 +283,39 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
     private long nextEntryId;
     private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(null);
     private final AtomicLong scheduleCount = new AtomicLong(0);
+    private volatile boolean hasCaughtupOnInprogress = false;
     // read retries
     private int readAheadWaitTime;
     private final int maxReadBackoffTime;
     private final AtomicInteger numReadErrors = new AtomicInteger(0);
+    private final boolean skipBrokenEntries;
     // readahead cache
     int cachedEntries = 0;
     int numOutstandingEntries = 0;
     final LinkedBlockingQueue<CacheEntry> readAheadEntries;
     // request queue
-    final ConcurrentLinkedQueue<PendingReadRequest> readQueue;
+    final LinkedList<PendingReadRequest> readQueue;
+
+    // failure injector
+    private final AsyncFailureInjector failureInjector;
+    // Stats
+    private final Counter skippedBrokenEntriesCounter;
 
     BKLogSegmentEntryReader(LogSegmentMetadata metadata,
                             LedgerHandle lh,
                             long startEntryId,
                             BookKeeper bk,
                             OrderedScheduler scheduler,
-                            DistributedLogConfiguration conf) {
+                            DistributedLogConfiguration conf,
+                            StatsLogger statsLogger,
+                            AsyncFailureInjector failureInjector) {
         this.metadata = metadata;
         this.lssn = metadata.getLogSegmentSequenceNumber();
         this.startSequenceId = metadata.getStartSequenceId();
         this.envelopeEntries = metadata.getEnvelopeEntries();
+        this.deserializeRecordSet = conf.getDeserializeRecordSetOnReads();
         this.lh = lh;
-        this.startEntryId = this.nextEntryId = Math.max(startEntryId, 0);
+        this.nextEntryId = Math.max(startEntryId, 0);
         this.bk = bk;
         this.conf = conf;
         this.numPrefetchEntries = conf.getNumPrefetchEntriesPerLogSegment();
@@ -294,17 +327,35 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
         // create the readahead queue
         this.readAheadEntries = new LinkedBlockingQueue<CacheEntry>();
         // create the read request queue
-        this.readQueue = new ConcurrentLinkedQueue<PendingReadRequest>();
+        this.readQueue = new LinkedList<PendingReadRequest>();
         // read backoff settings
         this.readAheadWaitTime = conf.getReadAheadWaitTime();
         this.maxReadBackoffTime = 4 * conf.getReadAheadWaitTime();
+        // other read settings
+        this.skipBrokenEntries = conf.getReadAheadSkipBrokenEntries();
+
+        // Failure Injection
+        this.failureInjector = failureInjector;
+        // Stats
+        this.skippedBrokenEntriesCounter = statsLogger.getCounter("skipped_broken_entries");
+    }
+
+    @VisibleForTesting
+    public synchronized CacheEntry getOutstandingLongPoll() {
+        return outstandingLongPoll;
+    }
+
+    @VisibleForTesting
+    LinkedBlockingQueue<CacheEntry> getReadAheadEntries() {
+        return this.readAheadEntries;
     }
 
     synchronized LedgerHandle getLh() {
         return lh;
     }
 
-    synchronized LogSegmentMetadata getSegment() {
+    @Override
+    public synchronized LogSegmentMetadata getSegment() {
         return metadata;
     }
 
@@ -318,6 +369,11 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
         prefetchIfNecessary();
     }
 
+    @Override
+    public boolean hasCaughtUpOnInprogress() {
+        return hasCaughtupOnInprogress;
+    }
+
     //
     // Process on Log Segment Metadata Updates
     //
@@ -425,10 +481,15 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
     }
 
     private void cancelAllPendingReads(Throwable throwExc) {
-        for (PendingReadRequest request : readQueue) {
+        List<PendingReadRequest> requestsToCancel;
+        synchronized (readQueue) {
+            requestsToCancel = Lists.newArrayListWithExpectedSize(readQueue.size());
+            requestsToCancel.addAll(readQueue);
+            readQueue.clear();
+        }
+        for (PendingReadRequest request : requestsToCancel) {
             request.setException(throwExc);
         }
-        readQueue.clear();
     }
 
     //
@@ -475,14 +536,15 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
                         (!isLedgerClosed() && nextEntryId > getLastAddConfirmed() + 1)) {
                     break;
                 }
-                entriesToFetch.add(new CacheEntry(nextEntryId));
+                CacheEntry entry = new CacheEntry(nextEntryId);
+                entriesToFetch.add(entry);
+                readAheadEntries.add(entry);
                 ++numOutstandingEntries;
                 ++cachedEntries;
                 ++nextEntryId;
             }
         }
         for (CacheEntry entry : entriesToFetch) {
-            readAheadEntries.add(entry);
             issueRead(entry);
         }
     }
@@ -518,6 +580,10 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
         synchronized (this) {
             this.outstandingLongPoll = cacheEntry;
         }
+
+        if (!hasCaughtupOnInprogress) {
+            hasCaughtupOnInprogress = true;
+        }
         getLh().asyncReadLastConfirmedAndEntry(
                 cacheEntry.entryId,
                 conf.getReadLACLongPollTimeout(),
@@ -535,7 +601,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
                 .setLogSegmentInfo(lssn, startSequenceId)
                 .setEntryId(entry.getEntryId())
                 .setEnvelopeEntry(envelopeEntries)
-                .deserializeRecordSet(false)
+                .deserializeRecordSet(deserializeRecordSet)
                 .setInputStream(entry.getEntryInputStream())
                 .buildReader();
     }
@@ -635,12 +701,18 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
 
     private void readEntriesFromReadAheadCache(PendingReadRequest nextRequest) {
         while (!nextRequest.hasReadEnoughEntries()) {
-            CacheEntry entry = readAheadEntries.peek();
-            // no entry available in the read ahead cache
+            CacheEntry entry;
+            boolean hitEndOfLogSegment;
+            synchronized (this) {
+                entry = readAheadEntries.peek();
+                hitEndOfLogSegment = (null == entry) && isEndOfLogSegment();
+            }
+            // reach end of log segment
+            if (hitEndOfLogSegment) {
+                setException(new EndOfLogSegmentException(getSegment().getZNodeName()), false);
+                return;
+            }
             if (null == entry) {
-                if (isEndOfLogSegment()) {
-                    setException(new EndOfLogSegmentException(getSegment().getZNodeName()), false);
-                }
                 return;
             }
             // entry is not complete yet.
@@ -665,6 +737,11 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
                     setException(e, false);
                     return;
                 }
+            } else if (skipBrokenEntries && BKException.Code.DigestMatchException == entry.getRc()) {
+                // skip this entry and move forward
+                skippedBrokenEntriesCounter.inc();
+                readAheadEntries.poll();
+                continue;
             } else {
                 setException(new BKTransmitException("Encountered issue on reading entry " + entry.getEntryId()
                         + " @ log segment " + getSegment(), entry.getRc()), false);
@@ -685,7 +762,8 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
         return isLedgerClosed() && entryId > getLastAddConfirmed();
     }
 
-    private synchronized boolean isBeyondLastAddConfirmed() {
+    @Override
+    public synchronized boolean isBeyondLastAddConfirmed() {
         return isBeyondLastAddConfirmed(nextEntryId);
     }
 
@@ -693,10 +771,6 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
         return entryId > getLastAddConfirmed();
     }
 
-    private synchronized boolean isNotBeyondLastAddConfirmed() {
-        return isNotBeyondLastAddConfirmed(nextEntryId);
-    }
-
     private boolean isNotBeyondLastAddConfirmed(long entryId) {
         return entryId <= getLastAddConfirmed();
     }
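
The skip-broken-entries path above is gated by configuration: an entry whose read returns
DigestMatchException is skipped and counted under skipped_broken_entries only when
readAheadSkipBrokenEntries is enabled; otherwise the reader fails with a BKTransmitException.
A hedged sketch of the relevant settings, where the setter names are assumed to mirror the
getters used in the diff (getReadAheadSkipBrokenEntries, getDeserializeRecordSetOnReads):

    DistributedLogConfiguration conf = new DistributedLogConfiguration();
    // assumed setter mirroring getReadAheadSkipBrokenEntries(): skip corrupt entries
    // instead of failing the readahead
    conf.setReadAheadSkipBrokenEntries(true);
    // assumed setter mirroring getDeserializeRecordSetOnReads(): controls whether record
    // sets are deserialized when building Entry.Reader instances
    conf.setDeserializeRecordSetOnReads(false);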

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
new file mode 100644
index 0000000..dc96a80
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.logsegment;
+
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.LogSegmentMetadata;
+import com.twitter.distributedlog.exceptions.BKTransmitException;
+import com.twitter.distributedlog.injector.AsyncFailureInjector;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryReader;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter;
+import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.OrderedScheduler;
+import com.twitter.util.Future;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.client.AsyncCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+/**
+ * BookKeeper-based Entry Store.
+ */
+public class BKLogSegmentEntryStore implements LogSegmentEntryStore, AsyncCallback.OpenCallback {
+
+    private static class OpenReaderRequest {
+
+        private final LogSegmentMetadata segment;
+        private final long startEntryId;
+        private final Promise<LogSegmentEntryReader> openPromise;
+
+        OpenReaderRequest(LogSegmentMetadata segment,
+                          long startEntryId) {
+            this.segment = segment;
+            this.startEntryId = startEntryId;
+            this.openPromise = new Promise<LogSegmentEntryReader>();
+        }
+
+    }
+
+    private final byte[] passwd;
+    private final BookKeeper bk;
+    private final OrderedScheduler scheduler;
+    private final DistributedLogConfiguration conf;
+    private final StatsLogger statsLogger;
+    private final AsyncFailureInjector failureInjector;
+
+    public BKLogSegmentEntryStore(DistributedLogConfiguration conf,
+                                  BookKeeper bk,
+                                  OrderedScheduler scheduler,
+                                  StatsLogger statsLogger,
+                                  AsyncFailureInjector failureInjector) {
+        this.conf = conf;
+        this.bk = bk;
+        this.passwd = conf.getBKDigestPW().getBytes(UTF_8);
+        this.scheduler = scheduler;
+        this.statsLogger = statsLogger;
+        this.failureInjector = failureInjector;
+    }
+
+    @Override
+    public Future<LogSegmentEntryWriter> openWriter(LogSegmentMetadata segment) {
+        throw new UnsupportedOperationException("Not supported yet");
+    }
+
+    @Override
+    public Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment,
+                                                    long startEntryId) {
+        OpenReaderRequest request = new OpenReaderRequest(segment, startEntryId);
+        if (segment.isInProgress()) {
+            bk.asyncOpenLedgerNoRecovery(
+                    segment.getLedgerId(),
+                    BookKeeper.DigestType.CRC32,
+                    passwd,
+                    this,
+                    request);
+        } else {
+            bk.asyncOpenLedger(
+                    segment.getLedgerId(),
+                    BookKeeper.DigestType.CRC32,
+                    passwd,
+                    this,
+                    request);
+        }
+        return request.openPromise;
+    }
+
+    @Override
+    public void openComplete(int rc, LedgerHandle lh, Object ctx) {
+        OpenReaderRequest request = (OpenReaderRequest) ctx;
+        if (BKException.Code.OK != rc) {
+            FutureUtils.setException(
+                    request.openPromise,
+                    new BKTransmitException("Failed to open ledger handle for log segment " + request.segment, rc));
+            return;
+        }
+        // successfully opened the ledger
+        LogSegmentEntryReader reader = new BKLogSegmentEntryReader(
+                request.segment,
+                lh,
+                request.startEntryId,
+                bk,
+                scheduler,
+                conf,
+                statsLogger,
+                failureInjector);
+        FutureUtils.setValue(request.openPromise, reader);
+    }
+}
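
The store opens the underlying ledger (without recovery for an inprogress segment, with
recovery otherwise) and completes the promise with a BKLogSegmentEntryReader positioned at
startEntryId; the caller is responsible for starting it. A brief usage sketch, assuming a
wired entryStore and a segment in scope, with error handling elided:

    entryStore.openReader(segment, 0L)
            .addEventListener(new FutureEventListener<LogSegmentEntryReader>() {
                @Override
                public void onSuccess(LogSegmentEntryReader reader) {
                    reader.start();  // begin prefetching into the per-segment readahead cache
                }
                @Override
                public void onFailure(Throwable cause) {
                    // handle the open failure (ReadAheadEntryReader, for example,
                    // defers it to the first read)
                }
            });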

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncFailureInjector.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncFailureInjector.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncFailureInjector.java
index ef67266..4145040 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncFailureInjector.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncFailureInjector.java
@@ -59,7 +59,7 @@ public interface AsyncFailureInjector {
         }
 
         @Override
-        public boolean shouldInjectCorruption() {
+        public boolean shouldInjectCorruption(long startEntryId, long endEntryId) {
             return false;
         }
 
@@ -122,7 +122,10 @@ public interface AsyncFailureInjector {
     /**
      * Return the flag indicating whether corruption should be injected.
      *
+     * @param startEntryId the start entry id
+     * @param endEntryId the end entry id
      * @return true to inject corruption otherwise false.
      */
-    boolean shouldInjectCorruption();
+    boolean shouldInjectCorruption(long startEntryId, long endEntryId);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncRandomFailureInjector.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncRandomFailureInjector.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncRandomFailureInjector.java
index 8928494..f3bfea9 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncRandomFailureInjector.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncRandomFailureInjector.java
@@ -153,8 +153,19 @@ public class AsyncRandomFailureInjector implements AsyncFailureInjector {
     }
 
     @Override
-    public boolean shouldInjectCorruption() {
-        return simulateCorruption;
+    public boolean shouldInjectCorruption(long startEntryId, long endEntryId) {
+        if (!simulateCorruption) {
+            return false;
+        }
+        if (startEntryId == endEntryId) {
+            return startEntryId % 10 == 0;
+        }
+        for (long i = startEntryId; i <= endEntryId; i++) {
+            if (i % 10 == 0) {
+                return true;
+            }
+        }
+        return false;
     }
 
     @Override
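
Restated as a standalone illustration (this class is not part of the patch): when corruption simulation is enabled, the injector answers true exactly when the inclusive entry range contains an id divisible by 10, the same rule the ReadAheadWorker hunk below stops hard-coding at the call site.

    public class CorruptionRangeRule {

        // Mirrors the range rule in AsyncRandomFailureInjector above.
        static boolean containsBrokenEntry(long start, long end) {
            for (long i = start; i <= end; i++) {
                if (i % 10 == 0) {
                    return true;
                }
            }
            return false;
        }

        public static void main(String[] args) {
            System.out.println(containsBrokenEntry(11L, 19L)); // false: no multiple of 10 in [11, 19]
            System.out.println(containsBrokenEntry(11L, 20L)); // true: 20 is a multiple of 10
            System.out.println(containsBrokenEntry(30L, 30L)); // true: single-entry range, 30 % 10 == 0
        }
    }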

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java
index d43f3d8..07387cb 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java
@@ -39,6 +39,13 @@ public interface LogSegmentEntryReader extends AsyncCloseable {
     void start();
 
     /**
+     * Return the log segment metadata for this reader.
+     *
+     * @return the log segment metadata
+     */
+    LogSegmentMetadata getSegment();
+
+    /**
      * Update the log segment each time when the metadata has changed.
      *
      * @param segment new metadata of the log segment.
@@ -64,4 +71,18 @@ public interface LogSegmentEntryReader extends AsyncCloseable {
      */
     long getLastAddConfirmed();
 
+    /**
+     * Is the reader reading beyond last add confirmed.
+     *
+     * @return true if the reader is reading beyond last add confirmed
+     */
+    boolean isBeyondLastAddConfirmed();
+
+    /**
+     * Has the log segment reader caught up with the inprogress log segment.
+     *
+     * @return true only if the log segment is inprogress and it is caught up, otherwise return false.
+     */
+    boolean hasCaughtUpOnInprogress();
+
 }
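
A hedged sketch of one plausible way a readahead loop could consult the new accessors; only getSegment(), isBeyondLastAddConfirmed() and hasCaughtUpOnInprogress() come from the interface above, and this is not the policy the patch itself implements.

    import com.twitter.distributedlog.LogSegmentMetadata;
    import com.twitter.distributedlog.logsegment.LogSegmentEntryReader;

    class ReadAheadPollingSketch {

        // Illustrative only: keep polling a completed segment until its entries
        // are drained; for an inprogress segment, back off once the reader has
        // caught up with the writer or is positioned beyond the last add confirmed.
        static boolean shouldKeepPolling(LogSegmentEntryReader reader) {
            LogSegmentMetadata segment = reader.getSegment();
            if (!segment.isInProgress()) {
                return true;
            }
            return !(reader.hasCaughtUpOnInprogress() || reader.isBeyondLastAddConfirmed());
        }
    }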

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
index ff47691..d8611f9 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
@@ -39,8 +39,10 @@ public interface LogSegmentEntryStore {
      * Open the reader for reading data to the log <i>segment</i>.
      *
      * @param segment the log <i>segment</i> to read data from
+     * @param startEntryId the start entry id
      * @return future represent the opened reader
      */
-    Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment);
+    Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment,
+                                             long startEntryId);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java
index 2f9869e..a77f753 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java
@@ -62,4 +62,4 @@ public class MovingAverageRateFactory {
             avg.sample();
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
index 6c55014..5161b91 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
@@ -1245,8 +1245,7 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncClosea
                         public void onSuccess(Enumeration<LedgerEntry> entries) {
                             int rc = BKException.Code.OK;
 
-                            // If the range includes an entry id that is a multiple of 10, simulate corruption.
-                            if (failureInjector.shouldInjectCorruption() && rangeContainsSimulatedBrokenEntry(startEntryId, endEntryId)) {
+                            if (failureInjector.shouldInjectCorruption(startEntryId, endEntryId)) {
                                 rc = BKException.Code.DigestMatchException;
                             }
                             readComplete(rc, null, entries, readCtx, startEntryId, endEntryId);
@@ -1259,15 +1258,6 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncClosea
                     });
         }
 
-        private boolean rangeContainsSimulatedBrokenEntry(long start, long end) {
-            for (long i = start; i <= end; i++) {
-                if (i % 10 == 0) {
-                    return true;
-                }
-            }
-            return false;
-        }
-
         public void readComplete(final int rc, final LedgerHandle lh,
                                  final Enumeration<LedgerEntry> seq, final Object ctx,
                                  final long startEntryId, final long endEntryId) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java
index 9f34902..287bd6d 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java
@@ -475,6 +475,14 @@ public class OrderedScheduler implements ScheduledExecutorService {
         return chooseExecutor(key).schedule(command, delay, unit);
     }
 
+    public ScheduledFuture<?> scheduleAtFixedRate(Object key,
+                                                  Runnable command,
+                                                  long initialDelay,
+                                                  long period,
+                                                  TimeUnit unit) {
+        return chooseExecutor(key).scheduleAtFixedRate(command, initialDelay, period, unit);
+    }
+
     public Future<?> submit(Object key, Runnable command) {
         return chooseExecutor(key).submit(command);
     }
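
A minimal usage sketch of the new keyed scheduleAtFixedRate, assuming an OrderedScheduler instance is already built; passing the stream name as the key pins the periodic task to the same underlying executor as the stream's other work.

    import com.twitter.distributedlog.util.OrderedScheduler;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    class PeriodicTaskSketch {

        // Illustrative only: run a periodic task (e.g. sampling a moving average)
        // on the executor chosen for the given stream name.
        static ScheduledFuture<?> schedulePeriodic(OrderedScheduler scheduler,
                                                   String streamName) {
            return scheduler.scheduleAtFixedRate(
                    streamName,        // ordering key
                    new Runnable() {
                        @Override
                        public void run() {
                            // periodic work goes here
                        }
                    },
                    0L,                // initial delay
                    1L,                // period
                    TimeUnit.SECONDS);
        }
    }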

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java
index 47cabba..9927616 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java
@@ -81,7 +81,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase {
         assertEquals(0L, record.getSequenceId());
         DLMTestUtil.verifyLogRecord(record);
 
-        String readLockPath = reader1.bkLedgerManager.getReadLockPath();
+        String readLockPath = reader1.readHandler.getReadLockPath();
         Utils.close(reader1);
 
         // simulate a old stream created without readlock path

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
index 28f7a74..95d760e 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
@@ -140,6 +140,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             }
             record = reader.readNext(false);
         }
+        reader.close();
         assertEquals(3 * 9, numTrans);
         assertEquals(3 * 9, readDlm.getLogRecordCount());
         readDlm.close();
@@ -339,7 +340,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
                         assertTrue(value.getSequenceId() < 0);
                         assertTrue(value.getSequenceId() > startSequenceId);
                     }
-                    LOG.debug("Recevied record {} from {}", value.getDlsn(), reader.getStreamName());
+                    LOG.info("Received record {} from {}", value, reader.getStreamName());
                     assertTrue(!value.isControl());
                     assertTrue(value.getDlsn().getSlotId() == 0);
                     assertTrue(value.getDlsn().compareTo(startPosition) >= 0);
@@ -366,7 +367,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             }
             @Override
             public void onFailure(Throwable cause) {
-                LOG.debug("Encountered Exception on reading {}", reader.getStreamName(), cause);
+                LOG.error("Encountered Exception on reading {}", reader.getStreamName(), cause);
                 errorsFound.set(true);
                 completionLatch.countDown();
             }
@@ -806,6 +807,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         readerSyncLatch.await();
 
         assertTrue("Should position reader at least once", reader.getNumReaderPositions().get() > 1);
+        reader.stop();
         dlm.close();
     }
 
@@ -906,7 +908,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         confLocal.setOutputBufferSize(1024);
         DistributedLogManager dlm = createNewDLM(confLocal, name);
 
-        int numLogSegments = 20;
+        int numLogSegments = 5;
         int numRecordsPerLogSegment = 10;
 
         final CountDownLatch doneLatch = new CountDownLatch(1);
@@ -924,7 +926,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
 
         reader.start();
 
-        final CountDownLatch writeLatch = new CountDownLatch(200);
+        final CountDownLatch writeLatch = new CountDownLatch(numLogSegments * numRecordsPerLogSegment);
         final AtomicBoolean writeErrors = new AtomicBoolean(false);
 
         int txid = 1;
@@ -949,6 +951,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         syncLatch.await();
 
         assertTrue("Should position reader at least once", reader.getNumReaderPositions().get() > 1);
+        reader.stop();
         dlm.close();
     }
 
@@ -1341,7 +1344,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         confLocal.setImmediateFlushEnabled(true);
         confLocal.setReadAheadBatchSize(1);
         confLocal.setReadAheadMaxRecords(1);
-        confLocal.setReaderIdleWarnThresholdMillis(50);
+        confLocal.setReaderIdleWarnThresholdMillis(0);
         confLocal.setReaderIdleErrorThresholdMillis(idleReaderErrorThreshold);
         final DistributedLogManager dlm = createNewDLM(confLocal, name);
         final Thread currentThread = Thread.currentThread();
@@ -1424,6 +1427,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             Assert.assertEquals(segmentSize, recordCount);
         }
         assertFalse(currentThread.isInterrupted());
+        Utils.close(reader);
         executor.shutdown();
     }
 
@@ -1512,7 +1516,6 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
                         try {
                             int txid = 1;
                             for (long i = 0; i < numSegments; i++) {
-                                long start = txid;
                                 BKSyncLogWriter writer = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned();
                                 for (long j = 1; j <= segmentSize; j++) {
                                     writer.write(DLMTestUtil.getLargeLogRecordInstance(txid++));
@@ -1558,6 +1561,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         assertTrue(!exceptionEncountered);
         Assert.assertEquals(recordCount, segmentSize * numSegments);
         assertTrue(!currentThread.isInterrupted());
+        Utils.close(reader);
         executor.shutdown();
     }
 
@@ -1617,19 +1621,20 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         LOG.info("Read record {}", record);
         assertEquals(1L, record.getTransactionId());
 
-        assertNotNull(reader.bkLedgerManager.readAheadWorker);
-        assertTrue(reader.bkLedgerManager.readAheadCache.getNumCachedEntries() <= maxAllowedCachedRecords);
+        assertNotNull(reader.getReadAheadReader());
+        assertTrue(reader.getReadAheadReader().getNumCachedEntries() <= maxAllowedCachedRecords);
 
         for (int i = 2; i <= numRecords; i++) {
             record = Await.result(reader.readNext());
             LOG.info("Read record {}", record);
             assertEquals((long) i, record.getTransactionId());
             TimeUnit.MILLISECONDS.sleep(20);
-            int numCachedEntries = reader.bkLedgerManager.readAheadCache.getNumCachedEntries();
+            int numCachedEntries = reader.getReadAheadReader().getNumCachedEntries();
             assertTrue("Should cache less than " + batchSize + " records but already found "
                     + numCachedEntries + " records when reading " + i + "th record",
                     numCachedEntries <= maxAllowedCachedRecords);
         }
+        Utils.close(reader);
     }
 
     @Test(timeout = 60000)
@@ -1675,6 +1680,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             fail("Should have thrown");
         } catch (EndOfStreamException ex) {
         }
+        Utils.close(reader);
     }
 
     @Test(timeout = 60000)
@@ -1694,6 +1700,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             fail("Should have thrown");
         } catch (EndOfStreamException ex) {
         }
+        writer.close();
 
         BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN);
         try {
@@ -1701,6 +1708,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             fail("Should have thrown");
         } catch (EndOfStreamException ex) {
         }
+        Utils.close(reader);
     }
 
     @Test(timeout = 60000)
@@ -1863,7 +1871,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         } catch (DLIllegalStateException e) {
         }
 
-        reader.asyncClose();
+        Utils.close(reader);
         dlm.close();
     }
 
@@ -2096,6 +2104,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             assertEquals(i+1, record.getPositionWithinLogSegment());
             assertArrayEquals(DLMTestUtil.generatePayload(i+1), record.getPayload());
         }
+        Utils.close(reader1);
+        readDLM1.close();
 
         DistributedLogConfiguration readConf2 = new DistributedLogConfiguration();
         readConf2.addConfiguration(confLocal);
@@ -2124,6 +2134,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
                 assertEquals(5, LogRecordSet.numRecords(record));
             }
         }
+        Utils.close(reader2);
+        readDLM2.close();
     }
 
     @Test(timeout = 60000)
@@ -2152,6 +2164,9 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         } catch (IdleReaderException ire) {
             // expected
         }
+        Utils.close(reader);
+        writer.close();
+        dlm.close();
     }
 
     @Test(timeout = 60000)
@@ -2177,5 +2192,9 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         LogRecordWithDLSN record = FutureUtils.result(reader.readNext());
         assertEquals(1L, record.getTransactionId());
         DLMTestUtil.verifyLogRecord(record);
+
+        Utils.close(reader);
+        writer.close();
+        dlm.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java
index c029dca..54177c8 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java
@@ -162,7 +162,7 @@ public class TestBKSyncLogReader extends TestDistributedLogBase {
         BKSyncLogReaderDLSN reader = (BKSyncLogReaderDLSN) dlm.getInputStream(1L);
 
         // wait until readahead caught up
-        while (!reader.getReadHandler().isReadAheadCaughtUp()) {
+        while (!reader.getReadAheadReader().isReadAheadCaughtUp()) {
             TimeUnit.MILLISECONDS.sleep(20);
         }
 
@@ -178,8 +178,7 @@ public class TestBKSyncLogReader extends TestDistributedLogBase {
         logger.info("Write another 10 records");
 
         // wait until readahead move on
-        while (reader.getReadHandler()
-                .readAheadWorker.getNextReadAheadPosition().getEntryId() < 21) {
+        while (reader.getReadAheadReader().getNextEntryPosition().getEntryId() < 21) {
             TimeUnit.MILLISECONDS.sleep(20);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/TestEntryPosition.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestEntryPosition.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestEntryPosition.java
new file mode 100644
index 0000000..384d1e8
--- /dev/null
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestEntryPosition.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test Case for {@link EntryPosition}
+ */
+public class TestEntryPosition {
+
+    private void checkPosition(EntryPosition position,
+                               long lssn,
+                               long entryId) {
+        assertEquals(position.getLogSegmentSequenceNumber(), lssn);
+        assertEquals(position.getEntryId(), entryId);
+    }
+
+    @Test
+    public void testAdvance() {
+        EntryPosition position = new EntryPosition(9L, 99L);
+
+        checkPosition(position, 9L, 99L);
+
+        // advance (8L, 100L) has no effect
+        assertFalse(position.advance(8L, 100L));
+        checkPosition(position, 9L, 99L);
+        // advance (9L, 98L) has no effect
+        assertFalse(position.advance(9L, 98L));
+        checkPosition(position, 9L, 99L);
+        // advance (9L, 99L) has no effect
+        assertFalse(position.advance(9L, 99L));
+        checkPosition(position, 9L, 99L);
+        // advance (9L, 100L) takes effect
+        assertTrue(position.advance(9L, 100L));
+        checkPosition(position, 9L, 100L);
+        // advance (10L, 0L) takes effect
+        assertTrue(position.advance(10L, 0L));
+        checkPosition(position, 10L, 0L);
+    }
+
+}
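
The test above pins down the advance() contract. A hypothetical sketch of equivalent semantics (not the actual EntryPosition class): the position moves only when the new (logSegmentSequenceNumber, entryId) pair is lexicographically greater than the current one.

    class EntryPositionSketch {

        private long lssn;
        private long entryId;

        EntryPositionSketch(long lssn, long entryId) {
            this.lssn = lssn;
            this.entryId = entryId;
        }

        // Returns true only when (lssn, entryId) is strictly ahead of the
        // current position; otherwise the position is left unchanged.
        synchronized boolean advance(long lssn, long entryId) {
            if (lssn < this.lssn || (lssn == this.lssn && entryId <= this.entryId)) {
                return false;
            }
            this.lssn = lssn;
            this.entryId = entryId;
            return true;
        }
    }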

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java
index 775c99d..3f47337 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java
@@ -148,9 +148,8 @@ public class TestNonBlockingReadsMultiReader extends TestDistributedLogBase {
             LOG.info("Writer stopped after writing {} records, waiting for reader to complete",
                     writeCount.get());
             while (writeCount.get() > (readerThreads[0].getReadCount())) {
-                LOG.info("Write Count = {}, Read Count = {}, ReadAhead = {}",
-                        new Object[] { writeCount.get(), readerThreads[0].getReadCount(),
-                                        reader0.getReadHandler().getReadAheadCache() });
+                LOG.info("Write Count = {}, Read Count = {}",
+                        new Object[] { writeCount.get(), readerThreads[0].getReadCount() });
                 TimeUnit.MILLISECONDS.sleep(100);
             }
             assertEquals(writeCount.get(),

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAhead.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAhead.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAhead.java
deleted file mode 100644
index 71b6834..0000000
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAhead.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.twitter.distributedlog.LogSegmentMetadata.LogSegmentMetadataVersion;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.distributedlog.readahead.ReadAheadWorker;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.*;
-
-/**
- * {@link ReadAheadWorker} related test cases.
- */
-public class TestReadAhead extends TestDistributedLogBase {
-
-    static final Logger logger = LoggerFactory.getLogger(TestReadAhead.class);
-
-    @Test(timeout = 60000)
-    public void testNoSuchLedgerExceptionOnReadLAC() throws Exception {
-        String name = "distrlog-nosuchledger-exception-on-readlac";
-        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
-        confLocal.loadConf(conf);
-        confLocal.setReadAheadWaitTime(500);
-        confLocal.setReadAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis(2000);
-        confLocal.setDLLedgerMetadataLayoutVersion(LogSegmentMetadataVersion.VERSION_V4_ENVELOPED_ENTRIES.value);
-        confLocal.setLogSegmentSequenceNumberValidationEnabled(false);
-
-        DistributedLogManager dlm = createNewDLM(confLocal, name);
-        DLMTestUtil.injectLogSegmentWithGivenLogSegmentSeqNo(dlm, confLocal, 1L, 1L, false, 0, false);
-        DLMTestUtil.injectLogSegmentWithGivenLogSegmentSeqNo(dlm, confLocal, 2L, 11L, true, 10, true);
-
-        BKDistributedLogManager readDLM = (BKDistributedLogManager) createNewDLM(confLocal, name);
-        final BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) readDLM.getAsyncLogReader(DLSN.InitialDLSN);
-        final Future<LogRecordWithDLSN> readFuture = reader.readNext();
-        try {
-            Await.result(readFuture, Duration.fromMilliseconds(2000));
-            fail("Should not read any data beyond an empty inprogress log segment");
-        } catch (TimeoutException e) {
-            // expected
-        }
-
-        LedgerDescriptor ld1;
-        while (null == (ld1 = reader.bkLedgerManager.readAheadWorker.getCurrentLedgerDescriptor())) {
-            Thread.sleep(100);
-        }
-
-        TimeUnit.MILLISECONDS.sleep(2 * 2000);
-
-        LedgerDescriptor ld2;
-        while (null == (ld2 = reader.bkLedgerManager.readAheadWorker.getCurrentLedgerDescriptor())) {
-            Thread.sleep(100);
-        }
-
-        // ledger handle would be re-initialized after reaching error threshold
-        assertTrue("ledger handle should be reinitialized, after reaching error threshold.", ld1 != ld2);
-
-        dlm.close();
-
-        dlm = createNewDLM(confLocal, name);
-        dlm.recover();
-
-        long expectedTxId = 11L;
-        LogRecord record = Await.result(readFuture);
-        assertNotNull(record);
-        DLMTestUtil.verifyLogRecord(record);
-        assertEquals(expectedTxId, record.getTransactionId());
-        expectedTxId++;
-
-        for (int i = 1; i < 10; i++) {
-            record = Await.result(reader.readNext());
-            assertNotNull(record);
-            DLMTestUtil.verifyLogRecord(record);
-            assertEquals(expectedTxId, record.getTransactionId());
-            expectedTxId++;
-        }
-
-        Utils.close(reader);
-        readDLM.close();
-
-    }
-
-    @Test(timeout = 60000)
-    public void testReadAheadWaitOnEndOfStream() throws Exception {
-        String name = "distrlog-readahead-wait-on-end-of-stream";
-        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
-        confLocal.loadConf(conf);
-        confLocal.setZKNumRetries(0);
-        confLocal.setReadAheadWaitTime(500);
-        confLocal.setReadAheadWaitTimeOnEndOfStream(Integer.MAX_VALUE);
-
-        DistributedLogManager dlm = createNewDLM(confLocal, name);
-        DLMTestUtil.generateCompletedLogSegments(dlm, confLocal, 3, 10);
-
-        BKDistributedLogManager readDLM = (BKDistributedLogManager) createNewDLM(confLocal, name);
-        final BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) readDLM.getAsyncLogReader(DLSN.InitialDLSN);
-
-        int numReads = 0;
-        long expectedID = 1;
-        for (long i = 0; i < 3; i++) {
-            for (long j = 1; j <= 10; j++) {
-                LogRecordWithDLSN record = Await.result(reader.readNext());
-                assertEquals(expectedID++, record.getTransactionId());
-                DLMTestUtil.verifyLogRecord(record);
-                ++numReads;
-            }
-        }
-        assertEquals(30, numReads);
-        // we are at the end of the stream and there isn't inprogress log segment
-        Future<LogRecordWithDLSN> readFuture = reader.readNext();
-
-        // make sure readahead is backing off on reading log segment on Integer.MAX_VALUE
-        AsyncNotification notification1;
-        while (null == (notification1 = reader.bkLedgerManager.readAheadWorker.getMetadataNotification())) {
-            Thread.sleep(200);
-        }
-        Thread.sleep(1000);
-
-        // write another record
-        BKSyncLogWriter writer =
-                    (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned();
-        writer.write(DLMTestUtil.getLogRecordInstance(31L));
-        writer.closeAndComplete();
-
-        LogRecordWithDLSN record = Await.result(readFuture);
-        assertEquals(31L, record.getTransactionId());
-        DLMTestUtil.verifyLogRecord(record);
-
-        Utils.close(reader);
-        readDLM.close();
-
-        dlm.close();
-    }
-
-}

