From commits-return-7000-archive-asf-public=cust-asf.ponee.io@zookeeper.apache.org Sat Sep 15 00:08:34 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 1281B180647 for ; Sat, 15 Sep 2018 00:08:33 +0200 (CEST) Received: (qmail 68539 invoked by uid 500); 14 Sep 2018 22:08:33 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zookeeper.apache.org Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 68528 invoked by uid 99); 14 Sep 2018 22:08:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Sep 2018 22:08:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DBDE7DFC4A; Fri, 14 Sep 2018 22:08:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hanm@apache.org To: commits@zookeeper.apache.org Message-Id: <45e095ce9ad4464eae1667c3cc743dab@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: zookeeper git commit: ZOOKEEPER-3144: Fix potential ephemeral nodes inconsistent due to global session inconsistent with fuzzy snapshot Date: Fri, 14 Sep 2018 22:08:32 +0000 (UTC) Repository: zookeeper Updated Branches: refs/heads/master 716300812 -> b58791016 ZOOKEEPER-3144: Fix potential ephemeral nodes inconsistent due to global session inconsistent with fuzzy snapshot There is a race condition between updating the lastProcessedZxid and the actual session change in DataTree, which could leave the global sessions inconsistent, which in turn could leave the ephemeral nodes inconsistent. For more details, please check the description in JIRA ZOOKEEPER-3144. 
Author: Fangmin Lyu Reviewers: Michael Han Closes #621 from lvfangmin/ZOOKEEPER-3144 Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/b5879101 Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/b5879101 Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/b5879101 Branch: refs/heads/master Commit: b58791016424e662c816e2253de96f3771f5d301 Parents: 7163008 Author: Fangmin Lyu Authored: Fri Sep 14 15:08:19 2018 -0700 Committer: Michael Han Committed: Fri Sep 14 15:08:19 2018 -0700 ---------------------------------------------------------------------- .../zookeeper/server/ZooKeeperServer.java | 28 +++--- .../server/quorum/FuzzySnapshotRelatedTest.java | 89 +++++++++++++++++++- 2 files changed, 103 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b5879101/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java index 09c6a8a..70cb75b 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java @@ -268,21 +268,21 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { */ public void loadData() throws IOException, InterruptedException { /* - * When a new leader starts executing Leader#lead, it + * When a new leader starts executing Leader#lead, it * invokes this method. The database, however, has been * initialized before running leader election so that * the server could pick its zxid for its initial vote. * It does it by invoking QuorumPeer#getLastLoggedZxid. 
* Consequently, we don't need to initialize it once more - * and avoid the penalty of loading it a second time. Not + * and avoid the penalty of loading it a second time. Not * reloading it is particularly important for applications * that host a large database. - * + * * The following if block checks whether the database has * been initialized or not. Note that this method is - * invoked by at least one other method: + * invoked by at least one other method: * ZooKeeperServer#startdata. - * + * * See ZOOKEEPER-1642 for more detail. */ if(zkDb.isInitialized()){ @@ -291,7 +291,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { else { setZxid(zkDb.loadDataBase()); } - + // Clean up dead sessions List deadSessions = new LinkedList(); for (Long session : zkDb.getSessions()) { @@ -364,7 +364,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { public SessionTracker getSessionTracker() { return sessionTracker; } - + long getNextZxid() { return hzxid.incrementAndGet(); } @@ -1181,7 +1181,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { String authorizationID = saslServer.getAuthorizationID(); LOG.info("adding SASL authorization for authorizationID: " + authorizationID); cnxn.addAuthInfo(new Id("sasl",authorizationID)); - if (System.getProperty("zookeeper.superUser") != null && + if (System.getProperty("zookeeper.superUser") != null && authorizationID.equals(System.getProperty("zookeeper.superUser"))) { cnxn.addAuthInfo(new Id("super", "")); } @@ -1224,11 +1224,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { ProcessTxnResult rc; int opCode = request != null ? request.type : hdr.getType(); long sessionId = request != null ? 
request.sessionId : hdr.getClientId(); - if (hdr != null) { - rc = getZKDatabase().processTxn(hdr, txn); - } else { - rc = new ProcessTxnResult(); - } + if (opCode == OpCode.createSession) { if (hdr != null && txn instanceof CreateSessionTxn) { CreateSessionTxn cst = (CreateSessionTxn) txn; @@ -1241,6 +1237,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } else if (opCode == OpCode.closeSession) { sessionTracker.removeSession(sessionId); } + + if (hdr != null) { + rc = getZKDatabase().processTxn(hdr, txn); + } else { + rc = new ProcessTxnResult(); + } return rc; } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b5879101/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java b/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java index 8abab52..c4ca8a8 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import javax.security.sasl.SaslException; import org.apache.jute.OutputArchive; @@ -42,6 +43,7 @@ import org.apache.zookeeper.server.DataNode; import org.apache.zookeeper.server.DataTree; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.test.ClientBase; import org.junit.Assert; @@ -60,6 +62,7 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase { MainThread[] mt = null; ZooKeeper[] zk = null; + int[] clientPorts = null; int leaderId; int followerA; @@ -67,7 +70,7 @@ public class 
FuzzySnapshotRelatedTest extends QuorumPeerTestBase { public void setup() throws Exception { LOG.info("Start up a 3 server quorum"); final int ENSEMBLE_SERVERS = 3; - final int clientPorts[] = new int[ENSEMBLE_SERVERS]; + clientPorts = new int[ENSEMBLE_SERVERS]; StringBuilder sb = new StringBuilder(); String server; @@ -259,6 +262,55 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase { Assert.assertEquals(stat1, stat2); } + @Test + public void testGlobalSessionConsistency() throws Exception { + LOG.info("Hook to catch the commitSession event on followerA"); + CustomizedQPMain followerAMain = (CustomizedQPMain) mt[followerA].main; + final ZooKeeperServer zkServer = followerAMain.quorumPeer.getActiveServer(); + + // only take snapshot for the next global session we're going to create + final AtomicBoolean shouldTakeSnapshot = new AtomicBoolean(true); + followerAMain.setCommitSessionListener(new CommitSessionListener() { + @Override + public void process(long sessionId) { + LOG.info("Take snapshot"); + if (shouldTakeSnapshot.getAndSet(false)) { + zkServer.takeSnapshot(true); + } + } + }); + + LOG.info("Create a global session"); + ZooKeeper globalClient = new ZooKeeper( + "127.0.0.1:" + clientPorts[followerA], + ClientBase.CONNECTION_TIMEOUT, this); + QuorumPeerMainTest.waitForOne(globalClient, States.CONNECTED); + + LOG.info("Restart followerA to load the data from disk"); + mt[followerA].shutdown(); + QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING); + + mt[followerA].start(); + QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED); + + LOG.info("Make sure the global sessions are consistent with leader"); + + Map globalSessionsOnLeader = + mt[leaderId].main.quorumPeer.getZkDb().getSessionWithTimeOuts(); + if (mt[followerA].main.quorumPeer == null) { + LOG.info("quorumPeer is null"); + } + if (mt[followerA].main.quorumPeer.getZkDb() == null) { + LOG.info("zkDb is null"); + } + Map globalSessionsOnFollowerA = + 
mt[followerA].main.quorumPeer.getZkDb().getSessionWithTimeOuts(); + LOG.info("sessions are {}, {}", globalSessionsOnLeader.keySet(), + globalSessionsOnFollowerA.keySet()); + Assert.assertTrue(globalSessionsOnFollowerA.keySet().containsAll( + globalSessionsOnLeader.keySet())); + } + private void createEmptyNode(ZooKeeper zk, String path) throws Exception { zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } @@ -310,7 +362,17 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase { public void nodeSerialized(String path); } + static interface CommitSessionListener { + public void process(long sessionId); + } + static class CustomizedQPMain extends TestQPMain { + CommitSessionListener commitSessionListener; + + public void setCommitSessionListener(CommitSessionListener listener) { + this.commitSessionListener = listener; + } + @Override protected QuorumPeer getQuorumPeer() throws SaslException { return new QuorumPeer() { @@ -323,6 +385,31 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase { } }); } + + @Override + protected Follower makeFollower(FileTxnSnapLog logFactory) + throws IOException { + return new Follower(this, new FollowerZooKeeperServer( + logFactory, this, this.getZkDb()) { + @Override + public void createSessionTracker() { + sessionTracker = new LearnerSessionTracker( + this, getZKDatabase().getSessionWithTimeOuts(), + this.tickTime, self.getId(), + self.areLocalSessionsEnabled(), + getZooKeeperServerListener()) { + + public synchronized boolean commitSession( + long sessionId, int sessionTimeout) { + if (commitSessionListener != null) { + commitSessionListener.process(sessionId); + } + return super.commitSession(sessionId, sessionTimeout); + } + }; + } + }); + } }; } }