bookkeeper-distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [22/23] incubator-distributedlog git commit: DL-124: Use Java8 Future rather than twitter Future
Date Wed, 21 Jun 2017 17:20:54 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/ConfigurationSubscription.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/ConfigurationSubscription.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/ConfigurationSubscription.java
new file mode 100644
index 0000000..72a5657
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/ConfigurationSubscription.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.config;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.FileNotFoundException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.Iterator;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.FileConfiguration;
+import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ConfigurationSubscription publishes a reloading, thread-safe view of file configuration. The class
+ * periodically calls FileConfiguration.reload on the underlying conf, and propagates changes to the
+ * concurrent config. The configured FileChangedReloadingStrategy ensures that file config will only
+ * be reloaded if something changed.
+ * Notes:
+ * 1. Reload schedule is never terminated. The assumption is a finite number of these are started
+ * at the calling layer, and terminated only once the executor service is shut down.
+ * 2. The underlying FileConfiguration is not at all thread-safe, so it's important to ensure access
+ * to this object is always single threaded.
+ */
+public class ConfigurationSubscription {
+    static final Logger LOG = LoggerFactory.getLogger(ConfigurationSubscription.class);
+
+    private final ConcurrentBaseConfiguration viewConfig;
+    private final ScheduledExecutorService executorService;
+    private final int reloadPeriod;
+    private final TimeUnit reloadUnit;
+    private final List<FileConfigurationBuilder> fileConfigBuilders;
+    private final List<FileConfiguration> fileConfigs;
+    private final CopyOnWriteArraySet<ConfigurationListener> confListeners;
+
+    /**
+     * Creates the subscription, performs one reload on the calling thread and then schedules
+     * periodic reloads on {@code executorService}.
+     *
+     * @param viewConfig view that receives the properties read from the files; must not be null
+     * @param fileConfigBuilders builders for the backing file configs; must be non-null and non-empty
+     * @param executorService executor that runs the periodic reload; must not be null
+     * @param reloadPeriod period between scheduled reloads, expressed in {@code reloadUnit}
+     * @param reloadUnit time unit of {@code reloadPeriod}
+     * @throws ConfigurationException declared by the signature
+     */
+    public ConfigurationSubscription(ConcurrentBaseConfiguration viewConfig,
+                                     List<FileConfigurationBuilder> fileConfigBuilders,
+                                     ScheduledExecutorService executorService,
+                                     int reloadPeriod,
+                                     TimeUnit reloadUnit)
+            throws ConfigurationException {
+        checkNotNull(fileConfigBuilders);
+        checkArgument(!fileConfigBuilders.isEmpty());
+        checkNotNull(executorService);
+        checkNotNull(viewConfig);
+        this.viewConfig = viewConfig;
+        this.executorService = executorService;
+        this.reloadPeriod = reloadPeriod;
+        this.reloadUnit = reloadUnit;
+        this.fileConfigBuilders = fileConfigBuilders;
+        this.fileConfigs = Lists.newArrayListWithExpectedSize(this.fileConfigBuilders.size());
+        this.confListeners = new CopyOnWriteArraySet<ConfigurationListener>();
+        reload();
+        scheduleReload();
+    }
+
+    /**
+     * Adds a listener that is invoked with the view config at the end of every reload.
+     *
+     * @param listener listener to add
+     */
+    public void registerListener(ConfigurationListener listener) {
+        this.confListeners.add(listener);
+    }
+
+    /**
+     * Removes a previously registered listener.
+     *
+     * @param listener listener to remove
+     */
+    public void unregisterListener(ConfigurationListener listener) {
+        this.confListeners.remove(listener);
+    }
+
+    /**
+     * Lazily builds the file configs from the builders, attaching a zero-delay
+     * FileChangedReloadingStrategy to each one.
+     *
+     * @return true if at least one file config is available
+     */
+    private boolean initConfig() {
+        if (fileConfigs.isEmpty()) {
+            try {
+                for (FileConfigurationBuilder fileConfigBuilder : fileConfigBuilders) {
+                    FileConfiguration fileConfig = fileConfigBuilder.getConfiguration();
+                    FileChangedReloadingStrategy reloadingStrategy = new FileChangedReloadingStrategy();
+                    reloadingStrategy.setRefreshDelay(0);
+                    fileConfig.setReloadingStrategy(reloadingStrategy);
+                    fileConfigs.add(fileConfig);
+                }
+            } catch (ConfigurationException ex) {
+                // NOTE(review): if a later builder fails, the configs built before it stay in
+                // fileConfigs, so the remaining builders are never retried. Confirm this is intended.
+                if (!fileNotFound(ex)) {
+                    // Pass the exception as the throwable argument; no placeholder is needed.
+                    LOG.error("Config init failed", ex);
+                }
+            }
+        }
+        return !fileConfigs.isEmpty();
+    }
+
+    /**
+     * Schedules {@link #reload()} at a fixed rate, starting immediately.
+     */
+    private void scheduleReload() {
+        executorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    reload();
+                } catch (RuntimeException re) {
+                    // An exception escaping a periodic task suppresses all subsequent executions,
+                    // so log it and keep the reload schedule alive.
+                    LOG.error("Config reload failed", re);
+                }
+            }
+        }, 0, reloadPeriod, reloadUnit);
+    }
+
+    /**
+     * Reloads every file config, removes view properties that no longer appear in any file,
+     * copies the file properties into the view and finally notifies the listeners.
+     */
+    @VisibleForTesting
+    void reload() {
+        // Nothing to do until the file configs can be constructed.
+        if (!initConfig()) {
+            return;
+        }
+        // Reload each file and collect the union of their keys.
+        Set<String> confKeys = Sets.newHashSet();
+        for (FileConfiguration fileConfig : fileConfigs) {
+            LOG.debug("Check and reload config, file={}, lastModified={}", fileConfig.getFile(),
+                    fileConfig.getFile().lastModified());
+            fileConfig.reload();
+            Iterator<?> keyIter = fileConfig.getKeys();
+            while (keyIter.hasNext()) {
+                confKeys.add((String) keyIter.next());
+            }
+        }
+        // Clear view keys that no longer exist in any file.
+        Iterator<?> viewIter = viewConfig.getKeys();
+        while (viewIter.hasNext()) {
+            String key = (String) viewIter.next();
+            if (!confKeys.contains(key)) {
+                clearViewProperty(key);
+            }
+        }
+        LOG.info("Reload features : {}", confKeys);
+        // Copy the file properties into the view; files are applied in list order.
+        for (FileConfiguration fileConfig : fileConfigs) {
+            try {
+                loadView(fileConfig);
+            } catch (Exception ex) {
+                if (!fileNotFound(ex)) {
+                    LOG.error("Config reload failed for file {}", fileConfig.getFileName(), ex);
+                }
+            }
+        }
+        for (ConfigurationListener listener : confListeners) {
+            listener.onReload(viewConfig);
+        }
+    }
+
+    /**
+     * Tells whether the exception, or its direct cause, is a FileNotFoundException.
+     */
+    private boolean fileNotFound(Exception ex) {
+        // instanceof is false for null, so no separate null check on the cause is required.
+        return ex instanceof FileNotFoundException
+            || ex.getCause() instanceof FileNotFoundException;
+    }
+
+    /**
+     * Copies every property of the given file config into the view.
+     */
+    private void loadView(FileConfiguration fileConfig) {
+        Iterator<?> fileIter = fileConfig.getKeys();
+        while (fileIter.hasNext()) {
+            String key = (String) fileIter.next();
+            setViewProperty(key, fileConfig.getProperty(key));
+        }
+    }
+
+    /**
+     * Removes a single property from the view.
+     */
+    private void clearViewProperty(String key) {
+        LOG.debug("Removing property, key={}", key);
+        viewConfig.clearProperty(key);
+    }
+
+    /**
+     * Writes the property into the view only if it is missing or its value differs.
+     */
+    private void setViewProperty(String key, Object value) {
+        if (!viewConfig.containsKey(key) || !viewConfig.getProperty(key).equals(value)) {
+            // Store exactly the value that was compared, not a second read of the file config.
+            LOG.debug("Setting property, key={} value={}", key, value);
+            viewConfig.setProperty(key, value);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/FileConfigurationBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/FileConfigurationBuilder.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/FileConfigurationBuilder.java
new file mode 100644
index 0000000..0ff967d
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/FileConfigurationBuilder.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.config;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.FileConfiguration;
+
+/**
+ * Abstract out FileConfiguration subclass construction.
+ */
+public interface FileConfigurationBuilder {
+    /**
+     * Builds the {@link FileConfiguration} described by this builder.
+     *
+     * @return the constructed configuration
+     * @throws ConfigurationException declared for implementations whose construction can fail
+     */
+    FileConfiguration getConfiguration() throws ConfigurationException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/PropertiesConfigurationBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/PropertiesConfigurationBuilder.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/PropertiesConfigurationBuilder.java
new file mode 100644
index 0000000..2d07535
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/PropertiesConfigurationBuilder.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.config;
+
+import java.net.URL;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.FileConfiguration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+
+/**
+ * Hide PropertiesConfiguration dependency.
+ *
+ * <p>Builds a {@link PropertiesConfiguration} from the URL supplied at construction time.
+ */
+public class PropertiesConfigurationBuilder implements FileConfigurationBuilder {
+    // Assigned once in the constructor and never changed afterwards.
+    private final URL url;
+
+    /**
+     * @param url location of the properties file handed to {@link PropertiesConfiguration}
+     */
+    public PropertiesConfigurationBuilder(URL url) {
+        this.url = url;
+    }
+
+    /**
+     * Creates a new {@link PropertiesConfiguration} for the configured URL on every call.
+     *
+     * @return a fresh configuration instance
+     * @throws ConfigurationException if the PropertiesConfiguration constructor fails
+     */
+    @Override
+    public FileConfiguration getConfiguration() throws ConfigurationException {
+        return new PropertiesConfiguration(url);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/package-info.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/package-info.java
new file mode 100644
index 0000000..88e68f2
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Configuration Related Utils.
+ */
+package org.apache.distributedlog.common.config;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/functions/VoidFunctions.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/functions/VoidFunctions.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/functions/VoidFunctions.java
new file mode 100644
index 0000000..8d5069e
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/functions/VoidFunctions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.functions;
+
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Function constants for collapsing structures of {@link Void} results.
+ */
+public class VoidFunctions {
+
+    /**
+     * Maps any list of {@link Void} to {@code null}; the argument is ignored.
+     */
+    public static final Function<List<Void>, Void> LIST_TO_VOID_FUNC = ignored -> null;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/functions/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/functions/package-info.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/functions/package-info.java
new file mode 100644
index 0000000..9e88612
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/functions/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Common Functions.
+ */
+package org.apache.distributedlog.common.functions;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/package-info.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/package-info.java
new file mode 100644
index 0000000..4c90bd2
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Common functions and utils used across the project.
+ */
+package org.apache.distributedlog.common;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/MovingAverageRate.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/MovingAverageRate.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/MovingAverageRate.java
new file mode 100644
index 0000000..f3e8c33
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/MovingAverageRate.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.rate;
+
+/**
+ * Moving Average Rate.
+ *
+ * <p>The method notes below describe the behavior of {@code SampledMovingAverageRate}, the only
+ * implementation visible in this package; other implementations may differ.
+ */
+public interface MovingAverageRate {
+    /** Returns the current rate; the sampled implementation returns the most recently computed value. */
+    double get();
+    /** Records {@code amount}; the sampled implementation adds it to a running total. */
+    void add(long amount);
+    /** Records a single unit; the sampled implementation delegates to {@code add(1)}. */
+    void inc();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/MovingAverageRateFactory.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/MovingAverageRateFactory.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/MovingAverageRateFactory.java
new file mode 100644
index 0000000..790ba03
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/MovingAverageRateFactory.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.rate;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Creates {@link MovingAverageRate} instances and drives their sampling.
+ *
+ * <p>The factory registers itself on the supplied scheduler and, once per second, samples every
+ * rate it has handed out.
+ */
+public class MovingAverageRateFactory implements Runnable {
+
+    private static final int DEFAULT_INTERVAL_SECS = 1;
+
+    private final ScheduledExecutorService scheduler;
+    private final ScheduledFuture<?> scheduledFuture;
+    private final CopyOnWriteArrayList<SampledMovingAverageRate> avgs;
+
+    /**
+     * @param scheduler executor on which the periodic sampling task is scheduled
+     */
+    public MovingAverageRateFactory(ScheduledExecutorService scheduler) {
+        // The list must exist before this object is handed to the scheduler.
+        this.avgs = new CopyOnWriteArrayList<>();
+        this.scheduler = scheduler;
+        this.scheduledFuture = scheduler.scheduleAtFixedRate(
+            this, DEFAULT_INTERVAL_SECS, DEFAULT_INTERVAL_SECS, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Creates a rate that is sampled by this factory from now on.
+     *
+     * @param intervalSecs passed to the {@code SampledMovingAverageRate} constructor
+     * @return the newly registered rate
+     */
+    public MovingAverageRate create(int intervalSecs) {
+        SampledMovingAverageRate rate = new SampledMovingAverageRate(intervalSecs);
+        avgs.add(rate);
+        return rate;
+    }
+
+    /**
+     * Cancels the periodic sampling task and forgets all registered rates.
+     */
+    public void close() {
+        scheduledFuture.cancel(true);
+        avgs.clear();
+    }
+
+    /**
+     * Samples every registered rate once.
+     */
+    @Override
+    public void run() {
+        for (SampledMovingAverageRate rate : avgs) {
+            rate.sample();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/SampledMovingAverageRate.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/SampledMovingAverageRate.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/SampledMovingAverageRate.java
new file mode 100644
index 0000000..2c89d64
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/SampledMovingAverageRate.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.rate;
+
+import com.google.common.base.Ticker;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * Sampled {@link MovingAverageRate}.
+ *
+ * <p>Amounts are accumulated into a running total. Each call to {@code sample()} snapshots the
+ * total together with the ticker reading and recomputes the rate against the oldest retained
+ * snapshot. At most {@code intervalSecs} snapshots are retained.
+ */
+class SampledMovingAverageRate implements MovingAverageRate {
+
+    private static final long NANOS_PER_SEC = TimeUnit.SECONDS.toNanos(1);
+
+    private final AtomicLong total;
+    private final Ticker ticker;
+    private final double scaleFactor;
+    // Newest snapshot first; each pair is (ticker reading, total at that reading).
+    private final LinkedBlockingDeque<Pair<Long, Long>> samples;
+
+    // Written by whichever thread calls sample() (the factory's scheduler task) and read by
+    // callers of get(), so it must be volatile to be visible across threads.
+    private volatile double value;
+
+    /**
+     * Creates a rate with scale factor 1 that reads time from the system ticker.
+     *
+     * @param intervalSecs maximum number of snapshots to retain
+     */
+    public SampledMovingAverageRate(int intervalSecs) {
+        this(intervalSecs, 1, Ticker.systemTicker());
+    }
+
+    /**
+     * @param intervalSecs maximum number of snapshots to retain
+     * @param scaleFactor multiplier applied to the computed per-second rate
+     * @param ticker time source; its readings are treated as nanoseconds
+     */
+    SampledMovingAverageRate(int intervalSecs,
+                             double scaleFactor,
+                             Ticker ticker) {
+        this.value = 0;
+        this.total = new AtomicLong(0);
+        this.scaleFactor = scaleFactor;
+        this.ticker = ticker;
+        this.samples = new LinkedBlockingDeque<>(intervalSecs);
+    }
+
+    /** Returns the rate computed by the most recent {@code sample()} call, or 0 before the first one. */
+    @Override
+    public double get() {
+        return value;
+    }
+
+    /** Adds {@code amount} to the running total. */
+    @Override
+    public void add(long amount) {
+        total.getAndAdd(amount);
+    }
+
+    /** Adds 1 to the running total. */
+    @Override
+    public void inc() {
+        add(1);
+    }
+
+    /** Takes a new snapshot and publishes the recomputed rate. */
+    void sample() {
+        value = doSample();
+    }
+
+    /**
+     * Computes the rate between the current total and the oldest retained snapshot, then records
+     * the current snapshot.
+     *
+     * @return the scaled per-second rate, or 0 if there is no earlier snapshot or no time elapsed
+     */
+    private double doSample() {
+        long newSample = total.get();
+        long newTimestamp = ticker.read();
+
+        double rate = 0;
+        if (!samples.isEmpty()) {
+            Pair<Long, Long> oldestSample = samples.peekLast();
+
+            double dy = newSample - oldestSample.getRight();
+            double dt = newTimestamp - oldestSample.getLeft();
+
+            rate = (dt == 0) ? 0 : (NANOS_PER_SEC * scaleFactor * dy) / dt;
+        }
+
+        // Evict the oldest snapshot when full, then always record the new one. Recording only
+        // when there was spare capacity dropped every other snapshot once the window filled.
+        if (samples.remainingCapacity() == 0) {
+            samples.removeLast();
+        }
+        samples.addFirst(Pair.of(newTimestamp, newSample));
+
+        return rate;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/package-info.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/package-info.java
new file mode 100644
index 0000000..3117c64
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Rate related functions.
+ */
+package org.apache.distributedlog.common.rate;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java
new file mode 100644
index 0000000..61a20f1
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.stats;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.bookkeeper.stats.CachingStatsLogger;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsData;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Factory for stats loggers that broadcast stats to two underlying {@link StatsLogger}s.
+ */
+public class BroadCastStatsLogger {
+
+    /**
+     * Create a broadcast stats logger of two stats loggers <code>first</code> and
+     * <code>second</code>. The returned stats logger doesn't allow registering any
+     * {@link Gauge}.
+     *
+     * @param first
+     *          first stats logger
+     * @param second
+     *          second stats logger
+     * @return broadcast stats logger
+     */
+    public static StatsLogger two(StatsLogger first, StatsLogger second) {
+        return new CachingStatsLogger(new Two(first, second));
+    }
+
+    static class Two implements StatsLogger {
+        protected final StatsLogger first;
+        protected final StatsLogger second;
+
+        private Two(StatsLogger first, StatsLogger second) {
+            super();
+            checkNotNull(first);
+            checkNotNull(second);
+            this.first = first;
+            this.second = second;
+        }
+
+        @Override
+        public OpStatsLogger getOpStatsLogger(final String statName) {
+            final OpStatsLogger firstLogger = first.getOpStatsLogger(statName);
+            final OpStatsLogger secondLogger = second.getOpStatsLogger(statName);
+            return new OpStatsLogger() {
+                @Override
+                public void registerFailedEvent(long l) {
+                    firstLogger.registerFailedEvent(l);
+                    secondLogger.registerFailedEvent(l);
+                }
+
+                @Override
+                public void registerSuccessfulEvent(long l) {
+                    firstLogger.registerSuccessfulEvent(l);
+                    secondLogger.registerSuccessfulEvent(l);
+                }
+
+                @Override
+                public OpStatsData toOpStatsData() {
+                    // Eventually consistent: only the first logger's data is returned.
+                    return firstLogger.toOpStatsData();
+                }
+
+                @Override
+                public void clear() {
+                    firstLogger.clear();
+                    secondLogger.clear();
+                }
+            };
+        }
+
+        @Override
+        public Counter getCounter(final String statName) {
+            final Counter firstCounter = first.getCounter(statName);
+            final Counter secondCounter = second.getCounter(statName);
+            return new Counter() {
+                @Override
+                public void clear() {
+                    firstCounter.clear();
+                    secondCounter.clear();
+                }
+
+                @Override
+                public void inc() {
+                    firstCounter.inc();
+                    secondCounter.inc();
+                }
+
+                @Override
+                public void dec() {
+                    firstCounter.dec();
+                    secondCounter.dec();
+                }
+
+                @Override
+                public void add(long l) {
+                    firstCounter.add(l);
+                    secondCounter.add(l);
+                }
+
+                @Override
+                public Long get() {
+                    // Eventually consistent: only the first counter's value is returned.
+                    return firstCounter.get();
+                }
+            };
+        }
+
+        @Override
+        public <T extends Number> void registerGauge(String statName, Gauge<T> gauge) {
+            // Different underlying stats loggers have different semantics wrt. gauge registration.
+            throw new RuntimeException("Cannot register a gauge on BroadCastStatsLogger.Two");
+        }
+
+        @Override
+        public <T extends Number> void unregisterGauge(String statName, Gauge<T> gauge) {
+            // no-op: registerGauge in this class never registers anything
+        }
+
+        @Override
+        public StatsLogger scope(final String scope) {
+            return new Two(first.scope(scope), second.scope(scope));
+        }
+
+        @Override
+        public void removeScope(String scope, StatsLogger statsLogger) {
+            if (!(statsLogger instanceof Two)) {
+                return;
+            }
+
+            Two another = (Two) statsLogger;
+
+            first.removeScope(scope, another.first);
+            second.removeScope(scope, another.second);
+        }
+    }
+
+    /**
+     * Create a broadcast stats logger of two stats loggers <code>master</code> and <code>slave</code>.
+     * It is similar to {@link #two(StatsLogger, StatsLogger)}, but it allows registering {@link Gauge}s.
+     * The {@link Gauge} will be registered under master.
+     *
+     * @param master
+     *          master stats logger to receive {@link Counter}, {@link OpStatsLogger} and {@link Gauge}.
+     * @param slave
+     *          slave stats logger to receive only {@link Counter} and {@link OpStatsLogger}.
+     * @return broadcast stats logger
+     */
+    public static StatsLogger masterslave(StatsLogger master, StatsLogger slave) {
+        return new CachingStatsLogger(new MasterSlave(master, slave));
+    }
+
+    static class MasterSlave extends Two {
+
+        private MasterSlave(StatsLogger master, StatsLogger slave) {
+            super(master, slave);
+        }
+
+        @Override
+        public <T extends Number> void registerGauge(String statName, Gauge<T> gauge) {
+            first.registerGauge(statName, gauge);
+        }
+
+        @Override
+        public <T extends Number> void unregisterGauge(String statName, Gauge<T> gauge) {
+            first.unregisterGauge(statName, gauge);
+        }
+
+        @Override
+        public StatsLogger scope(String scope) {
+            return new MasterSlave(first.scope(scope), second.scope(scope));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/OpStatsListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/OpStatsListener.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/OpStatsListener.java
new file mode 100644
index 0000000..e71a799
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/OpStatsListener.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.stats;
+
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+
+/**
+ * A {@link FutureEventListener} that records an operation's elapsed time (microseconds) as a success or failure event.
+ */
+public class OpStatsListener<T> implements FutureEventListener<T> {
+    OpStatsLogger opStatsLogger;
+    Stopwatch stopwatch;
+
+    public OpStatsListener(OpStatsLogger opStatsLogger, Stopwatch stopwatch) {
+        this.opStatsLogger = opStatsLogger;
+        if (null == stopwatch) {
+            this.stopwatch = Stopwatch.createStarted();
+        } else {
+            this.stopwatch = stopwatch;
+        }
+    }
+
+    public OpStatsListener(OpStatsLogger opStatsLogger) {
+        this(opStatsLogger, null);
+    }
+
+    @Override
+    public void onSuccess(T value) {
+        opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+    }
+
+    @Override
+    public void onFailure(Throwable cause) {
+        opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/package-info.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/package-info.java
new file mode 100644
index 0000000..bf3859d
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Stats Related Utils.
+ */
+package org.apache.distributedlog.common.stats;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/BitMaskUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/BitMaskUtils.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/BitMaskUtils.java
new file mode 100644
index 0000000..53f4ab2
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/BitMaskUtils.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Utils for bit mask operations.
+ */
+public class BitMaskUtils {
+
+    /**
+     * 1) Unset all bits where value in mask is set.
+     * 2) Set these bits to value specified by newValue.
+     *
+     * <p>e.g.
+     * if oldValue = 1010, mask = 0011, newValue = 0001
+     * 1) 1010 -> 1000
+     * 2) 1000 -> 1001
+     *
+     * @param oldValue old value to update (must be non-negative)
+     * @param mask mask selecting the bits to update (must be non-negative)
+     * @param newValue value supplying the new bits under the mask (must be non-negative)
+     * @return updated value
+     */
+    public static long set(long oldValue, long mask, long newValue) {
+        checkArgument(oldValue >= 0L && mask >= 0L && newValue >= 0L);
+        return ((oldValue & (~mask)) | (newValue & mask));
+    }
+
+    /**
+     * Get the bits where mask is 1.
+     *
+     * @param value value to read from (must be non-negative)
+     * @param mask mask selecting the bits to read (must be non-negative)
+     * @return the bits of value selected by mask
+     */
+    public static long get(long value, long mask) {
+        checkArgument(value >= 0L && mask >= 0L);
+        return (value & mask);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/MathUtil.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/MathUtil.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/MathUtil.java
new file mode 100644
index 0000000..38b3ed2
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/MathUtil.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog.common.util;
+
+/**
+ * Math helpers. NOTE(review): signSafeMod can still return a negative value when divisor is negative.
+ */
+public class MathUtil {
+
+  public static int signSafeMod(long dividend, int divisor) {
+    int mod = (int) (dividend % divisor);
+
+    if (mod < 0) {
+      mod += divisor;
+    }
+
+    return mod;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Permit.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Permit.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Permit.java
new file mode 100644
index 0000000..24cb63d
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Permit.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.util;
+
+/**
+ * A permit that is given back by calling {@link #release()}.
+ */
+@FunctionalInterface
+public interface Permit {
+
+    void release();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/PermitLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/PermitLimiter.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/PermitLimiter.java
new file mode 100644
index 0000000..8fcbf12
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/PermitLimiter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.util;
+
+/**
+ * A simple limiter interface which tracks acquire/release of permits, for
+ * example for tracking outstanding writes.
+ */
+public interface PermitLimiter {
+
+    PermitLimiter NULL_PERMIT_LIMITER = new PermitLimiter() {
+        @Override
+        public boolean acquire() {
+            return true;
+        }
+        @Override
+        public void release(int permits) {
+        }
+
+        @Override
+        public void close() {
+
+        }
+    };
+
+    /**
+     * Acquire a permit.
+     *
+     * @return true if a permit was successfully acquired, otherwise false.
+     */
+    boolean acquire();
+
+    /**
+     * Release the given number of permits.
+     */
+    void release(int permits);
+
+    /**
+     * Close the resources created by the limiter.
+     */
+    void close();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/PermitManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/PermitManager.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/PermitManager.java
new file mode 100644
index 0000000..3b6e3a1
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/PermitManager.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.util;
+
+/**
+ * Permit manager for managing permits.
+ */
+public interface PermitManager {
+
+    /**
+     * A permit managed by a permit manager.
+     */
+    interface Permit {
+        Permit ALLOWED = () -> true;
+        boolean isAllowed();
+    }
+
+    PermitManager UNLIMITED_PERMIT_MANAGER = new PermitManager() {
+        @Override
+        public Permit acquirePermit() {
+            return Permit.ALLOWED;
+        }
+
+        @Override
+        public void releasePermit(Permit permit) {
+            // nop
+        }
+
+        @Override
+        public boolean allowObtainPermits() {
+            return true;
+        }
+
+        @Override
+        public boolean disallowObtainPermits(Permit permit) {
+            return false;
+        }
+
+        @Override
+        public void close() {
+            // nop
+        }
+
+    };
+
+    /**
+     * Obtain a permit from the permit manager.
+     *
+     * @return permit.
+     */
+    Permit acquirePermit();
+
+    /**
+     * Release a given permit.
+     *
+     * @param permit
+     *          permit to release
+     */
+    void releasePermit(Permit permit);
+
+    /**
+     * Allow obtaining permits.
+     */
+    boolean allowObtainPermits();
+
+    /**
+     * Disallow obtaining permits. Disallow needs to be performed under the context
+     * of <i>permit</i>.
+     *
+     * @param permit
+     *          permit context to disallow
+     */
+    boolean disallowObtainPermits(Permit permit);
+
+    /**
+     * Release the resources.
+     */
+    void close();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/SchedulerUtils.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/SchedulerUtils.java
new file mode 100644
index 0000000..f6d4f23
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/SchedulerUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Scheduler utils. NOTE(review): shutdownScheduler logs InterruptedException without restoring the interrupt flag.
+ */
+@Slf4j
+public class SchedulerUtils {
+
+    public static void shutdownScheduler(ExecutorService service, long timeout, TimeUnit timeUnit) {
+        if (null == service) {
+            return;
+        }
+        service.shutdown();
+        try {
+            service.awaitTermination(timeout, timeUnit);
+        } catch (InterruptedException e) {
+            log.warn("Interrupted when shutting down scheduler : ", e);
+        }
+        service.shutdownNow();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Sequencer.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Sequencer.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Sequencer.java
new file mode 100644
index 0000000..a40b8e2
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Sequencer.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.util;
+
+/**
+ * Sequencer that generates transaction ids.
+ */
+public interface Sequencer {
+
+    /**
+     * Return next transaction id generated by the sequencer.
+     *
+     * @return next transaction id generated by the sequencer.
+     */
+    long nextId();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Sizable.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Sizable.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Sizable.java
new file mode 100644
index 0000000..d418e0f
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Sizable.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.util;
+
+/**
+ * The {@code Sizable} interface provides the capability of calculating the size
+ * of an object.
+ */
+public interface Sizable {
+    /**
+     * Calculate the size for this instance.
+     *
+     * @return size of the instance.
+     */
+    long size();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/package-info.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/package-info.java
new file mode 100644
index 0000000..e2bde37
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Common utility functions.
+ */
+package org.apache.distributedlog.common.util;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/io/Abortable.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/io/Abortable.java b/distributedlog-common/src/main/java/org/apache/distributedlog/io/Abortable.java
new file mode 100644
index 0000000..4edc09d
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/io/Abortable.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.io;
+
+import java.io.IOException;
+
+/**
+ * An {@code Abortable} is a source or destination of data that can be aborted.
+ * The abort method is invoked to release resources that the object is holding
+ * (such as open files). The abort happens when the object is in an error state
+ * in which it cannot be closed gracefully.
+ *
+ * @see java.io.Closeable
+ * @since 0.3.32
+ */
+public interface Abortable {
+
+    /**
+     * Aborts the object and releases any resources associated with it.
+     * If the object is already aborted then invoking this method has no
+     * effect.
+     *
+     * @throws IOException if an I/O error occurs.
+     */
+    void abort() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/io/Abortables.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/io/Abortables.java b/distributedlog-common/src/main/java/org/apache/distributedlog/io/Abortables.java
new file mode 100644
index 0000000..b6101a8
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/io/Abortables.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.io;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.distributedlog.common.functions.VoidFunctions;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+
+/**
+ * Utility methods for working with {@link Abortable} objects.
+ *
+ * @since 0.3.32
+ */
+@Slf4j
+public final class Abortables {
+
+    private Abortables() {}
+
+    public static CompletableFuture<Void> asyncAbort(@Nullable AsyncAbortable abortable,
+                                                     boolean swallowIOException) {
+        if (null == abortable) {
+            return FutureUtils.Void();
+        } else if (swallowIOException) {
+            return FutureUtils.ignore(abortable.asyncAbort());
+        } else {
+            return abortable.asyncAbort();
+        }
+    }
+
+    /**
+     * Aborts a {@link Abortable}, with control over whether an {@link IOException} may be thrown.
+     * This is primarily useful in a finally block, where a thrown exception needs to be logged but
+     * not propagated (otherwise the original exception will be lost).
+     *
+     * <p>If {@code swallowIOException} is true then we never throw {@code IOException} but merely log it.
+     *
+     * <p>Example: <pre>   {@code
+     *
+     *   public void abortStreamNicely() throws IOException {
+     *      SomeStream stream = new SomeStream("foo");
+     *      try {
+     *          // ... code which does something with the stream ...
+     *      } catch (IOException ioe) {
+     *          // If an exception occurs, we might abort the stream.
+     *          Abortables.abort(stream, true);
+     *      }
+     *   }}</pre>
+     *
+     * @param abortable the {@code Abortable} object to be aborted, or null, in which case this method
+     *                  does nothing.
+     * @param swallowIOException if true, don't propagate IO exceptions thrown by the {@code abort} methods
+     * @throws IOException if {@code swallowIOException} is false and {@code abort} throws an {@code IOException}
+     */
+    public static void abort(@Nullable Abortable abortable,
+                             boolean swallowIOException)
+        throws IOException {
+        if (null == abortable) {
+            return;
+        }
+        try {
+            abortable.abort();
+        } catch (IOException ioe) {
+            if (swallowIOException) {
+                log.warn("IOException thrown while aborting Abortable {} : ", abortable, ioe);
+            } else {
+                throw ioe;
+            }
+        }
+    }
+
+    /**
+     * Abort async <i>abortable</i>.
+     *
+     * @param abortable the {@code AsyncAbortable} object to be aborted, or null, in which case this method
+     *                  does nothing.
+     * @param swallowIOException if true, don't propagate IO exceptions thrown by the {@code abort} methods
+     * @throws IOException if {@code swallowIOException} is false and {@code abort} throws an {@code IOException}
+     * @see #abort(Abortable, boolean)
+     */
+    public static void abort(@Nullable AsyncAbortable abortable,
+                             boolean swallowIOException)
+            throws IOException {
+        if (null == abortable) {
+            return;
+        }
+        try {
+            FutureUtils.result(abortable.asyncAbort());
+        } catch (Exception e) {
+            if (swallowIOException) {
+                log.warn("IOException thrown while aborting Abortable {} : ", abortable, e);
+            } else {
+                if (e instanceof IOException) {
+                    throw (IOException) e;
+                } else {
+                    throw new IOException(e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Aborts the given {@code abortable}, logging any {@code IOException} that's thrown rather than
+     * propagating it.
+     *
+     * <p>While it's not safe in the general case to ignore exceptions that are thrown when aborting an
+     * I/O resource, it should generally be safe in the case of a resource that's being used only for
+     * reading.
+     *
+     * @param abortable the {@code Abortable} to be closed, or {@code null} in which case this method
+     *                  does nothing.
+     */
+    public static void abortQuietly(@Nullable Abortable abortable) {
+        try {
+            abort(abortable, true);
+        } catch (IOException e) {
+            log.error("Unexpected IOException thrown while aborting Abortable {} quietly : ", abortable, e);
+        }
+    }
+
+    /**
+     * Aborts the given {@code abortable}, logging any {@code IOException} that's thrown rather than
+     * propagating it.
+     *
+     * <p>While it's not safe in the general case to ignore exceptions that are thrown when aborting an
+     * I/O resource, it should generally be safe in the case of a resource that's being used only for
+     * reading.
+     *
+     * @param abortable the {@code AsyncAbortable} to be closed, or {@code null} in which case this method
+     *                  does nothing.
+     */
+    public static void abortQuietly(@Nullable AsyncAbortable abortable) {
+        try {
+            abort(abortable, true);
+        } catch (IOException e) {
+            log.error("Unexpected IOException thrown while aborting Abortable {} quietly : ", abortable, e);
+        }
+    }
+
+    /**
+     * Abort the abortables in sequence.
+     *
+     * @param executorService
+     *          executor service to execute
+     * @param abortables
+     *          abortables to abort
+     * @return future represents the abort future
+     */
+    public static CompletableFuture<Void> abortSequence(ExecutorService executorService,
+                                             AsyncAbortable... abortables) {
+        List<AsyncAbortable> abortableList = Lists.newArrayListWithExpectedSize(abortables.length);
+        for (AsyncAbortable abortable : abortables) {
+            if (null == abortable) {
+                abortableList.add(AsyncAbortable.NULL);
+            } else {
+                abortableList.add(abortable);
+            }
+        }
+        return FutureUtils.processList(
+            abortableList,
+            AsyncAbortable.ABORT_FUNC,
+            executorService
+        ).thenApply(VoidFunctions.LIST_TO_VOID_FUNC);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncAbortable.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncAbortable.java b/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncAbortable.java
new file mode 100644
index 0000000..7636c57
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncAbortable.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.io;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+
+/**
+ * An {@code AsyncAbortable} is a source or destination of data that can be aborted
+ * asynchronously. The abort method is invoked to release resources that the object
+ * is holding (such as open files). Aborting is meant for the case where the object
+ * is in an error state and therefore cannot be closed gracefully.
+ *
+ * @see AsyncCloseable
+ * @see Abortable
+ * @since 0.3.43
+ */
+public interface AsyncAbortable {
+
+    /**
+     * Function that invokes {@link #asyncAbort()} on its argument.
+     */
+    Function<AsyncAbortable, CompletableFuture<Void>> ABORT_FUNC = AsyncAbortable::asyncAbort;
+
+    /**
+     * Placeholder instance whose abort simply returns {@code FutureUtils.Void()}.
+     */
+    AsyncAbortable NULL = FutureUtils::Void;
+
+    /**
+     * Aborts the object and releases any resources associated with it.
+     * Invoking this method on an object that is already aborted has no
+     * effect.
+     *
+     * @return future representing the abort result
+     */
+    CompletableFuture<Void> asyncAbort();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncCloseable.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncCloseable.java b/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncCloseable.java
new file mode 100644
index 0000000..851f426
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncCloseable.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.io;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+
+/**
+ * An {@code AsyncCloseable} is a source or destination of data that can be closed asynchronously.
+ * The close method is invoked to release resources that the object is
+ * holding (such as open files).
+ */
+public interface AsyncCloseable {
+
+    /**
+     * Function that invokes {@link #asyncClose()} on its argument.
+     */
+    Function<AsyncCloseable, CompletableFuture<Void>> CLOSE_FUNC = AsyncCloseable::asyncClose;
+
+    /**
+     * Function that invokes {@link #asyncClose()} on its argument and passes the resulting
+     * future through {@code FutureUtils.ignore}.
+     */
+    Function<AsyncCloseable, CompletableFuture<Void>> CLOSE_FUNC_IGNORE_ERRORS =
+        target -> FutureUtils.ignore(target.asyncClose());
+
+    /**
+     * Placeholder instance whose close simply returns {@code FutureUtils.Void()}.
+     */
+    AsyncCloseable NULL = FutureUtils::Void;
+
+    /**
+     * Closes this source and releases any system resources associated
+     * with it. Invoking this method on a source that is already closed
+     * has no effect.
+     *
+     * @return future representing the close result.
+     */
+    CompletableFuture<Void> asyncClose();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncDeleteable.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncDeleteable.java b/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncDeleteable.java
new file mode 100644
index 0000000..f7c3e3b
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncDeleteable.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.io;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An {@code AsyncDeleteable} is a source or destination of data that can be deleted asynchronously.
+ * The delete method is invoked to delete the source.
+ */
+public interface AsyncDeleteable {
+    /**
+     * Releases any system resources associated with this source and deletes it. If the source is
+     * already deleted then invoking this method has no effect.
+     *
+     * @return future representing the deletion result.
+     */
+    CompletableFuture<Void> delete();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/io/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/io/package-info.java b/distributedlog-common/src/main/java/org/apache/distributedlog/io/package-info.java
new file mode 100644
index 0000000..c8e957f
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/io/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * IO Utils for distributedlog.
+ */
+package org.apache.distributedlog.io;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java b/distributedlog-common/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java
new file mode 100644
index 0000000..89b448e
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.util;
+
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.common.util.MathUtil;
+
+/**
+ * Ordered Scheduler. It is thread pool based {@link ScheduledExecutorService}, additionally providing
+ * the ability to execute/schedule tasks by <code>key</code>. Hence the tasks submitted by same <i>key</i>
+ * will be executed in order.
+ *
+ * <p>The scheduler is comprised of multiple {@link ScheduledExecutorService}s. Each
+ * {@link ScheduledExecutorService} is a single thread executor. Normal task submissions will
+ * be submitted to executors in a random manner to guarantee load balancing. Keyed task submissions (e.g
+ * {@link OrderedScheduler#submit(Object, Runnable)} will be submitted to a dedicated executor based on
+ * the hash value of submit <i>key</i>.
+ */
+public class OrderedScheduler implements ScheduledExecutorService {
+
+    /**
+     * Create a builder to build scheduler.
+     *
+     * @return scheduler builder
+     */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link OrderedScheduler}.
+     */
+    public static class Builder {
+
+        private String name = "OrderedScheduler";
+        private int corePoolSize = -1;
+        private ThreadFactory threadFactory = null;
+
+        /**
+         * Set the name of this scheduler. It would be used as part of stats scope and thread name.
+         *
+         * @param name name of the scheduler.
+         * @return scheduler builder
+         */
+        public Builder name(String name) {
+            this.name = name;
+            return this;
+        }
+
+        /**
+         * Set the number of threads to be used in this scheduler.
+         *
+         * @param corePoolSize the number of threads to keep in the pool, even
+         *                     if they are idle
+         * @return scheduler builder
+         */
+        public Builder corePoolSize(int corePoolSize) {
+            this.corePoolSize = corePoolSize;
+            return this;
+        }
+
+        /**
+         * Set the thread factory that the scheduler uses to create a new thread.
+         *
+         * @param threadFactory the factory to use when the executor
+         *                      creates a new thread
+         * @return scheduler builder
+         */
+        public Builder threadFactory(ThreadFactory threadFactory) {
+            this.threadFactory = threadFactory;
+            return this;
+        }
+
+        /**
+         * Build the ordered scheduler.
+         *
+         * @return ordered scheduler
+         */
+        public OrderedScheduler build() {
+            if (corePoolSize <= 0) {
+                corePoolSize = Runtime.getRuntime().availableProcessors();
+            }
+            if (null == threadFactory) {
+                threadFactory = Executors.defaultThreadFactory();
+            }
+
+            return new OrderedScheduler(
+                name,
+                corePoolSize,
+                threadFactory);
+        }
+
+    }
+
+    protected final String name;
+    protected final int corePoolSize;
+    protected final ScheduledExecutorService[] executors;
+    protected final Random random;
+
+    private OrderedScheduler(String name,
+                             int corePoolSize,
+                             ThreadFactory threadFactory) {
+        this.name = name;
+        this.corePoolSize = corePoolSize;
+        this.executors = new ScheduledExecutorService[corePoolSize];
+        for (int i = 0; i < corePoolSize; i++) {
+            ThreadFactory tf = new ThreadFactoryBuilder()
+                .setNameFormat(name + "-scheduler-" + i + "-%d")
+                .setThreadFactory(threadFactory)
+                .build();
+            executors[i] = Executors.newSingleThreadScheduledExecutor(tf);
+        }
+        this.random = new Random(System.currentTimeMillis());
+    }
+
+    protected ScheduledExecutorService chooseExecutor() {
+        return corePoolSize == 1 ? executors[0] : executors[random.nextInt(corePoolSize)];
+    }
+
+    public ScheduledExecutorService chooseExecutor(Object key) {
+        if (null == key) {
+            return chooseExecutor();
+        }
+        return corePoolSize == 1 ? executors[0] :
+            executors[MathUtil.signSafeMod(Objects.hashCode(key), corePoolSize)];
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+        return chooseExecutor().schedule(command, delay, unit);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+        return chooseExecutor().schedule(callable, delay, unit);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+                                                  long initialDelay, long period, TimeUnit unit) {
+        return chooseExecutor().scheduleAtFixedRate(command, initialDelay, period, unit);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+                                                     long initialDelay, long delay, TimeUnit unit) {
+        return chooseExecutor().scheduleWithFixedDelay(command, initialDelay, delay, unit);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void shutdown() {
+        for (ScheduledExecutorService executor : executors) {
+            executor.shutdown();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public List<Runnable> shutdownNow() {
+        List<Runnable> runnables = new ArrayList<Runnable>();
+        for (ScheduledExecutorService executor : executors) {
+            runnables.addAll(executor.shutdownNow());
+        }
+        return runnables;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isShutdown() {
+        for (ScheduledExecutorService executor : executors) {
+            if (!executor.isShutdown()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isTerminated() {
+        for (ScheduledExecutorService executor : executors) {
+            if (!executor.isTerminated()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit)
+        throws InterruptedException {
+        for (ScheduledExecutorService executor : executors) {
+            if (!executor.awaitTermination(timeout, unit)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        return chooseExecutor().submit(task);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+        return chooseExecutor().submit(task, result);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Future<?> submit(Runnable task) {
+        return chooseExecutor().submit(task);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+        throws InterruptedException {
+        return chooseExecutor().invokeAll(tasks);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+        throws InterruptedException {
+        return chooseExecutor().invokeAll(tasks, timeout, unit);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+        throws InterruptedException, ExecutionException {
+        return chooseExecutor().invokeAny(tasks);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+        return chooseExecutor().invokeAny(tasks, timeout, unit);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void execute(Runnable command) {
+        chooseExecutor().execute(command);
+    }
+
+    // Ordered Functions
+
+    public ScheduledFuture<?> schedule(Object key, Runnable command, long delay, TimeUnit unit) {
+        return chooseExecutor(key).schedule(command, delay, unit);
+    }
+
+    public ScheduledFuture<?> scheduleAtFixedRate(Object key,
+                                                  Runnable command,
+                                                  long initialDelay,
+                                                  long period,
+                                                  TimeUnit unit) {
+        return chooseExecutor(key).scheduleAtFixedRate(command, initialDelay, period, unit);
+    }
+
+    public Future<?> submit(Object key, Runnable command) {
+        return chooseExecutor(key).submit(command);
+    }
+
+    public <T> CompletableFuture<T> submit(Object key, Callable<T> callable) {
+        CompletableFuture<T> future = FutureUtils.createFuture();
+        chooseExecutor(key).submit(() -> {
+            try {
+                future.complete(callable.call());
+            } catch (Exception e) {
+                future.completeExceptionally(e);
+            }
+        });
+        return future;
+    }
+
+}



Mime
View raw message