distributedlog-commits mailing list archives

From: si...@apache.org
Subject: [2/2] incubator-distributedlog git commit: DL-32: Fix Findbug warnings
Date: Thu, 13 Oct 2016 07:47:17 GMT
DL-32: Fix Findbug warnings

- Bump the version to 3.0.3
- Fix all the findbug warnings
- Enable findbugs:check on travis ci

Author: Jon Derrick <jonathan.derrickk@gmail.com>

Reviewers: Sijie Guo <sijie@apache.org>

Closes #19 from jderrickk/jd/fix_findbugs_error and squashes the following commits:

c48c89c [Jon Derrick] Merge branch 'master' into jd/fix_findbugs_error
d9b0425 [Jon Derrick] verify findbugs on travis ci
985501b [Jon Derrick] Fix findbug errors on all modules
18e8267 [Jon Derrick] Remove distributedlog-example
ffa8361 [Jon Derrick] Fix findbugs in distributedlog-protocol
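
Context for the changes below: the findbugs:check goal is meant to fail the Maven build when unexcluded warnings remain, so every module had to be cleaned up before it could be enabled on Travis. As a flavour of the simplest category fixed here, a hypothetical, stand-alone sketch of a dead local store (the pattern removed from ReadAheadWorker further down); the class and variable names are made up and are not part of the DistributedLog source:

import java.util.concurrent.ThreadLocalRandom;

public class DeadStoreExample {
    public static void main(String[] args) {
        long startEntryId = 0L;
        long endEntryId = ThreadLocalRandom.current().nextLong(1, 100);

        // Flagged as a dead local store (DLS_DEAD_LOCAL_STORE): computed, never read.
        // long numEntries = endEntryId - startEntryId + 1;
        // The fix, as in the ReadAheadWorker hunk below, is simply to delete the
        // unused computation.

        System.out.println("scanning entries " + startEntryId + " to " + endEntryId);
    }
}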


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/bb6990de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/bb6990de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/bb6990de

Branch: refs/heads/master
Commit: bb6990dee74cdcf158f494041f187f856344f4cf
Parents: 93bdad0
Author: Jon Derrick <jonathan.derrickk@gmail.com>
Authored: Thu Oct 13 00:47:11 2016 -0700
Committer: Sijie Guo <sijieg@twitter.com>
Committed: Thu Oct 13 00:47:11 2016 -0700

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 .../routing/ConsistentHashRoutingService.java   |  13 +-
 .../distributedlog/BKAbstractLogWriter.java     |   6 +-
 .../distributedlog/BKAsyncLogReaderDLSN.java    |  15 +-
 .../distributedlog/BKAsyncLogWriter.java        |   2 +-
 .../distributedlog/BKDistributedLogManager.java |   7 +-
 .../distributedlog/BKLogWriteHandler.java       |   3 +-
 .../distributedlog/BKSyncLogReaderDLSN.java     |   2 +-
 .../distributedlog/DistributedLogConstants.java |   2 +-
 .../distributedlog/LogSegmentMetadata.java      | 102 ++++++-------
 .../com/twitter/distributedlog/MaxTxId.java     |   2 +-
 .../distributedlog/function/VoidFunctions.java  |   2 +-
 .../impl/ZKLogSegmentMetadataStore.java         |   2 +-
 .../federated/FederatedZKLogMetadataStore.java  |  24 ++-
 .../distributedlog/limiter/RateLimiter.java     |   5 -
 .../rate/MovingAverageRateFactory.java          |   4 +-
 .../readahead/ReadAheadWorker.java              |   2 -
 .../tools/DistributedLogTool.java               |  18 +--
 .../twitter/distributedlog/util/DLUtils.java    |   6 +-
 .../src/main/resources/findbugsExclude.xml      |   7 +-
 distributedlog-example/bin/bk-cluster           |  26 ----
 distributedlog-example/bin/proxy-cluster        |  33 -----
 distributedlog-example/bin/proxy-writer         |  35 -----
 distributedlog-example/conf/bk_server.conf      | 145 -------------------
 distributedlog-example/conf/distributedlog.conf | 125 ----------------
 distributedlog-example/conf/log4j.properties    |  44 ------
 .../conf/stream_config/example-stream_0.conf    |  23 ---
 distributedlog-example/pom.xml                  | 136 -----------------
 .../example/DistributedLogExample.java          |  91 ------------
 .../example/ProxyClusterEmulator.java           |  82 -----------
 .../example/TestProxyClusterEmulator.java       |  61 --------
 .../distributedlog/util/ProtocolUtils.java      |  14 +-
 .../src/main/resources/findbugsExclude.xml      |  17 +++
 .../service/stream/BulkWriteOp.java             |   1 +
 .../service/stream/StreamManagerImpl.java       |   7 +-
 .../distributedlog/service/tools/ProxyTool.java |   1 -
 .../src/main/resources/findbugsExclude.xml      |   6 +
 .../distributedlog-basic/pom.xml                |   7 -
 .../basic/ConsoleProxyMultiWriter.java          |   3 -
 .../basic/ConsoleProxyWriter.java               |   3 -
 .../distributedlog/basic/ConsoleWriter.java     |   3 -
 .../distributedlog-kafka/pom.xml                |   7 -
 .../kafka/DLFutureRecordMetadata.java           |   4 +-
 .../mapreduce/LogSegmentReader.java             |   8 +-
 .../ConsoleProxyPartitionedMultiWriter.java     |   2 -
 .../messaging/ConsoleProxyRRMultiWriter.java    |   2 -
 .../src/main/resources/findbugsExclude.xml      |  33 +++++
 pom.xml                                         |   3 +-
 48 files changed, 184 insertions(+), 964 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index e5ee4db..fc464b3 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -28,7 +28,7 @@ install:
 
 script:
   - travis_retry mvn clean apache-rat:check
-  - travis_wait 60 mvn package
+  - travis_wait 60 mvn package findbugs:check
 
 cache:
   directories:

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ConsistentHashRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ConsistentHashRoutingService.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ConsistentHashRoutingService.java
index acf71ae..8abd299 100644
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ConsistentHashRoutingService.java
+++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ConsistentHashRoutingService.java
@@ -439,7 +439,8 @@ public class ConsistentHashRoutingService extends ServerSetRoutingService {
             MapDifference<Integer, SocketAddress> difference =
                     Maps.difference(shardId2Address, newMap);
             left = difference.entriesOnlyOnLeft();
-            for (Integer shard : left.keySet()) {
+            for (Map.Entry<Integer, SocketAddress> shardEntry : left.entrySet()) {
+                int shard = shardEntry.getKey();
                 if (shard >= 0) {
                     SocketAddress host = shardId2Address.get(shard);
                     if (null != host) {
@@ -452,7 +453,7 @@ public class ConsistentHashRoutingService extends ServerSetRoutingService {
                 } else {
                     // shard id is negative - they are resolved from finagle name, which instances don't have shard id
                     // in this case, if they are removed from serverset, we removed them directly
-                    SocketAddress host = left.get(shard);
+                    SocketAddress host = shardEntry.getValue();
                     if (null != host) {
                         removeHostInternal(host, Optional.<Throwable>absent());
                         removedList.add(host);
@@ -460,11 +461,11 @@ public class ConsistentHashRoutingService extends ServerSetRoutingService {
                 }
             }
             // we need to find if any shards are replacing old shards
-            for (Integer shard : newMap.keySet()) {
-                SocketAddress oldHost = shardId2Address.get(shard);
-                SocketAddress newHost = newMap.get(shard);
+            for (Map.Entry<Integer, SocketAddress> shard : newMap.entrySet()) {
+                SocketAddress oldHost = shardId2Address.get(shard.getKey());
+                SocketAddress newHost = shard.getValue();
                 if (!newHost.equals(oldHost)) {
-                    join(shard, newHost, removedList);
+                    join(shard.getKey(), newHost, removedList);
                     joinedList.add(newHost);
                 }
             }
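
The ConsistentHashRoutingService change above replaces iteration over keySet() followed by get(key) with iteration over entrySet(), which is what FindBugs' WMI_WRONG_MAP_ITERATOR warning asks for: the entry already carries the value, so the second hash lookup disappears. A minimal stand-alone sketch with made-up names, not the real routing code:

import java.util.HashMap;
import java.util.Map;

public class EntrySetIterationExample {
    public static void main(String[] args) {
        Map<Integer, String> shardToHost = new HashMap<>();
        shardToHost.put(1, "host-a:8000");
        shardToHost.put(2, "host-b:8000");

        // Flagged by FindBugs (WMI_WRONG_MAP_ITERATOR): every iteration pays for an
        // extra hash lookup through get().
        for (Integer shard : shardToHost.keySet()) {
            System.out.println(shard + " -> " + shardToHost.get(shard));
        }

        // Preferred: entrySet() hands back key and value together.
        for (Map.Entry<Integer, String> entry : shardToHost.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}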

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java
index 83167ab..b0cea24 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java
@@ -356,8 +356,10 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
 
         // skip scheduling if there is task that's already running
         //
-        if (truncationEnabled && ((lastTruncationAttempt == null) || lastTruncationAttempt.isDefined())) {
-            lastTruncationAttempt = writeHandler.purgeLogSegmentsOlderThanTimestamp(minTimestampToKeep);
+        synchronized (this) {
+            if (truncationEnabled && ((lastTruncationAttempt == null) || lastTruncationAttempt.isDefined())) {
+                lastTruncationAttempt = writeHandler.purgeLogSegmentsOlderThanTimestamp(minTimestampToKeep);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
index ef055a0..7d3d53d 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
@@ -90,6 +90,7 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
     private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
     private final ScheduledExecutorService executorService;
     private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>();
+    private final Object scheduleLock = new Object();
     private final AtomicLong scheduleCount = new AtomicLong(0);
     final private Stopwatch scheduleDelayStopwatch;
     final private Stopwatch readNextDelayStopwatch;
@@ -112,7 +113,7 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
     private final Runnable BACKGROUND_READ_SCHEDULER = new Runnable() {
         @Override
         public void run() {
-            synchronized (scheduleCount) {
+            synchronized (scheduleLock) {
                 backgroundScheduleTask = null;
             }
             scheduleBackgroundRead();
@@ -485,7 +486,7 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
             LOG.info("{}: Failed to cancel the background idle reader timeout task", bkLedgerManager.getFullyQualifiedName());
         }
 
-        synchronized (scheduleCount) {
+        synchronized (scheduleLock) {
             if (null != backgroundScheduleTask) {
                 backgroundScheduleTask.cancel(true);
             }
@@ -508,7 +509,7 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
 
     @Override
     public void run() {
-        synchronized(scheduleCount) {
+        synchronized(scheduleLock) {
             if (scheduleDelayStopwatch.isRunning()) {
                 scheduleLatency.registerSuccessfulEvent(scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
             }
@@ -533,11 +534,11 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
                         backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
                         return;
                     }
-                }
 
-                if (disableProcessingReadRequests) {
-                    LOG.info("Reader of {} is forced to stop processing read requests", bkLedgerManager.getFullyQualifiedName());
-                    return;
+                    if (disableProcessingReadRequests) {
+                        LOG.info("Reader of {} is forced to stop processing read requests", bkLedgerManager.getFullyQualifiedName());
+                        return;
+                    }
                 }
 
                 // If the oldest pending promise is interrupted then we must mark
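
The BKAsyncLogReaderDLSN change above stops using the scheduleCount AtomicLong as a monitor and introduces a dedicated scheduleLock object; FindBugs warns when code synchronizes on java.util.concurrent objects, since their lock-free API is easily confused with the monitor being held. A minimal, hypothetical sketch of the same idea (names are made up):

import java.util.concurrent.atomic.AtomicLong;

public class DedicatedLockExample {
    private final AtomicLong scheduleCount = new AtomicLong(0);
    // Plain object used only as a monitor, mirroring the scheduleLock field added above.
    private final Object scheduleLock = new Object();
    private Runnable backgroundTask;  // guarded by scheduleLock

    public void clearTask() {
        // Synchronizing on the AtomicLong itself would conflate its lock-free API with
        // monitor locking (and is flagged by FindBugs); a dedicated lock keeps them apart.
        synchronized (scheduleLock) {
            backgroundTask = null;
        }
        scheduleCount.incrementAndGet();
    }

    public static void main(String[] args) {
        DedicatedLockExample example = new DedicatedLockExample();
        example.clearTask();
        System.out.println("schedule count = " + example.scheduleCount.get());
    }
}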

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
index ffa478a..f1594f9 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
@@ -458,7 +458,7 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWri
             }
         }
         if (null == writerFuture) {
-            return Future.value(lastTxId);
+            return Future.value(getLastTxId());
         }
         return writerFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Long>>() {
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
index a5be03c..6a9d860 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
@@ -486,10 +486,11 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL
     }
 
     public void checkClosedOrInError(String operation) throws AlreadyClosedException {
-        if (null != closePromise) {
-            throw new AlreadyClosedException("Executing " + operation + " on already closed DistributedLogManager");
+        synchronized (this) {
+            if (null != closePromise) {
+                throw new AlreadyClosedException("Executing " + operation + " on already closed DistributedLogManager");
+            }
         }
-
         if (null != writerBKC) {
             writerBKC.checkClosedOrInError();
         }
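
The BKAbstractLogWriter and BKDistributedLogManager changes above wrap reads of lastTruncationAttempt and closePromise in synchronized blocks because those fields are written while holding the same monitor elsewhere; mixing locked and unlocked access is what FindBugs' IS2_INCONSISTENT_SYNC warning reports. A small, hypothetical sketch of consistently guarding a field (names are made up):

public class ConsistentSyncExample {
    private boolean truncationScheduled = false;  // intended to be guarded by "this"

    public void maybeSchedule() {
        // Accessing the field under the same monitor that guards it elsewhere keeps
        // the inconsistent-synchronization check quiet and, more importantly, makes
        // the check-then-act sequence atomic.
        synchronized (this) {
            if (!truncationScheduled) {
                truncationScheduled = true;
                System.out.println("scheduled truncation");
            }
        }
    }

    public synchronized void reset() {
        truncationScheduled = false;
    }

    public static void main(String[] args) {
        ConsistentSyncExample example = new ConsistentSyncExample();
        example.maybeSchedule();
        example.reset();
    }
}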

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
index d73c5e2..573679a 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
@@ -108,7 +108,6 @@ class BKLogWriteHandler extends BKLogHandler {
     protected final boolean sanityCheckTxnId;
     protected final boolean validateLogSegmentSequenceNumber;
     protected final int regionId;
-    protected volatile boolean closed = false;
     protected final RollingPolicy rollingPolicy;
     protected Future<? extends DistributedLock> lockFuture = null;
     protected final PermitLimiter writeLimiter;
@@ -225,7 +224,7 @@ class BKLogWriteHandler extends BKLogHandler {
 
         // Rolling Policy
         if (conf.getLogSegmentRollingIntervalMinutes() > 0) {
-            rollingPolicy = new TimeBasedRollingPolicy(conf.getLogSegmentRollingIntervalMinutes() * 60 * 1000);
+            rollingPolicy = new TimeBasedRollingPolicy(conf.getLogSegmentRollingIntervalMinutes() * 60 * 1000L);
         } else {
             rollingPolicy = new SizeBasedRollingPolicy(conf.getMaxLogSegmentBytes());
         }
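
In the BKLogWriteHandler change above, minutes * 60 * 1000 is evaluated in int arithmetic before it reaches the long-valued rolling policy, so a large interval could overflow; appending L to one operand promotes the whole expression to long, which is what the ICAST_INTEGER_MULTIPLY_CAST_TO_LONG warning is about. A runnable, hypothetical illustration:

public class IntOverflowExample {
    public static void main(String[] args) {
        int rollingIntervalMinutes = 60_000;  // large enough to overflow int milliseconds

        // Flagged by FindBugs (ICAST_INTEGER_MULTIPLY_CAST_TO_LONG): the multiplication
        // is done in int arithmetic and overflows before the widening to long.
        long wrongMillis = rollingIntervalMinutes * 60 * 1000;

        // Making one operand a long promotes the whole expression to long arithmetic.
        long rightMillis = rollingIntervalMinutes * 60 * 1000L;

        System.out.println("int arithmetic:  " + wrongMillis);   // negative, overflowed
        System.out.println("long arithmetic: " + rightMillis);   // 3600000000
    }
}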

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
index bd60856..cef5ddb 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
@@ -101,7 +101,7 @@ class BKSyncLogReaderDLSN implements LogReader, Runnable, FutureEventListener<Lo
         }
     }
 
-    private void setLastSeenDLSN(DLSN dlsn) {
+    private synchronized void setLastSeenDLSN(DLSN dlsn) {
         synchronized (sharedLock) {
             this.lastSeenDLSN = dlsn;
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
index 8a6d824..5c50282 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
@@ -57,7 +57,7 @@ public class DistributedLogConstants {
     public static final String INPROGRESS_LOGSEGMENT_PREFIX = "inprogress";
     public static final String COMPLETED_LOGSEGMENT_PREFIX = "logrecs";
     public static final String DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME = "disallow_bookie_placement";
-    public static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);
+    static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);
 
     // An ACL that gives all permissions to node creators and read permissions only to everyone else.
     public static final List<ACL> EVERYONE_READ_CREATOR_ALL =
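
The DistributedLogConstants change above narrows CONTROL_RECORD_CONTENT from public to package-private: arrays are mutable, so a public static byte[] lets any caller rewrite what is meant to be a constant, and FindBugs flags such fields. A hypothetical sketch of the two usual remedies, reduced visibility plus a defensive copy (class and method names are made up):

import static java.nio.charset.StandardCharsets.UTF_8;

public class MutableStaticArrayExample {
    // A public static array is effectively writable by everyone; keeping it
    // package-private (as done above) limits who can touch it.
    static final byte[] CONTROL_CONTENT = "control".getBytes(UTF_8);

    // External callers get a defensive copy instead of the internal array.
    public static byte[] controlContentCopy() {
        return CONTROL_CONTENT.clone();
    }

    public static void main(String[] args) {
        System.out.println(new String(controlContentCopy(), UTF_8));
    }
}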

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java
index 7fe9942..994b141 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java
@@ -622,7 +622,7 @@ public class LogSegmentMetadata {
 
     static LogSegmentMetadata parseDataV1(String path, byte[] data, String[] parts)
         throws IOException {
-        long versionStatusCount = Long.valueOf(parts[0]);
+        long versionStatusCount = Long.parseLong(parts[0]);
 
         long version = versionStatusCount & METADATA_VERSION_MASK;
         assert (version >= Integer.MIN_VALUE && version <= Integer.MAX_VALUE);
@@ -637,8 +637,8 @@ public class LogSegmentMetadata {
         assert (status >= 0 && status <= METADATA_STATUS_BIT_MAX);
 
         if (parts.length == 3) {
-            long ledgerId = Long.valueOf(parts[1]);
-            long txId = Long.valueOf(parts[2]);
+            long ledgerId = Long.parseLong(parts[1]);
+            long txId = Long.parseLong(parts[2]);
             return new LogSegmentMetadataBuilder(path, llmv, ledgerId, txId)
                     .setRegionId(regionId)
                     .setStatus(status)
@@ -647,10 +647,10 @@ public class LogSegmentMetadata {
             long recordCount = (versionStatusCount & LOGRECORD_COUNT_MASK) >> LOGRECORD_COUNT_SHIFT;
             assert (recordCount >= Integer.MIN_VALUE && recordCount <= Integer.MAX_VALUE);
 
-            long ledgerId = Long.valueOf(parts[1]);
-            long firstTxId = Long.valueOf(parts[2]);
-            long lastTxId = Long.valueOf(parts[3]);
-            long completionTime = Long.valueOf(parts[4]);
+            long ledgerId = Long.parseLong(parts[1]);
+            long firstTxId = Long.parseLong(parts[2]);
+            long lastTxId = Long.parseLong(parts[3]);
+            long completionTime = Long.parseLong(parts[4]);
             return new LogSegmentMetadataBuilder(path, llmv, ledgerId, firstTxId)
                 .setInprogress(false)
                 .setLastTxId(lastTxId)
@@ -667,7 +667,7 @@ public class LogSegmentMetadata {
 
     static LogSegmentMetadata parseDataV2(String path, byte[] data, String[] parts)
         throws IOException {
-        long versionStatusCount = Long.valueOf(parts[0]);
+        long versionStatusCount = Long.parseLong(parts[0]);
 
         long version = versionStatusCount & METADATA_VERSION_MASK;
         assert (version >= Integer.MIN_VALUE && version <= Integer.MAX_VALUE);
@@ -682,9 +682,9 @@ public class LogSegmentMetadata {
         assert (status >= 0 && status <= METADATA_STATUS_BIT_MAX);
 
         if (parts.length == 4) {
-            long ledgerId = Long.valueOf(parts[1]);
-            long txId = Long.valueOf(parts[2]);
-            long logSegmentSequenceNumber = Long.valueOf(parts[3]);
+            long ledgerId = Long.parseLong(parts[1]);
+            long txId = Long.parseLong(parts[2]);
+            long logSegmentSequenceNumber = Long.parseLong(parts[3]);
             return new LogSegmentMetadataBuilder(path, llmv, ledgerId, txId)
                 .setLogSegmentSequenceNo(logSegmentSequenceNumber)
                 .setRegionId(regionId)
@@ -694,13 +694,13 @@ public class LogSegmentMetadata {
             long recordCount = (versionStatusCount & LOGRECORD_COUNT_MASK) >> LOGRECORD_COUNT_SHIFT;
             assert (recordCount >= Integer.MIN_VALUE && recordCount <= Integer.MAX_VALUE);
 
-            long ledgerId = Long.valueOf(parts[1]);
-            long firstTxId = Long.valueOf(parts[2]);
-            long lastTxId = Long.valueOf(parts[3]);
-            long completionTime = Long.valueOf(parts[4]);
-            long logSegmentSequenceNumber = Long.valueOf(parts[5]);
-            long lastEntryId = Long.valueOf(parts[6]);
-            long lastSlotId = Long.valueOf(parts[7]);
+            long ledgerId = Long.parseLong(parts[1]);
+            long firstTxId = Long.parseLong(parts[2]);
+            long lastTxId = Long.parseLong(parts[3]);
+            long completionTime = Long.parseLong(parts[4]);
+            long logSegmentSequenceNumber = Long.parseLong(parts[5]);
+            long lastEntryId = Long.parseLong(parts[6]);
+            long lastSlotId = Long.parseLong(parts[7]);
             return new LogSegmentMetadataBuilder(path, llmv, ledgerId, firstTxId)
                 .setInprogress(false)
                 .setLastTxId(lastTxId)
@@ -721,7 +721,7 @@ public class LogSegmentMetadata {
 
     static LogSegmentMetadata parseDataVersionsWithMinActiveDLSN(String path, byte[] data, String[] parts)
         throws IOException {
-        long versionStatusCount = Long.valueOf(parts[0]);
+        long versionStatusCount = Long.parseLong(parts[0]);
 
         long version = versionStatusCount & METADATA_VERSION_MASK;
         assert (version >= Integer.MIN_VALUE && version <= Integer.MAX_VALUE);
@@ -737,11 +737,11 @@ public class LogSegmentMetadata {
         assert (status >= 0 && status <= METADATA_STATUS_BIT_MAX);
 
         if (parts.length == 6) {
-            long ledgerId = Long.valueOf(parts[1]);
-            long txId = Long.valueOf(parts[2]);
-            long logSegmentSequenceNumber = Long.valueOf(parts[3]);
-            long minActiveEntryId = Long.valueOf(parts[4]);
-            long minActiveSlotId = Long.valueOf(parts[5]);
+            long ledgerId = Long.parseLong(parts[1]);
+            long txId = Long.parseLong(parts[2]);
+            long logSegmentSequenceNumber = Long.parseLong(parts[3]);
+            long minActiveEntryId = Long.parseLong(parts[4]);
+            long minActiveSlotId = Long.parseLong(parts[5]);
 
             LogSegmentMetadataBuilder builder = new LogSegmentMetadataBuilder(path, llmv, ledgerId, txId)
                 .setLogSegmentSequenceNo(logSegmentSequenceNumber)
@@ -757,15 +757,15 @@ public class LogSegmentMetadata {
             long recordCount = (versionStatusCount & LOGRECORD_COUNT_MASK) >> LOGRECORD_COUNT_SHIFT;
             assert (recordCount >= Integer.MIN_VALUE && recordCount <= Integer.MAX_VALUE);
 
-            long ledgerId = Long.valueOf(parts[1]);
-            long firstTxId = Long.valueOf(parts[2]);
-            long lastTxId = Long.valueOf(parts[3]);
-            long completionTime = Long.valueOf(parts[4]);
-            long logSegmentSequenceNumber = Long.valueOf(parts[5]);
-            long lastEntryId = Long.valueOf(parts[6]);
-            long lastSlotId = Long.valueOf(parts[7]);
-            long minActiveEntryId = Long.valueOf(parts[8]);
-            long minActiveSlotId = Long.valueOf(parts[9]);
+            long ledgerId = Long.parseLong(parts[1]);
+            long firstTxId = Long.parseLong(parts[2]);
+            long lastTxId = Long.parseLong(parts[3]);
+            long completionTime = Long.parseLong(parts[4]);
+            long logSegmentSequenceNumber = Long.parseLong(parts[5]);
+            long lastEntryId = Long.parseLong(parts[6]);
+            long lastSlotId = Long.parseLong(parts[7]);
+            long minActiveEntryId = Long.parseLong(parts[8]);
+            long minActiveSlotId = Long.parseLong(parts[9]);
             LogSegmentMetadataBuilder builder = new LogSegmentMetadataBuilder(path, llmv, ledgerId, firstTxId)
                 .setInprogress(false)
                 .setLastTxId(lastTxId)
@@ -791,7 +791,7 @@ public class LogSegmentMetadata {
 
     static LogSegmentMetadata parseDataVersionsWithSequenceId(String path, byte[] data, String[] parts)
         throws IOException {
-        long versionStatusCount = Long.valueOf(parts[0]);
+        long versionStatusCount = Long.parseLong(parts[0]);
 
         long version = versionStatusCount & METADATA_VERSION_MASK;
         assert (version >= Integer.MIN_VALUE && version <= Integer.MAX_VALUE);
@@ -807,12 +807,12 @@ public class LogSegmentMetadata {
         assert (status >= 0 && status <= METADATA_STATUS_BIT_MAX);
 
         if (parts.length == 7) {
-            long ledgerId = Long.valueOf(parts[1]);
-            long txId = Long.valueOf(parts[2]);
-            long logSegmentSequenceNumber = Long.valueOf(parts[3]);
-            long minActiveEntryId = Long.valueOf(parts[4]);
-            long minActiveSlotId = Long.valueOf(parts[5]);
-            long startSequenceId = Long.valueOf(parts[6]);
+            long ledgerId = Long.parseLong(parts[1]);
+            long txId = Long.parseLong(parts[2]);
+            long logSegmentSequenceNumber = Long.parseLong(parts[3]);
+            long minActiveEntryId = Long.parseLong(parts[4]);
+            long minActiveSlotId = Long.parseLong(parts[5]);
+            long startSequenceId = Long.parseLong(parts[6]);
 
             LogSegmentMetadataBuilder builder = new LogSegmentMetadataBuilder(path, llmv, ledgerId, txId)
                     .setLogSegmentSequenceNo(logSegmentSequenceNumber)
@@ -827,16 +827,16 @@ public class LogSegmentMetadata {
             long recordCount = (versionStatusCount & LOGRECORD_COUNT_MASK) >> LOGRECORD_COUNT_SHIFT;
             assert (recordCount >= Integer.MIN_VALUE && recordCount <= Integer.MAX_VALUE);
 
-            long ledgerId = Long.valueOf(parts[1]);
-            long firstTxId = Long.valueOf(parts[2]);
-            long lastTxId = Long.valueOf(parts[3]);
-            long completionTime = Long.valueOf(parts[4]);
-            long logSegmentSequenceNumber = Long.valueOf(parts[5]);
-            long lastEntryId = Long.valueOf(parts[6]);
-            long lastSlotId = Long.valueOf(parts[7]);
-            long minActiveEntryId = Long.valueOf(parts[8]);
-            long minActiveSlotId = Long.valueOf(parts[9]);
-            long startSequenceId = Long.valueOf(parts[10]);
+            long ledgerId = Long.parseLong(parts[1]);
+            long firstTxId = Long.parseLong(parts[2]);
+            long lastTxId = Long.parseLong(parts[3]);
+            long completionTime = Long.parseLong(parts[4]);
+            long logSegmentSequenceNumber = Long.parseLong(parts[5]);
+            long lastEntryId = Long.parseLong(parts[6]);
+            long lastSlotId = Long.parseLong(parts[7]);
+            long minActiveEntryId = Long.parseLong(parts[8]);
+            long minActiveSlotId = Long.parseLong(parts[9]);
+            long startSequenceId = Long.parseLong(parts[10]);
             LogSegmentMetadataBuilder builder = new LogSegmentMetadataBuilder(path, llmv, ledgerId, firstTxId)
                     .setInprogress(false)
                     .setLastTxId(lastTxId)
@@ -867,7 +867,7 @@ public class LogSegmentMetadata {
         String[] parts = new String(data, UTF_8).split(";");
         long version;
         try {
-            version = Long.valueOf(parts[0]) & METADATA_VERSION_MASK;
+            version = Long.parseLong(parts[0]) & METADATA_VERSION_MASK;
         } catch (Exception exc) {
             throw new IOException("Invalid ledger entry, "
                 + new String(data, UTF_8));
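
Throughout LogSegmentMetadata above, Long.valueOf(...) is replaced by Long.parseLong(...): valueOf returns a boxed Long that is immediately unboxed back into a long, and FindBugs reports the pointless boxing. A tiny stand-alone example with made-up data:

public class ParseLongExample {
    public static void main(String[] args) {
        String[] parts = "42;17;99".split(";");

        // Long.valueOf creates a boxed Long that is immediately unboxed into the
        // primitive; FindBugs reports the needless boxing.
        long boxedThenUnboxed = Long.valueOf(parts[0]);

        // Long.parseLong returns the primitive directly.
        long parsed = Long.parseLong(parts[1]);

        System.out.println(boxedThenUnboxed + parsed + Long.parseLong(parts[2]));
    }
}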

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
index ea301e2..c446a8b 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
@@ -89,7 +89,7 @@ class MaxTxId {
             }
             String txidStr = Long.toString(maxTxId);
             try {
-                Stat stat = zkc.get().setData(path, txidStr.getBytes("UTF-8"), -1);
+                zkc.get().setData(path, txidStr.getBytes("UTF-8"), -1);
                 currentMax = maxTxId;
             } catch (Exception e) {
                 LOG.error("Error writing new MaxTxId value {}", maxTxId, e);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/function/VoidFunctions.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/function/VoidFunctions.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/function/VoidFunctions.java
index 316a53f..e260482 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/function/VoidFunctions.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/function/VoidFunctions.java
@@ -23,7 +23,7 @@ import java.util.List;
 
 public class VoidFunctions {
 
-    public static AbstractFunction1<List<Void>, Void> LIST_TO_VOID_FUNC =
+    public static final AbstractFunction1<List<Void>, Void> LIST_TO_VOID_FUNC =
             new AbstractFunction1<List<Void>, Void>() {
                 @Override
                 public Void apply(List<Void> list) {
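
In VoidFunctions above, the shared LIST_TO_VOID_FUNC singleton gains a final modifier; a non-final public static field can be reassigned by any code in the JVM, which FindBugs reports under its should-be-final (MS_SHOULD_BE_FINAL) family of warnings. A minimal, hypothetical example:

import java.util.Collections;
import java.util.List;

public class FinalStaticFieldExample {
    // Without "final", any class could reassign this shared singleton out from under
    // its users; declaring it final makes the reference a true constant.
    public static final List<String> EMPTY_NAMES = Collections.emptyList();

    public static void main(String[] args) {
        System.out.println("shared constant has " + EMPTY_NAMES.size() + " entries");
    }
}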

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
index 41b887e..c0796a1 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
@@ -326,7 +326,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
                 listenerSet.add(listener);
                 if (!listeners.containsKey(logSegmentsPath)) {
                     // listener set has been removed, add it back
-                    listeners.putIfAbsent(logSegmentsPath, listenerSet);
+                    listeners.put(logSegmentsPath, listenerSet);
                 }
             }
             new ReadLogSegmentsTask(logSegmentsPath, this).run();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/federated/FederatedZKLogMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
index 9b5bea8..0a8f28b 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
@@ -206,9 +206,10 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
         Set<URI> uris = FutureUtils.result(fetchSubNamespaces(this));
         for (URI uri : uris) {
             SubNamespace subNs = new SubNamespace(uri);
-            subNamespaces.putIfAbsent(uri, subNs);
-            subNs.watch();
-            logger.info("Watched sub namespace {}", uri);
+            if (null == subNamespaces.putIfAbsent(uri, subNs)) {
+                subNs.watch();
+                logger.info("Watched sub namespace {}", uri);
+            }
         }
 
         logger.info("Federated ZK LogMetadataStore is initialized for {}", namespace);
@@ -598,10 +599,19 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
     }
 
     void setZkSubnamespacesVersion(int zkVersion) {
-        synchronized (zkSubnamespacesVersion) {
-            Integer oldVersion = zkSubnamespacesVersion.get();
-            if (null == oldVersion || oldVersion < zkVersion) {
-                zkSubnamespacesVersion.set(zkVersion);
+        Integer oldVersion;
+        boolean done = false;
+        while (!done) {
+            oldVersion = zkSubnamespacesVersion.get();
+            if (null == oldVersion) {
+                done = zkSubnamespacesVersion.compareAndSet(null, zkVersion);
+                continue;
+            }
+            if (oldVersion < zkVersion) {
+                done = zkSubnamespacesVersion.compareAndSet(oldVersion, zkVersion);
+                continue;
+            } else {
+                done = true;
             }
         }
     }
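
The FederatedZKLogMetadataStore changes above do two things: they only start a watcher when putIfAbsent actually inserted the sub-namespace (its return value was previously ignored), and they replace synchronizing on the version atomic with a compareAndSet retry loop. A condensed, hypothetical sketch of both idioms; the types and names are made up and are not the real store:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

public class CasAndPutIfAbsentExample {
    private final ConcurrentMap<String, String> watchers = new ConcurrentHashMap<>();
    private final AtomicReference<Integer> maxVersion = new AtomicReference<>(null);

    public void watchOnce(String uri) {
        // putIfAbsent returns null only for the thread that actually inserted, so the
        // side effect (starting a watcher) runs exactly once; ignoring the return value
        // is what FindBugs complained about.
        if (null == watchers.putIfAbsent(uri, "watcher")) {
            System.out.println("watching " + uri);
        }
    }

    public void advanceVersion(int zkVersion) {
        // Retry loop with compareAndSet instead of synchronizing on the atomic itself:
        // keeps only the largest version seen, without mixing monitors into the atomic.
        while (true) {
            Integer old = maxVersion.get();
            if (old != null && old >= zkVersion) {
                return;  // nothing to update
            }
            if (maxVersion.compareAndSet(old, zkVersion)) {
                return;
            }
        }
    }

    public static void main(String[] args) {
        CasAndPutIfAbsentExample example = new CasAndPutIfAbsentExample();
        example.watchOnce("distributedlog://ns/sub0");
        example.watchOnce("distributedlog://ns/sub0");  // second call does not re-watch
        example.advanceVersion(3);
        example.advanceVersion(2);
        System.out.println("max version = " + example.maxVersion.get());
    }
}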

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/RateLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/RateLimiter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/RateLimiter.java
index 3a266d9..0cb1ebe 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/RateLimiter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/RateLimiter.java
@@ -37,11 +37,6 @@ public interface RateLimiter {
     };
 
     public static abstract class Builder {
-        protected int limit;
-        public Builder setLimit(int limit) {
-            this.limit = limit;
-            return this;
-        }
         public abstract RateLimiter build();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java
index 1a04b4f..2f9869e 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java
@@ -26,10 +26,12 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import scala.runtime.BoxedUnit;
 
 public class MovingAverageRateFactory {
+
+    private static final int DEFAULT_INTERVAL_SECS = 1;
+
     private final Timer timer;
     private final TimerTask timerTask;
     private final CopyOnWriteArrayList<SampledMovingAverageRate> avgs;
-    private final int DEFAULT_INTERVAL_SECS = 1;
 
     public MovingAverageRateFactory(Timer timer) {
         this.avgs = new CopyOnWriteArrayList<SampledMovingAverageRate>();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
index c912178..a3fd239 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
@@ -1275,8 +1275,6 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As
             submit(new Runnable() {
                 @Override
                 public void run() {
-                    long numEntries = endEntryId - startEntryId + 1;
-
                     // If readAheadSkipBrokenEntries is enabled and we hit a corrupt entry, log and
                     // stat the issue and move forward.
                     if (BKException.Code.DigestMatchException == rc && readAheadSkipBrokenEntries) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
index 91724c7..bcb7853 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
@@ -395,12 +395,8 @@ public class DistributedLogTool extends Tool {
                                                 LedgerAllocatorUtils.createLedgerAllocatorPool(poolPath, 0, getConf(),
                                                         getZooKeeperClient(), getBookKeeperClient(),
                                                         allocationExecutor);
-                                        if (null == allocator) {
-                                            System.err.println("ERROR: use zk34 version to delete allocator pool : " + poolPath + " .");
-                                        } else {
-                                            allocator.delete();
-                                            System.out.println("Deleted allocator pool : " + poolPath + " .");
-                                        }
+                                        allocator.delete();
+                                        System.out.println("Deleted allocator pool : " + poolPath + " .");
                                     } catch (IOException ioe) {
                                         System.err.println("Failed to delete allocator pool " + poolPath + " : " + ioe.getMessage());
                                     }
@@ -538,9 +534,7 @@ public class DistributedLogTool extends Tool {
             }
             for (Map.Entry<String, List<Pair<LogSegmentMetadata, List<String>>>> entry : corruptedCandidates.entrySet()) {
                 System.out.println(entry.getKey() + " : \n");
-                List<LogSegmentMetadata> segments = new ArrayList<LogSegmentMetadata>(entry.getValue().size());
                 for (Pair<LogSegmentMetadata, List<String>> pair : entry.getValue()) {
-                    segments.add(pair.getLeft());
                     System.out.println("\t - " + pair.getLeft());
                     if (printInprogressOnly && dumpEntries) {
                         int i = 0;
@@ -2592,10 +2586,6 @@ public class DistributedLogTool extends Tool {
             options.addOption("b64", "base64", true, "Base64 encoded dlsn");
         }
 
-        public void setBase64DLSN(String base64Dlsn) {
-            base64Dlsn = base64Dlsn;
-        }
-
         protected void parseCommandLine(CommandLine cmdline) throws ParseException {
             if (cmdline.hasOption("b64")) {
                 base64Dlsn = cmdline.getOptionValue("b64");
@@ -2622,10 +2612,6 @@ public class DistributedLogTool extends Tool {
             options.addOption("x", "hex", false, "Emit hex-encoded string DLSN instead of base 64");
         }
 
-        public void setDLSN(DLSN dlsn) {
-            dlsn = dlsn;
-        }
-
         protected void parseCommandLine(CommandLine cmdline) throws ParseException {
             if (cmdline.hasOption("dlsn")) {
                 dlsn = parseDLSN(cmdline.getOptionValue("dlsn"));
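
The removed setBase64DLSN and setDLSN setters above assigned a parameter to itself, a no-op that FindBugs reports as a self-assignment; since nothing used the setters, the commit simply deletes them. For reference, a hypothetical sketch of the bug and of what a working setter has to do (names are illustrative only):

public class SelfAssignmentExample {
    private String base64Dlsn;

    // Buggy shape removed in the commit: the parameter shadows the field, so the
    // assignment below would copy the argument onto itself and the field never changes.
    public void setBase64DlsnBroken(String base64Dlsn) {
        // base64Dlsn = base64Dlsn;  // self-assignment: a no-op flagged by FindBugs
    }

    // A setter that actually stores the value must qualify the field with "this".
    public void setBase64Dlsn(String base64Dlsn) {
        this.base64Dlsn = base64Dlsn;
    }

    public static void main(String[] args) {
        SelfAssignmentExample example = new SelfAssignmentExample();
        example.setBase64Dlsn("ZXhhbXBsZQ==");
        System.out.println(example.base64Dlsn);
    }
}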

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java
index 42c624a..803db90 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java
@@ -165,7 +165,7 @@ public class DLUtils {
      */
     public static long deserializeLogSegmentSequenceNumber(byte[] data) {
         String seqNoStr = new String(data, UTF_8);
-        return Long.valueOf(seqNoStr);
+        return Long.parseLong(seqNoStr);
     }
 
     /**
@@ -189,7 +189,7 @@ public class DLUtils {
      */
     public static long deserializeTransactionId(byte[] data) {
         String seqNoStr = new String(data, UTF_8);
-        return Long.valueOf(seqNoStr);
+        return Long.parseLong(seqNoStr);
     }
 
     /**
@@ -222,6 +222,6 @@ public class DLUtils {
      * @return ledger id
      */
     public static long bytes2LedgerId(byte[] data) {
-        return Long.valueOf(new String(data, UTF_8));
+        return Long.parseLong(new String(data, UTF_8));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/resources/findbugsExclude.xml b/distributedlog-core/src/main/resources/findbugsExclude.xml
index 16d9c6c..684b827 100644
--- a/distributedlog-core/src/main/resources/findbugsExclude.xml
+++ b/distributedlog-core/src/main/resources/findbugsExclude.xml
@@ -22,13 +22,14 @@
   </Match>
   <Match>
     <!-- it is safe to store external bytes reference here. //-->
-    <Class name="com.twitter.distributedlog.LogRecord" />
+    <Class name="com.twitter.distributedlog.Entry$Builder" />
+    <Method name="setData" />
     <Bug pattern="EI_EXPOSE_REP2" />
   </Match>
   <Match>
     <!-- it is safe to store external bytes reference here. //-->
-    <Class name="com.twitter.distributedlog.LogRecord" />
-    <Method name="getPayload" />
+    <Class name="com.twitter.distributedlog.Entry" />
+    <Method name="getRawData" />
     <Bug pattern="EI_EXPOSE_REP" />
   </Match>
   <Match>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/bin/bk-cluster
----------------------------------------------------------------------
diff --git a/distributedlog-example/bin/bk-cluster b/distributedlog-example/bin/bk-cluster
deleted file mode 100755
index 827153e..0000000
--- a/distributedlog-example/bin/bk-cluster
+++ /dev/null
@@ -1,26 +0,0 @@
-#!/bin/bash
-#
-#/**
-# * Licensed to the Apache Software Foundation (ASF) under one
-# * or more contributor license agreements.  See the NOTICE file
-# * distributed with this work for additional information
-# * regarding copyright ownership.  The ASF licenses this file
-# * to you under the Apache License, Version 2.0 (the
-# * "License"); you may not use this file except in compliance
-# * with the License.  You may obtain a copy of the License at
-# *
-# *     http://www.apache.org/licenses/LICENSE-2.0
-# *
-# * Unless required by applicable law or agreed to in writing, software
-# * distributed under the License is distributed on an "AS IS" BASIS,
-# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# * See the License for the specific language governing permissions and
-# * limitations under the License.
-# */
-
-BASEDIR=$(dirname "$0")
-DISTRIBUTEDLOG_ROOT="${BASEDIR}/../.."
-
-cd ${DISTRIBUTEDLOG_ROOT} &&\
-mvn clean install -pl distributedlog-core -am -DskipTests &&\
-distributedlog-core/bin/dlog local 7000

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/bin/proxy-cluster
----------------------------------------------------------------------
diff --git a/distributedlog-example/bin/proxy-cluster b/distributedlog-example/bin/proxy-cluster
deleted file mode 100755
index cc16944..0000000
--- a/distributedlog-example/bin/proxy-cluster
+++ /dev/null
@@ -1,33 +0,0 @@
-#!/bin/bash
-#
-#/**
-# * Licensed to the Apache Software Foundation (ASF) under one
-# * or more contributor license agreements.  See the NOTICE file
-# * distributed with this work for additional information
-# * regarding copyright ownership.  The ASF licenses this file
-# * to you under the Apache License, Version 2.0 (the
-# * "License"); you may not use this file except in compliance
-# * with the License.  You may obtain a copy of the License at
-# *
-# *     http://www.apache.org/licenses/LICENSE-2.0
-# *
-# * Unless required by applicable law or agreed to in writing, software
-# * distributed under the License is distributed on an "AS IS" BASIS,
-# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# * See the License for the specific language governing permissions and
-# * limitations under the License.
-# */
-
-BASEDIR=$(dirname "$0")
-DISTRIBUTEDLOG_ROOT="${BASEDIR}/../.."
-
-cd ${DISTRIBUTEDLOG_ROOT} &&\
-mvn clean install -Ptwitter-ostrich-provider -pl distributedlog-example -am -DskipTests &&\
-BUILT_JAR=`ls distributedlog-example/target/distributedlog-*.jar 2> /dev/null | egrep -v 'tests|javadoc|sources' | tail -1` &&
-java -cp .:distributedlog-example/lib/*:$BUILT_JAR \
-    -Dlog4j.configuration=distributedlog-example/conf/log4j.properties \
-    -DstatsHttpPort=9000 -DstatsExport=true \
-    -Dserver_shard=0 \
-    com.twitter.distributedlog.example.ProxyClusterEmulator \
-    --port 8000 \
-    --thriftmux

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/bin/proxy-writer
----------------------------------------------------------------------
diff --git a/distributedlog-example/bin/proxy-writer b/distributedlog-example/bin/proxy-writer
deleted file mode 100755
index 5723cb0..0000000
--- a/distributedlog-example/bin/proxy-writer
+++ /dev/null
@@ -1,35 +0,0 @@
-#!/bin/bash
-#
-#/**
-# * Licensed to the Apache Software Foundation (ASF) under one
-# * or more contributor license agreements.  See the NOTICE file
-# * distributed with this work for additional information
-# * regarding copyright ownership.  The ASF licenses this file
-# * to you under the Apache License, Version 2.0 (the
-# * "License"); you may not use this file except in compliance
-# * with the License.  You may obtain a copy of the License at
-# *
-# *     http://www.apache.org/licenses/LICENSE-2.0
-# *
-# * Unless required by applicable law or agreed to in writing, software
-# * distributed under the License is distributed on an "AS IS" BASIS,
-# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# * See the License for the specific language governing permissions and
-# * limitations under the License.
-# */
-
-BASEDIR=$(dirname "$0")
-DISTRIBUTEDLOG_ROOT="${BASEDIR}/../.."
-
-cd ${DISTRIBUTEDLOG_ROOT} &&\
-mvn clean install -P twitter-ostrich-provider -pl distributedlog-benchmark -am -DskipTests &&\
-BUILT_JAR=`ls distributedlog-benchmark/target/distributedlog-*.jar 2> /dev/null | egrep -v 'tests|javadoc|sources' | tail -1` &&
-java -cp .:distributedlog-benchmark/lib/*:$BUILT_JAR \
-    -Dlog4j.configuration=distributedlog-example/conf/log4j.properties \
-    -DstatsHttpPort=9001 -DstatsExport=true \
-    com.twitter.distributedlog.benchmark.Benchmarker \
-    --mode write --shard 0 --finagle-name 'inet!127.0.0.1:8000' \
-    --duration 100000 --rate 10 --concurrency 1 \
-    --start-stream-id 0 --end-stream-id 1 \
-    --streamprefix example-stream \
-    --provider org.apache.bookkeeper.stats.twitter.ostrich.OstrichProvider

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/conf/bk_server.conf
----------------------------------------------------------------------
diff --git a/distributedlog-example/conf/bk_server.conf b/distributedlog-example/conf/bk_server.conf
deleted file mode 100644
index 3e2f44b..0000000
--- a/distributedlog-example/conf/bk_server.conf
+++ /dev/null
@@ -1,145 +0,0 @@
-#/**
-# * Licensed to the Apache Software Foundation (ASF) under one
-# * or more contributor license agreements.  See the NOTICE file
-# * distributed with this work for additional information
-# * regarding copyright ownership.  The ASF licenses this file
-# * to you under the Apache License, Version 2.0 (the
-# * "License"); you may not use this file except in compliance
-# * with the License.  You may obtain a copy of the License at
-# *
-# *     http://www.apache.org/licenses/LICENSE-2.0
-# *
-# * Unless required by applicable law or agreed to in writing, software
-# * distributed under the License is distributed on an "AS IS" BASIS,
-# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# * See the License for the specific language governing permissions and
-# * limitations under the License.
-# */
-
-## Bookie settings
-
-# Max file size of entry logger, in bytes
-# A new entry log file will be created when the old one reaches the file size limitation
-logSizeLimit=1073741823
-
-# Max file size of journal file, in mega bytes
-# A new journal file will be created when the old one reaches the file size limitation
-#
-journalMaxSizeMB=2048
-
-# Max number of old journal file to kept
-# Keep a number of old journal files would help data recovery in specia case
-#
-journalMaxBackups=5
-
-# How long the interval to trigger next garbage collection, in milliseconds
-# Since garbage collection is running in background, too frequent gc
-# will heart performance. It is better to give a higher number of gc
-# interval if there is enough disk capacity.
-#
-# gc per 20 minutes (even there is nothing to gc, it would scan entry log files
-# to get ledgers mapping for next gc cycle. this would help if we have pretty high
-# write volume)
-gcWaitTime=1200000
-# do minor compaction per 1 hours
-minorCompactionInterval=3600
-minorCompactionThreshold=0.2
-# disable major compaction
-majorCompactionInterval=0
-# reduce major compaction threshold to a low value to prevent bad force compaction behavior
-majorCompactionThreshold=0.3
-# disk usage
-diskUsageThreshold=0.97
-# increase warn threshold to avoid bad force compaction behavior
-diskUsageWarnThreshold=0.96
-
-# How long the interval to flush ledger index pages to disk, in milliseconds
-# Flushing index files will introduce much random disk I/O.
-# If separating journal dir and ledger dirs each on different devices,
-# flushing would not affect performance. But if putting journal dir
-# and ledger dirs on same device, performance degrade significantly
-# on too frequent flushing. You can consider increment flush interval
-# to get better performance, but you need to pay more time on bookie
-# server restart after failure.
-#
-flushInterval=1000
-
-# ZooKeeper client session timeout in milliseconds
-# Bookie server will exit if it received SESSION_EXPIRED because it
-# was partitioned off from ZooKeeper for more than the session timeout
-# JVM garbage collection, disk I/O will cause SESSION_EXPIRED.
-# Increment this value could help avoiding this issue
-zkTimeout=60000
-
-## NIO Server settings
-
-# This settings is used to enabled/disabled Nagle's algorithm, which is a means of
-# improving the efficiency of TCP/IP networks by reducing the number of packets
-# that need to be sent over the network.
-# If you are sending many small messages, such that more than one can fit in
-# a single IP packet, setting server.tcpnodelay to false to enable Nagle algorithm
-# can provide better performance.
-# Default value is true.
-#
-serverTcpNoDelay=true
-
-## ledger cache settings
-
-# Max number of ledger index files could be opened in bookie server
-# If number of ledger index files reaches this limitation, bookie
-# server started to swap some ledgers from memory to disk.
-# Too frequent swap will affect performance. You can tune this number
-# to gain performance according your requirements.
-openFileLimit=20000
-
-# Size of a index page in ledger cache, in bytes
-# A larger index page can improve performance writing page to disk,
-# which is efficent when you have small number of ledgers and these
-# ledgers have similar number of entries.
-# If you have large number of ledgers and each ledger has fewer entries,
-# smaller index page would improve memory usage.
-pageSize=8192
-
-# How many index pages provided in ledger cache
-# If number of index pages reaches this limitation, bookie server
-# starts to swap some ledgers from memory to disk. You can increment
-# this value when you found swap became more frequent. But make sure
-# pageLimit*pageSize should not more than JVM max memory limitation,
-# otherwise you would got OutOfMemoryException.
-# In general, incrementing pageLimit, using smaller index page would
-# gain bettern performance in lager number of ledgers with fewer entries case
-# If pageLimit is -1, bookie server will use 1/3 of JVM memory to compute
-# the limitation of number of index pages.
-pageLimit=131072
-
-#If all ledger directories configured are full, then support only read requests for clients.
-#If "readOnlyModeEnabled=true" then on all ledger disks full, bookie will be converted
-#to read-only mode and serve only read requests. Otherwise the bookie will be shutdown.
-readOnlyModeEnabled=true
-
-# Bookie Journal Settings
-writeBufferSizeBytes=524288
-journalFlushWhenQueueEmpty=false
-journalRemoveFromPageCache=true
-journalAdaptiveGroupWrites=true
-journalMaxGroupWaitMSec=6
-journalBufferedEntriesThreshold=180
-journalBufferedWritesThreshold=262144
-journalMaxGroupedEntriesToCommit=200
-journalPreAllocSizeMB=4
-journalFlushWhenQueueEmpty=true
-
-# Sorted Ledger Storage Settings
-sortedLedgerStorageEnabled=true
-skipListSizeLimit=67108864
-skipListArenaChunkSize=2097152
-skipListArenaMaxAllocSize=131072
-fileInfoCacheInitialCapacity=10000
-fileInfoMaxIdleTime=3600
-
-# Bookie Threads Settings
-numAddWorkerThreads=24
-numJournalCallbackThreads=48
-numReadWorkerThreads=72
-numLongPollWorkerThreads=72
-

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/conf/distributedlog.conf
----------------------------------------------------------------------
diff --git a/distributedlog-example/conf/distributedlog.conf b/distributedlog-example/conf/distributedlog.conf
deleted file mode 100644
index dac71ac..0000000
--- a/distributedlog-example/conf/distributedlog.conf
+++ /dev/null
@@ -1,125 +0,0 @@
-#/**
-# * Licensed to the Apache Software Foundation (ASF) under one
-# * or more contributor license agreements.  See the NOTICE file
-# * distributed with this work for additional information
-# * regarding copyright ownership.  The ASF licenses this file
-# * to you under the Apache License, Version 2.0 (the
-# * "License"); you may not use this file except in compliance
-# * with the License.  You may obtain a copy of the License at
-# *
-# *     http://www.apache.org/licenses/LICENSE-2.0
-# *
-# * Unless required by applicable law or agreed to in writing, software
-# * distributed under the License is distributed on an "AS IS" BASIS,
-# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# * See the License for the specific language governing permissions and
-# * limitations under the License.
-# */
-
-########################
-# ZooKeeper Client Settings
-########################
-
-# zookeeper settings
-zkSessionTimeoutSeconds=30
-zkNumRetries=0
-zkRetryStartBackoffMillis=100
-zkRetryMaxBackoffMillis=200
-# bkc zookeeper settings
-bkcZKSessionTimeoutSeconds=60
-bkcZKNumRetries=20
-bkcZKRetryStartBackoffMillis=100
-bkcZKRetryMaxBackoffMillis=200
-
-########################
-# BookKeeper Client Settings
-########################
-
-# bookkeeper client timeouts
-bkcWriteTimeoutSeconds=10
-bkcReadTimeoutSeconds=1
-bkcNumWorkerThreads=16
-# bkcNumIOThreads=16
-bkc.numChannelsPerBookie=1
-bkc.enableTaskExecutionStats=true
-bkc.connectTimeoutMillis=1000
-bkc.enablePerHostStats=true
-
-########################
-# DL Settings
-########################
-
-# lock timeout
-lockTimeoutSeconds=0
-# dl worker threads
-numWorkerThreads=16
-
-### Recovery Related Settings
-
-# recover log segments in background
-recoverLogSegmentsInBackground=true
-# disable max id in proxy
-maxIdSanityCheck=true
-# use allocator pool for proxy
-enableLedgerAllocatorPool=false
-# ledger allocator pool size
-ledgerAllocatorPoolCoreSize=20
-# check stream exists or not
-createStreamIfNotExists=true
-# encode dc id in version
-encodeDCIDInVersion=true
-# logSegmentNameVersion
-logSegmentNameVersion=1
-
-### Write Performance Related Settings
-
-# ensemble size
-ensemble-size=3
-write-quorum-size=3
-ack-quorum-size=2
-bkc.ensemblePlacementPolicy=org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy
-bkc.delayEnsembleChange=true
-
-# sync settings
-# buffer size is large because when we rewrite we perform a very large write to persist
-# all queue state at once (up to max queue memory size, ex. 16MB). the write will be
-# throttled if it takes too long, which can hurt performance, so important to optimize
-# for this case.
-output-buffer-size=512000
-enableImmediateFlush=false
-periodicFlushFrequencyMilliSeconds=6
-logFlushTimeoutSeconds=120
-
-### Ledger Rolling Related Settings
-
-# retention policy
-retention-size=0
-# rolling ledgers (disable time rolling/enable size rolling)
-rolling-interval=0
-
-# max logsegment bytes=2GB
-# much larger than max journal size, effectively never roll and let drpc do it
-maxLogSegmentBytes=2147483648
-
-# rolling concurrency
-logSegmentRollingConcurrency=1
-# disable sanityCheckDelete
-sanityCheckDelete=false
-ledgerAllocatorPoolName=drpc-alloc-pool
-
-### Readahead settings
-
-enableReadAhead=true
-ReadAheadBatchSize=10
-ReadAheadMaxEntries=100
-ReadAheadWaitTime=10
-
-### Rate limit
-
-rpsSoftWriteLimit=1
-rpsHardWriteLimit=5
-rpsHardServiceLimit=15
-
-### Config
-
-dynamicConfigReloadIntervalSec=5

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/distributedlog-example/conf/log4j.properties b/distributedlog-example/conf/log4j.properties
deleted file mode 100644
index d8bf9fe..0000000
--- a/distributedlog-example/conf/log4j.properties
+++ /dev/null
@@ -1,44 +0,0 @@
-#/**
-# * Licensed to the Apache Software Foundation (ASF) under one
-# * or more contributor license agreements.  See the NOTICE file
-# * distributed with this work for additional information
-# * regarding copyright ownership.  The ASF licenses this file
-# * to you under the Apache License, Version 2.0 (the
-# * "License"); you may not use this file except in compliance
-# * with the License.  You may obtain a copy of the License at
-# *
-# *     http://www.apache.org/licenses/LICENSE-2.0
-# *
-# * Unless required by applicable law or agreed to in writing, software
-# * distributed under the License is distributed on an "AS IS" BASIS,
-# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# * See the License for the specific language governing permissions and
-# * limitations under the License.
-# */
-
-#
-# DistributedLog Logging Configuration
-#
-
-dlog.root.logger=DEBUG, stderr
-dlog.log.dir=.
-dlog.log.file=bookkeeper-server.log
-
-log4j.rootLogger=${dlog.root.logger}
-
-log4j.logger.org.apache.zookeeper=WARN
-log4j.logger.org.apache.bookkeeper=WARN
-
-log4j.appender.R=org.apache.log4j.RollingFileAppender
-log4j.appender.R.Threshold=INFO
-log4j.appender.R.File=${dlog.log.dir}/${dlog.log.file}
-log4j.appender.R.MaxFileSize=200MB
-log4j.appender.R.MaxBackupIndex=7
-log4j.appender.R.layout=org.apache.log4j.PatternLayout
-log4j.appender.R.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
-
-log4j.appender.stderr=org.apache.log4j.ConsoleAppender
-log4j.appender.stderr.Target=System.err
-log4j.appender.stderr.Threshold=DEBUG
-log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
-log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/conf/stream_config/example-stream_0.conf
----------------------------------------------------------------------
diff --git a/distributedlog-example/conf/stream_config/example-stream_0.conf b/distributedlog-example/conf/stream_config/example-stream_0.conf
deleted file mode 100644
index 647c870..0000000
--- a/distributedlog-example/conf/stream_config/example-stream_0.conf
+++ /dev/null
@@ -1,23 +0,0 @@
-#/**
-# * Licensed to the Apache Software Foundation (ASF) under one
-# * or more contributor license agreements.  See the NOTICE file
-# * distributed with this work for additional information
-# * regarding copyright ownership.  The ASF licenses this file
-# * to you under the Apache License, Version 2.0 (the
-# * "License"); you may not use this file except in compliance
-# * with the License.  You may obtain a copy of the License at
-# *
-# *     http://www.apache.org/licenses/LICENSE-2.0
-# *
-# * Unless required by applicable law or agreed to in writing, software
-# * distributed under the License is distributed on an "AS IS" BASIS,
-# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# * See the License for the specific language governing permissions and
-# * limitations under the License.
-# */
-
-output-buffer-size=512000
-periodicFlushFrequencyMilliSeconds=6
-retention-size=0
-rpsSoftWriteLimit=1
-rpsHardWriteLimit=20

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-example/pom.xml b/distributedlog-example/pom.xml
deleted file mode 100644
index e0867cf..0000000
--- a/distributedlog-example/pom.xml
+++ /dev/null
@@ -1,136 +0,0 @@
-<?xml version="1.0"?>
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>com.twitter</groupId>
-    <artifactId>distributedlog</artifactId>
-    <version>0.4.0-incubating-SNAPSHOT</version>
-  </parent>
-  <artifactId>distributedlog-example</artifactId>
-  <name>Apache DistributedLog :: Example</name>
-  <dependencies>
-    <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>distributedlog-client</artifactId>
-      <version>${project.parent.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.zookeeper</groupId>
-          <artifactId>zookeeper</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper</artifactId>
-      <version>3.5.1-alpha</version>
-    </dependency>
-    <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>distributedlog-core</artifactId>
-      <version>${project.parent.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>distributedlog-service</artifactId>
-      <version>${project.parent.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>finagle-ostrich4_2.11</artifactId>
-      <version>${birdcage.sha}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>finagle-thriftmux_2.11</artifactId>
-      <version>${birdcage.sha}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>distributedlog-core</artifactId>
-      <version>${project.parent.version}</version>
-      <scope>test</scope>
-      <type>test-jar</type>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <version>1.6.4</version>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <version>1.6.4</version>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>4.8.1</version>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>findbugs-maven-plugin</artifactId>
-        <configuration>
-          <excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile>
-        </configuration>
-      </plugin>
-      <plugin>
-        <artifactId>maven-dependency-plugin</artifactId>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>copy-dependencies</goal>
-            </goals>
-            <configuration>
-              <outputDirectory>${basedir}/lib</outputDirectory>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-  <profiles>
-    <profile>
-      <id>twitter-ostrich-provider</id>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.bookkeeper.stats</groupId>
-          <artifactId>twitter-ostrich-provider</artifactId>
-          <version>${bookkeeper.version}</version>
-          <exclusions>
-            <exclusion>
-              <groupId>com.twitter</groupId>
-              <artifactId>ostrich_2.10</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.twitter</groupId>
-              <artifactId>ostrich_2.9.2</artifactId>
-            </exclusion>
-          </exclusions>
-        </dependency>
-      </dependencies>
-    </profile>
-  </profiles>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/src/main/java/com/twitter/distributedlog/example/DistributedLogExample.java
----------------------------------------------------------------------
diff --git a/distributedlog-example/src/main/java/com/twitter/distributedlog/example/DistributedLogExample.java b/distributedlog-example/src/main/java/com/twitter/distributedlog/example/DistributedLogExample.java
deleted file mode 100644
index 1b1474b..0000000
--- a/distributedlog-example/src/main/java/com/twitter/distributedlog/example/DistributedLogExample.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.example;
-
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.DistributedLogManager;
-import com.twitter.distributedlog.LogReader;
-import com.twitter.distributedlog.LogRecord;
-import com.twitter.distributedlog.LogWriter;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
-
-import java.net.URI;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-public class DistributedLogExample {
-
-    private static byte[] generatePayload(String prefix, long txn) {
-        return String.format("%s-%d", prefix, txn).getBytes(UTF_8);
-    }
-
-    public static void main(String[] args) throws Exception {
-        if (args.length < 1) {
-            System.err.println("Usage: DistributedLogExample <uri>");
-            System.exit(-1);
-        }
-        URI uri = URI.create(args[0]);
-        // Create a distributedlog configuration
-        DistributedLogConfiguration conf =
-            new DistributedLogConfiguration()
-                .setLogSegmentRollingIntervalMinutes(60) // interval to roll log segment
-                .setRetentionPeriodHours(1) // retention period
-                .setWriteQuorumSize(2) // 2 replicas
-                .setAckQuorumSize(2) // 2 replicas
-                .setEnsembleSize(3); // how many hosts to store a log segment
-        // Create a distributedlog
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
-                .conf(conf)
-                .uri(uri)
-                .build();
-
-        DistributedLogManager unpartitionedDLM =
-            namespace.openLog("unpartitioned-example");
-        System.out.println("Create unpartitioned stream : unpartitioned-example");
-        LogWriter unpartitionedWriter = unpartitionedDLM.startLogSegmentNonPartitioned();
-        for (long i = 1; i <= 10; i++) {
-            LogRecord record = new LogRecord(i, generatePayload("unpartitioned-example", i));
-            unpartitionedWriter.write(record);
-        }
-        unpartitionedWriter.close();
-        System.out.println("Write 10 records into unpartitioned stream.");
-        LogReader unpartitionedReader = unpartitionedDLM.getInputStream(1);
-        System.out.println("Read unpartitioned stream : unpartitioned-example");
-        LogRecord unpartitionedRecord = unpartitionedReader.readNext(false);
-        while (null != unpartitionedRecord) {
-            System.out.println(String.format("txn %d : %s",
-                    unpartitionedRecord.getTransactionId(), new String(unpartitionedRecord.getPayload(), "UTF-8")));
-            unpartitionedRecord = unpartitionedReader.readNext(false);
-        }
-        unpartitionedReader.close();
-        System.out.println("Read unpartitioned stream done.");
-        System.out.println("Read unpartitioned stream : unpartitioned-example from txn 5");
-        LogReader unpartitionedReader2 = unpartitionedDLM.getInputStream(5);
-        LogRecord unpartitionedRecord2 = unpartitionedReader2.readNext(false);
-        while (null != unpartitionedRecord2) {
-            System.out.println(String.format("txn %d : %s",
-                    unpartitionedRecord2.getTransactionId(), new String(unpartitionedRecord2.getPayload(), "UTF-8")));
-            unpartitionedRecord2 = unpartitionedReader2.readNext(false);
-        }
-        unpartitionedReader2.close();
-        System.out.println("Read unpartitioned stream done.");
-        unpartitionedDLM.delete();
-        unpartitionedDLM.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/src/main/java/com/twitter/distributedlog/example/ProxyClusterEmulator.java
----------------------------------------------------------------------
diff --git a/distributedlog-example/src/main/java/com/twitter/distributedlog/example/ProxyClusterEmulator.java b/distributedlog-example/src/main/java/com/twitter/distributedlog/example/ProxyClusterEmulator.java
deleted file mode 100644
index c845cbe..0000000
--- a/distributedlog-example/src/main/java/com/twitter/distributedlog/example/ProxyClusterEmulator.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.example;
-
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.service.DistributedLogCluster;
-import com.twitter.distributedlog.service.DistributedLogServerApp;
-
-import java.util.Arrays;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Main class for DistributedLogCluster emulator
- */
-public class ProxyClusterEmulator {
-    static final Logger LOG = LoggerFactory.getLogger(ProxyClusterEmulator.class);
-
-    private final DistributedLogCluster dlCluster;
-    private final String[] args;
-
-    public ProxyClusterEmulator(String[] args) throws Exception {
-        DistributedLogConfiguration conf = new DistributedLogConfiguration();
-        conf.setImmediateFlushEnabled(true);
-        conf.setOutputBufferSize(0);
-        conf.setPeriodicFlushFrequencyMilliSeconds(0);
-        conf.setLockTimeout(0);
-        this.dlCluster = DistributedLogCluster.newBuilder()
-            .numBookies(3)
-            .shouldStartZK(true)
-            .zkServers("127.0.0.1")
-            .shouldStartProxy(false) // We'll start it separately so we can pass args.
-            .dlConf(conf)
-            .build();
-        this.args = args;
-    }
-
-    public void start() throws Exception {
-        dlCluster.start();
-
-        // Run the server with bl cluster info.
-        String[] extendedArgs = new String[args.length + 2];
-        System.arraycopy(args, 0, extendedArgs, 0, args.length);
-        extendedArgs[extendedArgs.length - 2] = "-u";
-        extendedArgs[extendedArgs.length - 1] = dlCluster.getUri().toString();
-        LOG.debug("Using args {}", Arrays.toString(extendedArgs));
-        DistributedLogServerApp.main(extendedArgs);
-    }
-
-    public void stop() throws Exception {
-        dlCluster.stop();
-    }
-
-    public static void main(String[] args) throws Exception {
-        ProxyClusterEmulator emulator = null;
-        try {
-            emulator = new ProxyClusterEmulator(args);
-            emulator.start();
-        } catch (Exception ex) {
-            if (null != emulator) {
-                emulator.stop();
-            }
-            System.out.println("Exception occurred running emulator " + ex);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/src/test/java/com/twitter/distributedlog/example/TestProxyClusterEmulator.java
----------------------------------------------------------------------
diff --git a/distributedlog-example/src/test/java/com/twitter/distributedlog/example/TestProxyClusterEmulator.java b/distributedlog-example/src/test/java/com/twitter/distributedlog/example/TestProxyClusterEmulator.java
deleted file mode 100644
index 4897eb4..0000000
--- a/distributedlog-example/src/test/java/com/twitter/distributedlog/example/TestProxyClusterEmulator.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.example;
-
-import java.net.BindException;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class TestProxyClusterEmulator {
-    static final Logger logger = LoggerFactory.getLogger(TestProxyClusterEmulator.class);
-
-    @Test(timeout = 60000)
-    public void testStartup() throws Exception {
-        final ProxyClusterEmulator emulator = new ProxyClusterEmulator(new String[] {"-port", "8000"});
-        final AtomicBoolean failed = new AtomicBoolean(false);
-        final CountDownLatch started = new CountDownLatch(1);
-        final CountDownLatch finished = new CountDownLatch(1);
-        Thread thread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    started.countDown();
-                    emulator.start();
-                } catch (BindException ex) {
-                } catch (InterruptedException ex) {
-                } catch (Exception ex) {
-                    failed.set(true);
-                } finally {
-                    finished.countDown();
-                }
-            }
-        });
-        thread.start();
-        started.await();
-        Thread.sleep(1000);
-        thread.interrupt();
-        finished.await();
-        emulator.stop();
-        assert(!failed.get());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-protocol/src/main/java/com/twitter/distributedlog/util/ProtocolUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/util/ProtocolUtils.java b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/util/ProtocolUtils.java
index a52976c..0b08471 100644
--- a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/util/ProtocolUtils.java
+++ b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/util/ProtocolUtils.java
@@ -20,8 +20,8 @@ package com.twitter.distributedlog.util;
 import java.util.zip.CRC32;
 
 import com.twitter.distributedlog.DLSN;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Charsets.UTF_8;
 
 /**
  * With CRC embedded in the application, we have to keep track of per api crc. Ideally this
@@ -29,8 +29,6 @@ import org.slf4j.LoggerFactory;
  */
 public class ProtocolUtils {
 
-    private static final Logger logger = LoggerFactory.getLogger(ProtocolUtils.class);
-
     // For request payload checksum
     private static final ThreadLocal<CRC32> requestCRC = new ThreadLocal<CRC32>() {
         @Override
@@ -45,7 +43,7 @@ public class ProtocolUtils {
     public static Long writeOpCRC32(String stream, byte[] payload) {
         CRC32 crc = requestCRC.get();
         try {
-            crc.update(stream.getBytes());
+            crc.update(stream.getBytes(UTF_8));
             crc.update(payload);
             return crc.getValue();
         } finally {
@@ -59,9 +57,8 @@ public class ProtocolUtils {
     public static Long truncateOpCRC32(String stream, DLSN dlsn) {
         CRC32 crc = requestCRC.get();
         try {
-            crc.update(stream.getBytes());
+            crc.update(stream.getBytes(UTF_8));
             crc.update(dlsn.serializeBytes());
-            long result = crc.getValue();
             return crc.getValue();
         } finally {
             crc.reset();
@@ -74,8 +71,7 @@ public class ProtocolUtils {
     public static Long streamOpCRC32(String stream) {
         CRC32 crc = requestCRC.get();
         try {
-            crc.update(stream.getBytes());
-            long result = crc.getValue();
+            crc.update(stream.getBytes(UTF_8));
             return crc.getValue();
         } finally {
             crc.reset();
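
For reference, the ProtocolUtils hunk above clears two common findbugs patterns: calling String.getBytes() without an explicit charset (flagged as DM_DEFAULT_ENCODING) and storing the CRC value in a local that is never read (DLS_DEAD_LOCAL_STORE), along with dropping an unused logger. A minimal, hypothetical sketch of the same idiom follows; the class name is illustrative and java.nio.charset.StandardCharsets stands in for Guava's Charsets, but the pattern matches the diff: pass UTF_8 explicitly and return crc.getValue() directly.

import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public final class Crc32Sketch {

    // Mirrors the per-thread CRC32 reuse seen in ProtocolUtils.
    private static final ThreadLocal<CRC32> CRC = ThreadLocal.withInitial(CRC32::new);

    public static long streamCrc32(String stream) {
        CRC32 crc = CRC.get();
        try {
            crc.update(stream.getBytes(StandardCharsets.UTF_8)); // explicit charset, not the platform default
            return crc.getValue();                               // no dead local store
        } finally {
            crc.reset(); // leave the shared instance clean for the next caller
        }
    }

    public static void main(String[] args) {
        System.out.println(streamCrc32("example-stream"));
    }
}
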

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-protocol/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/resources/findbugsExclude.xml b/distributedlog-protocol/src/main/resources/findbugsExclude.xml
index 29e1a16..6b2197b 100644
--- a/distributedlog-protocol/src/main/resources/findbugsExclude.xml
+++ b/distributedlog-protocol/src/main/resources/findbugsExclude.xml
@@ -20,4 +20,21 @@
     <!-- generated code, we can't be held responsible for findbugs in it //-->
     <Class name="~com\.twitter\.distributedlog\.thrift.*" />
   </Match>
+  <Match>
+    <!-- it is safe to store external bytes reference here. //-->
+    <Class name="com.twitter.distributedlog.LogRecord" />
+    <Bug pattern="EI_EXPOSE_REP2" />
+  </Match>
+  <Match>
+    <!-- it is safe to store external bytes reference here. //-->
+    <Class name="com.twitter.distributedlog.LogRecord" />
+    <Method name="getPayload" />
+    <Bug pattern="EI_EXPOSE_REP" />
+  </Match>
+  <Match>
+    <!-- it is safe to store external bytes reference here. //-->
+    <Class name="com.twitter.distributedlog.io.Buffer" />
+    <Method name="getData" />
+    <Bug pattern="EI_EXPOSE_REP" />
+  </Match>
 </FindBugsFilter>
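
The three exclusions added above cover EI_EXPOSE_REP and EI_EXPOSE_REP2, which findbugs raises when a class stores or hands out a reference to a mutable byte[] instead of copying it. The comments in the filter indicate the sharing is intentional, presumably to avoid a copy per record on the read/write path. A hypothetical illustration of that trade-off, not the LogRecord or Buffer source:

import java.util.Arrays;

public final class PayloadHolder {

    private final byte[] payload;

    public PayloadHolder(byte[] payload) {
        // Storing the caller's array directly is what EI_EXPOSE_REP2 flags;
        // the filter above accepts it deliberately to avoid an extra copy.
        this.payload = payload;
    }

    // Zero-copy accessor: exposes the internal representation (EI_EXPOSE_REP).
    public byte[] payloadNoCopy() {
        return payload;
    }

    // Defensive-copy accessor: the alternative findbugs would accept without
    // an exclusion, at the cost of an allocation per call.
    public byte[] payloadCopy() {
        return Arrays.copyOf(payload, payload.length);
    }
}
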

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
index 72af11b..96a37cd 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
@@ -37,6 +37,7 @@ import com.twitter.distributedlog.thrift.service.BulkWriteResponse;
 import com.twitter.distributedlog.thrift.service.ResponseHeader;
 import com.twitter.distributedlog.thrift.service.StatusCode;
 import com.twitter.distributedlog.thrift.service.WriteResponse;
+import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.Sequencer;
 import com.twitter.util.ConstFuture;
 import com.twitter.util.Future$;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
index 3f26f43..aa08a24 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
@@ -210,11 +210,12 @@ public class StreamManagerImpl implements StreamManager {
     @Override
     public Map<String, String> getStreamOwnershipMap(Optional<String> regex) {
         Map<String, String> ownershipMap = new HashMap<String, String>();
-        for (String name : acquiredStreams.keySet()) {
+        for (Map.Entry<String, Stream> entry : acquiredStreams.entrySet()) {
+            String name = entry.getKey();
             if (regex.isPresent() && !name.matches(regex.get())) {
                 continue;
             }
-            Stream stream = acquiredStreams.get(name);
+            Stream stream = entry.getValue();
             if (null == stream) {
                 continue;
             }
@@ -248,7 +249,7 @@ public class StreamManagerImpl implements StreamManager {
 
                 // add partition to cached map
                 if (!cachedPartitions.addPartition(partition, maxCachedPartitions)) {
-                    throw new StreamUnavailableException("Stream " + stream
+                    throw new StreamUnavailableException("Stream " + streamName
                             + " is not allowed to cache more than " + maxCachedPartitions + " partitions");
                 }
 

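
The StreamManagerImpl hunks above address two separate issues: the first replaces a keySet()-plus-get() loop with an entrySet() iteration (the pattern findbugs reports as WMI_WRONG_MAP_ITERATOR, since it performs a redundant lookup per key), and the second corrects the exception message to use streamName rather than stream. A small, self-contained sketch of the iteration fix, with placeholder types rather than the DistributedLog service classes:

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public final class OwnershipMapSketch {

    // Builds a filtered copy of the map in a single pass over entrySet(),
    // so each entry's value comes for free instead of via a second get().
    public static Map<String, String> ownershipMap(Map<String, String> acquired,
                                                   Optional<String> regex) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : acquired.entrySet()) {
            String name = entry.getKey();
            if (regex.isPresent() && !name.matches(regex.get())) {
                continue; // filter by pattern, as in the original method
            }
            result.put(name, entry.getValue()); // no redundant lookup
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> acquired = new HashMap<>();
        acquired.put("stream-1", "proxy-a");
        acquired.put("stream-2", "proxy-b");
        System.out.println(ownershipMap(acquired, Optional.of("stream-\\d+")));
    }
}
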
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java
index b9c6a32..b37de10 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java
@@ -52,7 +52,6 @@ public class ProxyTool extends Tool {
     protected abstract static class ClusterCommand extends OptsCommand {
 
         protected Options options = new Options();
-        protected String dc;
         protected URI uri;
         protected final List<String> streams = new ArrayList<String>();
 


