distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [01/23] incubator-distributedlog git commit: DL-124: Use Java8 Future rather than twitter Future
Date Wed, 21 Jun 2017 17:20:33 GMT
Repository: incubator-distributedlog
Updated Branches:
  refs/heads/master 0f4ea2816 -> 53fca4ac3


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/AtomicWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/AtomicWriter.java b/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/AtomicWriter.java
index 8ef5c46..006f832 100644
--- a/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/AtomicWriter.java
+++ b/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/AtomicWriter.java
@@ -17,22 +17,21 @@
  */
 package org.apache.distributedlog.basic;
 
+import static com.google.common.base.Charsets.UTF_8;
+
 import com.google.common.collect.Lists;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Await;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.LogRecordSet;
 import org.apache.distributedlog.io.CompressionCodec.Type;
 import org.apache.distributedlog.service.DistributedLogClient;
 import org.apache.distributedlog.service.DistributedLogClientBuilder;
-import org.apache.distributedlog.util.FutureUtils;
-import com.twitter.finagle.thrift.ClientId$;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import static com.google.common.base.Charsets.UTF_8;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 
 /**
  * Write multiple record atomically
@@ -60,12 +59,12 @@ public class AtomicWriter {
                 .build();
 
         final LogRecordSet.Writer recordSetWriter = LogRecordSet.newWriter(16 * 1024, Type.NONE);
-        List<Future<DLSN>> writeFutures = Lists.newArrayListWithExpectedSize(messages.length);
+        List<CompletableFuture<DLSN>> writeFutures = Lists.newArrayListWithExpectedSize(messages.length);
         for (String msg : messages) {
             final String message = msg;
             ByteBuffer msgBuf = ByteBuffer.wrap(msg.getBytes(UTF_8));
-            Promise<DLSN> writeFuture = new Promise<DLSN>();
-            writeFuture.addEventListener(new FutureEventListener<DLSN>() {
+            CompletableFuture<DLSN> writeFuture = FutureUtils.createFuture();
+            writeFuture.whenComplete(new FutureEventListener<DLSN>() {
                 @Override
                 public void onFailure(Throwable cause) {
                     System.out.println("Encountered error on writing data");
@@ -81,9 +80,9 @@ public class AtomicWriter {
             recordSetWriter.writeRecord(msgBuf, writeFuture);
             writeFutures.add(writeFuture);
         }
-        FutureUtils.result(
+        Await.result(
             client.writeRecordSet(streamName, recordSetWriter)
-                .addEventListener(new FutureEventListener<DLSN>() {
+                .addEventListener(new com.twitter.util.FutureEventListener<DLSN>() {
                     @Override
                     public void onFailure(Throwable cause) {
                         recordSetWriter.abortTransmit(cause);
@@ -101,7 +100,7 @@ public class AtomicWriter {
                     }
                 })
         );
-        FutureUtils.result(Future.collect(writeFutures));
+        FutureUtils.result(FutureUtils.collect(writeFutures));
         client.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/ConsoleWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/ConsoleWriter.java b/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/ConsoleWriter.java
index 4322224..833c0ce 100644
--- a/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/ConsoleWriter.java
+++ b/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/ConsoleWriter.java
@@ -17,18 +17,21 @@
  */
 package org.apache.distributedlog.basic;
 
-import org.apache.distributedlog.*;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import org.apache.distributedlog.util.FutureUtils;
-import com.twitter.util.Duration;
-import com.twitter.util.FutureEventListener;
-import jline.ConsoleReader;
+import static com.google.common.base.Charsets.UTF_8;
 
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import java.net.URI;
 import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Charsets.UTF_8;
+import jline.ConsoleReader;
 
 /**
  * Writer write records from console
@@ -53,7 +56,7 @@ public class ConsoleWriter {
         conf.setOutputBufferSize(0);
         conf.setPeriodicFlushFrequencyMilliSeconds(0);
         conf.setLockTimeout(DistributedLogConstants.LOCK_IMMEDIATE);
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(conf)
                 .uri(uri)
                 .regionId(DistributedLogConstants.LOCAL_REGION_ID)
@@ -73,7 +76,7 @@ public class ConsoleWriter {
                 String line;
                 while ((line = reader.readLine(PROMPT_MESSAGE)) != null) {
                     writer.write(new LogRecord(System.currentTimeMillis(), line.getBytes(UTF_8)))
-                            .addEventListener(new FutureEventListener<DLSN>() {
+                            .whenComplete(new FutureEventListener<DLSN>() {
                                 @Override
                                 public void onFailure(Throwable cause) {
                                     System.out.println("Encountered error on writing data");
@@ -89,7 +92,7 @@ public class ConsoleWriter {
                 }
             } finally {
                 if (null != writer) {
-                    FutureUtils.result(writer.asyncClose(), Duration.apply(5, TimeUnit.SECONDS));
+                    FutureUtils.result(writer.asyncClose(), 5, TimeUnit.SECONDS);
                 }
             }
         } finally {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/MultiReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/MultiReader.java b/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/MultiReader.java
index 9fe2013..29370de 100644
--- a/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/MultiReader.java
+++ b/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/MultiReader.java
@@ -18,15 +18,17 @@
 package org.apache.distributedlog.basic;
 
 import org.apache.distributedlog.*;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.exceptions.LogEmptyException;
 import org.apache.distributedlog.exceptions.LogNotFoundException;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import com.twitter.util.FutureEventListener;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.commons.lang.StringUtils;
 
 import java.net.URI;
 import java.util.concurrent.CountDownLatch;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
 
 import static com.google.common.base.Charsets.UTF_8;
 
@@ -48,7 +50,7 @@ public class MultiReader {
 
         URI uri = URI.create(dlUriStr);
         DistributedLogConfiguration conf = new DistributedLogConfiguration();
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(conf)
                 .uri(uri)
                 .build();
@@ -67,7 +69,7 @@ public class MultiReader {
 
         for (DistributedLogManager dlm : managers) {
             final DistributedLogManager manager = dlm;
-            dlm.getLastLogRecordAsync().addEventListener(new FutureEventListener<LogRecordWithDLSN>() {
+            dlm.getLastLogRecordAsync().whenComplete(new FutureEventListener<LogRecordWithDLSN>() {
                 @Override
                 public void onFailure(Throwable cause) {
                     if (cause instanceof LogNotFoundException) {
@@ -99,7 +101,7 @@ public class MultiReader {
                                  final DLSN dlsn,
                                  final CountDownLatch keepAliveLatch) {
         System.out.println("Wait for records from " + dlm.getStreamName() + " starting from " + dlsn);
-        dlm.openAsyncLogReader(dlsn).addEventListener(new FutureEventListener<AsyncLogReader>() {
+        dlm.openAsyncLogReader(dlsn).whenComplete(new FutureEventListener<AsyncLogReader>() {
             @Override
             public void onFailure(Throwable cause) {
                 System.err.println("Encountered error on reading records from stream " + dlm.getStreamName());
@@ -131,10 +133,10 @@ public class MultiReader {
                 System.out.println("\"\"\"");
                 System.out.println(new String(record.getPayload(), UTF_8));
                 System.out.println("\"\"\"");
-                reader.readNext().addEventListener(this);
+                reader.readNext().whenComplete(this);
             }
         };
-        reader.readNext().addEventListener(readListener);
+        reader.readNext().whenComplete(readListener);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/StreamRewinder.java
----------------------------------------------------------------------
diff --git a/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/StreamRewinder.java b/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/StreamRewinder.java
index 50a456d..b43b90c 100644
--- a/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/StreamRewinder.java
+++ b/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/StreamRewinder.java
@@ -17,19 +17,20 @@
  */
 package org.apache.distributedlog.basic;
 
-import org.apache.distributedlog.*;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import org.apache.distributedlog.util.FutureUtils;
-import com.twitter.util.CountDownLatch;
-import com.twitter.util.Duration;
-import com.twitter.util.FutureEventListener;
+import static com.google.common.base.Charsets.UTF_8;
 
 import java.net.URI;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
-import static com.google.common.base.Charsets.UTF_8;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 
 /**
  * Rewind a stream to read data back in a while
@@ -50,7 +51,7 @@ public class StreamRewinder {
 
         URI uri = URI.create(dlUriStr);
         DistributedLogConfiguration conf = new DistributedLogConfiguration();
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(conf)
                 .uri(uri)
                 .build();
@@ -101,13 +102,13 @@ public class StreamRewinder {
                     caughtup.set(true);
                 }
 
-                reader.readNext().addEventListener(this);
+                reader.readNext().whenComplete(this);
             }
         };
-        reader.readNext().addEventListener(readListener);
+        reader.readNext().whenComplete(readListener);
 
         keepAliveLatch.await();
-        FutureUtils.result(reader.asyncClose(), Duration.apply(5, TimeUnit.SECONDS));
+        FutureUtils.result(reader.asyncClose(), 5, TimeUnit.SECONDS);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/TailReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/TailReader.java b/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/TailReader.java
index 8b43b45..6a3acf6 100644
--- a/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/TailReader.java
+++ b/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/TailReader.java
@@ -17,20 +17,22 @@
  */
 package org.apache.distributedlog.basic;
 
-import org.apache.distributedlog.*;
-import org.apache.distributedlog.exceptions.LogEmptyException;
-import org.apache.distributedlog.exceptions.LogNotFoundException;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import org.apache.distributedlog.util.FutureUtils;
-import com.twitter.util.Duration;
-import com.twitter.util.FutureEventListener;
+import static com.google.common.base.Charsets.UTF_8;
 
 import java.net.URI;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Charsets.UTF_8;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.exceptions.LogEmptyException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 
 /**
  * A reader is tailing a log
@@ -50,7 +52,7 @@ public class TailReader {
 
         URI uri = URI.create(dlUriStr);
         DistributedLogConfiguration conf = new DistributedLogConfiguration();
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(conf)
                 .uri(uri)
                 .build();
@@ -100,13 +102,13 @@ public class TailReader {
                 System.out.println("\"\"\"");
                 System.out.println(new String(record.getPayload(), UTF_8));
                 System.out.println("\"\"\"");
-                reader.readNext().addEventListener(this);
+                reader.readNext().whenComplete(this);
             }
         };
-        reader.readNext().addEventListener(readListener);
+        reader.readNext().whenComplete(readListener);
 
         keepAliveLatch.await();
-        FutureUtils.result(reader.asyncClose(), Duration.apply(5, TimeUnit.SECONDS));
+        FutureUtils.result(reader.asyncClose(), 5, TimeUnit.SECONDS);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-kafka/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/distributedlog-tutorials/distributedlog-kafka/conf/log4j.properties b/distributedlog-tutorials/distributedlog-kafka/conf/log4j.properties
index 56a6417..7aa93f6 100644
--- a/distributedlog-tutorials/distributedlog-kafka/conf/log4j.properties
+++ b/distributedlog-tutorials/distributedlog-kafka/conf/log4j.properties
@@ -30,11 +30,7 @@ log4j.logger.org.apache.zookeeper=INFO
 log4j.logger.org.apache.bookkeeper=INFO
 
 # redirect executor output to executors.log since slow op warnings can be quite verbose
-log4j.logger.org.apache.distributedlog.util.MonitoredFuturePool=INFO, Executors
-log4j.logger.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=INFO, Executors
 log4j.logger.org.apache.bookkeeper.util.SafeRunnable=INFO, Executors
-log4j.additivity.org.apache.distributedlog.util.MonitoredFuturePool=false
-log4j.additivity.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=false
 log4j.additivity.org.apache.bookkeeper.util.SafeRunnable=false
 
 log4j.appender.Executors=org.apache.log4j.RollingFileAppender

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-kafka/src/main/java/org/apache/distributedlog/kafka/DLFutureRecordMetadata.java
----------------------------------------------------------------------
diff --git a/distributedlog-tutorials/distributedlog-kafka/src/main/java/org/apache/distributedlog/kafka/DLFutureRecordMetadata.java b/distributedlog-tutorials/distributedlog-kafka/src/main/java/org/apache/distributedlog/kafka/DLFutureRecordMetadata.java
index f1490d4..9cf1cf9 100644
--- a/distributedlog-tutorials/distributedlog-kafka/src/main/java/org/apache/distributedlog/kafka/DLFutureRecordMetadata.java
+++ b/distributedlog-tutorials/distributedlog-kafka/src/main/java/org/apache/distributedlog/kafka/DLFutureRecordMetadata.java
@@ -17,20 +17,18 @@
  */
 package org.apache.distributedlog.kafka;
 
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.exceptions.DLInterruptedException;
-import org.apache.distributedlog.util.FutureUtils;
+import com.twitter.util.Await;
 import com.twitter.util.Duration;
 import com.twitter.util.FutureEventListener;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.TopicPartition;
-
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import org.apache.distributedlog.DLSN;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
 
 class DLFutureRecordMetadata implements Future<RecordMetadata> {
 
@@ -79,25 +77,26 @@ class DLFutureRecordMetadata implements Future<RecordMetadata> {
     @Override
     public RecordMetadata get() throws InterruptedException, ExecutionException {
         try {
-            FutureUtils.result(dlsnFuture);
+            Await.result(dlsnFuture);
             // TODO: align the DLSN concepts with kafka concepts
             return new RecordMetadata(new TopicPartition(topic, 0), -1L, -1L);
-        } catch (DLInterruptedException e) {
+        } catch (InterruptedException e) {
             throw new InterruptedException("Interrupted on waiting for response");
-        } catch (IOException e) {
+        } catch (Exception e) {
             throw new ExecutionException("Error on waiting for response", e);
         }
     }
 
     @Override
-    public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+    public RecordMetadata get(long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
         try {
-            FutureUtils.result(dlsnFuture, Duration.apply(timeout, unit));
+            Await.result(dlsnFuture, Duration.apply(timeout, unit));
             // TODO: align the DLSN concepts with kafka concepts
             return new RecordMetadata(new TopicPartition(topic, 0), -1L, -1L);
-        } catch (DLInterruptedException e) {
+        } catch (InterruptedException e) {
             throw new InterruptedException("Interrupted on waiting for response");
-        } catch (IOException e) {
+        } catch (Exception e) {
             throw new ExecutionException("Error on waiting for response", e);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/DistributedLogInputFormat.java
----------------------------------------------------------------------
diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/DistributedLogInputFormat.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/DistributedLogInputFormat.java
index 94a53d4..6fd017c 100644
--- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/DistributedLogInputFormat.java
+++ b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/DistributedLogInputFormat.java
@@ -19,13 +19,13 @@ package org.apache.distributedlog.mapreduce;
 
 import com.google.common.collect.Lists;
 import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.LogRecordWithDLSN;
 import org.apache.distributedlog.LogSegmentMetadata;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.impl.BKNamespaceDriver;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeperAccessor;
@@ -59,7 +59,7 @@ public class DistributedLogInputFormat
     protected Configuration conf;
     protected DistributedLogConfiguration dlConf;
     protected URI dlUri;
-    protected DistributedLogNamespace namespace;
+    protected Namespace namespace;
     protected String streamName;
     protected DistributedLogManager dlm;
 
@@ -71,7 +71,7 @@ public class DistributedLogInputFormat
         dlUri = URI.create(configuration.get(DL_URI, ""));
         streamName = configuration.get(DL_STREAM, "");
         try {
-            namespace = DistributedLogNamespaceBuilder.newBuilder()
+            namespace = NamespaceBuilder.newBuilder()
                     .conf(dlConf)
                     .uri(dlUri)
                     .build();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-messaging/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/distributedlog-tutorials/distributedlog-messaging/conf/log4j.properties b/distributedlog-tutorials/distributedlog-messaging/conf/log4j.properties
index 56a6417..7aa93f6 100644
--- a/distributedlog-tutorials/distributedlog-messaging/conf/log4j.properties
+++ b/distributedlog-tutorials/distributedlog-messaging/conf/log4j.properties
@@ -30,11 +30,7 @@ log4j.logger.org.apache.zookeeper=INFO
 log4j.logger.org.apache.bookkeeper=INFO
 
 # redirect executor output to executors.log since slow op warnings can be quite verbose
-log4j.logger.org.apache.distributedlog.util.MonitoredFuturePool=INFO, Executors
-log4j.logger.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=INFO, Executors
 log4j.logger.org.apache.bookkeeper.util.SafeRunnable=INFO, Executors
-log4j.additivity.org.apache.distributedlog.util.MonitoredFuturePool=false
-log4j.additivity.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=false
 log4j.additivity.org.apache.bookkeeper.util.SafeRunnable=false
 
 log4j.appender.Executors=org.apache.log4j.RollingFileAppender

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/ReaderWithOffsets.java
----------------------------------------------------------------------
diff --git a/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/ReaderWithOffsets.java b/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/ReaderWithOffsets.java
index ecf18fc..c59cc72 100644
--- a/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/ReaderWithOffsets.java
+++ b/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/ReaderWithOffsets.java
@@ -17,22 +17,27 @@
  */
 package org.apache.distributedlog.messaging;
 
-import org.apache.distributedlog.*;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import org.apache.distributedlog.util.FutureUtils;
-import com.twitter.util.Duration;
-import com.twitter.util.FutureEventListener;
-import org.iq80.leveldb.DB;
-import org.iq80.leveldb.Options;
+import static com.google.common.base.Charsets.UTF_8;
+import static org.iq80.leveldb.impl.Iq80DBFactory.*;
 
 import java.io.File;
 import java.net.URI;
-import java.util.concurrent.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.iq80.leveldb.impl.Iq80DBFactory.*;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.Options;
 
 /**
  * Reader with offsets
@@ -54,7 +59,7 @@ public class ReaderWithOffsets {
 
         URI uri = URI.create(dlUriStr);
         DistributedLogConfiguration conf = new DistributedLogConfiguration();
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(conf)
                 .uri(uri)
                 .build();
@@ -120,13 +125,13 @@ public class ReaderWithOffsets {
                 System.out.println(new String(record.getPayload(), UTF_8));
                 System.out.println("\"\"\"");
                 lastDLSN.set(record.getDlsn());
-                reader.readNext().addEventListener(this);
+                reader.readNext().whenComplete(this);
             }
         };
-        reader.readNext().addEventListener(readListener);
+        reader.readNext().whenComplete(readListener);
 
         keepAliveLatch.await();
-        FutureUtils.result(reader.asyncClose(), Duration.apply(5, TimeUnit.SECONDS));
+        FutureUtils.result(reader.asyncClose(), 5, TimeUnit.SECONDS);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/StreamTransformer.java
----------------------------------------------------------------------
diff --git a/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/StreamTransformer.java b/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/StreamTransformer.java
index 2cf202f..3f874c0 100644
--- a/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/StreamTransformer.java
+++ b/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/StreamTransformer.java
@@ -17,19 +17,7 @@
  */
 package org.apache.distributedlog.messaging;
 
-import org.apache.distributedlog.*;
-import org.apache.distributedlog.exceptions.LogEmptyException;
-import org.apache.distributedlog.exceptions.LogNotFoundException;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import org.apache.distributedlog.thrift.messaging.TransformedRecord;
-import org.apache.distributedlog.util.FutureUtils;
-import com.twitter.util.Duration;
-import com.twitter.util.FutureEventListener;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TIOStreamTransport;
+import static com.google.common.base.Charsets.UTF_8;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -37,8 +25,24 @@ import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Charsets.UTF_8;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.exceptions.LogEmptyException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.thrift.messaging.TransformedRecord;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TIOStreamTransport;
 
 /**
  * Transform one stream to another stream. And apply transformation
@@ -63,7 +67,7 @@ public class StreamTransformer {
         DistributedLogConfiguration conf = new DistributedLogConfiguration();
         conf.setOutputBufferSize(16*1024); // 16KB
         conf.setPeriodicFlushFrequencyMilliSeconds(5); // 5ms
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(conf)
                 .uri(uri)
                 .build();
@@ -102,7 +106,7 @@ public class StreamTransformer {
         try {
             readLoop(srcDlm, srcDlsn, targetWriter, replicationTransformer);
         } finally {
-            FutureUtils.result(targetWriter.asyncClose(), Duration.apply(5, TimeUnit.SECONDS));
+            FutureUtils.result(targetWriter.asyncClose(), 5, TimeUnit.SECONDS);
             targetDlm.close();
             srcDlm.close();
             namespace.close();
@@ -131,7 +135,7 @@ public class StreamTransformer {
             @Override
             public void onSuccess(LogRecordWithDLSN record) {
                 if (record.getDlsn().compareTo(fromDLSN) <= 0) {
-                    reader.readNext().addEventListener(this);
+                    reader.readNext().whenComplete(this);
                     return;
                 }
                 System.out.println("Received record " + record.getDlsn());
@@ -146,13 +150,13 @@ public class StreamTransformer {
                     e.printStackTrace(System.err);
                     keepAliveLatch.countDown();
                 }
-                reader.readNext().addEventListener(this);
+                reader.readNext().whenComplete(this);
             }
         };
-        reader.readNext().addEventListener(readListener);
+        reader.readNext().whenComplete(readListener);
 
         keepAliveLatch.await();
-        FutureUtils.result(reader.asyncClose(), Duration.apply(5, TimeUnit.SECONDS));
+        FutureUtils.result(reader.asyncClose(), 5, TimeUnit.SECONDS);
     }
 
     private static void transform(final AsyncLogWriter writer,
@@ -170,7 +174,7 @@ public class StreamTransformer {
         transformedRecord.write(protocolFactory.getProtocol(new TIOStreamTransport(baos)));
         byte[] data = baos.toByteArray();
         writer.write(new LogRecord(record.getSequenceId(), data))
-                .addEventListener(new FutureEventListener<DLSN>() {
+                .whenComplete(new FutureEventListener<DLSN>() {
             @Override
             public void onFailure(Throwable cause) {
                 System.err.println("Encountered error on writing records to stream " + writer.getStreamName());

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a24577c..d8c74ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,6 +83,7 @@
   </developers>
   <modules>
     <module>distributedlog-build-tools</module>
+    <module>distributedlog-common</module>
     <module>distributedlog-protocol</module>
     <module>distributedlog-core</module>
     <module>distributedlog-proxy-protocol</module>
@@ -90,6 +91,7 @@
     <module>distributedlog-proxy-server</module>
     <module>distributedlog-benchmark</module>
     <module>distributedlog-tutorials</module>
+    <module>distributedlog-core-twitter</module>
   </modules>
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -103,11 +105,13 @@
     <commons-lang3.version>3.3.2</commons-lang3.version>
     <curator.version>3.2.1</curator.version>
     <finagle.version>6.34.0</finagle.version>
+    <freebuilder.version>1.12.3</freebuilder.version>
     <guava.version>20.0</guava.version>
     <jetty.version>8.1.19.v20160209</jetty.version>
     <jmock.version>2.8.2</jmock.version>
     <junit.version>4.8.1</junit.version>
     <libthrift.version>0.5.0-1</libthrift.version>
+    <lombok.version>1.16.16</lombok.version>
     <lz4.version>1.2.0</lz4.version>
     <mockito.version>1.9.5</mockito.version>
     <scrooge.version>4.6.0</scrooge.version>
@@ -142,7 +146,7 @@
           <groups>
             <group>
               <title>Core Library</title>
-              <packages>org.apache.distributedlog:org.apache.distributedlog.annotations:org.apache.distributedlog.callback:org.apache.distributedlog.exceptions:org.apache.distributedlog.feature:org.apache.distributedlog.io:org.apache.distributedlog.lock:org.apache.distributedlog.logsegment:org.apache.distributedlog.metadata:org.apache.distributedlog.namespace:org.apache.distributedlog.net:org.apache.distributedlog.stats:org.apache.distributedlog.subscription</packages>
+              <packages>org.apache.distributedlog:org.apache.distributedlog.annotations:org.apache.distributedlog.callback:org.apache.distributedlog.exceptions:org.apache.distributedlog.feature:org.apache.distributedlog.io:org.apache.distributedlog.lock:org.apache.distributedlog.logsegment:org.apache.distributedlog.metadata:org.apache.distributedlog.namespace:org.apache.distributedlog.net:org.apache.distributedlog.stats:org.apache.distributedlog.api.subscription</packages>
             </group>
             <group>
               <title>Proxy Client</title>
@@ -150,7 +154,7 @@
             </group>
           </groups>
           <excludePackageNames>
-            org.apache.distributedlog.acl:org.apache.distributedlog.admin:org.apache.distributedlog.auditor:org.apache.distributedlog.basic:org.apache.distributedlog.benchmark*:org.apache.distributedlog.bk:org.apache.distributedlog.ownership:org.apache.distributedlog.proxy:org.apache.distributedlog.resolver:org.apache.distributedlog.service.*:org.apache.distributedlog.config:org.apache.distributedlog.function:org.apache.distributedlog.impl*:org.apache.distributedlog.injector:org.apache.distributedlog.kafka:org.apache.distributedlog.limiter:org.apache.distributedlog.mapreduce:org.apache.distributedlog.messaging:org.apache.distributedlog.rate:org.apache.distributedlog.readahead:org.apache.distributedlog.selector:org.apache.distributedlog.stats:org.apache.distributedlog.thrift*:org.apache.distributedlog.tools:org.apache.distributedlog.util:org.apache.distributedlog.zk:org.apache.bookkeeper.client:org.apache.bookkeeper.stats
+            org.apache.distributedlog.acl:org.apache.distributedlog.admin:org.apache.distributedlog.auditor:org.apache.distributedlog.basic:org.apache.distributedlog.benchmark*:org.apache.distributedlog.bk:org.apache.distributedlog.ownership:org.apache.distributedlog.proxy:org.apache.distributedlog.resolver:org.apache.distributedlog.service.*:org.apache.distributedlog.config:org.apache.distributedlog.function:org.apache.distributedlog.impl*:org.apache.distributedlog.injector:org.apache.distributedlog.kafka:org.apache.distributedlog.limiter:org.apache.distributedlog.mapreduce:org.apache.distributedlog.messaging:org.apache.distributedlog.common.rate:org.apache.distributedlog.readahead:org.apache.distributedlog.selector:org.apache.distributedlog.stats:org.apache.distributedlog.thrift*:org.apache.distributedlog.tools:org.apache.distributedlog.util:org.apache.distributedlog.zk:org.apache.bookkeeper.client:org.apache.bookkeeper.stats
           </excludePackageNames>
         </configuration>
         <executions>
@@ -182,8 +186,8 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>${maven-compiler-plugin.version}</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
         </configuration>
       </plugin>
       <plugin>



Mime
View raw message