distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [05/31] incubator-distributedlog git commit: DL-117: Stream metadata store
Date Fri, 30 Dec 2016 00:07:19 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
index 5d3be7d..f2e30ce 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
@@ -23,22 +23,21 @@ import com.google.common.collect.Lists;
 import com.twitter.distributedlog.bk.LedgerAllocator;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.DLIllegalStateException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
 import com.twitter.distributedlog.exceptions.EndOfStreamException;
 import com.twitter.distributedlog.exceptions.LockingException;
+import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
 import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.exceptions.ZKException;
 import com.twitter.distributedlog.function.GetLastTxIdFunction;
 import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter;
 import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
 import com.twitter.distributedlog.lock.DistributedLock;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.logsegment.RollingPolicy;
 import com.twitter.distributedlog.logsegment.SizeBasedRollingPolicy;
 import com.twitter.distributedlog.logsegment.TimeBasedRollingPolicy;
+import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
 import com.twitter.distributedlog.metadata.MetadataUpdater;
 import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
 import com.twitter.distributedlog.util.DLUtils;
@@ -49,9 +48,6 @@ import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.Transaction;
 import com.twitter.distributedlog.util.PermitLimiter;
 import com.twitter.distributedlog.util.Utils;
-import com.twitter.distributedlog.zk.ZKOp;
-import com.twitter.distributedlog.zk.ZKTransaction;
-import com.twitter.distributedlog.zk.ZKVersionedSetOp;
 import com.twitter.util.Function;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
@@ -60,19 +56,11 @@ import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.meta.ZkVersion;
 import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.OpResult;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.runtime.AbstractFunction1;
@@ -84,7 +72,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import static com.google.common.base.Charsets.UTF_8;
 import static com.twitter.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_FILTER;
 
 /**
@@ -102,11 +89,11 @@ import static com.twitter.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_F
 class BKLogWriteHandler extends BKLogHandler {
     static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class);
 
+    protected final ZKLogMetadataForWriter logMetadataForWriter;
     protected final DistributedLock lock;
     protected final LedgerAllocator ledgerAllocator;
     protected final MaxTxId maxTxId;
     protected final MaxLogSegmentSequenceNo maxLogSegmentSequenceNo;
-    protected final boolean sanityCheckTxnId;
     protected final boolean validateLogSegmentSequenceNumber;
     protected final int regionId;
     protected final RollingPolicy rollingPolicy;
@@ -164,9 +151,8 @@ class BKLogWriteHandler extends BKLogHandler {
      */
     BKLogWriteHandler(ZKLogMetadataForWriter logMetadata,
                       DistributedLogConfiguration conf,
-                      ZooKeeperClientBuilder zkcBuilder,
                       BookKeeperClientBuilder bkcBuilder,
-                      LogSegmentMetadataStore metadataStore,
+                      LogStreamMetadataStore streamMetadataStore,
                       LogSegmentMetadataCache metadataCache,
                       OrderedScheduler scheduler,
                       LedgerAllocator allocator,
@@ -181,14 +167,14 @@ class BKLogWriteHandler extends BKLogHandler {
                       DistributedLock lock /** owned by handler **/) {
         super(logMetadata,
                 conf,
-                zkcBuilder,
                 bkcBuilder,
-                metadataStore,
+                streamMetadataStore,
                 metadataCache,
                 scheduler,
                 statsLogger,
                 alertStatsLogger,
                 clientId);
+        this.logMetadataForWriter = logMetadata;
         this.perLogStatsLogger = perLogStatsLogger;
         this.writeLimiter = writeLimiter;
         this.featureProvider = featureProvider;
@@ -202,15 +188,13 @@ class BKLogWriteHandler extends BKLogHandler {
         } else {
             this.regionId = DistributedLogConstants.LOCAL_REGION_ID;
         }
-        this.sanityCheckTxnId = conf.getSanityCheckTxnID();
         this.validateLogSegmentSequenceNumber = conf.isLogSegmentSequenceNumberValidationEnabled();
 
         // Construct the max sequence no
         maxLogSegmentSequenceNo = new MaxLogSegmentSequenceNo(logMetadata.getMaxLSSNData());
         inprogressLSSNs = new LinkedList<Long>();
         // Construct the max txn id.
-        maxTxId = new MaxTxId(zooKeeperClient, logMetadata.getMaxTxIdPath(),
-                conf.getSanityCheckTxnID(), logMetadata.getMaxTxIdData());
+        maxTxId = new MaxTxId(logMetadata.getMaxTxIdData());
 
         // Schedule fetching log segment list in background before we access it.
         // We don't need to watch the log segment list changes for writer, as it manages log segment list.
@@ -291,13 +275,12 @@ class BKLogWriteHandler extends BKLogHandler {
     }
 
     // Transactional operations for MaxLogSegmentSequenceNo
-    void storeMaxSequenceNumber(final Transaction txn,
+    void storeMaxSequenceNumber(final Transaction<Object> txn,
                                 final MaxLogSegmentSequenceNo maxSeqNo,
                                 final long seqNo,
                                 final boolean isInprogress) {
-        byte[] data = DLUtils.serializeLogSegmentSequenceNumber(seqNo);
-        Op zkOp = Op.setData(logMetadata.getLogSegmentsPath(), data, maxSeqNo.getZkVersion());
-        txn.addOp(new ZKVersionedSetOp(zkOp, new Transaction.OpListener<Version>() {
+        metadataStore.storeMaxLogSegmentSequenceNumber(txn, logMetadata, maxSeqNo.getVersionedData(seqNo),
+                new Transaction.OpListener<Version>() {
             @Override
             public void onCommit(Version version) {
                 if (validateLogSegmentSequenceNumber) {
@@ -309,69 +292,60 @@ class BKLogWriteHandler extends BKLogHandler {
                         }
                     }
                 }
-                maxSeqNo.update((ZkVersion) version, seqNo);
+                maxSeqNo.update(version, seqNo);
             }
 
             @Override
             public void onAbort(Throwable t) {
                 // no-op
             }
-        }));
+        });
     }
 
     // Transactional operations for MaxTxId
-    void storeMaxTxId(final ZKTransaction txn,
+    void storeMaxTxId(final Transaction<Object> txn,
                       final MaxTxId maxTxId,
                       final long txId) {
-        byte[] data = maxTxId.couldStore(txId);
-        if (null != data) {
-            Op zkOp = Op.setData(maxTxId.getZkPath(), data, -1);
-            txn.addOp(new ZKVersionedSetOp(zkOp, new Transaction.OpListener<Version>() {
-                @Override
-                public void onCommit(Version version) {
-                    maxTxId.setMaxTxId(txId);
-                }
-
-                @Override
-                public void onAbort(Throwable t) {
+        metadataStore.storeMaxTxnId(txn, logMetadataForWriter, maxTxId.getVersionedData(txId),
+                new Transaction.OpListener<Version>() {
+                    @Override
+                    public void onCommit(Version version) {
+                                                        maxTxId.update(version, txId);
+                                                                                      }
 
-                }
-            }));
-        }
+                    @Override
+                    public void onAbort(Throwable t) {
+                        // no-op
+                    }
+                });
     }
 
     // Transactional operations for logsegment
-    void writeLogSegment(final ZKTransaction txn,
-                         final List<ACL> acl,
-                         final String inprogressSegmentName,
-                         final LogSegmentMetadata metadata,
-                         final String path) {
-        byte[] finalisedData = metadata.getFinalisedData().getBytes(UTF_8);
-        Op zkOp = Op.create(path, finalisedData, acl, CreateMode.PERSISTENT);
-        txn.addOp(new ZKOp(zkOp) {
+    void writeLogSegment(final Transaction<Object> txn,
+                         final LogSegmentMetadata metadata) {
+        metadataStore.createLogSegment(txn, metadata, new Transaction.OpListener<Void>() {
             @Override
-            protected void commitOpResult(OpResult opResult) {
-                addLogSegmentToCache(inprogressSegmentName, metadata);
+            public void onCommit(Void r) {
+                addLogSegmentToCache(metadata.getSegmentName(), metadata);
             }
 
             @Override
-            protected void abortOpResult(Throwable t, OpResult opResult) {
+            public void onAbort(Throwable t) {
                 // no-op
             }
         });
     }
 
-    void deleteLogSegment(final ZKTransaction txn,
-                          final String logSegmentName,
-                          final String logSegmentPath) {
-        Op zkOp = Op.delete(logSegmentPath, -1);
-        txn.addOp(new ZKOp(zkOp) {
+    void deleteLogSegment(final Transaction<Object> txn,
+                          final LogSegmentMetadata metadata) {
+        metadataStore.deleteLogSegment(txn, metadata, new Transaction.OpListener<Void>() {
             @Override
-            protected void commitOpResult(OpResult opResult) {
-                removeLogSegmentFromCache(logSegmentName);
+            public void onCommit(Void r) {
+                removeLogSegmentFromCache(metadata.getSegmentName());
             }
+
             @Override
-            protected void abortOpResult(Throwable t, OpResult opResult) {
+            public void onAbort(Throwable t) {
                 // no-op
             }
         });
@@ -405,10 +379,6 @@ class BKLogWriteHandler extends BKLogHandler {
         }
     }
 
-    void register(Watcher watcher) {
-        this.zooKeeperClient.register(watcher);
-    }
-
     /**
      * Start a new log segment in a BookKeeper ledger.
      * First ensure that we have the write lock for this journal.
@@ -539,19 +509,17 @@ class BKLogWriteHandler extends BKLogHandler {
             FutureUtils.setException(promise, new IOException("Invalid Transaction Id " + txId));
             return;
         }
-        if (this.sanityCheckTxnId) {
-            long highestTxIdWritten = maxTxId.get();
-            if (txId < highestTxIdWritten) {
-                if (highestTxIdWritten == DistributedLogConstants.MAX_TXID) {
-                    LOG.error("We've already marked the stream as ended and attempting to start a new log segment");
-                    FutureUtils.setException(promise, new EndOfStreamException("Writing to a stream after it has been marked as completed"));
-                    return;
-                }
-                else {
-                    LOG.error("We've already seen TxId {} the max TXId is {}", txId, highestTxIdWritten);
-                    FutureUtils.setException(promise, new TransactionIdOutOfOrderException(txId, highestTxIdWritten));
-                    return;
-                }
+
+        long highestTxIdWritten = maxTxId.get();
+        if (txId < highestTxIdWritten) {
+            if (highestTxIdWritten == DistributedLogConstants.MAX_TXID) {
+                LOG.error("We've already marked the stream as ended and attempting to start a new log segment");
+                FutureUtils.setException(promise, new EndOfStreamException("Writing to a stream after it has been marked as completed"));
+                return;
+            } else {
+                LOG.error("We've already seen TxId {} the max TXId is {}", txId, highestTxIdWritten);
+                FutureUtils.setException(promise, new TransactionIdOutOfOrderException(txId, highestTxIdWritten));
+                return;
             }
         }
 
@@ -564,7 +532,7 @@ class BKLogWriteHandler extends BKLogHandler {
         }
 
         // start the transaction from zookeeper
-        final ZKTransaction txn = new ZKTransaction(zooKeeperClient);
+        final Transaction<Object> txn = streamMetadataStore.newTransaction();
 
         // failpoint injected before creating ledger
         try {
@@ -617,7 +585,7 @@ class BKLogWriteHandler extends BKLogHandler {
     // once the ledger handle is obtained from allocator, this function should guarantee
     // either the transaction is executed or aborted. Otherwise, the ledger handle will
     // just leak from the allocation pool - hence cause "No Ledger Allocator"
-    private void createInprogressLogSegment(ZKTransaction txn,
+    private void createInprogressLogSegment(Transaction<Object> txn,
                                             final long txId,
                                             final LedgerHandle lh,
                                             boolean bestEffort,
@@ -634,7 +602,6 @@ class BKLogWriteHandler extends BKLogHandler {
             return;
         }
 
-        final String inprogressZnodeName = inprogressZNodeName(lh.getId(), txId, logSegmentSeqNo);
         final String inprogressZnodePath = inprogressZNode(lh.getId(), txId, logSegmentSeqNo);
         final LogSegmentMetadata l =
             new LogSegmentMetadata.LogSegmentMetadataBuilder(inprogressZnodePath,
@@ -645,12 +612,7 @@ class BKLogWriteHandler extends BKLogHandler {
                     .build();
 
         // Create an inprogress segment
-        writeLogSegment(
-                txn,
-                zooKeeperClient.getDefaultACL(),
-                inprogressZnodeName,
-                l,
-                inprogressZnodePath);
+        writeLogSegment(txn, l);
 
         // Try storing max sequence number.
         LOG.debug("Try storing max sequence number in startLogSegment {} : {}", inprogressZnodePath, logSegmentSeqNo);
@@ -667,7 +629,7 @@ class BKLogWriteHandler extends BKLogHandler {
                 try {
                     FutureUtils.setValue(promise, new BKLogSegmentWriter(
                             getFullyQualifiedName(),
-                            inprogressZnodeName,
+                            l.getSegmentName(),
                             conf,
                             conf.getDLLedgerMetadataLayoutVersion(),
                             new BKLogSegmentEntryWriter(lh),
@@ -888,7 +850,6 @@ class BKLogWriteHandler extends BKLogHandler {
         }
 
         LOG.debug("Completing and Closing Log Segment {} {}", firstTxId, lastTxId);
-        final String inprogressZnodePath = inprogressZNode(inprogressZnodeName);
         LogSegmentMetadata inprogressLogSegment = readLogSegmentFromCache(inprogressZnodeName);
 
         // validate log segment
@@ -936,7 +897,7 @@ class BKLogWriteHandler extends BKLogHandler {
             // ignore the case that a new inprogress log segment is pre-allocated
             // before completing current inprogress one
             LOG.info("Try storing max sequence number {} in completing {}.",
-                    new Object[] { logSegmentSeqNo, inprogressZnodePath });
+                    new Object[] { logSegmentSeqNo, inprogressLogSegment.getZkPath() });
         } else {
             LOG.warn("Unexpected max ledger sequence number {} found while completing log segment {} for {}",
                     new Object[] { maxLogSegmentSequenceNo.getSequenceNumber(), logSegmentSeqNo, getFullyQualifiedName() });
@@ -949,7 +910,6 @@ class BKLogWriteHandler extends BKLogHandler {
         }
 
         // Prepare the completion
-        final String nameForCompletedLedger = completedLedgerZNodeName(firstTxId, lastTxId, logSegmentSeqNo);
         final String pathForCompletedLedger = completedLedgerZNode(firstTxId, lastTxId, logSegmentSeqNo);
         long startSequenceId;
         try {
@@ -970,17 +930,12 @@ class BKLogWriteHandler extends BKLogHandler {
         setLastLedgerRollingTimeMillis(completedLogSegment.getCompletionTime());
 
         // prepare the transaction
-        ZKTransaction txn = new ZKTransaction(zooKeeperClient);
+        Transaction<Object> txn = streamMetadataStore.newTransaction();
 
         // create completed log segment
-        writeLogSegment(
-                txn,
-                zooKeeperClient.getDefaultACL(),
-                nameForCompletedLedger,
-                completedLogSegment,
-                pathForCompletedLedger);
+        writeLogSegment(txn, completedLogSegment);
         // delete inprogress log segment
-        deleteLogSegment(txn, inprogressZnodeName, inprogressZnodePath);
+        deleteLogSegment(txn, inprogressLogSegment);
         // store max sequence number
         storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, maxSeqNo, false);
         // update max txn id.
@@ -991,7 +946,8 @@ class BKLogWriteHandler extends BKLogHandler {
             @Override
             public void onSuccess(Void value) {
                 LOG.info("Completed {} to {} for {} : {}",
-                        new Object[] { inprogressZnodeName, nameForCompletedLedger, getFullyQualifiedName(), completedLogSegment });
+                        new Object[] { inprogressZnodeName, completedLogSegment.getSegmentName(),
+                                getFullyQualifiedName(), completedLogSegment });
                 FutureUtils.setValue(promise, completedLogSegment);
             }
 
@@ -1072,27 +1028,6 @@ class BKLogWriteHandler extends BKLogHandler {
 
     }
 
-    public void deleteLog() throws IOException {
-        lock.checkOwnershipAndReacquire();
-        FutureUtils.result(purgeLogSegmentsOlderThanTxnId(-1));
-
-        try {
-            Utils.closeQuietly(lock);
-            zooKeeperClient.get().exists(logMetadata.getLogSegmentsPath(), false);
-            zooKeeperClient.get().exists(logMetadata.getMaxTxIdPath(), false);
-            if (logMetadata.getLogRootPath().toLowerCase().contains("distributedlog")) {
-                ZKUtil.deleteRecursive(zooKeeperClient.get(), logMetadata.getLogRootPath());
-            } else {
-                LOG.warn("Skip deletion of unrecognized ZK Path {}", logMetadata.getLogRootPath());
-            }
-        } catch (InterruptedException ie) {
-            LOG.error("Interrupted while deleting log znodes", ie);
-            throw new DLInterruptedException("Interrupted while deleting " + logMetadata.getLogRootPath(), ie);
-        } catch (KeeperException ke) {
-            LOG.error("Error deleting" + logMetadata.getLogRootPath() + " in zookeeper", ke);
-        }
-    }
-
     Future<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(final DLSN dlsn) {
         if (DLSN.InvalidDLSN == dlsn) {
             List<LogSegmentMetadata> emptyList = new ArrayList<LogSegmentMetadata>(0);
@@ -1321,33 +1256,29 @@ class BKLogWriteHandler extends BKLogHandler {
     private void deleteLogSegmentMetadata(final LogSegmentMetadata segmentMetadata,
                                           final Promise<LogSegmentMetadata> promise) {
         Transaction<Object> deleteTxn = metadataStore.transaction();
-        metadataStore.deleteLogSegment(deleteTxn, segmentMetadata);
-        deleteTxn.execute().addEventListener(new FutureEventListener<Void>() {
+        metadataStore.deleteLogSegment(deleteTxn, segmentMetadata, new Transaction.OpListener<Void>() {
             @Override
-            public void onSuccess(Void result) {
+            public void onCommit(Void r) {
                 // purge log segment
                 removeLogSegmentFromCache(segmentMetadata.getZNodeName());
                 promise.setValue(segmentMetadata);
             }
 
             @Override
-            public void onFailure(Throwable cause) {
-                if (cause instanceof ZKException) {
-                    ZKException zke = (ZKException) cause;
-                    if (KeeperException.Code.NONODE == zke.getKeeperExceptionCode()) {
-                        LOG.error("No log segment {} found for {}.",
-                                segmentMetadata, getFullyQualifiedName());
-                        // purge log segment
-                        removeLogSegmentFromCache(segmentMetadata.getZNodeName());
-                        promise.setValue(segmentMetadata);
-                        return;
-                    }
+            public void onAbort(Throwable t) {
+                if (t instanceof LogSegmentNotFoundException) {
+                    // purge log segment
+                    removeLogSegmentFromCache(segmentMetadata.getZNodeName());
+                    promise.setValue(segmentMetadata);
+                    return;
+                } else {
+                    LOG.error("Couldn't purge {} for {}: with error {}",
+                            new Object[]{ segmentMetadata, getFullyQualifiedName(), t });
+                    promise.setException(t);
                 }
-                LOG.error("Couldn't purge {} for {}: with error {}",
-                        new Object[]{ segmentMetadata, getFullyQualifiedName(), cause });
-                promise.setException(cause);
             }
         });
+        deleteTxn.execute();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
index f4ca45e..0f6db75 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java
@@ -62,7 +62,6 @@ class BKSyncLogReaderDLSN implements LogReader, AsyncNotification {
                         StatsLogger statsLogger) {
         this.readHandler = bkdlm.createReadHandler(
                 Optional.<String>absent(),
-                bkdlm.getLockStateExecutor(true),
                 this,
                 conf.getDeserializeRecordSetOnReads(),
                 true);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
index 46a056b..6f37a59 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
@@ -1557,6 +1557,7 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
      *
      * @return true if should check txn id with max txn id, otherwise false.
      */
+    @Deprecated
     public boolean getSanityCheckTxnID() {
         return getBoolean(BKDL_MAXID_SANITYCHECK, BKDL_MAXID_SANITYCHECK_DEFAULT);
     }
@@ -1569,6 +1570,7 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
      * @return configuration.
      * @see #getSanityCheckTxnID()
      */
+    @Deprecated
     public DistributedLogConfiguration setSanityCheckTxnID(boolean enabled) {
         setProperty(BKDL_MAXID_SANITYCHECK, enabled);
         return this;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java
index 80cf350..9bfaaba 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxLogSegmentSequenceNo.java
@@ -17,26 +17,15 @@
  */
 package com.twitter.distributedlog;
 
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.ZKException;
 import com.twitter.distributedlog.util.DLUtils;
-import org.apache.bookkeeper.meta.ZkVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
 
 /**
  * Utility class for storing and reading max ledger sequence number
  */
 class MaxLogSegmentSequenceNo {
 
-    static final Logger LOG = LoggerFactory.getLogger(MaxLogSegmentSequenceNo.class);
-
     Version version;
     long maxSeqNo;
 
@@ -55,24 +44,20 @@ class MaxLogSegmentSequenceNo {
             if (null != logSegmentsData && null != logSegmentsData.getVersion()) {
                 version = logSegmentsData.getVersion();
             } else {
-                version = new ZkVersion(-1);
+                throw new IllegalStateException("Invalid MaxLogSegmentSequenceNo found - " + logSegmentsData);
             }
         }
     }
 
-    synchronized int getZkVersion() {
-        return ((ZkVersion) version).getZnodeVersion();
+    synchronized Version getVersion() {
+        return version;
     }
 
     synchronized long getSequenceNumber() {
         return maxSeqNo;
     }
 
-    synchronized MaxLogSegmentSequenceNo update(int zkVersion, long logSegmentSeqNo) {
-        return update(new ZkVersion(zkVersion), logSegmentSeqNo);
-    }
-
-    synchronized MaxLogSegmentSequenceNo update(ZkVersion version, long logSegmentSeqNo) {
+    synchronized MaxLogSegmentSequenceNo update(Version version, long logSegmentSeqNo) {
         if (version.compare(this.version) == Version.Occurred.AFTER) {
             this.version = version;
             this.maxSeqNo = logSegmentSeqNo;
@@ -80,21 +65,8 @@ class MaxLogSegmentSequenceNo {
         return this;
     }
 
-    synchronized void store(ZooKeeperClient zkc, String path, long logSegmentSeqNo) throws IOException {
-        try {
-            Stat stat = zkc.get().setData(path,
-                    DLUtils.serializeLogSegmentSequenceNumber(logSegmentSeqNo), getZkVersion());
-            update(stat.getVersion(), logSegmentSeqNo);
-        } catch (KeeperException ke) {
-            throw new ZKException("Error writing max ledger sequence number " + logSegmentSeqNo + " to "
-                                  + path + " : ", ke);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException zce) {
-            throw new IOException("Error writing max ledger sequence number " + logSegmentSeqNo + " to "
-                    + path + " : ", zce);
-        } catch (InterruptedException e) {
-            throw new DLInterruptedException("Error writing max ledger sequence number " + logSegmentSeqNo + " to "
-                    + path + " : ", e);
-        }
+    public synchronized Versioned<Long> getVersionedData(long seqNo) {
+        return new Versioned<Long>(seqNo, version);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
index c446a8b..ed7218e 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java
@@ -18,13 +18,11 @@
 package com.twitter.distributedlog;
 
 import com.twitter.distributedlog.util.DLUtils;
+import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 /**
  * Utility class for storing and reading
  * the max seen txid in zookeeper
@@ -32,73 +30,43 @@ import java.io.IOException;
 class MaxTxId {
     static final Logger LOG = LoggerFactory.getLogger(MaxTxId.class);
 
-    private final ZooKeeperClient zkc;
-    private final String path;
-    private final boolean enabled;
-
+    private Version version;
     private long currentMax;
 
-    MaxTxId(ZooKeeperClient zkc, String path, boolean enabled,
-            Versioned<byte[]> maxTxIdData) {
-        this.zkc = zkc;
-        this.path = path;
-        this.enabled = enabled && null != maxTxIdData && null != maxTxIdData.getVersion()
-                && null != maxTxIdData.getValue();
-        if (this.enabled) {
+    MaxTxId(Versioned<byte[]> maxTxIdData) {
+        if (null != maxTxIdData
+                && null != maxTxIdData.getValue()
+                && null != maxTxIdData.getVersion()) {
+            this.version = maxTxIdData.getVersion();
             try {
                 this.currentMax = DLUtils.deserializeTransactionId(maxTxIdData.getValue());
             } catch (NumberFormatException e) {
-                LOG.warn("Invalid txn id stored in {}", path, e);
-                this.currentMax = 0L;
+                LOG.warn("Invalid txn id found in max txn id data, falling back to INVALID_TXID", e);
+                this.currentMax = DistributedLogConstants.INVALID_TXID;
             }
         } else {
-            this.currentMax = -1L;
+            this.currentMax = DistributedLogConstants.INVALID_TXID;
+            if (null != maxTxIdData && null != maxTxIdData.getVersion()) {
+                this.version = maxTxIdData.getVersion();
+            } else {
+                throw new IllegalStateException("Invalid MaxTxId found - " + maxTxIdData);
+            }
         }
     }
 
-    String getZkPath() {
-        return path;
-    }
-
-    synchronized void setMaxTxId(long txId) {
-        if (enabled && this.currentMax < txId) {
+    synchronized void update(Version version, long txId) {
+        if (version.compare(this.version) == Version.Occurred.AFTER) {
+            this.version = version;
             this.currentMax = txId;
         }
     }
 
-    synchronized byte[] couldStore(long maxTxId) {
-        if (enabled && currentMax < maxTxId) {
-            return DLUtils.serializeTransactionId(maxTxId);
-        } else {
-            return null;
-        }
-    }
-
-    /**
-     * Store the highest TxID encountered so far so that we
-     * can enforce the monotonically non-decreasing property
-     * This is best effort as this enforcement is only done
-     *
-     * @param maxTxId - the maximum transaction id seen so far
-     * @throws IOException
-     */
-    synchronized void store(long maxTxId) throws IOException {
-        if (enabled && currentMax < maxTxId) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Setting maxTxId to " + maxTxId);
-            }
-            String txidStr = Long.toString(maxTxId);
-            try {
-                zkc.get().setData(path, txidStr.getBytes("UTF-8"), -1);
-                currentMax = maxTxId;
-            } catch (Exception e) {
-                LOG.error("Error writing new MaxTxId value {}", maxTxId, e);
-            }
-        }
-    }
-
     synchronized long get() {
         return currentMax;
     }
 
+    /** Pairs max(txId, currentMax) with the current version; locked like update() and get(). */
+    public synchronized Versioned<Long> getVersionedData(long txId) {
+        return new Versioned<Long>(Math.max(txId, currentMax), version);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
index f0d2797..1b831ea 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
@@ -23,12 +23,16 @@ import com.twitter.distributedlog.LogSegmentMetadata;
 import com.twitter.distributedlog.ZooKeeperClient;
 import com.twitter.distributedlog.callback.LogSegmentNamesListener;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
+import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
 import com.twitter.distributedlog.exceptions.ZKException;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.util.DLUtils;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.Transaction;
+import com.twitter.distributedlog.util.Transaction.OpListener;
 import com.twitter.distributedlog.zk.DefaultZKOp;
 import com.twitter.distributedlog.zk.ZKOp;
 import com.twitter.distributedlog.zk.ZKTransaction;
@@ -48,10 +52,8 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -220,30 +222,28 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
 
     @Override
     public void storeMaxLogSegmentSequenceNumber(Transaction<Object> txn,
-                                                 String path,
+                                                 ZKLogMetadata logMetadata,
                                                  Versioned<Long> lssn,
                                                  Transaction.OpListener<Version> listener) {
         Version version = lssn.getVersion();
         assert(version instanceof ZkVersion);
-
         ZkVersion zkVersion = (ZkVersion) version;
         byte[] data = DLUtils.serializeLogSegmentSequenceNumber(lssn.getValue());
-        Op setDataOp = Op.setData(path, data, zkVersion.getZnodeVersion());
+        Op setDataOp = Op.setData(logMetadata.getLogSegmentsPath(), data, zkVersion.getZnodeVersion());
         ZKOp zkOp = new ZKVersionedSetOp(setDataOp, listener);
         txn.addOp(zkOp);
     }
 
     @Override
     public void storeMaxTxnId(Transaction<Object> txn,
-                              String path,
+                              ZKLogMetadataForWriter logMetadata,
                               Versioned<Long> transactionId,
                               Transaction.OpListener<Version> listener) {
         Version version = transactionId.getVersion();
         assert(version instanceof ZkVersion);
-
         ZkVersion zkVersion = (ZkVersion) version;
         byte[] data = DLUtils.serializeTransactionId(transactionId.getValue());
-        Op setDataOp = Op.setData(path, data, zkVersion.getZnodeVersion());
+        Op setDataOp = Op.setData(logMetadata.getMaxTxIdPath(), data, zkVersion.getZnodeVersion());
         ZKOp zkOp = new ZKVersionedSetOp(setDataOp, listener);
         txn.addOp(zkOp);
     }
@@ -256,29 +256,66 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
     }
 
     @Override
-    public void createLogSegment(Transaction<Object> txn, LogSegmentMetadata segment) {
+    public void createLogSegment(Transaction<Object> txn,
+                                 LogSegmentMetadata segment,
+                                 OpListener<Void> listener) {
         byte[] finalisedData = segment.getFinalisedData().getBytes(UTF_8);
         Op createOp = Op.create(
                 segment.getZkPath(),
                 finalisedData,
                 zkc.getDefaultACL(),
                 CreateMode.PERSISTENT);
-        txn.addOp(DefaultZKOp.of(createOp));
+        txn.addOp(DefaultZKOp.of(createOp, listener));
     }
 
     @Override
-    public void deleteLogSegment(Transaction<Object> txn, LogSegmentMetadata segment) {
+    public void deleteLogSegment(Transaction<Object> txn,
+                                 final LogSegmentMetadata segment,
+                                 final OpListener<Void> listener) {
         Op deleteOp = Op.delete(
                 segment.getZkPath(),
                 -1);
-        txn.addOp(DefaultZKOp.of(deleteOp));
+        logger.info("Delete segment : {}", segment);
+        txn.addOp(DefaultZKOp.of(deleteOp, new OpListener<Void>() {
+            @Override
+            public void onCommit(Void r) {
+                if (null != listener) {
+                    listener.onCommit(r);
+                }
+            }
+
+            @Override
+            public void onAbort(Throwable t) {
+                logger.info("Aborted transaction on deleting segment {}", segment);
+                KeeperException.Code kc;
+                if (t instanceof KeeperException) {
+                    kc = ((KeeperException) t).code();
+                } else if (t instanceof ZKException) {
+                    kc = ((ZKException) t).getKeeperExceptionCode();
+                } else {
+                    abortListener(t);
+                    return;
+                }
+                if (KeeperException.Code.NONODE == kc) {
+                    abortListener(new LogSegmentNotFoundException(segment.getZkPath()));
+                    return;
+                }
+                abortListener(t);
+            }
+
+            private void abortListener(Throwable t) {
+                if (null != listener) {
+                    listener.onAbort(t);
+                }
+            }
+        }));
     }
 
     @Override
     public void updateLogSegment(Transaction<Object> txn, LogSegmentMetadata segment) {
         byte[] finalisedData = segment.getFinalisedData().getBytes(UTF_8);
         Op updateOp = Op.setData(segment.getZkPath(), finalisedData, -1);
-        txn.addOp(DefaultZKOp.of(updateOp));
+        txn.addOp(DefaultZKOp.of(updateOp, null));
     }
 
     // reads

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java
index 078c040..37beb16 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadata.java
@@ -29,6 +29,17 @@ public class ZKLogMetadata {
     }
 
     /**
+     * Get the top stream path for a given log.
+     *
+     * @param uri namespace to store the log
+     * @param logName name of the log
+     * @return top stream path
+     */
+    public static String getLogStreamPath(URI uri, String logName) {
+        return String.format("%s/%s", uri.getPath(), logName);
+    }
+
+    /**
      * Get the log root path for a given log.
      *
      * @param uri
@@ -59,14 +70,14 @@ public class ZKLogMetadata {
     }
 
     protected static final int LAYOUT_VERSION = -1;
-    protected final static String LOGSEGMENTS_PATH = "/ledgers";
-    protected final static String VERSION_PATH = "/version";
+    public final static String LOGSEGMENTS_PATH = "/ledgers";
+    public final static String VERSION_PATH = "/version";
     // writer znodes
-    protected final static String MAX_TXID_PATH = "/maxtxid";
-    protected final static String LOCK_PATH = "/lock";
-    protected final static String ALLOCATION_PATH = "/allocation";
+    public final static String MAX_TXID_PATH = "/maxtxid";
+    public final static String LOCK_PATH = "/lock";
+    public final static String ALLOCATION_PATH = "/allocation";
     // reader znodes
-    protected final static String READ_LOCK_PATH = "/readLock";
+    public final static String READ_LOCK_PATH = "/readLock";
 
     protected final URI uri;
     protected final String logName;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java
index 1de712f..9a1548c 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogMetadataForWriter.java
@@ -17,312 +17,15 @@
  */
 package com.twitter.distributedlog.impl.metadata;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.exceptions.DLException;
-import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
-import com.twitter.distributedlog.exceptions.LogExistsException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.util.DLUtils;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.ExceptionalFunction;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.meta.ZkVersion;
 import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.OpResult;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.common.PathUtils;
-import org.apache.zookeeper.data.ACL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
 
-import java.io.File;
 import java.net.URI;
-import java.util.List;
 
 /**
  * Log Metadata for writer
  */
 public class ZKLogMetadataForWriter extends ZKLogMetadata {
 
-    static final Logger LOG = LoggerFactory.getLogger(ZKLogMetadataForWriter.class);
-
-    static class MetadataIndex {
-        static final int LOG_ROOT_PARENT = 0;
-        static final int LOG_ROOT = 1;
-        static final int MAX_TXID = 2;
-        static final int VERSION = 3;
-        static final int LOCK = 4;
-        static final int READ_LOCK = 5;
-        static final int LOGSEGMENTS = 6;
-        static final int ALLOCATION = 7;
-    }
-
-    static int bytesToInt(byte[] b) {
-        assert b.length >= 4;
-        return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3];
-    }
-
-    static byte[] intToBytes(int i) {
-        return new byte[]{
-            (byte) (i >> 24),
-            (byte) (i >> 16),
-            (byte) (i >> 8),
-            (byte) (i)};
-    }
-
-    static boolean pathExists(Versioned<byte[]> metadata) {
-        return null != metadata.getValue() && null != metadata.getVersion();
-    }
-
-    static void ensureMetadataExist(Versioned<byte[]> metadata) {
-        Preconditions.checkNotNull(metadata.getValue());
-        Preconditions.checkNotNull(metadata.getVersion());
-    }
-
-    public static Future<ZKLogMetadataForWriter> of(
-            final URI uri,
-            final String logName,
-            final String logIdentifier,
-            final ZooKeeper zk,
-            final List<ACL> acl,
-            final boolean ownAllocator,
-            final boolean createIfNotExists) {
-        final String logRootPath = ZKLogMetadata.getLogRootPath(uri, logName, logIdentifier);
-        try {
-            PathUtils.validatePath(logRootPath);
-        } catch (IllegalArgumentException e) {
-            LOG.error("Illegal path value {} for stream {}", new Object[]{logRootPath, logName, e});
-            return Future.exception(new InvalidStreamNameException(logName, "Log name is invalid"));
-        }
-
-        return checkLogMetadataPaths(zk, logRootPath, ownAllocator)
-                .flatMap(new AbstractFunction1<List<Versioned<byte[]>>, Future<List<Versioned<byte[]>>>>() {
-                    @Override
-                    public Future<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) {
-                        Promise<List<Versioned<byte[]>>> promise =
-                                new Promise<List<Versioned<byte[]>>>();
-                        createMissingMetadata(zk, logRootPath, metadatas, acl,
-                                ownAllocator, createIfNotExists, promise);
-                        return promise;
-                    }
-                }).map(new ExceptionalFunction<List<Versioned<byte[]>>, ZKLogMetadataForWriter>() {
-                    @Override
-                    public ZKLogMetadataForWriter applyE(List<Versioned<byte[]>> metadatas) throws DLException {
-                        return processLogMetadatas(uri, logName, logIdentifier, metadatas, ownAllocator);
-                    }
-                });
-    }
-
-    static Future<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk,
-                                                                 String logRootPath,
-                                                                 boolean ownAllocator) {
-        // Note re. persistent lock state initialization: the read lock persistent state (path) is
-        // initialized here but only used in the read handler. The reason is its more convenient and
-        // less error prone to manage all stream structure in one place.
-        final String logRootParentPath = new File(logRootPath).getParent();
-        final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH;
-        final String maxTxIdPath = logRootPath + MAX_TXID_PATH;
-        final String lockPath = logRootPath + LOCK_PATH;
-        final String readLockPath = logRootPath + READ_LOCK_PATH;
-        final String versionPath = logRootPath + VERSION_PATH;
-        final String allocationPath = logRootPath + ALLOCATION_PATH;
-
-        int numPaths = ownAllocator ? MetadataIndex.ALLOCATION + 1 : MetadataIndex.LOGSEGMENTS + 1;
-        List<Future<Versioned<byte[]>>> checkFutures = Lists.newArrayListWithExpectedSize(numPaths);
-        checkFutures.add(Utils.zkGetData(zk, logRootParentPath, false));
-        checkFutures.add(Utils.zkGetData(zk, logRootPath, false));
-        checkFutures.add(Utils.zkGetData(zk, maxTxIdPath, false));
-        checkFutures.add(Utils.zkGetData(zk, versionPath, false));
-        checkFutures.add(Utils.zkGetData(zk, lockPath, false));
-        checkFutures.add(Utils.zkGetData(zk, readLockPath, false));
-        checkFutures.add(Utils.zkGetData(zk, logSegmentsPath, false));
-        if (ownAllocator) {
-            checkFutures.add(Utils.zkGetData(zk, allocationPath, false));
-        }
-
-        return Future.collect(checkFutures);
-    }
-
-    static void createMissingMetadata(final ZooKeeper zk,
-                                      final String logRootPath,
-                                      final List<Versioned<byte[]>> metadatas,
-                                      final List<ACL> acl,
-                                      final boolean ownAllocator,
-                                      final boolean createIfNotExists,
-                                      final Promise<List<Versioned<byte[]>>> promise) {
-        final List<byte[]> pathsToCreate = Lists.newArrayListWithExpectedSize(metadatas.size());
-        final List<Op> zkOps = Lists.newArrayListWithExpectedSize(metadatas.size());
-        CreateMode createMode = CreateMode.PERSISTENT;
-
-        // log root parent path
-        if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) {
-            pathsToCreate.add(null);
-        } else {
-            String logRootParentPath = new File(logRootPath).getParent();
-            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-            zkOps.add(Op.create(logRootParentPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
-        }
-
-        // log root path
-        if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT))) {
-            pathsToCreate.add(null);
-        } else {
-            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-            zkOps.add(Op.create(logRootPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
-        }
-
-        // max id
-        if (pathExists(metadatas.get(MetadataIndex.MAX_TXID))) {
-            pathsToCreate.add(null);
-        } else {
-            byte[] zeroTxnIdData = DLUtils.serializeTransactionId(0L);
-            pathsToCreate.add(zeroTxnIdData);
-            zkOps.add(Op.create(logRootPath + MAX_TXID_PATH, zeroTxnIdData, acl, createMode));
-        }
-        // version
-        if (pathExists(metadatas.get(MetadataIndex.VERSION))) {
-            pathsToCreate.add(null);
-        } else {
-            byte[] versionData = intToBytes(LAYOUT_VERSION);
-            pathsToCreate.add(versionData);
-            zkOps.add(Op.create(logRootPath + VERSION_PATH, versionData, acl, createMode));
-        }
-        // lock path
-        if (pathExists(metadatas.get(MetadataIndex.LOCK))) {
-            pathsToCreate.add(null);
-        } else {
-            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-            zkOps.add(Op.create(logRootPath + LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
-        }
-        // read lock path
-        if (pathExists(metadatas.get(MetadataIndex.READ_LOCK))) {
-            pathsToCreate.add(null);
-        } else {
-            pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-            zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
-        }
-        // log segments path
-        if (pathExists(metadatas.get(MetadataIndex.LOGSEGMENTS))) {
-            pathsToCreate.add(null);
-        } else {
-            byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber(
-                    DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
-            pathsToCreate.add(logSegmentsData);
-            zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode));
-        }
-        // allocation path
-        if (ownAllocator) {
-            if (pathExists(metadatas.get(MetadataIndex.ALLOCATION))) {
-                pathsToCreate.add(null);
-            } else {
-                pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
-                zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
-                        DistributedLogConstants.EMPTY_BYTES, acl, createMode));
-            }
-        }
-        if (zkOps.isEmpty()) {
-            // nothing missed
-            promise.setValue(metadatas);
-            return;
-        }
-        if (!createIfNotExists) {
-            promise.setException(new LogNotFoundException("Log " + logRootPath + " not found"));
-            return;
-        }
-
-        zk.multi(zkOps, new AsyncCallback.MultiCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) {
-                if (KeeperException.Code.OK.intValue() == rc) {
-                    List<Versioned<byte[]>> finalMetadatas =
-                            Lists.newArrayListWithExpectedSize(metadatas.size());
-                    for (int i = 0; i < pathsToCreate.size(); i++) {
-                        byte[] dataCreated = pathsToCreate.get(i);
-                        if (null == dataCreated) {
-                            finalMetadatas.add(metadatas.get(i));
-                        } else {
-                            finalMetadatas.add(new Versioned<byte[]>(dataCreated, new ZkVersion(0)));
-                        }
-                    }
-                    promise.setValue(finalMetadatas);
-                } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
-                    promise.setException(new LogExistsException("Someone just created log "
-                            + logRootPath));
-                } else {
-                    if (LOG.isDebugEnabled()) {
-                        StringBuilder builder = new StringBuilder();
-                        for (OpResult result : resultList) {
-                            if (result instanceof OpResult.ErrorResult) {
-                                OpResult.ErrorResult errorResult = (OpResult.ErrorResult) result;
-                                builder.append(errorResult.getErr()).append(",");
-                            } else {
-                                builder.append(0).append(",");
-                            }
-                        }
-                        String resultCodeList = builder.substring(0, builder.length() - 1);
-                        LOG.debug("Failed to create log, full rc list = {}", resultCodeList);
-                    }
-
-                    promise.setException(new ZKException("Failed to create log " + logRootPath,
-                            KeeperException.Code.get(rc)));
-                }
-            }
-        }, null);
-    }
-
-    static ZKLogMetadataForWriter processLogMetadatas(URI uri,
-                                                      String logName,
-                                                      String logIdentifier,
-                                                      List<Versioned<byte[]>> metadatas,
-                                                      boolean ownAllocator)
-            throws UnexpectedException {
-        try {
-            // max id
-            Versioned<byte[]> maxTxnIdData = metadatas.get(MetadataIndex.MAX_TXID);
-            ensureMetadataExist(maxTxnIdData);
-            // version
-            Versioned<byte[]> versionData = metadatas.get(MetadataIndex.VERSION);
-            ensureMetadataExist(maxTxnIdData);
-            Preconditions.checkArgument(LAYOUT_VERSION == bytesToInt(versionData.getValue()));
-            // lock path
-            ensureMetadataExist(metadatas.get(MetadataIndex.LOCK));
-            // read lock path
-            ensureMetadataExist(metadatas.get(MetadataIndex.READ_LOCK));
-            // max lssn
-            Versioned<byte[]> maxLSSNData = metadatas.get(MetadataIndex.LOGSEGMENTS);
-            ensureMetadataExist(maxLSSNData);
-            try {
-                DLUtils.deserializeLogSegmentSequenceNumber(maxLSSNData.getValue());
-            } catch (NumberFormatException nfe) {
-                throw new UnexpectedException("Invalid max sequence number found in log " + logName, nfe);
-            }
-            // allocation path
-            Versioned<byte[]>  allocationData;
-            if (ownAllocator) {
-                allocationData = metadatas.get(MetadataIndex.ALLOCATION);
-                ensureMetadataExist(allocationData);
-            } else {
-                allocationData = new Versioned<byte[]>(null, null);
-            }
-            return new ZKLogMetadataForWriter(uri, logName, logIdentifier,
-                    maxLSSNData, maxTxnIdData, allocationData);
-        } catch (IllegalArgumentException iae) {
-            throw new UnexpectedException("Invalid log " + logName, iae);
-        } catch (NullPointerException npe) {
-            throw new UnexpectedException("Invalid log " + logName, npe);
-        }
-    }
-
     private final Versioned<byte[]> maxLSSNData;
     private final Versioned<byte[]> maxTxIdData;
     private final Versioned<byte[]> allocationData;
@@ -334,12 +37,12 @@ public class ZKLogMetadataForWriter extends ZKLogMetadata {
      * @param logName       name of the log
      * @param logIdentifier identifier of the log
      */
-    private ZKLogMetadataForWriter(URI uri,
-                                   String logName,
-                                   String logIdentifier,
-                                   Versioned<byte[]> maxLSSNData,
-                                   Versioned<byte[]> maxTxIdData,
-                                   Versioned<byte[]> allocationData) {
+    public ZKLogMetadataForWriter(URI uri,
+                                  String logName,
+                                  String logIdentifier,
+                                  Versioned<byte[]> maxLSSNData,
+                                  Versioned<byte[]> maxTxIdData,
+                                  Versioned<byte[]> allocationData) {
         super(uri, logName, logIdentifier);
         this.maxLSSNData = maxLSSNData;
         this.maxTxIdData = maxTxIdData;


Mime
View raw message