Repository: zookeeper
Updated Branches:
refs/heads/master e7ac12c95 -> e501d9cc6
ZOOKEEPER-3127: Fixing potential data inconsistency due to updating lastProcessedZxid with
a partial multi-op txn
Author: Fangmin Lyu <allenlyu@fb.com>
Reviewers: Benjamin Reed <breed@apache.org>, Michael Han <hanm@apache.org>
Closes #606 from lvfangmin/ZOOKEEPER-3127
Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/e501d9cc
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/e501d9cc
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/e501d9cc
Branch: refs/heads/master
Commit: e501d9cc67fbaa6e825292fd838711259b6c9789
Parents: e7ac12c
Author: Fangmin Lyu <allenlyu@fb.com>
Authored: Wed Sep 5 13:35:38 2018 -0700
Committer: Michael Han <hanm@apache.org>
Committed: Wed Sep 5 13:35:38 2018 -0700
----------------------------------------------------------------------
.../org/apache/zookeeper/server/DataTree.java | 53 +++--
.../org/apache/zookeeper/server/ZKDatabase.java | 9 +-
.../server/quorum/FuzzySnapshotRelatedTest.java | 212 +++++++++++++++++++
.../server/quorum/QuorumPeerMainTest.java | 6 +-
4 files changed, 260 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e501d9cc/src/java/main/org/apache/zookeeper/server/DataTree.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/DataTree.java b/src/java/main/org/apache/zookeeper/server/DataTree.java
index 9a4f1a7..e0e6661 100644
--- a/src/java/main/org/apache/zookeeper/server/DataTree.java
+++ b/src/java/main/org/apache/zookeeper/server/DataTree.java
@@ -788,7 +788,11 @@ public class DataTree {
public volatile long lastProcessedZxid = 0;
- public ProcessTxnResult processTxn(TxnHeader header, Record txn)
+ /**
+ * Applies a top-level (non-sub) transaction to the tree. Equivalent to
+ * {@code processTxn(header, txn, false)}, so lastProcessedZxid is advanced
+ * to the txn's zxid (if larger) once the whole txn has been applied.
+ */
+ public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
+ return this.processTxn(header, txn, false);
+ }
+
+ public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn)
{
ProcessTxnResult rc = new ProcessTxnResult();
@@ -943,7 +947,7 @@ public class DataTree {
TxnHeader subHdr = new TxnHeader(header.getClientId(), header.getCxid(),
header.getZxid(), header.getTime(),
subtxn.getType());
- ProcessTxnResult subRc = processTxn(subHdr, record);
+ ProcessTxnResult subRc = processTxn(subHdr, record, true);
rc.multiResult.add(subRc);
if (subRc.err != 0 && rc.err == 0) {
rc.err = subRc.err ;
@@ -961,22 +965,41 @@ public class DataTree {
LOG.debug("Failed: " + header + ":" + txn, e);
}
}
+
+
/*
- * A snapshot might be in progress while we are modifying the data
- * tree. If we set lastProcessedZxid prior to making corresponding
- * change to the tree, then the zxid associated with the snapshot
- * file will be ahead of its contents. Thus, while restoring from
- * the snapshot, the restore method will not apply the transaction
- * for zxid associated with the snapshot file, since the restore
- * method assumes that transaction to be present in the snapshot.
+ * Things we can only update after the whole txn is applied to data
+ * tree.
*
- * To avoid this, we first apply the transaction and then modify
- * lastProcessedZxid. During restore, we correctly handle the
- * case where the snapshot contains data ahead of the zxid associated
- * with the file.
+ * If we update lastProcessedZxid with the first sub txn of a multi-op
+ * while a snapshot is in progress, the zxid associated with the
+ * snapshot may cover only part of the multi-op.
+ *
+ * When loading the snapshot, only txns after the zxid associated with
+ * the snapshot file are replayed, so the missing sub txns would never
+ * be applied, causing data inconsistency.
+ *
+ * To avoid this, we only update the lastProcessedZxid when the whole
+ * multi-op txn is applied to DataTree.
*/
- if (rc.zxid > lastProcessedZxid) {
- lastProcessedZxid = rc.zxid;
+ if (!isSubTxn) {
+ /*
+ * A snapshot might be in progress while we are modifying the data
+ * tree. If we set lastProcessedZxid prior to making corresponding
+ * change to the tree, then the zxid associated with the snapshot
+ * file will be ahead of its contents. Thus, while restoring from
+ * the snapshot, the restore method will not apply the transaction
+ * for zxid associated with the snapshot file, since the restore
+ * method assumes that transaction to be present in the snapshot.
+ *
+ * To avoid this, we first apply the transaction and then modify
+ * lastProcessedZxid. During restore, we correctly handle the
+ * case where the snapshot contains data ahead of the zxid associated
+ * with the file.
+ */
+ if (rc.zxid > lastProcessedZxid) {
+ lastProcessedZxid = rc.zxid;
+ }
}
/*
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e501d9cc/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
index 7b00715..04145cb 100644
--- a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
+++ b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
@@ -95,7 +95,7 @@ public class ZKDatabase {
* @param snapLog the FileTxnSnapLog mapping this zkdatabase
*/
public ZKDatabase(FileTxnSnapLog snapLog) {
- dataTree = new DataTree();
+ dataTree = createDataTree();
sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
this.snapLog = snapLog;
@@ -138,7 +138,7 @@ public class ZKDatabase {
/* to be safe we just create a new
* datatree.
*/
- dataTree = new DataTree();
+ dataTree = createDataTree();
sessionsWithTimeouts.clear();
WriteLock lock = logLock.writeLock();
try {
@@ -644,4 +644,9 @@ public class ZKDatabase {
public boolean removeWatch(String path, WatcherType type, Watcher watcher) {
return dataTree.removeWatch(path, type, watcher);
}
+
+ /**
+ * Creates the DataTree backing this database. Visible (and overridable)
+ * for testing so a test can inject a DataTree subclass.
+ *
+ * NOTE: this is invoked from the constructor, so an override runs before
+ * any subclass instance fields are initialized and must not rely on them.
+ * It is also invoked again wherever the existing tree is replaced by a
+ * fresh one.
+ */
+ public DataTree createDataTree() {
+ return new DataTree();
+ }
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e501d9cc/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
new file mode 100644
index 0000000..0e3b230
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.security.sasl.SaslException;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.test.ClientBase;
+
+import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test cases used to catch corner cases due to fuzzy snapshot.
+ */
+public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FuzzySnapshotRelatedTest.class);
+
+ MainThread[] mt = null;
+ ZooKeeper[] zk = null;
+ int leaderId;
+ int followerA;
+
+ @Before
+ public void setup() throws Exception {
+ LOG.info("Start up a 3 server quorum");
+ final int ENSEMBLE_SERVERS = 3;
+ final int clientPorts[] = new int[ENSEMBLE_SERVERS];
+ StringBuilder sb = new StringBuilder();
+ String server;
+
+ for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+ clientPorts[i] = PortAssignment.unique();
+ server = "server." + i + "=127.0.0.1:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique() + ":participant;127.0.0.1:"
+ + clientPorts[i];
+ sb.append(server + "\n");
+ }
+ String currentQuorumCfgSection = sb.toString();
+
+ // start servers
+ mt = new MainThread[ENSEMBLE_SERVERS];
+ zk = new ZooKeeper[ENSEMBLE_SERVERS];
+ for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+ mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection,
+ false) {
+ @Override
+ public TestQPMain getTestQPMain() {
+ return new CustomizedQPMain();
+ }
+ };
+ mt[i].start();
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
+ ClientBase.CONNECTION_TIMEOUT, this);
+ }
+ QuorumPeerMainTest.waitForAll(zk, States.CONNECTED);
+ LOG.info("all servers started");
+
+ leaderId = -1;
+ followerA = -1;
+ for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+ if (mt[i].main.quorumPeer.leader != null) {
+ leaderId = i;
+ } else if (followerA == -1) {
+ followerA = i;
+ }
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (mt != null) {
+ for (MainThread t: mt) {
+ t.shutdown();
+ }
+ }
+
+ if (zk != null) {
+ for (ZooKeeper z: zk) {
+ z.close();
+ }
+ }
+ }
+
+ @Test
+ public void testMultiOpConsistency() throws Exception {
+ LOG.info("Create a parent node");
+ final String path = "/testMultiOpConsistency";
+ createEmptyNode(zk[followerA], path);
+
+ LOG.info("Hook to catch the 2nd sub create node txn in multi-op");
+ CustomDataTree dt =
+ (CustomDataTree) mt[followerA].main.quorumPeer.getZkDb().getDataTree();
+
+ final ZooKeeperServer zkServer = mt[followerA].main.quorumPeer.getActiveServer();
+
+ String node1 = path + "/1";
+ String node2 = path + "/2";
+
+ dt.addNodeCreateListener(node2, new NodeCreateListener() {
+ @Override
+ public void process(String path) {
+ LOG.info("Take a snapshot");
+ zkServer.takeSnapshot(true);
+ }
+ });
+
+ LOG.info("Issue a multi op to create 2 nodes");
+ zk[followerA].multi(Arrays.asList(
+ Op.create(node1, node1.getBytes(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+ Op.create(node2, node2.getBytes(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT))
+ );
+
+ LOG.info("Restart the server");
+ mt[followerA].shutdown();
+ QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING);
+
+ mt[followerA].start();
+ QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);
+
+ LOG.info("Make sure the node consistent with leader");
+ Assert.assertEquals(new String(zk[leaderId].getData(node2, null, null)),
+ new String(zk[followerA].getData(node2, null, null)));
+ }
+
+ private void createEmptyNode(ZooKeeper zk, String path) throws Exception {
+ zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+
+ static interface NodeCreateListener {
+ public void process(String path);
+
+ }
+
+ static class CustomDataTree extends DataTree {
+ Map<String, NodeCreateListener> nodeCreateListeners =
+ new HashMap<String, NodeCreateListener>();
+
+ @Override
+ public void createNode(final String path, byte data[], List<ACL> acl,
+ long ephemeralOwner, int parentCVersion, long zxid,
+ long time, Stat outputStat)
+ throws NoNodeException, NodeExistsException {
+ NodeCreateListener listener = nodeCreateListeners.get(path);
+ if (listener != null) {
+ listener.process(path);
+ }
+ super.createNode(path, data, acl, ephemeralOwner, parentCVersion,
+ zxid, time, outputStat);
+ }
+
+ public void addNodeCreateListener(String path, NodeCreateListener listener) {
+ nodeCreateListeners.put(path, listener);
+ }
+ }
+
+ static class CustomizedQPMain extends TestQPMain {
+ @Override
+ protected QuorumPeer getQuorumPeer() throws SaslException {
+ return new QuorumPeer() {
+ @Override
+ public void setZKDatabase(ZKDatabase database) {
+ super.setZKDatabase(new ZKDatabase(this.getTxnFactory()) {
+ @Override
+ public DataTree createDataTree() {
+ return new CustomDataTree();
+ }
+ });
+ }
+ };
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e501d9cc/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index 5928ea9..d48ea04 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -452,7 +452,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
Assert.assertTrue("falseLeader never rejoins the quorum", foundFollowing);
}
- private void waitForOne(ZooKeeper zk, States state) throws InterruptedException {
+ public static void waitForOne(ZooKeeper zk, States state) throws InterruptedException
{
int iterations = ClientBase.CONNECTION_TIMEOUT / 500;
while (zk.getState() != state) {
if (iterations-- == 0) {
@@ -466,7 +466,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
waitForAll(servers.zk, state);
}
- private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException {
+ public static void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException
{
int iterations = ClientBase.CONNECTION_TIMEOUT / 1000;
boolean someoneNotConnected = true;
while (someoneNotConnected) {
@@ -487,7 +487,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
}
}
- private void logStates(ZooKeeper[] zks) {
+ public static void logStates(ZooKeeper[] zks) {
StringBuilder sbBuilder = new StringBuilder("Connection States: {");
for (int i = 0; i < zks.length; i++) {
sbBuilder.append(i + " : " + zks[i].getState() + ", ");
|