From commits-return-8592-archive-asf-public=cust-asf.ponee.io@zookeeper.apache.org Thu Nov 12 15:08:28 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-ec2-va.apache.org (mxout1-ec2-va.apache.org [3.227.148.255]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id D892718066B for ; Thu, 12 Nov 2020 16:08:27 +0100 (CET) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-ec2-va.apache.org (ASF Mail Server at mxout1-ec2-va.apache.org) with SMTP id 477FA4983C for ; Thu, 12 Nov 2020 15:08:27 +0000 (UTC) Received: (qmail 38182 invoked by uid 500); 12 Nov 2020 15:08:27 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zookeeper.apache.org Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 38170 invoked by uid 99); 12 Nov 2020 15:08:27 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Nov 2020 15:08:27 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id D71F98E7BE; Thu, 12 Nov 2020 15:08:26 +0000 (UTC) Date: Thu, 12 Nov 2020 15:08:26 +0000 To: "commits@zookeeper.apache.org" Subject: [zookeeper] branch branch-3.5 updated: ZOOKEEPER-3911: Data inconsistency caused by DIFF sync uncommitted log MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <160519370668.23959.7568853092902827432@gitbox.apache.org> From: nkalmar@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: zookeeper X-Git-Refname: refs/heads/branch-3.5 X-Git-Reftype: branch X-Git-Oldrev: db9fed4c95e4828389b30c0f6e94182db26ff99b X-Git-Newrev: 31032cf9818559da3500607378525b631aae02ab X-Git-Rev: 31032cf9818559da3500607378525b631aae02ab X-Git-NotificationType: 
ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. nkalmar pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/zookeeper.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 31032cf ZOOKEEPER-3911: Data inconsistency caused by DIFF sync uncommitted log 31032cf is described below commit 31032cf9818559da3500607378525b631aae02ab Author: Michael Han AuthorDate: Thu Nov 12 16:03:25 2020 +0100 ZOOKEEPER-3911: Data inconsistency caused by DIFF sync uncommitted log Problem: During DIFF sync, followers persist the proposals from the leader only after receiving the UPTODATE message. The leader starts serving client requests once it receives a quorum of NEWLEADER ACKs from followers, and followers send their NEWLEADER ACKs before receiving UPTODATE. So it's possible that when the leader starts serving, followers have not yet persisted all of the DIFF sync proposals from the leader. If at a certain point in time (e.g. once receiving UPTODATE from leader) all quorum ser [...] Solution: Make sure followers persist all DIFF proposals before they ACK NEWLEADER. When the leader receives a quorum of NEWLEADER ACKs, it's guaranteed that all DIFF sync proposals are persisted on a quorum and can survive recovery. 
Author: Michael Han Reviewers: Enrico Olivelli , Norbert Kalmar Closes #1445 from hanm/ZOOKEEPER-3911 (cherry picked from commit b978dfb949e4ac4d703e956c6ef811415c831bcd) Signed-off-by: Norbert Kalmar --- .../apache/zookeeper/server/ZooKeeperServer.java | 16 +- .../apache/zookeeper/server/quorum/Learner.java | 19 +- .../server/quorum/DIFFSyncConsistencyTest.java | 294 +++++++++++++++++++++ 3 files changed, 325 insertions(+), 4 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 4e4feaf..23591c2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -467,6 +467,19 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } public synchronized void startup() { + startupWithServerState(State.RUNNING); + } + + public synchronized void startupWithoutServing() { + startupWithServerState(State.INITIAL); + } + + public synchronized void startServing() { + setState(State.RUNNING); + notifyAll(); + } + + private void startupWithServerState(State state) { if (sessionTracker == null) { createSessionTracker(); } @@ -475,7 +488,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { registerJMX(); - setState(State.RUNNING); + setState(state); + notifyAll(); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 2da5add..48d4962 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -559,8 +559,22 @@ public class Learner { } self.setCurrentEpoch(newEpoch); - writeToTxnLog = true; //Anything after this needs to go to 
the transaction log, not applied directly in memory + writeToTxnLog = true; + //Anything after this needs to go to the transaction log, not applied directly in memory isPreZAB1_0 = false; + + // ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER). + sock.setSoTimeout(self.tickTime * self.syncLimit); + self.setSyncMode(QuorumPeer.SyncMode.NONE); + zk.startupWithoutServing(); + if (zk instanceof FollowerZooKeeperServer) { + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + for (PacketInFlight p : packetsNotCommitted) { + fzk.logRequest(p.hdr, p.rec, p.digest); + } + packetsNotCommitted.clear(); + } + writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); break; } @@ -568,8 +582,7 @@ public class Learner { } ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); writePacket(ack, true); - sock.setSoTimeout(self.tickTime * self.syncLimit); - zk.startup(); + zk.startServing(); /* * Update the election vote here to ensure that all members of the * ensemble report the same vote to new servers that start up and diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java new file mode 100644 index 0000000..9b9ea55 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.util.Map; +import javax.security.sasl.SaslException; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooKeeper.States; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.server.quorum.Leader.Proposal; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.ClientBase.CountdownWatcher; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +public class DIFFSyncConsistencyTest extends QuorumPeerTestBase { + + private static int SERVER_COUNT = 3; + private MainThread[] mt = new MainThread[SERVER_COUNT]; + + @Test + @Timeout(value = 120) + public void testInconsistentDueToUncommittedLog() throws Exception { + final int LEADER_TIMEOUT_MS = 10_000; + final int[] clientPorts = new int[SERVER_COUNT]; + + 
StringBuilder sb = new StringBuilder(); + String server; + for (int i = 0; i < SERVER_COUNT; i++) { + clientPorts[i] = PortAssignment.unique(); + server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + + ":participant;127.0.0.1:" + clientPorts[i]; + sb.append(server + "\n"); + } + String currentQuorumCfgSection = sb.toString(); + + for (int i = 0; i < SERVER_COUNT; i++) { + mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) { + @Override + public TestQPMain getTestQPMain() { + return new MockTestQPMain(); + } + }; + mt[i].start(); + } + + for (int i = 0; i < SERVER_COUNT; i++) { + assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT), + "waiting for server " + i + " being up"); + } + + int leader = findLeader(mt); + CountdownWatcher watch = new CountdownWatcher(); + ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[leader], ClientBase.CONNECTION_TIMEOUT, watch); + watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT); + + Map outstanding = mt[leader].main.quorumPeer.leader.outstandingProposals; + // Increase the tick time to delay the leader going to looking to allow us proposal a transaction while other + // followers are offline. + int previousTick = mt[leader].main.quorumPeer.tickTime; + mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS; + // Let the previous tick on the leader exhaust itself so the new tick time takes effect + Thread.sleep(previousTick); + + LOG.info("LEADER ELECTED {}", leader); + + // Shutdown followers to make sure we don't accidentally send the proposal we are going to make to follower. + // In other words, we want to make sure the followers get the proposal later through DIFF sync. + for (int i = 0; i < SERVER_COUNT; i++) { + if (i != leader) { + mt[i].shutdown(); + } + } + + // Send a create request to old leader and make sure it's synced to disk. 
+ try { + zk.create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + fail("create /zk" + leader + " should have failed"); + } catch (KeeperException e) { + } + + // Make sure that we actually did get it in process at the leader; there can be extra sessionClose proposals. + assertTrue(outstanding.size() > 0); + Proposal p = findProposalOfType(outstanding, OpCode.create); + LOG.info("Old leader id: {}. All proposals: {}", leader, outstanding); + assertNotNull(p, "Old leader doesn't have 'create' proposal"); + + // Make sure leader sync the proposal to disk. + int sleepTime = 0; + Long longLeader = (long) leader; + while (!p.qvAcksetPairs.get(0).getAckset().contains(longLeader)) { + if (sleepTime > 2000) { + fail("Transaction not synced to disk within 1 second " + p.qvAcksetPairs.get(0).getAckset() + " expected " + leader); + } + Thread.sleep(100); + sleepTime += 100; + } + + // Start controlled followers where we deliberately make the follower fail once follower receive the UPTODATE + // message from leader. Because followers only persist proposals from DIFF sync after UPTODATE, this can + // deterministically simulate the situation where followers ACK NEWLEADER (which makes leader think she has the + // quorum support, but actually not afterwards) but immediately fail afterwards without persisting the proposals + // from DIFF sync. + for (int i = 0; i < SERVER_COUNT; i++) { + if (i == leader) { + continue; + } + + mt[i].start(); + int sleepCount = 0; + while (mt[i].getQuorumPeer() == null) { + ++sleepCount; + if (sleepCount > 100) { + fail("Can't start follower " + i + " !"); + } + Thread.sleep(100); + } + + ((CustomQuorumPeer) mt[i].getQuorumPeer()).setInjectError(true); + LOG.info("Follower {} started.", i); + } + + // Verify leader can see it. 
The fact that leader can see it implies that + // leader should, at this point in time, get a quorum of ACK of NEWLEADER + // from two followers so leader can start serving requests; this also implies + // that DIFF sync from leader to followers are finished at this point in time. + // We then verify later that followers should have the same view after we shutdown + // this leader, otherwise it's a violation of ZAB / sequential consistency. + int c = 0; + while (c < 100) { + ++c; + try { + Stat stat = zk.exists("/zk" + leader, false); + assertNotNull(stat, "server " + leader + " should have /zk"); + break; + } catch (KeeperException.ConnectionLossException e) { + + } + Thread.sleep(100); + } + + // Shutdown all servers + for (int i = 0; i < SERVER_COUNT; i++) { + mt[i].shutdown(); + } + waitForOne(zk, States.CONNECTING); + + // Now restart all servers except the old leader. Only old leader has the transaction sync to disk. + // The old followers only had in memory view of the transaction, and they didn't have a chance + // to sync to disk because we made them fail at UPTODATE. + for (int i = 0; i < SERVER_COUNT; i++) { + if (i == leader) { + continue; + } + mt[i].start(); + int sleepCount = 0; + while (mt[i].getQuorumPeer() == null) { + ++sleepCount; + if (sleepCount > 100) { + fail("Can't start follower " + i + " !"); + } + Thread.sleep(100); + } + + ((CustomQuorumPeer) mt[i].getQuorumPeer()).setInjectError(false); + LOG.info("Follower {} started again.", i); + } + + int newLeader = findLeader(mt); + assertNotEquals(newLeader, leader, "new leader is still the old leader " + leader + " !!"); + + // This simulates the case where clients connected to the old leader had a view of the data + // "/zkX", but clients connect to the new leader does not have the same view of data (missing "/zkX"). + // This inconsistent view of the quorum exposed from leaders is a violation of ZAB. 
+ for (int i = 0; i < SERVER_COUNT; i++) { + if (i != newLeader) { + continue; + } + zk.close(); + zk = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, watch); + watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT); + Stat val = zk.exists("/zk" + leader, false); + assertNotNull(val, "Data inconsistency detected! " + + "Server " + i + " should have a view of /zk" + leader + "!"); + } + + zk.close(); + } + + @AfterEach + public void tearDown() { + for (int i = 0; i < mt.length; i++) { + try { + mt[i].shutdown(); + } catch (InterruptedException e) { + LOG.warn("Quorum Peer interrupted while shutting it down", e); + } + } + } + + static class CustomQuorumPeer extends QuorumPeer { + + private volatile boolean injectError = false; + + public CustomQuorumPeer() throws SaslException { + + } + + @Override + protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException { + return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) { + + @Override + void readPacket(QuorumPacket pp) throws IOException { + /** + * In real scenario got SocketTimeoutException while reading + * the packet from leader because of network problem, but + * here throwing SocketTimeoutException based on whether + * error is injected or not + */ + super.readPacket(pp); + if (injectError && pp.getType() == Leader.UPTODATE) { + String type = LearnerHandler.packetToString(pp); + throw new SocketTimeoutException("Socket timeout while reading the packet for operation " + + type); + } + } + + }; + } + + public void setInjectError(boolean injectError) { + this.injectError = injectError; + } + + } + + static class MockTestQPMain extends TestQPMain { + + @Override + protected QuorumPeer getQuorumPeer() throws SaslException { + return new CustomQuorumPeer(); + } + + } + + private Proposal findProposalOfType(Map proposals, int type) { + for (Proposal proposal : proposals.values()) { + if (proposal.request.getHdr().getType() == type) { + 
return proposal; + } + } + return null; + } + + private int findLeader(MainThread[] mt) { + for (int i = 0; i < mt.length; i++) { + if (mt[i].main.quorumPeer.leader != null) { + return i; + } + } + return -1; + } +}