distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [15/23] incubator-distributedlog git commit: DL-124: Use Java8 Future rather than twitter Future
Date Wed, 21 Jun 2017 17:20:47 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/LogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LogWriter.java
deleted file mode 100644
index 8a4a30b..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/LogWriter.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog;
-
-import org.apache.distributedlog.io.Abortable;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-/*
-* A generic interface class to support writing log records into
-* a persistent distributed log.
-*/
-public interface LogWriter extends Closeable, Abortable {
-    /**
-     * Write a log record to the stream.
-     *
-     * @param record single log record
-     * @throws IOException
-     */
-    public void write(LogRecord record) throws IOException;
-
-
-    /**
-     * Write a list of log records to the stream.
-     *
-     * @param records list of log records
-     * @throws IOException
-     */
-    @Deprecated
-    public int writeBulk(List<LogRecord> records) throws IOException;
-
-    /**
-     * All data that has been written to the stream so far will be sent to
-     * persistent storage.
-     * The transmission is asynchronous and new data can be still written to the
-     * stream while flushing is performed.
-     *
-     * TODO: rename this to flush()
-     */
-    public long setReadyToFlush() throws IOException;
-
-    /**
-     * Flush and sync all data that is ready to be flush
-     * {@link #setReadyToFlush()} into underlying persistent store.
-     * @throws IOException
-     *
-     * TODO: rename this to commit()
-     */
-    public long flushAndSync() throws IOException;
-
-    /**
-     * Flushes all the data up to this point,
-     * adds the end of stream marker and marks the stream
-     * as read-only in the metadata. No appends to the
-     * stream will be allowed after this point
-     *
-     * @throws IOException
-     */
-    public void markEndOfStream() throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/MetadataAccessor.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/MetadataAccessor.java b/distributedlog-core/src/main/java/org/apache/distributedlog/MetadataAccessor.java
deleted file mode 100644
index 3d1d601..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/MetadataAccessor.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog;
-
-import org.apache.distributedlog.io.AsyncCloseable;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public interface MetadataAccessor extends Closeable, AsyncCloseable {
-    /**
-     * Get the name of the stream managed by this log manager
-     * @return streamName
-     */
-    public String getStreamName();
-
-    public void createOrUpdateMetadata(byte[] metadata) throws IOException;
-
-    public void deleteMetadata() throws IOException;
-
-    public byte[] getMetadata() throws IOException;
-
-    /**
-     * Close the distributed log metadata, freeing any resources it may hold.
-     */
-    public void close() throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
index f94a6e0..386a9a1 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
@@ -21,6 +21,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Ticker;
 import com.google.common.collect.Lists;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
+import java.util.function.Function;
 import org.apache.distributedlog.callback.LogSegmentListener;
 import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException;
 import org.apache.distributedlog.exceptions.DLIllegalStateException;
@@ -32,19 +35,13 @@ import org.apache.distributedlog.io.AsyncCloseable;
 import org.apache.distributedlog.logsegment.LogSegmentEntryReader;
 import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
 import org.apache.distributedlog.logsegment.LogSegmentFilter;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Function0;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Futures;
-import com.twitter.util.Promise;
 import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Function1;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
 
 import java.io.IOException;
 import java.util.LinkedList;
@@ -52,7 +49,6 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -75,12 +71,9 @@ class ReadAheadEntryReader implements
     // Static Functions
     //
 
-    private static AbstractFunction1<LogSegmentEntryReader, BoxedUnit> START_READER_FUNC = new AbstractFunction1<LogSegmentEntryReader, BoxedUnit>() {
-        @Override
-        public BoxedUnit apply(LogSegmentEntryReader reader) {
-            reader.start();
-            return BoxedUnit.UNIT;
-        }
+    private static Function<LogSegmentEntryReader, Void> START_READER_FUNC = reader -> {
+        reader.start();
+        return null;
     };
 
     //
@@ -91,7 +84,7 @@ class ReadAheadEntryReader implements
 
         private LogSegmentMetadata metadata;
         private final long startEntryId;
-        private Future<LogSegmentEntryReader> openFuture = null;
+        private CompletableFuture<LogSegmentEntryReader> openFuture = null;
         private LogSegmentEntryReader reader = null;
         private boolean isStarted = false;
         private boolean isClosed = false;
@@ -122,7 +115,7 @@ class ReadAheadEntryReader implements
             if (null != openFuture) {
                 return;
             }
-            openFuture = entryStore.openReader(metadata, startEntryId).addEventListener(this);
+            openFuture = entryStore.openReader(metadata, startEntryId).whenComplete(this);
         }
 
         synchronized boolean isReaderStarted() {
@@ -137,16 +130,16 @@ class ReadAheadEntryReader implements
             if (null != reader) {
                 reader.start();
             } else {
-                openFuture.onSuccess(START_READER_FUNC);
+                openFuture.thenApply(START_READER_FUNC);
             }
         }
 
-        synchronized Future<List<Entry.Reader>> readNext() {
+        synchronized CompletableFuture<List<Entry.Reader>> readNext() {
             if (null != reader) {
                 checkCatchingUpStatus(reader);
                 return reader.readNext(numReadAheadEntries);
             } else {
-                return openFuture.flatMap(readFunc);
+                return openFuture.thenCompose(readFunc);
             }
         }
 
@@ -155,14 +148,10 @@ class ReadAheadEntryReader implements
                 reader.onLogSegmentMetadataUpdated(segment);
                 this.metadata = segment;
             } else {
-                openFuture.onSuccess(new AbstractFunction1<LogSegmentEntryReader, BoxedUnit>() {
-                    @Override
-                    public BoxedUnit apply(LogSegmentEntryReader reader) {
-                        reader.onLogSegmentMetadataUpdated(segment);
-                        synchronized (SegmentReader.this) {
-                            SegmentReader.this.metadata = segment;
-                        }
-                        return BoxedUnit.UNIT;
+                openFuture.thenAccept(reader1 -> {
+                    reader1.onLogSegmentMetadataUpdated(segment);
+                    synchronized (SegmentReader.this) {
+                        SegmentReader.this.metadata = segment;
                     }
                 });
             }
@@ -185,28 +174,21 @@ class ReadAheadEntryReader implements
             return isClosed;
         }
 
-        synchronized Future<Void> close() {
+        synchronized CompletableFuture<Void> close() {
             if (null == openFuture) {
-                return Future.Void();
+                return FutureUtils.Void();
             }
-            return openFuture.flatMap(new AbstractFunction1<LogSegmentEntryReader, Future<Void>>() {
-                @Override
-                public Future<Void> apply(LogSegmentEntryReader reader) {
-                    return reader.asyncClose();
-                }
-            }).ensure(new Function0<BoxedUnit>() {
-                @Override
-                public BoxedUnit apply() {
+            return FutureUtils.ensure(
+                openFuture.thenCompose(reader1 -> reader1.asyncClose()),
+                () -> {
                     synchronized (SegmentReader.this) {
                         isClosed = true;
                     }
-                    return null;
-                }
-            });
+                });
         }
     }
 
-    private class ReadEntriesFunc extends AbstractFunction1<LogSegmentEntryReader, Future<List<Entry.Reader>>> {
+    private class ReadEntriesFunc implements Function<LogSegmentEntryReader, CompletableFuture<List<Entry.Reader>>> {
 
         private final int numEntries;
 
@@ -215,7 +197,7 @@ class ReadAheadEntryReader implements
         }
 
         @Override
-        public Future<List<Entry.Reader>> apply(LogSegmentEntryReader reader) {
+        public CompletableFuture<List<Entry.Reader>> apply(LogSegmentEntryReader reader) {
             checkCatchingUpStatus(reader);
             return reader.readNext(numEntries);
         }
@@ -244,14 +226,8 @@ class ReadAheadEntryReader implements
     //
     // Functions
     //
-    private final Function1<LogSegmentEntryReader, Future<List<Entry.Reader>>> readFunc;
-    private final Function0<BoxedUnit> removeClosedSegmentReadersFunc = new Function0<BoxedUnit>() {
-        @Override
-        public BoxedUnit apply() {
-            removeClosedSegmentReaders();
-            return BoxedUnit.UNIT;
-        }
-    };
+    private final Function<LogSegmentEntryReader, CompletableFuture<List<Entry.Reader>>> readFunc;
+    private final Runnable removeClosedSegmentReadersFunc = () -> removeClosedSegmentReaders();
 
     //
     // Resources
@@ -282,7 +258,7 @@ class ReadAheadEntryReader implements
     private final AtomicBoolean started = new AtomicBoolean(false);
     private boolean isInitialized = false;
     private boolean readAheadPaused = false;
-    private Promise<Void> closePromise = null;
+    private CompletableFuture<Void> closePromise = null;
     // segment readers
     private long currentSegmentSequenceNumber;
     private SegmentReader currentSegmentReader;
@@ -344,15 +320,12 @@ class ReadAheadEntryReader implements
 
     private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
         if (idleWarnThresholdMillis < Integer.MAX_VALUE && idleWarnThresholdMillis > 0) {
-            return scheduler.scheduleAtFixedRate(streamName, new Runnable() {
-                @Override
-                public void run() {
-                    if (!isReaderIdle(idleWarnThresholdMillis, TimeUnit.MILLISECONDS)) {
-                        return;
-                    }
-                    // the readahead has been idle
-                    unsafeCheckIfReadAheadIsIdle();
+            return scheduler.scheduleAtFixedRate(streamName, () -> {
+                if (!isReaderIdle(idleWarnThresholdMillis, TimeUnit.MILLISECONDS)) {
+                    return;
                 }
+                // the readahead has been idle
+                unsafeCheckIfReadAheadIsIdle();
             }, idleWarnThresholdMillis, idleWarnThresholdMillis, TimeUnit.MILLISECONDS);
         }
         return null;
@@ -366,7 +339,7 @@ class ReadAheadEntryReader implements
                     LogSegmentMetadata.COMPARATOR,
                     LogSegmentFilter.DEFAULT_FILTER,
                     null
-            ).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+            ).whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
                 @Override
                 public void onFailure(Throwable cause) {
                     // do nothing here since it would be retried on next idle reader check task
@@ -459,13 +432,13 @@ class ReadAheadEntryReader implements
     }
 
     @Override
-    public Future<Void> asyncClose() {
-        final Promise<Void> closeFuture;
+    public CompletableFuture<Void> asyncClose() {
+        final CompletableFuture<Void> closeFuture;
         synchronized (this) {
             if (null != closePromise) {
                 return closePromise;
             }
-            closePromise = closeFuture = new Promise<Void>();
+            closePromise = closeFuture = new CompletableFuture<Void>();
         }
 
         // cancel the idle reader task
@@ -489,8 +462,8 @@ class ReadAheadEntryReader implements
         return closeFuture;
     }
 
-    private void unsafeAsyncClose(Promise<Void> closePromise) {
-        List<Future<Void>> closeFutures = Lists.newArrayListWithExpectedSize(
+    private void unsafeAsyncClose(CompletableFuture<Void> closePromise) {
+        List<CompletableFuture<Void>> closeFutures = Lists.newArrayListWithExpectedSize(
                 segmentReaders.size() + segmentReadersToClose.size() + 1);
         if (null != currentSegmentReader) {
             segmentReadersToClose.add(currentSegmentReader);
@@ -505,7 +478,9 @@ class ReadAheadEntryReader implements
         for (SegmentReader reader : segmentReadersToClose) {
             closeFutures.add(reader.close());
         }
-        Futures.collect(closeFutures).proxyTo(closePromise);
+        FutureUtils.proxyTo(
+            FutureUtils.collect(closeFutures).thenApply((value) -> null),
+            closePromise);
     }
 
     //
@@ -921,7 +896,9 @@ class ReadAheadEntryReader implements
     private void unsafeMoveToNextLogSegment() {
         if (null != currentSegmentReader) {
             segmentReadersToClose.add(currentSegmentReader);
-            currentSegmentReader.close().ensure(removeClosedSegmentReadersFunc);
+            FutureUtils.ensure(
+                currentSegmentReader.close(),
+                removeClosedSegmentReadersFunc);
             logger.debug("close current segment reader {}", currentSegmentReader.getSegment());
             currentSegmentReader = null;
         }
@@ -971,7 +948,7 @@ class ReadAheadEntryReader implements
     }
 
     private void unsafeReadNext(SegmentReader reader) {
-        reader.readNext().addEventListener(this);
+        reader.readNext().whenComplete(this);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java
index 9935d5f..bf4e140 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java
@@ -19,6 +19,7 @@ package org.apache.distributedlog;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -31,14 +32,10 @@ import org.apache.distributedlog.selector.FirstDLSNNotLessThanSelector;
 import org.apache.distributedlog.selector.FirstTxIdNotLessThanSelector;
 import org.apache.distributedlog.selector.LastRecordSelector;
 import org.apache.distributedlog.selector.LogRecordSelector;
-import org.apache.distributedlog.util.FutureUtils.FutureEventListenerRunnable;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction0;
-import scala.runtime.BoxedUnit;
 
 /**
  * Utility function for readers
@@ -78,7 +75,7 @@ public class ReadUtils {
      *          log segment entry store
      * @return a future with last record.
      */
-    public static Future<LogRecordWithDLSN> asyncReadLastRecord(
+    public static CompletableFuture<LogRecordWithDLSN> asyncReadLastRecord(
             final String streamName,
             final LogSegmentMetadata l,
             final boolean fence,
@@ -116,7 +113,7 @@ public class ReadUtils {
      *          threshold dlsn
      * @return a future with last record.
      */
-    public static Future<LogRecordWithDLSN> asyncReadFirstUserRecord(
+    public static CompletableFuture<LogRecordWithDLSN> asyncReadFirstUserRecord(
             final String streamName,
             final LogSegmentMetadata l,
             final int scanStartBatchSize,
@@ -243,14 +240,14 @@ public class ReadUtils {
      *          scan context
      * @return a future with the log record.
      */
-    private static Future<LogRecordWithDLSN> asyncReadRecordFromEntries(
+    private static CompletableFuture<LogRecordWithDLSN> asyncReadRecordFromEntries(
             final String streamName,
             final LogSegmentRandomAccessEntryReader reader,
             final LogSegmentMetadata metadata,
             final ExecutorService executorService,
             final ScanContext context,
             final LogRecordSelector selector) {
-        final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>();
+        final CompletableFuture<LogRecordWithDLSN> promise = new CompletableFuture<LogRecordWithDLSN>();
         final long startEntryId = context.curStartEntryId.get();
         final long endEntryId = context.curEndEntryId.get();
         if (LOG.isDebugEnabled()) {
@@ -271,7 +268,7 @@ public class ReadUtils {
                         } catch (IOException ioe) {
                             // exception is only thrown due to bad ledger entry, so it might be corrupted
                             // we shouldn't do anything beyond this point. throw the exception to application
-                            promise.setException(ioe);
+                            promise.completeExceptionally(ioe);
                             return;
                         }
                     }
@@ -282,16 +279,16 @@ public class ReadUtils {
                                 new Object[]{streamName, startEntryId, endEntryId,
                                         metadata, record});
                     }
-                    promise.setValue(record);
+                    promise.complete(record);
                 }
 
                 @Override
                 public void onFailure(final Throwable cause) {
-                    promise.setException(cause);
+                    promise.completeExceptionally(cause);
                 }
             };
         reader.readEntries(startEntryId, endEntryId)
-                .addEventListener(FutureEventListenerRunnable.of(readEntriesListener, executorService));
+                .whenCompleteAsync(readEntriesListener, executorService);
         return promise;
     }
 
@@ -343,7 +340,7 @@ public class ReadUtils {
             final LogSegmentRandomAccessEntryReader reader,
             final LogSegmentMetadata metadata,
             final ExecutorService executorService,
-            final Promise<LogRecordWithDLSN> promise,
+            final CompletableFuture<LogRecordWithDLSN> promise,
             final ScanContext context,
             final LogRecordSelector selector) {
         FutureEventListener<LogRecordWithDLSN> readEntriesListener =
@@ -356,12 +353,12 @@ public class ReadUtils {
                                         metadata, value});
                     }
                     if (null != value) {
-                        promise.setValue(value);
+                        promise.complete(value);
                         return;
                     }
                     if (!context.moveToNextRange()) {
                         // no entries to read again
-                        promise.setValue(null);
+                        promise.complete(null);
                         return;
                     }
                     // scan next range
@@ -376,11 +373,11 @@ public class ReadUtils {
 
                 @Override
                 public void onFailure(Throwable cause) {
-                    promise.setException(cause);
+                    promise.completeExceptionally(cause);
                 }
             };
         asyncReadRecordFromEntries(streamName, reader, metadata, executorService, context, selector)
-                .addEventListener(FutureEventListenerRunnable.of(readEntriesListener, executorService));
+                .whenCompleteAsync(readEntriesListener, executorService);
     }
 
     private static void asyncReadRecordFromLogSegment(
@@ -392,7 +389,7 @@ public class ReadUtils {
             final int scanMaxBatchSize,
             final boolean includeControl,
             final boolean includeEndOfStream,
-            final Promise<LogRecordWithDLSN> promise,
+            final CompletableFuture<LogRecordWithDLSN> promise,
             final AtomicInteger numRecordsScanned,
             final LogRecordSelector selector,
             final boolean backward,
@@ -402,7 +399,7 @@ public class ReadUtils {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Log segment {} is empty for {}.", new Object[] { metadata, streamName });
             }
-            promise.setValue(null);
+            promise.complete(null);
             return;
         }
         final ScanContext context = new ScanContext(
@@ -413,7 +410,7 @@ public class ReadUtils {
                                    promise, context, selector);
     }
 
-    private static Future<LogRecordWithDLSN> asyncReadRecord(
+    private static CompletableFuture<LogRecordWithDLSN> asyncReadRecord(
             final String streamName,
             final LogSegmentMetadata l,
             final boolean fence,
@@ -428,7 +425,7 @@ public class ReadUtils {
             final boolean backward,
             final long startEntryId) {
 
-        final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>();
+        final CompletableFuture<LogRecordWithDLSN> promise = new CompletableFuture<LogRecordWithDLSN>();
 
         FutureEventListener<LogSegmentRandomAccessEntryReader> openReaderListener =
             new FutureEventListener<LogSegmentRandomAccessEntryReader>() {
@@ -438,13 +435,7 @@ public class ReadUtils {
                         LOG.debug("{} Opened log segment {} for reading record",
                                 streamName, l);
                     }
-                    promise.ensure(new AbstractFunction0<BoxedUnit>() {
-                        @Override
-                        public BoxedUnit apply() {
-                            reader.asyncClose();
-                            return BoxedUnit.UNIT;
-                        }
-                    });
+                    promise.whenComplete((value, cause) -> reader.asyncClose());
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("{} {} scanning {}.", new Object[]{
                                 (backward ? "backward" : "forward"), streamName, l});
@@ -458,11 +449,11 @@ public class ReadUtils {
 
                 @Override
                 public void onFailure(final Throwable cause) {
-                    promise.setException(cause);
+                    promise.completeExceptionally(cause);
                 }
             };
         entryStore.openRandomAccessReader(l, fence)
-                .addEventListener(FutureEventListenerRunnable.of(openReaderListener, executorService));
+                .whenCompleteAsync(openReaderListener, executorService);
         return promise;
     }
 
@@ -499,7 +490,7 @@ public class ReadUtils {
      *          how many number of entries to search in parallel
      * @return found log record. none if all transaction ids are less than provided <code>transactionId</code>.
      */
-    public static Future<Optional<LogRecordWithDLSN>> getLogRecordNotLessThanTxId(
+    public static CompletableFuture<Optional<LogRecordWithDLSN>> getLogRecordNotLessThanTxId(
             final String logName,
             final LogSegmentMetadata segment,
             final long transactionId,
@@ -511,30 +502,23 @@ public class ReadUtils {
                 // all log records whose transaction id is less than provided transactionId
                 // then return none
                 Optional<LogRecordWithDLSN> noneRecord = Optional.absent();
-                return Future.value(noneRecord);
+                return FutureUtils.value(noneRecord);
             }
         }
 
-        final Promise<Optional<LogRecordWithDLSN>> promise =
-                new Promise<Optional<LogRecordWithDLSN>>();
+        final CompletableFuture<Optional<LogRecordWithDLSN>> promise =
+                new CompletableFuture<Optional<LogRecordWithDLSN>>();
         final FutureEventListener<LogSegmentRandomAccessEntryReader> openReaderListener =
             new FutureEventListener<LogSegmentRandomAccessEntryReader>() {
                 @Override
                 public void onSuccess(final LogSegmentRandomAccessEntryReader reader) {
-                    promise.ensure(new AbstractFunction0<BoxedUnit>() {
-                        @Override
-                        public BoxedUnit apply() {
-                            reader.asyncClose();
-                            return BoxedUnit.UNIT;
-                        }
-
-                    });
+                    promise.whenComplete((value, cause) -> reader.asyncClose());
                     long lastEntryId = reader.getLastAddConfirmed();
                     if (lastEntryId < 0) {
                         // it means that the log segment is created but not written yet or an empty log segment.
                         // it is equivalent to 'all log records whose transaction id is less than provided transactionId'
                         Optional<LogRecordWithDLSN> nonRecord = Optional.absent();
-                        promise.setValue(nonRecord);
+                        promise.complete(nonRecord);
                         return;
                     }
                     // all log records whose transaction id is not less than provided transactionId
@@ -548,15 +532,15 @@ public class ReadUtils {
                                 executorService,
                                 new SingleEntryScanContext(0L),
                                 selector
-                        ).addEventListener(new FutureEventListener<LogRecordWithDLSN>() {
+                        ).whenComplete(new FutureEventListener<LogRecordWithDLSN>() {
                             @Override
                             public void onSuccess(LogRecordWithDLSN value) {
-                                promise.setValue(Optional.of(selector.result()));
+                                promise.complete(Optional.of(selector.result()));
                             }
 
                             @Override
                             public void onFailure(Throwable cause) {
-                                promise.setException(cause);
+                                promise.completeExceptionally(cause);
                             }
                         });
 
@@ -576,12 +560,12 @@ public class ReadUtils {
 
                 @Override
                 public void onFailure(final Throwable cause) {
-                    promise.setException(cause);
+                    promise.completeExceptionally(cause);
                 }
             };
 
         entryStore.openRandomAccessReader(segment, false)
-                .addEventListener(FutureEventListenerRunnable.of(openReaderListener, executorService));
+                .whenCompleteAsync(openReaderListener, executorService);
         return promise;
     }
 
@@ -617,12 +601,12 @@ public class ReadUtils {
             final List<Long> entriesToSearch,
             final int nWays,
             final Optional<LogRecordWithDLSN> prevFoundRecord,
-            final Promise<Optional<LogRecordWithDLSN>> promise) {
-        final List<Future<LogRecordWithDLSN>> searchResults =
+            final CompletableFuture<Optional<LogRecordWithDLSN>> promise) {
+        final List<CompletableFuture<LogRecordWithDLSN>> searchResults =
                 Lists.newArrayListWithExpectedSize(entriesToSearch.size());
         for (Long entryId : entriesToSearch) {
             LogRecordSelector selector = new FirstTxIdNotLessThanSelector(transactionId);
-            Future<LogRecordWithDLSN> searchResult = asyncReadRecordFromEntries(
+            CompletableFuture<LogRecordWithDLSN> searchResult = asyncReadRecordFromEntries(
                     logName,
                     reader,
                     segment,
@@ -649,11 +633,11 @@ public class ReadUtils {
 
                     @Override
                     public void onFailure(Throwable cause) {
-                        promise.setException(cause);
+                        promise.completeExceptionally(cause);
                     }
                 };
-        Future.collect(searchResults).addEventListener(
-                FutureEventListenerRunnable.of(processSearchResultsListener, executorService));
+        FutureUtils.collect(searchResults).whenCompleteAsync(
+                processSearchResultsListener, executorService);
     }
 
     /**
@@ -668,7 +652,7 @@ public class ReadUtils {
             final List<LogRecordWithDLSN> searchResults,
             final int nWays,
             final Optional<LogRecordWithDLSN> prevFoundRecord,
-            final Promise<Optional<LogRecordWithDLSN>> promise) {
+            final CompletableFuture<Optional<LogRecordWithDLSN>> promise) {
         int found = -1;
         for (int i = 0; i < searchResults.size(); i++) {
             LogRecordWithDLSN record = searchResults.get(i);
@@ -678,7 +662,7 @@ public class ReadUtils {
             }
         }
         if (found == -1) { // all log records' transaction id is less than provided transaction id
-            promise.setValue(prevFoundRecord);
+            promise.complete(prevFoundRecord);
             return;
         }
         // we found a log record
@@ -691,7 +675,7 @@ public class ReadUtils {
         if (foundRecord.getDlsn().getSlotId() != 0L
                 || found == 0
                 || foundRecord.getDlsn().getEntryId() == (searchResults.get(found - 1).getDlsn().getEntryId() + 1)) {
-            promise.setValue(Optional.of(foundRecord));
+            promise.complete(Optional.of(foundRecord));
             return;
         }
 
@@ -702,7 +686,7 @@ public class ReadUtils {
                 searchResults.get(found),
                 nWays);
         if (nextSearchBatch.isEmpty()) {
-            promise.setValue(prevFoundRecord);
+            promise.complete(prevFoundRecord);
             return;
         }
         getLogRecordNotLessThanTxIdFromEntries(

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java
index a2109f4..04bb9e4 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java
@@ -18,7 +18,7 @@
 package org.apache.distributedlog;
 
 import org.apache.distributedlog.exceptions.OverCapacityException;
-import org.apache.distributedlog.util.PermitLimiter;
+import org.apache.distributedlog.common.util.PermitLimiter;
 
 class WriteLimiter {
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java b/distributedlog-core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java
index 4e94984..c4939c0 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java
@@ -19,12 +19,19 @@ package org.apache.distributedlog.admin;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import org.apache.distributedlog.DistributedLogManager;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.bookkeeper.util.IOUtils;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.LogRecordWithDLSN;
 import org.apache.distributedlog.LogSegmentMetadata;
 import org.apache.distributedlog.ReadUtils;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.ZooKeeperClientBuilder;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.impl.BKNamespaceDriver;
 import org.apache.distributedlog.impl.acl.ZKAccessControl;
 import org.apache.distributedlog.exceptions.DLIllegalStateException;
@@ -35,21 +42,12 @@ import org.apache.distributedlog.metadata.DLMetadata;
 import org.apache.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
 import org.apache.distributedlog.metadata.MetadataUpdater;
 import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
 import org.apache.distributedlog.namespace.NamespaceDriver;
 import org.apache.distributedlog.thrift.AccessControlEntry;
 import org.apache.distributedlog.tools.DistributedLogTool;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
-import org.apache.distributedlog.util.SchedulerUtils;
-import com.twitter.util.Await;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.util.IOUtils;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
+import org.apache.distributedlog.common.util.SchedulerUtils;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,11 +93,11 @@ public class DistributedLogAdmin extends DistributedLogTool {
      *          is confirmation needed before executing actual action.
      * @throws IOException
      */
-    public static void fixInprogressSegmentWithLowerSequenceNumber(final DistributedLogNamespace namespace,
+    public static void fixInprogressSegmentWithLowerSequenceNumber(final Namespace namespace,
                                                                    final MetadataUpdater metadataUpdater,
                                                                    final String streamName,
                                                                    final boolean verbose,
-                                                                   final boolean interactive) throws IOException {
+                                                                   final boolean interactive) throws Exception {
         DistributedLogManager dlm = namespace.openLog(streamName);
         try {
             List<LogSegmentMetadata> segments = dlm.getLogSegments();
@@ -193,21 +191,21 @@ public class DistributedLogAdmin extends DistributedLogTool {
     }
 
     public static void checkAndRepairDLNamespace(final URI uri,
-                                                 final DistributedLogNamespace namespace,
+                                                 final Namespace namespace,
                                                  final MetadataUpdater metadataUpdater,
                                                  final OrderedScheduler scheduler,
                                                  final boolean verbose,
-                                                 final boolean interactive) throws IOException {
+                                                 final boolean interactive) throws Exception {
         checkAndRepairDLNamespace(uri, namespace, metadataUpdater, scheduler, verbose, interactive, 1);
     }
 
     public static void checkAndRepairDLNamespace(final URI uri,
-                                                 final DistributedLogNamespace namespace,
+                                                 final Namespace namespace,
                                                  final MetadataUpdater metadataUpdater,
                                                  final OrderedScheduler scheduler,
                                                  final boolean verbose,
                                                  final boolean interactive,
-                                                 final int concurrency) throws IOException {
+                                                 final int concurrency) throws Exception {
         Preconditions.checkArgument(concurrency > 0, "Invalid concurrency " + concurrency + " found.");
         // 0. getting streams under a given uri.
         Iterator<String> streamsIter = namespace.getLogs();
@@ -247,7 +245,7 @@ public class DistributedLogAdmin extends DistributedLogTool {
     }
 
     private static Map<String, StreamCandidate> checkStreams(
-            final DistributedLogNamespace namespace,
+            final Namespace namespace,
             final Collection<String> streams,
             final OrderedScheduler scheduler,
             final int concurrency) throws IOException {
@@ -274,7 +272,7 @@ public class DistributedLogAdmin extends DistributedLogTool {
                         LOG.info("Checking stream {}.", stream);
                         candidate = checkStream(namespace, stream, scheduler);
                         LOG.info("Checked stream {} - {}.", stream, candidate);
-                    } catch (IOException e) {
+                    } catch (Throwable e) {
                         LOG.error("Error on checking stream {} : ", stream, e);
                         doneLatch.countDown();
                         break;
@@ -313,7 +311,7 @@ public class DistributedLogAdmin extends DistributedLogTool {
     }
 
     private static StreamCandidate checkStream(
-            final DistributedLogNamespace namespace,
+            final Namespace namespace,
             final String streamName,
             final OrderedScheduler scheduler) throws IOException {
         DistributedLogManager dlm = namespace.openLog(streamName);
@@ -322,14 +320,14 @@ public class DistributedLogAdmin extends DistributedLogTool {
             if (segments.isEmpty()) {
                 return null;
             }
-            List<Future<LogSegmentCandidate>> futures =
-                    new ArrayList<Future<LogSegmentCandidate>>(segments.size());
+            List<CompletableFuture<LogSegmentCandidate>> futures =
+                    new ArrayList<CompletableFuture<LogSegmentCandidate>>(segments.size());
             for (LogSegmentMetadata segment : segments) {
                 futures.add(checkLogSegment(namespace, streamName, segment, scheduler));
             }
             List<LogSegmentCandidate> segmentCandidates;
             try {
-                segmentCandidates = Await.result(Future.collect(futures));
+                segmentCandidates = FutureUtils.result(FutureUtils.collect(futures));
             } catch (Exception e) {
                 throw new IOException("Failed on checking stream " + streamName, e);
             }
@@ -348,13 +346,13 @@ public class DistributedLogAdmin extends DistributedLogTool {
         }
     }
 
-    private static Future<LogSegmentCandidate> checkLogSegment(
-            final DistributedLogNamespace namespace,
+    private static CompletableFuture<LogSegmentCandidate> checkLogSegment(
+            final Namespace namespace,
             final String streamName,
             final LogSegmentMetadata metadata,
             final OrderedScheduler scheduler) {
         if (metadata.isInProgress()) {
-            return Future.value(null);
+            return FutureUtils.value(null);
         }
 
         final LogSegmentEntryStore entryStore = namespace.getNamespaceDriver()
@@ -370,7 +368,7 @@ public class DistributedLogAdmin extends DistributedLogTool {
                 new AtomicInteger(0),
                 scheduler,
                 entryStore
-        ).map(new Function<LogRecordWithDLSN, LogSegmentCandidate>() {
+        ).thenApply(new Function<LogRecordWithDLSN, LogSegmentCandidate>() {
             @Override
             public LogSegmentCandidate apply(LogRecordWithDLSN record) {
                 if (null != record &&
@@ -388,7 +386,7 @@ public class DistributedLogAdmin extends DistributedLogTool {
     private static boolean repairStream(MetadataUpdater metadataUpdater,
                                         StreamCandidate streamCandidate,
                                         boolean verbose,
-                                        boolean interactive) throws IOException {
+                                        boolean interactive) throws Exception {
         if (verbose) {
             System.out.println("Stream " + streamCandidate.streamName + " : ");
             for (LogSegmentCandidate segmentCandidate : streamCandidate.segmentCandidates) {
@@ -863,7 +861,7 @@ public class DistributedLogAdmin extends DistributedLogTool {
         protected ZKAccessControl getZKAccessControl(ZooKeeperClient zkc, String zkPath) throws Exception {
             ZKAccessControl accessControl;
             try {
-                accessControl = Await.result(ZKAccessControl.read(zkc, zkPath, null));
+                accessControl = FutureUtils.result(ZKAccessControl.read(zkc, zkPath, null));
             } catch (KeeperException.NoNodeException nne) {
                 accessControl = new ZKAccessControl(new AccessControlEntry(), zkPath);
             }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/AsyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/AsyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/AsyncLogReader.java
new file mode 100644
index 0000000..3838bf7
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/AsyncLogReader.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.api;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.io.AsyncCloseable;
+
+public interface AsyncLogReader extends AsyncCloseable {
+
+    /**
+     * Get stream name that the reader reads from.
+     *
+     * @return stream name.
+     */
+    public String getStreamName();
+
+    /**
+     * Read the next record from the log stream
+     *
+     * @return A promise that when satisfied will contain the Log Record with its DLSN.
+     */
+    public CompletableFuture<LogRecordWithDLSN> readNext();
+
+    /**
+     * Read next <i>numEntries</i> entries. The future is only satisfied with non-empty list
+     * of entries. It doesn't block until returning exact <i>numEntries</i>. It is a best effort
+     * call.
+     *
+     * @param numEntries
+     *          num entries
+     * @return A promise that when satisfied will contain a non-empty list of records with their DLSN.
+     */
+    public CompletableFuture<List<LogRecordWithDLSN>> readBulk(int numEntries);
+
+    /**
+     * Read next <i>numEntries</i> entries in a given <i>waitTime</i>.
+     * <p>
+     * The future is satisfied when either reads <i>numEntries</i> entries or reaches <i>waitTime</i>.
+     * The only exception is if there isn't any new entries written within <i>waitTime</i>, it would
+     * wait until new entries are available.
+     *
+     * @param numEntries
+     *          max entries to return
+     * @param waitTime
+     *          maximum wait time if there are entries already for read
+     * @param timeUnit
+     *          wait time unit
+     * @return A promise that when satisfied will contain a non-empty list of records with their DLSN.
+     */
+    public CompletableFuture<List<LogRecordWithDLSN>> readBulk(int numEntries, long waitTime, TimeUnit timeUnit);
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/AsyncLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/AsyncLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/AsyncLogWriter.java
new file mode 100644
index 0000000..9e12de2
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/AsyncLogWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.api;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.io.AsyncAbortable;
+import org.apache.distributedlog.io.AsyncCloseable;
+
+public interface AsyncLogWriter extends AsyncCloseable, AsyncAbortable {
+
+    /**
+     * Get the last committed transaction id.
+     *
+     * @return last committed transaction id.
+     */
+    public long getLastTxId();
+
+    /**
+     * Write a log record to the stream.
+     *
+     * @param record single log record
+     * @return A Future which contains a DLSN if the record was successfully written
+     * or an exception if the write fails
+     */
+    public CompletableFuture<DLSN> write(LogRecord record);
+
+    /**
+     * Write log records to the stream in bulk. Each future in the list represents the result of
+     * one write operation. The size of the result list is equal to the size of the input list.
+     * Buffers are written in order, and the list of result futures has the same order.
+     *
+     * @param record set of log records
+     * @return A Future which contains a list of Future DLSNs if the record was successfully written
+     * or an exception if the operation fails.
+     */
+    public CompletableFuture<List<CompletableFuture<DLSN>>> writeBulk(List<LogRecord> record);
+
+    /**
+     * Truncate the log until <i>dlsn</i>.
+     *
+     * @param dlsn
+     *          dlsn to truncate until.
+     * @return A Future indicates whether the operation succeeds or not, or an exception
+     * if the truncation fails.
+     */
+    public CompletableFuture<Boolean> truncate(DLSN dlsn);
+
+    /**
+     * Get the name of the stream this writer writes data to
+     */
+    public String getStreamName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java
new file mode 100644
index 0000000..60f629d
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.api;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.AppendOnlyStreamReader;
+import org.apache.distributedlog.AppendOnlyStreamWriter;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.LogSegmentMetadata;
+import org.apache.distributedlog.callback.LogSegmentListener;
+import org.apache.distributedlog.io.AsyncCloseable;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.distributedlog.api.subscription.SubscriptionsStore;
+
+/**
+ * A DistributedLogManager is responsible for managing a single place of storing
+ * edit logs. It may correspond to multiple files, a backup node, etc.
+ * Even when the actual underlying storage is rolled, or failed and restored,
+ * each conceptual place of storage corresponds to exactly one instance of
+ * this class, which is created when the EditLog is first opened.
+ */
+public interface DistributedLogManager extends AsyncCloseable, Closeable {
+
+    /**
+     * Get the name of the stream managed by this log manager
+     * @return streamName
+     */
+    public String getStreamName();
+
+    /**
+     * Get the namespace driver used by this manager.
+     *
+     * @return the namespace driver
+     */
+    public NamespaceDriver getNamespaceDriver();
+
+    /**
+     * Get log segments.
+     *
+     * @return log segments
+     * @throws IOException
+     */
+    public List<LogSegmentMetadata> getLogSegments() throws IOException;
+
+    /**
+     * Register <i>listener</i> on log segment updates of this stream.
+     *
+     * @param listener
+     *          listener to receive update log segment list.
+     */
+    public void registerListener(LogSegmentListener listener) throws IOException ;
+
+    /**
+     * Unregister <i>listener</i> on log segment updates from this stream.
+     *
+     * @param listener
+     *          listener to receive update log segment list.
+     */
+    public void unregisterListener(LogSegmentListener listener);
+
+    /**
+     * Open async log writer to write records to the log stream.
+     *
+     * @return result represents the open result
+     */
+    public CompletableFuture<AsyncLogWriter> openAsyncLogWriter();
+
+    /**
+     * Begin writing to the log stream identified by the name
+     *
+     * @return the writer interface to generate log records
+     */
+    public LogWriter startLogSegmentNonPartitioned() throws IOException;
+
+    /**
+     * Begin writing to the log stream identified by the name
+     *
+     * @return the writer interface to generate log records
+     */
+    // @Deprecated
+    public AsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException;
+
+    /**
+     * Begin appending to the end of the log stream which is being treated as a sequence of bytes
+     *
+     * @return the writer interface to generate log records
+     */
+    public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException;
+
+    /**
+     * Get a reader to read a log stream as a sequence of bytes
+     *
+     * @return the writer interface to generate log records
+     */
+    public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException;
+
+    /**
+     * Get the input stream starting with fromTxnId for the specified log
+     *
+     * @param fromTxnId - the first transaction id we want to read
+     * @return the stream starting with transaction fromTxnId
+     * @throws IOException if a stream cannot be found.
+     */
+    public LogReader getInputStream(long fromTxnId)
+        throws IOException;
+
+    public LogReader getInputStream(DLSN fromDLSN) throws IOException;
+
+    /**
+     * Open an async log reader to read records from a log starting from <code>fromTxnId</code>.
+     *
+     * @param fromTxnId
+     *          transaction id to start reading from
+     * @return async log reader
+     */
+    public CompletableFuture<AsyncLogReader> openAsyncLogReader(long fromTxnId);
+
+    /**
+     * Open an async log reader to read records from a log starting from <code>fromDLSN</code>
+     *
+     * @param fromDLSN
+     *          dlsn to start reading from
+     * @return async log reader
+     */
+    public CompletableFuture<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN);
+
+    // @Deprecated
+    public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException;
+
+    // @Deprecated
+    public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException;
+
+    public CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN);
+
+    /**
+     * Get a log reader with lock starting from <i>fromDLSN</i> and using <i>subscriberId</i>.
+     * If two readers tried to open using same subscriberId, one would succeed, while the other
+     * will be blocked until it gets the lock.
+     *
+     * @param fromDLSN
+     *          start dlsn
+     * @param subscriberId
+     *          subscriber id
+     * @return async log reader
+     */
+    public CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN, String subscriberId);
+
+    /**
+     * Get a log reader using <i>subscriberId</i> with lock. The reader will start reading from
+     * its last commit position recorded in subscription store. If no last commit position found
+     * in subscription store, it would start reading from head of the stream.
+     *
+     * If the two readers tried to open using same subscriberId, one would succeed, while the other
+     * will be blocked until it gets the lock.
+     *
+     * @param subscriberId
+     *          subscriber id
+     * @return async log reader
+     */
+    public CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId);
+
+    /**
+     * Get the {@link DLSN} of first log record whose transaction id is not less than <code>transactionId</code>.
+     *
+     * @param transactionId
+     *          transaction id
+     * @return dlsn of first log record whose transaction id is not less than transactionId.
+     */
+    public CompletableFuture<DLSN> getDLSNNotLessThanTxId(long transactionId);
+
+    /**
+     * Get the last log record in the stream
+     *
+     * @return the last log record in the stream
+     * @throws IOException if a stream cannot be found.
+     */
+    public LogRecordWithDLSN getLastLogRecord()
+        throws IOException;
+
+    /**
+     * Get the earliest Transaction Id available in the log
+     *
+     * @return earliest transaction id
+     * @throws IOException
+     */
+    public long getFirstTxId() throws IOException;
+
+    /**
+     * Get Latest Transaction Id in the log
+     *
+     * @return latest transaction id
+     * @throws IOException
+     */
+    public long getLastTxId() throws IOException;
+
+    /**
+     * Get Latest DLSN in the log
+     *
+     * @return last dlsn
+     * @throws IOException
+     */
+    public DLSN getLastDLSN() throws IOException;
+
+    /**
+     * Get Latest log record with DLSN in the log - async
+     *
+     * @return latest log record with DLSN
+     */
+    public CompletableFuture<LogRecordWithDLSN> getLastLogRecordAsync();
+
+    /**
+     * Get Latest Transaction Id in the log - async
+     *
+     * @return latest transaction id
+     */
+    public CompletableFuture<Long> getLastTxIdAsync();
+
+    /**
+     * Get first DLSN in the log.
+     *
+     * @return first dlsn in the stream
+     */
+    public CompletableFuture<DLSN> getFirstDLSNAsync();
+
+    /**
+     * Get Latest DLSN in the log - async
+     *
+     * @return latest transaction id
+     */
+    public CompletableFuture<DLSN> getLastDLSNAsync();
+
+    /**
+     * Get the number of log records in the active portion of the log
+     * Any log segments that have already been truncated will not be included
+     *
+     * @return number of log records
+     * @throws IOException
+     */
+    public long getLogRecordCount() throws IOException;
+
+    /**
+     * Get the number of log records in the active portion of the log - async.
+     * Any log segments that have already been truncated will not be included
+     *
+     * @return future number of log records
+     * @throws IOException
+     */
+    public CompletableFuture<Long> getLogRecordCountAsync(final DLSN beginDLSN);
+
+    /**
+     * Run recovery on the log.
+     *
+     * @throws IOException
+     */
+    public void recover() throws IOException;
+
+    /**
+     * Check if an end of stream marker was added to the stream
+     * A stream with an end of stream marker cannot be appended to
+     *
+     * @return true if the marker was added to the stream, false otherwise
+     * @throws IOException
+     */
+    public boolean isEndOfStreamMarked() throws IOException;
+
+    /**
+     * Delete the log.
+     *
+     * @throws IOException if the deletion fails
+     */
+    public void delete() throws IOException;
+
+    /**
+     * The DistributedLogManager may archive/purge any logs for transactionId
+     * less than or equal to minImageTxId.
+     * This is to be used only when the client explicitly manages deletion. If
+     * the cleanup policy is based on sliding time window, then this method need
+     * not be called.
+     *
+     * @param minTxIdToKeep the earliest txid that must be retained
+     * @throws IOException if purging fails
+     */
+    public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;
+
+    /**
+     * Get the subscriptions store provided by the distributedlog manager.
+     *
+     * @return subscriptions store manages subscriptions for current stream.
+     */
+    public SubscriptionsStore getSubscriptionsStore();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/LogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/LogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/LogReader.java
new file mode 100644
index 0000000..631a8a9
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/LogReader.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.api;
+
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.io.AsyncCloseable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * <i>LogReader</i> is a `synchronous` reader reading records from a DL log.
+ *
+ * <h3>Lifecycle of a Reader</h3>
+ *
+ * A reader is a <i>sequential</i> reader that read records from a DL log starting
+ * from a given position. The position could be a <i>DLSN</i> (via {@link DistributedLogManager#getInputStream(DLSN)}
+ * or a <i>Transaction ID</i> (via {@link DistributedLogManager#getInputStream(long)}.
+ * <p>
+ * After the reader is open, it could call {@link #readNext(boolean)} or {@link #readBulk(boolean, int)}
+ * to read records out the log from provided position.
+ * <p>
+ * Closing the reader (via {@link #close()} will release all the resources occupied
+ * by this reader instance.
+ * <p>
+ * Exceptions could be thrown during reading records. Once the exception is thrown,
+ * the reader is set to an error state and it isn't usable anymore. It is the application's
+ * responsibility to handle the exceptions and re-create readers if necessary.
+ * <p>
+ * Example:
+ * <pre>
+ * DistributedLogManager dlm = ...;
+ * long nextTxId = ...;
+ * LogReader reader = dlm.getInputStream(nextTxId);
+ *
+ * while (true) { // keep reading & processing records
+ *     LogRecord record;
+ *     try {
+ *         record = reader.readNext(false);
+ *         nextTxId = record.getTransactionId();
+ *         // process the record
+ *         ...
+ *     } catch (IOException ioe) {
+ *         // handle the exception
+ *         ...
+ *         reader = dlm.getInputStream(nextTxId + 1);
+ *     }
+ * }
+ *
+ * </pre>
+ *
+ * <h3>Read Records</h3>
+ *
+ * Reading records from an <i>endless</i> log in `synchronous` way isn't as
+ * trivial as in `asynchronous` way (via {@link AsyncLogReader}. Because it
+ * lacks of callback mechanism. LogReader introduces a flag `nonBlocking` on
+ * controlling the <i>waiting</i> behavior on `synchronous` reads.
+ *
+ * <h4>Blocking vs NonBlocking</h4>
+ *
+ * <i>Blocking</i> (nonBlocking = false) means the reads will wait for records
+ * before returning read calls. While <i>NonBlocking</i> (nonBlocking = true)
+ * means the reads will only check readahead cache and return whatever records
+ * available in the readahead cache.
+ * <p>
+ * The <i>waiting</i> period varies in <i>blocking</i> mode. If the reader is
+ * catching up with writer (there are records in the log), the read call will
+ * wait until records are read and returned. If the reader is caught up with
+ * writer (there are no more records in the log at read time), the read call
+ * will wait for a small period of time (defined in
+ * {@link DistributedLogConfiguration#getReadAheadWaitTime()} and return whatever
+ * records available in the readahead cache. In other words, if a reader sees
+ * no record on blocking reads, it means the reader is `caught-up` with the
+ * writer.
+ * <p>
+ * <i>Blocking</i> and <i>NonBlocking</i> modes are useful for building replicated
+ * state machines. Applications could use <i>blocking</i> reads till caught up
+ * with latest data. Once they are caught up with latest data, they could start
+ * serving their service and turn to <i>non-blocking</i> read mode and tail read
+ * data from the logs.
+ * <p>
+ * See examples below.
+ *
+ * <h4>Read Single Record</h4>
+ *
+ * {@link #readNext(boolean)} is reading individual records from a DL log.
+ *
+ * <pre>
+ * LogReader reader = ...
+ *
+ * // keep reading records in blocking way until no records available in the log
+ * LogRecord record = reader.readNext(false);
+ * while (null != record) {
+ *     // process the record
+ *     ...
+ *     // read next record
+ *     records = reader.readNext(false);
+ * }
+ *
+ * ...
+ *
+ * // reader is caught up with writer, doing non-blocking reads to tail the log
+ * while (true) {
+ *     record = reader.readNext(true)
+ *     // process the new records
+ *     ...
+ * }
+ * </pre>
+ *
+ * <h4>Read Batch of Records</h4>
+ *
+ * {@link #readBulk(boolean, int)} is a convenient way to read a batch of records
+ * from a DL log.
+ *
+ * <pre>
+ * LogReader reader = ...
+ * int N = 10;
+ *
+ * // keep reading N records in blocking way until no records available in the log
+ * List<LogRecord> records = reader.readBulk(false, N);
+ * while (!records.isEmpty()) {
+ *     // process the list of records
+ *     ...
+ *     if (records.size() < N) { // no more records available in the log
+ *         break;
+ *     }
+ *     // read next N records
+ *     records = reader.readBulk(false, N);
+ * }
+ *
+ * ...
+ *
+ * // reader is caught up with writer, doing non-blocking reads to tail the log
+ * while (true) {
+ *     records = reader.readBulk(true, N)
+ *     // process the new records
+ *     ...
+ * }
+ *
+ * </pre>
+ *
+ * <p>
+ * NOTE: Extending {@link AsyncCloseable}: BKSyncLogReader is implemented based on BKAsyncLogReader, exposing
+ * the {@link AsyncCloseable} interface so the reader could be closed asynchronously
+ *
+ * @see AsyncLogReader
+ */
+public interface LogReader extends Closeable, AsyncCloseable {
+
+    /**
+     * Read the next log record from the stream.
+     * <p>
+     * If <i>nonBlocking</i> is set to true, the call returns immediately by just polling
+     * records from read ahead cache. It would return <i>null</i> if there isn't any records
+     * available in the read ahead cache.
+     * <p>
+     * If <i>nonBlocking</i> is set to false, it would does blocking call. The call will
+     * block until return a record if there are records in the stream (aka catching up).
+     * Otherwise it would wait up to {@link DistributedLogConfiguration#getReadAheadWaitTime()}
+     * milliseconds and return null if there isn't any more records in the stream.
+     *
+     * @param nonBlocking should the read make blocking calls to the backend or rely on the
+     * readAhead cache
+     * @return an operation from the stream or null if at end of stream
+     * @throws IOException if there is an error reading from the stream
+     */
+    public LogRecordWithDLSN readNext(boolean nonBlocking) throws IOException;
+
+    /**
+     * Read the next <i>numLogRecords</i> log records from the stream
+     *
+     * @param nonBlocking should the read make blocking calls to the backend or rely on the
+     * readAhead cache
+     * @param numLogRecords maximum number of log records returned by this call.
+     * @return an operation from the stream or empty list if at end of stream
+     * @throws IOException if there is an error reading from the stream
+     * @see #readNext(boolean)
+     */
+    public List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/LogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/LogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/LogWriter.java
new file mode 100644
index 0000000..46ad1f0
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/LogWriter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.api;
+
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.io.Abortable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/*
+* A generic interface class to support writing log records into
+* a persistent distributed log.
+*/
+public interface LogWriter extends Closeable, Abortable {
+    /**
+     * Write a log record to the stream.
+     *
+     * @param record single log record
+     * @throws IOException
+     */
+    public void write(LogRecord record) throws IOException;
+
+
+    /**
+     * Write a list of log records to the stream.
+     *
+     * @param records list of log records
+     * @throws IOException
+     */
+    @Deprecated
+    public int writeBulk(List<LogRecord> records) throws IOException;
+
+    /**
+     * All data that has been written to the stream so far will be sent to
+     * persistent storage.
+     * The transmission is asynchronous and new data can be still written to the
+     * stream while flushing is performed.
+     *
+     * TODO: rename this to flush()
+     */
+    public long setReadyToFlush() throws IOException;
+
+    /**
+     * Flush and sync all data that is ready to be flush
+     * {@link #setReadyToFlush()} into underlying persistent store.
+     * @throws IOException
+     *
+     * TODO: rename this to commit()
+     */
+    public long flushAndSync() throws IOException;
+
+    /**
+     * Flushes all the data up to this point,
+     * adds the end of stream marker and marks the stream
+     * as read-only in the metadata. No appends to the
+     * stream will be allowed after this point
+     *
+     * @throws IOException
+     */
+    public void markEndOfStream() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/MetadataAccessor.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/MetadataAccessor.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/MetadataAccessor.java
new file mode 100644
index 0000000..76ef700
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/MetadataAccessor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.api;
+
+import org.apache.distributedlog.io.AsyncCloseable;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface MetadataAccessor extends Closeable, AsyncCloseable {
+    /**
+     * Get the name of the stream managed by this log manager
+     * @return streamName
+     */
+    public String getStreamName();
+
+    public void createOrUpdateMetadata(byte[] metadata) throws IOException;
+
+    public void deleteMetadata() throws IOException;
+
+    public byte[] getMetadata() throws IOException;
+
+    /**
+     * Close the distributed log metadata, freeing any resources it may hold.
+     */
+    public void close() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
new file mode 100644
index 0000000..818824d
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.api.namespace;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Optional;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.callback.NamespaceListener;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.InvalidStreamNameException;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+
+/**
+ * A namespace is the basic unit for managing a set of distributedlogs.
+ *
+ * <h4>Namespace Interface</h4>
+ *
+ * <P>
+ * The <code>Namespace</code> interface is implemented by different backend providers.
+ * There are several components are required for an implementation:
+ * <OL>
+ *     <LI>Log Management -- manage logs in a given namespace. e.g. create/open/delete log, list of logs,
+ *         watch the changes of logs.
+ *     <LI>Access Control -- manage the access controls for logs in the namespace.
+ * </OL>
+ * </P>
+ *
+ * <h4>Namespace Location</h4>
+ *
+ * At the highest level, a <code>Namespace</code> is located by a <code>URI</code>. The location
+ * URI is in string form has the syntax
+ *
+ * <blockquote>
+ * distributedlog[<tt><b>-</b></tt><i>provider</i>]<tt><b>:</b></tt><i>provider-specific-path</i>
+ * </blockquote>
+ *
+ * where square brackets [...] delineate optional components and the characters <tt><b>-</b></tt> and <tt><b>:</b></tt>
+ * stand for themselves.
+ *
+ * The <code>provider</code> part in the URI indicates what is the backend used for this namespace. For example:
+ * <i>distributedlog-bk</i> URI is storing logs in bookkeeper, while <i>distributedlog-mem</i> URI is storing logs in
+ * memory. The <code>provider</code> part is optional. It would use bookkeeper backend if the <i>provider</i> part
+ * is omitted.
+ *
+ * @see DistributedLogManager
+ * @since 0.3.32
+ */
+@Beta
+public interface Namespace {
+
+    /**
+     * Get the namespace driver used by this namespace.
+     *
+     * @return namespace driver
+     */
+    NamespaceDriver getNamespaceDriver();
+
+    //
+    // Method to operate logs
+    //
+
+    /**
+     * Create a log named <i>logName</i>.
+     *
+     * @param logName
+     *          name of the log
+     * @throws InvalidStreamNameException if log name is invalid.
+     * @throws IOException when encountered issues with backend.
+     */
+    void createLog(String logName)
+            throws InvalidStreamNameException, IOException;
+
+    /**
+     * Delete a log named <i>logName</i>.
+     *
+     * @param logName
+     *          name of the log
+     * @throws InvalidStreamNameException if log name is invalid
+     * @throws LogNotFoundException if log doesn't exist
+     * @throws IOException when encountered issues with backend
+     */
+    void deleteLog(String logName)
+            throws InvalidStreamNameException, LogNotFoundException, IOException;
+
+    /**
+     * Open a log named <i>logName</i>.
+     * A distributedlog manager is returned to access log <i>logName</i>.
+     *
+     * @param logName
+     *          name of the log
+     * @return distributedlog manager instance.
+     * @throws InvalidStreamNameException if log name is invalid.
+     * @throws IOException when encountered issues with backend.
+     */
+    DistributedLogManager openLog(String logName)
+            throws InvalidStreamNameException, IOException;
+
+    /**
+     * Open a log named <i>logName</i> with specific log configurations.
+     *
+     * <p>This method allows the caller to override global configuration settings by
+     * supplying log configuration overrides. Log config overrides come in two flavors,
+     * static and dynamic. Static config never changes in the lifecyle of <code>DistributedLogManager</code>,
+     * dynamic config changes by reloading periodically and safe to access from any context.</p>
+     *
+     * @param logName
+     *          name of the log
+     * @param logConf
+     *          static log configuration
+     * @param dynamicLogConf
+     *          dynamic log configuration
+     * @return distributedlog manager instance.
+     * @throws InvalidStreamNameException if log name is invalid.
+     * @throws IOException when encountered issues with backend.
+     */
+    DistributedLogManager openLog(String logName,
+                                  Optional<DistributedLogConfiguration> logConf,
+                                  Optional<DynamicDistributedLogConfiguration> dynamicLogConf,
+                                  Optional<StatsLogger> perStreamStatsLogger)
+            throws InvalidStreamNameException, IOException;
+
+    /**
+     * Check whether the log <i>logName</i> exist.
+     *
+     * @param logName
+     *          name of the log
+     * @return <code>true</code> if the log exists, otherwise <code>false</code>.
+     * @throws IOException when encountered exceptions on checking
+     */
+    boolean logExists(String logName)
+            throws IOException;
+
+    /**
+     * Retrieve the logs under the namespace.
+     *
+     * @return iterator of the logs under the namespace.
+     * @throws IOException when encountered issues with backend.
+     */
+    Iterator<String> getLogs()
+            throws IOException;
+
+    //
+    // Methods for namespace
+    //
+
+    /**
+     * Register namespace listener on stream updates under the namespace.
+     *
+     * @param listener
+     *          listener to receive stream updates under the namespace
+     */
+    void registerNamespaceListener(NamespaceListener listener);
+
+    /**
+     * Create an access control manager to manage/check acl for logs.
+     *
+     * @return access control manager for logs under the namespace.
+     * @throws IOException
+     */
+    AccessControlManager createAccessControlManager()
+            throws IOException;
+
+    /**
+     * Close the namespace.
+     */
+    void close();
+
+}


Mime
View raw message