distributedlog-dev mailing list archives

From si...@apache.org
Subject [08/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace
Date Thu, 05 Jan 2017 00:51:13 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
new file mode 100644
index 0000000..fdb29f3
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
@@ -0,0 +1,1325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.DLIllegalStateException;
+import org.apache.distributedlog.exceptions.EndOfStreamException;
+import org.apache.distributedlog.exceptions.LockingException;
+import org.apache.distributedlog.exceptions.LogSegmentNotFoundException;
+import org.apache.distributedlog.exceptions.TransactionIdOutOfOrderException;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.function.GetLastTxIdFunction;
+import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
+import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
+import org.apache.distributedlog.metadata.LogMetadataForWriter;
+import org.apache.distributedlog.lock.DistributedLock;
+import org.apache.distributedlog.logsegment.LogSegmentFilter;
+import org.apache.distributedlog.logsegment.LogSegmentMetadataCache;
+import org.apache.distributedlog.logsegment.RollingPolicy;
+import org.apache.distributedlog.logsegment.SizeBasedRollingPolicy;
+import org.apache.distributedlog.logsegment.TimeBasedRollingPolicy;
+import org.apache.distributedlog.metadata.LogStreamMetadataStore;
+import org.apache.distributedlog.metadata.MetadataUpdater;
+import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
+import org.apache.distributedlog.util.Allocator;
+import org.apache.distributedlog.util.DLUtils;
+import org.apache.distributedlog.util.FailpointUtils;
+import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.FutureUtils.FutureEventListenerRunnable;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.Transaction;
+import org.apache.distributedlog.util.PermitLimiter;
+import org.apache.distributedlog.util.Utils;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.AlertStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_FILTER;
+
+/**
+ * Log Handler for Writers.
+ *
+ * <h3>Metrics</h3>
+ * All the metrics about the log write handler are exposed under the scope `segments`.
+ * <ul>
+ * <li> `segments`/open : opstats. latency characteristics on starting a new log segment.
+ * <li> `segments`/close : opstats. latency characteristics on completing an inprogress log segment.
+ * <li> `segments`/recover : opstats. latency characteristics on recovering a log segment.
+ * <li> `segments`/delete : opstats. latency characteristics on deleting a log segment.
+ * </ul>
+ */
+class BKLogWriteHandler extends BKLogHandler {
+    static final Logger LOG = LoggerFactory.getLogger(BKLogWriteHandler.class);
+
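+    // Listener handed to the log segment allocator when nothing needs to happen on
+    // transaction commit or abort (the obtained entry writer is handled by the caller).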
+    private static Transaction.OpListener<LogSegmentEntryWriter> NULL_OP_LISTENER =
+            new Transaction.OpListener<LogSegmentEntryWriter>() {
+        @Override
+        public void onCommit(LogSegmentEntryWriter r) {
+            // no-op
+        }
+
+        @Override
+        public void onAbort(Throwable t) {
+            // no-op
+        }
+    };
+
+    protected final LogMetadataForWriter logMetadataForWriter;
+    protected final Allocator<LogSegmentEntryWriter, Object> logSegmentAllocator;
+    protected final DistributedLock lock;
+    protected final MaxTxId maxTxId;
+    protected final MaxLogSegmentSequenceNo maxLogSegmentSequenceNo;
+    protected final boolean validateLogSegmentSequenceNumber;
+    protected final int regionId;
+    protected final RollingPolicy rollingPolicy;
+    protected Future<? extends DistributedLock> lockFuture = null;
+    protected final PermitLimiter writeLimiter;
+    protected final FeatureProvider featureProvider;
+    protected final DynamicDistributedLogConfiguration dynConf;
+    protected final MetadataUpdater metadataUpdater;
+    // tracking the inprogress log segments
+    protected final LinkedList<Long> inprogressLSSNs;
+
+    // Fetch log segments state: writes can continue without the full list of log segments, while truncation needs the full list
+    private final Future<Versioned<List<LogSegmentMetadata>>> fetchForWrite;
+    private Future<Versioned<List<LogSegmentMetadata>>> fetchForTruncation;
+
+    // Recover Functions
+    private final RecoverLogSegmentFunction recoverLogSegmentFunction =
+            new RecoverLogSegmentFunction();
+    private final AbstractFunction1<List<LogSegmentMetadata>, Future<Long>> recoverLogSegmentsFunction =
+            new AbstractFunction1<List<LogSegmentMetadata>, Future<Long>>() {
+                @Override
+                public Future<Long> apply(List<LogSegmentMetadata> segmentList) {
+                    LOG.info("Initiating Recovery For {} : {}", getFullyQualifiedName(), segmentList);
+                    // if lastLedgerRollingTimeMillis is not updated, we set it to now.
+                    synchronized (BKLogWriteHandler.this) {
+                        if (lastLedgerRollingTimeMillis < 0) {
+                            lastLedgerRollingTimeMillis = Utils.nowInMillis();
+                        }
+                    }
+
+                    if (validateLogSegmentSequenceNumber) {
+                        synchronized (inprogressLSSNs) {
+                            for (LogSegmentMetadata segment : segmentList) {
+                                if (segment.isInProgress()) {
+                                    inprogressLSSNs.addLast(segment.getLogSegmentSequenceNumber());
+                                }
+                            }
+                        }
+                    }
+
+                    return FutureUtils.processList(segmentList, recoverLogSegmentFunction, scheduler).map(
+                            GetLastTxIdFunction.INSTANCE);
+                }
+            };
+
+    // Stats
+    private final StatsLogger perLogStatsLogger;
+    private final OpStatsLogger closeOpStats;
+    private final OpStatsLogger openOpStats;
+    private final OpStatsLogger recoverOpStats;
+    private final OpStatsLogger deleteOpStats;
+
+    /**
+     * Construct a BookKeeper-backed log write handler.
+     */
+    BKLogWriteHandler(LogMetadataForWriter logMetadata,
+                      DistributedLogConfiguration conf,
+                      LogStreamMetadataStore streamMetadataStore,
+                      LogSegmentMetadataCache metadataCache,
+                      LogSegmentEntryStore entryStore,
+                      OrderedScheduler scheduler,
+                      Allocator<LogSegmentEntryWriter, Object> segmentAllocator,
+                      StatsLogger statsLogger,
+                      StatsLogger perLogStatsLogger,
+                      AlertStatsLogger alertStatsLogger,
+                      String clientId,
+                      int regionId,
+                      PermitLimiter writeLimiter,
+                      FeatureProvider featureProvider,
+                      DynamicDistributedLogConfiguration dynConf,
+                      DistributedLock lock /** owned by handler **/) {
+        super(logMetadata,
+                conf,
+                streamMetadataStore,
+                metadataCache,
+                entryStore,
+                scheduler,
+                statsLogger,
+                alertStatsLogger,
+                clientId);
+        this.logMetadataForWriter = logMetadata;
+        this.logSegmentAllocator = segmentAllocator;
+        this.perLogStatsLogger = perLogStatsLogger;
+        this.writeLimiter = writeLimiter;
+        this.featureProvider = featureProvider;
+        this.dynConf = dynConf;
+        this.lock = lock;
+        this.metadataUpdater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);
+
+        if (conf.getEncodeRegionIDInLogSegmentMetadata()) {
+            this.regionId = regionId;
+        } else {
+            this.regionId = DistributedLogConstants.LOCAL_REGION_ID;
+        }
+        this.validateLogSegmentSequenceNumber = conf.isLogSegmentSequenceNumberValidationEnabled();
+
+        // Construct the max sequence no
+        maxLogSegmentSequenceNo = new MaxLogSegmentSequenceNo(logMetadata.getMaxLSSNData());
+        inprogressLSSNs = new LinkedList<Long>();
+        // Construct the max txn id.
+        maxTxId = new MaxTxId(logMetadata.getMaxTxIdData());
+
+        // Schedule fetching log segment list in background before we access it.
+        // We don't need to watch log segment list changes for the writer, as the writer manages the log segment list.
+        fetchForWrite = readLogSegmentsFromStore(
+                LogSegmentMetadata.COMPARATOR,
+                WRITE_HANDLE_FILTER,
+                null);
+
+        // Initialize other parameters.
+        setLastLedgerRollingTimeMillis(Utils.nowInMillis());
+
+        // Rolling Policy
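+        // (a positive rolling interval, in minutes, selects time-based rolling; otherwise
+        // segments are rolled by size, using the configured max log segment bytes)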
+        if (conf.getLogSegmentRollingIntervalMinutes() > 0) {
+            rollingPolicy = new TimeBasedRollingPolicy(conf.getLogSegmentRollingIntervalMinutes() * 60 * 1000L);
+        } else {
+            rollingPolicy = new SizeBasedRollingPolicy(conf.getMaxLogSegmentBytes());
+        }
+
+        // Stats
+        StatsLogger segmentsStatsLogger = statsLogger.scope("segments");
+        openOpStats = segmentsStatsLogger.getOpStatsLogger("open");
+        closeOpStats = segmentsStatsLogger.getOpStatsLogger("close");
+        recoverOpStats = segmentsStatsLogger.getOpStatsLogger("recover");
+        deleteOpStats = segmentsStatsLogger.getOpStatsLogger("delete");
+    }
+
+    private Future<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFetch(
+            final Comparator<LogSegmentMetadata> comparator) {
+        final Promise<List<LogSegmentMetadata>> promise = new Promise<List<LogSegmentMetadata>>();
+        fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+            @Override
+            public void onFailure(Throwable cause) {
+                FutureUtils.setException(promise, cause);
+            }
+
+            @Override
+            public void onSuccess(Versioned<List<LogSegmentMetadata>> result) {
+                try {
+                    FutureUtils.setValue(promise, getCachedLogSegments(comparator));
+                } catch (UnexpectedException e) {
+                    FutureUtils.setException(promise, e);
+                }
+            }
+        });
+        return promise;
+    }
+
+    private Future<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFullFetch(
+            final Comparator<LogSegmentMetadata> comparator) {
+        Future<Versioned<List<LogSegmentMetadata>>> result;
+        synchronized (this) {
+            if (null == fetchForTruncation) {
+                fetchForTruncation = readLogSegmentsFromStore(
+                        LogSegmentMetadata.COMPARATOR,
+                        LogSegmentFilter.DEFAULT_FILTER,
+                        null);
+            }
+            result = fetchForTruncation;
+        }
+
+        final Promise<List<LogSegmentMetadata>> promise = new Promise<List<LogSegmentMetadata>>();
+        result.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+            @Override
+            public void onFailure(Throwable cause) {
+                FutureUtils.setException(promise, cause);
+            }
+
+            @Override
+            public void onSuccess(Versioned<List<LogSegmentMetadata>> result) {
+                try {
+                    FutureUtils.setValue(promise, getCachedLogSegments(comparator));
+                } catch (UnexpectedException e) {
+                    FutureUtils.setException(promise, e);
+                }
+            }
+        });
+        return promise;
+    }
+
+    // Transactional operations for MaxLogSegmentSequenceNo
+    void storeMaxSequenceNumber(final Transaction<Object> txn,
+                                final MaxLogSegmentSequenceNo maxSeqNo,
+                                final long seqNo,
+                                final boolean isInprogress) {
+        metadataStore.storeMaxLogSegmentSequenceNumber(txn, logMetadata, maxSeqNo.getVersionedData(seqNo),
+                new Transaction.OpListener<Version>() {
+            @Override
+            public void onCommit(Version version) {
+                if (validateLogSegmentSequenceNumber) {
+                    synchronized (inprogressLSSNs) {
+                        if (isInprogress) {
+                            inprogressLSSNs.add(seqNo);
+                        } else {
+                            inprogressLSSNs.removeFirst();
+                        }
+                    }
+                }
+                maxSeqNo.update(version, seqNo);
+            }
+
+            @Override
+            public void onAbort(Throwable t) {
+                // no-op
+            }
+        });
+    }
+
+    // Transactional operations for MaxTxId
+    void storeMaxTxId(final Transaction<Object> txn,
+                      final MaxTxId maxTxId,
+                      final long txId) {
+        metadataStore.storeMaxTxnId(txn, logMetadataForWriter, maxTxId.getVersionedData(txId),
+                new Transaction.OpListener<Version>() {
+                    @Override
+                    public void onCommit(Version version) {
+                        maxTxId.update(version, txId);
+                    }
+
+                    @Override
+                    public void onAbort(Throwable t) {
+                        // no-op
+                    }
+                });
+    }
+
+    // Transactional operations for logsegment
+    void writeLogSegment(final Transaction<Object> txn,
+                         final LogSegmentMetadata metadata) {
+        metadataStore.createLogSegment(txn, metadata, new Transaction.OpListener<Void>() {
+            @Override
+            public void onCommit(Void r) {
+                addLogSegmentToCache(metadata.getSegmentName(), metadata);
+            }
+
+            @Override
+            public void onAbort(Throwable t) {
+                // no-op
+            }
+        });
+    }
+
+    void deleteLogSegment(final Transaction<Object> txn,
+                          final LogSegmentMetadata metadata) {
+        metadataStore.deleteLogSegment(txn, metadata, new Transaction.OpListener<Void>() {
+            @Override
+            public void onCommit(Void r) {
+                removeLogSegmentFromCache(metadata.getSegmentName());
+            }
+
+            @Override
+            public void onAbort(Throwable t) {
+                // no-op
+            }
+        });
+    }
+
+    /**
+     * The caller can invoke this before any other action to hold the lock for
+     * the whole lifecycle of the write handler. The lock is only released
+     * when the write handler is closed.
+     *
+     * This method is useful to avoid releasing the underlying zookeeper lock while
+     * recovering or completing log segments. Releasing the underlying zookeeper lock would
+     * 1) increase latency when re-acquiring the lock on starting a new log segment, and
+     * 2) increase the possibility of the stream being re-acquired by other instances.
+     *
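+     * A minimal usage sketch (illustrative only; the {@code writeHandler} variable is assumed
+     * to reference this handler):
+     * <pre>
+     *   FutureUtils.result(writeHandler.lockHandler()); // acquire and hold the lock up front
+     *   try {
+     *     // start / complete / recover log segments without re-acquiring the lock
+     *   } finally {
+     *     FutureUtils.result(writeHandler.unlockHandler()); // released only when the handler is done
+     *   }
+     * </pre>
+     *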
+     * @return future represents the lock result
+     */
+    Future<? extends DistributedLock> lockHandler() {
+        if (null != lockFuture) {
+            return lockFuture;
+        }
+        lockFuture = lock.asyncAcquire();
+        return lockFuture;
+    }
+
+    Future<Void> unlockHandler() {
+        if (null != lockFuture) {
+            return lock.asyncClose();
+        } else {
+            return Future.Void();
+        }
+    }
+
+    /**
+     * Start a new log segment in a BookKeeper ledger.
+     * First ensure that we have the write lock for this journal.
+     * Then create a ledger and stream based on that ledger.
+     * The ledger id is written to the inprogress znode, so that in the
+     * case of a crash, a recovery process can find the ledger we were writing
+     * to when we crashed.
+     *
+     * @param txId First transaction id to be written to the stream
+     * @return the writer for the newly started log segment
+     * @throws IOException
+     */
+    public BKLogSegmentWriter startLogSegment(long txId) throws IOException {
+        return startLogSegment(txId, false, false);
+    }
+
+    /**
+     * Start a new log segment in a BookKeeper ledger.
+     * First ensure that we have the write lock for this journal.
+     * Then create a ledger and stream based on that ledger.
+     * The ledger id is written to the inprogress znode, so that in the
+     * case of a crash, a recovery process can find the ledger we were writing
+     * to when we crashed.
+     *
+     * @param txId First transaction id to be written to the stream
+     * @param bestEffort
+     *          if true, return null rather than failing when the log segment cannot be started
+     * @param allowMaxTxID
+     *          allow using max tx id to start log segment
+     * @return the writer for the newly started log segment
+     * @throws IOException
+     */
+    public BKLogSegmentWriter startLogSegment(long txId, boolean bestEffort, boolean allowMaxTxID)
+            throws IOException {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        boolean success = false;
+        try {
+            BKLogSegmentWriter writer = doStartLogSegment(txId, bestEffort, allowMaxTxID);
+            success = true;
+            return writer;
+        } finally {
+            if (success) {
+                openOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+            } else {
+                openOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+            }
+        }
+    }
+
+    protected long assignLogSegmentSequenceNumber() throws IOException {
+        // For any active stream we will always make sure that there is at least one
+        // active ledger (except when the stream first starts out). Therefore when we
+        // see no ledger metadata for a stream, we assume that this is the first ledger
+        // in the stream
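+        // (for example, if the newest cached segment carries sequence number 5, the new segment
+        // is assigned 6; with no segments at all we fall back to the configured first sequence number)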
+        long logSegmentSeqNo = DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO;
+        boolean logSegmentsFound = false;
+
+        if (LogSegmentMetadata.supportsLogSegmentSequenceNo(conf.getDLLedgerMetadataLayoutVersion())) {
+            List<LogSegmentMetadata> ledgerListDesc = getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR);
+            Long nextLogSegmentSeqNo = DLUtils.nextLogSegmentSequenceNumber(ledgerListDesc);
+
+            if (null == nextLogSegmentSeqNo) {
+                logSegmentsFound = false;
+                // we didn't find the last assigned log segment sequence number,
+                // so we start the log segment with the configured FirstLogSegmentSequenceNumber.
+                logSegmentSeqNo = conf.getFirstLogSegmentSequenceNumber();
+            } else {
+                logSegmentsFound = true;
+                // latest log segment is assigned with a sequence number, start with next sequence number
+                logSegmentSeqNo = nextLogSegmentSeqNo;
+            }
+        }
+
+        // We skip log segment sequence number validation only when no log segments were found and
+        // the maximum log segment sequence number is "UNASSIGNED".
+        if (!logSegmentsFound &&
+            (DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO == maxLogSegmentSequenceNo.getSequenceNumber())) {
+            // no ledger seqno stored in /ledgers before
+            LOG.info("No max ledger sequence number found while creating log segment {} for {}.",
+                logSegmentSeqNo, getFullyQualifiedName());
+        } else if (maxLogSegmentSequenceNo.getSequenceNumber() + 1 != logSegmentSeqNo) {
+            LOG.warn("Unexpected max log segment sequence number {} for {} : list of cached segments = {}",
+                new Object[]{maxLogSegmentSequenceNo.getSequenceNumber(), getFullyQualifiedName(),
+                    getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR)});
+            // a max log segment sequence number is recorded and it doesn't match; throw an exception.
+            throw new DLIllegalStateException("Unexpected max log segment sequence number "
+                + maxLogSegmentSequenceNo.getSequenceNumber() + " for " + getFullyQualifiedName()
+                + ", expected " + (logSegmentSeqNo - 1));
+        }
+
+        return logSegmentSeqNo;
+    }
+
+    protected BKLogSegmentWriter doStartLogSegment(long txId, boolean bestEffort, boolean allowMaxTxID) throws IOException {
+        return FutureUtils.result(asyncStartLogSegment(txId, bestEffort, allowMaxTxID));
+    }
+
+    protected Future<BKLogSegmentWriter> asyncStartLogSegment(final long txId,
+                                                              final boolean bestEffort,
+                                                              final boolean allowMaxTxID) {
+        final Promise<BKLogSegmentWriter> promise = new Promise<BKLogSegmentWriter>();
+        try {
+            lock.checkOwnershipAndReacquire();
+        } catch (LockingException e) {
+            FutureUtils.setException(promise, e);
+            return promise;
+        }
+        fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+            @Override
+            public void onFailure(Throwable cause) {
+                FutureUtils.setException(promise, cause);
+            }
+
+            @Override
+            public void onSuccess(Versioned<List<LogSegmentMetadata>> list) {
+                doStartLogSegment(txId, bestEffort, allowMaxTxID, promise);
+            }
+        });
+        return promise;
+    }
+
+    protected void doStartLogSegment(final long txId,
+                                     final boolean bestEffort,
+                                     final boolean allowMaxTxID,
+                                     final Promise<BKLogSegmentWriter> promise) {
+        // validate the tx id
+        if ((txId < 0) ||
+                (!allowMaxTxID && (txId == DistributedLogConstants.MAX_TXID))) {
+            FutureUtils.setException(promise, new IOException("Invalid Transaction Id " + txId));
+            return;
+        }
+
+        long highestTxIdWritten = maxTxId.get();
+        if (txId < highestTxIdWritten) {
+            if (highestTxIdWritten == DistributedLogConstants.MAX_TXID) {
+                LOG.error("We've already marked the stream as ended and attempting to start a new log segment");
+                FutureUtils.setException(promise, new EndOfStreamException("Writing to a stream after it has been marked as completed"));
+                return;
+            } else {
+                LOG.error("We've already seen TxId {} the max TXId is {}", txId, highestTxIdWritten);
+                FutureUtils.setException(promise, new TransactionIdOutOfOrderException(txId, highestTxIdWritten));
+                return;
+            }
+        }
+
+        try {
+            logSegmentAllocator.allocate();
+        } catch (IOException e) {
+            // failed to issue an allocation request
+            failStartLogSegment(promise, bestEffort, e);
+            return;
+        }
+
+        // start the transaction from zookeeper
+        final Transaction<Object> txn = streamMetadataStore.newTransaction();
+
+        // failpoint injected before creating ledger
+        try {
+            FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_StartLogSegmentBeforeLedgerCreate);
+        } catch (IOException ioe) {
+            failStartLogSegment(promise, bestEffort, ioe);
+            return;
+        }
+
+        logSegmentAllocator.tryObtain(txn, NULL_OP_LISTENER)
+                .addEventListener(new FutureEventListener<LogSegmentEntryWriter>() {
+
+            @Override
+            public void onSuccess(LogSegmentEntryWriter entryWriter) {
+                // try-obtain succeed
+                createInprogressLogSegment(
+                        txn,
+                        txId,
+                        entryWriter,
+                        bestEffort,
+                        promise);
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                failStartLogSegment(promise, bestEffort, cause);
+            }
+        });
+    }
+
+    private void failStartLogSegment(Promise<BKLogSegmentWriter> promise,
+                                     boolean bestEffort,
+                                     Throwable cause) {
+        if (bestEffort) {
+            FutureUtils.setValue(promise, null);
+        } else {
+            FutureUtils.setException(promise, cause);
+        }
+    }
+
+    // Once the ledger handle is obtained from the allocator, this function must guarantee that
+    // the transaction is either executed or aborted. Otherwise, the ledger handle leaks from
+    // the allocation pool - hence causing "No Ledger Allocator" errors.
+    private void createInprogressLogSegment(Transaction<Object> txn,
+                                            final long txId,
+                                            final LogSegmentEntryWriter entryWriter,
+                                            boolean bestEffort,
+                                            final Promise<BKLogSegmentWriter> promise) {
+        final long logSegmentSeqNo;
+        try {
+            FailpointUtils.checkFailPoint(
+                    FailpointUtils.FailPointName.FP_StartLogSegmentOnAssignLogSegmentSequenceNumber);
+            logSegmentSeqNo = assignLogSegmentSequenceNumber();
+        } catch (IOException e) {
+            // abort the current prepared transaction
+            txn.abort(e);
+            failStartLogSegment(promise, bestEffort, e);
+            return;
+        }
+
+        final String inprogressZnodePath = inprogressZNode(
+                entryWriter.getLogSegmentId(), txId, logSegmentSeqNo);
+        final LogSegmentMetadata l =
+            new LogSegmentMetadata.LogSegmentMetadataBuilder(inprogressZnodePath,
+                conf.getDLLedgerMetadataLayoutVersion(), entryWriter.getLogSegmentId(), txId)
+                    .setLogSegmentSequenceNo(logSegmentSeqNo)
+                    .setRegionId(regionId)
+                    .setEnvelopeEntries(
+                            LogSegmentMetadata.supportsEnvelopedEntries(conf.getDLLedgerMetadataLayoutVersion()))
+                    .build();
+
+        // Create an inprogress segment
+        writeLogSegment(txn, l);
+
+        // Try storing max sequence number.
+        LOG.debug("Try storing max sequence number in startLogSegment {} : {}", inprogressZnodePath, logSegmentSeqNo);
+        storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, logSegmentSeqNo, true);
+
+        // Try storing max tx id.
+        LOG.debug("Try storing MaxTxId in startLogSegment  {} {}", inprogressZnodePath, txId);
+        storeMaxTxId(txn, maxTxId, txId);
+
+        txn.execute().addEventListener(FutureEventListenerRunnable.of(new FutureEventListener<Void>() {
+
+            @Override
+            public void onSuccess(Void value) {
+                try {
+                    FutureUtils.setValue(promise, new BKLogSegmentWriter(
+                            getFullyQualifiedName(),
+                            l.getSegmentName(),
+                            conf,
+                            conf.getDLLedgerMetadataLayoutVersion(),
+                            entryWriter,
+                            lock,
+                            txId,
+                            logSegmentSeqNo,
+                            scheduler,
+                            statsLogger,
+                            perLogStatsLogger,
+                            alertStatsLogger,
+                            writeLimiter,
+                            featureProvider,
+                            dynConf));
+                } catch (IOException ioe) {
+                    failStartLogSegment(promise, false, ioe);
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                failStartLogSegment(promise, false, cause);
+            }
+        }, scheduler));
+    }
+
+    boolean shouldStartNewSegment(BKLogSegmentWriter writer) {
+        return rollingPolicy.shouldRollover(writer, lastLedgerRollingTimeMillis);
+    }
+
+    /**
+     * Finalize a log segment. If the journal manager is currently
+     * writing to a ledger, ensure that this is the ledger of the log segment
+     * being finalized.
+     * <p/>
+     * Otherwise this is the recovery case. In the recovery case, ensure that
+     * the firstTxId of the ledger matches firstTxId for the segment we are
+     * trying to finalize.
+     */
+    Future<LogSegmentMetadata> completeAndCloseLogSegment(final BKLogSegmentWriter writer) {
+        final Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
+        completeAndCloseLogSegment(writer, promise);
+        return promise;
+    }
+
+    private void completeAndCloseLogSegment(final BKLogSegmentWriter writer,
+                                            final Promise<LogSegmentMetadata> promise) {
+        writer.asyncClose().addEventListener(new FutureEventListener<Void>() {
+            @Override
+            public void onSuccess(Void value) {
+                // in theory closeToFinalize should throw an exception if the stream is in error.
+                // just in case, add another check here to make sure we don't complete the log segment if the stream is in error.
+                if (writer.shouldFailCompleteLogSegment()) {
+                    FutureUtils.setException(promise,
+                            new IOException("LogSegmentWriter for " + writer.getFullyQualifiedLogSegment() + " is already in error."));
+                    return;
+                }
+                doCompleteAndCloseLogSegment(
+                        inprogressZNodeName(writer.getLogSegmentId(), writer.getStartTxId(), writer.getLogSegmentSequenceNumber()),
+                        writer.getLogSegmentSequenceNumber(),
+                        writer.getLogSegmentId(),
+                        writer.getStartTxId(),
+                        writer.getLastTxId(),
+                        writer.getPositionWithinLogSegment(),
+                        writer.getLastDLSN().getEntryId(),
+                        writer.getLastDLSN().getSlotId(),
+                        promise);
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                FutureUtils.setException(promise, cause);
+            }
+        });
+    }
+
+    @VisibleForTesting
+    LogSegmentMetadata completeAndCloseLogSegment(long logSegmentSeqNo,
+                                                  long logSegmentId,
+                                                  long firstTxId,
+                                                  long lastTxId,
+                                                  int recordCount)
+        throws IOException {
+        return completeAndCloseLogSegment(inprogressZNodeName(logSegmentId, firstTxId, logSegmentSeqNo), logSegmentSeqNo,
+            logSegmentId, firstTxId, lastTxId, recordCount, -1, -1);
+    }
+
+    /**
+     * Finalize a log segment. If the journal manager is currently
+     * writing to a ledger, ensure that this is the ledger of the log segment
+     * being finalized.
+     * <p/>
+     * Otherwise this is the recovery case. In the recovery case, ensure that
+     * the firstTxId of the ledger matches firstTxId for the segment we are
+     * trying to finalize.
+     */
+    LogSegmentMetadata completeAndCloseLogSegment(String inprogressZnodeName, long logSegmentSeqNo,
+                                                  long logSegmentId, long firstTxId, long lastTxId,
+                                                  int recordCount, long lastEntryId, long lastSlotId)
+            throws IOException {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        boolean success = false;
+        try {
+            LogSegmentMetadata completedLogSegment =
+                    doCompleteAndCloseLogSegment(inprogressZnodeName, logSegmentSeqNo,
+                            logSegmentId, firstTxId, lastTxId, recordCount,
+                            lastEntryId, lastSlotId);
+            success = true;
+            return completedLogSegment;
+        } finally {
+            if (success) {
+                closeOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+            } else {
+                closeOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+            }
+        }
+    }
+
+    protected long computeStartSequenceId(LogSegmentMetadata segment) throws IOException {
+        if (!segment.isInProgress()) {
+            return segment.getStartSequenceId();
+        }
+
+        long startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID;
+
+        // we only record sequence id when both write version and logsegment's version support sequence id
+        if (LogSegmentMetadata.supportsSequenceId(conf.getDLLedgerMetadataLayoutVersion())
+                && segment.supportsSequenceId()) {
+            List<LogSegmentMetadata> logSegmentDescList =
+                    getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR);
+            startSequenceId = DLUtils.computeStartSequenceId(logSegmentDescList, segment);
+        }
+
+        return startSequenceId;
+    }
+
+    /**
+     * Close log segment
+     *
+     * @param inprogressZnodeName
+     * @param logSegmentSeqNo
+     * @param logSegmentId
+     * @param firstTxId
+     * @param lastTxId
+     * @param recordCount
+     * @param lastEntryId
+     * @param lastSlotId
+     * @throws IOException
+     */
+    protected LogSegmentMetadata doCompleteAndCloseLogSegment(
+            String inprogressZnodeName,
+            long logSegmentSeqNo,
+            long logSegmentId,
+            long firstTxId,
+            long lastTxId,
+            int recordCount,
+            long lastEntryId,
+            long lastSlotId) throws IOException {
+        Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
+        doCompleteAndCloseLogSegment(
+                inprogressZnodeName,
+                logSegmentSeqNo,
+                logSegmentId,
+                firstTxId,
+                lastTxId,
+                recordCount,
+                lastEntryId,
+                lastSlotId,
+                promise);
+        return FutureUtils.result(promise);
+    }
+
+    protected void doCompleteAndCloseLogSegment(final String inprogressZnodeName,
+                                                final long logSegmentSeqNo,
+                                                final long logSegmentId,
+                                                final long firstTxId,
+                                                final long lastTxId,
+                                                final int recordCount,
+                                                final long lastEntryId,
+                                                final long lastSlotId,
+                                                final Promise<LogSegmentMetadata> promise) {
+        fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+            @Override
+            public void onFailure(Throwable cause) {
+                FutureUtils.setException(promise, cause);
+            }
+
+            @Override
+            public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) {
+                doCompleteAndCloseLogSegmentAfterLogSegmentListFetched(
+                        inprogressZnodeName,
+                        logSegmentSeqNo,
+                        logSegmentId,
+                        firstTxId,
+                        lastTxId,
+                        recordCount,
+                        lastEntryId,
+                        lastSlotId,
+                        promise);
+            }
+        });
+    }
+
+    private void doCompleteAndCloseLogSegmentAfterLogSegmentListFetched(
+            final String inprogressZnodeName,
+            long logSegmentSeqNo,
+            long logSegmentId,
+            long firstTxId,
+            long lastTxId,
+            int recordCount,
+            long lastEntryId,
+            long lastSlotId,
+            final Promise<LogSegmentMetadata> promise) {
+        try {
+            lock.checkOwnershipAndReacquire();
+        } catch (IOException ioe) {
+            FutureUtils.setException(promise, ioe);
+            return;
+        }
+
+        LOG.debug("Completing and Closing Log Segment {} {}", firstTxId, lastTxId);
+        LogSegmentMetadata inprogressLogSegment = readLogSegmentFromCache(inprogressZnodeName);
+
+        // validate log segment
+        if (inprogressLogSegment.getLogSegmentId() != logSegmentId) {
+            FutureUtils.setException(promise, new IOException(
+                "Active ledger has different ID to inprogress. "
+                    + inprogressLogSegment.getLogSegmentId() + " found, "
+                    + logSegmentId + " expected"));
+            return;
+        }
+        // validate the transaction id
+        if (inprogressLogSegment.getFirstTxId() != firstTxId) {
+            FutureUtils.setException(promise, new IOException("Transaction id not as expected, "
+                + inprogressLogSegment.getFirstTxId() + " found, " + firstTxId + " expected"));
+            return;
+        }
+        // validate the log sequence number
+        if (validateLogSegmentSequenceNumber) {
+            synchronized (inprogressLSSNs) {
+                if (inprogressLSSNs.isEmpty()) {
+                    FutureUtils.setException(promise, new UnexpectedException(
+                            "Didn't find matched inprogress log segments when completing inprogress "
+                                    + inprogressLogSegment));
+                    return;
+                }
+                long leastInprogressLSSN = inprogressLSSNs.getFirst();
+                // the log segment sequence number in metadata (inprogressLogSegment.getLogSegmentSequenceNumber())
+                // should be the same as the sequence number we are completing (logSegmentSeqNo),
+                // and it should also be the same as the least inprogress log segment sequence number tracked in inprogressLSSNs
+                if ((inprogressLogSegment.getLogSegmentSequenceNumber() != logSegmentSeqNo) ||
+                        (leastInprogressLSSN != logSegmentSeqNo)) {
+                    FutureUtils.setException(promise, new UnexpectedException(
+                            "Didn't find matched inprogress log segments when completing inprogress "
+                                    + inprogressLogSegment));
+                    return;
+                }
+            }
+        }
+
+        // store max sequence number.
+        long maxSeqNo = Math.max(logSegmentSeqNo, maxLogSegmentSequenceNo.getSequenceNumber());
+        if (maxLogSegmentSequenceNo.getSequenceNumber() == logSegmentSeqNo ||
+                (maxLogSegmentSequenceNo.getSequenceNumber() == logSegmentSeqNo + 1)) {
+            // ignore the case that a new inprogress log segment is pre-allocated
+            // before completing current inprogress one
+            LOG.info("Try storing max sequence number {} in completing {}.",
+                    new Object[] { logSegmentSeqNo, inprogressLogSegment.getZkPath() });
+        } else {
+            LOG.warn("Unexpected max ledger sequence number {} found while completing log segment {} for {}",
+                    new Object[] { maxLogSegmentSequenceNo.getSequenceNumber(), logSegmentSeqNo, getFullyQualifiedName() });
+            if (validateLogSegmentSequenceNumber) {
+                FutureUtils.setException(promise, new DLIllegalStateException("Unexpected max log segment sequence number "
+                        + maxLogSegmentSequenceNo.getSequenceNumber() + " for " + getFullyQualifiedName()
+                        + ", expected " + (logSegmentSeqNo - 1)));
+                return;
+            }
+        }
+
+        // Prepare the completion
+        final String pathForCompletedLedger = completedLedgerZNode(firstTxId, lastTxId, logSegmentSeqNo);
+        long startSequenceId;
+        try {
+            startSequenceId = computeStartSequenceId(inprogressLogSegment);
+        } catch (IOException ioe) {
+            FutureUtils.setException(promise, ioe);
+            return;
+        }
+        // write completed ledger znode
+        final LogSegmentMetadata completedLogSegment =
+                inprogressLogSegment.completeLogSegment(
+                        pathForCompletedLedger,
+                        lastTxId,
+                        recordCount,
+                        lastEntryId,
+                        lastSlotId,
+                        startSequenceId);
+        setLastLedgerRollingTimeMillis(completedLogSegment.getCompletionTime());
+
+        // prepare the transaction
+        Transaction<Object> txn = streamMetadataStore.newTransaction();
+
+        // create completed log segment
+        writeLogSegment(txn, completedLogSegment);
+        // delete inprogress log segment
+        deleteLogSegment(txn, inprogressLogSegment);
+        // store max sequence number
+        storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, maxSeqNo, false);
+        // update max txn id.
+        LOG.debug("Trying storing LastTxId in Finalize Path {} LastTxId {}", pathForCompletedLedger, lastTxId);
+        storeMaxTxId(txn, maxTxId, lastTxId);
+
+        txn.execute().addEventListener(FutureEventListenerRunnable.of(new FutureEventListener<Void>() {
+            @Override
+            public void onSuccess(Void value) {
+                LOG.info("Completed {} to {} for {} : {}",
+                        new Object[] { inprogressZnodeName, completedLogSegment.getSegmentName(),
+                                getFullyQualifiedName(), completedLogSegment });
+                FutureUtils.setValue(promise, completedLogSegment);
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                FutureUtils.setException(promise, cause);
+            }
+        }, scheduler));
+    }
+
+    public Future<Long> recoverIncompleteLogSegments() {
+        try {
+            FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_RecoverIncompleteLogSegments);
+        } catch (IOException ioe) {
+            return Future.exception(ioe);
+        }
+        return getCachedLogSegmentsAfterFirstFetch(LogSegmentMetadata.COMPARATOR).flatMap(recoverLogSegmentsFunction);
+    }
+
+    class RecoverLogSegmentFunction extends Function<LogSegmentMetadata, Future<LogSegmentMetadata>> {
+
+        @Override
+        public Future<LogSegmentMetadata> apply(final LogSegmentMetadata l) {
+            if (!l.isInProgress()) {
+                return Future.value(l);
+            }
+
+            LOG.info("Recovering last record in log segment {} for {}.", l, getFullyQualifiedName());
+            return asyncReadLastRecord(l, true, true, true).flatMap(
+                    new AbstractFunction1<LogRecordWithDLSN, Future<LogSegmentMetadata>>() {
+                        @Override
+                        public Future<LogSegmentMetadata> apply(LogRecordWithDLSN lastRecord) {
+                            return completeLogSegment(l, lastRecord);
+                        }
+                    });
+        }
+
+        private Future<LogSegmentMetadata> completeLogSegment(LogSegmentMetadata l,
+                                                              LogRecordWithDLSN lastRecord) {
+            LOG.info("Recovered last record in log segment {} for {}.", l, getFullyQualifiedName());
+
+            long endTxId = DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID;
+            int recordCount = 0;
+            long lastEntryId = -1;
+            long lastSlotId = -1;
+
+            if (null != lastRecord) {
+                endTxId = lastRecord.getTransactionId();
+                recordCount = lastRecord.getLastPositionWithinLogSegment();
+                lastEntryId = lastRecord.getDlsn().getEntryId();
+                lastSlotId = lastRecord.getDlsn().getSlotId();
+            }
+
+            if (endTxId == DistributedLogConstants.INVALID_TXID) {
+                LOG.error("Unrecoverable corruption has occurred in segment "
+                    + l.toString() + " at path " + l.getZkPath()
+                    + ". Unable to continue recovery.");
+                return Future.exception(new IOException("Unrecoverable corruption,"
+                    + " please check logs."));
+            } else if (endTxId == DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID) {
+                // TODO: Empty ledger - Ideally we should just remove it?
+                endTxId = l.getFirstTxId();
+            }
+
+            Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
+            doCompleteAndCloseLogSegment(
+                    l.getZNodeName(),
+                    l.getLogSegmentSequenceNumber(),
+                    l.getLogSegmentId(),
+                    l.getFirstTxId(),
+                    endTxId,
+                    recordCount,
+                    lastEntryId,
+                    lastSlotId,
+                    promise);
+            return promise;
+        }
+
+    }
+
+    Future<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(final DLSN dlsn) {
+        if (DLSN.InvalidDLSN == dlsn) {
+            List<LogSegmentMetadata> emptyList = new ArrayList<LogSegmentMetadata>(0);
+            return Future.value(emptyList);
+        }
+        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap(
+                new AbstractFunction1<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
+                    @Override
+                    public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
+                        return setLogSegmentsOlderThanDLSNTruncated(logSegments, dlsn);
+                    }
+                });
+    }
+
+    private Future<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(List<LogSegmentMetadata> logSegments,
+                                                                                  final DLSN dlsn) {
+        LOG.debug("Setting truncation status on logs older than {} from {} for {}",
+                new Object[]{dlsn, logSegments, getFullyQualifiedName()});
+        List<LogSegmentMetadata> truncateList = new ArrayList<LogSegmentMetadata>(logSegments.size());
+        LogSegmentMetadata partialTruncate = null;
+        LOG.info("{}: Truncating log segments older than {}", getFullyQualifiedName(), dlsn);
+        for (int i = 0; i < logSegments.size(); i++) {
+            LogSegmentMetadata l = logSegments.get(i);
+            if (!l.isInProgress()) {
+                if (l.getLastDLSN().compareTo(dlsn) < 0) {
+                    LOG.debug("{}: Truncating log segment {} ", getFullyQualifiedName(), l);
+                    truncateList.add(l);
+                } else if (l.getFirstDLSN().compareTo(dlsn) < 0) {
+                    // Can be satisfied by at most one segment
+                    if (null != partialTruncate) {
+                        String logMsg = String.format("Potential metadata inconsistency for stream %s at segment %s", getFullyQualifiedName(), l);
+                        LOG.error(logMsg);
+                        return Future.exception(new DLIllegalStateException(logMsg));
+                    }
+                    LOG.info("{}: Partially truncating log segment {} older than {}.", new Object[] {getFullyQualifiedName(), l, dlsn});
+                    partialTruncate = l;
+                } else {
+                    break;
+                }
+            } else {
+                break;
+            }
+        }
+        return setLogSegmentTruncationStatus(truncateList, partialTruncate, dlsn);
+    }
+
+    private int getNumCandidateLogSegmentsToPurge(List<LogSegmentMetadata> logSegments) {
+        if (logSegments.isEmpty()) {
+            return 0;
+        } else {
+            // we have to keep at least one completed log segment for sequence id
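+            // e.g. with segments [S1 completed, S2 completed, S3 inprogress], S1 and S2 are counted
+            // below and one is subtracted, leaving only the oldest segment S1 as a purge candidate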
+            int numCandidateLogSegments = 0;
+            for (LogSegmentMetadata segment : logSegments) {
+                if (segment.isInProgress()) {
+                    break;
+                } else {
+                    ++numCandidateLogSegments;
+                }
+            }
+
+            return numCandidateLogSegments - 1;
+        }
+    }
+
+    Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTimestamp(final long minTimestampToKeep) {
+        if (minTimestampToKeep >= Utils.nowInMillis()) {
+            return Future.exception(new IllegalArgumentException(
+                    "Invalid timestamp " + minTimestampToKeep + " to purge logs for " + getFullyQualifiedName()));
+        }
+        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap(
+                new Function<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
+            @Override
+            public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
+                List<LogSegmentMetadata> purgeList = new ArrayList<LogSegmentMetadata>(logSegments.size());
+
+                int numCandidates = getNumCandidateLogSegmentsToPurge(logSegments);
+
+                for (int iterator = 0; iterator < numCandidates; iterator++) {
+                    LogSegmentMetadata l = logSegments.get(iterator);
+                    // When the application explicitly truncates segments, timestamp-based purge is
+                    // only used to clean up log segments that have already been marked for truncation
+                    if ((l.isTruncated() || !conf.getExplicitTruncationByApplication()) &&
+                        !l.isInProgress() && (l.getCompletionTime() < minTimestampToKeep)) {
+                        purgeList.add(l);
+                    } else {
+                        // stop truncating log segments if we find either an inprogress or a partially
+                        // truncated log segment
+                        break;
+                    }
+                }
+                LOG.info("Deleting log segments older than {} for {} : {}",
+                        new Object[] { minTimestampToKeep, getFullyQualifiedName(), purgeList });
+                return deleteLogSegments(purgeList);
+            }
+        });
+    }
+
+    Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTxnId(final long minTxIdToKeep) {
+        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap(
+            new AbstractFunction1<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
+                @Override
+                public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
+                    int numLogSegmentsToProcess;
+
+                    if (minTxIdToKeep < 0) {
+                        // we are deleting the log, we can remove whole log segments
+                        numLogSegmentsToProcess = logSegments.size();
+                    } else {
+                        numLogSegmentsToProcess = getNumCandidateLogSegmentsToPurge(logSegments);
+                    }
+                    List<LogSegmentMetadata> purgeList = Lists.newArrayListWithExpectedSize(numLogSegmentsToProcess);
+                    for (int iterator = 0; iterator < numLogSegmentsToProcess; iterator++) {
+                        LogSegmentMetadata l = logSegments.get(iterator);
+                        if ((minTxIdToKeep < 0) ||
+                            ((l.isTruncated() || !conf.getExplicitTruncationByApplication()) &&
+                            !l.isInProgress() && (l.getLastTxId() < minTxIdToKeep))) {
+                            purgeList.add(l);
+                        } else {
+                            // stop truncating log segments if we find either an inprogress or a partially
+                            // truncated log segment
+                            break;
+                        }
+                    }
+                    return deleteLogSegments(purgeList);
+                }
+            });
+    }
+
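+    // Marks each segment in truncateList as fully truncated and (at most) one segment as partially
+    // truncated up to minActiveDLSN, all within a single metadata transaction; once the transaction
+    // executes, the cache is refreshed with the updated segment metadata.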
+    private Future<List<LogSegmentMetadata>> setLogSegmentTruncationStatus(
+            final List<LogSegmentMetadata> truncateList,
+            LogSegmentMetadata partialTruncate,
+            DLSN minActiveDLSN) {
+        final List<LogSegmentMetadata> listToTruncate = Lists.newArrayListWithCapacity(truncateList.size() + 1);
+        final List<LogSegmentMetadata> listAfterTruncated = Lists.newArrayListWithCapacity(truncateList.size() + 1);
+        Transaction<Object> updateTxn = metadataUpdater.transaction();
+        for(LogSegmentMetadata l : truncateList) {
+            if (!l.isTruncated()) {
+                LogSegmentMetadata newSegment = metadataUpdater.setLogSegmentTruncated(updateTxn, l);
+                listToTruncate.add(l);
+                listAfterTruncated.add(newSegment);
+            }
+        }
+
+        if (null != partialTruncate && (partialTruncate.isNonTruncated() ||
+                (partialTruncate.isPartiallyTruncated() && (partialTruncate.getMinActiveDLSN().compareTo(minActiveDLSN) < 0)))) {
+            LogSegmentMetadata newSegment = metadataUpdater.setLogSegmentPartiallyTruncated(
+                    updateTxn, partialTruncate, minActiveDLSN);
+            listToTruncate.add(partialTruncate);
+            listAfterTruncated.add(newSegment);
+        }
+
+        return updateTxn.execute().map(new AbstractFunction1<Void, List<LogSegmentMetadata>>() {
+            @Override
+            public List<LogSegmentMetadata> apply(Void value) {
+                for (int i = 0; i < listToTruncate.size(); i++) {
+                    removeLogSegmentFromCache(listToTruncate.get(i).getSegmentName());
+                    LogSegmentMetadata newSegment = listAfterTruncated.get(i);
+                    addLogSegmentToCache(newSegment.getSegmentName(), newSegment);
+                }
+                return listAfterTruncated;
+            }
+        });
+    }
+
+    private Future<List<LogSegmentMetadata>> deleteLogSegments(
+            final List<LogSegmentMetadata> logs) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Purging logs for {} : {}", getFullyQualifiedName(), logs);
+        }
+        return FutureUtils.processList(logs,
+                new Function<LogSegmentMetadata, Future<LogSegmentMetadata>>() {
+            @Override
+            public Future<LogSegmentMetadata> apply(LogSegmentMetadata segment) {
+                return deleteLogSegment(segment);
+            }
+        }, scheduler);
+    }
+
+    private Future<LogSegmentMetadata> deleteLogSegment(
+            final LogSegmentMetadata ledgerMetadata) {
+        LOG.info("Deleting ledger {} for {}", ledgerMetadata, getFullyQualifiedName());
+        final Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        promise.addEventListener(new FutureEventListener<LogSegmentMetadata>() {
+            @Override
+            public void onSuccess(LogSegmentMetadata segment) {
+                deleteOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                deleteOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+            }
+        });
+        entryStore.deleteLogSegment(ledgerMetadata)
+                .addEventListener(new FutureEventListener<LogSegmentMetadata>() {
+            @Override
+            public void onFailure(Throwable cause) {
+                FutureUtils.setException(promise, cause);
+            }
+
+            @Override
+            public void onSuccess(LogSegmentMetadata segment) {
+                deleteLogSegmentMetadata(segment, promise);
+            }
+        });
+        return promise;
+    }
+
+    private void deleteLogSegmentMetadata(final LogSegmentMetadata segmentMetadata,
+                                          final Promise<LogSegmentMetadata> promise) {
+        Transaction<Object> deleteTxn = metadataStore.transaction();
+        metadataStore.deleteLogSegment(deleteTxn, segmentMetadata, new Transaction.OpListener<Void>() {
+            @Override
+            public void onCommit(Void r) {
+                // purge log segment
+                removeLogSegmentFromCache(segmentMetadata.getZNodeName());
+                promise.setValue(segmentMetadata);
+            }
+
+            @Override
+            public void onAbort(Throwable t) {
+                if (t instanceof LogSegmentNotFoundException) {
+                    // purge log segment
+                    removeLogSegmentFromCache(segmentMetadata.getZNodeName());
+                    promise.setValue(segmentMetadata);
+                    return;
+                } else {
+                    LOG.error("Couldn't purge {} for {}: with error {}",
+                            new Object[]{ segmentMetadata, getFullyQualifiedName(), t });
+                    promise.setException(t);
+                }
+            }
+        });
+        deleteTxn.execute();
+    }
+
+    @Override
+    public Future<Void> asyncClose() {
+        return Utils.closeSequence(scheduler,
+                lock,
+                logSegmentAllocator);
+    }
+
+    @Override
+    public Future<Void> asyncAbort() {
+        return asyncClose();
+    }
+
+    String completedLedgerZNodeName(long firstTxId, long lastTxId, long logSegmentSeqNo) {
+        if (DistributedLogConstants.LOGSEGMENT_NAME_VERSION == conf.getLogSegmentNameVersion()) {
+            return String.format("%s_%018d", DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX, logSegmentSeqNo);
+        } else {
+            return String.format("%s_%018d_%018d", DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX,
+                    firstTxId, lastTxId);
+        }
+    }
+
+    /**
+     * Get the znode path for a finalized ledger.
+     */
+    String completedLedgerZNode(long firstTxId, long lastTxId, long logSegmentSeqNo) {
+        return String.format("%s/%s", logMetadata.getLogSegmentsPath(),
+                completedLedgerZNodeName(firstTxId, lastTxId, logSegmentSeqNo));
+    }
+
+    /**
+     * Get the name of the inprogress znode.
+     *
+     * @return name of the inprogress znode.
+     */
+    String inprogressZNodeName(long logSegmentId, long firstTxId, long logSegmentSeqNo) {
+        if (DistributedLogConstants.LOGSEGMENT_NAME_VERSION == conf.getLogSegmentNameVersion()) {
+            // Many problems are introduced by different inprogress names sharing the same ledger sequence number.
+            return String.format("%s_%018d", DistributedLogConstants.INPROGRESS_LOGSEGMENT_PREFIX, logSegmentSeqNo);
+        } else {
+            return DistributedLogConstants.INPROGRESS_LOGSEGMENT_PREFIX + "_" + Long.toString(firstTxId, 16);
+        }
+    }
+
+    /**
+     * Get the znode path for the inprogress log segment.
+     */
+    String inprogressZNode(long logSegmentId, long firstTxId, long logSegmentSeqNo) {
+        return logMetadata.getLogSegmentsPath() + "/" + inprogressZNodeName(logSegmentId, firstTxId, logSegmentSeqNo);
+    }
+
+    String inprogressZNode(String inprogressZNodeName) {
+        return logMetadata.getLogSegmentsPath() + "/" + inprogressZNodeName;
+    }
+}
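For reference, the two log segment naming schemes implemented above produce znode names like the following. This is a standalone sketch, not part of the patch; the prefix literals are assumed values for DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX and INPROGRESS_LOGSEGMENT_PREFIX, and the ids are arbitrary.

    // Standalone sketch of the naming schemes used by completedLedgerZNodeName
    // and inprogressZNodeName above. Prefix values are assumptions.
    public class LogSegmentNameExample {

        static final String COMPLETED_PREFIX = "logrecs";      // assumed COMPLETED_LOGSEGMENT_PREFIX
        static final String INPROGRESS_PREFIX = "inprogress";  // assumed INPROGRESS_LOGSEGMENT_PREFIX

        public static void main(String[] args) {
            long logSegmentSeqNo = 12L;
            long firstTxId = 4096L;
            long lastTxId = 8191L;

            // Versioned scheme: completed segments are keyed by sequence number only.
            String versioned = String.format("%s_%018d", COMPLETED_PREFIX, logSegmentSeqNo);

            // Legacy scheme: completed segments are keyed by the transaction id range,
            // and inprogress segments by the hex-encoded first transaction id.
            String legacy = String.format("%s_%018d_%018d", COMPLETED_PREFIX, firstTxId, lastTxId);
            String legacyInprogress = INPROGRESS_PREFIX + "_" + Long.toString(firstTxId, 16);

            System.out.println(versioned);         // logrecs_000000000000000012
            System.out.println(legacy);            // logrecs_000000000000004096_000000000000008191
            System.out.println(legacyInprogress);  // inprogress_1000
        }
    }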

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java
new file mode 100644
index 0000000..bf89823
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Ticker;
+import org.apache.distributedlog.exceptions.EndOfStreamException;
+import org.apache.distributedlog.exceptions.IdleReaderException;
+import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.Utils;
+import com.twitter.util.Future;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Versioned;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Synchronous Log Reader based on {@link AsyncLogReader}
+ */
+class BKSyncLogReader implements LogReader, AsyncNotification {
+
+    private final BKDistributedLogManager bkdlm;
+    private final BKLogReadHandler readHandler;
+    private final AtomicReference<IOException> readerException =
+            new AtomicReference<IOException>(null);
+    private final int maxReadAheadWaitTime;
+    private Promise<Void> closeFuture;
+    private final Optional<Long> startTransactionId;
+    private boolean positioned = false;
+    private Entry.Reader currentEntry = null;
+
+    // readahead reader
+    ReadAheadEntryReader readAheadReader = null;
+
+    // idle reader settings
+    private final boolean shouldCheckIdleReader;
+    private final int idleErrorThresholdMillis;
+
+    // Stats
+    private final Counter idleReaderError;
+
+    BKSyncLogReader(DistributedLogConfiguration conf,
+                    BKDistributedLogManager bkdlm,
+                    DLSN startDLSN,
+                    Optional<Long> startTransactionId,
+                    StatsLogger statsLogger) throws IOException {
+        this.bkdlm = bkdlm;
+        this.readHandler = bkdlm.createReadHandler(
+                Optional.<String>absent(),
+                this,
+                true);
+        this.maxReadAheadWaitTime = conf.getReadAheadWaitTime();
+        this.idleErrorThresholdMillis = conf.getReaderIdleErrorThresholdMillis();
+        this.shouldCheckIdleReader = idleErrorThresholdMillis > 0 && idleErrorThresholdMillis < Integer.MAX_VALUE;
+        this.startTransactionId = startTransactionId;
+
+        // start readahead
+        startReadAhead(startDLSN);
+        if (!startTransactionId.isPresent()) {
+            positioned = true;
+        }
+
+        // Stats
+        StatsLogger syncReaderStatsLogger = statsLogger.scope("sync_reader");
+        idleReaderError = syncReaderStatsLogger.getCounter("idle_reader_error");
+    }
+
+    private void startReadAhead(DLSN startDLSN) throws IOException {
+        readAheadReader = new ReadAheadEntryReader(
+                    bkdlm.getStreamName(),
+                    startDLSN,
+                    bkdlm.getConf(),
+                    readHandler,
+                    bkdlm.getReaderEntryStore(),
+                    bkdlm.getScheduler(),
+                    Ticker.systemTicker(),
+                    bkdlm.alertStatsLogger);
+        readHandler.registerListener(readAheadReader);
+        readHandler.asyncStartFetchLogSegments()
+                .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
+                    @Override
+                    public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
+                        readAheadReader.addStateChangeNotification(BKSyncLogReader.this);
+                        readAheadReader.start(logSegments.getValue());
+                        return BoxedUnit.UNIT;
+                    }
+                });
+    }
+
+    @VisibleForTesting
+    ReadAheadEntryReader getReadAheadReader() {
+        return readAheadReader;
+    }
+
+    @VisibleForTesting
+    BKLogReadHandler getReadHandler() {
+        return readHandler;
+    }
+
+    private Entry.Reader readNextEntry(boolean nonBlocking) throws IOException {
+        Entry.Reader entry = null;
+        if (nonBlocking) {
+            return readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS);
+        } else {
+            while (!readAheadReader.isReadAheadCaughtUp()
+                    && null == readerException.get()
+                    && null == entry) {
+                entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS);
+            }
+            if (null != entry) {
+                return entry;
+            }
+            // reader is caught up
+            if (readAheadReader.isReadAheadCaughtUp()
+                    && null == readerException.get()) {
+                entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS);
+            }
+            return entry;
+        }
+    }
+
+    private void markReaderAsIdle() throws IdleReaderException {
+        idleReaderError.inc();
+        IdleReaderException ire = new IdleReaderException("Sync reader on stream "
+                + readHandler.getFullyQualifiedName()
+                + " is idle for more than " + idleErrorThresholdMillis + " ms");
+        readerException.compareAndSet(null, ire);
+        throw ire;
+    }
+
+    @Override
+    public synchronized LogRecordWithDLSN readNext(boolean nonBlocking)
+            throws IOException {
+        if (null != readerException.get()) {
+            throw readerException.get();
+        }
+        LogRecordWithDLSN record = doReadNext(nonBlocking);
+        // no record was returned; check whether the reader has become idle
+        if (null == record && shouldCheckIdleReader) {
+            if (readAheadReader.getNumCachedEntries() <= 0 &&
+                    readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) {
+                markReaderAsIdle();
+            }
+        }
+        return record;
+    }
+
+    private LogRecordWithDLSN doReadNext(boolean nonBlocking) throws IOException {
+        LogRecordWithDLSN record = null;
+
+        do {
+            // keep fetching until we get a record or the readahead cache has no entry available
+            while (null == record) {
+                if (null == currentEntry) {
+                    currentEntry = readNextEntry(nonBlocking);
+                    if (null == currentEntry) {
+                        return null;
+                    }
+                }
+                record = currentEntry.nextRecord();
+                if (null == record) {
+                    currentEntry = null;
+                }
+            }
+
+            // check if we reached the end of stream
+            if (record.isEndOfStream()) {
+                EndOfStreamException eos = new EndOfStreamException("End of Stream Reached for "
+                        + readHandler.getFullyQualifiedName());
+                readerException.compareAndSet(null, eos);
+                throw eos;
+            }
+            // skip control records
+            if (record.isControl()) {
+                record = null;
+                continue;
+            }
+            if (!positioned) {
+                if (record.getTransactionId() < startTransactionId.get()) {
+                    record = null;
+                    continue;
+                } else {
+                    positioned = true;
+                    break;
+                }
+            } else {
+                break;
+            }
+        } while (true);
+        return record;
+    }
+
+    @Override
+    public synchronized List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords)
+            throws IOException {
+        LinkedList<LogRecordWithDLSN> retList =
+                new LinkedList<LogRecordWithDLSN>();
+
+        int numRead = 0;
+        LogRecordWithDLSN record = readNext(nonBlocking);
+        while (null != record) {
+            retList.add(record);
+            numRead++;
+            if (numRead >= numLogRecords) {
+                break;
+            }
+            record = readNext(nonBlocking);
+        }
+        return retList;
+    }
+
+    @Override
+    public Future<Void> asyncClose() {
+        Promise<Void> closePromise;
+        synchronized (this) {
+            if (null != closeFuture) {
+                return closeFuture;
+            }
+            closeFuture = closePromise = new Promise<Void>();
+        }
+        readHandler.unregisterListener(readAheadReader);
+        readAheadReader.removeStateChangeNotification(this);
+        Utils.closeSequence(bkdlm.getScheduler(), true,
+                readAheadReader,
+                readHandler
+        ).proxyTo(closePromise);
+        return closePromise;
+    }
+
+    @Override
+    public void close() throws IOException {
+        FutureUtils.result(asyncClose());
+    }
+
+    //
+    // Notification From ReadHandler
+    //
+
+    @Override
+    public void notifyOnError(Throwable cause) {
+        if (cause instanceof IOException) {
+            readerException.compareAndSet(null, (IOException) cause);
+        } else {
+            readerException.compareAndSet(null, new IOException(cause));
+        }
+    }
+
+    @Override
+    public void notifyOnOperationComplete() {
+        // no-op
+    }
+}
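The reader above backs the blocking LogReader API: readNext(false) waits up to the configured readahead wait time and may still return null when nothing has arrived, while readNext(true) returns immediately. A minimal usage sketch, assuming a DistributedLogManager has already been opened elsewhere; the helper class and its loop are hypothetical.

    import org.apache.distributedlog.DLSN;
    import org.apache.distributedlog.DistributedLogManager;
    import org.apache.distributedlog.LogReader;
    import org.apache.distributedlog.LogRecordWithDLSN;

    import java.io.IOException;

    // Hypothetical helper that tails a stream synchronously from the first DLSN.
    class SyncReadExample {
        static void tail(DistributedLogManager dlm) throws IOException {
            LogReader reader = dlm.getInputStream(DLSN.InitialDLSN);
            try {
                while (true) {
                    // blocking read; null simply means no record arrived within the wait time
                    LogRecordWithDLSN record = reader.readNext(false);
                    if (null == record) {
                        continue;
                    }
                    System.out.println("read " + record.getDlsn()
                            + " txid=" + record.getTransactionId());
                }
            } finally {
                reader.close();
            }
        }
    }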

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java
new file mode 100644
index 0000000..7d33d12
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.util.FutureUtils;
+
+import java.io.IOException;
+import java.util.List;
+
+class BKSyncLogWriter extends BKAbstractLogWriter implements LogWriter {
+
+    public BKSyncLogWriter(DistributedLogConfiguration conf,
+                           DynamicDistributedLogConfiguration dynConf,
+                           BKDistributedLogManager bkdlm) {
+        super(conf, dynConf, bkdlm);
+    }
+
+    /**
+     * Write a log record to the stream.
+     *
+     * @param record single log record to write
+     */
+    @Override
+    public void write(LogRecord record) throws IOException {
+        getLedgerWriter(record.getTransactionId(), false).write(record);
+    }
+
+    /**
+     * Write a list of log records to the stream.
+     *
+     * @param records list of log records to write
+     */
+    @Override
+    @Deprecated
+    public int writeBulk(List<LogRecord> records) throws IOException {
+        return getLedgerWriter(records.get(0).getTransactionId(), false).writeBulk(records);
+    }
+
+    /**
+     * Flushes all the data up to this point,
+     * adds the end of stream marker and marks the stream
+     * as read-only in the metadata. No appends to the
+     * stream will be allowed after this point.
+     */
+    @Override
+    public void markEndOfStream() throws IOException {
+        FutureUtils.result(getLedgerWriter(DistributedLogConstants.MAX_TXID, true).markEndOfStream());
+        closeAndComplete();
+    }
+
+    /**
+     * All data that has been written to the stream so far will be flushed.
+     * New data can still be written to the stream while a flush is ongoing.
+     */
+    @Override
+    public long setReadyToFlush() throws IOException {
+        checkClosedOrInError("setReadyToFlush");
+        long highestTransactionId = 0;
+        BKLogSegmentWriter writer = getCachedLogWriter();
+        if (null != writer) {
+            highestTransactionId = Math.max(highestTransactionId, FutureUtils.result(writer.flush()));
+        }
+        return highestTransactionId;
+    }
+
+    /**
+     * Commit data that is already flushed.
+     * <p/>
+     * This API is optional as the writer implements a policy for automatically syncing
+     * the log records in the buffer. The buffered records are flushed when the buffer
+     * becomes full or a certain period of time has elapsed.
+     */
+    @Override
+    public long flushAndSync() throws IOException {
+        checkClosedOrInError("flushAndSync");
+
+        LOG.debug("FlushAndSync Started");
+        long highestTransactionId = 0;
+        BKLogSegmentWriter writer = getCachedLogWriter();
+        if (null != writer) {
+            highestTransactionId = Math.max(highestTransactionId, FutureUtils.result(writer.commit()));
+            LOG.debug("FlushAndSync Completed");
+        } else {
+            LOG.debug("FlushAndSync Completed - Nothing to Flush");
+        }
+        return highestTransactionId;
+    }
+
+    /**
+     * Close the stream without necessarily flushing immediately.
+     * This may be called if the stream is in an error state, such as after a
+     * previous write or close has thrown an exception.
+     */
+    @Override
+    public void abort() throws IOException {
+        super.abort();
+    }
+}
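The writer above pairs setReadyToFlush() (flush buffered records) with flushAndSync() (commit what was flushed). A minimal usage sketch, again assuming a DistributedLogManager opened elsewhere; the helper class, payload, and transaction ids are hypothetical.

    import org.apache.distributedlog.DistributedLogManager;
    import org.apache.distributedlog.LogRecord;
    import org.apache.distributedlog.LogWriter;

    import java.io.IOException;

    // Hypothetical helper that appends a few records, then flushes and commits them.
    class SyncWriteExample {
        static void append(DistributedLogManager dlm, byte[] payload) throws IOException {
            LogWriter writer = dlm.startLogSegmentNonPartitioned();
            try {
                for (long txid = 1; txid <= 10; txid++) {
                    writer.write(new LogRecord(txid, payload));
                }
                writer.setReadyToFlush();   // flush buffered records to the log segment
                writer.flushAndSync();      // commit the records that were just flushed
            } finally {
                writer.close();
            }
        }
    }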

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java
new file mode 100644
index 0000000..6ed662b
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.twitter.util.Await;
+import com.twitter.util.Duration;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+
+import java.util.concurrent.TimeUnit;
+
+class BKTransmitPacket {
+
+    private final EntryBuffer recordSet;
+    private final long transmitTime;
+    private final Promise<Integer> transmitComplete;
+
+    BKTransmitPacket(EntryBuffer recordSet) {
+        this.recordSet = recordSet;
+        this.transmitTime = System.nanoTime();
+        this.transmitComplete = new Promise<Integer>();
+    }
+
+    EntryBuffer getRecordSet() {
+        return recordSet;
+    }
+
+    Promise<Integer> getTransmitFuture() {
+        return transmitComplete;
+    }
+
+    /**
+     * Complete the transmit with result code <code>transmitResult</code>.
+     * <p>It notifies all the waiters that are waiting via {@link #awaitTransmitComplete(long, TimeUnit)}
+     * or {@link #addTransmitCompleteListener(FutureEventListener)}.
+     *
+     * @param transmitResult
+     *          transmit result code.
+     */
+    public void notifyTransmitComplete(int transmitResult) {
+        transmitComplete.setValue(transmitResult);
+    }
+
+    /**
+     * Register a transmit complete listener.
+     * <p>The listener will be triggered with the transmit result when the transmit completes.
+     * The listener callback should be non-blocking.
+     *
+     * @param transmitCompleteListener
+     *          listener on transmit completion
+     * @see #awaitTransmitComplete(long, TimeUnit)
+     */
+    void addTransmitCompleteListener(FutureEventListener<Integer> transmitCompleteListener) {
+        transmitComplete.addEventListener(transmitCompleteListener);
+    }
+
+    /**
+     * Await the completion of the transmit.
+     *
+     * @param timeout
+     *          wait timeout
+     * @param unit
+     *          wait timeout unit
+     * @return the transmit result code
+     */
+    int awaitTransmitComplete(long timeout, TimeUnit unit)
+        throws Exception {
+        return Await.result(transmitComplete,
+                Duration.fromTimeUnit(timeout, unit));
+    }
+
+    public long getTransmitTime() {
+        return transmitTime;
+    }
+
+}
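The packet's completion protocol is simple: the transmitter reports a BookKeeper result code via notifyTransmitComplete, and waiters either block with awaitTransmitComplete or register a FutureEventListener. A minimal sketch, assuming it lives in the same package (the packet and EntryBuffer are package-private) and that an EntryBuffer instance is obtained elsewhere.

    package org.apache.distributedlog;

    import com.twitter.util.FutureEventListener;

    import java.util.concurrent.TimeUnit;

    // Hypothetical sketch of the BKTransmitPacket completion protocol.
    class TransmitPacketExample {
        static void demo(EntryBuffer recordSet) throws Exception {
            final BKTransmitPacket packet = new BKTransmitPacket(recordSet);

            // asynchronous observer of the transmit outcome
            packet.addTransmitCompleteListener(new FutureEventListener<Integer>() {
                @Override
                public void onSuccess(Integer transmitRc) {
                    System.out.println("transmit completed with rc " + transmitRc);
                }

                @Override
                public void onFailure(Throwable cause) {
                    System.err.println("transmit future failed: " + cause);
                }
            });

            // elsewhere, the transmitter reports the result code (0 is the BookKeeper OK code)
            packet.notifyTransmitComplete(0);

            // a synchronous waiter can also block with a timeout
            int rc = packet.awaitTransmitComplete(1, TimeUnit.SECONDS);
            System.out.println("awaited rc " + rc);
        }
    }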

