From commits-return-7951-archive-asf-public=cust-asf.ponee.io@zookeeper.apache.org Wed Sep 11 08:27:43 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id DD33818063F for ; Wed, 11 Sep 2019 10:27:42 +0200 (CEST) Received: (qmail 48428 invoked by uid 500); 11 Sep 2019 08:27:42 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zookeeper.apache.org Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 48404 invoked by uid 99); 11 Sep 2019 08:27:42 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Sep 2019 08:27:42 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 9D202814CE; Wed, 11 Sep 2019 08:27:41 +0000 (UTC) Date: Wed, 11 Sep 2019 08:27:41 +0000 To: "commits@zookeeper.apache.org" Subject: [zookeeper] branch master updated: ZOOKEEPER-3145: Fix potential watch missing issue due to stale pzxid when replaying CloseSession txn with fuzzy snapshot MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <156819046131.25429.8616045410520290776@gitbox.apache.org> From: eolivelli@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: zookeeper X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 8460f4ed48c5f9018d882bee2be748de42e965f9 X-Git-Newrev: 42ea26b75105484ef0504396332c276952224158 X-Git-Rev: 42ea26b75105484ef0504396332c276952224158 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zookeeper.git The following commit(s) were added to refs/heads/master by this push: new 42ea26b ZOOKEEPER-3145: Fix potential watch missing issue due to stale pzxid when replaying CloseSession txn with fuzzy snapshot 42ea26b is described below commit 42ea26b75105484ef0504396332c276952224158 Author: Fangmin Lyu AuthorDate: Wed Sep 11 10:27:28 2019 +0200 ZOOKEEPER-3145: Fix potential watch missing issue due to stale pzxid when replaying CloseSession txn with fuzzy snapshot Currently, the CloseSession txn is not idempotent: executing the CloseSession txn twice does not produce the same result, which can leave pzxid inconsistent, which in turn can cause watches to be missed. For more details, please check the description in ZOOKEEPER-3145. Author: Fangmin Lyu Reviewers: Enrico Olivelli , Andor Molnár Closes #622 from lvfangmin/ZOOKEEPER-3145 --- zookeeper-jute/src/main/resources/zookeeper.jute | 11 ++- .../java/org/apache/zookeeper/server/DataTree.java | 61 +++++++++--- .../zookeeper/server/PrepRequestProcessor.java | 11 ++- .../apache/zookeeper/server/ZooKeeperServer.java | 19 ++++ .../zookeeper/server/util/SerializeUtils.java | 10 +- .../zookeeper/server/PrepRequestProcessorTest.java | 53 ++++++++++- .../server/quorum/CloseSessionTxnTest.java | 102 +++++++++++++++++++++ .../server/quorum/FuzzySnapshotRelatedTest.java | 60 +++++++----- 8 files changed, 283 insertions(+), 44 deletions(-) diff --git a/zookeeper-jute/src/main/resources/zookeeper.jute b/zookeeper-jute/src/main/resources/zookeeper.jute index 2bb0492..8310664 100644 --- a/zookeeper-jute/src/main/resources/zookeeper.jute +++ b/zookeeper-jute/src/main/resources/zookeeper.jute @@ -72,7 +72,7 @@ module org.apache.zookeeper.proto { vectordataWatches; vectorexistWatches; vectorchildWatches; - } + } class RequestHeader { int xid; int type; @@ -92,12 +92,12 @@ module org.apache.zookeeper.proto { long zxid; int err; } - - class GetDataRequest 
{ + + class GetDataRequest { ustring path; boolean watch; } - + class SetDataRequest { ustring path; buffer data; @@ -323,6 +323,9 @@ module org.apache.zookeeper.txn { class CreateSessionTxn { int timeOut; } + class CloseSessionTxn { + vector paths2Delete; + } class ErrorTxn { int err; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index 9d552f0..4657f8e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -63,6 +63,7 @@ import org.apache.zookeeper.server.watch.WatchesPathReport; import org.apache.zookeeper.server.watch.WatchesReport; import org.apache.zookeeper.server.watch.WatchesSummary; import org.apache.zookeeper.txn.CheckVersionTxn; +import org.apache.zookeeper.txn.CloseSessionTxn; import org.apache.zookeeper.txn.CreateContainerTxn; import org.apache.zookeeper.txn.CreateTTLTxn; import org.apache.zookeeper.txn.CreateTxn; @@ -947,7 +948,14 @@ public class DataTree { rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion()); break; case OpCode.closeSession: - killSession(header.getClientId(), header.getZxid()); + long sessionId = header.getClientId(); + if (txn != null) { + killSession(sessionId, header.getZxid(), + ephemerals.remove(sessionId), + ((CloseSessionTxn) txn).getPaths2Delete()); + } else { + killSession(sessionId, header.getZxid()); + } break; case OpCode.error: ErrorTxn errTxn = (ErrorTxn) txn; @@ -1119,20 +1127,45 @@ public class DataTree { // so there is no need for synchronization. The list is not // changed here. Only create and delete change the list which // are again called from FinalRequestProcessor in sequence. 
- Set list = ephemerals.remove(session); - if (list != null) { - for (String path : list) { - try { - deleteNode(path, zxid); - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting ephemeral node " + path + " for session 0x" + Long.toHexString(session)); - } - } catch (NoNodeException e) { - LOG.warn("Ignoring NoNodeException for path " - + path - + " while removing ephemeral for dead session 0x" - + Long.toHexString(session)); + killSession(session, zxid, ephemerals.remove(session), null); + } + + void killSession(long session, long zxid, Set paths2DeleteLocal, + List paths2DeleteInTxn) { + if (paths2DeleteInTxn != null) { + deleteNodes(session, zxid, paths2DeleteInTxn); + } + + if (paths2DeleteLocal == null) { + return; + } + + if (paths2DeleteInTxn != null) { + // explicitly check and remove to avoid potential performance + // issue when using removeAll + for (String path: paths2DeleteInTxn) { + paths2DeleteLocal.remove(path); + } + if (!paths2DeleteLocal.isEmpty()) { + LOG.warn("Unexpected extra paths under session {} which " + + "are not in txn 0x{}", paths2DeleteLocal, + Long.toHexString(zxid)); + } + } + + deleteNodes(session, zxid, paths2DeleteLocal); + } + + void deleteNodes(long session, long zxid, Iterable paths2Delete) { + for (String path : paths2Delete) { + try { + deleteNode(path, zxid); + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting ephemeral node {} for session 0x{}", path, Long.toHexString(session)); } + } catch (NoNodeException e) { + LOG.warn("Ignoring NoNodeException for path {} while removing ephemeral for dead session 0x{}", + path, Long.toHexString(session)); } } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java index cccecbf..8b2bab0 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java +++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java @@ -66,6 +66,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.apache.zookeeper.txn.CheckVersionTxn; +import org.apache.zookeeper.txn.CloseSessionTxn; import org.apache.zookeeper.txn.CreateContainerTxn; import org.apache.zookeeper.txn.CreateSessionTxn; import org.apache.zookeeper.txn.CreateTTLTxn; @@ -532,8 +533,12 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req // this request is the last of the session so it should be ok //zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); long startTime = Time.currentElapsedTime(); - Set es = zks.getZKDatabase().getEphemerals(request.sessionId); synchronized (zks.outstandingChanges) { + // need to move getEphemerals into zks.outstandingChanges + // synchronized block, otherwise there will be a race + // condition with the on flying deleteNode txn, and we'll + // delete the node again here, which is not correct + Set es = zks.getZKDatabase().getEphemerals(request.sessionId); for (ChangeRecord c : zks.outstandingChanges) { if (c.stat == null) { // Doing a delete @@ -545,7 +550,9 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req for (String path2Delete : es) { addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path2Delete, null, 0, null)); } - + if (ZooKeeperServer.isCloseSessionTxnEnabled()) { + request.setTxn(new CloseSessionTxn(new ArrayList(es))); + } zks.sessionTracker.setSessionClosing(request.sessionId); } ServerMetrics.getMetrics().CLOSE_SESSION_PREP_TIME.add(Time.currentElapsedTime() - startTime); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java 
index b8e0bd2..e982cd1 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -110,6 +110,11 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled"; private static boolean digestEnabled; + // Add a enable/disable option for now, we should remove this one when + // this feature is confirmed to be stable + public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled"; + private static boolean closeSessionTxnEnabled = true; + static { LOG = LoggerFactory.getLogger(ZooKeeperServer.class); @@ -127,6 +132,20 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { digestEnabled = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true")); LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled); + + closeSessionTxnEnabled = Boolean.parseBoolean( + System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true")); + LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled); + } + + public static boolean isCloseSessionTxnEnabled() { + return closeSessionTxnEnabled; + } + + public static void setCloseSessionTxnEnabled(boolean enabled) { + ZooKeeperServer.closeSessionTxnEnabled = enabled; + LOG.info("Update {} to {}", CLOSE_SESSION_TXN_ENABLED, + ZooKeeperServer.closeSessionTxnEnabled); } protected ZooKeeperServerBean jmxServerBean; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java index 25e86a0..2454c43 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java @@ -34,7 +34,9 @@ import 
org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.common.IOUtils; import org.apache.zookeeper.server.DataTree; import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooTrace; +import org.apache.zookeeper.txn.CloseSessionTxn; import org.apache.zookeeper.txn.CreateContainerTxn; import org.apache.zookeeper.txn.CreateSessionTxn; import org.apache.zookeeper.txn.CreateTTLTxn; @@ -67,7 +69,9 @@ public class SerializeUtils { txn = new CreateSessionTxn(); break; case OpCode.closeSession: - return null; + txn = ZooKeeperServer.isCloseSessionTxnEnabled() + ? new CloseSessionTxn() : null; + break; case OpCode.create: case OpCode.create2: txn = new CreateTxn(); @@ -115,6 +119,10 @@ public class SerializeUtils { create.setAcl(createv0.getAcl()); create.setEphemeral(createv0.getEphemeral()); create.setParentCVersion(-1); + } else if (hdr.getType() == OpCode.closeSession) { + // perhaps this is before CloseSessionTxn was added, + // ignore it and reset txn to null + txn = null; } else { throw e; } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java index 9724423..264601d 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java @@ -45,6 +45,7 @@ import org.apache.zookeeper.PortAssignment; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.proto.RequestHeader; import org.apache.zookeeper.proto.SetDataRequest; import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord; import org.apache.zookeeper.test.ClientBase; @@ -103,6 +104,10 @@ public class PrepRequestProcessorTest extends ClientBase { } private Request 
createRequest(Record record, int opCode) throws IOException { + return createRequest(record, opCode, 1L); + } + + private Request createRequest(Record record, int opCode, long sessionId) throws IOException { // encoding ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); @@ -110,7 +115,7 @@ public class PrepRequestProcessorTest extends ClientBase { baos.close(); // Id List ids = Arrays.asList(Ids.ANYONE_ID_UNSAFE); - return new Request(null, 1L, 0, opCode, ByteBuffer.wrap(baos.toByteArray()), ids); + return new Request(null, sessionId, 0, opCode, ByteBuffer.wrap(baos.toByteArray()), ids); } private void process(List ops) throws Exception { @@ -174,6 +179,52 @@ public class PrepRequestProcessorTest extends ClientBase { } /** + * Test ephemerals are deleted when the session is closed with + * the newly added CloseSessionTxn in ZOOKEEPER-3145. + */ + @Test + public void testCloseSessionTxn() throws Exception { + boolean before = ZooKeeperServer.isCloseSessionTxnEnabled(); + + ZooKeeperServer.setCloseSessionTxnEnabled(true); + try { + // create a few ephemerals + long ephemeralOwner = 1; + DataTree dt = zks.getZKDatabase().dataTree; + dt.createNode("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE, ephemeralOwner, 0, 0, 0); + dt.createNode("/bar", new byte[0], Ids.OPEN_ACL_UNSAFE, ephemeralOwner, 0, 0, 0); + + // close session + RequestHeader header = new RequestHeader(); + header.setType(OpCode.closeSession); + + final FinalRequestProcessor frq = new FinalRequestProcessor(zks); + final CountDownLatch latch = new CountDownLatch(1); + processor = new PrepRequestProcessor(zks, new RequestProcessor() { + @Override + public void processRequest(Request request) { + frq.processRequest(request); + latch.countDown(); + } + + @Override + public void shutdown() { + // TODO Auto-generated method stub + } + }); + processor.pRequest(createRequest(header, OpCode.closeSession, ephemeralOwner)); + + assertTrue(latch.await(3, 
TimeUnit.SECONDS)); + + // assert ephemerals are deleted + assertEquals(null, dt.getNode("/foo")); + assertEquals(null, dt.getNode("/bar")); + } finally { + ZooKeeperServer.setCloseSessionTxnEnabled(before); + } + } + + /** * It tests that PrepRequestProcessor will return BadArgument KeeperException * if the request path (if it exists) is not valid, e.g. empty string. */ diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CloseSessionTxnTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CloseSessionTxnTest.java new file mode 100644 index 0000000..ac9665b --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CloseSessionTxnTest.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.quorum; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper.States; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.junit.Assert; +import org.junit.Test; + +public class CloseSessionTxnTest extends QuorumPeerTestBase { + + /** + * Test leader/leader compatibility with/without CloseSessionTxn, so that + * we can gradually rollout this code and rollback if there is problem. + */ + @Test + public void testCloseSessionTxnCompatile() throws Exception { + // Test 4 cases: + // 1. leader disabled, follower disabled + testCloseSessionWithDifferentConfig(false, false); + + // 2. leader disabled, follower enabled + testCloseSessionWithDifferentConfig(false, true); + + // 3. leader enabled, follower disabled + testCloseSessionWithDifferentConfig(true, false); + + // 4. leader enabled, follower enabled + testCloseSessionWithDifferentConfig(true, true); + } + + private void testCloseSessionWithDifferentConfig( + boolean closeSessionEnabledOnLeader, + boolean closeSessionEnabledOnFollower) throws Exception { + // 1. set up an ensemble with 3 servers + final int numServers = 3; + servers = LaunchServers(numServers); + int leaderId = servers.findLeader(); + ZooKeeperServer.setCloseSessionTxnEnabled(closeSessionEnabledOnLeader); + + // 2. shutdown one of the follower, start it later to pick up the + // CloseSessionTxnEnabled config change + // + // We cannot use different static config in the same JVM, so have to + // use this tricky + int followerA = (leaderId + 1) % numServers; + servers.mt[followerA].shutdown(); + waitForOne(servers.zk[followerA], States.CONNECTING); + + // 3. 
create an ephemeral node + String path = "/testCloseSessionTxnCompatile"; + servers.zk[leaderId].create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL); + + // 3. close the client + servers.restartClient(leaderId, this); + waitForOne(servers.zk[leaderId], States.CONNECTED); + + // 4. update the CloseSessionTxnEnabled config before follower A + // started + System.setProperty("zookeeper.retainZKDatabase", "true"); + ZooKeeperServer.setCloseSessionTxnEnabled(closeSessionEnabledOnFollower); + + // 5. restart follower A + servers.mt[followerA].start(); + waitForOne(servers.zk[followerA], States.CONNECTED); + + // 4. verify the ephemeral node is gone + for (int i = 0; i < numServers; i++) { + final CountDownLatch syncedLatch = new CountDownLatch(1); + servers.zk[i].sync(path, new AsyncCallback.VoidCallback() { + @Override + public void processResult(int rc, String path, Object ctx) { + syncedLatch.countDown(); + } + }, null); + Assert.assertTrue(syncedLatch.await(3, TimeUnit.SECONDS)); + Assert.assertNull(servers.zk[i].exists(path, false)); + } + } + } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java index 5cd8259..9003b3d 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java @@ -125,7 +125,7 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase { public void testMultiOpConsistency() throws Exception { LOG.info("Create a parent node"); final String path = "/testMultiOpConsistency"; - createEmptyNode(zk[followerA], path); + createEmptyNode(zk[followerA], path, CreateMode.PERSISTENT); LOG.info("Hook to catch the 2nd sub create node txn in multi-op"); CustomDataTree dt = (CustomDataTree) 
mt[followerA].main.quorumPeer.getZkDb().getDataTree(); @@ -175,8 +175,10 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase { final String parent = "/testPZxidUpdatedWhenDeletingNonExistNode"; final String child = parent + "/child"; - createEmptyNode(zk[leaderId], parent); - createEmptyNode(zk[leaderId], child); + createEmptyNode(zk[leaderId], parent, CreateMode.PERSISTENT); + createEmptyNode(zk[leaderId], child, CreateMode.EPHEMERAL); + // create another child to test closeSession + createEmptyNode(zk[leaderId], child + "1", CreateMode.EPHEMERAL); LOG.info("shutdown follower {}", followerA); mt[followerA].shutdown(); @@ -205,8 +207,10 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase { final String parent = "/testPZxidUpdatedDuringTakingSnapshot"; final String child = parent + "/child"; - createEmptyNode(zk[followerA], parent); - createEmptyNode(zk[followerA], child); + createEmptyNode(zk[followerA], parent, CreateMode.PERSISTENT); + createEmptyNode(zk[followerA], child, CreateMode.EPHEMERAL); + // create another child to test closeSession + createEmptyNode(zk[leaderId], child + "1", CreateMode.EPHEMERAL); LOG.info("Set up ZKDatabase to catch the node serializing in DataTree"); addSerializeListener(followerA, parent, child); @@ -217,8 +221,12 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase { LOG.info("Restarting follower A to load snapshot"); mt[followerA].shutdown(); - QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING); + QuorumPeerMainTest.waitForOne(zk[followerA], States.CLOSED); mt[followerA].start(); + // zk[followerA] will be closed in addSerializeListener, re-create it + zk[followerA] = new ZooKeeper("127.0.0.1:" + clientPorts[followerA], + ClientBase.CONNECTION_TIMEOUT, this); + QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED); LOG.info("Check and make sure the pzxid of the parent is the same " + "on leader and follower A"); @@ -226,13 +234,14 @@ public class 
FuzzySnapshotRelatedTest extends QuorumPeerTestBase { } private void addSerializeListener(int sid, String parent, String child) { - final ZooKeeper zkClient = zk[followerA]; + final ZooKeeper zkClient = zk[sid]; CustomDataTree dt = (CustomDataTree) mt[sid].main.quorumPeer.getZkDb().getDataTree(); dt.addListener(parent, new NodeSerializeListener() { @Override public void nodeSerialized(String path) { try { zkClient.delete(child, -1); + zkClient.close(); LOG.info("Deleted the child node after the parent is serialized"); } catch (Exception e) { LOG.error("Error when deleting node {}", e); @@ -242,13 +251,26 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase { } private void compareStat(String path, int sid, int compareWithSid) throws Exception { - Stat stat1 = new Stat(); - zk[sid].getData(path, null, stat1); - - Stat stat2 = new Stat(); - zk[compareWithSid].getData(path, null, stat2); - - assertEquals(stat1, stat2); + ZooKeeper[] compareZk = new ZooKeeper[2]; + compareZk[0] = new ZooKeeper("127.0.0.1:" + clientPorts[sid], + ClientBase.CONNECTION_TIMEOUT, this); + compareZk[1] = new ZooKeeper("127.0.0.1:" + clientPorts[compareWithSid], + ClientBase.CONNECTION_TIMEOUT, this); + QuorumPeerMainTest.waitForAll(compareZk, States.CONNECTED); + + try { + Stat stat1 = new Stat(); + compareZk[0].getData(path, null, stat1); + + Stat stat2 = new Stat(); + compareZk[1].getData(path, null, stat2); + + assertEquals(stat1, stat2); + } finally { + for (ZooKeeper z: compareZk) { + z.close(); + } + } } @Test @@ -286,19 +308,13 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase { LOG.info("Make sure the global sessions are consistent with leader"); Map globalSessionsOnLeader = mt[leaderId].main.quorumPeer.getZkDb().getSessionWithTimeOuts(); - if (mt[followerA].main.quorumPeer == null) { - LOG.info("quorumPeer is null"); - } - if (mt[followerA].main.quorumPeer.getZkDb() == null) { - LOG.info("zkDb is null"); - } Map globalSessionsOnFollowerA = 
mt[followerA].main.quorumPeer.getZkDb().getSessionWithTimeOuts(); LOG.info("sessions are {}, {}", globalSessionsOnLeader.keySet(), globalSessionsOnFollowerA.keySet()); assertTrue(globalSessionsOnFollowerA.keySet().containsAll(globalSessionsOnLeader.keySet())); } - private void createEmptyNode(ZooKeeper zk, String path) throws Exception { - zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + private void createEmptyNode(ZooKeeper zk, String path, CreateMode mode) throws Exception { + zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, mode); } interface NodeCreateListener {