distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [49/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace
Date Thu, 05 Jan 2017 00:51:54 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java
deleted file mode 100644
index 072c3ef..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark.stream;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-import com.google.common.base.Stopwatch;
-import com.twitter.distributedlog.BookKeeperClientBuilder;
-import com.twitter.distributedlog.DistributedLogManager;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.ZooKeeperClientBuilder;
-import com.twitter.distributedlog.impl.metadata.BKDLConfig;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Benchmark ledger reading.
- */
-public class LedgerReadBenchmark extends AbstractReaderBenchmark {
-
-    private static final Logger logger = LoggerFactory.getLogger(AsyncReaderBenchmark.class);
-
-    @Override
-    protected void benchmark(DistributedLogNamespace namespace, String logName, StatsLogger statsLogger) {
-        DistributedLogManager dlm = null;
-        while (null == dlm) {
-            try {
-                dlm = namespace.openLog(streamName);
-            } catch (IOException ioe) {
-                logger.warn("Failed to create dlm for stream {} : ", streamName, ioe);
-            }
-            if (null == dlm) {
-                try {
-                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
-                } catch (InterruptedException e) {
-                    logger.warn("Interrupted from sleep while creating dlm for stream {} : ",
-                        streamName, e);
-                }
-            }
-        }
-        logger.info("Created dlm for stream {}.", streamName);
-
-        List<LogSegmentMetadata> segments = null;
-        while (null == segments) {
-            try {
-                segments = dlm.getLogSegments();
-            } catch (IOException ioe) {
-                logger.warn("Failed to get log segments for stream {} : ", streamName, ioe);
-            }
-            if (null == segments) {
-                try {
-                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
-                } catch (InterruptedException e) {
-                    logger.warn("Interrupted from sleep while geting log segments for stream {} : ",
-                        streamName, e);
-                }
-            }
-        }
-
-        final Counter readCounter = statsLogger.getCounter("reads");
-
-        logger.info("Reading from log segments : {}", segments);
-
-        ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
-                .uri(uri)
-                .name("benchmark-zkc")
-                .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
-                .zkAclId(null)
-                .build();
-        BKDLConfig bkdlConfig;
-        try {
-            bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri);
-        } catch (IOException e) {
-            return;
-        }
-
-        BookKeeper bk;
-        try {
-            bk = BookKeeperClientBuilder.newBuilder()
-                    .name("benchmark-bkc")
-                    .dlConfig(conf)
-                    .zkServers(bkdlConfig.getBkZkServersForReader())
-                    .ledgersPath(bkdlConfig.getBkLedgersPath())
-                    .build()
-                    .get();
-        } catch (IOException e) {
-            return;
-        }
-
-        final int readConcurrency = conf.getInt("ledger_read_concurrency", 1000);
-        boolean streamRead = conf.getBoolean("ledger_stream_read", true);
-        try {
-            for (LogSegmentMetadata segment : segments) {
-                Stopwatch stopwatch = Stopwatch.createStarted();
-                long lid = segment.getLogSegmentId();
-                LedgerHandle lh = bk.openLedgerNoRecovery(
-                        lid, BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8));
-                logger.info("It took {} ms to open log segment {}",
-                    new Object[] { stopwatch.elapsed(TimeUnit.MILLISECONDS), (lh.getLastAddConfirmed() + 1), segment });
-                stopwatch.reset().start();
-                Runnable reader;
-                if (streamRead) {
-                    reader = new LedgerStreamReader(lh, new BookkeeperInternalCallbacks.ReadEntryListener() {
-                        @Override
-                        public void onEntryComplete(int rc, LedgerHandle lh, LedgerEntry entry, Object ctx) {
-                            readCounter.inc();
-                        }
-                    }, readConcurrency);
-                } else {
-                    reader = new LedgerStreamReader(lh, new BookkeeperInternalCallbacks.ReadEntryListener() {
-                        @Override
-                        public void onEntryComplete(int rc, LedgerHandle lh, LedgerEntry entry, Object ctx) {
-                            readCounter.inc();
-                        }
-                    }, readConcurrency);
-                }
-                reader.run();
-                logger.info("It took {} ms to complete reading {} entries from log segment {}",
-                    new Object[] { stopwatch.elapsed(TimeUnit.MILLISECONDS), (lh.getLastAddConfirmed() + 1), segment });
-            }
-        } catch (Exception e) {
-            logger.error("Error on reading bk ", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerStreamReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerStreamReader.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerStreamReader.java
deleted file mode 100644
index e542af7..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerStreamReader.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark.stream;
-
-import java.util.Enumeration;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.bookkeeper.client.AsyncCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Reading ledger in a streaming way.
- */
-public class LedgerStreamReader implements Runnable {
-
-    private static final Logger logger = LoggerFactory.getLogger(LedgerStreamReader.class);
-
-    class PendingReadRequest implements AsyncCallback.ReadCallback {
-
-        final long entryId;
-        boolean isDone = false;
-        int rc;
-        LedgerEntry entry = null;
-
-        PendingReadRequest(long entryId) {
-            this.entryId = entryId;
-        }
-
-        void read() {
-            lh.asyncReadEntries(entryId, entryId, this, null);
-        }
-
-        void complete(ReadEntryListener listener) {
-            listener.onEntryComplete(rc, lh, entry, null);
-        }
-
-        @Override
-        public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> enumeration, Object ctx) {
-            this.rc = rc;
-            if (BKException.Code.OK == rc && enumeration.hasMoreElements()) {
-                entry = enumeration.nextElement();
-            } else {
-                entry = null;
-            }
-            isDone = true;
-            // construct a new read request
-            long nextEntry = nextReadEntry.getAndIncrement();
-            if (nextEntry <= lac) {
-                PendingReadRequest nextRead =
-                        new PendingReadRequest(nextEntry);
-                pendingReads.add(nextRead);
-                nextRead.read();
-            }
-            triggerCallbacks();
-        }
-    }
-
-    private final LedgerHandle lh;
-    private final long lac;
-    private final ReadEntryListener readEntryListener;
-    private final int concurrency;
-    private final AtomicLong nextReadEntry = new AtomicLong(0);
-    private final CountDownLatch done = new CountDownLatch(1);
-    private final ConcurrentLinkedQueue<PendingReadRequest> pendingReads =
-            new ConcurrentLinkedQueue<PendingReadRequest>();
-
-    public LedgerStreamReader(LedgerHandle lh,
-                              ReadEntryListener readEntryListener,
-                              int concurrency) {
-        this.lh = lh;
-        this.lac = lh.getLastAddConfirmed();
-        this.readEntryListener = readEntryListener;
-        this.concurrency = concurrency;
-        for (int i = 0; i < concurrency; i++) {
-            long entryId = nextReadEntry.getAndIncrement();
-            if (entryId > lac) {
-                break;
-            }
-            PendingReadRequest request = new PendingReadRequest(entryId);
-            pendingReads.add(request);
-            request.read();
-        }
-        if (pendingReads.isEmpty()) {
-            done.countDown();
-        }
-    }
-
-    synchronized void triggerCallbacks() {
-        PendingReadRequest request;
-        while ((request = pendingReads.peek()) != null) {
-            if (!request.isDone) {
-                break;
-            }
-            pendingReads.remove();
-            request.complete(readEntryListener);
-        }
-        if (pendingReads.isEmpty()) {
-            done.countDown();
-        }
-    }
-
-    @Override
-    public void run() {
-        try {
-            done.await();
-        } catch (InterruptedException e) {
-            logger.info("Interrupted on stream reading ledger {} : ", lh.getId(), e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/ReadMode.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/ReadMode.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/ReadMode.java
deleted file mode 100644
index 280c9db..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/ReadMode.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark.stream;
-
-/**
- * The read mode for streaming read benchmark.
- */
-public enum ReadMode {
-    OLDEST,
-    LATEST,
-    REWIND,
-    POSITION
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/StreamBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/StreamBenchmark.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/StreamBenchmark.java
deleted file mode 100644
index 1eff65a..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/StreamBenchmark.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark.stream;
-
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import java.io.File;
-import java.net.URI;
-import org.apache.bookkeeper.stats.NullStatsProvider;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Benchmark Streams.
- */
-public abstract class StreamBenchmark {
-
-    private static final Logger logger = LoggerFactory.getLogger(StreamBenchmark.class);
-
-    private static final String USAGE = "StreamBenchmark <benchmark-class> [options]";
-
-    protected final Options options = new Options();
-    protected URI uri;
-    protected DistributedLogConfiguration conf;
-    protected StatsProvider statsProvider;
-    protected String streamName;
-
-    protected StreamBenchmark() {
-        options.addOption("c", "conf", true, "Configuration File");
-        options.addOption("u", "uri", true, "DistributedLog URI");
-        options.addOption("p", "stats-provider", true, "Stats Provider");
-        options.addOption("s", "stream", true, "Stream Name");
-        options.addOption("h", "help", false, "Print usage.");
-    }
-
-    protected Options getOptions() {
-        return options;
-    }
-
-    protected void printUsage() {
-        HelpFormatter hf = new HelpFormatter();
-        hf.printHelp(USAGE, options);
-    }
-
-    protected void parseCommandLine(String[] args)
-            throws Exception {
-        BasicParser parser = new BasicParser();
-        CommandLine cmdline = parser.parse(options, args);
-        if (cmdline.hasOption("h")) {
-            printUsage();
-            System.exit(0);
-        }
-        if (cmdline.hasOption("u")) {
-            this.uri = URI.create(cmdline.getOptionValue("u"));
-        } else {
-            printUsage();
-            System.exit(0);
-        }
-        this.conf = new DistributedLogConfiguration();
-        if (cmdline.hasOption("c")) {
-            String configFile = cmdline.getOptionValue("c");
-            this.conf.loadConf(new File(configFile).toURI().toURL());
-        }
-        if (cmdline.hasOption("p")) {
-            statsProvider = ReflectionUtils.newInstance(cmdline.getOptionValue("p"), StatsProvider.class);
-        } else {
-            statsProvider = new NullStatsProvider();
-        }
-        if (cmdline.hasOption("s")) {
-            this.streamName = cmdline.getOptionValue("s");
-        } else {
-            printUsage();
-            System.exit(0);
-        }
-        parseCommandLine(cmdline);
-    }
-
-    protected abstract void parseCommandLine(CommandLine cmdline);
-
-    protected void run(String[] args) throws Exception {
-        logger.info("Parsing arguments for benchmark : {}", args);
-        // parse command line
-        parseCommandLine(args);
-        statsProvider.start(conf);
-        // run the benchmark
-        StatsLogger statsLogger = statsProvider.getStatsLogger("dl");
-        DistributedLogNamespace namespace =
-                DistributedLogNamespaceBuilder.newBuilder()
-                        .conf(conf)
-                        .uri(uri)
-                        .statsLogger(statsLogger)
-                        .build();
-        try {
-            benchmark(namespace, streamName, statsProvider.getStatsLogger("benchmark"));
-        } finally {
-            namespace.close();
-            statsProvider.stop();
-        }
-    }
-
-    protected abstract void benchmark(DistributedLogNamespace namespace,
-                                      String logName,
-                                      StatsLogger statsLogger);
-
-    public static void main(String[] args) throws Exception {
-        if (args.length <= 0) {
-            System.err.println(USAGE);
-            return;
-        }
-        String benchmarkClassName = args[0];
-        StreamBenchmark benchmark = ReflectionUtils.newInstance(
-                benchmarkClassName, StreamBenchmark.class);
-        benchmark.run(args);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/SyncReaderBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/SyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/SyncReaderBenchmark.java
deleted file mode 100644
index 122c8ef..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/SyncReaderBenchmark.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark.stream;
-
-import com.google.common.base.Stopwatch;
-import com.twitter.distributedlog.DistributedLogManager;
-import com.twitter.distributedlog.LogReader;
-import com.twitter.distributedlog.LogRecord;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Benchmark on {@link com.twitter.distributedlog.LogReader} reading from a stream.
- */
-public class SyncReaderBenchmark extends AbstractReaderBenchmark {
-
-    private static final Logger logger = LoggerFactory.getLogger(SyncReaderBenchmark.class);
-
-    public SyncReaderBenchmark() {}
-
-    @Override
-    protected void benchmark(DistributedLogNamespace namespace, String streamName, StatsLogger statsLogger) {
-        DistributedLogManager dlm = null;
-        while (null == dlm) {
-            try {
-                dlm = namespace.openLog(streamName);
-            } catch (IOException ioe) {
-                logger.warn("Failed to create dlm for stream {} : ", streamName, ioe);
-            }
-            if (null == dlm) {
-                try {
-                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
-                } catch (InterruptedException e) {
-                    logger.warn("Interrupted from sleep while creating dlm for stream {} : ",
-                        streamName, e);
-                }
-            }
-        }
-        OpStatsLogger openReaderStats = statsLogger.getOpStatsLogger("open_reader");
-        OpStatsLogger nonBlockingReadStats = statsLogger.getOpStatsLogger("non_blocking_read");
-        OpStatsLogger blockingReadStats = statsLogger.getOpStatsLogger("blocking_read");
-        Counter nullReadCounter = statsLogger.getCounter("null_read");
-
-        logger.info("Created dlm for stream {}.", streamName);
-        LogReader reader = null;
-        Long lastTxId = null;
-        while (null == reader) {
-            // initialize the last txid
-            if (null == lastTxId) {
-                switch (readMode) {
-                    case OLDEST:
-                        lastTxId = 0L;
-                        break;
-                    case LATEST:
-                        try {
-                            lastTxId = dlm.getLastTxId();
-                        } catch (IOException ioe) {
-                            continue;
-                        }
-                        break;
-                    case REWIND:
-                        lastTxId = System.currentTimeMillis() - rewindMs;
-                        break;
-                    case POSITION:
-                        lastTxId = fromTxId;
-                        break;
-                    default:
-                        logger.warn("Unsupported mode {}", readMode);
-                        printUsage();
-                        System.exit(0);
-                        break;
-                }
-                logger.info("Reading from transaction id {}", lastTxId);
-            }
-            // Open the reader
-            Stopwatch stopwatch = Stopwatch.createStarted();
-            try {
-                reader = dlm.getInputStream(lastTxId);
-                long elapsedMs = stopwatch.elapsed(TimeUnit.MICROSECONDS);
-                openReaderStats.registerSuccessfulEvent(elapsedMs);
-                logger.info("It took {} ms to position the reader to transaction id {}", lastTxId);
-            } catch (IOException ioe) {
-                openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
-                logger.warn("Failed to create reader for stream {} reading from {}.", streamName, lastTxId);
-            }
-            if (null == reader) {
-                try {
-                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
-                } catch (InterruptedException e) {
-                    logger.warn("Interrupted from sleep after reader was reassigned null for stream {} : ",
-                        streamName, e);
-                }
-                continue;
-            }
-
-            // read loop
-
-            LogRecord record;
-            boolean nonBlocking = false;
-            stopwatch = Stopwatch.createUnstarted();
-            long numCatchupReads = 0L;
-            long numCatchupBytes = 0L;
-            Stopwatch catchupStopwatch = Stopwatch.createStarted();
-            while (true) {
-                try {
-                    stopwatch.start();
-                    record = reader.readNext(nonBlocking);
-                    if (null != record) {
-                        long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
-                        if (nonBlocking) {
-                            nonBlockingReadStats.registerSuccessfulEvent(elapsedMicros);
-                        } else {
-                            numCatchupBytes += record.getPayload().length;
-                            ++numCatchupReads;
-                            blockingReadStats.registerSuccessfulEvent(elapsedMicros);
-                        }
-                        lastTxId = record.getTransactionId();
-                    } else {
-                        nullReadCounter.inc();
-                    }
-                    if (null == record && !nonBlocking) {
-                        nonBlocking = true;
-                        catchupStopwatch.stop();
-                        logger.info("Catchup {} records (total {} bytes) in {} milliseconds",
-                                new Object[] { numCatchupReads, numCatchupBytes,
-                                    stopwatch.elapsed(TimeUnit.MILLISECONDS) });
-                    }
-                    stopwatch.reset();
-                } catch (IOException e) {
-                    logger.warn("Encountered reading record from stream {} : ", streamName, e);
-                    reader = null;
-                    break;
-                }
-            }
-            try {
-                TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
-            } catch (InterruptedException e) {
-                logger.warn("Interrupted from sleep while creating reader for stream {} : ",
-                    streamName, e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/package-info.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/package-info.java
deleted file mode 100644
index d8e198c..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Stream level benchmarks.
- */
-package com.twitter.distributedlog.benchmark.stream;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/ShiftableRateLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/ShiftableRateLimiter.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/ShiftableRateLimiter.java
deleted file mode 100644
index def0346..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/ShiftableRateLimiter.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark.utils;
-
-import com.google.common.util.concurrent.RateLimiter;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A wrapper over rate limiter.
- *
- * <p>Wraps a Guava {@link RateLimiter} created at {@code initialRate}. The constructor also starts a
- * single-thread scheduled executor that runs this object once per {@code changeInterval}; every run
- * raises the limiter's rate by {@code changeRate}, capped at {@code maxRate}.
- *
- * <p>NOTE(review): nothing in this class shuts the executor down, and {@code duplicate()} builds a
- * fresh instance (with its own executor) from the original constructor arguments, not from the
- * rate that is currently in effect.
- */
-public class ShiftableRateLimiter implements Runnable {
-
-    private final RateLimiter rateLimiter;
-    private final ScheduledExecutorService executor;
-    private final double initialRate, maxRate, changeRate;
-    private final long changeInterval;
-    private final TimeUnit changeIntervalUnit;
-    private double nextRate;
-
-    public ShiftableRateLimiter(double initialRate,
-                                double maxRate,
-                                double changeRate,
-                                long changeInterval,
-                                TimeUnit changeIntervalUnit) {
-        this.initialRate = initialRate;
-        this.maxRate = maxRate;
-        this.changeRate = changeRate;
-        this.nextRate = initialRate;
-        this.changeInterval = changeInterval;
-        this.changeIntervalUnit = changeIntervalUnit;
-        this.rateLimiter = RateLimiter.create(initialRate);
-        this.executor = Executors.newSingleThreadScheduledExecutor();
-        this.executor.scheduleAtFixedRate(this, changeInterval, changeInterval, changeIntervalUnit);
-    }
-
-    public ShiftableRateLimiter duplicate() {
-        return new ShiftableRateLimiter(
-                initialRate,
-                maxRate,
-                changeRate,
-                changeInterval,
-                changeIntervalUnit);
-    }
-
-    @Override
-    public void run() {
-        this.nextRate = Math.min(nextRate + changeRate, maxRate);
-        this.rateLimiter.setRate(nextRate);
-    }
-
-    public RateLimiter getLimiter() {
-        return this.rateLimiter;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/package-info.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/package-info.java
deleted file mode 100644
index 369b979..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Utils for benchmarking.
- */
-package com.twitter.distributedlog.benchmark.utils;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Benchmarker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Benchmarker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Benchmarker.java
new file mode 100644
index 0000000..f724102
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Benchmarker.java
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.benchmark.utils.ShiftableRateLimiter;
+import com.twitter.finagle.stats.OstrichStatsReceiver;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The launcher for benchmarks.
+ */
+public class Benchmarker {
+
+    private static final Logger logger = LoggerFactory.getLogger(Benchmarker.class);
+
+    static final String USAGE = "Benchmarker [-u <uri>] [-c <conf>] [-s serverset] [-m (read|write|dlwrite)]";
+
+    final String[] args;
+    final Options options = new Options();
+
+    int rate = 100;
+    int maxRate = 1000;
+    int changeRate = 100;
+    int changeRateSeconds = 1800;
+    int concurrency = 10;
+    String streamPrefix = "dlog-loadtest";
+    int shardId = -1;
+    int numStreams = 10;
+    List<String> serversetPaths = new ArrayList<String>();
+    List<String> finagleNames = new ArrayList<String>();
+    int msgSize = 256;
+    String mode = null;
+    int durationMins = 60;
+    URI dlUri = null;
+    int batchSize = 0;
+    int readersPerStream = 1;
+    Integer maxStreamId = null;
+    int truncationInterval = 3600;
+    Integer startStreamId = null;
+    Integer endStreamId = null;
+    int hostConnectionCoreSize = 10;
+    int hostConnectionLimit = 10;
+    boolean thriftmux = false;
+    boolean handshakeWithClientInfo = false;
+    boolean readFromHead = false;
+    int sendBufferSize = 1024 * 1024;
+    int recvBufferSize = 1024 * 1024;
+    boolean enableBatching = false;
+    int batchBufferSize = 256 * 1024;
+    int batchFlushIntervalMicros = 2000;
+    String routingServiceFinagleNameString;
+
+    final DistributedLogConfiguration conf = new DistributedLogConfiguration();
+    final StatsReceiver statsReceiver = new OstrichStatsReceiver();
+    StatsProvider statsProvider = null;
+
+    /**
+     * Creates a launcher for the given command line and registers every supported option.
+     *
+     * <p>The arguments are only stored here; they are parsed later by {@code run()}.
+     *
+     * @param args raw command line arguments
+     */
+    Benchmarker(String[] args) {
+        this.args = args;
+        // prepare options
+        options.addOption("s", "serverset", true, "Proxy Server Set (separated by ',')");
+        options.addOption("fn", "finagle-name", true, "Write proxy finagle name (separated by ',')");
+        options.addOption("c", "conf", true, "DistributedLog Configuration File");
+        options.addOption("u", "uri", true, "DistributedLog URI");
+        options.addOption("i", "shard", true, "Shard Id");
+        options.addOption("p", "provider", true, "DistributedLog Stats Provider");
+        options.addOption("d", "duration", true, "Duration (minutes)");
+        options.addOption("sp", "streamprefix", true, "Stream Prefix");
+        options.addOption("sc", "streamcount", true, "Number of Streams");
+        options.addOption("ms", "messagesize", true, "Message Size (bytes)");
+        options.addOption("bs", "batchsize", true, "Batch Size");
+        options.addOption("r", "rate", true, "Rate limit (requests/second)");
+        options.addOption("mr", "max-rate", true, "Maximum Rate limit (requests/second)");
+        options.addOption("cr", "change-rate", true, "Rate to increase each change period (requests/second)");
+        options.addOption("ci", "change-interval", true, "Rate to increase period, seconds");
+        options.addOption("t", "concurrency", true, "Concurrency (number of threads)");
+        // list every mode that run() dispatches on, not just read/write
+        options.addOption("m", "mode", true, "Benchmark mode (read|write|dlwrite|dlread)");
+        options.addOption("rps", "readers-per-stream", true, "Number readers per stream");
+        options.addOption("msid", "max-stream-id", true, "Max Stream ID");
+        options.addOption("ti", "truncation-interval", true, "Truncation interval in seconds");
+        options.addOption("ssid", "start-stream-id", true, "Start Stream ID");
+        // was described as "Start Stream ID", a copy of the line above
+        options.addOption("esid", "end-stream-id", true, "End Stream ID");
+        options.addOption("hccs", "host-connection-core-size", true, "Finagle hostConnectionCoreSize");
+        options.addOption("hcl", "host-connection-limit", true, "Finagle hostConnectionLimit");
+        options.addOption("mx", "thriftmux", false, "Enable thriftmux (write mode only)");
+        options.addOption("hsci", "handshake-with-client-info", false, "Enable handshaking with client info");
+        options.addOption("rfh", "read-from-head", false, "Read from head of the stream");
+        options.addOption("sb", "send-buffer", true, "Channel send buffer size, in bytes");
+        options.addOption("rb", "recv-buffer", true, "Channel recv buffer size, in bytes");
+        options.addOption("bt", "enable-batch", false, "Enable batching on writers");
+        options.addOption("bbs", "batch-buffer-size", true, "The batch buffer size in bytes");
+        options.addOption("bfi", "batch-flush-interval", true, "The batch buffer flush interval in micros");
+        options.addOption("rs", "routing-service", true, "The routing service finagle name for server-side routing");
+        options.addOption("h", "help", false, "Print usage.");
+    }
+
+    /**
+     * Prints the usage banner followed by the description of every registered option.
+     */
+    void printUsage() {
+        new HelpFormatter().printHelp(USAGE, options);
+    }
+
+    /**
+     * Parses the command line, validates the settings, starts the stats provider and runs the
+     * worker selected by the mode option for the configured duration, then exits the JVM.
+     *
+     * <p>When the help option is present the usage is printed and the JVM exits immediately.
+     *
+     * @throws Exception if the command line cannot be parsed, a setting is invalid, the mode is
+     *         unknown, or the worker fails to start or close
+     */
+    void run() throws Exception {
+        logger.info("Running benchmark.");
+
+        BasicParser parser = new BasicParser();
+        CommandLine cmdline = parser.parse(options, args);
+        if (cmdline.hasOption("h")) {
+            printUsage();
+            System.exit(0);
+        }
+        if (cmdline.hasOption("s")) {
+            String serversetPathStr = cmdline.getOptionValue("s");
+            serversetPaths = Arrays.asList(StringUtils.split(serversetPathStr, ','));
+        }
+        if (cmdline.hasOption("fn")) {
+            String finagleNameStr = cmdline.getOptionValue("fn");
+            finagleNames = Arrays.asList(StringUtils.split(finagleNameStr, ','));
+        }
+        shardId = intOption(cmdline, "i", shardId);
+        durationMins = intOption(cmdline, "d", durationMins);
+        if (cmdline.hasOption("sp")) {
+            streamPrefix = cmdline.getOptionValue("sp");
+        }
+        numStreams = intOption(cmdline, "sc", numStreams);
+        msgSize = intOption(cmdline, "ms", msgSize);
+        rate = intOption(cmdline, "r", rate);
+        maxRate = intOption(cmdline, "mr", maxRate);
+        changeRate = intOption(cmdline, "cr", changeRate);
+        changeRateSeconds = intOption(cmdline, "ci", changeRateSeconds);
+        concurrency = intOption(cmdline, "t", concurrency);
+        if (cmdline.hasOption("m")) {
+            mode = cmdline.getOptionValue("m");
+        }
+        if (cmdline.hasOption("u")) {
+            dlUri = URI.create(cmdline.getOptionValue("u"));
+        }
+        if (cmdline.hasOption("bs")) {
+            batchSize = Integer.parseInt(cmdline.getOptionValue("bs"));
+            // Compare by value: the previous reference comparison ("write" != mode) was true for
+            // any parsed string, so this guard never rejected anything. startsWith mirrors the
+            // mode dispatch further down.
+            checkArgument(null != mode && mode.startsWith("write"), "batchSize supported only for mode=write");
+        }
+        if (cmdline.hasOption("c")) {
+            String configFile = cmdline.getOptionValue("c");
+            conf.loadConf(new File(configFile).toURI().toURL());
+        }
+        readersPerStream = intOption(cmdline, "rps", readersPerStream);
+        if (cmdline.hasOption("msid")) {
+            maxStreamId = Integer.parseInt(cmdline.getOptionValue("msid"));
+        }
+        truncationInterval = intOption(cmdline, "ti", truncationInterval);
+        if (cmdline.hasOption("ssid")) {
+            startStreamId = Integer.parseInt(cmdline.getOptionValue("ssid"));
+        }
+        if (cmdline.hasOption("esid")) {
+            endStreamId = Integer.parseInt(cmdline.getOptionValue("esid"));
+        }
+        hostConnectionCoreSize = intOption(cmdline, "hccs", hostConnectionCoreSize);
+        hostConnectionLimit = intOption(cmdline, "hcl", hostConnectionLimit);
+        sendBufferSize = intOption(cmdline, "sb", sendBufferSize);
+        recvBufferSize = intOption(cmdline, "rb", recvBufferSize);
+        if (cmdline.hasOption("rs")) {
+            routingServiceFinagleNameString = cmdline.getOptionValue("rs");
+        }
+        thriftmux = cmdline.hasOption("mx");
+        handshakeWithClientInfo = cmdline.hasOption("hsci");
+        readFromHead = cmdline.hasOption("rfh");
+        enableBatching = cmdline.hasOption("bt");
+        batchBufferSize = intOption(cmdline, "bbs", batchBufferSize);
+        batchFlushIntervalMicros = intOption(cmdline, "bfi", batchFlushIntervalMicros);
+
+        // fail with a readable message instead of a NullPointerException at the mode dispatch
+        checkArgument(null != mode, "mode must be defined (read|write|dlwrite|dlread)");
+        checkArgument(shardId >= 0, "shardId must be >= 0");
+        checkArgument(numStreams > 0, "numStreams must be > 0");
+        checkArgument(durationMins > 0, "durationMins must be > 0");
+        checkArgument(streamPrefix != null, "streamPrefix must be defined");
+        checkArgument(hostConnectionCoreSize > 0, "host connection core size must be > 0");
+        checkArgument(hostConnectionLimit > 0, "host connection limit must be > 0");
+
+        if (cmdline.hasOption("p")) {
+            statsProvider = ReflectionUtils.newInstance(cmdline.getOptionValue("p"), StatsProvider.class);
+        } else {
+            statsProvider = new NullStatsProvider();
+        }
+
+        logger.info("Starting stats provider : {}.", statsProvider.getClass());
+        statsProvider.start(conf);
+
+        Worker w = null;
+        if (mode.startsWith("read")) {
+            w = runReader();
+        } else if (mode.startsWith("write")) {
+            w = runWriter();
+        } else if (mode.startsWith("dlwrite")) {
+            w = runDLWriter();
+        } else if (mode.startsWith("dlread")) {
+            w = runDLReader();
+        }
+
+        if (w == null) {
+            throw new IOException("Unknown mode " + mode + " to run the benchmark.");
+        }
+
+        Thread workerThread = new Thread(w, mode + "-benchmark-thread");
+        workerThread.start();
+
+        TimeUnit.MINUTES.sleep(durationMins);
+
+        logger.info("{} minutes passed, exiting...", durationMins);
+        w.close();
+
+        if (null != statsProvider) {
+            statsProvider.stop();
+        }
+
+        Runtime.getRuntime().exit(0);
+    }
+
+    /**
+     * Returns the integer value of an option, or the given default when the option is absent.
+     *
+     * @param cmdline parsed command line
+     * @param opt short option name
+     * @param defaultValue value returned when the option was not supplied
+     * @return the parsed option value or {@code defaultValue}
+     * @throws NumberFormatException if the supplied value is not a valid integer
+     */
+    private static int intOption(CommandLine cmdline, String opt, int defaultValue) {
+        return cmdline.hasOption(opt) ? Integer.parseInt(cmdline.getOptionValue(opt)) : defaultValue;
+    }
+
+    /**
+     * Validates the write settings and builds the write-proxy benchmark worker.
+     *
+     * <p>The stream id range defaults to the slice owned by this shard unless explicit start/end
+     * stream ids were supplied.
+     *
+     * @return the worker created by {@code createWriteWorker}
+     * @throws IllegalArgumentException if any write setting is invalid
+     */
+    Worker runWriter() {
+        checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty() || null != dlUri,
+                "either serverset paths, finagle-names or uri required");
+        checkArgument(msgSize > 0, "messagesize must be greater than 0");
+        checkArgument(rate > 0, "rate must be greater than 0");
+        checkArgument(maxRate >= rate, "max rate must be greater than or equal to rate");
+        checkArgument(changeRate >= 0, "change rate must not be negative");
+        // The interval becomes the period of a fixed-rate schedule inside ShiftableRateLimiter,
+        // and scheduleAtFixedRate rejects a period that is not strictly positive.
+        checkArgument(changeRateSeconds > 0, "change interval must be greater than 0");
+        checkArgument(concurrency > 0, "concurrency must be greater than 0");
+
+        ShiftableRateLimiter rateLimiter =
+                new ShiftableRateLimiter(rate, maxRate, changeRate, changeRateSeconds, TimeUnit.SECONDS);
+        return createWriteWorker(
+                streamPrefix,
+                dlUri,
+                null == startStreamId ? shardId * numStreams : startStreamId,
+                null == endStreamId ? (shardId + 1) * numStreams : endStreamId,
+                rateLimiter,
+                concurrency,
+                msgSize,
+                batchSize,
+                hostConnectionCoreSize,
+                hostConnectionLimit,
+                serversetPaths,
+                finagleNames,
+                statsReceiver.scope("write_client"),
+                statsProvider.getStatsLogger("write"),
+                thriftmux,
+                handshakeWithClientInfo,
+                sendBufferSize,
+                recvBufferSize,
+                enableBatching,
+                batchBufferSize,
+                batchFlushIntervalMicros,
+                routingServiceFinagleNameString);
+    }
+
+    protected WriterWorker createWriteWorker(
+            String streamPrefix,
+            URI uri,
+            int startStreamId,
+            int endStreamId,
+            ShiftableRateLimiter rateLimiter,
+            int writeConcurrency,
+            int messageSizeBytes,
+            int batchSize,
+            int hostConnectionCoreSize,
+            int hostConnectionLimit,
+            List<String> serverSetPaths,
+            List<String> finagleNames,
+            StatsReceiver statsReceiver,
+            StatsLogger statsLogger,
+            boolean thriftmux,
+            boolean handshakeWithClientInfo,
+            int sendBufferSize,
+            int recvBufferSize,
+            boolean enableBatching,
+            int batchBufferSize,
+            int batchFlushIntervalMicros,
+            String routingServiceFinagleNameString) {
+        return new WriterWorker(
+                streamPrefix,
+                uri,
+                startStreamId,
+                endStreamId,
+                rateLimiter,
+                writeConcurrency,
+                messageSizeBytes,
+                batchSize,
+                hostConnectionCoreSize,
+                hostConnectionLimit,
+                serverSetPaths,
+                finagleNames,
+                statsReceiver,
+                statsLogger,
+                thriftmux,
+                handshakeWithClientInfo,
+                sendBufferSize,
+                recvBufferSize,
+                enableBatching,
+                batchBufferSize,
+                batchFlushIntervalMicros,
+                routingServiceFinagleNameString);
+    }
+
+    /**
+     * Validates the write settings and builds the worker that writes through the core library.
+     *
+     * <p>The worker always covers the stream id slice owned by this shard.
+     *
+     * @return a new {@code DLWriterWorker}
+     * @throws IOException if the worker cannot be created
+     * @throws NullPointerException if no DistributedLog URI was supplied
+     * @throws IllegalArgumentException if any rate or concurrency setting is invalid
+     */
+    Worker runDLWriter() throws IOException {
+        checkNotNull(dlUri, "dlUri must be defined");
+        checkArgument(rate > 0, "rate must be greater than 0");
+        checkArgument(maxRate >= rate, "max rate must be greater than or equal to rate");
+        checkArgument(changeRate >= 0, "change rate must not be negative");
+        // The interval becomes the period of a fixed-rate schedule inside ShiftableRateLimiter,
+        // and scheduleAtFixedRate rejects a period that is not strictly positive.
+        checkArgument(changeRateSeconds > 0, "change interval must be greater than 0");
+        checkArgument(concurrency > 0, "concurrency must be greater than 0");
+
+        ShiftableRateLimiter rateLimiter =
+                new ShiftableRateLimiter(rate, maxRate, changeRate, changeRateSeconds, TimeUnit.SECONDS);
+
+        return new DLWriterWorker(conf,
+                dlUri,
+                streamPrefix,
+                shardId * numStreams,
+                (shardId + 1) * numStreams,
+                rateLimiter,
+                concurrency,
+                msgSize,
+                statsProvider.getStatsLogger("dlwrite"));
+    }
+
+    Worker runReader() throws IOException {
+        checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty() || null != dlUri,
+                "either serverset paths, finagle-names or dlUri required");
+        checkArgument(concurrency > 0, "concurrency must be greater than 0");
+        checkArgument(truncationInterval > 0, "truncation interval should be greater than 0");
+        return runReaderInternal(serversetPaths, finagleNames, truncationInterval);
+    }
+
+    Worker runDLReader() throws IOException {
+        return runReaderInternal(new ArrayList<String>(), new ArrayList<String>(), 0);
+    }
+
+    /**
+     * Computes the stream id range to read and builds the reader worker.
+     *
+     * <p>Explicit start/end stream ids take precedence over the range derived from the shard id;
+     * the end of the range is additionally capped by the max stream id when one was supplied.
+     *
+     * @param serversetPaths serverset paths handed to the reader worker
+     * @param finagleNames finagle names handed to the reader worker
+     * @param truncationInterval truncation interval, in seconds, handed to the reader worker
+     * @return the worker created by {@code createReaderWorker}
+     * @throws IOException if the worker cannot be created
+     * @throws NullPointerException if no DistributedLog URI was supplied
+     */
+    private Worker runReaderInternal(List<String> serversetPaths,
+                                     List<String> finagleNames,
+                                     int truncationInterval) throws IOException {
+        // same message as the dlwrite path, so a missing -u is reported by name
+        checkNotNull(dlUri, "dlUri must be defined");
+
+        int ssid = null == startStreamId ? shardId * numStreams : startStreamId;
+        int esid = null == endStreamId ? (shardId + readersPerStream) * numStreams : endStreamId;
+        if (null != maxStreamId) {
+            esid = Math.min(esid, maxStreamId);
+        }
+
+        return createReaderWorker(
+                conf,
+                dlUri,
+                streamPrefix,
+                ssid,
+                esid,
+                concurrency,
+                serversetPaths,
+                finagleNames,
+                truncationInterval,
+                readFromHead,
+                statsReceiver,
+                statsProvider.getStatsLogger("dlreader"));
+    }
+
+    protected ReaderWorker createReaderWorker(
+            DistributedLogConfiguration conf,
+            URI uri,
+            String streamPrefix,
+            int startStreamId,
+            int endStreamId,
+            int readThreadPoolSize,
+            List<String> serverSetPaths,
+            List<String> finagleNames,
+            int truncationIntervalInSeconds,
+            boolean readFromHead, /* read from the earliest data of log */
+            StatsReceiver statsReceiver,
+            StatsLogger statsLogger) throws IOException {
+        return new ReaderWorker(
+                conf,
+                uri,
+                streamPrefix,
+                startStreamId,
+                endStreamId,
+                readThreadPoolSize,
+                serverSetPaths,
+                finagleNames,
+                truncationIntervalInSeconds,
+                readFromHead,
+                statsReceiver,
+                statsLogger);
+    }
+
+    public static void main(String[] args) {
+        Benchmarker benchmarker = new Benchmarker(args);
+        try {
+            benchmarker.run();
+        } catch (Exception e) {
+            logger.info("Benchmark quit due to : ", e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java
new file mode 100644
index 0000000..a5e7a0a
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.benchmark.utils.ShiftableRateLimiter;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.SchedulerUtils;
+import com.twitter.util.FutureEventListener;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The benchmark for core library writer.
+ */
+public class DLWriterWorker implements Worker {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DLWriterWorker.class);
+
+    static final int BACKOFF_MS = 200;
+
+    final String streamPrefix;
+    final int startStreamId;
+    final int endStreamId;
+    final int writeConcurrency;
+    final int messageSizeBytes;
+    final ExecutorService executorService;
+    final ScheduledExecutorService rescueService;
+    final ShiftableRateLimiter rateLimiter;
+    final Random random;
+    final DistributedLogNamespace namespace;
+    final List<DistributedLogManager> dlms;
+    final List<AsyncLogWriter> streamWriters;
+    final int numStreams;
+
+    volatile boolean running = true;
+
+    final StatsLogger statsLogger;
+    final OpStatsLogger requestStat;
+
+    /**
+     * Opens one log per stream id in {@code [startStreamId, endStreamId)} and starts an async
+     * writer for each of them in parallel, blocking until every attempt has finished.
+     *
+     * @param conf configuration used to build the namespace
+     * @param uri namespace uri
+     * @param streamPrefix prefix of the stream names; the stream id is appended after an underscore
+     * @param startStreamId first stream id (inclusive)
+     * @param endStreamId last stream id (exclusive)
+     * @param rateLimiter limiter stored for use by the writer tasks
+     * @param writeConcurrency number of writer tasks submitted by {@code run()}
+     * @param messageSizeBytes size setting stored for use by the writer tasks
+     * @param statsLogger stats logger; a "requests" op stat and a "dl" scope are derived from it
+     * @throws IOException if the namespace or a log cannot be opened, if the thread is interrupted
+     *         while waiting, or if any stream ends up without a writer
+     * @throws IllegalArgumentException if {@code startStreamId > endStreamId}
+     */
+    public DLWriterWorker(DistributedLogConfiguration conf,
+                          URI uri,
+                          String streamPrefix,
+                          int startStreamId,
+                          int endStreamId,
+                          ShiftableRateLimiter rateLimiter,
+                          int writeConcurrency,
+                          int messageSizeBytes,
+                          StatsLogger statsLogger) throws IOException {
+        checkArgument(startStreamId <= endStreamId);
+        this.streamPrefix = streamPrefix;
+        this.startStreamId = startStreamId;
+        this.endStreamId = endStreamId;
+        this.rateLimiter = rateLimiter;
+        this.writeConcurrency = writeConcurrency;
+        this.messageSizeBytes = messageSizeBytes;
+        this.statsLogger = statsLogger;
+        this.requestStat = this.statsLogger.getOpStatsLogger("requests");
+        this.executorService = Executors.newCachedThreadPool();
+        this.rescueService = Executors.newSingleThreadScheduledExecutor();
+        this.random = new Random(System.currentTimeMillis());
+
+        this.namespace = DistributedLogNamespaceBuilder.newBuilder()
+                .conf(conf)
+                .uri(uri)
+                .statsLogger(statsLogger.scope("dl"))
+                .build();
+        this.numStreams = endStreamId - startStreamId;
+        dlms = new ArrayList<DistributedLogManager>(numStreams);
+        streamWriters = new ArrayList<AsyncLogWriter>(numStreams);
+        final ConcurrentMap<String, AsyncLogWriter> writers = new ConcurrentHashMap<String, AsyncLogWriter>();
+        final CountDownLatch latch = new CountDownLatch(this.numStreams);
+        for (int i = startStreamId; i < endStreamId; i++) {
+            final String streamName = String.format("%s_%d", streamPrefix, i);
+            final DistributedLogManager dlm = namespace.openLog(streamName);
+            executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
+                        if (null != writers.putIfAbsent(streamName, writer)) {
+                            FutureUtils.result(writer.asyncClose());
+                        }
+                    } catch (IOException e) {
+                        LOG.error("Failed to intialize writer for stream : {}", streamName, e);
+                    } finally {
+                        // Count down on failure too: otherwise a single failed stream leaves the
+                        // constructor blocked on the latch forever and the missing-writer check
+                        // below is never reached.
+                        latch.countDown();
+                    }
+
+                }
+            });
+            dlms.add(dlm);
+        }
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            // restore the interrupt flag for the caller before translating the exception
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted on initializing writers for streams.", e);
+        }
+        for (int i = startStreamId; i < endStreamId; i++) {
+            final String streamName = String.format("%s_%d", streamPrefix, i);
+            AsyncLogWriter writer = writers.get(streamName);
+            if (null == writer) {
+                throw new IOException("Writer for " + streamName + " never initialized.");
+            }
+            streamWriters.add(writer);
+        }
+        LOG.info("Writing to {} streams.", numStreams);
+    }
+
+    /**
+     * Replaces the writer at position {@code idx} with a freshly opened one.
+     *
+     * <p>Nothing happens when the slot no longer holds {@code writer}, i.e. it was already
+     * replaced. When opening the replacement fails, the slot is left unchanged and another attempt
+     * is scheduled after {@code BACKOFF_MS} milliseconds.
+     *
+     * @param idx position of the writer in the writer list
+     * @param writer the writer that is to be replaced
+     */
+    void rescueWriter(int idx, AsyncLogWriter writer) {
+        if (streamWriters.get(idx) != writer) {
+            LOG.warn("AsyncLogWriter for stream {} was already rescued.", idx);
+            return;
+        }
+        try {
+            FutureUtils.result(writer.asyncClose());
+        } catch (IOException e) {
+            // closing is best effort; still try to open a replacement
+            LOG.error("Failed to close writer for stream {}.", idx, e);
+        }
+        try {
+            streamWriters.set(idx, dlms.get(idx).startAsyncLogSegmentNonPartitioned());
+        } catch (IOException e) {
+            LOG.error("Failed to create new writer for stream {}, backoff for {} ms.",
+                      new Object[] { idx, BACKOFF_MS, e });
+            // Keep the old writer in the slot: the retry only proceeds while the slot still holds
+            // it, so storing null here would turn every retry into a no-op.
+            scheduleRescue(idx, writer, BACKOFF_MS);
+        }
+    }
+
+    void scheduleRescue(final int idx, final AsyncLogWriter writer, int delayMs) {
+        Runnable r = new Runnable() {
+            @Override
+            public void run() {
+                rescueWriter(idx, writer);
+            }
+        };
+        if (delayMs > 0) {
+            rescueService.schedule(r, delayMs, TimeUnit.MILLISECONDS);
+        } else {
+            rescueService.submit(r);
+        }
+    }
+
+    /**
+     * Stops the worker and releases its resources.
+     *
+     * <p>Clears the {@code running} flag, shuts down both executors (waiting up to two
+     * minutes each), then closes every stream writer, every log manager and finally the
+     * namespace.
+     *
+     * @throws IOException if closing a writer, a log manager or the namespace fails
+     */
+    @Override
+    public void close() throws IOException {
+        this.running = false;
+        SchedulerUtils.shutdownScheduler(this.executorService, 2, TimeUnit.MINUTES);
+        SchedulerUtils.shutdownScheduler(this.rescueService, 2, TimeUnit.MINUTES);
+        for (AsyncLogWriter writer : streamWriters) {
+            // Defensive: tolerate an empty slot instead of aborting the shutdown with a
+            // NullPointerException and leaking the remaining writers and log managers.
+            if (null != writer) {
+                FutureUtils.result(writer.asyncClose());
+            }
+        }
+        for (DistributedLogManager dlm : dlms) {
+            dlm.close();
+        }
+        namespace.close();
+    }
+
+    @Override
+    public void run() {
+        LOG.info("Starting dlwriter (concurrency = {}, prefix = {}, numStreams = {})",
+                 new Object[] { writeConcurrency, streamPrefix, numStreams });
+        for (int i = 0; i < writeConcurrency; i++) {
+            executorService.submit(new Writer(i));
+        }
+    }
+
+    /**
+     * A write loop: while the worker is running it picks a random stream, waits for the
+     * rate limiter, and asynchronously writes one generated record to that stream.
+     *
+     * <p>Successful and failed writes are recorded in {@code requestStat} with the elapsed
+     * time since the request timestamp; a failed write also schedules an immediate rescue
+     * of the writer that failed.
+     */
+    class Writer implements Runnable {
+
+        /** Identifier of this write loop; only used for logging. */
+        final int idx;
+
+        Writer(int idx) {
+            this.idx = idx;
+        }
+
+        @Override
+        public void run() {
+            LOG.info("Started writer {}.", idx);
+            while (running) {
+                final int streamIdx = random.nextInt(numStreams);
+                final AsyncLogWriter writer = streamWriters.get(streamIdx);
+                rateLimiter.getLimiter().acquire();
+                if (null == writer) {
+                    // The slot has no usable writer; skip this round instead of letting a
+                    // NullPointerException terminate the whole write loop. The check sits
+                    // after acquire() so that skipping stays rate limited.
+                    continue;
+                }
+                final long requestMillis = System.currentTimeMillis();
+                final byte[] data;
+                try {
+                    data = Utils.generateMessage(requestMillis, messageSizeBytes);
+                } catch (TException e) {
+                    // A serialization failure ends this write loop.
+                    LOG.error("Error on generating message : ", e);
+                    break;
+                }
+                writer.write(new LogRecord(requestMillis, data)).addEventListener(new FutureEventListener<DLSN>() {
+                    @Override
+                    public void onSuccess(DLSN value) {
+                        requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis);
+                        LOG.error("Failed to publish, rescue it : ", cause);
+                        scheduleRescue(streamIdx, writer, 0);
+                    }
+                });
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java
new file mode 100644
index 0000000..11cba6f
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.twitter.common.zookeeper.ServerSet;
+import org.apache.distributedlog.AsyncLogReader;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.LogRecordSet;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.benchmark.thrift.Message;
+import org.apache.distributedlog.client.serverset.DLZkServerSet;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.service.DistributedLogClient;
+import org.apache.distributedlog.service.DistributedLogClientBuilder;
+import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.SchedulerUtils;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.stats.StatsReceiver;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Duration$;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The benchmark for core library reader.
+ */
+/**
+ * The benchmark for core library reader.
+ *
+ * <p>Opens one {@link AsyncLogReader} per stream in {@code [startStreamId, endStreamId)},
+ * reads records in bulk and records end-to-end and delivery latencies. When a truncation
+ * interval is configured and a {@link DistributedLogClient} could be built, each stream is
+ * also truncated periodically up to the last DLSN it processed.
+ */
+public class ReaderWorker implements Worker {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReaderWorker.class);
+
+    /** Delay, in milliseconds, before a failed stream (re)initialization is retried. */
+    static final int BACKOFF_MS = 200;
+
+    final String streamPrefix;
+    final int startStreamId;
+    final int endStreamId;
+    final ScheduledExecutorService executorService;
+    final ExecutorService callbackExecutor;
+    final DistributedLogNamespace namespace;
+    final DistributedLogManager[] dlms;
+    final AsyncLogReader[] logReaders;
+    final StreamReader[] streamReaders;
+    final int numStreams;
+    final boolean readFromHead;
+
+    final int truncationIntervalInSeconds;
+    // DL Client Related Variables
+    final DLZkServerSet[] serverSets;
+    final List<String> finagleNames;
+    // Null when truncation is disabled or no finagle names / server set paths were given.
+    final DistributedLogClient dlc;
+
+    volatile boolean running = true;
+
+    final StatsReceiver statsReceiver;
+    final StatsLogger statsLogger;
+    final OpStatsLogger e2eStat;
+    final OpStatsLogger deliveryStat;
+    final OpStatsLogger negativeE2EStat;
+    final OpStatsLogger negativeDeliveryStat;
+    final OpStatsLogger truncationStat;
+    final Counter invalidRecordsCounter;
+    final Counter outOfOrderSequenceIdCounter;
+
+    /**
+     * Per-stream reader state. It plays three roles:
+     * <ul>
+     *   <li>listener of bulk reads, processing records and issuing the next read;</li>
+     *   <li>periodic truncation task ({@link #run()});</li>
+     *   <li>gauge registered under the stream's name with label {@code sequence_id}.</li>
+     * </ul>
+     */
+    class StreamReader implements FutureEventListener<List<LogRecordWithDLSN>>, Runnable, Gauge<Number> {
+
+        final int streamIdx;
+        final String streamName;
+        // DLSN of the last successfully parsed record; used as the truncation point.
+        DLSN prevDLSN = null;
+        long prevSequenceId = Long.MIN_VALUE;
+        private static final String gaugeLabel = "sequence_id";
+
+        /**
+         * @param idx index of the stream relative to {@code startStreamId}
+         * @param statsLogger logger under which the per-stream gauge is registered
+         */
+        StreamReader(int idx, StatsLogger statsLogger) {
+            this.streamIdx = idx;
+            int streamId = startStreamId + streamIdx;
+            streamName = String.format("%s_%d", streamPrefix, streamId);
+            statsLogger.scope(streamName).registerGauge(gaugeLabel, this);
+        }
+
+        /**
+         * Processes a batch of records and then issues the next bulk read.
+         *
+         * <p>If a record set cannot be read, the failure path takes over and the rest of
+         * the batch is abandoned.
+         */
+        @Override
+        public void onSuccess(final List<LogRecordWithDLSN> records) {
+            for (final LogRecordWithDLSN record : records) {
+                if (record.isRecordSet()) {
+                    try {
+                        processRecordSet(record);
+                    } catch (IOException e) {
+                        // onFailure() reinitializes the reader and restarts the read loop
+                        // itself. Falling through to readLoop() below would start a second,
+                        // concurrent loop on a reader that is about to be closed.
+                        onFailure(e);
+                        return;
+                    }
+                } else {
+                    processRecord(record);
+                }
+            }
+            readLoop();
+        }
+
+        /**
+         * Iterates over the records contained in a record set and processes each of them.
+         *
+         * @throws IOException if the record set cannot be read
+         */
+        public void processRecordSet(final LogRecordWithDLSN record) throws IOException {
+            LogRecordSet.Reader reader = LogRecordSet.of(record);
+            LogRecordWithDLSN nextRecord = reader.nextRecord();
+            while (null != nextRecord) {
+                processRecord(nextRecord);
+                nextRecord = reader.nextRecord();
+            }
+        }
+
+        /**
+         * Parses one record and records its latencies.
+         *
+         * <p>End-to-end latency is measured against the publish time embedded in the
+         * message, delivery latency against the record's transaction id. Negative values
+         * are recorded, as positive magnitudes, in the separate "negative" stats. Records
+         * that fail to parse only bump the invalid-records counter.
+         */
+        public void processRecord(final LogRecordWithDLSN record) {
+            Message msg;
+            try {
+                msg = Utils.parseMessage(record.getPayload());
+            } catch (TException e) {
+                invalidRecordsCounter.inc();
+                LOG.warn("Failed to parse record {} for stream {} : size = {} , ",
+                         new Object[] { record, streamIdx, record.getPayload().length, e });
+                return;
+            }
+            long curTimeMillis = System.currentTimeMillis();
+            long e2eLatency = curTimeMillis - msg.getPublishTime();
+            long deliveryLatency = curTimeMillis - record.getTransactionId();
+            if (e2eLatency >= 0) {
+                e2eStat.registerSuccessfulEvent(e2eLatency);
+            } else {
+                negativeE2EStat.registerSuccessfulEvent(-e2eLatency);
+            }
+            if (deliveryLatency >= 0) {
+                deliveryStat.registerSuccessfulEvent(deliveryLatency);
+            } else {
+                negativeDeliveryStat.registerSuccessfulEvent(-deliveryLatency);
+            }
+
+            prevDLSN = record.getDlsn();
+        }
+
+        /**
+         * Schedules a reinitialization of this stream's reader and, once it completes,
+         * resets the tracked position and restarts the read loop.
+         */
+        @Override
+        public void onFailure(Throwable cause) {
+            // Surface the cause; otherwise read failures are retried without any trace.
+            LOG.warn("Failed to read from stream {}, reinitializing : ", streamName, cause);
+            scheduleReinitStream(streamIdx).map(new Function<Void, Void>() {
+                @Override
+                public Void apply(Void value) {
+                    prevDLSN = null;
+                    prevSequenceId = Long.MIN_VALUE;
+                    readLoop();
+                    return null;
+                }
+            });
+        }
+
+        /** Issues the next bulk read of up to 10 records, unless the worker was stopped. */
+        void readLoop() {
+            if (!running) {
+                return;
+            }
+            logReaders[streamIdx].readBulk(10).addEventListener(this);
+        }
+
+        /**
+         * Truncation task: truncates the stream up to the last processed DLSN and records
+         * how long the request took. Does nothing if no record was processed yet or if no
+         * client is available.
+         */
+        @Override
+        public void run() {
+            final DLSN dlsnToTruncate = prevDLSN;
+            // dlc is null when the client was not constructed; calling it would throw.
+            if (null == dlsnToTruncate || null == dlc) {
+                return;
+            }
+            final Stopwatch stopwatch = Stopwatch.createStarted();
+            dlc.truncate(streamName, dlsnToTruncate).addEventListener(
+                    new FutureEventListener<Boolean>() {
+                        @Override
+                        public void onSuccess(Boolean value) {
+                            truncationStat.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
+                        }
+
+                        @Override
+                        public void onFailure(Throwable cause) {
+                            truncationStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
+                            LOG.error("Failed to truncate stream {} to {} : ",
+                                    new Object[]{streamName, dlsnToTruncate, cause});
+                        }
+                    });
+        }
+
+        @Override
+        public Number getDefaultValue() {
+            return Long.MIN_VALUE;
+        }
+
+        @Override
+        public synchronized Number getSample() {
+            return prevSequenceId;
+        }
+
+        /** Removes the gauge registered by the constructor. */
+        void unregisterGauge() {
+            statsLogger.scope(streamName).unregisterGauge(gaugeLabel, this);
+        }
+    }
+
+    /**
+     * Builds the worker and blocks until a reader has been opened for every stream.
+     *
+     * @param conf configuration used for the namespace; record-set deserialization on
+     *             reads is switched off on it
+     * @param uri namespace URI
+     * @param streamPrefix prefix of the stream names, which have the form {@code prefix_id}
+     * @param startStreamId first stream id (inclusive)
+     * @param endStreamId last stream id (exclusive)
+     * @param readThreadPoolSize number of threads of the scheduled executor
+     * @param serverSetPaths server set URIs used to build the truncation client
+     * @param finagleNames finagle names used to build the truncation client when no server
+     *                     set path is given
+     * @param truncationIntervalInSeconds truncation period; values of zero or less disable
+     *                                    truncation
+     * @param readFromHead whether to read from the earliest data of each log instead of
+     *                     its last DLSN
+     * @param statsReceiver stats receiver handed to the truncation client
+     * @param statsLogger stats logger for the benchmark metrics
+     * @throws IOException if initialization fails or the thread is interrupted while
+     *                     waiting for the readers
+     */
+    public ReaderWorker(DistributedLogConfiguration conf,
+                        URI uri,
+                        String streamPrefix,
+                        int startStreamId,
+                        int endStreamId,
+                        int readThreadPoolSize,
+                        List<String> serverSetPaths,
+                        List<String> finagleNames,
+                        int truncationIntervalInSeconds,
+                        boolean readFromHead, /* read from the earliest data of log */
+                        StatsReceiver statsReceiver,
+                        StatsLogger statsLogger) throws IOException {
+        checkArgument(startStreamId <= endStreamId);
+        this.streamPrefix = streamPrefix;
+        this.startStreamId = startStreamId;
+        this.endStreamId = endStreamId;
+        this.truncationIntervalInSeconds = truncationIntervalInSeconds;
+        this.readFromHead = readFromHead;
+        this.statsReceiver = statsReceiver;
+        this.statsLogger = statsLogger;
+        this.e2eStat = this.statsLogger.getOpStatsLogger("e2e");
+        this.negativeE2EStat = this.statsLogger.getOpStatsLogger("e2eNegative");
+        this.deliveryStat = this.statsLogger.getOpStatsLogger("delivery");
+        this.negativeDeliveryStat = this.statsLogger.getOpStatsLogger("deliveryNegative");
+        this.truncationStat = this.statsLogger.getOpStatsLogger("truncation");
+        this.invalidRecordsCounter = this.statsLogger.getCounter("invalid_records");
+        this.outOfOrderSequenceIdCounter = this.statsLogger.getCounter("out_of_order_seq_id");
+        this.executorService = Executors.newScheduledThreadPool(
+                readThreadPoolSize, new ThreadFactoryBuilder().setNameFormat("benchmark.reader-%d").build());
+        this.callbackExecutor = Executors.newFixedThreadPool(
+                Runtime.getRuntime().availableProcessors(),
+                new ThreadFactoryBuilder().setNameFormat("benchmark.reader-callback-%d").build());
+        this.finagleNames = finagleNames;
+        this.serverSets = createServerSets(serverSetPaths);
+
+        conf.setDeserializeRecordSetOnReads(false);
+
+        if (truncationIntervalInSeconds > 0 && (!finagleNames.isEmpty() || !serverSetPaths.isEmpty())) {
+            // Construct client for truncation
+            DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder()
+                    .clientId(ClientId$.MODULE$.apply("dlog_loadtest_reader"))
+                    .clientBuilder(ClientBuilder.get()
+                        .hostConnectionLimit(10)
+                        .hostConnectionCoresize(10)
+                        .tcpConnectTimeout(Duration$.MODULE$.fromSeconds(1))
+                        .requestTimeout(Duration$.MODULE$.fromSeconds(2)))
+                    .redirectBackoffStartMs(100)
+                    .redirectBackoffMaxMs(500)
+                    .requestTimeoutMs(2000)
+                    .statsReceiver(statsReceiver)
+                    .thriftmux(true)
+                    .name("reader");
+
+            if (serverSetPaths.isEmpty()) {
+                // Prepare finagle names
+                String local = finagleNames.get(0);
+                String[] remotes = new String[finagleNames.size() - 1];
+                finagleNames.subList(1, finagleNames.size()).toArray(remotes);
+
+                builder = builder.finagleNameStrs(local, remotes);
+                LOG.info("Initialized distributedlog client for truncation @ {}.", finagleNames);
+            } else if (serverSets.length != 0){
+                ServerSet local = this.serverSets[0].getServerSet();
+                ServerSet[] remotes = new ServerSet[this.serverSets.length - 1];
+                for (int i = 1; i < serverSets.length; i++) {
+                    remotes[i - 1] = serverSets[i].getServerSet();
+                }
+
+                builder = builder.serverSets(local, remotes);
+                LOG.info("Initialized distributedlog client for truncation @ {}.", serverSetPaths);
+            } else {
+                builder = builder.uri(uri);
+                LOG.info("Initialized distributedlog client for namespace {}", uri);
+            }
+            dlc = builder.build();
+        } else {
+            dlc = null;
+        }
+
+        // construct the factory
+        this.namespace = DistributedLogNamespaceBuilder.newBuilder()
+                .conf(conf)
+                .uri(uri)
+                .statsLogger(statsLogger.scope("dl"))
+                .build();
+        this.numStreams = endStreamId - startStreamId;
+        this.dlms = new DistributedLogManager[numStreams];
+        this.logReaders = new AsyncLogReader[numStreams];
+        final CountDownLatch latch = new CountDownLatch(numStreams);
+        for (int i = 0; i < numStreams; i++) {
+            final int idx = i;
+            executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    // reinitStream() retries with backoff until it succeeds, so the latch
+                    // is counted down once the reader is actually open.
+                    reinitStream(idx).map(new Function<Void, Void>() {
+                        @Override
+                        public Void apply(Void value) {
+                            LOG.info("Initialized stream reader {}.", idx);
+                            latch.countDown();
+                            return null;
+                        }
+                    });
+                }
+            });
+        }
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers further up can observe it.
+            Thread.currentThread().interrupt();
+            throw new DLInterruptedException("Failed to intialize benchmark readers : ", e);
+        }
+        this.streamReaders = new StreamReader[numStreams];
+        for (int i = 0; i < numStreams; i++) {
+            streamReaders[i] = new StreamReader(i, statsLogger.scope("perstream"));
+            // Only schedule truncation when a client exists to perform it.
+            if (truncationIntervalInSeconds > 0 && null != dlc) {
+                executorService.scheduleWithFixedDelay(streamReaders[i],
+                        truncationIntervalInSeconds, truncationIntervalInSeconds, TimeUnit.SECONDS);
+            }
+        }
+        LOG.info("Initialized benchmark reader on {} streams {} : [{} - {})",
+                 new Object[] { numStreams, streamPrefix, startStreamId, endStreamId });
+    }
+
+    /**
+     * Creates one {@link DLZkServerSet} per server set path, in the same order.
+     *
+     * @param serverSetPaths server set URIs
+     * @return the created server sets; empty if no path is given
+     */
+    protected DLZkServerSet[] createServerSets(List<String> serverSetPaths) {
+        DLZkServerSet[] serverSets = new DLZkServerSet[serverSetPaths.size()];
+        for (int i = 0; i < serverSets.length; i++) {
+            String serverSetPath = serverSetPaths.get(i);
+            serverSets[i] = DLZkServerSet.of(URI.create(serverSetPath), 60000);
+        }
+        return serverSets;
+    }
+
+    /**
+     * (Re)opens the reader of the stream at {@code idx}.
+     *
+     * @return a future that is satisfied once the reader has been opened
+     */
+    private Future<Void> reinitStream(int idx) {
+        Promise<Void> promise = new Promise<Void>();
+        reinitStream(idx, promise);
+        return promise;
+    }
+
+    /**
+     * Closes the current reader and log manager of the stream (if any), then reopens both.
+     * The reader starts at the initial DLSN when {@code readFromHead} is set, otherwise at
+     * the log's last DLSN. On any failure the whole procedure is rescheduled after
+     * {@link #BACKOFF_MS}; {@code promise} is only satisfied on success.
+     */
+    private void reinitStream(int idx, Promise<Void> promise) {
+        int streamId = startStreamId + idx;
+        String streamName = String.format("%s_%d", streamPrefix, streamId);
+
+        if (logReaders[idx] != null) {
+            try {
+                FutureUtils.result(logReaders[idx].asyncClose());
+            } catch (IOException e) {
+                LOG.warn("Failed on closing stream reader {} : ", streamName, e);
+            }
+            logReaders[idx] = null;
+        }
+        if (dlms[idx] != null) {
+            try {
+                dlms[idx].close();
+            } catch (IOException e) {
+                LOG.warn("Failed on closing dlm {} : ", streamName, e);
+            }
+            dlms[idx] = null;
+        }
+
+        try {
+            dlms[idx] = namespace.openLog(streamName);
+        } catch (IOException ioe) {
+            LOG.error("Failed on creating dlm {} : ", streamName, ioe);
+            scheduleReinitStream(idx, promise);
+            return;
+        }
+        DLSN lastDLSN;
+        if (readFromHead) {
+            lastDLSN = DLSN.InitialDLSN;
+        } else {
+            try {
+                lastDLSN = dlms[idx].getLastDLSN();
+            } catch (IOException ioe) {
+                LOG.error("Failed on getting last dlsn from stream {} : ", streamName, ioe);
+                scheduleReinitStream(idx, promise);
+                return;
+            }
+        }
+        try {
+            logReaders[idx] = dlms[idx].getAsyncLogReader(lastDLSN);
+        } catch (IOException ioe) {
+            LOG.error("Failed on opening reader for stream {} starting from {} : ",
+                      new Object[] { streamName, lastDLSN, ioe });
+            scheduleReinitStream(idx, promise);
+            return;
+        }
+        LOG.info("Opened reader for stream {}, starting from {}.", streamName, lastDLSN);
+        promise.setValue(null);
+    }
+
+    /**
+     * Schedules a reinitialization of the stream at {@code idx} after {@link #BACKOFF_MS}.
+     *
+     * @return a future that is satisfied once the reader has been reopened
+     */
+    Future<Void> scheduleReinitStream(int idx) {
+        Promise<Void> promise = new Promise<Void>();
+        scheduleReinitStream(idx, promise);
+        return promise;
+    }
+
+    /**
+     * Schedules a reinitialization of the stream at {@code idx} after {@link #BACKOFF_MS},
+     * completing {@code promise} when it eventually succeeds.
+     */
+    void scheduleReinitStream(final int idx, final Promise<Void> promise) {
+        executorService.schedule(new Runnable() {
+            @Override
+            public void run() {
+                reinitStream(idx, promise);
+            }
+        }, BACKOFF_MS, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Stops the read loops and releases readers, log managers, the namespace, both
+     * executors, the truncation client (if any), the server sets and the per-stream gauges.
+     *
+     * @throws IOException if closing one of the resources fails
+     */
+    @Override
+    public void close() throws IOException {
+        this.running = false;
+        for (AsyncLogReader reader : logReaders) {
+            if (null != reader) {
+                FutureUtils.result(reader.asyncClose());
+            }
+        }
+        for (DistributedLogManager dlm : dlms) {
+            if (null != dlm) {
+                dlm.close();
+            }
+        }
+        namespace.close();
+        SchedulerUtils.shutdownScheduler(executorService, 2, TimeUnit.MINUTES);
+        SchedulerUtils.shutdownScheduler(callbackExecutor, 2, TimeUnit.MINUTES);
+        if (this.dlc != null) {
+            this.dlc.close();
+        }
+        for (DLZkServerSet serverSet: serverSets) {
+            serverSet.close();
+        }
+        // Unregister gauges to prevent GC spirals
+        for (StreamReader sr : streamReaders) {
+            sr.unregisterGauge();
+        }
+    }
+
+    /** Starts the read loop of every stream reader. */
+    @Override
+    public void run() {
+        LOG.info("Starting reader (prefix = {}, numStreams = {}).",
+                 streamPrefix, numStreams);
+        for (StreamReader sr : streamReaders) {
+            sr.readLoop();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Utils.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Utils.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Utils.java
new file mode 100644
index 0000000..81f99ef
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Utils.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark;
+
+import org.apache.distributedlog.benchmark.thrift.Message;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TMemoryInputTransport;
+
+/**
+ * Utils for generating and parsing messages.
+ */
+public class Utils {
+
+    /** Shared source of random payload bytes, seeded with the class-load time. */
+    static final Random RAND = new Random(System.currentTimeMillis());
+
+    /** One binary-protocol serializer per thread. */
+    static final ThreadLocal<TSerializer> MSG_SERIALIZER =
+            new ThreadLocal<TSerializer>() {
+                @Override
+                public TSerializer initialValue() {
+                    return new TSerializer(new TBinaryProtocol.Factory());
+                }
+            };
+
+    /**
+     * Builds a serialized {@link Message} carrying the given timestamp and a random payload.
+     *
+     * @param requestMillis timestamp stored in the message
+     * @param payLoadSize number of random payload bytes
+     * @return the message serialized with the calling thread's serializer
+     * @throws TException if serialization fails
+     */
+    public static byte[] generateMessage(long requestMillis, int payLoadSize) throws TException {
+        final byte[] randomBytes = new byte[payLoadSize];
+        RAND.nextBytes(randomBytes);
+        final Message message = new Message(requestMillis, ByteBuffer.wrap(randomBytes));
+        final TSerializer serializer = MSG_SERIALIZER.get();
+        return serializer.serialize(message);
+    }
+
+    /**
+     * Reads a {@link Message} from its binary-protocol encoding.
+     *
+     * @param data serialized message bytes
+     * @return the decoded message
+     * @throws TException if the bytes cannot be decoded
+     */
+    public static Message parseMessage(byte[] data) throws TException {
+        final Message parsed = new Message();
+        parsed.read(new TBinaryProtocol(new TMemoryInputTransport(data)));
+        return parsed;
+    }
+
+}



Mime
View raw message