distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [04/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException
Date Mon, 12 Jun 2017 15:45:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
deleted file mode 100644
index 3f28c42..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream.admin;
-
-import com.google.common.base.Stopwatch;
-import org.apache.distributedlog.exceptions.ChecksumFailedException;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.service.ResponseUtils;
-import org.apache.distributedlog.service.stream.StreamManager;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.ProtocolUtils;
-import com.twitter.util.Future;
-import com.twitter.util.FutureTransformer;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-
-/**
- * Stream admin op.
- */
-public abstract class StreamAdminOp implements AdminOp<WriteResponse> {
-
-    protected final String stream;
-    protected final StreamManager streamManager;
-    protected final OpStatsLogger opStatsLogger;
-    protected final Stopwatch stopwatch = Stopwatch.createUnstarted();
-    protected final Long checksum;
-    protected final Feature checksumDisabledFeature;
-
-    protected StreamAdminOp(String stream,
-                            StreamManager streamManager,
-                            OpStatsLogger statsLogger,
-                            Long checksum,
-                            Feature checksumDisabledFeature) {
-        this.stream = stream;
-        this.streamManager = streamManager;
-        this.opStatsLogger = statsLogger;
-        // start here in case the operation is failed before executing.
-        stopwatch.reset().start();
-        this.checksum = checksum;
-        this.checksumDisabledFeature = checksumDisabledFeature;
-    }
-
-    protected Long computeChecksum() {
-        return ProtocolUtils.streamOpCRC32(stream);
-    }
-
-    @Override
-    public void preExecute() throws DLException {
-        if (!checksumDisabledFeature.isAvailable() && null != checksum) {
-            Long serverChecksum = computeChecksum();
-            if (null != serverChecksum && !checksum.equals(serverChecksum)) {
-                throw new ChecksumFailedException();
-            }
-        }
-    }
-
-    /**
-     * Execute the operation.
-     *
-     * @return execute operation
-     */
-    protected abstract Future<WriteResponse> executeOp();
-
-    @Override
-    public Future<WriteResponse> execute() {
-        return executeOp().transformedBy(new FutureTransformer<WriteResponse, WriteResponse>() {
-
-            @Override
-            public WriteResponse map(WriteResponse response) {
-                opStatsLogger.registerSuccessfulEvent(
-                        stopwatch.elapsed(TimeUnit.MICROSECONDS));
-                return response;
-            }
-
-            @Override
-            public WriteResponse handle(Throwable cause) {
-                opStatsLogger.registerFailedEvent(
-                        stopwatch.elapsed(TimeUnit.MICROSECONDS));
-                return ResponseUtils.write(ResponseUtils.exceptionToHeader(cause));
-            }
-
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java
deleted file mode 100644
index 5b583e1..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Stream Related Admin Operations.
- */
-package org.apache.distributedlog.service.stream.admin;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/DynamicRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/DynamicRequestLimiter.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/DynamicRequestLimiter.java
deleted file mode 100644
index 5db2037..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/DynamicRequestLimiter.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream.limiter;
-
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.exceptions.OverCapacityException;
-import org.apache.distributedlog.limiter.RequestLimiter;
-import java.io.Closeable;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.commons.configuration.event.ConfigurationEvent;
-import org.apache.commons.configuration.event.ConfigurationListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Dynamically rebuild a rate limiter when the supplied dynamic config changes.
- *
- * <p>Subclasses implement build() to build the limiter. DynamicRequestLimiter must be closed to deregister
- * the config listener.
- */
-public abstract class DynamicRequestLimiter<Req> implements RequestLimiter<Req>, Closeable {
-    private static final Logger LOG = LoggerFactory.getLogger(DynamicRequestLimiter.class);
-
-    private final ConfigurationListener listener;
-    private final Feature rateLimitDisabledFeature;
-    volatile RequestLimiter<Req> limiter;
-    final DynamicDistributedLogConfiguration dynConf;
-
-    public DynamicRequestLimiter(DynamicDistributedLogConfiguration dynConf,
-                                 StatsLogger statsLogger,
-                                 Feature rateLimitDisabledFeature) {
-        final StatsLogger limiterStatsLogger = statsLogger.scope("dynamic");
-        this.dynConf = dynConf;
-        this.rateLimitDisabledFeature = rateLimitDisabledFeature;
-        this.listener = new ConfigurationListener() {
-            @Override
-            public void configurationChanged(ConfigurationEvent event) {
-                // Note that this method may be called several times if several config options
-                // are changed. The effect is harmless except that we create and discard more
-                // objects than we need to.
-                LOG.debug("Config changed callback invoked with event {} {} {} {}", new Object[] {
-                        event.getPropertyName(), event.getPropertyValue(), event.getType(),
-                        event.isBeforeUpdate()});
-                if (!event.isBeforeUpdate()) {
-                    limiterStatsLogger.getCounter("config_changed").inc();
-                    LOG.debug("Rebuilding limiter");
-                    limiter = build();
-                }
-            }
-        };
-        LOG.debug("Registering config changed callback");
-        dynConf.addConfigurationListener(listener);
-    }
-
-    public void initialize() {
-        this.limiter = build();
-    }
-
-    @Override
-    public void apply(Req request) throws OverCapacityException {
-        if (rateLimitDisabledFeature.isAvailable()) {
-            return;
-        }
-        limiter.apply(request);
-    }
-
-    @Override
-    public void close() {
-        boolean success = dynConf.removeConfigurationListener(listener);
-        LOG.debug("Deregistering config changed callback success={}", success);
-    }
-
-   /**
-    * Build the underlying limiter. Called when DynamicRequestLimiter detects config has changed.
-    * This may be called multiple times so the method should be cheap.
-    */
-    protected abstract RequestLimiter<Req> build();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/RequestLimiterBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/RequestLimiterBuilder.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/RequestLimiterBuilder.java
deleted file mode 100644
index fc30599..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/RequestLimiterBuilder.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream.limiter;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.distributedlog.exceptions.OverCapacityException;
-import org.apache.distributedlog.limiter.ComposableRequestLimiter;
-import org.apache.distributedlog.limiter.ComposableRequestLimiter.CostFunction;
-import org.apache.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
-import org.apache.distributedlog.limiter.GuavaRateLimiter;
-import org.apache.distributedlog.limiter.RateLimiter;
-import org.apache.distributedlog.limiter.RequestLimiter;
-import org.apache.distributedlog.service.stream.StreamOp;
-import org.apache.distributedlog.service.stream.WriteOpWithPayload;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-/**
- * Request limiter builder.
- */
-public class RequestLimiterBuilder {
-    private OverlimitFunction<StreamOp> overlimitFunction = NOP_OVERLIMIT_FUNCTION;
-    private RateLimiter limiter;
-    private CostFunction<StreamOp> costFunction;
-    private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
-
-    /**
-     * Function to calculate the `RPS` (Request per second) cost of a given stream operation.
-     */
-    public static final CostFunction<StreamOp> RPS_COST_FUNCTION = new CostFunction<StreamOp>() {
-        @Override
-        public int apply(StreamOp op) {
-            if (op instanceof WriteOpWithPayload) {
-                return 1;
-            } else {
-                return 0;
-            }
-        }
-    };
-
-    /**
-     * Function to calculate the `BPS` (Bytes per second) cost of a given stream operation.
-     */
-    public static final CostFunction<StreamOp> BPS_COST_FUNCTION = new CostFunction<StreamOp>() {
-        @Override
-        public int apply(StreamOp op) {
-            if (op instanceof WriteOpWithPayload) {
-                WriteOpWithPayload writeOp = (WriteOpWithPayload) op;
-                return (int) Math.min(writeOp.getPayloadSize(), Integer.MAX_VALUE);
-            } else {
-                return 0;
-            }
-        }
-    };
-
-    /**
-     * Function to check if a stream operation will cause {@link OverCapacityException}.
-     */
-    public static final OverlimitFunction<StreamOp> NOP_OVERLIMIT_FUNCTION = new OverlimitFunction<StreamOp>() {
-        @Override
-        public void apply(StreamOp op) throws OverCapacityException {
-            return;
-        }
-    };
-
-    public RequestLimiterBuilder limit(int limit) {
-        this.limiter = GuavaRateLimiter.of(limit);
-        return this;
-    }
-
-    public RequestLimiterBuilder overlimit(OverlimitFunction<StreamOp> overlimitFunction) {
-        this.overlimitFunction = overlimitFunction;
-        return this;
-    }
-
-    public RequestLimiterBuilder cost(CostFunction<StreamOp> costFunction) {
-        this.costFunction = costFunction;
-        return this;
-    }
-
-    public RequestLimiterBuilder statsLogger(StatsLogger statsLogger) {
-        this.statsLogger = statsLogger;
-        return this;
-    }
-
-    public static RequestLimiterBuilder newRpsLimiterBuilder() {
-        return new RequestLimiterBuilder().cost(RPS_COST_FUNCTION);
-    }
-
-    public static RequestLimiterBuilder newBpsLimiterBuilder() {
-        return new RequestLimiterBuilder().cost(BPS_COST_FUNCTION);
-    }
-
-    public RequestLimiter<StreamOp> build() {
-        checkNotNull(limiter);
-        checkNotNull(overlimitFunction);
-        checkNotNull(costFunction);
-        return new ComposableRequestLimiter(limiter, overlimitFunction, costFunction, statsLogger);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
deleted file mode 100644
index de805aa..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream.limiter;
-
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.exceptions.OverCapacityException;
-import org.apache.distributedlog.limiter.ChainedRequestLimiter;
-import org.apache.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
-import org.apache.distributedlog.limiter.RequestLimiter;
-import org.apache.distributedlog.rate.MovingAverageRate;
-import org.apache.distributedlog.service.stream.StreamManager;
-import org.apache.distributedlog.service.stream.StreamOp;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-/**
- * Request limiter for the service instance (global request limiter).
- */
-public class ServiceRequestLimiter extends DynamicRequestLimiter<StreamOp> {
-    private final StatsLogger limiterStatLogger;
-    private final MovingAverageRate serviceRps;
-    private final MovingAverageRate serviceBps;
-    private final StreamManager streamManager;
-
-    public ServiceRequestLimiter(DynamicDistributedLogConfiguration dynConf,
-                                 StatsLogger statsLogger,
-                                 MovingAverageRate serviceRps,
-                                 MovingAverageRate serviceBps,
-                                 StreamManager streamManager,
-                                 Feature disabledFeature) {
-        super(dynConf, statsLogger, disabledFeature);
-        this.limiterStatLogger = statsLogger;
-        this.streamManager = streamManager;
-        this.serviceRps = serviceRps;
-        this.serviceBps = serviceBps;
-        this.limiter = build();
-    }
-
-    @Override
-    public RequestLimiter<StreamOp> build() {
-        int rpsStreamAcquireLimit = dynConf.getRpsStreamAcquireServiceLimit();
-        int rpsSoftServiceLimit = dynConf.getRpsSoftServiceLimit();
-        int rpsHardServiceLimit = dynConf.getRpsHardServiceLimit();
-        int bpsStreamAcquireLimit = dynConf.getBpsStreamAcquireServiceLimit();
-        int bpsSoftServiceLimit = dynConf.getBpsSoftServiceLimit();
-        int bpsHardServiceLimit = dynConf.getBpsHardServiceLimit();
-
-        RequestLimiterBuilder rpsHardLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder()
-            .statsLogger(limiterStatLogger.scope("rps_hard_limit"))
-            .limit(rpsHardServiceLimit)
-            .overlimit(new OverlimitFunction<StreamOp>() {
-                @Override
-                public void apply(StreamOp request) throws OverCapacityException {
-                    throw new OverCapacityException("Being rate limited: RPS limit exceeded for the service instance");
-                }
-            });
-
-        RequestLimiterBuilder rpsSoftLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder()
-            .statsLogger(limiterStatLogger.scope("rps_soft_limit"))
-            .limit(rpsSoftServiceLimit);
-
-        RequestLimiterBuilder bpsHardLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder()
-            .statsLogger(limiterStatLogger.scope("bps_hard_limit"))
-            .limit(bpsHardServiceLimit)
-            .overlimit(new OverlimitFunction<StreamOp>() {
-                @Override
-                public void apply(StreamOp request) throws OverCapacityException {
-                    throw new OverCapacityException("Being rate limited: BPS limit exceeded for the service instance");
-                }
-            });
-
-        RequestLimiterBuilder bpsSoftLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder()
-            .statsLogger(limiterStatLogger.scope("bps_soft_limit"))
-            .limit(bpsSoftServiceLimit);
-
-        ChainedRequestLimiter.Builder<StreamOp> builder = new ChainedRequestLimiter.Builder<StreamOp>();
-        builder.addLimiter(new StreamAcquireLimiter(
-            streamManager, serviceRps, rpsStreamAcquireLimit, limiterStatLogger.scope("rps_acquire")));
-        builder.addLimiter(new StreamAcquireLimiter(
-            streamManager, serviceBps, bpsStreamAcquireLimit, limiterStatLogger.scope("bps_acquire")));
-        builder.addLimiter(bpsHardLimiterBuilder.build());
-        builder.addLimiter(bpsSoftLimiterBuilder.build());
-        builder.addLimiter(rpsHardLimiterBuilder.build());
-        builder.addLimiter(rpsSoftLimiterBuilder.build());
-        builder.statsLogger(limiterStatLogger);
-        return builder.build();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
deleted file mode 100644
index 7675d6f..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream.limiter;
-
-import org.apache.distributedlog.exceptions.OverCapacityException;
-import org.apache.distributedlog.exceptions.TooManyStreamsException;
-import org.apache.distributedlog.limiter.RequestLimiter;
-import org.apache.distributedlog.rate.MovingAverageRate;
-import org.apache.distributedlog.service.stream.StreamManager;
-import org.apache.distributedlog.service.stream.StreamOp;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-/**
- * A special limiter on limiting acquiring new streams.
- */
-public class StreamAcquireLimiter implements RequestLimiter<StreamOp> {
-    private final StreamManager streamManager;
-    private final MovingAverageRate serviceRps;
-    private final double serviceRpsLimit;
-    private final Counter overlimitCounter;
-
-    public StreamAcquireLimiter(StreamManager streamManager,
-                                MovingAverageRate serviceRps,
-                                double serviceRpsLimit,
-                                StatsLogger statsLogger) {
-        this.streamManager = streamManager;
-        this.serviceRps = serviceRps;
-        this.serviceRpsLimit = serviceRpsLimit;
-        this.overlimitCounter = statsLogger.getCounter("overlimit");
-    }
-
-    @Override
-    public void apply(StreamOp op) throws OverCapacityException {
-        String streamName = op.streamName();
-        if (serviceRpsLimit > -1 && serviceRps.get() > serviceRpsLimit && !streamManager.isAcquired(streamName)) {
-            overlimitCounter.inc();
-            throw new TooManyStreamsException("Request rate is too high to accept new stream " + streamName + ".");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamRequestLimiter.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamRequestLimiter.java
deleted file mode 100644
index 42b4e1e..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamRequestLimiter.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream.limiter;
-
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.exceptions.OverCapacityException;
-import org.apache.distributedlog.limiter.ChainedRequestLimiter;
-import org.apache.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
-import org.apache.distributedlog.limiter.RequestLimiter;
-import org.apache.distributedlog.service.stream.StreamOp;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-/**
- * A dynamic request limiter on limiting stream operations.
- */
-public class StreamRequestLimiter extends DynamicRequestLimiter<StreamOp> {
-    private final DynamicDistributedLogConfiguration dynConf;
-    private final StatsLogger limiterStatLogger;
-    private final String streamName;
-
-    public StreamRequestLimiter(String streamName,
-                                DynamicDistributedLogConfiguration dynConf,
-                                StatsLogger statsLogger,
-                                Feature disabledFeature) {
-        super(dynConf, statsLogger, disabledFeature);
-        this.limiterStatLogger = statsLogger;
-        this.dynConf = dynConf;
-        this.streamName = streamName;
-        this.limiter = build();
-    }
-
-    @Override
-    public RequestLimiter<StreamOp> build() {
-
-        // RPS hard, soft limits
-        RequestLimiterBuilder rpsHardLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder()
-            .statsLogger(limiterStatLogger.scope("rps_hard_limit"))
-            .limit(dynConf.getRpsHardWriteLimit())
-            .overlimit(new OverlimitFunction<StreamOp>() {
-                @Override
-                public void apply(StreamOp op) throws OverCapacityException {
-                    throw new OverCapacityException("Being rate limited: RPS limit exceeded for stream " + streamName);
-                }
-            });
-        RequestLimiterBuilder rpsSoftLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder()
-            .statsLogger(limiterStatLogger.scope("rps_soft_limit"))
-            .limit(dynConf.getRpsSoftWriteLimit());
-
-        // BPS hard, soft limits
-        RequestLimiterBuilder bpsHardLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder()
-            .statsLogger(limiterStatLogger.scope("bps_hard_limit"))
-            .limit(dynConf.getBpsHardWriteLimit())
-            .overlimit(new OverlimitFunction<StreamOp>() {
-                @Override
-                public void apply(StreamOp op) throws OverCapacityException {
-                    throw new OverCapacityException("Being rate limited: BPS limit exceeded for stream " + streamName);
-                }
-            });
-        RequestLimiterBuilder bpsSoftLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder()
-            .statsLogger(limiterStatLogger.scope("bps_soft_limit"))
-            .limit(dynConf.getBpsSoftWriteLimit());
-
-        ChainedRequestLimiter.Builder<StreamOp> builder = new ChainedRequestLimiter.Builder<StreamOp>();
-        builder.addLimiter(rpsSoftLimiterBuilder.build());
-        builder.addLimiter(rpsHardLimiterBuilder.build());
-        builder.addLimiter(bpsSoftLimiterBuilder.build());
-        builder.addLimiter(bpsHardLimiterBuilder.build());
-        builder.statsLogger(limiterStatLogger);
-        return builder.build();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/package-info.java
deleted file mode 100644
index c666b08..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/limiter/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Request Rate Limiting.
- */
-package org.apache.distributedlog.service.stream.limiter;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/package-info.java
deleted file mode 100644
index 7429a85..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Stream Related Operations.
- */
-package org.apache.distributedlog.service.stream;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/CacheableStreamPartitionConverter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/CacheableStreamPartitionConverter.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/CacheableStreamPartitionConverter.java
deleted file mode 100644
index 72668c2..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/CacheableStreamPartitionConverter.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.streamset;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * A stream-to-partition converter that caches the mapping between stream and partitions.
- */
-public abstract class CacheableStreamPartitionConverter implements StreamPartitionConverter {
-
-    private final ConcurrentMap<String, Partition> partitions;
-
-    protected CacheableStreamPartitionConverter() {
-        this.partitions = new ConcurrentHashMap<String, Partition>();
-    }
-
-    @Override
-    public Partition convert(String streamName) {
-        Partition p = partitions.get(streamName);
-        if (null != p) {
-            return p;
-        }
-        // not found
-        Partition newPartition = newPartition(streamName);
-        Partition oldPartition = partitions.putIfAbsent(streamName, newPartition);
-        if (null == oldPartition) {
-            return newPartition;
-        } else {
-            return oldPartition;
-        }
-    }
-
-    /**
-     * Create the partition from <code>streamName</code>.
-     *
-     * @param streamName
-     *          stream name
-     * @return partition id of the stream
-     */
-    protected abstract Partition newPartition(String streamName);
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java
deleted file mode 100644
index 30b2896..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.streamset;
-
-import org.apache.commons.lang3.StringUtils;
-
-/**
- * Stream Partition Converter that converts the stream name into a stream-to-partition mapping by delimiter.
- */
-public class DelimiterStreamPartitionConverter extends CacheableStreamPartitionConverter {
-
-    private final String delimiter;
-
-    public DelimiterStreamPartitionConverter() {
-        this("_");
-    }
-
-    public DelimiterStreamPartitionConverter(String delimiter) {
-        this.delimiter = delimiter;
-    }
-
-    @Override
-    protected Partition newPartition(String streamName) {
-        String[] parts = StringUtils.split(streamName, delimiter);
-        if (null != parts && parts.length == 2) {
-            try {
-                int partition = Integer.parseInt(parts[1]);
-                return new Partition(parts[0], partition);
-            } catch (NumberFormatException nfe) {
-                // ignore the exception
-            }
-        }
-        return new Partition(streamName, 0);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/IdentityStreamPartitionConverter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/IdentityStreamPartitionConverter.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/IdentityStreamPartitionConverter.java
deleted file mode 100644
index 5be172f..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/IdentityStreamPartitionConverter.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.streamset;
-
-/**
- * Map stream name to partition of the same name.
- */
-public class IdentityStreamPartitionConverter extends CacheableStreamPartitionConverter {
-    @Override
-    protected Partition newPartition(String streamName) {
-        return new Partition(streamName, 0);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/Partition.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/Partition.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/Partition.java
deleted file mode 100644
index aa69276..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/Partition.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.streamset;
-
-import com.google.common.base.Objects;
-
-/**
- * `Partition` defines the relationship between a `virtual` stream and a
- * physical DL stream.
- *
- * <p>A `virtual` stream could be partitioned into multiple partitions
- * and each partition is effectively a DL stream.
- */
-public class Partition {
-
-    // Name of its parent stream.
-    private final String stream;
-
-    // Unique id of the partition within the stream.
-    // It can be just simply an index id.
-    public final int id;
-
-    public Partition(String stream, int id) {
-        this.stream = stream;
-        this.id = id;
-    }
-
-    /**
-     * Get the `virtual` stream name.
-     *
-     * @return the stream name.
-     */
-    public String getStream() {
-        return stream;
-    }
-
-    /**
-     * Get the partition id of this partition.
-     *
-     * @return partition id
-     */
-    public int getId() {
-        return id;
-    }
-
-    /**
-     * Get the 6 digit 0 padded id of this partition as a String.
-     * @return partition id
-     */
-    public String getPaddedId() {
-        return String.format("%06d", getId());
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof Partition)) {
-            return false;
-        }
-        Partition partition = (Partition) o;
-
-        return id == partition.id && Objects.equal(stream, partition.stream);
-    }
-
-    @Override
-    public int hashCode() {
-        int result = stream.hashCode();
-        result = 31 * result + id;
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Partition(")
-          .append(stream)
-          .append(", ")
-          .append(id)
-          .append(")");
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/PartitionMap.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/PartitionMap.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/PartitionMap.java
deleted file mode 100644
index bfcc5db..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/PartitionMap.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.streamset;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A mapping between a logical stream and a set of physical partitions.
- */
-public class PartitionMap {
-
-    private final Map<String, Set<Partition>> partitionMap;
-
-    public PartitionMap() {
-        partitionMap = new HashMap<String, Set<Partition>>();
-    }
-
-    public synchronized boolean addPartition(Partition partition, int maxPartitions) {
-        if (maxPartitions <= 0) {
-            return true;
-        }
-        Set<Partition> partitions = partitionMap.get(partition.getStream());
-        if (null == partitions) {
-            partitions = new HashSet<Partition>();
-            partitions.add(partition);
-            partitionMap.put(partition.getStream(), partitions);
-            return true;
-        }
-        if (partitions.contains(partition) || partitions.size() < maxPartitions) {
-            partitions.add(partition);
-            return true;
-        }
-        return false;
-    }
-
-    public synchronized boolean removePartition(Partition partition) {
-        Set<Partition> partitions = partitionMap.get(partition.getStream());
-        return null != partitions && partitions.remove(partition);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/StreamPartitionConverter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/StreamPartitionConverter.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/StreamPartitionConverter.java
deleted file mode 100644
index 3ea1337..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/StreamPartitionConverter.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.streamset;
-
-/**
- * Map stream name to a partition.
- *
- * @see Partition
- */
-public interface StreamPartitionConverter {
-
-    /**
-     * Convert the stream name to partition.
-     *
-     * @param streamName
-     *          stream name
-     * @return partition
-     */
-    Partition convert(String streamName);
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/package-info.java
deleted file mode 100644
index d185e88..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/streamset/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * StreamSet - A logical set of streams.
- */
-package org.apache.distributedlog.service.streamset;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/tools/ProxyTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/tools/ProxyTool.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/tools/ProxyTool.java
deleted file mode 100644
index 3934eb5..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/tools/ProxyTool.java
+++ /dev/null
@@ -1,350 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.tools;
-
-import com.google.common.util.concurrent.RateLimiter;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.client.monitor.MonitorServiceClient;
-import org.apache.distributedlog.client.serverset.DLZkServerSet;
-import org.apache.distributedlog.service.ClientUtils;
-import org.apache.distributedlog.service.DLSocketAddress;
-import org.apache.distributedlog.service.DistributedLogClient;
-import org.apache.distributedlog.service.DistributedLogClientBuilder;
-import org.apache.distributedlog.tools.Tool;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.thrift.ClientId$;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Tools to interact with proxies.
- */
-public class ProxyTool extends Tool {
-
-    private static final Logger logger = LoggerFactory.getLogger(ProxyTool.class);
-
-    /**
-     * Abstract Cluster level command.
-     */
-    protected abstract static class ClusterCommand extends OptsCommand {
-
-        protected Options options = new Options();
-        protected URI uri;
-        protected final List<String> streams = new ArrayList<String>();
-
-        protected ClusterCommand(String name, String description) {
-            super(name, description);
-            options.addOption("u", "uri", true, "DistributedLog URI");
-            options.addOption("r", "prefix", true, "Prefix of stream name. E.g. 'QuantumLeapTest-'.");
-            options.addOption("e", "expression", true, "Expression to generate stream suffix. "
-                + "Currently we support range '0-9', list '1,2,3' and name '143'");
-        }
-
-        @Override
-        protected int runCmd(CommandLine commandLine) throws Exception {
-            try {
-                parseCommandLine(commandLine);
-            } catch (ParseException pe) {
-                System.err.println("ERROR: failed to parse commandline : '" + pe.getMessage() + "'");
-                printUsage();
-                return -1;
-            }
-
-            DLZkServerSet serverSet = DLZkServerSet.of(uri, 60000);
-            logger.info("Created serverset for {}", uri);
-            try {
-                DistributedLogClient client = DistributedLogClientBuilder.newBuilder()
-                        .name("proxy_tool")
-                        .clientId(ClientId$.MODULE$.apply("proxy_tool"))
-                        .maxRedirects(2)
-                        .serverSet(serverSet.getServerSet())
-                        .clientBuilder(ClientBuilder.get()
-                            .connectionTimeout(Duration.fromSeconds(2))
-                            .tcpConnectTimeout(Duration.fromSeconds(2))
-                            .requestTimeout(Duration.fromSeconds(10))
-                            .hostConnectionLimit(1)
-                            .hostConnectionCoresize(1)
-                            .keepAlive(true)
-                            .failFast(false))
-                        .build();
-                try {
-                    return runCmd(client);
-                } finally {
-                    client.close();
-                }
-            } finally {
-                serverSet.close();
-            }
-        }
-
-        protected abstract int runCmd(DistributedLogClient client) throws Exception;
-
-        @Override
-        protected Options getOptions() {
-            return options;
-        }
-
-        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
-            if (!cmdline.hasOption("u")) {
-                throw new ParseException("No distributedlog uri provided.");
-            }
-            this.uri = URI.create(cmdline.getOptionValue("u"));
-
-            // get stream names
-            String streamPrefix = cmdline.hasOption("r") ? cmdline.getOptionValue("r") : "";
-            String streamExpression = null;
-            if (cmdline.hasOption("e")) {
-                streamExpression = cmdline.getOptionValue("e");
-            }
-            if (null == streamPrefix || null == streamExpression) {
-                throw new ParseException("Please specify stream prefix & expression.");
-            }
-            // parse the stream expression
-            if (streamExpression.contains("-")) {
-                // a range expression
-                String[] parts = streamExpression.split("-");
-                if (parts.length != 2) {
-                    throw new ParseException("Invalid stream index range : " + streamExpression);
-                }
-                try {
-                    int start = Integer.parseInt(parts[0]);
-                    int end = Integer.parseInt(parts[1]);
-                    if (start > end) {
-                        throw new ParseException("Invalid stream index range : " + streamExpression);
-                    }
-                    for (int i = start; i <= end; i++) {
-                        streams.add(streamPrefix + i);
-                    }
-                } catch (NumberFormatException nfe) {
-                    throw new ParseException("Invalid stream index range : " + streamExpression);
-                }
-            } else if (streamExpression.contains(",")) {
-                // a list expression
-                String[] parts = streamExpression.split(",");
-                try {
-                    for (String part : parts) {
-                        streams.add(streamPrefix + part);
-                    }
-                } catch (NumberFormatException nfe) {
-                    throw new ParseException("Invalid stream suffix list : " + streamExpression);
-                }
-            } else {
-                streams.add(streamPrefix + streamExpression);
-            }
-        }
-    }
-
-    /**
-     * Command to release ownership of a log stream.
-     */
-    static class ReleaseCommand extends ClusterCommand {
-
-        double rate = 100f;
-
-        ReleaseCommand() {
-            super("release", "Release Stream Ownerships");
-            options.addOption("t", "rate", true, "Rate to release streams");
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
-            super.parseCommandLine(cmdline);
-            if (cmdline.hasOption("t")) {
-                rate = Double.parseDouble(cmdline.getOptionValue("t", "100"));
-            }
-        }
-
-        @Override
-        protected int runCmd(DistributedLogClient client) throws Exception {
-            RateLimiter rateLimiter = RateLimiter.create(rate);
-            for (String stream : streams) {
-                rateLimiter.acquire();
-                try {
-                    Await.result(client.release(stream));
-                    System.out.println("Release ownership of stream " + stream);
-                } catch (Exception e) {
-                    System.err.println("Failed to release ownership of stream " + stream);
-                    throw e;
-                }
-            }
-            return 0;
-        }
-
-        @Override
-        protected String getUsage() {
-            return "release [options]";
-        }
-    }
-
-    /**
-     * Command to truncate a log stream.
-     */
-    static class TruncateCommand extends ClusterCommand {
-
-        DLSN dlsn = DLSN.InitialDLSN;
-
-        TruncateCommand() {
-            super("truncate", "Truncate streams until given dlsn.");
-            options.addOption("d", "dlsn", true, "DLSN to truncate until");
-        }
-
-        @Override
-        protected int runCmd(DistributedLogClient client) throws Exception {
-            System.out.println("Truncating streams : " + streams);
-            for (String stream : streams) {
-                boolean success = Await.result(client.truncate(stream, dlsn));
-                System.out.println("Truncate " + stream + " to " + dlsn + " : " + success);
-            }
-            return 0;
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
-            super.parseCommandLine(cmdline);
-            if (!cmdline.hasOption("d")) {
-                throw new ParseException("No DLSN provided");
-            }
-            String[] dlsnStrs = cmdline.getOptionValue("d").split(",");
-            if (dlsnStrs.length != 3) {
-                throw new ParseException("Invalid DLSN : " + cmdline.getOptionValue("d"));
-            }
-            dlsn = new DLSN(Long.parseLong(dlsnStrs[0]), Long.parseLong(dlsnStrs[1]), Long.parseLong(dlsnStrs[2]));
-        }
-
-        @Override
-        protected String getUsage() {
-            return "truncate [options]";
-        }
-    }
-
-    /**
-     * Abstract command to operate on a single proxy server.
-     */
-    protected abstract static class ProxyCommand extends OptsCommand {
-
-        protected Options options = new Options();
-        protected InetSocketAddress address;
-
-        protected ProxyCommand(String name, String description) {
-            super(name, description);
-            options.addOption("H", "host", true, "Single Proxy Address");
-        }
-
-        @Override
-        protected Options getOptions() {
-            return options;
-        }
-
-        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
-            if (!cmdline.hasOption("H")) {
-                throw new ParseException("No proxy address provided");
-            }
-            address = DLSocketAddress.parseSocketAddress(cmdline.getOptionValue("H"));
-        }
-
-        @Override
-        protected int runCmd(CommandLine commandLine) throws Exception {
-            try {
-                parseCommandLine(commandLine);
-            } catch (ParseException pe) {
-                System.err.println("ERROR: failed to parse commandline : '" + pe.getMessage() + "'");
-                printUsage();
-                return -1;
-            }
-
-            DistributedLogClientBuilder clientBuilder = DistributedLogClientBuilder.newBuilder()
-                    .name("proxy_tool")
-                    .clientId(ClientId$.MODULE$.apply("proxy_tool"))
-                    .maxRedirects(2)
-                    .host(address)
-                    .clientBuilder(ClientBuilder.get()
-                            .connectionTimeout(Duration.fromSeconds(2))
-                            .tcpConnectTimeout(Duration.fromSeconds(2))
-                            .requestTimeout(Duration.fromSeconds(10))
-                            .hostConnectionLimit(1)
-                            .hostConnectionCoresize(1)
-                            .keepAlive(true)
-                            .failFast(false));
-            Pair<DistributedLogClient, MonitorServiceClient> clientPair =
-                    ClientUtils.buildClient(clientBuilder);
-            try {
-                return runCmd(clientPair);
-            } finally {
-                clientPair.getLeft().close();
-            }
-        }
-
-        protected abstract int runCmd(Pair<DistributedLogClient, MonitorServiceClient> client) throws Exception;
-    }
-
-    /**
-     * Command to enable/disable accepting new streams.
-     */
-    static class AcceptNewStreamCommand extends ProxyCommand {
-
-        boolean enabled = false;
-
-        AcceptNewStreamCommand() {
-            super("accept-new-stream", "Enable/Disable accepting new streams for one proxy");
-            options.addOption("e", "enabled", true, "Enable/Disable accepting new streams");
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
-            super.parseCommandLine(cmdline);
-            if (!cmdline.hasOption("e")) {
-                throw new ParseException("No action 'enable/disable' provided");
-            }
-            enabled = Boolean.parseBoolean(cmdline.getOptionValue("e"));
-        }
-
-        @Override
-        protected int runCmd(Pair<DistributedLogClient, MonitorServiceClient> client)
-                throws Exception {
-            Await.result(client.getRight().setAcceptNewStream(enabled));
-            return 0;
-        }
-
-        @Override
-        protected String getUsage() {
-            return "accept-new-stream [options]";
-        }
-    }
-
-    public ProxyTool() {
-        super();
-        addCommand(new ReleaseCommand());
-        addCommand(new TruncateCommand());
-        addCommand(new AcceptNewStreamCommand());
-    }
-
-    @Override
-    protected String getName() {
-        return "proxy_tool";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/tools/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/tools/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/tools/package-info.java
deleted file mode 100644
index 92d0a7d..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/tools/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Service related tools.
- */
-package org.apache.distributedlog.service.tools;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/utils/ServerUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/utils/ServerUtils.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/utils/ServerUtils.java
deleted file mode 100644
index 9ee93b4..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/utils/ServerUtils.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.utils;
-
-import java.io.IOException;
-import java.net.InetAddress;
-
-/**
- * Utils that used by servers.
- */
-public class ServerUtils {
-
-  /**
-   * Retrieve the ledger allocator pool name.
-   *
-   * @param serverRegionId region id that that server is running
-   * @param shardId shard id of the server
-   * @param useHostname whether to use hostname as the ledger allocator pool name
-   * @return ledger allocator pool name
-   * @throws IOException
-   */
-    public static String getLedgerAllocatorPoolName(int serverRegionId,
-                                                    int shardId,
-                                                    boolean useHostname)
-        throws IOException {
-        if (useHostname) {
-            return String.format("allocator_%04d_%s", serverRegionId,
-                InetAddress.getLocalHost().getHostAddress());
-        } else {
-            return String.format("allocator_%04d_%010d", serverRegionId, shardId);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/utils/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/utils/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/utils/package-info.java
deleted file mode 100644
index 99cf736..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/utils/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Utilities used by proxy servers.
- */
-package org.apache.distributedlog.service.utils;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/resources/config/server_decider.conf
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/resources/config/server_decider.conf b/distributedlog-service/src/main/resources/config/server_decider.conf
deleted file mode 100644
index d2fddf5..0000000
--- a/distributedlog-service/src/main/resources/config/server_decider.conf
+++ /dev/null
@@ -1,31 +0,0 @@
-#/**
-# * Licensed to the Apache Software Foundation (ASF) under one
-# * or more contributor license agreements.  See the NOTICE file
-# * distributed with this work for additional information
-# * regarding copyright ownership.  The ASF licenses this file
-# * to you under the Apache License, Version 2.0 (the
-# * "License"); you may not use this file except in compliance
-# * with the License.  You may obtain a copy of the License at
-# *
-# *     http://www.apache.org/licenses/LICENSE-2.0
-# *
-# * Unless required by applicable law or agreed to in writing, software
-# * distributed under the License is distributed on an "AS IS" BASIS,
-# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# * See the License for the specific language governing permissions and
-# * limitations under the License.
-# */
-
-region_stop_accept_new_stream=0
-disable_durability_enforcement=0
-disable_write_limit=0
-bkc.repp_disable_durability_enforcement=0
-bkc.disable_ensemble_change=0
-dl.disable_logsegment_rolling=0
-dl.disable_write_limit=0
-bkc.atla.disallow_bookie_placement=0
-bkc.atlb.disallow_bookie_placement=0
-bkc.smf1.disallow_bookie_placement=0
-service_rate_limit_disabled=0
-service_checksum_disabled=0
-service_global_limiter_disabled=0

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/resources/config/server_decider.yml
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/resources/config/server_decider.yml b/distributedlog-service/src/main/resources/config/server_decider.yml
deleted file mode 100644
index 7df24bb..0000000
--- a/distributedlog-service/src/main/resources/config/server_decider.yml
+++ /dev/null
@@ -1,44 +0,0 @@
-#/**
-# * Licensed to the Apache Software Foundation (ASF) under one
-# * or more contributor license agreements.  See the NOTICE file
-# * distributed with this work for additional information
-# * regarding copyright ownership.  The ASF licenses this file
-# * to you under the Apache License, Version 2.0 (the
-# * "License"); you may not use this file except in compliance
-# * with the License.  You may obtain a copy of the License at
-# *
-# *     http://www.apache.org/licenses/LICENSE-2.0
-# *
-# * Unless required by applicable law or agreed to in writing, software
-# * distributed under the License is distributed on an "AS IS" BASIS,
-# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# * See the License for the specific language governing permissions and
-# * limitations under the License.
-# */
-
-region_stop_accept_new_stream:
-  default_availability: 0
-disable_durability_enforcement:
-  default_availability: 0
-disable_write_limit:
-  default_availability: 0
-bkc.repp_disable_durability_enforcement:
-  default_availability: 0
-bkc.disable_ensemble_change:
-  default_availability: 0
-dl.disable_logsegment_rolling:
-  default_availability: 0
-dl.disable_write_limit:
-  default_availability: 0
-bkc.atla.disallow_bookie_placement:
-  default_availability: 0
-bkc.atlb.disallow_bookie_placement:
-  default_availability: 0
-bkc.smf1.disallow_bookie_placement:
-  default_availability: 0
-service_rate_limit_disabled:
-  default_availability: 0
-service_checksum_disabled:
-  default_availability: 0
-service_global_limiter_disabled:
-  default_availability: 0

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/resources/findbugsExclude.xml b/distributedlog-service/src/main/resources/findbugsExclude.xml
deleted file mode 100644
index e101a4d..0000000
--- a/distributedlog-service/src/main/resources/findbugsExclude.xml
+++ /dev/null
@@ -1,39 +0,0 @@
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-//-->
-<FindBugsFilter>
-  <Match>
-    <!-- generated code, we can't be held responsible for findbugs in it //-->
-    <Class name="~org\.apache\.distributedlog\.thrift.*" />
-  </Match>
-  <Match>
-    <!-- generated code, we can't be held responsible for findbugs in it //-->
-    <Class name="~org\.apache\.distributedlog\.service\.placement\.thrift.*" />
-  </Match>
-  <Match>
-    <!-- it is safe to cast exception here. //-->
-    <Class name="org.apache.distributedlog.service.DistributedLogServiceImpl$Stream$2" />
-    <Method name="onFailure" />
-    <Bug pattern="BC_UNCONFIRMED_CAST" />
-  </Match>
-  <Match>
-    <!-- it is safe to cast exception here. //-->
-    <Class name="org.apache.distributedlog.service.stream.BulkWriteOp" />
-    <Method name="isDefiniteFailure" />
-    <Bug pattern="BC_IMPOSSIBLE_INSTANCEOF" />
-  </Match>
-</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/thrift/metadata.thrift
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/thrift/metadata.thrift b/distributedlog-service/src/main/thrift/metadata.thrift
deleted file mode 100644
index 9cb3c72..0000000
--- a/distributedlog-service/src/main/thrift/metadata.thrift
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-namespace java org.apache.distributedlog.service.placement.thrift
-
-struct StreamLoad {
-    1: optional string stream
-    2: optional i32 load
-}
-
-struct ServerLoad {
-    1: optional string server
-    2: optional i64 load
-    3: optional list<StreamLoad> streams
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/client/routing/LocalRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/client/routing/LocalRoutingService.java b/distributedlog-service/src/test/java/org/apache/distributedlog/client/routing/LocalRoutingService.java
deleted file mode 100644
index a9ddae5..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/client/routing/LocalRoutingService.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.routing;
-
-import com.google.common.collect.Sets;
-import com.twitter.finagle.NoBrokersAvailableException;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.net.SocketAddress;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-/**
- * A local routing service that used for testing.
- */
-public class LocalRoutingService implements RoutingService {
-
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    /**
-     * Builder to build a local routing service for testing.
-     */
-    public static class Builder implements RoutingService.Builder {
-
-        private Builder() {}
-
-        @Override
-        public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) {
-            return this;
-        }
-
-        @Override
-        public LocalRoutingService build() {
-            return new LocalRoutingService();
-        }
-    }
-
-    private final Map<String, LinkedHashSet<SocketAddress>> localAddresses =
-            new HashMap<String, LinkedHashSet<SocketAddress>>();
-    private final CopyOnWriteArrayList<RoutingListener> listeners =
-            new CopyOnWriteArrayList<RoutingListener>();
-
-    boolean allowRetrySameHost = true;
-
-    @Override
-    public void startService() {
-        // nop
-    }
-
-    @Override
-    public void stopService() {
-        // nop
-    }
-
-    @Override
-    public synchronized Set<SocketAddress> getHosts() {
-        Set<SocketAddress> hosts = Sets.newHashSet();
-        for (LinkedHashSet<SocketAddress> addresses : localAddresses.values()) {
-            hosts.addAll(addresses);
-        }
-        return hosts;
-    }
-
-    @Override
-    public RoutingService registerListener(RoutingListener listener) {
-        listeners.add(listener);
-        return this;
-    }
-
-    @Override
-    public RoutingService unregisterListener(RoutingListener listener) {
-        listeners.remove(listener);
-        return this;
-    }
-
-    public LocalRoutingService setAllowRetrySameHost(boolean enabled) {
-        allowRetrySameHost = enabled;
-        return this;
-    }
-
-    public LocalRoutingService addHost(String stream, SocketAddress address) {
-        boolean notify = false;
-        synchronized (this) {
-            LinkedHashSet<SocketAddress> addresses = localAddresses.get(stream);
-            if (null == addresses) {
-                addresses = new LinkedHashSet<SocketAddress>();
-                localAddresses.put(stream, addresses);
-            }
-            if (addresses.add(address)) {
-                notify = true;
-            }
-        }
-        if (notify) {
-            for (RoutingListener listener : listeners) {
-                listener.onServerJoin(address);
-            }
-        }
-        return this;
-    }
-
-    @Override
-    public synchronized SocketAddress getHost(String key, RoutingContext rContext)
-            throws NoBrokersAvailableException {
-        LinkedHashSet<SocketAddress> addresses = localAddresses.get(key);
-
-        SocketAddress candidate = null;
-        if (null != addresses) {
-            for (SocketAddress host : addresses) {
-                if (rContext.isTriedHost(host) && !allowRetrySameHost) {
-                    continue;
-                } else {
-                    candidate = host;
-                    break;
-                }
-            }
-        }
-        if (null != candidate) {
-            return candidate;
-        }
-        throw new NoBrokersAvailableException("No host available");
-    }
-
-    @Override
-    public void removeHost(SocketAddress address, Throwable reason) {
-        // nop
-    }
-}


Mime
View raw message