distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [3/4] incubator-distributedlog git commit: DL-132: Enable check style for distributedlog service module.
Date Wed, 04 Jan 2017 08:44:02 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServiceStreamConfigProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServiceStreamConfigProvider.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServiceStreamConfigProvider.java
index 111a874..0ee7db4 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServiceStreamConfigProvider.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServiceStreamConfigProvider.java
@@ -21,26 +21,27 @@ import com.google.common.base.Optional;
 import com.twitter.distributedlog.config.DynamicConfigurationFactory;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.File;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.configuration.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Provide per stream configuration to DistributedLog service layer.
  */
 public class ServiceStreamConfigProvider implements StreamConfigProvider {
-    static final Logger LOG = LoggerFactory.getLogger(ServiceStreamConfigProvider.class);
+
+    private static final Logger LOG = LoggerFactory.getLogger(ServiceStreamConfigProvider.class);
+
+    private static final String CONFIG_EXTENSION = "conf";
 
     private final File configBaseDir;
     private final File defaultConfigFile;
     private final StreamPartitionConverter partitionConverter;
     private final DynamicConfigurationFactory configFactory;
     private final DynamicDistributedLogConfiguration defaultDynConf;
-    private final static String CONFIG_EXTENSION = "conf";
 
     public ServiceStreamConfigProvider(String configPath,
                                        String defaultConfigPath,
@@ -51,11 +52,13 @@ public class ServiceStreamConfigProvider implements StreamConfigProvider {
                                        throws ConfigurationException {
         this.configBaseDir = new File(configPath);
         if (!configBaseDir.exists()) {
-            throw new ConfigurationException("Stream configuration base directory " + configPath + " does not exist");
+            throw new ConfigurationException("Stream configuration base directory "
+                + configPath + " does not exist");
         }
         this.defaultConfigFile = new File(configPath);
         if (!defaultConfigFile.exists()) {
-            throw new ConfigurationException("Stream configuration default config " + defaultConfigPath + " does not exist");
+            throw new ConfigurationException("Stream configuration default config "
+                + defaultConfigPath + " does not exist");
         }
 
         // Construct reloading default configuration

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/package-info.java
new file mode 100644
index 0000000..bb0026a
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * DistributedLog Server Configurations.
+ */
+package com.twitter.distributedlog.service.config;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/package-info.java
new file mode 100644
index 0000000..4fb3673
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * DistributedLog Proxy Service.
+ */
+package com.twitter.distributedlog.service;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java
index 144e358..fb2d6d2 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,18 +20,20 @@ package com.twitter.distributedlog.service.placement;
 import com.twitter.util.Future;
 
 /**
- * Created for those who hold these truths to be self-evident, that all streams are created equal,
+ * Equal Load Appraiser.
+ *
+ * <p>Created for those who hold these truths to be self-evident, that all streams are created equal,
  * that they are endowed by their creator with certain unalienable loads, that among these are
  * Uno, Eins, and One.
  */
 public class EqualLoadAppraiser implements LoadAppraiser {
-  @Override
-  public Future<StreamLoad> getStreamLoad(String stream) {
-    return Future.value(new StreamLoad(stream, 1));
-  }
+    @Override
+    public Future<StreamLoad> getStreamLoad(String stream) {
+        return Future.value(new StreamLoad(stream, 1));
+    }
 
-  @Override
-  public Future<Void> refreshCache() {
-    return Future.value(null);
-  }
+    @Override
+    public Future<Void> refreshCache() {
+        return Future.value(null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
index 8c8dc23..c25c267 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,12 @@
  */
 package com.twitter.distributedlog.service.placement;
 
+import com.twitter.distributedlog.client.routing.RoutingService;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.util.Duration;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.Futures;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -24,174 +30,171 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
-
-import scala.Function1;
-import scala.runtime.BoxedUnit;
-
 import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.Stats;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.slf4j.Logger;
-
-import com.twitter.distributedlog.client.routing.RoutingService;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.util.Duration;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.Futures;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
 
 /**
- * A LoadPlacementPolicy that attempts to place streams in such a way that the load is balanced as
+ * Least Load Placement Policy.
+ *
+ * <p>A LoadPlacementPolicy that attempts to place streams in such a way that the load is balanced as
  * evenly as possible across all shards. The LoadAppraiser remains responsible for determining what
  * the load of a server would be. This placement policy then distributes these streams across the
  * servers.
  */
 public class LeastLoadPlacementPolicy extends PlacementPolicy {
-  private TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
-  private Map<String, String> streamToServer = new HashMap<String, String>();
-
-  public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
-                                  DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
-                                  Duration refreshInterval, StatsLogger statsLogger) {
-    super(loadAppraiser, routingService, namespace, placementStateManager, refreshInterval, statsLogger);
-    statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() {
-      @Override
-      public Number getDefaultValue() {
-        return 0;
-      }
-
-      @Override
-      public Number getSample() {
-        if (serverLoads.size() > 0) {
-          return serverLoads.last().getLoad() - serverLoads.first().getLoad();
-        } else {
-          return getDefaultValue();
-        }
-      }
-    });
-  }
-
-  private synchronized String getStreamOwner(String stream) {
-    return streamToServer.get(stream);
-  }
-
-  @Override
-  public Future<String> placeStream(String stream) {
-    String streamOwner = getStreamOwner(stream);
-    if (null != streamOwner) {
-      return Future.value(streamOwner);
+
+    private static final Logger logger = LoggerFactory.getLogger(LeastLoadPlacementPolicy.class);
+
+    private TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
+    private Map<String, String> streamToServer = new HashMap<String, String>();
+
+    public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
+                                    DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
+                                    Duration refreshInterval, StatsLogger statsLogger) {
+        super(loadAppraiser, routingService, namespace, placementStateManager, refreshInterval, statsLogger);
+        statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                if (serverLoads.size() > 0) {
+                    return serverLoads.last().getLoad() - serverLoads.first().getLoad();
+                } else {
+                    return getDefaultValue();
+                }
+            }
+        });
     }
-    Future<StreamLoad> streamLoadFuture = loadAppraiser.getStreamLoad(stream);
-    return streamLoadFuture.map(new Function<StreamLoad, String>() {
-      @Override
-      public String apply(StreamLoad streamLoad) {
-        return placeStreamSynchronized(streamLoad);
-      }
-    });
-  }
-
-  synchronized private String placeStreamSynchronized(StreamLoad streamLoad) {
-    ServerLoad serverLoad = serverLoads.pollFirst();
-    serverLoad.addStream(streamLoad);
-    serverLoads.add(serverLoad);
-    return serverLoad.getServer();
-  }
-
-  @Override
-  public void refresh() {
-    logger.info("Refreshing server loads.");
-    Future<Void> refresh = loadAppraiser.refreshCache();
-    final Set<String> servers = getServers();
-    final Set<String> allStreams = getStreams();
-    Future<TreeSet<ServerLoad>> serverLoadsFuture = refresh.flatMap(new Function<Void, Future<TreeSet<ServerLoad>>>() {
-      @Override
-      public Future<TreeSet<ServerLoad>> apply(Void v1) {
-        return calculate(servers, allStreams);
-      }
-    });
-    serverLoadsFuture.map(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
-      @Override
-      public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
-        try {
-          updateServerLoads(serverLoads);
-        } catch (PlacementStateManager.StateManagerSaveException e) {
-          logger.error("The refreshed mapping could not be persisted and will not be used.", e);
-        }
-        return BoxedUnit.UNIT;
-      }
-    });
-  }
-
-  synchronized private void updateServerLoads(TreeSet<ServerLoad> serverLoads) throws PlacementStateManager.StateManagerSaveException {
-    this.placementStateManager.saveOwnership(serverLoads);
-    this.streamToServer = serverLoadsToMap(serverLoads);
-    this.serverLoads = serverLoads;
-  }
-
-  @Override
-  synchronized public void load(TreeSet<ServerLoad> serverLoads) {
-    this.serverLoads = serverLoads;
-    this.streamToServer = serverLoadsToMap(serverLoads);
-  }
-
-  public Future<TreeSet<ServerLoad>> calculate(final Set<String> servers, Set<String> streams) {
-    logger.info("Calculating server loads");
-    final long startTime = System.currentTimeMillis();
-    ArrayList<Future<StreamLoad>> futures = new ArrayList<Future<StreamLoad>>(streams.size());
-
-    for (String stream: streams) {
-      Future<StreamLoad> streamLoad = loadAppraiser.getStreamLoad(stream);
-      futures.add(streamLoad);
+
+    private synchronized String getStreamOwner(String stream) {
+        return streamToServer.get(stream);
     }
 
-    return Futures.collect(futures).map(new Function<List<StreamLoad>, TreeSet<ServerLoad>>() {
-      @Override
-      public TreeSet<ServerLoad> apply(List<StreamLoad> streamLoads) {
-        /* Sort streamLoads so largest streams are placed first for better balance */
-        TreeSet<StreamLoad> streamQueue = new TreeSet<StreamLoad>();
-        for (StreamLoad streamLoad: streamLoads) {
-          streamQueue.add(streamLoad);
+    @Override
+    public Future<String> placeStream(String stream) {
+        String streamOwner = getStreamOwner(stream);
+        if (null != streamOwner) {
+            return Future.value(streamOwner);
         }
+        Future<StreamLoad> streamLoadFuture = loadAppraiser.getStreamLoad(stream);
+        return streamLoadFuture.map(new Function<StreamLoad, String>() {
+            @Override
+            public String apply(StreamLoad streamLoad) {
+                return placeStreamSynchronized(streamLoad);
+            }
+        });
+    }
 
-        TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
-        for (String server: servers) {
-          ServerLoad serverLoad = new ServerLoad(server);
-          if (!streamQueue.isEmpty()) {
-            serverLoad.addStream(streamQueue.pollFirst());
-          }
-          serverLoads.add(serverLoad);
+    private synchronized String placeStreamSynchronized(StreamLoad streamLoad) {
+        ServerLoad serverLoad = serverLoads.pollFirst();
+        serverLoad.addStream(streamLoad);
+        serverLoads.add(serverLoad);
+        return serverLoad.getServer();
+    }
+
+    @Override
+    public void refresh() {
+        logger.info("Refreshing server loads.");
+        Future<Void> refresh = loadAppraiser.refreshCache();
+        final Set<String> servers = getServers();
+        final Set<String> allStreams = getStreams();
+        Future<TreeSet<ServerLoad>> serverLoadsFuture = refresh.flatMap(
+            new Function<Void, Future<TreeSet<ServerLoad>>>() {
+            @Override
+            public Future<TreeSet<ServerLoad>> apply(Void v1) {
+                return calculate(servers, allStreams);
+            }
+        });
+        serverLoadsFuture.map(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
+            @Override
+            public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
+                try {
+                    updateServerLoads(serverLoads);
+                } catch (PlacementStateManager.StateManagerSaveException e) {
+                    logger.error("The refreshed mapping could not be persisted and will not be used.", e);
+                }
+                return BoxedUnit.UNIT;
+            }
+        });
+    }
+
+    private synchronized void updateServerLoads(TreeSet<ServerLoad> serverLoads)
+        throws PlacementStateManager.StateManagerSaveException {
+        this.placementStateManager.saveOwnership(serverLoads);
+        this.streamToServer = serverLoadsToMap(serverLoads);
+        this.serverLoads = serverLoads;
+    }
+
+    @Override
+    public synchronized void load(TreeSet<ServerLoad> serverLoads) {
+        this.serverLoads = serverLoads;
+        this.streamToServer = serverLoadsToMap(serverLoads);
+    }
+
+    public Future<TreeSet<ServerLoad>> calculate(final Set<String> servers, Set<String> streams) {
+        logger.info("Calculating server loads");
+        final long startTime = System.currentTimeMillis();
+        ArrayList<Future<StreamLoad>> futures = new ArrayList<Future<StreamLoad>>(streams.size());
+
+        for (String stream : streams) {
+            Future<StreamLoad> streamLoad = loadAppraiser.getStreamLoad(stream);
+            futures.add(streamLoad);
         }
 
-        while (!streamQueue.isEmpty()) {
-          ServerLoad serverLoad = serverLoads.pollFirst();
-          serverLoad.addStream(streamQueue.pollFirst());
-          serverLoads.add(serverLoad);
+        return Futures.collect(futures).map(new Function<List<StreamLoad>, TreeSet<ServerLoad>>() {
+            @Override
+            public TreeSet<ServerLoad> apply(List<StreamLoad> streamLoads) {
+                /* Sort streamLoads so largest streams are placed first for better balance */
+                TreeSet<StreamLoad> streamQueue = new TreeSet<StreamLoad>();
+                for (StreamLoad streamLoad : streamLoads) {
+                    streamQueue.add(streamLoad);
+                }
+
+                TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
+                for (String server : servers) {
+                    ServerLoad serverLoad = new ServerLoad(server);
+                    if (!streamQueue.isEmpty()) {
+                        serverLoad.addStream(streamQueue.pollFirst());
+                    }
+                    serverLoads.add(serverLoad);
+                }
+
+                while (!streamQueue.isEmpty()) {
+                    ServerLoad serverLoad = serverLoads.pollFirst();
+                    serverLoad.addStream(streamQueue.pollFirst());
+                    serverLoads.add(serverLoad);
+                }
+                return serverLoads;
+            }
+        }).onSuccess(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
+            @Override
+            public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
+                placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startTime);
+                return BoxedUnit.UNIT;
+            }
+        }).onFailure(new Function<Throwable, BoxedUnit>() {
+            @Override
+            public BoxedUnit apply(Throwable t) {
+                logger.error("Failure calculating loads", t);
+                placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startTime);
+                return BoxedUnit.UNIT;
+            }
+        });
+    }
+
+    private static Map<String, String> serverLoadsToMap(Collection<ServerLoad> serverLoads) {
+        HashMap<String, String> streamToServer = new HashMap<String, String>(serverLoads.size());
+        for (ServerLoad serverLoad : serverLoads) {
+            for (StreamLoad streamLoad : serverLoad.getStreamLoads()) {
+                streamToServer.put(streamLoad.getStream(), serverLoad.getServer());
+            }
         }
-        return serverLoads;
-      }
-    }).onSuccess(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
-      @Override
-      public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
-        placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startTime);
-        return BoxedUnit.UNIT;
-      }
-    }).onFailure(new Function<Throwable, BoxedUnit>() {
-      @Override
-      public BoxedUnit apply(Throwable t) {
-        logger.error("Failure calculating loads", t);
-        placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startTime);
-        return BoxedUnit.UNIT;
-      }
-    });
-  }
-
-  private static Map<String, String> serverLoadsToMap(Collection<ServerLoad> serverLoads) {
-    HashMap<String, String> streamToServer = new HashMap<String, String>(serverLoads.size());
-    for (ServerLoad serverLoad: serverLoads) {
-      for (StreamLoad streamLoad: serverLoad.getStreamLoads()) {
-        streamToServer.put(streamLoad.getStream(), serverLoad.getServer());
-      }
+        return streamToServer;
     }
-    return streamToServer;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
index 784f106..53c568a 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
@@ -19,7 +19,21 @@ package com.twitter.distributedlog.service.placement;
 
 import com.twitter.util.Future;
 
+/**
+ * Interface for load appraiser.
+ */
 public interface LoadAppraiser {
-  Future<StreamLoad> getStreamLoad(String stream);
-  Future<Void> refreshCache();
+    /**
+     * Retrieve the stream load for a given {@code stream}.
+     *
+     * @param stream name of the stream
+     * @return the stream load of the stream.
+     */
+    Future<StreamLoad> getStreamLoad(String stream);
+
+    /**
+     * Refresh the cache.
+     * @return a future for the refresh operation
+     */
+    Future<Void> refreshCache();
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
index 2044428..46e8940 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,15 @@
  */
 package com.twitter.distributedlog.service.placement;
 
+import com.twitter.distributedlog.client.routing.RoutingService;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.service.DLSocketAddress;
+import com.twitter.util.Duration;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.ScheduledThreadPoolTimer;
+import com.twitter.util.Time;
+import com.twitter.util.Timer;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
@@ -24,125 +33,116 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
 import java.util.TreeSet;
-
-import scala.runtime.BoxedUnit;
-
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import com.twitter.distributedlog.client.routing.RoutingService;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.distributedlog.service.DLSocketAddress;
-import com.twitter.util.Duration;
-import com.twitter.util.Function0;
-import com.twitter.util.Future;
-import com.twitter.util.ScheduledThreadPoolTimer;
-import com.twitter.util.Time;
-import com.twitter.util.Timer;
+import scala.runtime.BoxedUnit;
 
 /**
- * A PlacementPolicy assigns streams to servers given an appraisal of the load that the stream
- * contains. The load of a stream is determined by the LoadAppraiser used. The PlacementPolicy will
+ * A PlacementPolicy assigns streams to servers given an appraisal of the load that the stream contains.
+ *
+ * <p>The load of a stream is determined by the LoadAppraiser used. The PlacementPolicy will
  * then distributed these StreamLoads to the available servers in a manner defined by the
  * implementation creating ServerLoad objects. It then saves this assignment via the
  * PlacementStateManager.
  */
 public abstract class PlacementPolicy {
-  protected final LoadAppraiser loadAppraiser;
-  protected final RoutingService routingService;
-  protected final DistributedLogNamespace namespace;
-  protected final PlacementStateManager placementStateManager;
-  private final Duration refreshInterval;
 
-  protected static final Logger logger = LoggerFactory.getLogger(PlacementPolicy.class);
-  protected final OpStatsLogger placementCalcStats;
-  private Timer placementRefreshTimer;
+    private static final Logger logger = LoggerFactory.getLogger(PlacementPolicy.class);
 
-  public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
-                         DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
-                         Duration refreshInterval, StatsLogger statsLogger) {
-    this.loadAppraiser = loadAppraiser;
-    this.routingService = routingService;
-    this.namespace = namespace;
-    this.placementStateManager = placementStateManager;
-    this.refreshInterval = refreshInterval;
-    placementCalcStats = statsLogger.getOpStatsLogger("placement");
-  }
+    protected final LoadAppraiser loadAppraiser;
+    protected final RoutingService routingService;
+    protected final DistributedLogNamespace namespace;
+    protected final PlacementStateManager placementStateManager;
+    private final Duration refreshInterval;
+    protected final OpStatsLogger placementCalcStats;
+    private Timer placementRefreshTimer;
 
-  public Set<String> getServers() {
-    Set<SocketAddress> hosts = routingService.getHosts();
-    Set<String> servers = new HashSet<String>(hosts.size());
-    for (SocketAddress address: hosts) {
-      servers.add(DLSocketAddress.toString((InetSocketAddress) address));
+    public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
+                           DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
+                           Duration refreshInterval, StatsLogger statsLogger) {
+        this.loadAppraiser = loadAppraiser;
+        this.routingService = routingService;
+        this.namespace = namespace;
+        this.placementStateManager = placementStateManager;
+        this.refreshInterval = refreshInterval;
+        placementCalcStats = statsLogger.getOpStatsLogger("placement");
     }
-    return servers;
-  }
 
-  public Set<String> getStreams() {
-    Set<String> streams = new HashSet<String>();
-    try {
-      Iterator<String> logs = namespace.getLogs();
-      while (logs.hasNext()) {
-        streams.add(logs.next());
-      }
-    } catch (IOException e) {
-      logger.error("Could not get streams for placement policy.", e);
+    public Set<String> getServers() {
+        Set<SocketAddress> hosts = routingService.getHosts();
+        Set<String> servers = new HashSet<String>(hosts.size());
+        for (SocketAddress address : hosts) {
+            servers.add(DLSocketAddress.toString((InetSocketAddress) address));
+        }
+        return servers;
     }
-    return streams;
-  }
-
-  public void start(boolean leader) {
-    logger.info("Starting placement policy");
 
-    TreeSet<ServerLoad> emptyServerLoads = new TreeSet<ServerLoad>();
-    for (String server: getServers()) {
-      emptyServerLoads.add(new ServerLoad(server));
+    public Set<String> getStreams() {
+        Set<String> streams = new HashSet<String>();
+        try {
+            Iterator<String> logs = namespace.getLogs();
+            while (logs.hasNext()) {
+                streams.add(logs.next());
+            }
+        } catch (IOException e) {
+            logger.error("Could not get streams for placement policy.", e);
+        }
+        return streams;
     }
-    load(emptyServerLoads); //Pre-Load so streams don't NPE
-    if (leader) { //this is the leader shard
-      logger.info("Shard is leader. Scheduling timed refresh.");
-      placementRefreshTimer = new ScheduledThreadPoolTimer(1, "timer", true);
-      placementRefreshTimer.schedule(Time.now(), refreshInterval, new Function0<BoxedUnit>() {
-        @Override
-        public BoxedUnit apply() {
-          refresh();
-          return BoxedUnit.UNIT;
+
+    public void start(boolean leader) {
+        logger.info("Starting placement policy");
+
+        TreeSet<ServerLoad> emptyServerLoads = new TreeSet<ServerLoad>();
+        for (String server : getServers()) {
+            emptyServerLoads.add(new ServerLoad(server));
         }
-      });
-    } else {
-      logger.info("Shard is not leader. Watching for server load changes.");
-      placementStateManager.watch(new PlacementStateManager.PlacementCallback() {
-        @Override
-        public void callback(TreeSet<ServerLoad> serverLoads) {
-          if (!serverLoads.isEmpty()) {
-            load(serverLoads);
-          }
+        load(emptyServerLoads); //Pre-Load so streams don't NPE
+        if (leader) { //this is the leader shard
+            logger.info("Shard is leader. Scheduling timed refresh.");
+            placementRefreshTimer = new ScheduledThreadPoolTimer(1, "timer", true);
+            placementRefreshTimer.schedule(Time.now(), refreshInterval, new Function0<BoxedUnit>() {
+                @Override
+                public BoxedUnit apply() {
+                    refresh();
+                    return BoxedUnit.UNIT;
+                }
+            });
+        } else {
+            logger.info("Shard is not leader. Watching for server load changes.");
+            placementStateManager.watch(new PlacementStateManager.PlacementCallback() {
+                @Override
+                public void callback(TreeSet<ServerLoad> serverLoads) {
+                    if (!serverLoads.isEmpty()) {
+                        load(serverLoads);
+                    }
+                }
+            });
         }
-      });
     }
-  }
 
-  public void close() {
-    if (placementRefreshTimer != null) {
-      placementRefreshTimer.stop();
+    public void close() {
+        if (placementRefreshTimer != null) {
+            placementRefreshTimer.stop();
+        }
     }
-  }
 
-  /**
-   * Places the stream on a server according to the policy and returns a future contianing the
-   * host that owns the stream upon completion
-   */
-  public abstract Future<String> placeStream(String stream);
+    /**
+     * Places the stream on a server according to the policy.
+     *
+     * <p>It returns a future containing the host that owns the stream upon completion
+     */
+    public abstract Future<String> placeStream(String stream);
 
-  /**
-   * Recalculates the entire placement mapping and updates stores it using the PlacementStateManager
-   */
-  public abstract void refresh();
+    /**
+     * Recalculates the entire placement mapping and stores it using the PlacementStateManager.
+     */
+    public abstract void refresh();
 
-  /**
-   * Loads the placement mapping into the node from a TreeSet of ServerLoads
-   */
-  public abstract void load(TreeSet<ServerLoad> serverLoads);
+    /**
+     * Loads the placement mapping into the node from a TreeSet of ServerLoads.
+     */
+    public abstract void load(TreeSet<ServerLoad> serverLoads);
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
index cd0d906..17e4685 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,46 +20,60 @@ package com.twitter.distributedlog.service.placement;
 import java.util.TreeSet;
 
 /**
- * The PlacementStateManager handles persistence of calculated resource placements including, the
- * storage once the calculated, and the retrieval by the other shards.
+ * The PlacementStateManager handles persistence of calculated resource placements.
  */
 public interface PlacementStateManager {
 
-  /**
-   * Saves the ownership mapping as a TreeSet of ServerLoads to persistent storage
-   */
-  void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException;
+    /**
+     * Saves the ownership mapping as a TreeSet of ServerLoads to persistent storage.
+     */
+    void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException;
 
-  /**
-   * Loads the ownership mapping as TreeSet of ServerLoads from persistent storage
-   */
-  TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException;
+    /**
+     * Loads the ownership mapping as TreeSet of ServerLoads from persistent storage.
+     */
+    TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException;
 
-  /**
-   * Watch the persistent storage for changes to the ownership mapping and calls placementCallback
-   * with the new mapping when a change occurs
-   */
-  void watch(PlacementCallback placementCallback);
+    /**
+     * Watch the persistent storage for changes to the ownership mapping.
+     *
+     * <p>The placementCallback callbacks will be triggered with the new mapping when a change occurs.
+     */
+    void watch(PlacementCallback placementCallback);
 
-  interface PlacementCallback {
-    void callback(TreeSet<ServerLoad> serverLoads);
-  }
+    /**
+     * Placement Callback.
+     *
+     * <p>The callback is triggered when server loads are updated.
+     */
+    interface PlacementCallback {
+        void callback(TreeSet<ServerLoad> serverLoads);
+    }
 
-  abstract class StateManagerException extends Exception {
-    public StateManagerException(String message, Exception e) {
-      super(message, e);
+    /**
+     * The base exception thrown when state manager encounters errors.
+     */
+    abstract class StateManagerException extends Exception {
+        public StateManagerException(String message, Exception e) {
+            super(message, e);
+        }
     }
-  }
 
-  class StateManagerLoadException extends StateManagerException {
-    public StateManagerLoadException(Exception e) {
-      super("Load of Ownership failed", e);
+    /**
+     * Exception thrown when failed to load the ownership mapping.
+     */
+    class StateManagerLoadException extends StateManagerException {
+        public StateManagerLoadException(Exception e) {
+            super("Load of Ownership failed", e);
+        }
     }
-  }
 
-  class StateManagerSaveException extends StateManagerException {
-    public StateManagerSaveException(Exception e) {
-      super("Save of Ownership failed", e);
+    /**
+     * Exception thrown when failed to save the ownership mapping.
+     */
+    class StateManagerSaveException extends StateManagerException {
+        public StateManagerSaveException(Exception e) {
+            super("Save of Ownership failed", e);
+        }
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
index 801e499..a0b4959 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,139 +17,142 @@
  */
 package com.twitter.distributedlog.service.placement;
 
+import static com.google.common.base.Charsets.UTF_8;
+
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
-import com.twitter.distributedlog.service.placement.thrift.*;
-
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TJSONProtocol;
 import org.apache.thrift.transport.TMemoryBuffer;
 import org.apache.thrift.transport.TMemoryInputTransport;
 
-import static com.google.common.base.Charsets.UTF_8;
-
 /**
- * A comparable data object containing the identifier of the server, total appraised load on the
+ * An object that represents the server load.
+ *
+ * <p>A comparable data object containing the identifier of the server, total appraised load on the
  * server, and all streams assigned to the server by the resource placement mapping. This is
  * comparable first by load and then by server so that a sorted data structure of these will be
  * consistent across multiple calculations.
  */
 public class ServerLoad implements Comparable {
-  private static final int BUFFER_SIZE = 4096000;
-  private final String server;
-  private final HashSet<StreamLoad> streamLoads = new HashSet<StreamLoad>();
-  private long load = 0l;
-
-  public ServerLoad(String server) {
-    this.server = server;
-  }
-
-  synchronized public long addStream(StreamLoad stream) {
-    this.load += stream.getLoad();
-    streamLoads.add(stream);
-    return this.load;
-  }
-
-  synchronized public long removeStream(String stream) {
-    for (StreamLoad streamLoad : streamLoads) {
-      if (streamLoad.stream.equals(stream)) {
-        this.load -= streamLoad.getLoad();
-        streamLoads.remove(streamLoad);
+    private static final int BUFFER_SIZE = 4096000;
+    private final String server;
+    private final HashSet<StreamLoad> streamLoads = new HashSet<StreamLoad>();
+    private long load = 0L;
+
+    public ServerLoad(String server) {
+        this.server = server;
+    }
+
+    public synchronized long addStream(StreamLoad stream) {
+        this.load += stream.getLoad();
+        streamLoads.add(stream);
         return this.load;
-      }
     }
-    return this.load; //Throwing an exception wouldn't help us as our logic should never reach here
-  }
-
-  public synchronized long getLoad() {
-    return load;
-  }
-
-  public synchronized Set<StreamLoad> getStreamLoads() {
-    return streamLoads;
-  }
-
-  public synchronized String getServer() {
-    return server;
-  }
-
-  protected synchronized com.twitter.distributedlog.service.placement.thrift.ServerLoad toThrift() {
-    com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad
-        = new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
-    tServerLoad.setServer(server);
-    tServerLoad.setLoad(load);
-    ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad> tStreamLoads
-        = new ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad>();
-    for (StreamLoad streamLoad: streamLoads) {
-      tStreamLoads.add(streamLoad.toThrift());
+
+    public synchronized long removeStream(String stream) {
+        for (StreamLoad streamLoad : streamLoads) {
+            if (streamLoad.stream.equals(stream)) {
+                this.load -= streamLoad.getLoad();
+                streamLoads.remove(streamLoad);
+                return this.load;
+            }
+        }
+        return this.load; //Throwing an exception wouldn't help us as our logic should never reach here
+    }
+
+    public synchronized long getLoad() {
+        return load;
+    }
+
+    public synchronized Set<StreamLoad> getStreamLoads() {
+        return streamLoads;
+    }
+
+    public synchronized String getServer() {
+        return server;
+    }
+
+    protected synchronized com.twitter.distributedlog.service.placement.thrift.ServerLoad toThrift() {
+        com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad =
+            new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
+        tServerLoad.setServer(server);
+        tServerLoad.setLoad(load);
+        ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad> tStreamLoads =
+            new ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad>();
+        for (StreamLoad streamLoad : streamLoads) {
+            tStreamLoads.add(streamLoad.toThrift());
+        }
+        tServerLoad.setStreams(tStreamLoads);
+        return tServerLoad;
+    }
+
+    public byte[] serialize() throws IOException {
+        TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
+        TJSONProtocol protocol = new TJSONProtocol(transport);
+        try {
+            toThrift().write(protocol);
+            transport.flush();
+            return transport.toString(UTF_8.name()).getBytes(UTF_8);
+        } catch (TException e) {
+            throw new IOException("Failed to serialize server load : ", e);
+        } catch (UnsupportedEncodingException uee) {
+            throw new IOException("Failed to serialize server load : ", uee);
+        }
+    }
+
+    public static ServerLoad deserialize(byte[] data) throws IOException {
+        com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad =
+            new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
+        TMemoryInputTransport transport = new TMemoryInputTransport(data);
+        TJSONProtocol protocol = new TJSONProtocol(transport);
+        try {
+            tServerLoad.read(protocol);
+            ServerLoad serverLoad = new ServerLoad(tServerLoad.getServer());
+            if (tServerLoad.isSetStreams()) {
+                for (com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad :
+                    tServerLoad.getStreams()) {
+                    serverLoad.addStream(new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad()));
+                }
+            }
+            return serverLoad;
+        } catch (TException e) {
+            throw new IOException("Failed to deserialize server load : ", e);
+        }
     }
-    tServerLoad.setStreams(tStreamLoads);
-    return tServerLoad;
-  }
-
-  public byte[] serialize() throws IOException {
-    TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
-    TJSONProtocol protocol = new TJSONProtocol(transport);
-    try {
-      toThrift().write(protocol);
-      transport.flush();
-      return transport.toString(UTF_8.name()).getBytes(UTF_8);
-    } catch (TException e) {
-      throw new IOException("Failed to serialize server load : ", e);
-    } catch (UnsupportedEncodingException uee) {
-      throw new IOException("Failed to serialize server load : ", uee);
+
+    @Override
+    public synchronized int compareTo(Object o) {
+        ServerLoad other = (ServerLoad) o;
+        if (load == other.getLoad()) {
+            return server.compareTo(other.getServer());
+        } else {
+            return Long.compare(load, other.getLoad());
+        }
     }
-  }
-
-  public static ServerLoad deserialize(byte[] data) throws IOException {
-    com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad
-        = new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
-    TMemoryInputTransport transport = new TMemoryInputTransport(data);
-    TJSONProtocol protocol = new TJSONProtocol(transport);
-    try {
-      tServerLoad.read(protocol);
-      ServerLoad serverLoad = new ServerLoad(tServerLoad.getServer());
-      if (tServerLoad.isSetStreams()) {
-        for (com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad : tServerLoad.getStreams()) {
-          serverLoad.addStream(new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad()));
+
+    @Override
+    public synchronized boolean equals(Object o) {
+        if (!(o instanceof ServerLoad)) {
+            return false;
         }
-      }
-      return serverLoad;
-    } catch (TException e) {
-      throw new IOException("Failed to deserialize server load : ", e);
+        ServerLoad other = (ServerLoad) o;
+        return server.equals(other.getServer())
+            && load == other.getLoad()
+            && streamLoads.equals(other.getStreamLoads());
     }
-  }
-
-  @Override
-  public synchronized int compareTo(Object o) {
-    ServerLoad other = (ServerLoad) o;
-    if (load == other.getLoad()) {
-      return server.compareTo(other.getServer());
-    } else {
-      return Long.compare(load, other.getLoad());
+
+    @Override
+    public synchronized String toString() {
+        return String.format("ServerLoad<Server: %s, Load: %d, Streams: %s>", server, load, streamLoads);
     }
-  }
 
-  @Override
-  public synchronized boolean equals(Object o) {
-    if (!(o instanceof ServerLoad)) {
-      return false;
+    @Override
+    public synchronized int hashCode() {
+        return new HashCodeBuilder().append(server).append(load).append(streamLoads).build();
     }
-    ServerLoad other = (ServerLoad) o;
-    return server.equals(other.getServer()) && load == other.getLoad() && streamLoads.equals(other.getStreamLoads());
-  }
-
-  @Override
-  public synchronized String toString() {
-    return String.format("ServerLoad<Server: %s, Load: %d, Streams: %s>", server, load, streamLoads);
-  }
-
-  @Override
-  public synchronized int hashCode() {
-    return new HashCodeBuilder().append(server).append(load).append(streamLoads).build();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
index d7b7efd..c0b0ce1 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,96 +17,99 @@
  */
 package com.twitter.distributedlog.service.placement;
 
+import static com.google.common.base.Charsets.UTF_8;
+
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TJSONProtocol;
 import org.apache.thrift.transport.TMemoryBuffer;
 import org.apache.thrift.transport.TMemoryInputTransport;
 
-import static com.google.common.base.Charsets.UTF_8;
-
 /**
- * A comparable data object containing the identifier of the stream and the appraised load produced
+ * An object that represents the load of a stream.
+ *
+ * <p>A comparable data object containing the identifier of the stream and the appraised load produced
  * by the stream.
  */
 public class StreamLoad implements Comparable {
-  private static final int BUFFER_SIZE = 4096;
-  public final String stream;
-  private final int load;
+    private static final int BUFFER_SIZE = 4096;
+    public final String stream;
+    private final int load;
 
-  public StreamLoad(String stream, int load) {
-    this.stream = stream;
-    this.load = load;
-  }
+    public StreamLoad(String stream, int load) {
+        this.stream = stream;
+        this.load = load;
+    }
 
-  public int getLoad() {
-    return load;
-  }
+    public int getLoad() {
+        return load;
+    }
 
-  public String getStream() {
-    return stream;
-  }
+    public String getStream() {
+        return stream;
+    }
 
-  protected com.twitter.distributedlog.service.placement.thrift.StreamLoad toThrift() {
-    com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad = new com.twitter.distributedlog.service.placement.thrift.StreamLoad();
-    return tStreamLoad.setStream(stream).setLoad(load);
-  }
+    protected com.twitter.distributedlog.service.placement.thrift.StreamLoad toThrift() {
+        com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad =
+            new com.twitter.distributedlog.service.placement.thrift.StreamLoad();
+        return tStreamLoad.setStream(stream).setLoad(load);
+    }
 
-  public byte[] serialize() throws IOException {
-    TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
-    TJSONProtocol protocol = new TJSONProtocol(transport);
-    try {
-      toThrift().write(protocol);
-      transport.flush();
-      return transport.toString(UTF_8.name()).getBytes(UTF_8);
-    } catch (TException e) {
-      throw new IOException("Failed to serialize stream load : ", e);
-    } catch (UnsupportedEncodingException uee) {
-      throw new IOException("Failed to serialize stream load : ", uee);
+    public byte[] serialize() throws IOException {
+        TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
+        TJSONProtocol protocol = new TJSONProtocol(transport);
+        try {
+            toThrift().write(protocol);
+            transport.flush();
+            return transport.toString(UTF_8.name()).getBytes(UTF_8);
+        } catch (TException e) {
+            throw new IOException("Failed to serialize stream load : ", e);
+        } catch (UnsupportedEncodingException uee) {
+            throw new IOException("Failed to serialize stream load : ", uee);
+        }
     }
-  }
 
-  public static StreamLoad deserialize(byte[] data) throws IOException {
-    com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad = new com.twitter.distributedlog.service.placement.thrift.StreamLoad();
-    TMemoryInputTransport transport = new TMemoryInputTransport(data);
-    TJSONProtocol protocol = new TJSONProtocol(transport);
-    try {
-      tStreamLoad.read(protocol);
-      return new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad());
-    } catch (TException e) {
-      throw new IOException("Failed to deserialize stream load : ", e);
+    public static StreamLoad deserialize(byte[] data) throws IOException {
+        com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad =
+            new com.twitter.distributedlog.service.placement.thrift.StreamLoad();
+        TMemoryInputTransport transport = new TMemoryInputTransport(data);
+        TJSONProtocol protocol = new TJSONProtocol(transport);
+        try {
+            tStreamLoad.read(protocol);
+            return new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad());
+        } catch (TException e) {
+            throw new IOException("Failed to deserialize stream load : ", e);
+        }
     }
-  }
 
-  @Override
-  public int compareTo(Object o) {
-    StreamLoad other = (StreamLoad) o;
-    if (load == other.getLoad()) {
-      return stream.compareTo(other.getStream());
-    } else {
-      return Long.compare(load, other.getLoad());
+    @Override
+    public int compareTo(Object o) {
+        StreamLoad other = (StreamLoad) o;
+        if (load == other.getLoad()) {
+            return stream.compareTo(other.getStream());
+        } else {
+            return Long.compare(load, other.getLoad());
+        }
     }
-  }
 
-  @Override
-  public boolean equals(Object o) {
-    if (!(o instanceof StreamLoad)) {
-      return false;
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof StreamLoad)) {
+            return false;
+        }
+        StreamLoad other = (StreamLoad) o;
+        return stream.equals(other.getStream()) && load == other.getLoad();
     }
-    StreamLoad other = (StreamLoad) o;
-    return stream.equals(other.getStream()) && load == other.getLoad();
-  }
 
-  @Override
-  public String toString() {
-    return String.format("StreamLoad<Stream: %s, Load: %d>", stream, load);
-  }
+    @Override
+    public String toString() {
+        return String.format("StreamLoad<Stream: %s, Load: %d>", stream, load);
+    }
 
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder().append(stream).append(load).build();
-  }
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder().append(stream).append(load).build();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
index 4f01bdc..977ae04 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,13 +17,16 @@
  */
 package com.twitter.distributedlog.service.placement;
 
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
+import com.twitter.distributedlog.util.Utils;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.List;
 import java.util.TreeSet;
-
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -35,139 +38,136 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.twitter.distributedlog.BKDistributedLogNamespace;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.impl.BKNamespaceDriver;
-import com.twitter.distributedlog.util.Utils;
-
 /**
  * An implementation of the PlacementStateManager that saves data to and loads from Zookeeper to
  * avoid necessitating an additional system for the resource placement.
  */
 public class ZKPlacementStateManager implements PlacementStateManager {
-  static final Logger logger = LoggerFactory.getLogger(ZKPlacementStateManager.class);
-  private static final String SERVER_LOAD_DIR = "/.server-load";
 
-  private final String serverLoadPath;
-  private final ZooKeeperClient zkClient;
+    private static final Logger logger = LoggerFactory.getLogger(ZKPlacementStateManager.class);
 
-  private boolean watching = false;
+    private static final String SERVER_LOAD_DIR = "/.server-load";
 
-  public ZKPlacementStateManager(URI uri, DistributedLogConfiguration conf, StatsLogger statsLogger) {
-    String zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri);
-    zkClient = BKNamespaceDriver.createZKClientBuilder(
-        String.format("ZKPlacementStateManager-%s", zkServers),
-        conf,
-        zkServers,
-        statsLogger.scope("placement_state_manager")).build();
-    serverLoadPath = uri.getPath() + SERVER_LOAD_DIR;
-  }
+    private final String serverLoadPath;
+    private final ZooKeeperClient zkClient;
 
-  private void createServerLoadPathIfNoExists(byte[] data)
-        throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException {
-    try {
-      Utils.zkCreateFullPathOptimistic(zkClient, serverLoadPath, data, zkClient.getDefaultACL(), CreateMode.PERSISTENT);
-    } catch (KeeperException.NodeExistsException nee) {
-      logger.debug("the server load path {} is already created by others", serverLoadPath, nee);
+    private boolean watching = false;
+
+    public ZKPlacementStateManager(URI uri, DistributedLogConfiguration conf, StatsLogger statsLogger) {
+        String zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri);
+        zkClient = BKNamespaceDriver.createZKClientBuilder(
+            String.format("ZKPlacementStateManager-%s", zkServers),
+            conf,
+            zkServers,
+            statsLogger.scope("placement_state_manager")).build();
+        serverLoadPath = uri.getPath() + SERVER_LOAD_DIR;
     }
-  }
-
-  @Override
-  public void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException {
-    logger.info("saving ownership");
-    try {
-      ZooKeeper zk = zkClient.get();
-      // use timestamp as data so watchers will see any changes
-      byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
-
-      if (zk.exists(serverLoadPath, false) == null) { //create path to rootnode if it does not yet exist
-        createServerLoadPathIfNoExists(timestamp);
-      }
-
-      Transaction tx = zk.transaction();
-      List<String> children = zk.getChildren(serverLoadPath, false);
-      HashSet<String> servers = new HashSet<String>(children);
-      tx.setData(serverLoadPath, timestamp, -1); // trigger the watcher that data has been updated
-      for (ServerLoad serverLoad : serverLoads) {
-        String server = serverToZkFormat(serverLoad.getServer());
-        String serverPath = serverPath(server);
-        if (servers.contains(server)) {
-          servers.remove(server);
-          tx.setData(serverPath, serverLoad.serialize(), -1);
-        } else {
-          tx.create(serverPath, serverLoad.serialize(), zkClient.getDefaultACL(), CreateMode.PERSISTENT);
+
+    private void createServerLoadPathIfNoExists(byte[] data)
+        throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException {
+        try {
+            Utils.zkCreateFullPathOptimistic(
+                zkClient, serverLoadPath, data, zkClient.getDefaultACL(), CreateMode.PERSISTENT);
+        } catch (KeeperException.NodeExistsException nee) {
+            logger.debug("the server load path {} is already created by others", serverLoadPath, nee);
         }
-      }
-      for (String server : servers) {
-        tx.delete(serverPath(server), -1);
-      }
-      tx.commit();
-    } catch (InterruptedException | IOException | KeeperException e) {
-      throw new StateManagerSaveException(e);
     }
-  }
-
-  @Override
-  public TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException {
-    TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
-    try {
-      ZooKeeper zk = zkClient.get();
-      List<String> children = zk.getChildren(serverLoadPath, false);
-      for (String server : children) {
-        ownerships.add(ServerLoad.deserialize(zk.getData(serverPath(server), false, new Stat())));
-      }
-      return ownerships;
-    } catch (InterruptedException | IOException | KeeperException e) {
-      throw new StateManagerLoadException(e);
+
+    @Override
+    public void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException {
+        logger.info("saving ownership");
+        try {
+            ZooKeeper zk = zkClient.get();
+            // use timestamp as data so watchers will see any changes
+            byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
+
+            if (zk.exists(serverLoadPath, false) == null) { //create path to rootnode if it does not yet exist
+                createServerLoadPathIfNoExists(timestamp);
+            }
+
+            Transaction tx = zk.transaction();
+            List<String> children = zk.getChildren(serverLoadPath, false);
+            HashSet<String> servers = new HashSet<String>(children);
+            tx.setData(serverLoadPath, timestamp, -1); // trigger the watcher that data has been updated
+            for (ServerLoad serverLoad : serverLoads) {
+                String server = serverToZkFormat(serverLoad.getServer());
+                String serverPath = serverPath(server);
+                if (servers.contains(server)) {
+                    servers.remove(server);
+                    tx.setData(serverPath, serverLoad.serialize(), -1);
+                } else {
+                    tx.create(serverPath, serverLoad.serialize(), zkClient.getDefaultACL(), CreateMode.PERSISTENT);
+                }
+            }
+            for (String server : servers) {
+                tx.delete(serverPath(server), -1);
+            }
+            tx.commit();
+        } catch (InterruptedException | IOException | KeeperException e) {
+            throw new StateManagerSaveException(e);
+        }
     }
-  }
 
-  @Override
-  synchronized public void watch(final PlacementCallback callback) {
-    if (watching) {
-      return; // do not double watch
+    @Override
+    public TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException {
+        TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
+        try {
+            ZooKeeper zk = zkClient.get();
+            List<String> children = zk.getChildren(serverLoadPath, false);
+            for (String server : children) {
+                ownerships.add(ServerLoad.deserialize(zk.getData(serverPath(server), false, new Stat())));
+            }
+            return ownerships;
+        } catch (InterruptedException | IOException | KeeperException e) {
+            throw new StateManagerLoadException(e);
+        }
     }
-    watching = true;
-
-    try {
-      ZooKeeper zk = zkClient.get();
-      try {
-        zk.getData(serverLoadPath, new Watcher() {
-          @Override
-          public void process(WatchedEvent watchedEvent) {
+
+    @Override
+    public synchronized void watch(final PlacementCallback callback) {
+        if (watching) {
+            return; // do not double watch
+        }
+        watching = true;
+
+        try {
+            ZooKeeper zk = zkClient.get();
             try {
-              callback.callback(loadOwnership());
-            } catch (StateManagerLoadException e) {
-              logger.error("Watch of Ownership failed", e);
-            } finally {
-              watching = false;
-              watch(callback);
+                zk.getData(serverLoadPath, new Watcher() {
+                    @Override
+                    public void process(WatchedEvent watchedEvent) {
+                        try {
+                            callback.callback(loadOwnership());
+                        } catch (StateManagerLoadException e) {
+                            logger.error("Watch of Ownership failed", e);
+                        } finally {
+                            watching = false;
+                            watch(callback);
+                        }
+                    }
+                }, new Stat());
+            } catch (KeeperException.NoNodeException nee) {
+                byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
+                createServerLoadPathIfNoExists(timestamp);
+                watching = false;
+                watch(callback);
             }
-          }
-        }, new Stat());
-      } catch (KeeperException.NoNodeException nee) {
-        byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
-        createServerLoadPathIfNoExists(timestamp);
-        watching = false;
-        watch(callback);
-      }
-    } catch (ZooKeeperClient.ZooKeeperConnectionException | InterruptedException | KeeperException e) {
-      logger.error("Watch of Ownership failed", e);
-      watching = false;
-      watch(callback);
+        } catch (ZooKeeperClient.ZooKeeperConnectionException | InterruptedException | KeeperException e) {
+            logger.error("Watch of Ownership failed", e);
+            watching = false;
+            watch(callback);
+        }
     }
-  }
 
-  public String serverPath(String server) {
-    return String.format("%s/%s", serverLoadPath, server);
-  }
+    public String serverPath(String server) {
+        return String.format("%s/%s", serverLoadPath, server);
+    }
 
-  protected String serverToZkFormat(String server) {
-    return server.replaceAll("/", "--");
-  }
+    protected String serverToZkFormat(String server) {
+        return server.replaceAll("/", "--");
+    }
 
-  protected String zkFormatToServer(String zkFormattedServer) {
-    return zkFormattedServer.replaceAll("--", "/");
-  }
+    protected String zkFormatToServer(String zkFormattedServer) {
+        return zkFormattedServer.replaceAll("--", "/");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/package-info.java
new file mode 100644
index 0000000..72c134b
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Placement Policy to place streams across proxy services.
+ */
+package com.twitter.distributedlog.service.placement;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
index fbef587..b513e24 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
@@ -18,20 +18,18 @@
 package com.twitter.distributedlog.service.stream;
 
 import com.google.common.base.Stopwatch;
-
-import com.twitter.distributedlog.util.Sequencer;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Try;
 import com.twitter.distributedlog.AsyncLogWriter;
 import com.twitter.distributedlog.exceptions.ChecksumFailedException;
 import com.twitter.distributedlog.exceptions.DLException;
 import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
 import com.twitter.distributedlog.service.ResponseUtils;
 import com.twitter.distributedlog.thrift.service.ResponseHeader;
-
+import com.twitter.distributedlog.util.Sequencer;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import com.twitter.util.Return;
+import com.twitter.util.Try;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -40,8 +38,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
 
+/**
+ * Abstract Stream Operation.
+ */
 public abstract class AbstractStreamOp<Response> implements StreamOp {
-    static final Logger logger = LoggerFactory.getLogger(AbstractStreamOp.class);
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractStreamOp.class);
+
     protected final String stream;
     protected final OpStatsLogger opStatsLogger;
     private final Promise<Response> result = new Promise<Response>();
@@ -103,7 +106,7 @@ public abstract class AbstractStreamOp<Response> implements StreamOp {
     }
 
     /**
-     * Fail with current <i>owner</i> and its reason <i>t</i>
+     * Fail with current <i>owner</i> and its reason <i>t</i>.
      *
      * @param cause
      *          failure reason

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractWriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractWriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractWriteOp.java
index ae0d67d..a385b84 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractWriteOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractWriteOp.java
@@ -17,17 +17,18 @@
  */
 package com.twitter.distributedlog.service.stream;
 
-import com.twitter.distributedlog.util.ProtocolUtils;
 import com.twitter.distributedlog.service.ResponseUtils;
-import com.twitter.distributedlog.thrift.service.WriteResponse;
 import com.twitter.distributedlog.thrift.service.ResponseHeader;
+import com.twitter.distributedlog.thrift.service.WriteResponse;
+import com.twitter.distributedlog.util.ProtocolUtils;
 import com.twitter.util.Future;
-
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.stats.OpStatsLogger;
-
 import scala.runtime.AbstractFunction1;
 
+/**
+ * Abstract Write Operation.
+ */
 public abstract class AbstractWriteOp extends AbstractStreamOp<WriteResponse> {
 
     protected AbstractWriteOp(String stream,

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
index c009bb9..4d50b66 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
@@ -17,19 +17,13 @@
  */
 package com.twitter.distributedlog.service.stream;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.twitter.distributedlog.exceptions.AlreadyClosedException;
 import com.twitter.distributedlog.AsyncLogWriter;
 import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.exceptions.LockingException;
 import com.twitter.distributedlog.LogRecord;
 import com.twitter.distributedlog.acl.AccessControlManager;
+import com.twitter.distributedlog.exceptions.AlreadyClosedException;
 import com.twitter.distributedlog.exceptions.DLException;
+import com.twitter.distributedlog.exceptions.LockingException;
 import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
 import com.twitter.distributedlog.exceptions.RequestDeniedException;
 import com.twitter.distributedlog.service.ResponseUtils;
@@ -39,21 +33,26 @@ import com.twitter.distributedlog.thrift.service.BulkWriteResponse;
 import com.twitter.distributedlog.thrift.service.ResponseHeader;
 import com.twitter.distributedlog.thrift.service.StatusCode;
 import com.twitter.distributedlog.thrift.service.WriteResponse;
-import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.Sequencer;
 import com.twitter.util.ConstFuture;
+import com.twitter.util.Future;
 import com.twitter.util.Future$;
 import com.twitter.util.FutureEventListener;
-import com.twitter.util.Future;
 import com.twitter.util.Try;
-
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-
 import scala.runtime.AbstractFunction1;
 
+/**
+ * Bulk Write Operation.
+ */
 public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements WriteOpWithPayload {
     private final List<ByteBuffer> buffers;
     private final long payloadSize;
@@ -77,9 +76,9 @@ public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements
         try {
             result.get();
         } catch (Exception ex) {
-            if (ex instanceof OwnershipAcquireFailedException ||
-                ex instanceof AlreadyClosedException ||
-                ex instanceof LockingException) {
+            if (ex instanceof OwnershipAcquireFailedException
+                || ex instanceof AlreadyClosedException
+                || ex instanceof LockingException) {
                 def = true;
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/DeleteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/DeleteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/DeleteOp.java
index 1dde1f9..e30a989 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/DeleteOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/DeleteOp.java
@@ -25,13 +25,14 @@ import com.twitter.distributedlog.service.ResponseUtils;
 import com.twitter.distributedlog.thrift.service.WriteResponse;
 import com.twitter.distributedlog.util.Sequencer;
 import com.twitter.util.Future;
-
-import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.StatsLogger;
-
 import scala.runtime.AbstractFunction1;
 
+/**
+ * Operation to delete a log stream.
+ */
 public class DeleteOp extends AbstractWriteOp {
     private final StreamManager streamManager;
     private final Counter deniedDeleteCounter;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/HeartbeatOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/HeartbeatOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/HeartbeatOp.java
index 4b2cbc1..f34295b 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/HeartbeatOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/HeartbeatOp.java
@@ -17,6 +17,8 @@
  */
 package com.twitter.distributedlog.service.stream;
 
+import static com.google.common.base.Charsets.UTF_8;
+
 import com.twitter.distributedlog.AsyncLogWriter;
 import com.twitter.distributedlog.BKAsyncLogWriter;
 import com.twitter.distributedlog.DLSN;
@@ -28,15 +30,14 @@ import com.twitter.distributedlog.service.ResponseUtils;
 import com.twitter.distributedlog.thrift.service.WriteResponse;
 import com.twitter.distributedlog.util.Sequencer;
 import com.twitter.util.Future;
-
-import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.StatsLogger;
-
 import scala.runtime.AbstractFunction1;
 
-import static com.google.common.base.Charsets.UTF_8;
-
+/**
+ * Heartbeat Operation.
+ */
 public class HeartbeatOp extends AbstractWriteOp {
 
     static final byte[] HEARTBEAT_DATA = "heartbeat".getBytes(UTF_8);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/ReleaseOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/ReleaseOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/ReleaseOp.java
index 25835f6..aa0f1a7 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/ReleaseOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/ReleaseOp.java
@@ -25,13 +25,14 @@ import com.twitter.distributedlog.service.ResponseUtils;
 import com.twitter.distributedlog.thrift.service.WriteResponse;
 import com.twitter.distributedlog.util.Sequencer;
 import com.twitter.util.Future;
-
-import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.StatsLogger;
-
 import scala.runtime.AbstractFunction1;
 
+/**
+ * Operation to release ownership of a log stream.
+ */
 public class ReleaseOp extends AbstractWriteOp {
     private final StreamManager streamManager;
     private final Counter deniedReleaseCounter;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/Stream.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/Stream.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/Stream.java
index a1e3e4f..e015e29 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/Stream.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/Stream.java
@@ -23,13 +23,14 @@ import com.twitter.util.Future;
 import java.io.IOException;
 
 /**
- * Stream is the per stream request handler in the DL service layer. The collection of Streams in
- * the proxy are managed by StreamManager.
+ * Stream is the per stream request handler in the DL service layer.
+ *
+ * <p>The collection of Streams in the proxy are managed by StreamManager.
  */
 public interface Stream {
 
     /**
-     * Get the stream configuration for this stream
+     * Get the stream configuration for this stream.
      *
      * @return stream configuration
      */

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactory.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactory.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactory.java
index 51d7ffd..0dfbd69 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactory.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactory.java
@@ -19,6 +19,9 @@ package com.twitter.distributedlog.service.stream;
 
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 
+/**
+ * Factory to create a stream with provided stream configuration {@code streamConf}.
+ */
 public interface StreamFactory {
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
index cb28f1e..566ded6 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
@@ -29,6 +29,9 @@ import com.twitter.util.Timer;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.jboss.netty.util.HashedWheelTimer;
 
+/**
+ * The implementation of {@link StreamFactory}.
+ */
 public class StreamFactoryImpl implements StreamFactory {
     private final String clientId;
     private final StreamOpStats streamOpStats;


Mime
View raw message