bookkeeper-distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [02/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace
Date Thu, 05 Jan 2017 00:51:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/ZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/ZooKeeperClient.java b/distributedlog-core/src/main/java/org/apache/distributedlog/ZooKeeperClient.java
new file mode 100644
index 0000000..e56a22d
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/ZooKeeperClient.java
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.base.Stopwatch;
+import org.apache.distributedlog.util.FailpointUtils;
+import org.apache.distributedlog.zk.ZKWatcherManager;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.bookkeeper.zookeeper.RetryPolicy;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+/**
+ * ZooKeeper Client wrapper over {@link org.apache.bookkeeper.zookeeper.ZooKeeperClient}.
+ * It handles retries on session expires and provides a watcher manager {@link ZKWatcherManager}.
+ *
+ * <h3>Metrics</h3>
+ * <ul>
+ * <li> zookeeper operation stats are exposed under scope <code>zk</code> by
+ * {@link org.apache.bookkeeper.zookeeper.ZooKeeperClient}
+ * <li> stats on zookeeper watched events are exposed under scope <code>watcher</code> by
+ * {@link org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase}
+ * <li> stats about {@link ZKWatcherManager} are exposed under scope <code>watcher_manager</code>
+ * </ul>
+ */
+public class ZooKeeperClient {
+
+    public static interface Credentials {
+        // Attaches this credential's auth info to the given ZooKeeper handle.
+        void authenticate(ZooKeeper zooKeeper);
+        // Shared "no credentials" instance; authenticating with it does nothing.
+        Credentials NONE = new Credentials() {
+            @Override
+            public void authenticate(ZooKeeper zooKeeper) {
+                // intentionally empty
+            }
+        };
+    }
+
+    public static class DigestCredentials implements Credentials {
+        // Digest identity, sent to ZooKeeper as "username:password" under the "digest" scheme.
+        String username;
+        String password;
+
+        public DigestCredentials(String username, String password) {
+            this.password = password;
+            this.username = username;
+        }
+
+        @Override
+        public void authenticate(ZooKeeper zooKeeper) {
+            final byte[] authInfo = (username + ":" + password).getBytes(UTF_8);
+            zooKeeper.addAuthInfo("digest", authInfo);
+        }
+    }
+
+    public interface ZooKeeperSessionExpireNotifier { // callback type accepted by registerExpirationHandler
+        void notifySessionExpired(); // invoked when an event of type None with state Expired is delivered
+    }
+
+    /**
+     * Indicates an error connecting to a zookeeper cluster; {@code get()} also throws it once the client is closed.
+     */
+    public static class ZooKeeperConnectionException extends IOException {
+        private static final long serialVersionUID = 6682391687004819361L;
+
+        public ZooKeeperConnectionException(String message) {
+            super(message);
+        }
+
+        public ZooKeeperConnectionException(String message, Throwable cause) { // keeps the underlying failure as the cause
+            super(message, cause);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClient.class.getName());
+
+    private final String name;
+    private final int sessionTimeoutMs;
+    private final int defaultConnectionTimeoutMs; // how long get() tolerates a non-CONNECTED handle before rebuilding it
+    private final String zooKeeperServers;
+    // GuardedBy "this", but still volatile for tests, where we want to be able to see writes
+    // made from within long synchronized blocks.
+    private volatile ZooKeeper zooKeeper = null;
+    private final RetryPolicy retryPolicy; // may be null; get() only runs its disconnect stopwatch when this is set
+    private final StatsLogger statsLogger;
+    private final int retryThreadCount;
+    private final double requestRateLimit;
+    private final Credentials credentials;
+    private volatile boolean authenticated = false; // cleared by the session watcher on Expired/Disconnected
+    private Stopwatch disconnectedStopwatch = null; // non-null while get() has observed the handle not CONNECTED
+
+    private boolean closed = false; // set by close(); get() throws once it is true
+
+    final Set<Watcher> watchers = new CopyOnWriteArraySet<Watcher>(); // top-level watchers the session watcher fans out to
+
+    // watcher manager to manage watchers
+    private final ZKWatcherManager watcherManager;
+
+    /**
+     * Creates an unconnected client that will lazily attempt to connect on the first call to
+     * {@link #get}.  The client is named "default", has no retry policy, reports to
+     * {@link NullStatsLogger} and uses {@link Credentials#NONE}, so no auth info is attached.
+     *
+     * @param sessionTimeoutMs
+     *          ZK session timeout in milliseconds
+     * @param connectionTimeoutMs
+     *          ZK connection timeout in milliseconds
+     * @param zooKeeperServers
+     *          the set of servers forming the ZK cluster
+     */
+    ZooKeeperClient(int sessionTimeoutMs, int connectionTimeoutMs, String zooKeeperServers) {
+        this("default", sessionTimeoutMs, connectionTimeoutMs, zooKeeperServers, null, NullStatsLogger.INSTANCE, 1, 0,
+             Credentials.NONE);
+    }
+
+    ZooKeeperClient(String name, // only stores settings and builds the watcher manager; no connection is opened here
+                    int sessionTimeoutMs,
+                    int connectionTimeoutMs,
+                    String zooKeeperServers,
+                    RetryPolicy retryPolicy, // null is accepted, see buildZooKeeper()
+                    StatsLogger statsLogger, // must be non-null: scope() is called on it below
+                    int retryThreadCount,
+                    double requestRateLimit,
+                    Credentials credentials) {
+        this.name = name;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.zooKeeperServers = zooKeeperServers;
+        this.defaultConnectionTimeoutMs = connectionTimeoutMs;
+        this.retryPolicy = retryPolicy;
+        this.statsLogger = statsLogger;
+        this.retryThreadCount = retryThreadCount;
+        this.requestRateLimit = requestRateLimit;
+        this.credentials = credentials;
+        this.watcherManager = ZKWatcherManager.newBuilder()
+                .name(name)
+                .zkc(this) // NOTE: 'this' is handed out before the constructor has finished
+                .statsLogger(statsLogger.scope("watcher_manager"))
+                .build();
+    }
+
+    public List<ACL> getDefaultACL() {
+        // Any credentials other than NONE select the EVERYONE_READ_CREATOR_ALL ACL.
+        if (Credentials.NONE != credentials) {
+            return DistributedLogConstants.EVERYONE_READ_CREATOR_ALL;
+        }
+        return ZooDefs.Ids.OPEN_ACL_UNSAFE;
+    }
+
+    public ZKWatcherManager getWatcherManager() { // returns the manager built in the constructor for this client
+        return watcherManager;
+    }
+
+    /**
+     * Returns the current ZK handle, building a new one if none exists yet or the previous one
+     * was closed. With a retry policy configured, a handle that successive calls observe out of
+     * the CONNECTED state for longer than the connection timeout is closed and rebuilt here.
+     *
+     * @return the ZooKeeper handle, after this client's credentials have been applied to it
+     * @throws ZooKeeperConnectionException if the client was closed, the connection-loss
+     *         failpoint fired, or the handle could not be built
+     * @throws InterruptedException if interrupted while the handle is being built
+     */
+    public synchronized ZooKeeper get()
+        throws ZooKeeperConnectionException, InterruptedException {
+
+        try {
+            FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_ZooKeeperConnectionLoss);
+        } catch (IOException ioe) {
+            throw new ZooKeeperConnectionException("Client " + name + " failed on establishing zookeeper connection", ioe);
+        }
+
+        // This indicates that the client was explicitly closed
+        if (closed) {
+            throw new ZooKeeperConnectionException("Client " + name + " has already been closed");
+        }
+
+        // the underneath zookeeper is retryable zookeeper
+        if (zooKeeper != null && retryPolicy != null) {
+            if (zooKeeper.getState().equals(ZooKeeper.States.CONNECTED)) {
+                // the zookeeper client is connected
+                disconnectedStopwatch = null;
+            } else {
+                if (disconnectedStopwatch == null) { // first call that sees the handle not connected: start timing
+                    disconnectedStopwatch = Stopwatch.createStarted();
+                } else {
+                    long disconnectedMs = disconnectedStopwatch.elapsed(TimeUnit.MILLISECONDS);
+                    if (disconnectedMs > defaultConnectionTimeoutMs) {
+                        closeInternal(); // nulls the handle so it is rebuilt just below
+                        authenticated = false;
+                    }
+                }
+            }
+        }
+
+        if (zooKeeper == null) {
+            zooKeeper = buildZooKeeper();
+            disconnectedStopwatch = null;
+        }
+
+        // In case authenticate throws an exception, the caller can try to recover the client by
+        // calling get again.
+        if (!authenticated) {
+            credentials.authenticate(zooKeeper);
+            authenticated = true;
+        }
+
+        return zooKeeper;
+    }
+
+    private ZooKeeper buildZooKeeper() // builds a new BookKeeper ZooKeeperClient handle wired to a session watcher
+        throws ZooKeeperConnectionException, InterruptedException {
+        Watcher watcher = new Watcher() { // session watcher: updates auth state, then fans the event out
+            @Override
+            public void process(WatchedEvent event) {
+                switch (event.getType()) {
+                    case None:
+                        switch (event.getState()) {
+                            case Expired:
+                                if (null == retryPolicy) { // no retry policy: drop the handle so the next get() rebuilds it
+                                    LOG.info("ZooKeeper {}' session expired. Event: {}", name, event);
+                                    closeInternal();
+                                }
+                                authenticated = false;
+                                break;
+                            case Disconnected:
+                                if (null == retryPolicy) {
+                                    LOG.info("ZooKeeper {} is disconnected from zookeeper now," +
+                                            " but it is OK unless we received EXPIRED event.", name);
+                                }
+                                // Mark as not authenticated if expired or disconnected. In both cases
+                                // we lose any attached auth info. Relying on Expired/Disconnected is
+                                // sufficient since all Expired/Disconnected events are processed before
+                                // all SyncConnected events, and the underlying member is not updated until
+                                // SyncConnected is received.
+                                authenticated = false;
+                                break;
+                            default:
+                                break;
+                        }
+                }
+
+                try {
+                    for (Watcher watcher : watchers) { // 'watchers' is the field here: the local set below is declared later
+                        try {
+                            watcher.process(event);
+                        } catch (Throwable t) { // one failing watcher must not stop delivery to the others
+                            LOG.warn("Encountered unexpected exception from watcher {} : ", watcher, t);
+                        }
+                    }
+                } catch (Throwable t) {
+                    LOG.warn("Encountered unexpected exception when firing watched event {} : ", event, t);
+                }
+            }
+        };
+
+        Set<Watcher> watchers = new HashSet<Watcher>(); // shadows the field from here on; holds only the session watcher
+        watchers.add(watcher);
+
+        ZooKeeper zk;
+        try {
+            RetryPolicy opRetryPolicy = null == retryPolicy ? // NOTE(review): third ctor argument is presumably the max retry count - confirm
+                    new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0) : retryPolicy;
+            RetryPolicy connectRetryPolicy = null == retryPolicy ?
+                    new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0) :
+                    new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE);
+            zk = org.apache.bookkeeper.zookeeper.ZooKeeperClient.newBuilder()
+                    .connectString(zooKeeperServers)
+                    .sessionTimeoutMs(sessionTimeoutMs)
+                    .watchers(watchers)
+                    .operationRetryPolicy(opRetryPolicy)
+                    .connectRetryPolicy(connectRetryPolicy)
+                    .statsLogger(statsLogger)
+                    .retryThreadCount(retryThreadCount)
+                    .requestRateLimit(requestRateLimit)
+                    .build();
+        } catch (KeeperException e) { // both failure types are surfaced as ZooKeeperConnectionException with the cause kept
+            throw new ZooKeeperConnectionException("Problem connecting to servers: " + zooKeeperServers, e);
+        } catch (IOException e) {
+            throw new ZooKeeperConnectionException("Problem connecting to servers: " + zooKeeperServers, e);
+        }
+        return zk;
+    }
+
+    /**
+     * Clients that need to re-establish state after session expiration can register an
+     * {@code onExpired} notifier here. Exceptions thrown by the notifier are logged and dropped.
+     *
+     * @param onExpired the {@link ZooKeeperSessionExpireNotifier} to invoke on session expiry
+     * @return the new {@link Watcher} which can later be passed to {@link #unregister} for
+     *         removal.
+     */
+    public Watcher registerExpirationHandler(final ZooKeeperSessionExpireNotifier onExpired) {
+        Watcher watcher = new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) {
+                    try {
+                        onExpired.notifySessionExpired();
+                    } catch (Exception exc) {
+                        LOG.warn("Session expire notifier of zookeeper client {} failed : ", name, exc);
+                    }
+                }
+            }
+        };
+        register(watcher);
+        return watcher;
+    }
+
+    /**
+     * Adds a top-level {@code Watcher}. A registered {@code watcher} stays registered across
+     * re-connects and session expiration events. A {@code null} argument is ignored.
+     *
+     * @param watcher the {@code Watcher} to register
+     */
+    public void register(Watcher watcher) {
+        if (null == watcher) {
+            return;
+        }
+        watchers.add(watcher);
+    }
+
+    /**
+     * Removes a top-level {@code Watcher} that was previously added through {@link #register}.
+     * A {@code null} argument is never considered registered.
+     *
+     * @param watcher the {@code Watcher} to unregister as a top-level, persistent watch
+     * @return whether the given {@code Watcher} was found and removed from the active set
+     */
+    public boolean unregister(Watcher watcher) {
+        return (null == watcher) ? false : watchers.remove(watcher);
+    }
+
+    /**
+     * Closes the current handle, if any (ending its ZooKeeper session), and clears it; no-op until the next {@link #get}.
+     */
+    public synchronized void closeInternal() {
+        if (zooKeeper == null) {
+            return;
+        }
+        try {
+            LOG.info("Closing zookeeper client {}.", name);
+            zooKeeper.close();
+            LOG.info("Closed zookeeper client {}.", name);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted trying to close zooKeeper {} : ", name, e);
+        } finally {
+            zooKeeper = null;
+        }
+    }
+
+    /**
+     * Closes the underlying zookeeper instance and marks this client as closed.
+     * Subsequent attempts to {@link #get} will fail; repeated calls are no-ops.
+     */
+    public synchronized void close() {
+        if (!closed) {
+            LOG.info("Close zookeeper client {}.", name);
+            closeInternal();
+            // unregister gauges to prevent GC spiral
+            this.watcherManager.unregisterGauges();
+            // the flag is flipped last, once the handle and the gauges have been released
+            closed = true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/ZooKeeperClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/ZooKeeperClientBuilder.java b/distributedlog-core/src/main/java/org/apache/distributedlog/ZooKeeperClientBuilder.java
new file mode 100644
index 0000000..0c200ce
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/ZooKeeperClientBuilder.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.base.Preconditions;
+import org.apache.distributedlog.ZooKeeperClient.Credentials;
+import org.apache.distributedlog.ZooKeeperClient.DigestCredentials;
+import org.apache.distributedlog.impl.BKNamespaceDriver;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.zookeeper.RetryPolicy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+/**
+ * Builder to build zookeeper client.
+ */
+public class ZooKeeperClientBuilder {
+
+    static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClientBuilder.class);
+
+    /**
+     * Create a new zookeeper client builder; each call returns a fresh builder with its own settings.
+     *
+     * @return zookeeper client builder.
+     */
+    public static ZooKeeperClientBuilder newBuilder() {
+        return new ZooKeeperClientBuilder();
+    }
+
+    // client name handed to the built ZooKeeperClient
+    private String name = "default";
+    // session timeout in ms; -1 means unset, and validateParameters() rejects non-positive values
+    private int sessionTimeoutMs = -1;
+    // connection timeout in ms; while non-positive, sessionTimeoutMs() sets it to twice the session timeout
+    private int conectionTimeoutMs = -1;
+    // zookeeper connect string; set by zkServers() or derived by uri()
+    private String zkServers = null;
+    // retry policy; null is passed through to the client as-is
+    private RetryPolicy retryPolicy = null;
+    // stats logger
+    private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
+    // retry executor thread count
+    private int retryThreadCount = 1;
+    // zookeeper request rate limit, passed through to the client
+    private double requestRateLimit = 0;
+    // Whether zkAclId() was called; validateParameters() requires an explicit call, even with a null id.
+    private boolean zkAclIdSet = false;
+    private String zkAclId;
+
+    // Client returned by build(): either built once from these settings or injected through zkc()
+    private ZooKeeperClient cachedClient = null;
+
+    private ZooKeeperClientBuilder() {} // instances are obtained through newBuilder()
+
+    /**
+     * Set the zookeeper client name that is handed to the built client (defaults to "default").
+     *
+     * @param name zookeeper client name
+     * @return zookeeper client builder
+     */
+    public synchronized ZooKeeperClientBuilder name(String name) {
+        this.name = name;
+        return this;
+    }
+
+    /**
+     * Set zookeeper session timeout in milliseconds.
+     * While the connection timeout is still non-positive, it is set to twice this value.
+     * @param sessionTimeoutMs
+     *          session timeout in milliseconds.
+     * @return zookeeper client builder.
+     */
+    public synchronized ZooKeeperClientBuilder sessionTimeoutMs(int sessionTimeoutMs) {
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        if (this.conectionTimeoutMs <= 0) { // no positive connection timeout yet: derive one from the session timeout
+            this.conectionTimeoutMs = 2 * sessionTimeoutMs;
+        }
+        return this;
+    }
+
+    public synchronized ZooKeeperClientBuilder retryThreadCount(int retryThreadCount) { // retry executor thread count; default 1
+        this.retryThreadCount = retryThreadCount;
+        return this;
+    }
+
+    public synchronized ZooKeeperClientBuilder requestRateLimit(double requestRateLimit) { // passed through to the client; default 0
+        this.requestRateLimit = requestRateLimit;
+        return this;
+    }
+
+    /**
+     * Set zookeeper connection timeout in milliseconds, replacing any value derived by sessionTimeoutMs().
+     *
+     * @param connectionTimeoutMs
+     *          connection timeout ms; must be positive for a client to be built from these settings.
+     * @return builder
+     */
+    public synchronized ZooKeeperClientBuilder connectionTimeoutMs(int connectionTimeoutMs) {
+        this.conectionTimeoutMs = connectionTimeoutMs;
+        return this;
+    }
+
+    /**
+     * Set ZooKeeper Connect String. Overwrites a value previously set here or derived by uri().
+     *
+     * @param zkServers
+     *          zookeeper servers to connect.
+     * @return builder
+     */
+    public synchronized ZooKeeperClientBuilder zkServers(String zkServers) {
+        this.zkServers = zkServers;
+        return this;
+    }
+
+    /**
+     * Set DistributedLog URI. Only the zookeeper servers extracted from it are kept; they replace zkServers.
+     *
+     * @param uri
+     *          distributedlog uri.
+     * @return builder.
+     */
+    public synchronized ZooKeeperClientBuilder uri(URI uri) {
+        this.zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri);
+        return this;
+    }
+
+    /**
+     * Use the existing <i>zkc</i> client: when non-null, build() returns it as-is without validating the other settings.
+     *
+     * @param zkc
+     *          zookeeper client.
+     * @return builder
+     */
+    public synchronized ZooKeeperClientBuilder zkc(ZooKeeperClient zkc) {
+        this.cachedClient = zkc;
+        return this;
+    }
+
+    /**
+     * Build zookeeper client with given retry policy <i>retryPolicy</i>.
+     *
+     * @param retryPolicy
+     *          retry policy; may be null (the default), which is passed to the client unchanged
+     * @return builder
+     */
+    public synchronized ZooKeeperClientBuilder retryPolicy(RetryPolicy retryPolicy) {
+        this.retryPolicy = retryPolicy;
+        return this;
+    }
+
+    /**
+     * Build zookeeper client with given stats logger <i>statsLogger</i>.
+     *
+     * @param statsLogger
+     *          stats logger to expose zookeeper stats; must be non-null when a client is built from these settings
+     * @return builder
+     */
+    public synchronized ZooKeeperClientBuilder statsLogger(StatsLogger statsLogger) {
+        this.statsLogger = statsLogger;
+        return this;
+    }
+
+    /**
+     * Build zookeeper client with given zk acl digest id <i>zkAclId</i>; must be called explicitly, null means no credentials.
+     */
+    public synchronized ZooKeeperClientBuilder zkAclId(String zkAclId) {
+        this.zkAclIdSet = true; // remembered separately because a null id is a legal value
+        this.zkAclId = zkAclId;
+        return this;
+    }
+
+    private void validateParameters() { // Guava Preconditions templates expand only %s, so %d must not be used below
+        Preconditions.checkNotNull(zkServers, "No zk servers provided.");
+        Preconditions.checkArgument(conectionTimeoutMs > 0,
+                "Invalid connection timeout : %s", conectionTimeoutMs);
+        Preconditions.checkArgument(sessionTimeoutMs > 0,
+                "Invalid session timeout : %s", sessionTimeoutMs);
+        Preconditions.checkNotNull(statsLogger, "No stats logger provided.");
+        Preconditions.checkArgument(zkAclIdSet, "Zookeeper acl id not set."); // zkAclId() must have been called, even with null
+    }
+
+    /**
+     * Build a zookeeper client, or return the one already built or injected via zkc().
+     * @return zookeeper client.
+     */
+    public synchronized ZooKeeperClient build() {
+        if (null != cachedClient) {
+            return cachedClient;
+        }
+        cachedClient = buildClient();
+        return cachedClient;
+    }
+
+    private ZooKeeperClient buildClient() {
+        validateParameters();
+
+        Credentials credentials = Credentials.NONE;
+        if (null != zkAclId) {
+            credentials = new DigestCredentials(zkAclId, zkAclId);
+        }
+
+        return new ZooKeeperClient(
+                name,
+                sessionTimeoutMs,
+                conectionTimeoutMs,
+                zkServers,
+                retryPolicy,
+                statsLogger,
+                retryThreadCount,
+                requestRateLimit,
+                credentials
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/acl/AccessControlManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/acl/AccessControlManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/acl/AccessControlManager.java
new file mode 100644
index 0000000..2c3e738
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/acl/AccessControlManager.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.acl;
+
+/**
+ * Access control on stream operations: each check takes a stream name and answers allow/deny.
+ */
+public interface AccessControlManager {
+
+    /**
+     * Checks whether writing to the given stream is allowed.
+     *
+     * @param stream
+     *          Stream to write
+     * @return true if writing to the given stream is allowed, otherwise false.
+     */
+    boolean allowWrite(String stream);
+
+    /**
+     * Checks whether truncating the given stream is allowed.
+     *
+     * @param stream
+     *          Stream to truncate
+     * @return true if truncating the given stream is allowed, otherwise false.
+     */
+    boolean allowTruncate(String stream);
+
+    /**
+     * Checks whether deleting the given stream is allowed.
+     *
+     * @param stream
+     *          Stream to delete
+     * @return true if deleting the given stream is allowed, otherwise false.
+     */
+    boolean allowDelete(String stream);
+
+    /**
+     * Checks whether proxies are allowed to acquire the given stream.
+     *
+     * @param stream
+     *          stream to acquire
+     * @return true if proxies are allowed to acquire the given stream, otherwise false.
+     */
+    boolean allowAcquire(String stream);
+
+    /**
+     * Checks whether proxies are allowed to release ownership of the given stream.
+     *
+     * @param stream
+     *          stream to release
+     * @return true if proxies are allowed to release the given stream, otherwise false.
+     */
+    boolean allowRelease(String stream);
+
+    /**
+     * Close the access control manager.
+     */
+    void close();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/acl/DefaultAccessControlManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/acl/DefaultAccessControlManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/acl/DefaultAccessControlManager.java
new file mode 100644
index 0000000..bf3352a
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/acl/DefaultAccessControlManager.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.acl;
+
+public class DefaultAccessControlManager implements AccessControlManager { // permissive implementation: every check returns true
+
+    public static final DefaultAccessControlManager INSTANCE = new DefaultAccessControlManager(); // the only instance
+
+    private DefaultAccessControlManager() { // private: callers use INSTANCE
+    }
+
+    @Override
+    public boolean allowWrite(String stream) {
+        return true;
+    }
+
+    @Override
+    public boolean allowTruncate(String stream) {
+        return true;
+    }
+
+    @Override
+    public boolean allowDelete(String stream) {
+        return true;
+    }
+
+    @Override
+    public boolean allowAcquire(String stream) {
+        return true;
+    }
+
+    @Override
+    public boolean allowRelease(String stream) {
+        return true;
+    }
+
+    @Override
+    public void close() { // nothing is held, so there is nothing to release
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/acl/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/acl/package-info.java b/distributedlog-core/src/main/java/org/apache/distributedlog/acl/package-info.java
new file mode 100644
index 0000000..4218bfc
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/acl/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Access Control for distributedlog streams.
+ */
+package org.apache.distributedlog.acl;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java b/distributedlog-core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java
new file mode 100644
index 0000000..4e94984
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java
@@ -0,0 +1,921 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.admin;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.LogSegmentMetadata;
+import org.apache.distributedlog.ReadUtils;
+import org.apache.distributedlog.ZooKeeperClient;
+import org.apache.distributedlog.ZooKeeperClientBuilder;
+import org.apache.distributedlog.impl.BKNamespaceDriver;
+import org.apache.distributedlog.impl.acl.ZKAccessControl;
+import org.apache.distributedlog.exceptions.DLIllegalStateException;
+import org.apache.distributedlog.impl.federated.FederatedZKLogMetadataStore;
+import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
+import org.apache.distributedlog.impl.metadata.BKDLConfig;
+import org.apache.distributedlog.metadata.DLMetadata;
+import org.apache.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
+import org.apache.distributedlog.metadata.MetadataUpdater;
+import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.distributedlog.thrift.AccessControlEntry;
+import org.apache.distributedlog.tools.DistributedLogTool;
+import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.SchedulerUtils;
+import com.twitter.util.Await;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.IOUtils;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Admin Tool for DistributedLog.
+ */
+public class DistributedLogAdmin extends DistributedLogTool {
+
+    static final Logger LOG = LoggerFactory.getLogger(DistributedLogAdmin.class);
+
+    /**
+     * Fix inprogress segment with lower ledger sequence number.
+     *
+     * @param namespace
+     *          dl namespace
+     * @param metadataUpdater
+     *          metadata updater.
+     * @param streamName
+     *          stream name.
+     * @param verbose
+     *          print verbose messages.
+     * @param interactive
+     *          is confirmation needed before executing actual action.
+     * @throws IOException
+     */
+    public static void fixInprogressSegmentWithLowerSequenceNumber(final DistributedLogNamespace namespace,
+                                                                   final MetadataUpdater metadataUpdater,
+                                                                   final String streamName,
+                                                                   final boolean verbose,
+                                                                   final boolean interactive) throws IOException {
+        DistributedLogManager dlm = namespace.openLog(streamName);
+        try {
+            List<LogSegmentMetadata> segments = dlm.getLogSegments();
+            if (verbose) {
+                System.out.println("LogSegments for " + streamName + " : ");
+                for (LogSegmentMetadata segment : segments) {
+                    System.out.println(segment.getLogSegmentSequenceNumber() + "\t: " + segment);
+                }
+            }
+            LOG.info("Get log segments for {} : {}", streamName, segments);
+            // validate log segments
+            long maxCompletedLogSegmentSequenceNumber = -1L;
+            LogSegmentMetadata inprogressSegment = null;
+            for (LogSegmentMetadata segment : segments) {
+                if (!segment.isInProgress()) {
+                    maxCompletedLogSegmentSequenceNumber = Math.max(maxCompletedLogSegmentSequenceNumber, segment.getLogSegmentSequenceNumber());
+                } else {
+                    // we already found an inprogress segment
+                    if (null != inprogressSegment) {
+                        throw new DLIllegalStateException("Multiple inprogress segments found for stream " + streamName + " : " + segments);
+                    }
+                    inprogressSegment = segment;
+                }
+            }
+            if (null == inprogressSegment || inprogressSegment.getLogSegmentSequenceNumber() > maxCompletedLogSegmentSequenceNumber) {
+                // nothing to fix
+                return;
+            }
+            final long newLogSegmentSequenceNumber = maxCompletedLogSegmentSequenceNumber + 1;
+            if (interactive && !IOUtils.confirmPrompt("Confirm to fix (Y/N), Ctrl+C to break : ")) {
+                return;
+            }
+            final LogSegmentMetadata newSegment =
+                    FutureUtils.result(metadataUpdater.changeSequenceNumber(inprogressSegment, newLogSegmentSequenceNumber));
+            LOG.info("Fixed {} : {} -> {} ",
+                     new Object[] { streamName, inprogressSegment, newSegment });
+            if (verbose) {
+                System.out.println("Fixed " + streamName + " : " + inprogressSegment.getZNodeName()
+                                   + " -> " + newSegment.getZNodeName());
+                System.out.println("\t old: " + inprogressSegment);
+                System.out.println("\t new: " + newSegment);
+                System.out.println();
+            }
+        } finally {
+            dlm.close();
+        }
+    }
+
+    /**
+     * A log segment flagged for repair: its current metadata together with the last
+     * record that was read from the segment.
+     */
+    private static class LogSegmentCandidate {
+        final LogSegmentMetadata metadata;
+        final LogRecordWithDLSN lastRecord;
+
+        LogSegmentCandidate(LogSegmentMetadata segmentMetadata, LogRecordWithDLSN lastRecordRead) {
+            metadata = segmentMetadata;
+            lastRecord = lastRecordRead;
+        }
+
+        @Override
+        public String toString() {
+            final StringBuilder text = new StringBuilder("LogSegmentCandidate[ metadata = ");
+            text.append(metadata);
+            text.append(", last record = ");
+            text.append(lastRecord);
+            text.append(" ]");
+            return text.toString();
+        }
+
+    }
+
+    /**
+     * Orders segment candidates by delegating to {@code LogSegmentMetadata.COMPARATOR}
+     * on their metadata.
+     */
+    private static final Comparator<LogSegmentCandidate> LOG_SEGMENT_CANDIDATE_COMPARATOR =
+            new Comparator<LogSegmentCandidate>() {
+                @Override
+                public int compare(LogSegmentCandidate left, LogSegmentCandidate right) {
+                    final LogSegmentMetadata leftMetadata = left.metadata;
+                    final LogSegmentMetadata rightMetadata = right.metadata;
+                    return LogSegmentMetadata.COMPARATOR.compare(leftMetadata, rightMetadata);
+                }
+            };
+
+    /**
+     * A stream flagged for repair, holding the segment candidates found for it in
+     * the order defined by {@code LOG_SEGMENT_CANDIDATE_COMPARATOR}.
+     */
+    private static class StreamCandidate {
+
+        final String streamName;
+        final SortedSet<LogSegmentCandidate> segmentCandidates;
+
+        StreamCandidate(String name) {
+            streamName = name;
+            segmentCandidates = new TreeSet<LogSegmentCandidate>(LOG_SEGMENT_CANDIDATE_COMPARATOR);
+        }
+
+        /** Adds a segment candidate to this stream; synchronized on this instance. */
+        synchronized void addLogSegmentCandidate(LogSegmentCandidate candidate) {
+            segmentCandidates.add(candidate);
+        }
+
+        @Override
+        public String toString() {
+            final StringBuilder text = new StringBuilder("StreamCandidate[ name = ");
+            text.append(streamName);
+            text.append(", segments = ");
+            text.append(segmentCandidates);
+            text.append(" ]");
+            return text.toString();
+        }
+    }
+
+    /**
+     * Check and repair the streams of a namespace using a single checking thread.
+     *
+     * <p>Delegates to the overload that takes a concurrency argument, passing {@code 1}.
+     *
+     * @throws IOException propagated from the delegate.
+     */
+    public static void checkAndRepairDLNamespace(final URI uri,
+                                                 final DistributedLogNamespace namespace,
+                                                 final MetadataUpdater metadataUpdater,
+                                                 final OrderedScheduler scheduler,
+                                                 final boolean verbose,
+                                                 final boolean interactive) throws IOException {
+        final int singleChecker = 1;
+        checkAndRepairDLNamespace(
+                uri, namespace, metadataUpdater, scheduler, verbose, interactive, singleChecker);
+    }
+
+    /**
+     * Check every stream of a namespace and repair those found to need it.
+     *
+     * <p>Step 0 lists the streams and checks them with {@code concurrency} threads; step 1 repairs
+     * each stream for which the check produced a candidate, stopping at the first stream whose
+     * repair is declined.
+     *
+     * @param uri namespace uri, only used in the progress messages.
+     * @param namespace namespace whose streams are checked.
+     * @param metadataUpdater updater used to apply the repairs.
+     * @param scheduler scheduler handed to the per-stream checks.
+     * @param verbose print progress messages to stdout.
+     * @param interactive ask for confirmation before repairing.
+     * @param concurrency number of checking threads; must be positive.
+     * @throws IOException propagated from listing, checking or repairing streams.
+     */
+    public static void checkAndRepairDLNamespace(final URI uri,
+                                                 final DistributedLogNamespace namespace,
+                                                 final MetadataUpdater metadataUpdater,
+                                                 final OrderedScheduler scheduler,
+                                                 final boolean verbose,
+                                                 final boolean interactive,
+                                                 final int concurrency) throws IOException {
+        Preconditions.checkArgument(concurrency > 0, "Invalid concurrency " + concurrency + " found.");
+        // 0. getting streams under a given uri.
+        final List<String> streamNames = Lists.newArrayList();
+        for (Iterator<String> names = namespace.getLogs(); names.hasNext(); ) {
+            streamNames.add(names.next());
+        }
+        if (verbose) {
+            System.out.println("- 0. checking streams under " + uri);
+        }
+        if (streamNames.isEmpty()) {
+            System.out.println("+ 0. nothing to check. quit.");
+            return;
+        }
+        final Map<String, StreamCandidate> corrupted =
+                checkStreams(namespace, streamNames, scheduler, concurrency);
+        final int numCorrupted = corrupted.size();
+        if (verbose) {
+            System.out.println("+ 0. " + numCorrupted + " corrupted streams found.");
+        }
+        if (interactive) {
+            if (!IOUtils.confirmPrompt("Do you want to fix all " + numCorrupted + " corrupted streams (Y/N) : ")) {
+                return;
+            }
+        }
+        if (verbose) {
+            System.out.println("- 1. repairing " + numCorrupted + " corrupted streams.");
+        }
+        for (StreamCandidate corruptedStream : corrupted.values()) {
+            final boolean repaired = repairStream(metadataUpdater, corruptedStream, verbose, interactive);
+            if (repaired) {
+                continue;
+            }
+            // repair was declined: stop without touching the remaining streams
+            if (verbose) {
+                System.out.println("* 1. aborted repairing corrupted streams.");
+            }
+            return;
+        }
+        if (verbose) {
+            System.out.println("+ 1. repaired " + numCorrupted + " corrupted streams.");
+        }
+    }
+
+    /**
+     * Check the given streams with {@code concurrency} worker threads and collect the
+     * streams for which {@code checkStream} returned a candidate.
+     *
+     * <p>Workers drain a shared queue of stream names. The first {@link IOException} raised by a
+     * check discards the remaining queued streams and wakes up the caller, which then fails.
+     * Worker threads are always interrupted and joined before this method returns or throws.
+     *
+     * @param namespace namespace the streams belong to.
+     * @param streams names of the streams to check.
+     * @param scheduler scheduler handed to each per-stream check.
+     * @param concurrency number of worker threads to start.
+     * @return map from stream name to its candidate, sorted by stream name; empty when
+     *         {@code streams} is empty.
+     * @throws IOException if not every stream was checked, either because a check failed or
+     *         because the calling thread was interrupted while waiting.
+     */
+    private static Map<String, StreamCandidate> checkStreams(
+            final DistributedLogNamespace namespace,
+            final Collection<String> streams,
+            final OrderedScheduler scheduler,
+            final int concurrency) throws IOException {
+        final Map<String, StreamCandidate> candidateMap =
+                new ConcurrentSkipListMap<String, StreamCandidate>();
+        if (streams.isEmpty()) {
+            // nothing would ever count the latch down; return instead of waiting forever
+            return candidateMap;
+        }
+        final LinkedBlockingQueue<String> streamQueue =
+                new LinkedBlockingQueue<String>(streams);
+        final AtomicInteger numPendingStreams = new AtomicInteger(streams.size());
+        final CountDownLatch doneLatch = new CountDownLatch(1);
+        Runnable checkRunnable = new Runnable() {
+            @Override
+            public void run() {
+                while (!Thread.currentThread().isInterrupted()) {
+                    // poll() instead of isEmpty()+take(): the check-then-take pair lets a worker
+                    // block forever once another worker has removed the last stream.
+                    String stream = streamQueue.poll();
+                    if (null == stream) {
+                        break;
+                    }
+                    StreamCandidate candidate;
+                    try {
+                        LOG.info("Checking stream {}.", stream);
+                        candidate = checkStream(namespace, stream, scheduler);
+                        LOG.info("Checked stream {} - {}.", stream, candidate);
+                    } catch (IOException e) {
+                        LOG.error("Error on checking stream {} : ", stream, e);
+                        // the whole check is going to fail: drop the remaining work so that
+                        // the other workers stop as soon as they finish their current stream
+                        streamQueue.clear();
+                        doneLatch.countDown();
+                        break;
+                    }
+                    if (null != candidate) {
+                        candidateMap.put(stream, candidate);
+                    }
+                    if (numPendingStreams.decrementAndGet() == 0) {
+                        doneLatch.countDown();
+                    }
+                }
+            }
+        };
+        Thread[] threads = new Thread[concurrency];
+        for (int i = 0; i < concurrency; i++) {
+            threads[i] = new Thread(checkRunnable, "check-thread-" + i);
+            threads[i].start();
+        }
+        try {
+            try {
+                doneLatch.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+            if (numPendingStreams.get() != 0) {
+                throw new IOException(numPendingStreams.get() + " streams left w/o checked");
+            }
+        } finally {
+            // stop and reap the workers on every exit path so that no thread outlives this call
+            for (int i = 0; i < concurrency; i++) {
+                threads[i].interrupt();
+                try {
+                    threads[i].join();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+        return candidateMap;
+    }
+
+    private static StreamCandidate checkStream(
+            final DistributedLogNamespace namespace,
+            final String streamName,
+            final OrderedScheduler scheduler) throws IOException {
+        DistributedLogManager dlm = namespace.openLog(streamName);
+        try {
+            List<LogSegmentMetadata> segments = dlm.getLogSegments();
+            if (segments.isEmpty()) {
+                return null;
+            }
+            List<Future<LogSegmentCandidate>> futures =
+                    new ArrayList<Future<LogSegmentCandidate>>(segments.size());
+            for (LogSegmentMetadata segment : segments) {
+                futures.add(checkLogSegment(namespace, streamName, segment, scheduler));
+            }
+            List<LogSegmentCandidate> segmentCandidates;
+            try {
+                segmentCandidates = Await.result(Future.collect(futures));
+            } catch (Exception e) {
+                throw new IOException("Failed on checking stream " + streamName, e);
+            }
+            StreamCandidate streamCandidate = new StreamCandidate(streamName);
+            for (LogSegmentCandidate segmentCandidate: segmentCandidates) {
+                if (null != segmentCandidate) {
+                    streamCandidate.addLogSegmentCandidate(segmentCandidate);
+                }
+            }
+            if (streamCandidate.segmentCandidates.isEmpty()) {
+                return null;
+            }
+            return streamCandidate;
+        } finally {
+            dlm.close();
+        }
+    }
+
+    /**
+     * Asynchronously compare a completed log segment's metadata with the last record read
+     * from the segment.
+     *
+     * @param namespace namespace whose driver supplies the reader entry store.
+     * @param streamName name of the stream the segment belongs to.
+     * @param metadata metadata of the segment to check.
+     * @param scheduler scheduler handed to the last-record read.
+     * @return a future of {@code null} for an inprogress segment or a segment whose last record
+     *         agrees with its metadata; otherwise a future of a candidate carrying the metadata
+     *         and the record that was read.
+     */
+    private static Future<LogSegmentCandidate> checkLogSegment(
+            final DistributedLogNamespace namespace,
+            final String streamName,
+            final LogSegmentMetadata metadata,
+            final OrderedScheduler scheduler) {
+        if (metadata.isInProgress()) {
+            return Future.value(null);
+        }
+
+        final LogSegmentEntryStore readerEntryStore = namespace.getNamespaceDriver()
+                .getLogSegmentEntryStore(NamespaceDriver.Role.READER);
+        final Future<LogRecordWithDLSN> lastRecordFuture = ReadUtils.asyncReadLastRecord(
+                streamName,
+                metadata,
+                true,
+                false,
+                true,
+                4,
+                16,
+                new AtomicInteger(0),
+                scheduler,
+                readerEntryStore
+        );
+        return lastRecordFuture.map(new Function<LogRecordWithDLSN, LogSegmentCandidate>() {
+            @Override
+            public LogSegmentCandidate apply(LogRecordWithDLSN lastRead) {
+                if (null == lastRead) {
+                    return null;
+                }
+                // flagged when the record read lies beyond what the metadata claims
+                final boolean inconsistent =
+                        lastRead.getDlsn().compareTo(metadata.getLastDLSN()) > 0
+                        || lastRead.getTransactionId() > metadata.getLastTxId()
+                        || !metadata.isRecordPositionWithinSegmentScope(lastRead);
+                if (inconsistent) {
+                    return new LogSegmentCandidate(metadata, lastRead);
+                }
+                return null;
+            }
+        });
+    }
+
+    private static boolean repairStream(MetadataUpdater metadataUpdater,
+                                        StreamCandidate streamCandidate,
+                                        boolean verbose,
+                                        boolean interactive) throws IOException {
+        if (verbose) {
+            System.out.println("Stream " + streamCandidate.streamName + " : ");
+            for (LogSegmentCandidate segmentCandidate : streamCandidate.segmentCandidates) {
+                System.out.println("  " + segmentCandidate.metadata.getLogSegmentSequenceNumber()
+                        + " : metadata = " + segmentCandidate.metadata + ", last dlsn = "
+                        + segmentCandidate.lastRecord.getDlsn());
+            }
+            System.out.println("-------------------------------------------");
+        }
+        if (interactive && !IOUtils.confirmPrompt("Do you want to fix the stream " + streamCandidate.streamName + " (Y/N) : ")) {
+            return false;
+        }
+        for (LogSegmentCandidate segmentCandidate : streamCandidate.segmentCandidates) {
+            LogSegmentMetadata newMetadata = FutureUtils.result(
+                    metadataUpdater.updateLastRecord(segmentCandidate.metadata, segmentCandidate.lastRecord));
+            if (verbose) {
+                System.out.println("  Fixed segment " + segmentCandidate.metadata.getLogSegmentSequenceNumber() + " : ");
+                System.out.println("    old metadata : " + segmentCandidate.metadata);
+                System.out.println("    new metadata : " + newMetadata);
+            }
+        }
+        if (verbose) {
+            System.out.println("-------------------------------------------");
+        }
+        return true;
+    }
+
+    //
+    // Commands
+    //
+
+    /**
+     * Unbind the bookkeeper environment for a given distributedlog uri.
+     *
+     * <p>The command first resolves the binding currently stored for the uri, prints it, and
+     * unless {@code -f} is given asks for confirmation before unbinding.
+     *
+     * TODO: move unbind operation to namespace driver
+     */
+    class UnbindCommand extends OptsCommand {
+
+        Options options = new Options();
+
+        UnbindCommand() {
+            super("unbind", "unbind the bookkeeper environment bound for a given distributedlog instance.");
+            options.addOption("f", "force", false, "Force unbinding without prompt.");
+        }
+
+        @Override
+        protected Options getOptions() {
+            return options;
+        }
+
+        @Override
+        protected String getUsage() {
+            return "unbind [options] <distributedlog uri>";
+        }
+
+        /**
+         * Run the unbind command.
+         *
+         * @param cmdline parsed command line; the first remaining argument is the uri.
+         * @return {@code -1} when no uri is given, {@code 0} otherwise (including when nothing
+         *         is bound or the confirmation is declined).
+         */
+        @Override
+        protected int runCmd(CommandLine cmdline) throws Exception {
+            String[] args = cmdline.getArgs();
+            if (args.length <= 0) {
+                System.err.println("No distributedlog uri specified.");
+                printUsage();
+                return -1;
+            }
+            boolean force = cmdline.hasOption("f");
+            URI uri = URI.create(args[0]);
+            // resolving the uri to see if there is another bindings in this uri.
+            ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder().uri(uri)
+                    .sessionTimeoutMs(10000).build();
+            BKDLConfig bkdlConfig;
+            try {
+                try {
+                    bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri);
+                } catch (IOException ie) {
+                    // a failure to resolve is reported below as "nothing bound"
+                    bkdlConfig = null;
+                }
+            } finally {
+                // the client is only needed for the lookup above; release it on every path
+                zkc.close();
+            }
+            if (null == bkdlConfig) {
+                System.out.println("No bookkeeper is bound to " + uri);
+                return 0;
+            } else {
+                System.out.println("There is bookkeeper bound to " + uri + " : ");
+                System.out.println("");
+                System.out.println(bkdlConfig.toString());
+                System.out.println("");
+                if (!force && !IOUtils.confirmPrompt("Do you want to unbind " + uri + " :\n")) {
+                    return 0;
+                }
+            }
+            DLMetadata.unbind(uri);
+            System.out.println("Unbound on " + uri + ".");
+            return 0;
+        }
+    }
+
+    /**
+     * Bind Command to bind bookkeeper environment for a given distributed uri.
+     *
+     * TODO: move bind to namespace driver
+     */
+    class BindCommand extends OptsCommand {
+
+        Options options = new Options();
+
+        BindCommand() {
+            super("bind", "bind the bookkeeper environment settings for a given distributedlog instance.");
+            options.addOption("l", "bkLedgers", true, "ZooKeeper ledgers path for bookkeeper instance.");
+            options.addOption("s", "bkZkServers", true, "ZooKeeper servers used for bookkeeper for writers.");
+            options.addOption("bkzr", "bkZkServersForReader", true, "ZooKeeper servers used for bookkeeper for readers.");
+            options.addOption("dlzw", "dlZkServersForWriter", true, "ZooKeeper servers used for distributedlog for writers.");
+            options.addOption("dlzr", "dlZkServersForReader", true, "ZooKeeper servers used for distributedlog for readers.");
+            options.addOption("i", "sanityCheckTxnID", true, "Flag to sanity check highest txn id.");
+            options.addOption("r", "encodeRegionID", true, "Flag to encode region id.");
+            options.addOption("seqno", "firstLogSegmentSeqNo", true, "The first log segment sequence number to use after upgrade");
+            options.addOption("fns", "federatedNamespace", false, "Flag to turn a namespace to federated namespace");
+            options.addOption("f", "force", false, "Force binding without prompt.");
+            options.addOption("c", "creation", false, "Whether is it a creation binding.");
+            options.addOption("q", "query", false, "Query the bookkeeper bindings");
+        }
+
+        @Override
+        protected Options getOptions() {
+            return options;
+        }
+
+        @Override
+        protected String getUsage() {
+            return "bind [options] <distributedlog uri>";
+        }
+
+        @Override
+        protected int runCmd(CommandLine cmdline) throws Exception {
+            boolean isQuery = cmdline.hasOption("q");
+            if (!isQuery && (!cmdline.hasOption("l") || !cmdline.hasOption("s"))) {
+                System.err.println("Error: Neither zkServers nor ledgersPath specified for bookkeeper environment.");
+                printUsage();
+                return -1;
+            }
+            String[] args = cmdline.getArgs();
+            if (args.length <= 0) {
+                System.err.println("No distributedlog uri specified.");
+                printUsage();
+                return -1;
+            }
+            boolean force = cmdline.hasOption("f");
+            boolean creation = cmdline.hasOption("c");
+            String bkLedgersPath = cmdline.getOptionValue("l");
+            String bkZkServersForWriter = cmdline.getOptionValue("s");
+            boolean sanityCheckTxnID =
+                    !cmdline.hasOption("i") || Boolean.parseBoolean(cmdline.getOptionValue("i"));
+            boolean encodeRegionID =
+                    cmdline.hasOption("r") && Boolean.parseBoolean(cmdline.getOptionValue("r"));
+
+            String bkZkServersForReader;
+            if (cmdline.hasOption("bkzr")) {
+                bkZkServersForReader = cmdline.getOptionValue("bkzr");
+            } else {
+                bkZkServersForReader = bkZkServersForWriter;
+            }
+
+            URI uri = URI.create(args[0]);
+
+            String dlZkServersForWriter;
+            String dlZkServersForReader;
+            if (cmdline.hasOption("dlzw")) {
+                dlZkServersForWriter = cmdline.getOptionValue("dlzw");
+            } else {
+                dlZkServersForWriter = BKNamespaceDriver.getZKServersFromDLUri(uri);
+            }
+            if (cmdline.hasOption("dlzr")) {
+                dlZkServersForReader = cmdline.getOptionValue("dlzr");
+            } else {
+                dlZkServersForReader = dlZkServersForWriter;
+            }
+
+            // resolving the uri to see if there is another bindings in this uri.
+            ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder().uri(uri).zkAclId(null)
+                    .sessionTimeoutMs(10000).build();
+            try {
+                BKDLConfig newBKDLConfig =
+                        new BKDLConfig(dlZkServersForWriter, dlZkServersForReader,
+                                       bkZkServersForWriter, bkZkServersForReader, bkLedgersPath)
+                                .setSanityCheckTxnID(sanityCheckTxnID)
+                                .setEncodeRegionID(encodeRegionID);
+
+                if (cmdline.hasOption("seqno")) {
+                    newBKDLConfig = newBKDLConfig.setFirstLogSegmentSeqNo(Long.parseLong(cmdline.getOptionValue("seqno")));
+                }
+
+                if (cmdline.hasOption("fns")) {
+                    newBKDLConfig = newBKDLConfig.setFederatedNamespace(true);
+                }
+
+                BKDLConfig bkdlConfig;
+                try {
+                    bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri);
+                } catch (IOException ie) {
+                    bkdlConfig = null;
+                }
+                if (null == bkdlConfig) {
+                    System.out.println("No bookkeeper is bound to " + uri);
+                } else {
+                    System.out.println("There is bookkeeper bound to " + uri + " : ");
+                    System.out.println("");
+                    System.out.println(bkdlConfig.toString());
+                    System.out.println("");
+                    if (!isQuery) {
+                        if (newBKDLConfig.equals(bkdlConfig)) {
+                            System.out.println("No bookkeeper binding needs to be updated. Quit.");
+                            return 0;
+                        } else if(!newBKDLConfig.isFederatedNamespace() && bkdlConfig.isFederatedNamespace()) {
+                            System.out.println("You can't turn a federated namespace back to non-federated.");
+                            return 0;
+                        } else {
+                            if (!force && !IOUtils.confirmPrompt("Do you want to bind " + uri
+                                        + " with new bookkeeper instance :\n" + newBKDLConfig)) {
+                                return 0;
+                            }
+                        }
+                    }
+                }
+                if (isQuery) {
+                    System.out.println("Done.");
+                    return 0;
+                }
+                DLMetadata dlMetadata = DLMetadata.create(newBKDLConfig);
+                if (creation) {
+                    try {
+                        dlMetadata.create(uri);
+                        System.out.println("Created binding on " + uri + ".");
+                    } catch (IOException ie) {
+                        System.err.println("Failed to create binding on " + uri + " : " + ie.getMessage());
+                    }
+                } else {
+                    try {
+                        dlMetadata.update(uri);
+                        System.out.println("Updated binding on " + uri + " : ");
+                        System.out.println("");
+                        System.out.println(newBKDLConfig.toString());
+                        System.out.println("");
+                    } catch (IOException ie) {
+                        System.err.println("Failed to update binding on " + uri + " : " + ie.getMessage());
+                    }
+                }
+                if (newBKDLConfig.isFederatedNamespace()) {
+                    try {
+                        FederatedZKLogMetadataStore.createFederatedNamespace(uri, zkc);
+                    } catch (KeeperException.NodeExistsException nee) {
+                        // ignore node exists exception
+                    }
+                }
+                return 0;
+            } finally {
+                zkc.close();
+            }
+        }
+    }
+
+    /**
+     * Command that repairs streams whose inprogress log segment has a lower sequence number,
+     * by running {@code fixInprogressSegmentWithLowerSequenceNumber} on each listed stream.
+     */
+    static class RepairSeqNoCommand extends PerDLCommand {
+
+        boolean dryrun = false;
+        boolean verbose = false;
+        final List<String> streams = new ArrayList<String>();
+
+        RepairSeqNoCommand() {
+            super("repairseqno", "Repair a stream whose inprogress log segment has lower sequence number.");
+            options.addOption("d", "dryrun", false, "Dry run without repairing");
+            options.addOption("l", "list", true, "List of streams to repair, separated by comma");
+            options.addOption("v", "verbose", false, "Print verbose messages");
+        }
+
+        /**
+         * Reads the dry-run, verbose and force flags and the mandatory comma separated
+         * stream list ({@code -l}).
+         */
+        @Override
+        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+            super.parseCommandLine(cmdline);
+            dryrun = cmdline.hasOption("d");
+            verbose = cmdline.hasOption("v");
+            // a dry run never forces
+            if (dryrun) {
+                force = false;
+            } else {
+                force = cmdline.hasOption("f");
+            }
+            if (!cmdline.hasOption("l")) {
+                throw new ParseException("No streams provided to repair");
+            }
+            for (String streamName : cmdline.getOptionValue("l").split(",")) {
+                streams.add(streamName);
+            }
+        }
+
+        /**
+         * Asks for confirmation, then repairs each listed stream in turn.
+         *
+         * @return {@code -1} if the confirmation is declined, {@code 0} otherwise.
+         */
+        @Override
+        protected int runCmd() throws Exception {
+            final MetadataUpdater updater;
+            if (dryrun) {
+                updater = new DryrunLogSegmentMetadataStoreUpdater(getConf(),
+                        getLogSegmentMetadataStore());
+            } else {
+                updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(getConf(),
+                        getLogSegmentMetadataStore());
+            }
+            System.out.println("List of streams : ");
+            System.out.println(streams);
+            final boolean confirmed = IOUtils.confirmPrompt("Do you want to repair all these streams (Y/N):");
+            if (!confirmed) {
+                return -1;
+            }
+            for (String streamName : streams) {
+                fixInprogressSegmentWithLowerSequenceNumber(
+                        getNamespace(), updater, streamName, verbose, !getForce());
+            }
+            return 0;
+        }
+
+        @Override
+        protected String getUsage() {
+            return "repairseqno [options]";
+        }
+    }
+
+    /**
+     * Command that checks and repairs a distributedlog namespace by running
+     * {@code checkAndRepairDLNamespace} with the configured concurrency.
+     */
+    static class DLCKCommand extends PerDLCommand {
+
+        boolean dryrun = false;
+        boolean verbose = false;
+        int concurrency = 1;
+
+        DLCKCommand() {
+            super("dlck", "Check and repair a distributedlog namespace");
+            options.addOption("d", "dryrun", false, "Dry run without repairing");
+            options.addOption("v", "verbose", false, "Print verbose messages");
+            options.addOption("cy", "concurrency", true, "Concurrency on checking streams");
+        }
+
+        /**
+         * Reads the dry-run and verbose flags and the optional concurrency ({@code -cy}).
+         *
+         * @throws ParseException if the concurrency is not a positive integer.
+         */
+        @Override
+        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+            super.parseCommandLine(cmdline);
+            dryrun = cmdline.hasOption("d");
+            verbose = cmdline.hasOption("v");
+            if (cmdline.hasOption("cy")) {
+                try {
+                    concurrency = Integer.parseInt(cmdline.getOptionValue("cy"));
+                } catch (NumberFormatException nfe) {
+                    throw new ParseException("Invalid concurrency value : " + cmdline.getOptionValue("cy"));
+                }
+                // reject values the check would refuse later with an argument precondition failure
+                if (concurrency <= 0) {
+                    throw new ParseException("Invalid concurrency value : " + cmdline.getOptionValue("cy"));
+                }
+            }
+        }
+
+        /**
+         * Builds the metadata updater (dry-run or real) and a scheduler, runs the check and
+         * repair, and shuts the scheduler down afterwards.
+         *
+         * @return {@code 0} when the check and repair completes without throwing.
+         */
+        @Override
+        protected int runCmd() throws Exception {
+            MetadataUpdater metadataUpdater = dryrun ?
+                    new DryrunLogSegmentMetadataStoreUpdater(getConf(),
+                            getLogSegmentMetadataStore()) :
+                    LogSegmentMetadataStoreUpdater.createMetadataUpdater(getConf(),
+                            getLogSegmentMetadataStore());
+            OrderedScheduler scheduler = OrderedScheduler.newBuilder()
+                    .name("dlck-scheduler")
+                    .corePoolSize(Runtime.getRuntime().availableProcessors())
+                    .build();
+            try {
+                checkAndRepairDLNamespace(getUri(), getNamespace(), metadataUpdater, scheduler,
+                                          verbose, !getForce(), concurrency);
+            } finally {
+                // shut down the scheduler that was actually used by the check; the previous code
+                // shut down an unrelated, unused executor and left this one running
+                SchedulerUtils.shutdownScheduler(scheduler, 5, TimeUnit.MINUTES);
+            }
+            return 0;
+        }
+
+        @Override
+        protected String getUsage() {
+            return "dlck [options]";
+        }
+    }
+
+    /**
+     * Command {@code delete_stream_acl}: deletes the ACL node of a single stream.
+     *
+     * <p>The node lives at {@code <namespace path>/<acl root path>/<stream>}, the same
+     * location that {@link SetStreamACLCommand} writes to.
+     */
+    static class DeleteStreamACLCommand extends PerDLCommand {
+
+        String stream = null;
+
+        DeleteStreamACLCommand() {
+            super("delete_stream_acl", "Delete ACL for a given stream");
+            options.addOption("s", "stream", true, "Stream to delete ACL");
+        }
+
+        /**
+         * Reads the mandatory {@code -s} option.
+         *
+         * @param cmdline the parsed command line
+         * @throws ParseException if no stream is given
+         */
+        @Override
+        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+            super.parseCommandLine(cmdline);
+            if (!cmdline.hasOption("s")) {
+                throw new ParseException("No stream to delete ACL");
+            }
+            stream = cmdline.getOptionValue("s");
+        }
+
+        /**
+         * Deletes the ACL node of the configured stream.
+         *
+         * @return 0 on success, -1 if the namespace has no ACL root path configured
+         * @throws Exception if resolving the namespace config or deleting the node fails
+         */
+        @Override
+        protected int runCmd() throws Exception {
+            BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(getZooKeeperClient(), getUri());
+            if (null == bkdlConfig.getACLRootPath()) {
+                // acl isn't enabled for this namespace.
+                System.err.println("ACL isn't enabled for namespace " + getUri());
+                return -1;
+            }
+            // use only the path component of the namespace URI: the full URI string
+            // (scheme and authority included) is not a zookeeper path
+            String zkPath = getUri().getPath() + "/" + bkdlConfig.getACLRootPath() + "/" + stream;
+            ZKAccessControl.delete(getZooKeeperClient(), zkPath);
+            return 0;
+        }
+
+        @Override
+        protected String getUsage() {
+            return "delete_stream_acl [options]";
+        }
+    }
+
+    /**
+     * Command {@code set_stream_acl}: applies the deny flags handled by
+     * {@link SetACLCommand} to the ACL node of one stream, which is located
+     * directly below the ACL root path.
+     */
+    static class SetStreamACLCommand extends SetACLCommand {
+
+        String stream = null;
+
+        SetStreamACLCommand() {
+            super("set_stream_acl", "Set Default ACL for a given stream");
+            options.addOption("s", "stream", true, "Stream to set ACL");
+        }
+
+        /**
+         * Parses the inherited deny flags and the mandatory {@code -s} option.
+         *
+         * @param cmdline the parsed command line
+         * @throws ParseException if no stream is given
+         */
+        @Override
+        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+            super.parseCommandLine(cmdline);
+            final boolean streamGiven = cmdline.hasOption("s");
+            if (streamGiven) {
+                this.stream = cmdline.getOptionValue("s");
+                return;
+            }
+            throw new ParseException("No stream to set ACL");
+        }
+
+        /**
+         * @param zkRootPath the ACL root path
+         * @return the root path followed by {@code "/"} and the stream name
+         */
+        @Override
+        protected String getZKPath(String zkRootPath) {
+            final String separator = "/";
+            return zkRootPath + separator + stream;
+        }
+
+        @Override
+        protected String getUsage() {
+            final String usage = "set_stream_acl [options]";
+            return usage;
+        }
+    }
+
+    /**
+     * Command {@code set_default_acl}: applies the deny flags handled by
+     * {@link SetACLCommand} to the ACL root node itself.
+     */
+    static class SetDefaultACLCommand extends SetACLCommand {
+
+        SetDefaultACLCommand() {
+            super("set_default_acl", "Set Default ACL for a namespace");
+        }
+
+        /**
+         * @param zkRootPath the ACL root path
+         * @return {@code zkRootPath} unchanged
+         */
+        @Override
+        protected String getZKPath(String zkRootPath) {
+            final String defaultAclPath = zkRootPath;
+            return defaultAclPath;
+        }
+
+        @Override
+        protected String getUsage() {
+            final String usage = "set_default_acl [options]";
+            return usage;
+        }
+    }
+
+    /**
+     * Base class of the commands that write an access control entry.
+     *
+     * <p>It registers and parses the five {@code deny-*} flags, loads the entry
+     * stored at the path chosen by the subclass (or starts from a new entry when
+     * the node is missing), overwrites all five flags and stores the entry again.
+     * A flag that is absent from the command line is therefore written as
+     * {@code false}.
+     */
+    abstract static class SetACLCommand extends PerDLCommand {
+
+        boolean denyWrite = false;
+        boolean denyTruncate = false;
+        boolean denyDelete = false;
+        boolean denyAcquire = false;
+        boolean denyRelease = false;
+
+        protected SetACLCommand(String name, String description) {
+            super(name, description);
+            options.addOption("dw", "deny-write", false, "Deny write/bulkWrite requests");
+            options.addOption("dt", "deny-truncate", false, "Deny truncate requests");
+            options.addOption("dd", "deny-delete", false, "Deny delete requests");
+            options.addOption("da", "deny-acquire", false, "Deny acquire requests");
+            options.addOption("dr", "deny-release", false, "Deny release requests");
+        }
+
+        /**
+         * Records which of the deny flags are present on the command line.
+         *
+         * @param cmdline the parsed command line
+         * @throws ParseException if the parent class rejects the command line
+         */
+        @Override
+        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+            super.parseCommandLine(cmdline);
+            this.denyWrite = cmdline.hasOption("dw");
+            this.denyTruncate = cmdline.hasOption("dt");
+            this.denyDelete = cmdline.hasOption("dd");
+            this.denyAcquire = cmdline.hasOption("da");
+            this.denyRelease = cmdline.hasOption("dr");
+        }
+
+        /**
+         * Maps the ACL root path to the path of the node this command writes.
+         *
+         * @param zkRootPath the ACL root path
+         * @return the path of the ACL node to write
+         */
+        protected abstract String getZKPath(String zkRootPath);
+
+        /**
+         * Reads the access control stored at {@code zkPath}.
+         *
+         * @param zkc    zookeeper client used for the read
+         * @param zkPath path of the ACL node
+         * @return the stored access control, or one wrapping a new
+         *         {@link AccessControlEntry} if the node does not exist
+         * @throws Exception on any read failure other than a missing node
+         */
+        protected ZKAccessControl getZKAccessControl(ZooKeeperClient zkc, String zkPath) throws Exception {
+            try {
+                return Await.result(ZKAccessControl.read(zkc, zkPath, null));
+            } catch (KeeperException.NoNodeException nne) {
+                // nothing stored yet
+                return new ZKAccessControl(new AccessControlEntry(), zkPath);
+            }
+        }
+
+        /**
+         * Stores {@code accessControl}: updates its node if it exists, creates it otherwise.
+         *
+         * @param zkc           zookeeper client used for the write
+         * @param accessControl the access control to store
+         * @throws Exception if the existence check or the write fails
+         */
+        protected void setZKAccessControl(ZooKeeperClient zkc, ZKAccessControl accessControl) throws Exception {
+            final String nodePath = accessControl.getZKPath();
+            if (null != zkc.get().exists(nodePath, false)) {
+                accessControl.update(zkc);
+            } else {
+                accessControl.create(zkc);
+            }
+        }
+
+        /**
+         * Loads the target access control entry, overwrites its five deny flags
+         * with the parsed values and stores it.
+         *
+         * @return 0 on success, -1 if the namespace has no ACL root path configured
+         * @throws Exception if resolving the config, reading or writing the entry fails
+         */
+        @Override
+        protected int runCmd() throws Exception {
+            final BKDLConfig dlConfig = BKDLConfig.resolveDLConfig(getZooKeeperClient(), getUri());
+            if (null == dlConfig.getACLRootPath()) {
+                // acl isn't enabled for this namespace.
+                System.err.println("ACL isn't enabled for namespace " + getUri());
+                return -1;
+            }
+
+            final String aclRoot = getUri().getPath() + "/" + dlConfig.getACLRootPath();
+            final String targetPath = getZKPath(aclRoot);
+            final ZKAccessControl control = getZKAccessControl(getZooKeeperClient(), targetPath);
+
+            final AccessControlEntry entry = control.getAccessControlEntry();
+            entry.setDenyWrite(denyWrite);
+            entry.setDenyTruncate(denyTruncate);
+            entry.setDenyDelete(denyDelete);
+            entry.setDenyAcquire(denyAcquire);
+            entry.setDenyRelease(denyRelease);
+
+            setZKAccessControl(getZooKeeperClient(), control);
+            return 0;
+        }
+
+    }
+
+    /**
+     * Creates the admin tool: clears the commands registered by the parent
+     * constructor and registers the admin commands in their place.
+     */
+    public DistributedLogAdmin() {
+        // the parent constructor runs implicitly; drop whatever it registered
+        commands.clear();
+
+        // general commands
+        addCommand(new HelpCommand());
+        addCommand(new BindCommand());
+        addCommand(new UnbindCommand());
+
+        // check and repair commands
+        addCommand(new RepairSeqNoCommand());
+        addCommand(new DLCKCommand());
+
+        // access control commands
+        addCommand(new SetDefaultACLCommand());
+        addCommand(new SetStreamACLCommand());
+        addCommand(new DeleteStreamACLCommand());
+    }
+
+    @Override
+    protected String getName() {
+        return "dlog_admin";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/admin/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/admin/package-info.java b/distributedlog-core/src/main/java/org/apache/distributedlog/admin/package-info.java
new file mode 100644
index 0000000..d708111
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/admin/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Admin tools for DistributedLog.
+ */
+package org.apache.distributedlog.admin;


Mime
View raw message