bookkeeper-distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [04/31] incubator-distributedlog git commit: DL-117: Stream metadata store
Date Fri, 30 Dec 2016 00:07:18 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
new file mode 100644
index 0000000..d89dddb
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
@@ -0,0 +1,630 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.metadata;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.DistributedLogConstants;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.exceptions.DLException;
+import com.twitter.distributedlog.exceptions.DLInterruptedException;
+import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
+import com.twitter.distributedlog.exceptions.LockCancelledException;
+import com.twitter.distributedlog.exceptions.LogExistsException;
+import com.twitter.distributedlog.exceptions.LogNotFoundException;
+import com.twitter.distributedlog.exceptions.UnexpectedException;
+import com.twitter.distributedlog.exceptions.ZKException;
+import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
+import com.twitter.distributedlog.lock.DistributedLock;
+import com.twitter.distributedlog.lock.SessionLockFactory;
+import com.twitter.distributedlog.lock.ZKDistributedLock;
+import com.twitter.distributedlog.lock.ZKSessionLockFactory;
+import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
+import com.twitter.distributedlog.util.DLUtils;
+import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.SchedulerUtils;
+import com.twitter.distributedlog.zk.LimitedPermitManager;
+import com.twitter.distributedlog.util.OrderedScheduler;
+import com.twitter.distributedlog.util.PermitManager;
+import com.twitter.distributedlog.util.Transaction;
+import com.twitter.distributedlog.util.Utils;
+import com.twitter.distributedlog.zk.ZKTransaction;
+import com.twitter.util.ExceptionalFunction;
+import com.twitter.util.ExceptionalFunction0;
+import com.twitter.util.Future;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.ZKUtil;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static com.twitter.distributedlog.impl.metadata.ZKLogMetadata.*;
+
+/**
+ * zookeeper based {@link LogStreamMetadataStore}
+ */
+public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
+
+    private final static Logger LOG = LoggerFactory.getLogger(ZKLogStreamMetadataStore.class);
+
+    private final String clientId;
+    private final DistributedLogConfiguration conf;
+    private final ZooKeeperClient zooKeeperClient;
+    private final OrderedScheduler scheduler;
+    private final StatsLogger statsLogger;
+    private final LogSegmentMetadataStore logSegmentStore;
+    private final LimitedPermitManager permitManager;
+    // lock
+    private SessionLockFactory lockFactory;
+    private OrderedScheduler lockStateExecutor;
+
+    /**
+     * Build a zookeeper based stream metadata store.
+     *
+     * <p>Besides keeping the references passed in, the constructor creates the log segment
+     * metadata store and a permit manager sized by
+     * {@code conf.getLogSegmentRollingConcurrency()}, and registers that permit manager
+     * with the zookeeper client.
+     *
+     * @param clientId
+     *          client id, handed to the session lock factory when locks are created
+     * @param conf
+     *          distributedlog configuration
+     * @param zkc
+     *          zookeeper client
+     * @param scheduler
+     *          scheduler shared with the log segment metadata store and the permit manager
+     * @param statsLogger
+     *          stats logger
+     */
+    public ZKLogStreamMetadataStore(String clientId,
+                                    DistributedLogConfiguration conf,
+                                    ZooKeeperClient zkc,
+                                    OrderedScheduler scheduler,
+                                    StatsLogger statsLogger) {
+        this.clientId = clientId;
+        this.conf = conf;
+        this.zooKeeperClient = zkc;
+        this.scheduler = scheduler;
+        this.statsLogger = statsLogger;
+        // store for the log segment metadata
+        this.logSegmentStore = new ZKLogSegmentMetadataStore(conf, zkc, scheduler);
+        // permit manager (used for log segment rolling)
+        this.permitManager = new LimitedPermitManager(
+                conf.getLogSegmentRollingConcurrency(), 1, TimeUnit.MINUTES, scheduler);
+        zkc.register(this.permitManager);
+    }
+
+    /**
+     * Return the "DLM-LockState" scheduler, building it on first use when asked to.
+     *
+     * @param createIfNull
+     *          whether to build the scheduler if it has not been built yet
+     * @return the scheduler; null if it has not been built and <i>createIfNull</i> is false
+     */
+    private synchronized OrderedScheduler getLockStateExecutor(boolean createIfNull) {
+        if (!createIfNull || null != lockStateExecutor) {
+            // already built, or the caller doesn't want it built on demand
+            return lockStateExecutor;
+        }
+        final StatsLogger schedulerStatsLogger = statsLogger.scope("lock_scheduler");
+        lockStateExecutor = OrderedScheduler.newBuilder()
+                .name("DLM-LockState")
+                .corePoolSize(conf.getNumLockStateThreads())
+                .statsLogger(schedulerStatsLogger)
+                .perExecutorStatsLogger(schedulerStatsLogger)
+                .traceTaskExecution(conf.getEnableTaskExecutionStats())
+                .traceTaskExecutionWarnTimeUs(conf.getTaskExecutionWarnTimeMicros())
+                .build();
+        return lockStateExecutor;
+    }
+
+    /**
+     * Return the session lock factory, building it on first use when asked to.
+     *
+     * @param createIfNull
+     *          whether to build the factory if it has not been built yet
+     * @return the lock factory; null if it has not been built and <i>createIfNull</i> is false
+     */
+    private synchronized SessionLockFactory getLockFactory(boolean createIfNull) {
+        if (!createIfNull || null != lockFactory) {
+            // already built, or the caller doesn't want it built on demand
+            return lockFactory;
+        }
+        // createIfNull is true on this path, so the lock state scheduler is built as well
+        lockFactory = new ZKSessionLockFactory(
+                zooKeeperClient,
+                clientId,
+                getLockStateExecutor(true),
+                conf.getZKNumRetries(),
+                conf.getLockTimeoutMilliSeconds(),
+                conf.getZKRetryBackoffStartMillis(),
+                statsLogger);
+        return lockFactory;
+    }
+
+    /**
+     * Release the resources created by this store: unregister and close the permit manager,
+     * close the log segment metadata store and shut down the lock state scheduler.
+     *
+     * @throws IOException declared by the interface
+     */
+    @Override
+    public void close() throws IOException {
+        zooKeeperClient.unregister(permitManager);
+        permitManager.close();
+        logSegmentStore.close();
+        // don't build the scheduler just to shut it down; this is null if no lock was ever created
+        final OrderedScheduler lockExecutor = getLockStateExecutor(false);
+        SchedulerUtils.shutdownScheduler(
+                lockExecutor, conf.getSchedulerShutdownTimeoutMs(), TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * @return the log segment metadata store created by this stream metadata store
+     */
+    @Override
+    public LogSegmentMetadataStore getLogSegmentMetadataStore() {
+        return this.logSegmentStore;
+    }
+
+    /**
+     * @return the permit manager created by this stream metadata store
+     */
+    @Override
+    public PermitManager getPermitManager() {
+        return permitManager;
+    }
+
+    /**
+     * @return a new zookeeper transaction bound to this store's zookeeper client
+     */
+    @Override
+    public Transaction<Object> newTransaction() {
+        return new ZKTransaction(this.zooKeeperClient);
+    }
+
+    /**
+     * Check whether the log <i>logName</i> exists under <i>uri</i>.
+     *
+     * <p>The check is done on the log segments path of the log: the path is first synced and
+     * then tested for existence.
+     *
+     * @param uri
+     *          location of the log
+     * @param logName
+     *          name of the log
+     * @return a future that is satisfied with null if the log segments path exists; it fails with
+     *         {@link LogNotFoundException} if the path is missing, {@link ZKException} on any
+     *         other zookeeper result code, {@link DLInterruptedException} if interrupted while
+     *         obtaining the zookeeper handle, or the connection exception thrown by the client.
+     */
+    @Override
+    public Future<Void> logExists(URI uri, final String logName) {
+        final String logSegmentsPath = ZKLogMetadata.getLogSegmentsPath(
+                uri, logName, conf.getUnpartitionedStreamName());
+        final Promise<Void> promise = new Promise<Void>();
+        try {
+            final ZooKeeper zk = zooKeeperClient.get();
+            // sync first, then issue the existence check from the sync callback
+            zk.sync(logSegmentsPath, new AsyncCallback.VoidCallback() {
+                @Override
+                public void processResult(int syncRc, String path, Object syncCtx) {
+                    if (KeeperException.Code.NONODE.intValue() == syncRc) {
+                        promise.setException(new LogNotFoundException(
+                                String.format("Log %s does not exist or has been deleted", logName)));
+                        return;
+                    } else if (KeeperException.Code.OK.intValue() != syncRc){
+                        promise.setException(new ZKException("Error on checking log existence for " + logName,
+                                KeeperException.create(KeeperException.Code.get(syncRc))));
+                        return;
+                    }
+                    // sync succeeded: check the path without setting a watch
+                    zk.exists(logSegmentsPath, false, new AsyncCallback.StatCallback() {
+                        @Override
+                        public void processResult(int rc, String path, Object ctx, Stat stat) {
+                            if (KeeperException.Code.OK.intValue() == rc) {
+                                promise.setValue(null);
+                            } else if (KeeperException.Code.NONODE.intValue() == rc) {
+                                promise.setException(new LogNotFoundException(
+                                        String.format("Log %s does not exist or has been deleted", logName)));
+                            } else {
+                                promise.setException(new ZKException("Error on checking log existence for " + logName,
+                                        KeeperException.create(KeeperException.Code.get(rc))));
+                            }
+                        }
+                    }, null);
+                }
+            }, null);
+
+        } catch (InterruptedException ie) {
+            // thrown by zooKeeperClient.get(); no zookeeper request has been issued at this point
+            LOG.error("Interrupted while reading {}", logSegmentsPath, ie);
+            promise.setException(new DLInterruptedException("Interrupted while checking "
+                    + logSegmentsPath, ie));
+        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+            promise.setException(e);
+        }
+        return promise;
+    }
+
+    //
+    // Create Write Lock
+    //
+
+    /**
+     * Create the write lock of a log stream, on the lock path taken from <i>metadata</i>.
+     *
+     * <p>The lock state scheduler and the session lock factory are built on demand here.
+     *
+     * @param metadata
+     *          writer metadata of the log stream
+     * @return the distributed lock for the writer
+     */
+    @Override
+    public DistributedLock createWriteLock(ZKLogMetadataForWriter metadata) {
+        final OrderedScheduler lockExecutor = getLockStateExecutor(true);
+        final SessionLockFactory sessionLockFactory = getLockFactory(true);
+        return new ZKDistributedLock(
+                lockExecutor,
+                sessionLockFactory,
+                metadata.getLockPath(),
+                conf.getLockTimeoutMilliSeconds(),
+                statsLogger);
+    }
+
+    //
+    // Create Read Lock
+    //
+
+    /**
+     * Make sure <i>readLockPath</i> exists, creating it (and missing ancestors below the log
+     * root path) as persistent znodes with empty data.
+     *
+     * <p>The log root path itself is passed as the parent that should not be created, so a
+     * missing log root is reported as {@link LogNotFoundException} instead of being created.
+     *
+     * @param logMetadata
+     *          metadata of the log stream, used for the log root path and for error messages
+     * @param readLockPath
+     *          read lock path to create
+     * @return a future satisfied with null once the path exists (created now or already
+     *         present); failed with {@link LockCancelledException} if the future is interrupted
+     *         first, or with the exception matching the zookeeper result code.
+     */
+    private Future<Void> ensureReadLockPathExist(final ZKLogMetadata logMetadata,
+                                                 final String readLockPath) {
+        final Promise<Void> promise = new Promise<Void>();
+        // interrupting the returned future fails it as a cancelled lock
+        promise.setInterruptHandler(new com.twitter.util.Function<Throwable, BoxedUnit>() {
+            @Override
+            public BoxedUnit apply(Throwable t) {
+                FutureUtils.setException(promise, new LockCancelledException(readLockPath,
+                        "Could not ensure read lock path", t));
+                return null;
+            }
+        });
+        Optional<String> parentPathShouldNotCreate = Optional.of(logMetadata.getLogRootPath());
+        Utils.zkAsyncCreateFullPathOptimisticRecursive(zooKeeperClient, readLockPath, parentPathShouldNotCreate,
+                new byte[0], zooKeeperClient.getDefaultACL(), CreateMode.PERSISTENT,
+                new org.apache.zookeeper.AsyncCallback.StringCallback() {
+                    @Override
+                    public void processResult(final int rc, final String path, Object ctx, String name) {
+                        // the promise is completed through FutureUtils here because the interrupt
+                        // handler above may complete it as well
+                        if (KeeperException.Code.NONODE.intValue() == rc) {
+                            FutureUtils.setException(promise, new LogNotFoundException(
+                                    String.format("Log %s does not exist or has been deleted",
+                                            logMetadata.getFullyQualifiedName())));
+                        } else if (KeeperException.Code.OK.intValue() == rc) {
+                            FutureUtils.setValue(promise, null);
+                            LOG.trace("Created path {}.", path);
+                        } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
+                            // somebody else created the path already, which is just as good
+                            FutureUtils.setValue(promise, null);
+                            LOG.trace("Path {} is already existed.", path);
+                        } else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) {
+                            FutureUtils.setException(promise, new ZooKeeperClient.ZooKeeperConnectionException(path));
+                        } else if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
+                            FutureUtils.setException(promise, new DLInterruptedException(path));
+                        } else {
+                            FutureUtils.setException(promise, KeeperException.create(KeeperException.Code.get(rc)));
+                        }
+                    }
+                }, null);
+        return promise;
+    }
+
+    /**
+     * Create the read lock of a log stream for the given reader.
+     *
+     * <p>The read lock path is ensured to exist first; the lock object itself is then built
+     * on the scheduler.
+     *
+     * @param metadata
+     *          reader metadata of the log stream
+     * @param readerId
+     *          reader id used to derive the read lock path
+     * @return a future of the distributed lock for the reader
+     */
+    @Override
+    public Future<DistributedLock> createReadLock(final ZKLogMetadataForReader metadata,
+                                                  Optional<String> readerId) {
+        final String readLockPath = metadata.getReadLockPath(readerId);
+        final ExceptionalFunction0<DistributedLock> buildReadLock =
+                new ExceptionalFunction0<DistributedLock>() {
+            @Override
+            public DistributedLock applyE() throws Throwable {
+                return new ZKDistributedLock(
+                    getLockStateExecutor(true),
+                    getLockFactory(true),
+                    readLockPath,
+                    conf.getLockTimeoutMilliSeconds(),
+                    statsLogger.scope("read_lock"));
+            }
+        };
+        final ExceptionalFunction<Void, Future<DistributedLock>> buildOnScheduler =
+                new ExceptionalFunction<Void, Future<DistributedLock>>() {
+            @Override
+            public Future<DistributedLock> applyE(Void value) throws Throwable {
+                // Building the lock involves a blocking call, which must not run on the
+                // ZK completion thread - hand it over to the scheduler instead
+                return scheduler.apply(buildReadLock);
+            }
+        };
+        return ensureReadLockPathExist(metadata, readLockPath).flatMap(buildOnScheduler);
+    }
+
+    //
+    // Create Log
+    //
+
+    /**
+     * Positions of the individual metadata entries in the list produced by
+     * {@link #checkLogMetadataPaths(ZooKeeper, String, boolean)}. The values must match the
+     * order in which that method issues its reads.
+     */
+    static class MetadataIndex {
+        // parent of the log root path
+        static final int LOG_ROOT_PARENT = 0;
+        // log root path
+        static final int LOG_ROOT = 1;
+        // max transaction id path
+        static final int MAX_TXID = 2;
+        // layout version path
+        static final int VERSION = 3;
+        // write lock path
+        static final int LOCK = 4;
+        // read lock path
+        static final int READ_LOCK = 5;
+        // log segments path
+        static final int LOGSEGMENTS = 6;
+        // allocation path; only present when the caller owns the allocator
+        static final int ALLOCATION = 7;
+    }
+
+    /**
+     * Decode the first four bytes of <i>b</i> as a big-endian int; this is the inverse of
+     * {@link #intToBytes(int)}.
+     *
+     * @param b
+     *          byte array holding at least four bytes
+     * @return the decoded int
+     */
+    static int bytesToInt(byte[] b) {
+        assert b.length >= 4;
+        // A byte is promoted to a signed int before shifting, so each byte has to be masked.
+        // Otherwise a negative byte sets all the bits above it in the result.
+        return (b[0] & 0xff) << 24
+                | (b[1] & 0xff) << 16
+                | (b[2] & 0xff) << 8
+                | (b[3] & 0xff);
+    }
+
+    /**
+     * Encode <i>i</i> as four bytes, most significant byte first.
+     *
+     * @param i
+     *          value to encode
+     * @return a new four byte array
+     */
+    static byte[] intToBytes(int i) {
+        final byte[] encoded = new byte[4];
+        for (int pos = 0; pos < encoded.length; pos++) {
+            // pos 0 takes bits 31..24, pos 3 takes bits 7..0
+            encoded[pos] = (byte) (i >> (8 * (3 - pos)));
+        }
+        return encoded;
+    }
+
+    /**
+     * Read all the metadata paths of a log stream in one go.
+     *
+     * <p>Note re. persistent lock state initialization: the read lock persistent state (path)
+     * is checked here but only used in the read handler. Managing the whole stream structure
+     * in one place is more convenient and less error prone.
+     *
+     * @param zk
+     *          zookeeper handle
+     * @param logRootPath
+     *          root path of the log stream
+     * @param ownAllocator
+     *          whether the allocation path is part of the stream structure
+     * @return a future of the data read from each path, ordered as in {@link MetadataIndex}
+     */
+    static Future<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk,
+                                                                 String logRootPath,
+                                                                 boolean ownAllocator) {
+        // the order of the paths has to follow MetadataIndex
+        final List<String> pathsToCheck = Lists.newArrayList(
+                new File(logRootPath).getParent(),
+                logRootPath,
+                logRootPath + MAX_TXID_PATH,
+                logRootPath + VERSION_PATH,
+                logRootPath + LOCK_PATH,
+                logRootPath + READ_LOCK_PATH,
+                logRootPath + LOGSEGMENTS_PATH);
+        if (ownAllocator) {
+            pathsToCheck.add(logRootPath + ALLOCATION_PATH);
+        }
+
+        final List<Future<Versioned<byte[]>>> reads =
+                Lists.newArrayListWithExpectedSize(pathsToCheck.size());
+        for (String pathToCheck : pathsToCheck) {
+            reads.add(Utils.zkGetData(zk, pathToCheck, false));
+        }
+        return Future.collect(reads);
+    }
+
+    /**
+     * @param metadata
+     *          data read from a metadata path
+     * @return true if both the value and the version of <i>metadata</i> are set
+     */
+    static boolean pathExists(Versioned<byte[]> metadata) {
+        if (null == metadata.getValue()) {
+            return false;
+        }
+        return null != metadata.getVersion();
+    }
+
+    /**
+     * Fail with a {@link NullPointerException} unless both the value and the version of
+     * <i>metadata</i> are set.
+     *
+     * @param metadata
+     *          data read from a metadata path
+     */
+    static void ensureMetadataExist(Versioned<byte[]> metadata) {
+        final byte[] value = metadata.getValue();
+        Preconditions.checkNotNull(value);
+        Preconditions.checkNotNull(metadata.getVersion());
+    }
+
+    /**
+     * Create the metadata paths of a log stream that were found missing, in one zookeeper
+     * multi request.
+     *
+     * <p>Outcome, reported through <i>promise</i>:
+     * <ul>
+     * <li>nothing is missing: <i>metadatas</i> is returned as is;</li>
+     * <li>something is missing and <i>createIfNotExists</i> is false:
+     *     {@link LogNotFoundException};</li>
+     * <li>the missing paths were created: a copy of <i>metadatas</i> where each created entry
+     *     carries the data written and version 0;</li>
+     * <li>a path was created concurrently: {@link LogExistsException};</li>
+     * <li>any other zookeeper failure: {@link ZKException}.</li>
+     * </ul>
+     *
+     * @param zk
+     *          zookeeper handle
+     * @param logRootPath
+     *          root path of the log stream
+     * @param metadatas
+     *          data read by {@link #checkLogMetadataPaths(ZooKeeper, String, boolean)}
+     * @param acl
+     *          acl for the paths to create
+     * @param ownAllocator
+     *          whether the allocation path is part of the stream structure
+     * @param createIfNotExists
+     *          whether missing paths should be created
+     * @param promise
+     *          promise to satisfy with the resulting metadata
+     */
+    static void createMissingMetadata(final ZooKeeper zk,
+                                      final String logRootPath,
+                                      final List<Versioned<byte[]>> metadatas,
+                                      final List<ACL> acl,
+                                      final boolean ownAllocator,
+                                      final boolean createIfNotExists,
+                                      final Promise<List<Versioned<byte[]>>> promise) {
+        // one entry per metadata index: null if the path exists, otherwise the data to create
+        final List<byte[]> pathsToCreate = Lists.newArrayListWithExpectedSize(metadatas.size());
+        final List<Op> zkOps = Lists.newArrayListWithExpectedSize(metadatas.size());
+        CreateMode createMode = CreateMode.PERSISTENT;
+
+        // log root parent path
+        if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) {
+            pathsToCreate.add(null);
+        } else {
+            String logRootParentPath = new File(logRootPath).getParent();
+            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
+            zkOps.add(Op.create(logRootParentPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+        }
+
+        // log root path
+        if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT))) {
+            pathsToCreate.add(null);
+        } else {
+            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
+            zkOps.add(Op.create(logRootPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+        }
+
+        // max id
+        if (pathExists(metadatas.get(MetadataIndex.MAX_TXID))) {
+            pathsToCreate.add(null);
+        } else {
+            byte[] zeroTxnIdData = DLUtils.serializeTransactionId(0L);
+            pathsToCreate.add(zeroTxnIdData);
+            zkOps.add(Op.create(logRootPath + MAX_TXID_PATH, zeroTxnIdData, acl, createMode));
+        }
+        // version
+        if (pathExists(metadatas.get(MetadataIndex.VERSION))) {
+            pathsToCreate.add(null);
+        } else {
+            byte[] versionData = intToBytes(LAYOUT_VERSION);
+            pathsToCreate.add(versionData);
+            zkOps.add(Op.create(logRootPath + VERSION_PATH, versionData, acl, createMode));
+        }
+        // lock path
+        if (pathExists(metadatas.get(MetadataIndex.LOCK))) {
+            pathsToCreate.add(null);
+        } else {
+            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
+            zkOps.add(Op.create(logRootPath + LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+        }
+        // read lock path
+        if (pathExists(metadatas.get(MetadataIndex.READ_LOCK))) {
+            pathsToCreate.add(null);
+        } else {
+            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
+            zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+        }
+        // log segments path
+        if (pathExists(metadatas.get(MetadataIndex.LOGSEGMENTS))) {
+            pathsToCreate.add(null);
+        } else {
+            byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber(
+                    DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
+            pathsToCreate.add(logSegmentsData);
+            zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode));
+        }
+        // allocation path
+        if (ownAllocator) {
+            if (pathExists(metadatas.get(MetadataIndex.ALLOCATION))) {
+                pathsToCreate.add(null);
+            } else {
+                pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
+                zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
+                        DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+            }
+        }
+        if (zkOps.isEmpty()) {
+            // nothing missed
+            promise.setValue(metadatas);
+            return;
+        }
+        if (!createIfNotExists) {
+            promise.setException(new LogNotFoundException("Log " + logRootPath + " not found"));
+            return;
+        }
+
+        zk.multi(zkOps, new AsyncCallback.MultiCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) {
+                if (KeeperException.Code.OK.intValue() == rc) {
+                    // keep what was read for the existing paths, and report what was just
+                    // written (at version 0) for the created ones
+                    List<Versioned<byte[]>> finalMetadatas =
+                            Lists.newArrayListWithExpectedSize(metadatas.size());
+                    for (int i = 0; i < pathsToCreate.size(); i++) {
+                        byte[] dataCreated = pathsToCreate.get(i);
+                        if (null == dataCreated) {
+                            finalMetadatas.add(metadatas.get(i));
+                        } else {
+                            finalMetadatas.add(new Versioned<byte[]>(dataCreated, new ZkVersion(0)));
+                        }
+                    }
+                    promise.setValue(finalMetadatas);
+                } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
+                    promise.setException(new LogExistsException("Someone just created log "
+                            + logRootPath));
+                } else {
+                    if (LOG.isDebugEnabled()) {
+                        // The result list may be null or empty when the request failed as a
+                        // whole (e.g. on connection loss). Logging must never throw here,
+                        // otherwise the promise below would be left unsatisfied.
+                        StringBuilder builder = new StringBuilder();
+                        if (null != resultList) {
+                            for (OpResult result : resultList) {
+                                if (builder.length() > 0) {
+                                    builder.append(",");
+                                }
+                                if (result instanceof OpResult.ErrorResult) {
+                                    OpResult.ErrorResult errorResult = (OpResult.ErrorResult) result;
+                                    builder.append(errorResult.getErr());
+                                } else {
+                                    builder.append(0);
+                                }
+                            }
+                        }
+                        LOG.debug("Failed to create log, full rc list = {}", builder.toString());
+                    }
+
+                    promise.setException(new ZKException("Failed to create log " + logRootPath,
+                            KeeperException.Code.get(rc)));
+                }
+            }
+        }, null);
+    }
+
+    /**
+     * Validate the metadata read for a log stream and turn it into the writer metadata.
+     *
+     * @param uri
+     *          location of the log
+     * @param logName
+     *          name of the log
+     * @param logIdentifier
+     *          identifier of the log
+     * @param metadatas
+     *          metadata entries, ordered as in {@link MetadataIndex}
+     * @param ownAllocator
+     *          whether the allocation path is part of the stream structure
+     * @return the metadata for the writer
+     * @throws UnexpectedException
+     *          if an entry is missing, the layout version doesn't match or the max log
+     *          segment sequence number can't be parsed
+     */
+    static ZKLogMetadataForWriter processLogMetadatas(URI uri,
+                                                      String logName,
+                                                      String logIdentifier,
+                                                      List<Versioned<byte[]>> metadatas,
+                                                      boolean ownAllocator)
+            throws UnexpectedException {
+        try {
+            // max id
+            Versioned<byte[]> maxTxnIdData = metadatas.get(MetadataIndex.MAX_TXID);
+            ensureMetadataExist(maxTxnIdData);
+            // version: the entry has to exist before its value is decoded
+            Versioned<byte[]> versionData = metadatas.get(MetadataIndex.VERSION);
+            ensureMetadataExist(versionData);
+            Preconditions.checkArgument(LAYOUT_VERSION == bytesToInt(versionData.getValue()));
+            // lock path
+            ensureMetadataExist(metadatas.get(MetadataIndex.LOCK));
+            // read lock path
+            ensureMetadataExist(metadatas.get(MetadataIndex.READ_LOCK));
+            // max lssn
+            Versioned<byte[]> maxLSSNData = metadatas.get(MetadataIndex.LOGSEGMENTS);
+            ensureMetadataExist(maxLSSNData);
+            try {
+                // parsed for validation only; the raw entry is what gets handed over
+                DLUtils.deserializeLogSegmentSequenceNumber(maxLSSNData.getValue());
+            } catch (NumberFormatException nfe) {
+                throw new UnexpectedException("Invalid max sequence number found in log " + logName, nfe);
+            }
+            // allocation path
+            Versioned<byte[]>  allocationData;
+            if (ownAllocator) {
+                allocationData = metadatas.get(MetadataIndex.ALLOCATION);
+                ensureMetadataExist(allocationData);
+            } else {
+                allocationData = new Versioned<byte[]>(null, null);
+            }
+            return new ZKLogMetadataForWriter(uri, logName, logIdentifier,
+                    maxLSSNData, maxTxnIdData, allocationData);
+        } catch (IllegalArgumentException iae) {
+            // layout version mismatch
+            throw new UnexpectedException("Invalid log " + logName, iae);
+        } catch (NullPointerException npe) {
+            // a metadata entry is missing
+            throw new UnexpectedException("Invalid log " + logName, npe);
+        }
+    }
+
+    /**
+     * Get the writer metadata of a log stream, optionally creating the stream.
+     *
+     * <p>The log root path is validated first. Then all the metadata paths are read, the
+     * missing ones are created (or reported, depending on <i>createIfNotExists</i>) and the
+     * resulting entries are validated and converted.
+     *
+     * @param uri
+     *          location of the log
+     * @param logName
+     *          name of the log
+     * @param logIdentifier
+     *          identifier of the log
+     * @param zooKeeperClient
+     *          zookeeper client
+     * @param ownAllocator
+     *          whether the allocation path is part of the stream structure
+     * @param createIfNotExists
+     *          whether to create the missing metadata paths
+     * @return a future of the writer metadata; failed with {@link InvalidStreamNameException}
+     *         if the log root path is not a valid zookeeper path, {@link ZKException} if the
+     *         zookeeper handle can't be obtained, {@link DLInterruptedException} if interrupted
+     *         while obtaining it, or with the failure of any of the steps above.
+     */
+    static Future<ZKLogMetadataForWriter> getLog(final URI uri,
+                                                 final String logName,
+                                                 final String logIdentifier,
+                                                 final ZooKeeperClient zooKeeperClient,
+                                                 final boolean ownAllocator,
+                                                 final boolean createIfNotExists) {
+        final String logRootPath = ZKLogMetadata.getLogRootPath(uri, logName, logIdentifier);
+        try {
+            PathUtils.validatePath(logRootPath);
+        } catch (IllegalArgumentException e) {
+            LOG.error("Illegal path value {} for stream {}", new Object[]{logRootPath, logName, e});
+            return Future.exception(new InvalidStreamNameException(logName, "Log name is invalid"));
+        }
+
+        try {
+            final ZooKeeper zk = zooKeeperClient.get();
+            // 1) read all metadata paths, 2) create what is missing, 3) validate and convert
+            return checkLogMetadataPaths(zk, logRootPath, ownAllocator)
+                    .flatMap(new AbstractFunction1<List<Versioned<byte[]>>, Future<List<Versioned<byte[]>>>>() {
+                        @Override
+                        public Future<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) {
+                            Promise<List<Versioned<byte[]>>> promise =
+                                    new Promise<List<Versioned<byte[]>>>();
+                            createMissingMetadata(zk, logRootPath, metadatas, zooKeeperClient.getDefaultACL(),
+                                    ownAllocator, createIfNotExists, promise);
+                            return promise;
+                        }
+                    }).map(new ExceptionalFunction<List<Versioned<byte[]>>, ZKLogMetadataForWriter>() {
+                        @Override
+                        public ZKLogMetadataForWriter applyE(List<Versioned<byte[]>> metadatas) throws DLException {
+                            return processLogMetadatas(
+                                    uri,
+                                    logName,
+                                    logIdentifier,
+                                    metadatas,
+                                    ownAllocator);
+                        }
+                    });
+        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+            // both exceptions below come from zooKeeperClient.get()
+            return Future.exception(new ZKException("Encountered zookeeper connection issue on creating log " + logName,
+                    KeeperException.Code.CONNECTIONLOSS));
+        } catch (InterruptedException e) {
+            return Future.exception(new DLInterruptedException("Interrupted on creating log " + logName, e));
+        }
+    }
+
+    /**
+     * Get the writer metadata of the log <i>logName</i>, using the unpartitioned stream name
+     * from the configuration as log identifier and this store's zookeeper client.
+     *
+     * @param uri
+     *          location of the log
+     * @param logName
+     *          name of the log
+     * @param ownAllocator
+     *          whether the allocation path is part of the stream structure
+     * @param createIfNotExists
+     *          whether to create the missing metadata paths
+     * @return a future of the writer metadata
+     */
+    @Override
+    public Future<ZKLogMetadataForWriter> getLog(final URI uri,
+                                                 final String logName,
+                                                 final boolean ownAllocator,
+                                                 final boolean createIfNotExists) {
+        final String logIdentifier = conf.getUnpartitionedStreamName();
+        return getLog(uri, logName, logIdentifier, zooKeeperClient, ownAllocator, createIfNotExists);
+    }
+
+    //
+    // Delete Log
+    //
+
+    /**
+     * Delete the log <i>logName</i> by recursively deleting its stream path.
+     *
+     * @param uri
+     *          location of the log
+     * @param logName
+     *          name of the log
+     * @return a future satisfied with null once the stream path is deleted; failed with
+     *         {@link ZKException} on zookeeper errors or {@link DLInterruptedException} if
+     *         the calling thread is interrupted.
+     */
+    @Override
+    public Future<Void> deleteLog(URI uri, final String logName) {
+        final Promise<Void> promise = new Promise<Void>();
+        try {
+            String streamPath = ZKLogMetadata.getLogStreamPath(uri, logName);
+            ZKUtil.deleteRecursive(zooKeeperClient.get(), streamPath, new AsyncCallback.VoidCallback() {
+                @Override
+                public void processResult(int rc, String path, Object ctx) {
+                    if (KeeperException.Code.OK.intValue() != rc) {
+                        FutureUtils.setException(promise,
+                                new ZKException("Encountered zookeeper issue on deleting log stream "
+                                        + logName, KeeperException.Code.get(rc)));
+                        return;
+                    }
+                    FutureUtils.setValue(promise, null);
+                }
+            }, null);
+        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+            FutureUtils.setException(promise, new ZKException("Encountered zookeeper issue on deleting log stream "
+                    + logName, KeeperException.Code.CONNECTIONLOSS));
+        } catch (InterruptedException e) {
+            // keep the cause, as the other interruption handlers of this class do
+            FutureUtils.setException(promise, new DLInterruptedException("Interrupted while deleting log stream "
+                    + logName, e));
+        } catch (KeeperException e) {
+            FutureUtils.setException(promise, new ZKException("Encountered zookeeper issue on deleting log stream "
+                    + logName, e));
+        }
+        return promise;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
index 2ea1671..5144634 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java
@@ -20,6 +20,8 @@ package com.twitter.distributedlog.logsegment;
 import com.google.common.annotations.Beta;
 import com.twitter.distributedlog.LogSegmentMetadata;
 import com.twitter.distributedlog.callback.LogSegmentNamesListener;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
 import com.twitter.distributedlog.util.Transaction;
 import com.twitter.distributedlog.util.Transaction.OpListener;
 import com.twitter.util.Future;
@@ -52,15 +54,15 @@ public interface LogSegmentMetadataStore extends Closeable {
      *
      * @param txn
      *          transaction to execute for storing log segment sequence number.
-     * @param path
-     *          path to store sequence number
+     * @param logMetadata
+     *          metadata of the log stream
      * @param sequenceNumber
      *          log segment sequence number to store
      * @param listener
      *          listener on the result to this operation
      */
     void storeMaxLogSegmentSequenceNumber(Transaction<Object> txn,
-                                          String path,
+                                          ZKLogMetadata logMetadata,
                                           Versioned<Long> sequenceNumber,
                                           OpListener<Version> listener);
 
@@ -69,15 +71,15 @@ public interface LogSegmentMetadataStore extends Closeable {
      *
      * @param txn
      *          transaction to execute for storing transaction id
-     * @param path
-     *          path to store sequence number
+     * @param logMetadata
+     *          metadata of the log stream
      * @param transactionId
      *          transaction id to store
      * @param listener
      *          listener on the result to this operation
      */
     void storeMaxTxnId(Transaction<Object> txn,
-                       String path,
+                       ZKLogMetadataForWriter logMetadata,
                        Versioned<Long> transactionId,
                        OpListener<Version> listener);
 
@@ -91,8 +93,12 @@ public interface LogSegmentMetadataStore extends Closeable {
      *          transaction to execute for this operation
      * @param segment
      *          segment to create
+     * @param opListener
+     *          the listener on the operation result
      */
-    void createLogSegment(Transaction<Object> txn, LogSegmentMetadata segment);
+    void createLogSegment(Transaction<Object> txn,
+                          LogSegmentMetadata segment,
+                          OpListener<Void> opListener);
 
     /**
      * Delete a log segment <code>segment</code> under transaction <code>txn</code>.
@@ -105,7 +111,9 @@ public interface LogSegmentMetadataStore extends Closeable {
      * @param segment
      *          segment to delete
      */
-    void deleteLogSegment(Transaction<Object> txn, LogSegmentMetadata segment);
+    void deleteLogSegment(Transaction<Object> txn,
+                          LogSegmentMetadata segment,
+                          OpListener<Void> opListener);
 
     /**
      * Update a log segment <code>segment</code> under transaction <code>txn</code>.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java
index baa3240..ac36ef2 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java
@@ -51,16 +51,15 @@ public class BKDLConfig implements DLConfig {
             new ConcurrentHashMap<URI, DLConfig>();
 
     public static void propagateConfiguration(BKDLConfig bkdlConfig, DistributedLogConfiguration dlConf) {
-        dlConf.setSanityCheckTxnID(bkdlConfig.getSanityCheckTxnID());
         dlConf.setEncodeRegionIDInLogSegmentMetadata(bkdlConfig.getEncodeRegionID());
         dlConf.setFirstLogSegmentSequenceNumber(bkdlConfig.getFirstLogSegmentSeqNo());
         if (bkdlConfig.isFederatedNamespace()) {
             dlConf.setCreateStreamIfNotExists(false);
             LOG.info("Disabled createIfNotExists for federated namespace.");
         }
-        LOG.info("Propagate BKDLConfig to DLConfig : sanityCheckTxnID = {}, encodeRegionID = {}," +
+        LOG.info("Propagate BKDLConfig to DLConfig : encodeRegionID = {}," +
                         " firstLogSegmentSequenceNumber = {}, createStreamIfNotExists = {}, isFederated = {}.",
-                new Object[] { dlConf.getSanityCheckTxnID(), dlConf.getEncodeRegionIDInLogSegmentMetadata(),
+                new Object[] { dlConf.getEncodeRegionIDInLogSegmentMetadata(),
                         dlConf.getFirstLogSegmentSequenceNumber(), dlConf.getCreateStreamIfNotExists(),
                         bkdlConfig.isFederatedNamespace() });
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java
index d205b3a..0e5e6d4 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java
@@ -177,8 +177,8 @@ public class LogSegmentMetadataStoreUpdater implements MetadataUpdater {
     protected void addNewSegmentAndDeleteOldSegment(Transaction<Object> txn,
                                                     LogSegmentMetadata newSegment,
                                                     LogSegmentMetadata oldSegment) {
-        metadataStore.deleteLogSegment(txn, oldSegment);
-        metadataStore.createLogSegment(txn, newSegment);
+        metadataStore.deleteLogSegment(txn, oldSegment, null);
+        metadataStore.createLogSegment(txn, newSegment, null);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java
new file mode 100644
index 0000000..db7812e
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.metadata;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Optional;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
+import com.twitter.distributedlog.lock.DistributedLock;
+import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
+import com.twitter.distributedlog.util.PermitManager;
+import com.twitter.distributedlog.util.Transaction;
+import com.twitter.util.Future;
+
+import java.io.Closeable;
+import java.net.URI;
+
+/**
+ * The interface to manage the log stream metadata. The implementation is responsible
+ * for creating the metadata layout.
+ */
+@Beta
+public interface LogStreamMetadataStore extends Closeable {
+
+    /**
+     * Create a transaction for the metadata operations happening in the metadata store.
+     *
+     * @return transaction for the metadata operations
+     */
+    Transaction<Object> newTransaction();
+
+    /**
+     * Check that a log stream exists.
+     *
+     * @param uri the location of the log stream
+     * @param logName the name of the log stream
+     * @return a future representing the existence of the log stream. {@link com.twitter.distributedlog.LogNotFoundException}
+     *         is thrown if the log doesn't exist.
+     */
+    Future<Void> logExists(URI uri, String logName);
+
+    /**
+     * Create the read lock for the log stream.
+     *
+     * @param metadata the metadata for a log stream
+     * @param readerId the reader id used for lock
+     * @return the read lock
+     */
+    Future<DistributedLock> createReadLock(ZKLogMetadataForReader metadata,
+                                           Optional<String> readerId);
+
+    /**
+     * Create the write lock for the log stream.
+     *
+     * @param metadata the metadata for a log stream
+     * @return the write lock
+     */
+    DistributedLock createWriteLock(ZKLogMetadataForWriter metadata);
+
+    /**
+     * Get the metadata of a log, optionally creating it if it doesn't exist.
+     *
+     * @param uri the location to store the metadata of the log
+     * @param streamName the name of the log stream
+     * @param ownAllocator whether to use its own allocator or external allocator
+     * @param createIfNotExists flag to create the stream if it doesn't exist
+     * @return the metadata of the log
+     */
+    Future<ZKLogMetadataForWriter> getLog(URI uri,
+                                          String streamName,
+                                          boolean ownAllocator,
+                                          boolean createIfNotExists);
+
+    /**
+     * Delete the metadata of a log.
+     *
+     * @param uri the location where the metadata of the log is stored
+     * @param streamName the name of the log stream
+     * @return a future representing the result of the deletion.
+     */
+    Future<Void> deleteLog(URI uri, String streamName);
+
+    /**
+     * Get the log segment metadata store.
+     *
+     * @return the log segment metadata store.
+     */
+    LogSegmentMetadataStore getLogSegmentMetadataStore();
+
+    /**
+     * Get the permit manager for this metadata store. It can be used for limiting the concurrent
+     * metadata operations. The implementation can disable handing over the permits when the metadata
+     * store is unavailable (for example zookeeper session expired).
+     *
+     * @return the permit manager
+     */
+    PermitManager getPermitManager();
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
index bed2fcd..23d8e40 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
@@ -267,7 +267,8 @@ public class DistributedLogTool extends Tool {
         protected LogSegmentMetadataStore getLogSegmentMetadataStore() throws IOException {
             DistributedLogNamespace namespace = getFactory().getNamespace();
             assert(namespace instanceof BKDistributedLogNamespace);
-            return ((BKDistributedLogNamespace) namespace).getWriterSegmentMetadataStore();
+            return ((BKDistributedLogNamespace) namespace).getWriterStreamMetadataStore()
+                    .getLogSegmentMetadataStore();
         }
 
         protected ZooKeeperClient getZooKeeperClient() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java
deleted file mode 100644
index dc25023..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Manager to control all the log segments rolling.
- */
-public class LimitedPermitManager implements PermitManager, Runnable, Watcher {
-
-    static final Logger LOG = LoggerFactory.getLogger(LimitedPermitManager.class);
-
-    static enum PermitState {
-        ALLOWED, DISALLOWED, DISABLED
-    }
-
-    class EpochPermit implements Permit {
-
-        final PermitState state;
-        final int epoch;
-
-        EpochPermit(PermitState state) {
-            this.state = state;
-            this.epoch = LimitedPermitManager.this.epoch.get();
-        }
-
-        int getEpoch() {
-            return epoch;
-        }
-
-        @Override
-        public boolean isAllowed() {
-            return PermitState.ALLOWED == state;
-        }
-    }
-
-    boolean enablePermits = true;
-    final Semaphore semaphore;
-    final int period;
-    final TimeUnit timeUnit;
-    final ScheduledExecutorService executorService;
-    final AtomicInteger epoch = new AtomicInteger(0);
-    private StatsLogger statsLogger = null;
-    private Gauge<Number> outstandingGauge = null;
-
-    public LimitedPermitManager(int concurrency, int period, TimeUnit timeUnit,
-                                ScheduledExecutorService executorService) {
-        this(concurrency, period, timeUnit, executorService, NullStatsLogger.INSTANCE);
-    }
-
-    public LimitedPermitManager(final int concurrency, int period, TimeUnit timeUnit,
-            ScheduledExecutorService executorService, StatsLogger statsLogger) {
-        if (concurrency > 0) {
-            this.semaphore = new Semaphore(concurrency);
-        } else {
-            this.semaphore = null;
-        }
-        this.period = period;
-        this.timeUnit = timeUnit;
-        this.executorService = executorService;
-        this.statsLogger = statsLogger;
-        this.outstandingGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return null == semaphore ? 0 : concurrency - semaphore.availablePermits();
-            }
-        };
-        this.statsLogger.scope("permits").registerGauge("outstanding", this.outstandingGauge);
-    }
-
-    @Override
-    synchronized public Permit acquirePermit() {
-        if (!enablePermits) {
-            return new EpochPermit(PermitState.DISABLED);
-        }
-        if (null != semaphore) {
-            return semaphore.tryAcquire() ? new EpochPermit(PermitState.ALLOWED) :
-                    new EpochPermit(PermitState.DISALLOWED);
-        } else {
-            return new EpochPermit(PermitState.ALLOWED);
-        }
-    }
-
-    @Override
-    synchronized public void releasePermit(Permit permit) {
-        if (null != semaphore && permit.isAllowed()) {
-            if (period <= 0) {
-                semaphore.release();
-            } else {
-                try {
-                    executorService.schedule(this, period, timeUnit);
-                } catch (RejectedExecutionException ree) {
-                    LOG.warn("Failed on scheduling releasing permit in given period ({}ms)." +
-                            " Release it immediately : ", timeUnit.toMillis(period), ree);
-                    semaphore.release();
-                }
-            }
-        }
-    }
-
-    @Override
-    synchronized public boolean disallowObtainPermits(Permit permit) {
-        if (!(permit instanceof EpochPermit)) {
-            return false;
-        }
-        if (epoch.getAndIncrement() == ((EpochPermit)permit).getEpoch()) {
-            this.enablePermits = false;
-            LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, epoch.get());
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    public void close() {
-        unregisterGauge();
-    }
-
-    @Override
-    synchronized public boolean allowObtainPermits() {
-        forceSetAllowPermits(true);
-        return true;
-    }
-
-    synchronized void forceSetAllowPermits(boolean allowPermits) {
-        epoch.getAndIncrement();
-        this.enablePermits = allowPermits;
-        LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, epoch.get());
-    }
-
-    @Override
-    public void run() {
-        semaphore.release();
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-        if (event.getType().equals(Event.EventType.None)) {
-            switch (event.getState()) {
-            case SyncConnected:
-                forceSetAllowPermits(true);
-                break;
-            case Disconnected:
-                forceSetAllowPermits(false);
-                break;
-            case Expired:
-                forceSetAllowPermits(false);
-                break;
-            default:
-                break;
-            }
-        }
-    }
-
-    public void unregisterGauge() {
-        if(this.statsLogger != null && this.outstandingGauge != null) {
-            this.statsLogger.scope("permits").unregisterGauge("outstanding", this.outstandingGauge);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java
index 7d76f29..78292e9 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java
@@ -17,29 +17,39 @@
  */
 package com.twitter.distributedlog.zk;
 
+import com.twitter.distributedlog.util.Transaction.OpListener;
 import org.apache.zookeeper.Op;
 import org.apache.zookeeper.OpResult;
 
+import javax.annotation.Nullable;
+
 /**
  * Default zookeeper operation. No action on commiting or aborting.
  */
 public class DefaultZKOp extends ZKOp {
 
-    public static DefaultZKOp of(Op op) {
-        return new DefaultZKOp(op);
+    public static DefaultZKOp of(Op op, OpListener<Void> listener) {
+        return new DefaultZKOp(op, listener);
     }
 
-    private DefaultZKOp(Op op) {
+    private final OpListener<Void> listener;
+
+    private DefaultZKOp(Op op, @Nullable OpListener<Void> opListener) {
         super(op);
+        this.listener = opListener;
     }
 
     @Override
     protected void commitOpResult(OpResult opResult) {
-        // no-op
+        if (null != listener) {
+            listener.onCommit(null);
+        }
     }
 
     @Override
     protected void abortOpResult(Throwable t, OpResult opResult) {
-        // no-op
+        if (null != listener) {
+            listener.onAbort(t);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java
new file mode 100644
index 0000000..78ff0a2
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.zk;
+
+import com.twitter.distributedlog.util.PermitManager;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Manager to control all the log segments rolling.
+ */
+public class LimitedPermitManager implements PermitManager, Runnable, Watcher {
+
+    static final Logger LOG = LoggerFactory.getLogger(LimitedPermitManager.class);
+
+    static enum PermitState {
+        ALLOWED, DISALLOWED, DISABLED
+    }
+
+    class EpochPermit implements Permit {
+
+        final PermitState state;
+        final int epoch;
+
+        EpochPermit(PermitState state) {
+            this.state = state;
+            this.epoch = LimitedPermitManager.this.epoch.get();
+        }
+
+        int getEpoch() {
+            return epoch;
+        }
+
+        @Override
+        public boolean isAllowed() {
+            return PermitState.ALLOWED == state;
+        }
+    }
+
+    boolean enablePermits = true;
+    final Semaphore semaphore;
+    final int period;
+    final TimeUnit timeUnit;
+    final ScheduledExecutorService executorService;
+    final AtomicInteger epoch = new AtomicInteger(0);
+    private StatsLogger statsLogger = null;
+    private Gauge<Number> outstandingGauge = null;
+
+    public LimitedPermitManager(int concurrency, int period, TimeUnit timeUnit,
+                                ScheduledExecutorService executorService) {
+        this(concurrency, period, timeUnit, executorService, NullStatsLogger.INSTANCE);
+    }
+
+    public LimitedPermitManager(final int concurrency, int period, TimeUnit timeUnit,
+            ScheduledExecutorService executorService, StatsLogger statsLogger) {
+        if (concurrency > 0) {
+            this.semaphore = new Semaphore(concurrency);
+        } else {
+            this.semaphore = null;
+        }
+        this.period = period;
+        this.timeUnit = timeUnit;
+        this.executorService = executorService;
+        this.statsLogger = statsLogger;
+        this.outstandingGauge = new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return null == semaphore ? 0 : concurrency - semaphore.availablePermits();
+            }
+        };
+        this.statsLogger.scope("permits").registerGauge("outstanding", this.outstandingGauge);
+    }
+
+    @Override
+    synchronized public Permit acquirePermit() {
+        if (!enablePermits) {
+            return new EpochPermit(PermitState.DISABLED);
+        }
+        if (null != semaphore) {
+            return semaphore.tryAcquire() ? new EpochPermit(PermitState.ALLOWED) :
+                    new EpochPermit(PermitState.DISALLOWED);
+        } else {
+            return new EpochPermit(PermitState.ALLOWED);
+        }
+    }
+
+    @Override
+    synchronized public void releasePermit(Permit permit) {
+        if (null != semaphore && permit.isAllowed()) {
+            if (period <= 0) {
+                semaphore.release();
+            } else {
+                try {
+                    executorService.schedule(this, period, timeUnit);
+                } catch (RejectedExecutionException ree) {
+                    LOG.warn("Failed on scheduling releasing permit in given period ({}ms)." +
+                            " Release it immediately : ", timeUnit.toMillis(period), ree);
+                    semaphore.release();
+                }
+            }
+        }
+    }
+
+    @Override
+    synchronized public boolean disallowObtainPermits(Permit permit) {
+        if (!(permit instanceof EpochPermit)) {
+            return false;
+        }
+        if (epoch.getAndIncrement() == ((EpochPermit)permit).getEpoch()) {
+            this.enablePermits = false;
+            LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, epoch.get());
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public void close() {
+        unregisterGauge();
+    }
+
+    @Override
+    synchronized public boolean allowObtainPermits() {
+        forceSetAllowPermits(true);
+        return true;
+    }
+
+    synchronized void forceSetAllowPermits(boolean allowPermits) {
+        epoch.getAndIncrement();
+        this.enablePermits = allowPermits;
+        LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, epoch.get());
+    }
+
+    @Override
+    public void run() {
+        semaphore.release();
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+        if (event.getType().equals(Event.EventType.None)) {
+            switch (event.getState()) {
+            case SyncConnected:
+                forceSetAllowPermits(true);
+                break;
+            case Disconnected:
+                forceSetAllowPermits(false);
+                break;
+            case Expired:
+                forceSetAllowPermits(false);
+                break;
+            default:
+                break;
+            }
+        }
+    }
+
+    public void unregisterGauge() {
+        if(this.statsLogger != null && this.outstandingGauge != null) {
+            this.statsLogger.scope("permits").unregisterGauge("outstanding", this.outstandingGauge);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java
index d885593..5b788e2 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java
@@ -33,7 +33,8 @@ public class ZKVersionedSetOp extends ZKOp {
 
     private final OpListener<Version> listener;
 
-    public ZKVersionedSetOp(Op op, OpListener<Version> opListener) {
+    public ZKVersionedSetOp(Op op,
+                            @Nullable OpListener<Version> opListener) {
         super(op);
         this.listener = opListener;
     }
@@ -42,7 +43,9 @@ public class ZKVersionedSetOp extends ZKOp {
     protected void commitOpResult(OpResult opResult) {
         assert(opResult instanceof OpResult.SetDataResult);
         OpResult.SetDataResult setDataResult = (OpResult.SetDataResult) opResult;
-        listener.onCommit(new ZkVersion(setDataResult.getStat().getVersion()));
+        if (null != listener) {
+            listener.onCommit(new ZkVersion(setDataResult.getStat().getVersion()));
+        }
     }
 
     @Override
@@ -60,7 +63,9 @@ public class ZKVersionedSetOp extends ZKOp {
                 cause = KeeperException.create(KeeperException.Code.get(errorResult.getErr()));
             }
         }
-        listener.onAbort(cause);
+        if (null != listener) {
+            listener.onAbort(cause);
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
index c588cd7..1485ae6 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
@@ -36,6 +36,7 @@ import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.feature.SettableFeatureProvider;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.versioning.Version;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
@@ -429,7 +430,7 @@ public class DLMTestUtil {
                 .setEnvelopeEntries(LogSegmentMetadata.supportsEnvelopedEntries(logSegmentMetadataVersion))
                 .build();
         l.write(dlm.writerZKC);
-        writeHandler.maxTxId.store(startTxID);
+        writeHandler.maxTxId.update(Version.ANY, startTxID);
         writeHandler.addLogSegmentToCache(inprogressZnodeName, l);
         BKLogSegmentWriter writer = new BKLogSegmentWriter(
                 writeHandler.getFullyQualifiedName(),
@@ -479,7 +480,7 @@ public class DLMTestUtil {
             .setInprogress(false)
             .build();
         l.write(dlm.writerZKC);
-        writeHandler.maxTxId.store(startTxID);
+        writeHandler.maxTxId.update(Version.ANY, startTxID);
         writeHandler.addLogSegmentToCache(inprogressZnodeName, l);
         BKLogSegmentWriter writer = new BKLogSegmentWriter(
                 writeHandler.getFullyQualifiedName(),

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAppendOnlyStreamWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAppendOnlyStreamWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAppendOnlyStreamWriter.java
index d45a727..6aa38c3 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAppendOnlyStreamWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAppendOnlyStreamWriter.java
@@ -21,6 +21,7 @@ import java.io.ByteArrayInputStream;
 import java.net.URI;
 
 import com.twitter.distributedlog.exceptions.BKTransmitException;
+import com.twitter.distributedlog.util.FutureUtils;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -203,7 +204,7 @@ public class TestAppendOnlyStreamWriter extends TestDistributedLogBase {
         BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(conf, name);
 
         URI uri = createDLMURI("/" + name);
-        BKDistributedLogManager.createLog(conf, dlm.getReaderZKC(), uri, name);
+        FutureUtils.result(dlm.getWriterMetadataStore().getLog(uri, name, true, true));
 
         // Log exists but is empty, better not throw.
         AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter();
@@ -264,7 +265,7 @@ public class TestAppendOnlyStreamWriter extends TestDistributedLogBase {
         BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(conf, name);
 
         URI uri = createDLMURI("/" + name);
-        BKDistributedLogManager.createLog(conf, dlm.getReaderZKC(), uri, name);
+        FutureUtils.result(dlm.getWriterMetadataStore().getLog(uri, name, true, true));
 
         // Log exists but is empty, better not throw.
         AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
index 6ad9950..c8a1c74 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
@@ -148,61 +148,6 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
     }
 
     @Test(timeout = 60000)
-    public void testSanityCheckTxnID() throws Exception {
-        String name = "distrlog-sanity-check-txnid";
-        BKDistributedLogManager dlm = createNewDLM(conf, name);
-        BKSyncLogWriter out = dlm.startLogSegmentNonPartitioned();
-        long txid = 1;
-        for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) {
-            LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
-            out.write(op);
-        }
-        out.closeAndComplete();
-
-        BKSyncLogWriter out1 = dlm.startLogSegmentNonPartitioned();
-        LogRecord op1 = DLMTestUtil.getLogRecordInstance(1);
-        try {
-            out1.write(op1);
-            fail("Should fail writing lower txn id if sanityCheckTxnID is enabled.");
-        } catch (TransactionIdOutOfOrderException tioooe) {
-            // expected
-        }
-        out1.closeAndComplete();
-        dlm.close();
-
-        DLMTestUtil.updateBKDLConfig(bkutil.getUri(), bkutil.getZkServers(), bkutil.getBkLedgerPath(), false);
-        LOG.info("Disable sanity check txn id.");
-        BKDLConfig.clearCachedDLConfigs();
-
-        DistributedLogConfiguration newConf = new DistributedLogConfiguration();
-        newConf.addConfiguration(conf);
-        BKDistributedLogManager newDLM = createNewDLM(newConf, name);
-        BKSyncLogWriter out2 = newDLM.startLogSegmentNonPartitioned();
-        LogRecord op2 = DLMTestUtil.getLogRecordInstance(1);
-        out2.write(op2);
-        out2.closeAndComplete();
-        newDLM.close();
-
-        DLMTestUtil.updateBKDLConfig(bkutil.getUri(), bkutil.getZkServers(), bkutil.getBkLedgerPath(), true);
-        LOG.info("Enable sanity check txn id.");
-        BKDLConfig.clearCachedDLConfigs();
-
-        DistributedLogConfiguration conf3 = new DistributedLogConfiguration();
-        conf3.addConfiguration(conf);
-        BKDistributedLogManager dlm3 = createNewDLM(newConf, name);
-        BKSyncLogWriter out3 = dlm3.startLogSegmentNonPartitioned();
-        LogRecord op3 = DLMTestUtil.getLogRecordInstance(1);
-        try {
-            out3.write(op3);
-            fail("Should fail writing lower txn id if sanityCheckTxnID is enabled.");
-        } catch (TransactionIdOutOfOrderException tioooe) {
-            // expected
-        }
-        out3.closeAndComplete();
-        dlm3.close();
-    }
-
-    @Test(timeout = 60000)
     public void testContinuousReaders() throws Exception {
         String name = "distrlog-continuous";
         BKDistributedLogManager dlm = createNewDLM(conf, name);
@@ -958,12 +903,9 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
         final AtomicReference<Collection<LogSegmentMetadata>> receivedStreams =
                 new AtomicReference<Collection<LogSegmentMetadata>>();
 
-        DistributedLogManager dlm = createNewDLM(conf, name);
-        ZooKeeperClient zkClient = TestZooKeeperClientBuilder.newBuilder()
-                .uri(createDLMURI("/"))
-                .build();
+        BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(conf, name);
 
-        BKDistributedLogManager.createLog(conf, zkClient, ((BKDistributedLogManager) dlm).uri, name);
+        FutureUtils.result(dlm.getWriterMetadataStore().getLog(dlm.getUri(), name, true, true));
         dlm.registerListener(new LogSegmentListener() {
             @Override
             public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
@@ -992,7 +934,6 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
                 // no-op
             }
         });
-        LOG.info("Registered listener for stream {}.", name);
         long txid = 1;
         for (int i = 0; i < numSegments; i++) {
             LOG.info("Waiting for creating log segment {}.", i);
@@ -1018,6 +959,8 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
             assertEquals(seqno * DEFAULT_SEGMENT_SIZE, m.getLastTxId());
             ++seqno;
         }
+
+        dlm.close();
     }
 
     @Test(timeout = 60000)

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java
index e68b916..ecc20e0 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java
@@ -102,7 +102,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase {
         dlm.close();
 
         // create the stream
-        BKDistributedLogManager.createLog(conf, zooKeeperClient, uri, streamName);
+        namespace.createLog(streamName);
 
         DistributedLogManager newDLM = namespace.openLog(streamName);
         LogWriter newWriter = newDLM.startLogSegmentNonPartitioned();
@@ -273,9 +273,9 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase {
             }
         });
         latches[0].await();
-        BKDistributedLogManager.createLog(conf, zooKeeperClient, uri, "test1");
+        namespace.createLog("test1");
         latches[1].await();
-        BKDistributedLogManager.createLog(conf, zooKeeperClient, uri, "test2");
+        namespace.createLog("test2");
         latches[2].await();
         assertEquals(0, numFailures.get());
         assertNotNull(receivedStreams.get());

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
index 604be0e..8a734b5 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
@@ -191,7 +191,8 @@ public class TestDistributedLogBase {
     protected LogSegmentMetadataStore getLogSegmentMetadataStore(DistributedLogManagerFactory factory) {
         DistributedLogNamespace namespace = factory.getNamespace();
         assertTrue(namespace instanceof BKDistributedLogNamespace);
-        return ((BKDistributedLogNamespace) namespace).getWriterSegmentMetadataStore();
+        return ((BKDistributedLogNamespace) namespace).getWriterStreamMetadataStore()
+                .getLogSegmentMetadataStore();
     }
 
     @SuppressWarnings("deprecation")

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
index 54b1ab8..027b012 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
@@ -253,6 +253,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
         confLocal.setOutputBufferSize(0);
         confLocal.setLogSegmentRollingIntervalMinutes(0);
         confLocal.setMaxLogSegmentBytes(1);
+        confLocal.setLogSegmentRollingConcurrency(Integer.MAX_VALUE);
 
         int numLogSegments = 10;
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/bk/TestLedgerAllocator.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/bk/TestLedgerAllocator.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/bk/TestLedgerAllocator.java
index 625742e..66b97be 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/bk/TestLedgerAllocator.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/bk/TestLedgerAllocator.java
@@ -140,7 +140,7 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
         logger.info("Try obtaining ledger handle {}", lh.getId());
         byte[] data = zkc.get().getData(allocationPath, false, null);
         assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8)));
-        txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1)));
+        txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null));
         try {
             FutureUtils.result(txn.execute());
             fail("Should fail the transaction when setting unexisted path");
@@ -337,7 +337,7 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
         ZKTransaction txn = newTxn();
         // close during obtaining ledger.
         LedgerHandle lh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER));
-        txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1)));
+        txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null));
         try {
             FutureUtils.result(txn.execute());
             fail("Should fail the transaction when setting unexisted path");


Mime
View raw message