From commits-return-6724-apmail-zookeeper-commits-archive=zookeeper.apache.org@zookeeper.apache.org Fri Aug 3 16:52:58 2018 Return-Path: X-Original-To: apmail-zookeeper-commits-archive@www.apache.org Delivered-To: apmail-zookeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3615518C0A for ; Fri, 3 Aug 2018 16:52:58 +0000 (UTC) Received: (qmail 433 invoked by uid 500); 3 Aug 2018 16:52:58 -0000 Delivered-To: apmail-zookeeper-commits-archive@zookeeper.apache.org Received: (qmail 367 invoked by uid 500); 3 Aug 2018 16:52:58 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zookeeper.apache.org Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 356 invoked by uid 99); 3 Aug 2018 16:52:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Aug 2018 16:52:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C2664DFC7C; Fri, 3 Aug 2018 16:52:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: breed@apache.org To: commits@zookeeper.apache.org Message-Id: <02f239246007457abf86ee360611283e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: zookeeper git commit: ZOOKEEPER-3104: Fix data inconsistency due to NEWLEADER being sent too early Date: Fri, 3 Aug 2018 16:52:57 +0000 (UTC) Repository: zookeeper Updated Branches: refs/heads/master 7cf8035c3 -> 148c2cd6b ZOOKEEPER-3104: Fix data inconsistency due to NEWLEADER being sent too early Author: Fangmin Lyu Reviewers: Benjamin Reed , Norbert Kalmar Closes #583 from lvfangmin/ZOOKEEPER-3104 Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: 
http://git-wip-us.apache.org/repos/asf/zookeeper/commit/148c2cd6 Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/148c2cd6 Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/148c2cd6 Branch: refs/heads/master Commit: 148c2cd6ba73e66b1879a2e10ecda4ce4e0e2c7b Parents: 7cf8035 Author: Fangmin Lyu Authored: Fri Aug 3 09:52:08 2018 -0700 Committer: Benjamin Reed Committed: Fri Aug 3 09:52:08 2018 -0700 ---------------------------------------------------------------------- .../apache/zookeeper/server/quorum/Leader.java | 205 +++++++------ .../zookeeper/server/quorum/LearnerHandler.java | 71 +++-- .../server/quorum/QuorumPeerMainTest.java | 301 ++++++++++++++++++- 3 files changed, 443 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/148c2cd6/src/java/main/org/apache/zookeeper/server/quorum/Leader.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Leader.java b/src/java/main/org/apache/zookeeper/server/quorum/Leader.java index 513500a..1d96165 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/Leader.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/Leader.java @@ -88,8 +88,7 @@ public class Leader { LOG.info(MAX_CONCURRENT_SNAPSHOT_TIMEOUT + " = " + maxConcurrentSnapshotTimeout); } - private final LearnerSnapshotThrottler learnerSnapshotThrottler = - new LearnerSnapshotThrottler(maxConcurrentSnapshots, maxConcurrentSnapshotTimeout); + private final LearnerSnapshotThrottler learnerSnapshotThrottler; final LeaderZooKeeperServer zk; @@ -111,6 +110,12 @@ public class Leader { return proposalStats; } + public LearnerSnapshotThrottler createLearnerSnapshotThrottler( + int maxConcurrentSnapshots, long maxConcurrentSnapshotTimeout) { + return new LearnerSnapshotThrottler( + maxConcurrentSnapshots, maxConcurrentSnapshotTimeout); + } + /** * Returns 
a copy of the current learner snapshot */ @@ -207,7 +212,7 @@ public class Leader { /** * Returns true if a quorum in qv is connected and synced with the leader * and false otherwise - * + * * @param qv, a QuorumVerifier */ public boolean isQuorumSynced(QuorumVerifier qv) { @@ -223,7 +228,7 @@ public class Leader { } return qv.containsQuorum(ids); } - + private final ServerSocket ss; Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException { @@ -248,6 +253,8 @@ public class Leader { throw e; } this.zk = zk; + this.learnerSnapshotThrottler = createLearnerSnapshotThrottler( + maxConcurrentSnapshots, maxConcurrentSnapshotTimeout); } /** @@ -342,17 +349,17 @@ public class Leader { * This message type informs observers of a committed proposal. */ final static int INFORM = 8; - + /** * Similar to COMMIT, only for a reconfig operation. */ final static int COMMITANDACTIVATE = 9; - + /** * Similar to INFORM, only for a reconfig operation. */ final static int INFORMANDACTIVATE = 19; - + final ConcurrentMap outstandingProposals = new ConcurrentHashMap(); private final ConcurrentLinkedQueue toBeApplied = new ConcurrentLinkedQueue(); @@ -415,9 +422,9 @@ public class Leader { long epoch = -1; boolean waitingForNewEpoch = true; - // when a reconfig occurs where the leader is removed or becomes an observer, + // when a reconfig occurs where the leader is removed or becomes an observer, // it does not commit ops after committing the reconfig - boolean allowedToCommit = true; + boolean allowedToCommit = true; /** * This method is main function that is called to lead * @@ -467,20 +474,20 @@ public class Leader { QuorumVerifier curQV = self.getQuorumVerifier(); if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) { // This was added in ZOOKEEPER-1783. The initial config has version 0 (not explicitly - // specified by the user; the lack of version in a config file is interpreted as version=0). 
+ // specified by the user; the lack of version in a config file is interpreted as version=0). // As soon as a config is established we would like to increase its version so that it // takes presedence over other initial configs that were not established (such as a config - // of a server trying to join the ensemble, which may be a partial view of the system, not the full config). + // of a server trying to join the ensemble, which may be a partial view of the system, not the full config). // We chose to set the new version to the one of the NEWLEADER message. However, before we can do that // there must be agreement on the new version, so we can only change the version when sending/receiving UPTODATE, - // not when sending/receiving NEWLEADER. In other words, we can't change curQV here since its the committed quorum verifier, - // and there's still no agreement on the new version that we'd like to use. Instead, we use + // not when sending/receiving NEWLEADER. In other words, we can't change curQV here since its the committed quorum verifier, + // and there's still no agreement on the new version that we'd like to use. Instead, we use // lastSeenQuorumVerifier which is being sent with NEWLEADER message - // so its a good way to let followers know about the new version. (The original reason for sending + // so its a good way to let followers know about the new version. (The original reason for sending // lastSeenQuorumVerifier with NEWLEADER is so that the leader completes any potentially uncommitted reconfigs - // that it finds before starting to propose operations. Here we're reusing the same code path for + // that it finds before starting to propose operations. Here we're reusing the same code path for // reaching consensus on the new version number.) 
- + // It is important that this is done before the leader executes waitForEpochAck, // so before LearnerHandlers return from their waitForEpochAck // hence before they construct the NEWLEADER message containing @@ -488,24 +495,24 @@ public class Leader { try { QuorumVerifier newQV = self.configFromString(curQV.toString()); newQV.setVersion(zk.getZxid()); - self.setLastSeenQuorumVerifier(newQV, true); + self.setLastSeenQuorumVerifier(newQV, true); } catch (Exception e) { throw new IOException(e); } } - + newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier()); if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()){ newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier()); } - + // We have to get at least a majority of servers in sync with // us. We do this by waiting for the NEWLEADER packet to get // acknowledged - + waitForEpochAck(self.getId(), leaderStateSummary); - self.setCurrentEpoch(epoch); - + self.setCurrentEpoch(epoch); + try { waitForNewLeaderAck(self.getId(), zk.getZxid()); } catch (InterruptedException e) { @@ -517,14 +524,14 @@ public class Leader { if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())){ followerSet.add(f.getSid()); } - } + } boolean initTicksShouldBeIncreased = true; for (Proposal.QuorumVerifierAcksetPair qvAckset:newLeaderProposal.qvAcksetPairs) { if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) { initTicksShouldBeIncreased = false; break; } - } + } if (initTicksShouldBeIncreased) { LOG.warn("Enough followers present. "+ "Perhaps the initTicks need to be increased."); @@ -533,7 +540,7 @@ public class Leader { } startZkServer(); - + /** * WARNING: do not use this for anything other than QA testing * on a real cluster. Specifically to enable verification that quorum @@ -679,39 +686,39 @@ public class Leader { /** In a reconfig operation, this method attempts to find the best leader for next configuration. 
* If the current leader is a voter in the next configuartion, then it remains the leader. - * Otherwise, choose one of the new voters that acked the reconfiguartion, such that it is as + * Otherwise, choose one of the new voters that acked the reconfiguartion, such that it is as * up-to-date as possible, i.e., acked as many outstanding proposals as possible. - * + * * @param reconfigProposal * @param zxid of the reconfigProposal * @return server if of the designated leader */ - + private long getDesignatedLeader(Proposal reconfigProposal, long zxid) { //new configuration - Proposal.QuorumVerifierAcksetPair newQVAcksetPair = reconfigProposal.qvAcksetPairs.get(reconfigProposal.qvAcksetPairs.size()-1); - - //check if I'm in the new configuration with the same quorum address - - // if so, I'll remain the leader - if (newQVAcksetPair.getQuorumVerifier().getVotingMembers().containsKey(self.getId()) && - newQVAcksetPair.getQuorumVerifier().getVotingMembers().get(self.getId()).addr.equals(self.getQuorumAddress())){ + Proposal.QuorumVerifierAcksetPair newQVAcksetPair = reconfigProposal.qvAcksetPairs.get(reconfigProposal.qvAcksetPairs.size()-1); + + //check if I'm in the new configuration with the same quorum address - + // if so, I'll remain the leader + if (newQVAcksetPair.getQuorumVerifier().getVotingMembers().containsKey(self.getId()) && + newQVAcksetPair.getQuorumVerifier().getVotingMembers().get(self.getId()).addr.equals(self.getQuorumAddress())){ return self.getId(); } - // start with an initial set of candidates that are voters from new config that - // acknowledged the reconfig op (there must be a quorum). Choose one of them as + // start with an initial set of candidates that are voters from new config that + // acknowledged the reconfig op (there must be a quorum). 
Choose one of them as // current leader candidate HashSet candidates = new HashSet(newQVAcksetPair.getAckset()); candidates.remove(self.getId()); // if we're here, I shouldn't be the leader long curCandidate = candidates.iterator().next(); - + //go over outstanding ops in order, and try to find a candidate that acked the most ops. //this way it will be the most up-to-date and we'll minimize the number of ops that get dropped - + long curZxid = zxid + 1; Proposal p = outstandingProposals.get(curZxid); - - while (p!=null && !candidates.isEmpty()) { - for (Proposal.QuorumVerifierAcksetPair qvAckset: p.qvAcksetPairs){ + + while (p!=null && !candidates.isEmpty()) { + for (Proposal.QuorumVerifierAcksetPair qvAckset: p.qvAcksetPairs){ //reduce the set of candidates to those that acknowledged p candidates.retainAll(qvAckset.getAckset()); //no candidate acked p, return the best candidate found so far @@ -719,18 +726,18 @@ public class Leader { //update the current candidate, and if it is the only one remaining, return it curCandidate = candidates.iterator().next(); if (candidates.size() == 1) return curCandidate; - } + } curZxid++; p = outstandingProposals.get(curZxid); } - + return curCandidate; } /** * @return True if committed, otherwise false. **/ - synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) { + synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) { // make sure that ops are committed in order. With reconfigurations it is now possible // that different operations wait for different sets of acks, and we still want to enforce // that they are committed in order. Currently we only permit one outstanding reconfiguration @@ -739,50 +746,50 @@ public class Leader { // for an operation without getting enough acks for preceding ops. But in the future if multiple // concurrent reconfigs are allowed, this can happen. 
if (outstandingProposals.containsKey(zxid - 1)) return false; - + // in order to be committed, a proposal must be accepted by a quorum. // // getting a quorum from all necessary configurations. if (!p.hasAllQuorums()) { - return false; + return false; } - + // commit proposals in order - if (zxid != lastCommitted+1) { + if (zxid != lastCommitted+1) { LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid) + " from " + followerAddr + " not first!"); LOG.warn("First is " + (lastCommitted+1)); - } - + } + outstandingProposals.remove(zxid); - + if (p.request != null) { toBeApplied.add(p); } if (p.request == null) { LOG.warn("Going to commmit null: " + p); - } else if (p.request.getHdr().getType() == OpCode.reconfig) { - LOG.debug("Committing a reconfiguration! " + outstandingProposals.size()); - - //if this server is voter in new config with the same quorum address, + } else if (p.request.getHdr().getType() == OpCode.reconfig) { + LOG.debug("Committing a reconfiguration! " + outstandingProposals.size()); + + //if this server is voter in new config with the same quorum address, //then it will remain the leader //otherwise an up-to-date follower will be designated as leader. 
This saves - //leader election time, unless the designated leader fails + //leader election time, unless the designated leader fails Long designatedLeader = getDesignatedLeader(p, zxid); //LOG.warn("designated leader is: " + designatedLeader); QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size()-1).getQuorumVerifier(); - + self.processReconfig(newQV, designatedLeader, zk.getZxid(), true); if (designatedLeader != self.getId()) { allowedToCommit = false; } - - // we're sending the designated leader, and if the leader is changing the followers are - // responsible for closing the connection - this way we are sure that at least a majority of them + + // we're sending the designated leader, and if the leader is changing the followers are + // responsible for closing the connection - this way we are sure that at least a majority of them // receive the commit message. commitAndActivate(zxid, designatedLeader); informAndActivate(p, designatedLeader); @@ -795,12 +802,12 @@ public class Leader { if(pendingSyncs.containsKey(zxid)){ for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) { sendSync(r); - } - } - - return true; + } + } + + return true; } - + /** * Keep a count of acks that are received by the leader for a particular * proposal @@ -809,9 +816,9 @@ public class Leader { * @param sid, the id of the server that sent the ack * @param followerAddr */ - synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) { - if (!allowedToCommit) return; // last op committed was a leader change - from now on - // the new leader should commit + synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) { + if (!allowedToCommit) return; // last op committed was a leader change - from now on + // the new leader should commit if (LOG.isTraceEnabled()) { LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid)); for (Proposal p : outstandingProposals.values()) { @@ -821,7 +828,7 @@ public class Leader { } LOG.trace("outstanding 
proposals all"); } - + if ((zxid & 0xffffffffL) == 0) { /* * We no longer process NEWLEADER ack with this method. However, @@ -830,8 +837,8 @@ public class Leader { */ return; } - - + + if (outstandingProposals.size() == 0) { if (LOG.isDebugEnabled()) { LOG.debug("outstanding is 0"); @@ -852,13 +859,13 @@ public class Leader { Long.toHexString(zxid), followerAddr); return; } - - p.addAck(sid); + + p.addAck(sid); /*if (LOG.isDebugEnabled()) { LOG.debug("Count for zxid: 0x{} is {}", Long.toHexString(zxid), p.ackSet.size()); }*/ - + boolean hasCommitted = tryToCommit(p, zxid, followerAddr); // If p is a reconfiguration, multiple other operations may be ready to be committed, @@ -875,11 +882,11 @@ public class Leader { while (allowedToCommit && hasCommitted && p!=null){ curZxid++; p = outstandingProposals.get(curZxid); - if (p !=null) hasCommitted = tryToCommit(p, curZxid, null); + if (p !=null) hasCommitted = tryToCommit(p, curZxid, null); } } } - + static class ToBeAppliedRequestProcessor implements RequestProcessor { private final RequestProcessor next; @@ -988,11 +995,11 @@ public class Leader { synchronized(this){ lastCommitted = zxid; } - + byte data[] = new byte[8]; - ByteBuffer buffer = ByteBuffer.wrap(data); + ByteBuffer buffer = ByteBuffer.wrap(data); buffer.putLong(designatedLeader); - + QuorumPacket qp = new QuorumPacket(Leader.COMMITANDACTIVATE, zxid, data, null); sendPacket(qp); } @@ -1006,17 +1013,17 @@ public class Leader { sendObserverPacket(qp); } - + /** * Create an inform&activate packet and send it to all observers. 
*/ public void informAndActivate(Proposal proposal, long designatedLeader) { byte[] proposalData = proposal.packet.getData(); byte[] data = new byte[proposalData.length + 8]; - ByteBuffer buffer = ByteBuffer.wrap(data); + ByteBuffer buffer = ByteBuffer.wrap(data); buffer.putLong(designatedLeader); buffer.put(proposalData); - + QuorumPacket qp = new QuorumPacket(Leader.INFORMANDACTIVATE, proposal.request.zxid, data, null); sendObserverPacket(qp); } @@ -1064,19 +1071,19 @@ public class Leader { Proposal p = new Proposal(); p.packet = pp; - p.request = request; - + p.request = request; + synchronized(this) { p.addQuorumVerifier(self.getQuorumVerifier()); - + if (request.getHdr().getType() == OpCode.reconfig){ - self.setLastSeenQuorumVerifier(request.qv, true); + self.setLastSeenQuorumVerifier(request.qv, true); } - + if (self.getQuorumVerifier().getVersion() sidSet) { StringBuilder sids = new StringBuilder(); @@ -1264,7 +1271,7 @@ public class Leader { + newLeaderProposal.ackSetsToString() + " ]; starting up and setting last processed zxid: 0x{}", Long.toHexString(zk.getZxid())); - + /* * ZOOKEEPER-1324. the leader sends the new config it must complete * to others inside a NEWLEADER message (see LearnerHandler where @@ -1273,20 +1280,20 @@ public class Leader { * config to itself. */ QuorumVerifier newQV = self.getLastSeenQuorumVerifier(); - - Long designatedLeader = getDesignatedLeader(newLeaderProposal, zk.getZxid()); + + Long designatedLeader = getDesignatedLeader(newLeaderProposal, zk.getZxid()); self.processReconfig(newQV, designatedLeader, zk.getZxid(), true); if (designatedLeader != self.getId()) { allowedToCommit = false; } - + zk.startup(); /* * Update the election vote here to ensure that all members of the * ensemble report the same vote to new servers that start up and * send leader election notifications to the ensemble. 
- * + * * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732 */ self.updateElectionVote(getEpoch()); @@ -1378,7 +1385,7 @@ public class Leader { case COMMIT: return "COMMIT"; case COMMITANDACTIVATE: - return "COMMITANDACTIVATE"; + return "COMMITANDACTIVATE"; case PING: return "PING"; case REVALIDATE: http://git-wip-us.apache.org/repos/asf/zookeeper/blob/148c2cd6/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java b/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java index 023d6a6..9b65246 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java @@ -74,7 +74,7 @@ public class LearnerHandler extends ZooKeeperThread { * on the syncLimit. Once the deadline is past this learner should * be considered no longer "sync'd" with the leader. 
*/ volatile long tickOfNextAckDeadline; - + /** * ZooKeeper server identifier of this learner */ @@ -159,7 +159,7 @@ public class LearnerHandler extends ZooKeeperThread { private final BufferedInputStream bufferedInput; private BufferedOutputStream bufferedOutput; - + /** * Keep track of whether we have started send packets thread */ @@ -176,7 +176,7 @@ public class LearnerHandler extends ZooKeeperThread { * that we are going to blast it to the learner */ private boolean needOpPacket = true; - + /** * Last zxid sent to the learner as part of synchronization */ @@ -188,6 +188,11 @@ public class LearnerHandler extends ZooKeeperThread { this.leader = leader; this.bufferedInput = bufferedInput; + if (Boolean.getBoolean(FORCE_SNAP_SYNC)) { + forceSnapSync = true; + LOG.info("Forcing snapshot sync is enabled"); + } + try { if (leader.self != null) { leader.self.authServer.authenticate(sock, @@ -403,7 +408,7 @@ public class LearnerHandler extends ZooKeeperThread { } else { LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion())); } - + if (qp.getType() == Leader.OBSERVERINFO) { learnerType = LearnerType.OBSERVER; } @@ -440,31 +445,15 @@ public class LearnerHandler extends ZooKeeperThread { leader.waitForEpochAck(this.getSid(), ss); } peerLastZxid = ss.getLastZxid(); - + // Take any necessary action if we need to send TRUNC or DIFF // startForwarding() will be called in all cases boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader); - - LOG.debug("Sending NEWLEADER message to " + sid); - // the version of this quorumVerifier will be set by leader.lead() in case - // the leader is just being established. 
waitForEpochAck makes sure that readyToStart is true if - // we got here, so the version was set - if (getVersion() < 0x10000) { - QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, - newLeaderZxid, null, null); - oa.writeRecord(newLeaderQP, "packet"); - } else { - QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, - newLeaderZxid, leader.self.getLastSeenQuorumVerifier() - .toString().getBytes(), null); - queuedPackets.add(newLeaderQP); - } - bufferedOutput.flush(); /* if we are not truncating or sending a diff just send a snapshot */ if (needSnap) { boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER; - LearnerSnapshot snapshot = + LearnerSnapshot snapshot = leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle); try { long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid(); @@ -472,11 +461,11 @@ public class LearnerHandler extends ZooKeeperThread { bufferedOutput.flush(); LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, " - + "send zxid of db as 0x{}, {} concurrent snapshots, " + + "send zxid of db as 0x{}, {} concurrent snapshots, " + "snapshot was {} from throttle", - Long.toHexString(peerLastZxid), + Long.toHexString(peerLastZxid), Long.toHexString(leaderLastZxid), - Long.toHexString(zxidToSend), + Long.toHexString(zxidToSend), snapshot.getConcurrentSnapshotNumber(), snapshot.isEssential() ? "exempt" : "not exempt"); // Dump data to peer @@ -488,9 +477,25 @@ public class LearnerHandler extends ZooKeeperThread { } } + LOG.debug("Sending NEWLEADER message to " + sid); + // the version of this quorumVerifier will be set by leader.lead() in case + // the leader is just being established. 
waitForEpochAck makes sure that readyToStart is true if + // we got here, so the version was set + if (getVersion() < 0x10000) { + QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, + newLeaderZxid, null, null); + oa.writeRecord(newLeaderQP, "packet"); + } else { + QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, + newLeaderZxid, leader.self.getLastSeenQuorumVerifier() + .toString().getBytes(), null); + queuedPackets.add(newLeaderQP); + } + bufferedOutput.flush(); + // Start thread that blast packets in the queue to learner startSendingPackets(); - + /* * Have to wait for the first ACK, wait until * the leader is ready, and only then we can @@ -505,12 +510,12 @@ public class LearnerHandler extends ZooKeeperThread { } if(LOG.isDebugEnabled()){ - LOG.debug("Received NEWLEADER-ACK message from " + sid); + LOG.debug("Received NEWLEADER-ACK message from " + sid); } leader.waitForNewLeaderAck(getSid(), qp.getZxid()); syncLimitCheck.start(); - + // now that the ack has been processed expect the syncLimit sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit); @@ -526,7 +531,7 @@ public class LearnerHandler extends ZooKeeperThread { // so we need to mark when the peer can actually start // using the data // - LOG.debug("Sending UPTODATE message to " + sid); + LOG.debug("Sending UPTODATE message to " + sid); queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null)); while (true) { @@ -920,8 +925,8 @@ public class LearnerHandler extends ZooKeeperThread { } return queuedZxid; - } - + } + public void shutdown() { // Send the packet of death try { @@ -975,7 +980,7 @@ public class LearnerHandler extends ZooKeeperThread { QuorumPacket packet = new QuorumPacket(type, zxid, null, null); queuePacket(packet); } - + void queuePacket(QuorumPacket p) { queuedPackets.add(p); } @@ -984,7 +989,7 @@ public class LearnerHandler extends ZooKeeperThread { return isAlive() && leader.self.tick.get() <= tickOfNextAckDeadline; } - + /** * For testing, 
return packet queue * @return http://git-wip-us.apache.org/repos/asf/zookeeper/blob/148c2cd6/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java index dff7cfc..bbdd56c 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java @@ -36,14 +36,18 @@ import java.nio.channels.SocketChannel; import java.nio.file.Paths; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; +import javax.security.sasl.SaslException; import org.apache.commons.io.FileUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.PatternLayout; import org.apache.log4j.WriterAppender; +import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.PortAssignment; @@ -53,6 +57,7 @@ import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.Leader.Proposal; import org.apache.zookeeper.test.ClientBase; @@ -481,7 +486,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { Thread.sleep(1000); } } - + private void logStates(ZooKeeper[] zks) { StringBuilder sbBuilder = new StringBuilder("Connection States: {"); for (int i = 0; i < zks.length; i++) { @@ -1205,7 +1210,7 @@ public class QuorumPeerMainTest extends 
QuorumPeerTestBase { } } } - + private Proposal findProposalOfType(Map proposals, int type) { for (Proposal proposal : proposals.values()) { if (proposal.request.getHdr().getType() == type) { @@ -1214,4 +1219,296 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { } return null; } + + /** + * Before ZOOKEEPER-3104, in SNAP sync, the leader started queuing + * proposals/commits and the NEWLEADER packet before sending + * the snapshot over the wire. So it was possible that the zxid + * associated with the snapshot was higher than all the + * packets queued before NEWLEADER. + * + * When the follower receives the snapshot, it applies all + * the txns queued before NEWLEADER, which may not cover all + * the txns up to the zxid in the snapshot. After that, it + * writes the snapshot out to disk with the zxid associated + * with the snapshot. If the server crashes after writing + * this out, then when loading the data from disk it uses the zxid + * of the snapshot file to sync with the leader, which could cause + * data inconsistency, because only part of the historical + * data was replayed during the previous sync. + * + * This test case simulates this scenario and makes sure + * there is no data inconsistency after the fix. + */ + @Test + public void testInconsistentDueToNewLeaderOrder() throws Exception { + + // 1. set up an ensemble with 3 servers + final int ENSEMBLE_SERVERS = 3; + final int clientPorts[] = new int[ENSEMBLE_SERVERS]; + StringBuilder sb = new StringBuilder(); + String server; + + for (int i = 0; i < ENSEMBLE_SERVERS; i++) { + clientPorts[i] = PortAssignment.unique(); + server = "server." 
+ i + "=127.0.0.1:" + PortAssignment.unique() + + ":" + PortAssignment.unique() + ":participant;127.0.0.1:" + + clientPorts[i]; + sb.append(server + "\n"); + } + String currentQuorumCfgSection = sb.toString(); + + // start servers + MainThread[] mt = new MainThread[ENSEMBLE_SERVERS]; + ZooKeeper zk[] = new ZooKeeper[ENSEMBLE_SERVERS]; + Context contexts[] = new Context[ENSEMBLE_SERVERS]; + for (int i = 0; i < ENSEMBLE_SERVERS; i++) { + final Context context = new Context(); + contexts[i] = context; + mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, + false) { + @Override + public TestQPMain getTestQPMain() { + return new CustomizedQPMain(context); + } + }; + mt[i].start(); + zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], + ClientBase.CONNECTION_TIMEOUT, this); + } + waitForAll(zk, States.CONNECTED); + LOG.info("all servers started"); + + String nodePath = "/testInconsistentDueToNewLeader"; + + int leaderId = -1; + int followerA = -1; + for (int i = 0; i < ENSEMBLE_SERVERS; i++) { + if (mt[i].main.quorumPeer.leader != null) { + leaderId = i; + } else if (followerA == -1) { + followerA = i; + } + } + LOG.info("shutdown follower {}", followerA); + mt[followerA].shutdown(); + waitForOne(zk[followerA], States.CONNECTING); + + try { + // 2. set force snapshot to be true + LOG.info("force snapshot sync"); + System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "true"); + + // 3. create a node + String initialValue = "1"; + final ZooKeeper leaderZk = zk[leaderId]; + leaderZk.create(nodePath, initialValue.getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + LOG.info("created node {} with value {}", nodePath, initialValue); + + CustomQuorumPeer leaderQuorumPeer = + (CustomQuorumPeer) mt[leaderId].main.quorumPeer; + + // 4. 
on the customized leader, catch the startForwarding call + // (without synchronized), set the node to value v1, then + // call super.startForwarding to generate the ongoing + // txn proposal and commit for the v1 value update + leaderQuorumPeer.setStartForwardingListener( + new StartForwardingListener() { + @Override + public void start() { + if (!Boolean.getBoolean(LearnerHandler.FORCE_SNAP_SYNC)) { + return; + } + final String value = "2"; + LOG.info("start forwarding, set {} to {}", nodePath, value); + // use async, otherwise it will block the logLock in + // ZKDatabase and the setData request will time out + try { + leaderZk.setData(nodePath, value.getBytes(), -1, + new AsyncCallback.StatCallback() { + public void processResult(int rc, String path, + Object ctx, Stat stat) {} + }, null); + // wait for the setData txn to be populated + Thread.sleep(1000); + } catch (Exception e) { + LOG.error("error when set {} to {}", nodePath, value, e); + } + } + }); + + // 5. on the customized leader, catch the beginSnapshot call in + // LearnerSnapshotThrottler to set the node to value v2, and + // wait for it to hit the data tree + leaderQuorumPeer.setBeginSnapshotListener(new BeginSnapshotListener() { + @Override + public void start() { + String value = "3"; + LOG.info("before sending snapshot, set {} to {}", + nodePath, value); + try { + leaderZk.setData(nodePath, value.getBytes(), -1); + LOG.info("successfully set {} to {}", nodePath, value); + } catch (Exception e) { + LOG.error("error when set {} to {}, {}", nodePath, value, e); + } + } + }); + + // 6. 
exit follower A after taking snapshot + CustomQuorumPeer followerAQuorumPeer = + ((CustomQuorumPeer) mt[followerA].main.quorumPeer); + LOG.info("set exit when ack new leader packet on {}", followerA); + contexts[followerA].exitWhenAckNewLeader = true; + CountDownLatch latch = new CountDownLatch(1); + final MainThread followerAMT = mt[followerA]; + contexts[followerA].newLeaderAckCallback = new NewLeaderAckCallback() { + @Override + public void start() { + try { + latch.countDown(); + followerAMT.shutdown(); + } catch (Exception e) {} + } + }; + + // 7. start follower A to do snapshot sync + LOG.info("starting follower {}", followerA); + mt[followerA].start(); + Assert.assertTrue(latch.await(30, TimeUnit.SECONDS)); + + // 8. now we have invalid data on disk, let's load it and verify + LOG.info("disable exit when ack new leader packet on {}", followerA); + System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "false"); + contexts[followerA].exitWhenAckNewLeader = true; + contexts[followerA].newLeaderAckCallback = null; + + LOG.info("restarting follower {}", followerA); + mt[followerA].start(); + zk[followerA].close(); + + zk[followerA] = new ZooKeeper("127.0.0.1:" + clientPorts[followerA], + ClientBase.CONNECTION_TIMEOUT, this); + + // 9. 
// start follower A, after it's in broadcast state, make sure
            // the node value is same as what we have on leader
            waitForOne(zk[followerA], States.CONNECTED);
            Assert.assertEquals(
                new String(zk[followerA].getData(nodePath, null, null)),
                new String(zk[leaderId].getData(nodePath, null, null))
            );
        } finally {
            System.clearProperty(LearnerHandler.FORCE_SNAP_SYNC);
            for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
                mt[i].shutdown();
                zk[i].close();
            }
        }
    }

    /**
     * Mutable switches handed to one {@link CustomQuorumPeer}; the follower
     * hook in that peer consults them every time it writes a packet.
     *
     * <p>The fields are {@code volatile} because the test assigns them from
     * its own thread and then waits on a latch that is released from inside
     * the callback, i.e. the reader runs on a different thread than the
     * writer.
     */
    static class Context {
        /** Not referenced by any hook visible in this part of the file. */
        volatile boolean quitFollowing = false;
        /** When true, an outgoing ACK packet triggers {@link #newLeaderAckCallback}. */
        volatile boolean exitWhenAckNewLeader = false;
        /** Invoked before the ACK is written; {@code null} means "do nothing". */
        volatile NewLeaderAckCallback newLeaderAckCallback = null;
    }

    /** Hook run by the customized follower just before it writes an ACK packet. */
    interface NewLeaderAckCallback {
        void start();
    }

    /** Hook run by the customized leader at the start of {@code startForwarding}. */
    interface StartForwardingListener {
        void start();
    }

    /** Hook run by the customized snapshot throttler at the start of {@code beginSnapshot}. */
    interface BeginSnapshotListener {
        void start();
    }

    /**
     * {@link TestQPMain} variant whose quorum peer is a
     * {@link CustomQuorumPeer} bound to the given {@link Context}.
     */
    static class CustomizedQPMain extends TestQPMain {

        private final Context context;

        public CustomizedQPMain(Context context) {
            this.context = context;
        }

        @Override
        protected QuorumPeer getQuorumPeer() throws SaslException {
            return new CustomQuorumPeer(context);
        }
    }

    /**
     * Quorum peer that builds an instrumented {@link Follower} and
     * {@link Leader} so a test can run code at three points: before an ACK
     * is written, before the leader starts forwarding to a learner, and
     * before a learner snapshot begins.
     */
    static class CustomQuorumPeer extends QuorumPeer {
        private final Context context;

        // Installed through the setters after the peer already exists and read
        // later from the leader-side overrides (presumably on server threads),
        // hence volatile.
        private volatile StartForwardingListener startForwardingListener;
        private volatile BeginSnapshotListener beginSnapshotListener;

        public CustomQuorumPeer(Context context)
                throws SaslException {
            this.context = context;
        }

        /** Registers the hook run at the start of the leader's {@code startForwarding}. */
        public void setStartForwardingListener(
                StartForwardingListener startForwardingListener) {
            this.startForwardingListener = startForwardingListener;
        }

        /** Registers the hook run at the start of the throttler's {@code beginSnapshot}. */
        public void setBeginSnapshotListener(
                BeginSnapshotListener beginSnapshotListener) {
            this.beginSnapshotListener = beginSnapshotListener;
        }

        /**
         * Creates a follower that, while {@code context.exitWhenAckNewLeader}
         * is set, runs {@code context.newLeaderAckCallback} before every
         * outgoing packet of type {@code Leader.ACK}.
         */
        @Override
        protected Follower makeFollower(FileTxnSnapLog logFactory)
                throws IOException {
            return new Follower(this, new FollowerZooKeeperServer(logFactory,
                    this, this.getZkDb())) {

                @Override
                void writePacket(QuorumPacket pp, boolean flush) throws IOException {
                    if (pp != null && pp.getType() == Leader.ACK
                            && context.exitWhenAckNewLeader) {
                        // Read the field once: the test may null it out
                        // between a null check and the call.
                        NewLeaderAckCallback callback = context.newLeaderAckCallback;
                        if (callback != null) {
                            callback.start();
                        }
                    }
                    super.writePacket(pp, flush);
                }
            };
        }

        /**
         * Creates a leader that runs the registered listeners before
         * delegating to the stock {@code startForwarding} and
         * {@code beginSnapshot} implementations.
         */
        @Override
        protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
            return new Leader(this, new LeaderZooKeeperServer(logFactory,
                    this, this.getZkDb())) {
                @Override
                public long startForwarding(LearnerHandler handler,
                        long lastSeenZxid) {
                    StartForwardingListener listener = startForwardingListener;
                    if (listener != null) {
                        listener.start();
                    }
                    return super.startForwarding(handler, lastSeenZxid);
                }

                @Override
                public LearnerSnapshotThrottler createLearnerSnapshotThrottler(
                        int maxConcurrentSnapshots, long maxConcurrentSnapshotTimeout) {
                    return new LearnerSnapshotThrottler(
                            maxConcurrentSnapshots, maxConcurrentSnapshotTimeout) {

                        @Override
                        public LearnerSnapshot beginSnapshot(boolean essential)
                                throws SnapshotThrottleException, InterruptedException {
                            // The listener is looked up at call time, so one
                            // registered after the throttler was built is
                            // still honoured.
                            BeginSnapshotListener listener = beginSnapshotListener;
                            if (listener != null) {
                                listener.start();
                            }
                            return super.beginSnapshot(essential);
                        }
                    };
                }
            };
        }
    }
}