Subject: svn commit: r1135382 - in /zookeeper/trunk: ./ src/ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/main/org/apache/zookeeper/server/util/ src/java/test/org/apache/zookeeper/server/quorum/ src/java/...
Date: Tue, 14 Jun 2011 05:14:06 -0000
To: commits@zookeeper.apache.org
From: breed@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20110614051407.275E023888DD@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Author: breed
Date: Tue Jun 14 05:14:05 2011
New Revision: 1135382

URL: http://svn.apache.org/viewvc?rev=1135382&view=rev
Log:
ZOOKEEPER-335. zookeeper servers should commit the new leader txn to their logs.
ZOOKEEPER-1081. modify leader/follower code to correctly deal with new leader
ZOOKEEPER-1082. modify leader election to correctly take into account current epoch

Added:
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/StateSummary.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/ZxidUtils.java
Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java
    zookeeper/trunk/src/zookeeper.jute

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1135382&r1=1135381&r2=1135382&view=diff
============================================================================== --- zookeeper/trunk/CHANGES.txt (original) +++ zookeeper/trunk/CHANGES.txt Tue Jun 14 05:14:05 2011 @@ -223,6 +223,12 @@ BUGFIXES: ZOOKEEPER-1086. zookeeper test jar has non mavenised dependency. (Ivan Kelly via michim) + ZOOKEEPER-335. zookeeper servers should commit the new leader txn to their logs. (breed) + + ZOOKEEPER-1081. modify leader/follower code to correctly deal with new leader (breed) + + ZOOKEEPER-1082. modify leader election to correctly take into account current epoch (fpj via breed) + IMPROVEMENTS: ZOOKEEPER-724. Improve junit test integration - log harness information (phunt via mahadev) Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=1135382&r1=1135381&r2=1135382&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Tue Jun 14 05:14:05 2011 @@ -18,10 +18,19 @@ package org.apache.zookeeper.server; +import java.io.BufferedReader; +import java.io.BufferedWriter; import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -57,6 +66,7 @@ import org.apache.zookeeper.server.auth. 
import org.apache.zookeeper.server.auth.ProviderRegistry; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer; +import org.apache.zookeeper.server.util.ZxidUtils; /** * This class implements a simple standalone ZooKeeperServer. It sets up the @@ -253,6 +263,7 @@ public class ZooKeeperServer implements // XXX: Is lastProcessedZxid really the best thing to use? killSession(session, zkDb.getDataTreeLastProcessedZxid()); } + // Make a clean snapshot takeSnapshot(); } @@ -884,4 +895,6 @@ public class ZooKeeperServer implements } cnxn.incrOutstandingRequests(h); } + + } Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=1135382&r1=1135381&r2=1135382&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Tue Jun 14 05:14:05 2011 @@ -19,6 +19,7 @@ package org.apache.zookeeper.server.quorum; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.HashSet; @@ -33,6 +34,7 @@ import org.apache.zookeeper.server.quoru import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; +import org.apache.zookeeper.server.util.ZxidUtils; /** @@ -95,7 +97,7 @@ public class FastLeaderElection implemen /* * Epoch */ - long epoch; + long electionEpoch; /* * current state of sender @@ -106,6 +108,11 @@ public class FastLeaderElection implemen * Address of sender */ long sid; + + /* + * epoch of the proposed leader + */ + long peerEpoch; } 
/** @@ -119,15 +126,17 @@ public class FastLeaderElection implemen ToSend(mType type, long leader, long zxid, - long epoch, + long electionEpoch, ServerState state, - long sid) { + long sid, + long peerEpoch) { this.leader = leader; this.zxid = zxid; - this.epoch = epoch; + this.electionEpoch = electionEpoch; this.state = state; this.sid = sid; + this.peerEpoch = peerEpoch; } /* @@ -143,7 +152,7 @@ public class FastLeaderElection implemen /* * Epoch */ - long epoch; + long electionEpoch; /* * Current state; @@ -154,6 +163,11 @@ public class FastLeaderElection implemen * Address of recipient */ long sid; + + /* + * Leader epoch + */ + long peerEpoch; } LinkedBlockingQueue sendqueue; @@ -206,7 +220,8 @@ public class FastLeaderElection implemen current.zxid, logicalclock, self.getPeerState(), - response.sid); + response.sid, + current.peerEpoch); sendqueue.offer(notmsg); } else { @@ -216,11 +231,15 @@ public class FastLeaderElection implemen + self.getId()); } + /* + * We check for 28 bytes for backward compatibility + */ if (response.buffer.capacity() < 28) { LOG.error("Got a short response: " + response.buffer.capacity()); continue; } + boolean backCompatibility = (response.buffer.capacity() == 28); response.buffer.clear(); // State of peer that sent this message @@ -244,9 +263,17 @@ public class FastLeaderElection implemen Notification n = new Notification(); n.leader = response.buffer.getLong(); n.zxid = response.buffer.getLong(); - n.epoch = response.buffer.getLong(); + n.electionEpoch = response.buffer.getLong(); n.state = ackstate; n.sid = response.sid; + if(!backCompatibility){ + n.peerEpoch = response.buffer.getLong(); + } else { + if(LOG.isInfoEnabled()){ + LOG.info("Backward compatibility mode, server id: " + n.sid); + } + n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid); + } /* * Print notification info @@ -268,14 +295,15 @@ public class FastLeaderElection implemen * lagging behind. 
*/ if((ackstate == QuorumPeer.ServerState.LOOKING) - && (n.epoch < logicalclock)){ + && (n.electionEpoch < logicalclock)){ Vote v = getVote(); ToSend notmsg = new ToSend(ToSend.mType.notification, v.id, v.zxid, logicalclock, self.getPeerState(), - response.sid); + response.sid, + v.peerEpoch); sendqueue.offer(notmsg); } } else { @@ -298,7 +326,8 @@ public class FastLeaderElection implemen current.zxid, logicalclock, self.getPeerState(), - response.sid); + response.sid, + current.peerEpoch); sendqueue.offer(notmsg); } } @@ -347,7 +376,7 @@ public class FastLeaderElection implemen * @param m message to send */ private void process(ToSend m) { - byte requestBytes[] = new byte[28]; + byte requestBytes[] = new byte[36]; ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); /* @@ -358,7 +387,8 @@ public class FastLeaderElection implemen requestBuffer.putInt(m.state.ordinal()); requestBuffer.putLong(m.leader); requestBuffer.putLong(m.zxid); - requestBuffer.putLong(m.epoch); + requestBuffer.putLong(m.electionEpoch); + requestBuffer.putLong(m.peerEpoch); manager.toSend(m.sid, requestBuffer); @@ -413,6 +443,7 @@ public class FastLeaderElection implemen volatile long logicalclock; /* Election instance */ long proposedLeader; long proposedZxid; + long proposedEpoch; /** @@ -494,12 +525,13 @@ public class FastLeaderElection implemen proposedZxid, logicalclock, QuorumPeer.ServerState.LOOKING, - sid); + sid, + proposedEpoch); if(LOG.isDebugEnabled()){ LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), " + proposedZxid + " (n.zxid), " + logicalclock + " (n.round), " + sid + " (recipient), " + self.getId() + - " (myid)"); + " (myid), " + proposedEpoch + " (n.peerEpoch)"); } sendqueue.offer(notmsg); } @@ -508,9 +540,9 @@ public class FastLeaderElection implemen private void printNotification(Notification n){ LOG.info("Notification: " + n.leader + " (n.leader), " + n.zxid + - " (n.zxid), " + n.epoch + " (n.round), " + n.state + - " (n.state), " + n.sid + " 
(n.sid), " + self.getPeerState() + - " (my state)"); + " (n.zxid), " + n.electionEpoch + " (n.round), " + n.state + + " (n.state), " + n.sid + " (n.sid), " + n.peerEpoch + " (n.peerEPoch), " + + self.getPeerState() + " (my state)"); } /** @@ -520,14 +552,16 @@ public class FastLeaderElection implemen * @param id Server identifier * @param zxid Last zxid observed by the issuer of this vote */ - private boolean totalOrderPredicate(long newId, long newZxid, long curId, long curZxid) { + private boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: " + newZxid + ", proposed zxid: " + curZxid); if(self.getQuorumVerifier().getWeight(newId) == 0){ return false; } - return ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))); + return ((newEpoch > curEpoch) || + ((newEpoch == curEpoch) && (newZxid > curZxid)) || + ((newZxid == curZxid) && (newId > curId))); } /** @@ -566,12 +600,12 @@ public class FastLeaderElection implemen * * @param votes set of votes * @param leader leader id - * @param epoch epoch id + * @param electionEpoch epoch id */ private boolean checkLeader( HashMap votes, long leader, - long epoch){ + long electionEpoch){ boolean predicate = true; @@ -590,7 +624,7 @@ public class FastLeaderElection implemen return predicate; } - synchronized void updateProposal(long leader, long zxid){ + synchronized void updateProposal(long leader, long zxid, long epoch){ if(LOG.isDebugEnabled()){ LOG.debug("Updating proposal: " + leader + " (newleader), " + zxid + " (newzxid), " + proposedLeader + " (oldleader), " + @@ -598,10 +632,11 @@ public class FastLeaderElection implemen } proposedLeader = leader; proposedZxid = zxid; + proposedEpoch = epoch; } synchronized Vote getVote(){ - return new Vote(proposedLeader, proposedZxid); + return new Vote(proposedLeader, proposedZxid, proposedEpoch); } /** @@ -645,6 +680,23 @@ public class 
FastLeaderElection implemen } /** + * Returns the initial vote value of the peer epoch. + * + * @return long + */ + private long getPeerEpoch(){ + if(self.getLearnerType() == LearnerType.PARTICIPANT) + try { + return self.getCurrentEpoch(); + } catch(IOException e) { + RuntimeException re = new RuntimeException(e.getMessage()); + re.setStackTrace(e.getStackTrace()); + throw re; + } + else return Long.MIN_VALUE; + } + + /** * Starts a new round of leader election. Whenever our QuorumPeer * changes its state to LOOKING, this method is invoked, and it * sends notifications to all other peers. @@ -670,7 +722,7 @@ public class FastLeaderElection implemen synchronized(this){ logicalclock++; - updateProposal(getInitId(), getInitLastLoggedZxid()); + updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } LOG.info("New election. My id = " + self.getId() + @@ -717,47 +769,48 @@ public class FastLeaderElection implemen switch (n.state) { case LOOKING: // If notification > current, replace and send messages out - if (n.epoch > logicalclock) { - logicalclock = n.epoch; + if (n.electionEpoch > logicalclock) { + logicalclock = n.electionEpoch; recvset.clear(); - if(totalOrderPredicate(n.leader, n.zxid, - getInitId(), getInitLastLoggedZxid())) { - updateProposal(n.leader, n.zxid); + if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, + getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { + updateProposal(n.leader, n.zxid, n.peerEpoch); } else { updateProposal(getInitId(), - getInitLastLoggedZxid()); + getInitLastLoggedZxid(), + getPeerEpoch()); } sendNotifications(); - } else if (n.epoch < logicalclock) { + } else if (n.electionEpoch < logicalclock) { if(LOG.isDebugEnabled()){ - LOG.debug("Notification epoch is smaller than logicalclock. n.epoch = " + n.epoch + LOG.debug("Notification election epoch is smaller than logicalclock. 
n.electionEpoch = " + n.electionEpoch + ", Logical clock" + logicalclock); } break; - } else if (totalOrderPredicate(n.leader, n.zxid, - proposedLeader, proposedZxid)) { - updateProposal(n.leader, n.zxid); + } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, + proposedLeader, proposedZxid, proposedEpoch)) { + updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); } if(LOG.isDebugEnabled()){ LOG.debug("Adding vote: From = " + n.sid + ", Proposed leader = " + n.leader + - ", Porposed zxid = " + n.zxid + - ", Proposed epoch = " + n.epoch); + ", Proposed zxid = " + n.zxid + + ", Proposed election epoch = " + n.electionEpoch); } - recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch)); + recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, - logicalclock))) { + logicalclock, proposedEpoch))) { // Verify if there is any change in the proposed leader while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ - if(totalOrderPredicate(n.leader, n.zxid, - proposedLeader, proposedZxid)){ + if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, + proposedLeader, proposedZxid, proposedEpoch)){ recvqueue.put(n); break; } @@ -772,7 +825,7 @@ public class FastLeaderElection implemen ServerState.LEADING: learningState()); Vote endVote = new Vote(proposedLeader, - proposedZxid); + proposedZxid, proposedEpoch); leaveInstance(endVote); return endVote; } @@ -787,15 +840,15 @@ public class FastLeaderElection implemen * Consider all notifications from the same epoch * together. 
*/ - if(n.epoch == logicalclock){ - recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch)); + if(n.electionEpoch == logicalclock){ + recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if(termPredicate(recvset, new Vote(n.leader, - n.zxid, n.epoch, n.state)) - && checkLeader(outofelection, n.leader, n.epoch)) { + n.zxid, n.electionEpoch, n.peerEpoch, n.state)) + && checkLeader(outofelection, n.leader, n.electionEpoch)) { self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); - Vote endVote = new Vote(n.leader, n.zxid); + Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch); leaveInstance(endVote); return endVote; } @@ -806,16 +859,16 @@ public class FastLeaderElection implemen * a majority are following the same leader. */ outofelection.put(n.sid, new Vote(n.leader, n.zxid, - n.epoch, n.state)); + n.electionEpoch, n.peerEpoch, n.state)); if (termPredicate(outofelection, new Vote(n.leader, - n.zxid, n.epoch, n.state)) - && checkLeader(outofelection, n.leader, n.epoch)) { + n.zxid, n.electionEpoch, n.peerEpoch, n.state)) + && checkLeader(outofelection, n.leader, n.electionEpoch)) { synchronized(this){ - logicalclock = n.epoch; + logicalclock = n.electionEpoch; self.setPeerState((n.leader == self.getId()) ? 
ServerState.LEADING: learningState()); } - Vote endVote = new Vote(n.leader, n.zxid); + Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch); leaveInstance(endVote); return endVote; } Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=1135382&r1=1135381&r2=1135382&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Tue Jun 14 05:14:05 2011 @@ -25,6 +25,7 @@ import java.net.InetSocketAddress; import org.apache.jute.BinaryInputArchive; import org.apache.jute.Record; import org.apache.zookeeper.server.util.SerializeUtils; +import org.apache.zookeeper.server.util.ZxidUtils; import org.apache.zookeeper.txn.TxnHeader; /** @@ -64,20 +65,21 @@ public class Follower extends Learner{ self.start_fle = 0; self.end_fle = 0; fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean); - try { + try { InetSocketAddress addr = findLeader(); try { connectToLeader(addr); - long newLeaderZxid = registerWithLeader(Leader.FOLLOWERINFO); + long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO); + //check to see if the leader zxid is lower than ours //this should never happen but is just a safety check - long lastLoggedZxid = self.getLastLoggedZxid(); - if ((newLeaderZxid >> 32L) < (lastLoggedZxid >> 32L)) { - LOG.error("Leader epoch " + Long.toHexString(newLeaderZxid >> 32L) - + " is less than our epoch " + Long.toHexString(lastLoggedZxid >> 32L)); + long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid); + if (newEpoch < self.getAcceptedEpoch()) { + LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid) + + " is less than our accepted epoch " + 
ZxidUtils.zxidToString(self.getAcceptedEpoch())); throw new IOException("Error: Epoch of leader is lower"); } - syncWithLeader(newLeaderZxid); + syncWithLeader(newEpochZxid); QuorumPacket qp = new QuorumPacket(); while (self.isRunning()) { readPacket(qp); Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1135382&r1=1135381&r2=1135382&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Tue Jun 14 05:14:05 2011 @@ -31,6 +31,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -43,6 +44,8 @@ import org.apache.zookeeper.server.Final import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import org.apache.zookeeper.server.util.ZxidUtils; /** * This class has the control logic for the Leader. @@ -177,6 +180,17 @@ public class Leader { final static int UPTODATE = 12; /** + * This message is the first that a follower receives from the leader. + * It has the protocol version and the epoch of the leader. + */ + public static final int LEADERINFO = 17; + + /** + * This message is used by the follow to ack a proposed epoch. + */ + public static final int ACKEPOCH = 18; + + /** * This message type is sent to a leader to request and mutation operation. * The payload will consist of a request header followed by a request. 
*/ @@ -219,8 +233,8 @@ public class Leader { * This message type informs observers of a committed proposal. */ final static int INFORM = 8; - - private ConcurrentMap outstandingProposals = new ConcurrentHashMap(); + + ConcurrentMap outstandingProposals = new ConcurrentHashMap(); ConcurrentLinkedQueue toBeApplied = new ConcurrentLinkedQueue(); @@ -263,6 +277,12 @@ public class Leader { } } + StateSummary leaderStateSummary; + + long epoch = -1; + boolean waitingForNewEpoch = true; + boolean readyToStart = false; + /** * This method is main function that is called to lead * @@ -282,9 +302,16 @@ public class Leader { self.tick = 0; zk.loadData(); - long epoch = self.getLastLoggedZxid() >> 32L; - epoch++; - zk.setZxid(epoch << 32L); + leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid()); + + // Start thread that waits for connection requests from + // new followers. + cnxAcceptor = new LearnerCnxAcceptor(); + cnxAcceptor.start(); + + long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch()); + self.setAcceptedEpoch(epoch); + zk.setZxid(ZxidUtils.makeZxid(epoch, 0)); synchronized(this){ lastProposed = zk.getZxid(); @@ -300,11 +327,10 @@ public class Leader { } outstandingProposals.put(newLeaderProposal.packet.getZxid(), newLeaderProposal); - // Start thread that waits for connection requests from - // new followers. - cnxAcceptor = new LearnerCnxAcceptor(); - cnxAcceptor.start(); - + readyToStart = true; + waitForEpochAck(self.getId(), leaderStateSummary); + self.setCurrentEpoch(epoch); + // We have to get at least a majority of servers in sync with // us. 
We do this by waiting for the NEWLEADER packet to get // acknowledged @@ -384,7 +410,7 @@ public class Leader { } } - boolean isShutdown; + boolean isShutdown; /** * Close down all the LearnerHandlers @@ -625,7 +651,7 @@ public class Leader { * @return */ public long getEpoch(){ - return lastProposed >> 32L; + return ZxidUtils.getEpochFromZxid(lastProposed); } /** @@ -740,4 +766,53 @@ public class Leader { return lastProposed; } + private HashSet connectingFollowers = new HashSet(); + public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException { + synchronized(connectingFollowers) { + if (!waitingForNewEpoch) { + return epoch; + } + if (lastAcceptedEpoch > epoch) { + epoch = lastAcceptedEpoch+1; + } + connectingFollowers.add(sid); + QuorumVerifier verifier = self.getQuorumVerifier(); + if (verifier.containsQuorum(connectingFollowers)) { + waitingForNewEpoch = false; + connectingFollowers.notifyAll(); + } else { + connectingFollowers.wait(self.getInitLimit()*self.getTickTime()); + if (waitingForNewEpoch) { + throw new InterruptedException("Out of time to propose an epoch"); + } + } + return epoch; + } + } + + private HashSet electingFollowers = new HashSet(); + private boolean electionFinished = false; + public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException { + synchronized(electingFollowers) { + if (electionFinished) { + return; + } + if (ss.getCurrentEpoch() != -1) { + if (ss.isMoreRecentThan(leaderStateSummary)) { + throw new IOException("Follower is ahead of the leader"); + } + electingFollowers.add(id); + } + QuorumVerifier verifier = self.getQuorumVerifier(); + if (readyToStart && verifier.containsQuorum(electingFollowers)) { + electionFinished = true; + electingFollowers.notifyAll(); + } else { + electingFollowers.wait(self.getInitLimit()*self.getTickTime()); + if (waitingForNewEpoch) { + throw new InterruptedException("Out of time to propose an epoch"); + } + } + } + } } Modified: 
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1135382&r1=1135381&r2=1135382&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Tue Jun 14 05:14:05 2011 @@ -27,6 +27,7 @@ import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.Socket; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.LinkedList; import java.util.Map.Entry; @@ -44,6 +45,7 @@ import org.apache.zookeeper.server.Serve import org.apache.zookeeper.server.ZooTrace; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; import org.apache.zookeeper.server.util.SerializeUtils; +import org.apache.zookeeper.server.util.ZxidUtils; import org.apache.zookeeper.txn.TxnHeader; /** @@ -72,7 +74,9 @@ public class Learner { } protected InputArchive leaderIs; - protected OutputArchive leaderOs; + protected OutputArchive leaderOs; + /** the protocol version of the leader */ + protected int leaderProtocolVersion = 0x01; protected static final Logger LOG = LoggerFactory.getLogger(Learner.class); @@ -250,22 +254,48 @@ public class Learner { /* * Send follower info, including last zxid and sid */ + long lastLoggedZxid = self.getLastLoggedZxid(); QuorumPacket qp = new QuorumPacket(); qp.setType(pktType); - long sentLastZxid = self.getLastLoggedZxid(); - qp.setZxid(sentLastZxid); + qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0)); /* * Add sid to payload */ + LearnerInfo li = new LearnerInfo(self.getId(), 0x10000); ByteArrayOutputStream bsid = new ByteArrayOutputStream(); - DataOutputStream dsid = new DataOutputStream(bsid); - dsid.writeLong(self.getId()); + 
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid); + boa.writeRecord(li, "LearnerInfo"); qp.setData(bsid.toByteArray()); writePacket(qp, true); readPacket(qp); - + final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); + if (qp.getType() == Leader.LEADERINFO) { + // we are connected to a 1.0 server so accept the new epoch and read the next packet + leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt(); + byte epochBytes[] = new byte[4]; + final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes); + if (newEpoch > self.getAcceptedEpoch()) { + wrappedEpochBytes.putInt((int)self.getCurrentEpoch()); + self.setAcceptedEpoch(newEpoch); + } else if (newEpoch == self.getAcceptedEpoch()) { + // since we have already acked an epoch equal to the leaders, we cannot ack + // again, but we still need to send our lastZxid to the leader so that we can + // sync with it if it does assume leadership of the epoch. + // the -1 indicates that this reply should not count as an ack for the new epoch + wrappedEpochBytes.putInt(-1); + } else { + throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch()); + } + QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null); + writePacket(ackNewEpoch, true); + readPacket(qp); + } else { + if (newEpoch > self.getAcceptedEpoch()) { + self.setAcceptedEpoch(newEpoch); + } + } if (qp.getType() != Leader.NEWLEADER) { LOG.error("First packet should have been NEWLEADER"); throw new IOException("First packet should have been NEWLEADER"); @@ -321,9 +351,6 @@ public class Learner { } zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); - if(LOG.isInfoEnabled()){ - LOG.info("Setting leader epoch " + Long.toHexString(newLeaderZxid >> 32L)); - } long lastQueued = 0; // we are now going to start getting transactions to apply followed by an UPTODATE @@ -369,7 +396,9 @@ public class Learner { } } } - ack.setZxid(newLeaderZxid & 
~0xffffffffL); + long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid); + self.setCurrentEpoch(newEpoch); + ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); writePacket(ack, true); sock.setSoTimeout(self.tickTime * self.syncLimit); zk.startup(); Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1135382&r1=1135381&r2=1135382&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Tue Jun 14 05:14:05 2011 @@ -40,10 +40,12 @@ import org.slf4j.LoggerFactory; import org.apache.zookeeper.KeeperException.SessionExpiredException; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooTrace; import org.apache.zookeeper.server.quorum.Leader.Proposal; import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; import org.apache.zookeeper.server.util.SerializeUtils; +import org.apache.zookeeper.server.util.ZxidUtils; import org.apache.zookeeper.txn.TxnHeader; /** @@ -73,6 +75,12 @@ public class LearnerHandler extends Thre return sid; } + protected int version = 0x1; + + int getVersion() { + return version; + } + /** * The packets to be sent to the learner */ @@ -226,8 +234,7 @@ public class LearnerHandler extends Thre */ @Override public void run() { - try { - + try { ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock .getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); @@ -240,9 +247,17 @@ public class LearnerHandler extends Thre + " is not FOLLOWERINFO or OBSERVERINFO!"); return; } - if (qp.getData() != null) { - 
ByteBuffer bbsid = ByteBuffer.wrap(qp.getData()); - this.sid = bbsid.getLong(); + byte learnerInfoData[] = qp.getData(); + if (learnerInfoData != null) { + if (learnerInfoData.length == 8) { + ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData); + this.sid = bbsid.getLong(); + } else { + LearnerInfo li = new LearnerInfo(); + ZooKeeperServer.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li); + this.sid = li.getServerid(); + this.version = li.getProtocolVersion(); + } } else { this.sid = leader.followerCounter.getAndDecrement(); } @@ -254,7 +269,42 @@ public class LearnerHandler extends Thre learnerType = LearnerType.OBSERVER; } - long peerLastZxid = qp.getZxid(); + long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); + + long peerLastZxid; + StateSummary ss = null; + if (learnerType == LearnerType.PARTICIPANT) { + long zxid = qp.getZxid(); + long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch); + + if (this.getVersion() < 0x10000) { + // we are going to have to extrapolate the epoch information + long epoch = ZxidUtils.getEpochFromZxid(zxid); + ss = new StateSummary(epoch, zxid); + // fake the message + leader.waitForEpochAck(this.getSid(), ss); + } else { + byte ver[] = new byte[4]; + ByteBuffer.wrap(ver).putInt(0x10000); + QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null); + oa.writeRecord(newEpochPacket, "packet"); + bufferedOutput.flush(); + QuorumPacket ackEpochPacket = new QuorumPacket(); + ia.readRecord(ackEpochPacket, "packet"); + if (ackEpochPacket.getType() != Leader.ACKEPOCH) { + LOG.error(ackEpochPacket.toString() + + " is not ACKEPOCH"); + return; + } + ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData()); + ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid()); + leader.waitForEpochAck(this.getSid(), ss); + } + peerLastZxid = ss.getLastZxid(); + } else { + peerLastZxid = qp.getZxid(); + } + /* the default to send to the follower 
*/ int packetToSend = Leader.SNAP; long zxidToSend = 0; Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1135382&r1=1135381&r2=1135382&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Tue Jun 14 05:14:05 2011 @@ -17,8 +17,14 @@ */ package org.apache.zookeeper.server.quorum; +import java.io.BufferedReader; +import java.io.BufferedWriter; import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FileReader; import java.io.IOException; +import java.io.OutputStreamWriter; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetSocketAddress; @@ -41,6 +47,7 @@ import org.apache.zookeeper.server.ZooKe import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import org.apache.zookeeper.server.util.ZxidUtils; /** * This class manages the quorum protocol. 
There are three states this server @@ -395,16 +402,50 @@ public class QuorumPeer extends Thread i @Override public synchronized void start() { - try { + loadDataBase(); + cnxnFactory.start(); + startLeaderElection(); + super.start(); + } + + private void loadDataBase() { + try { zkDb.loadDataBase(); + + // load the epochs + long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid; + long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid); + try { + currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME); + } catch(FileNotFoundException e) { + // pick a reasonable epoch number + // this should only happen once when moving to a + // new code version + LOG.info(CURRENT_EPOCH_FILENAME + " not found! Creating with a reasonable default. This should only happen when you are upgrading your installation"); + currentEpoch = epochOfZxid; + writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch); + } + if (epochOfZxid > currentEpoch) { + throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid); + } + try { + acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME); + } catch(FileNotFoundException e) { + // pick a reasonable epoch number + // this should only happen once when moving to a + // new code version + LOG.info(ACCEPTED_EPOCH_FILENAME + " not found! Creating with a reasonable default. 
This should only happen when you are upgrading your installation"); + acceptedEpoch = epochOfZxid; + writeLongToFile(CURRENT_EPOCH_FILENAME, acceptedEpoch); + } + if (acceptedEpoch < currentEpoch) { + throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + " is less than the accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch)); + } } catch(IOException ie) { LOG.error("Unable to load database on disk", ie); throw new RuntimeException("Unable to run quorum server ", ie); } - cnxnFactory.start(); - startLeaderElection(); - super.start(); - } + } ResponderThread responder; @@ -413,7 +454,13 @@ public class QuorumPeer extends Thread i responder.interrupt(); } synchronized public void startLeaderElection() { - currentVote = new Vote(myid, getLastLoggedZxid()); + try { + currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); + } catch(IOException e) { + RuntimeException re = new RuntimeException(e.getMessage()); + re.setStackTrace(e.getStackTrace()); + throw re; + } for (QuorumServer p : getView().values()) { if (p.id == myid) { myQuorumAddr = p.addr; @@ -487,16 +534,10 @@ public class QuorumPeer extends Thread i * @return the highest zxid for this host */ public long getLastLoggedZxid() { - long lastLogged= -1L; - try { - if (!zkDb.isInitialized()) { - zkDb.loadDataBase(); - } - lastLogged = zkDb.getDataTreeLastProcessedZxid(); - } catch(IOException ie) { - LOG.warn("Unable to load database ", ie); + if (!zkDb.isInitialized()) { + loadDataBase(); } - return lastLogged; + return zkDb.getDataTreeLastProcessedZxid(); } public Follower follower; @@ -1002,5 +1043,63 @@ public class QuorumPeer extends Thread i */ public QuorumCnxManager getQuorumCnxManager() { return qcm; -} + } + private long readLongFromFile(String name) throws IOException { + File file = new File(logFactory.getSnapDir(), name); + BufferedReader br = new BufferedReader(new FileReader(file)); + String line = ""; + try { + line = br.readLine(); + return 
Long.parseLong(line); + } catch(NumberFormatException e) { + throw new IOException("Found " + line + " in " + file); + } finally { + br.close(); + } + } + + private long acceptedEpoch = -1; + private long currentEpoch = -1; + + public static final String CURRENT_EPOCH_FILENAME = "currentEpoch"; + + public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch"; + + private void writeLongToFile(String name, long value) throws IOException { + File file = new File(logFactory.getSnapDir(), name); + FileOutputStream out = new FileOutputStream(file); + BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out)); + try { + bw.write(Long.toString(value)); + bw.flush(); + out.getFD().sync(); + } finally { + bw.close(); + } + } + + public long getCurrentEpoch() throws IOException { + if (currentEpoch == -1) { + currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME); + } + return currentEpoch; + } + + public long getAcceptedEpoch() throws IOException { + if (acceptedEpoch == -1) { + acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME); + } + return acceptedEpoch; + } + + public void setCurrentEpoch(long e) throws IOException { + currentEpoch = e; + writeLongToFile(CURRENT_EPOCH_FILENAME, e); + + } + + public void setAcceptedEpoch(long e) throws IOException { + acceptedEpoch = e; + writeLongToFile(ACCEPTED_EPOCH_FILENAME, e); + } } Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/StateSummary.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/StateSummary.java?rev=1135382&view=auto ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/StateSummary.java (added) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/StateSummary.java Tue Jun 14 05:14:05 2011 @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +/** + * This class encapsulates the state comparison logic. Specifically, + * how two different states are compared. + */ +public class StateSummary { + private long currentEpoch; + private long lastZxid; + public StateSummary(long currentEpoch, long lastZxid) { + this.currentEpoch = currentEpoch; + this.lastZxid = lastZxid; + } + + public long getCurrentEpoch() { + return currentEpoch; + } + + public long getLastZxid() { + return lastZxid; + } + + public boolean isMoreRecentThan(StateSummary ss) { + return (currentEpoch > ss.currentEpoch) || (currentEpoch == ss.currentEpoch && lastZxid > ss.lastZxid); + } + @Override + public boolean equals(Object obj) { + if (!(obj instanceof StateSummary)) { + return false; + } + StateSummary ss = (StateSummary)obj; + return currentEpoch == ss.currentEpoch && lastZxid == ss.lastZxid; + } + + @Override + public int hashCode() { + return (int)(currentEpoch ^ lastZxid); + } +} Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java?rev=1135382&r1=1135381&r2=1135382&view=diff 
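StateSummary, added above, ranks peer states by current epoch first. It uses the last zxid only to break ties within an epoch. LearnerHandler builds one StateSummary per participant and passes it to leader.waitForEpochAck.

The standalone sketch below reproduces that ordering. The demo class, the mostRecent helper and the sample values are illustrative assumptions and are not part of this commit.

```java
import java.util.Arrays;
import java.util.List;

public class StateSummaryDemo {
    // Same fields and comparison rule as the StateSummary class in this commit.
    static final class StateSummary {
        private final long currentEpoch;
        private final long lastZxid;

        StateSummary(long currentEpoch, long lastZxid) {
            this.currentEpoch = currentEpoch;
            this.lastZxid = lastZxid;
        }

        // A higher current epoch always wins; lastZxid only breaks ties.
        boolean isMoreRecentThan(StateSummary ss) {
            return (currentEpoch > ss.currentEpoch)
                    || (currentEpoch == ss.currentEpoch && lastZxid > ss.lastZxid);
        }
    }

    // Hypothetical helper: the most recent state in a non-empty list.
    static StateSummary mostRecent(List<StateSummary> states) {
        StateSummary best = states.get(0);
        for (StateSummary s : states) {
            if (s.isMoreRecentThan(best)) {
                best = s;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        StateSummary a = new StateSummary(2, 0x200000005L);
        StateSummary b = new StateSummary(2, 0x2000000ffL);
        // c reports current epoch 3, but its last zxid is still from epoch 2
        // and is smaller than b's.
        StateSummary c = new StateSummary(3, 0x200000010L);
        System.out.println(b.isMoreRecentThan(a)); // true: same epoch, larger zxid
        System.out.println(c.isMoreRecentThan(b)); // true: newer epoch wins
        System.out.println(mostRecent(Arrays.asList(a, b, c)) == c); // true
    }
}
```

With these values, ordering by zxid alone would pick b. The epoch-first rule picks c, even though c's last zxid is smaller.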
============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java Tue Jun 14 05:14:05 2011 @@ -22,29 +22,40 @@ import org.apache.zookeeper.server.quoru public class Vote { + public Vote(long id, long zxid) { this.id = id; this.zxid = zxid; } + + public Vote(long id, long zxid, long peerEpoch) { + this.id = id; + this.zxid = zxid; + this.peerEpoch = peerEpoch; + } - public Vote(long id, long zxid, long epoch) { + public Vote(long id, long zxid, long electionEpoch, long peerEpoch) { this.id = id; this.zxid = zxid; - this.epoch = epoch; + this.electionEpoch = electionEpoch; + this.peerEpoch = peerEpoch; } - public Vote(long id, long zxid, long epoch, ServerState state) { + public Vote(long id, long zxid, long electionEpoch, long peerEpoch, ServerState state) { this.id = id; this.zxid = zxid; - this.epoch = epoch; + this.electionEpoch = electionEpoch; this.state = state; + this.peerEpoch = peerEpoch; } public long id; public long zxid; - public long epoch = -1; + public long electionEpoch = -1; + + public long peerEpoch = -1; public ServerState state = ServerState.LOOKING; @@ -54,7 +65,7 @@ public class Vote { return false; } Vote other = (Vote) o; - return (id == other.id && zxid == other.zxid && epoch == other.epoch); + return (id == other.id && zxid == other.zxid && electionEpoch == other.electionEpoch && peerEpoch == other.peerEpoch); } @@ -64,6 +75,6 @@ public class Vote { } public String toString() { - return "(" + id + ", " + Long.toHexString(zxid) + ")"; + return "(" + id + ", " + Long.toHexString(zxid) + ", " + Long.toHexString(peerEpoch) + ")"; } } Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/ZxidUtils.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/ZxidUtils.java?rev=1135382&view=auto 
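ZxidUtils, added below, treats a zxid as a 64-bit value. The epoch sits in the high 32 bits and a per-epoch counter sits in the low 32 bits. This layout is why the hunk preceding LearnerHandler.java can build its ACK with makeZxid(newEpoch, 0). It is also why getEpochFromZxid can recover an epoch from any logged zxid.

The sketch below repeats the same arithmetic in a self-contained class so the round trip can be checked directly. The class name ZxidDemo and the sample values are illustrative.

```java
public class ZxidDemo {
    // Same arithmetic as ZxidUtils in this commit.
    static long getEpochFromZxid(long zxid) {
        return zxid >> 32L;
    }

    static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }

    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 42);
        System.out.println(Long.toHexString(zxid));   // 50000002a
        System.out.println(getEpochFromZxid(zxid));   // 5
        System.out.println(getCounterFromZxid(zxid)); // 42
        // First zxid of a new epoch, as used for the ACK to a new leader.
        System.out.println(Long.toHexString(makeZxid(6, 0))); // 600000000
        // Any zxid from a later epoch compares greater, whatever the counter.
        System.out.println(makeZxid(6, 0) > makeZxid(5, 0xffffffffL)); // true
    }
}
```

getEpochFromZxid uses an arithmetic shift (>>). An epoch with bit 31 set would therefore come back negative; >>> would avoid that. Epochs normally stay far below that range.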
============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/ZxidUtils.java (added) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/ZxidUtils.java Tue Jun 14 05:14:05 2011 @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.util; + +public class ZxidUtils { + static public long getEpochFromZxid(long zxid) { + return zxid >> 32L; + } + static public long getCounterFromZxid(long zxid) { + return zxid & 0xffffffffL; + } + static public long makeZxid(long epoch, long counter) { + return (epoch << 32L) | (counter & 0xffffffffL); + } + static public String zxidToString(long zxid) { + return Long.toHexString(zxid); + } +} Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java?rev=1135382&r1=1135381&r2=1135382&view=diff ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java (original) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java Tue Jun 14 05:14:05 2011 @@ -26,6 +26,7 @@ import java.io.StringReader; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; +import java.util.Map; import java.util.regex.Pattern; import org.apache.log4j.Layout; @@ -33,9 +34,15 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.WriterAppender; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper.States; +import org.apache.zookeeper.server.quorum.Leader.Proposal; import org.apache.zookeeper.test.ClientBase; import org.junit.Assert; import org.junit.Test; @@ -46,7 +53,7 @@ import org.junit.Test; * */ public class 
QuorumPeerMainTest extends QuorumPeerTestBase { - /** + /** * Verify the ability to start a cluster. */ @Test @@ -103,6 +110,129 @@ public class QuorumPeerMainTest extends } /** + * Test early leader abandonment. + */ + @Test + public void testEarlyLeaderAbandonment() throws Exception { + ClientBase.setupTestEnv(); + + final int SERVER_COUNT = 3; + final int clientPorts[] = new int[SERVER_COUNT]; + StringBuilder sb = new StringBuilder(); + for(int i = 0; i < SERVER_COUNT; i++) { + clientPorts[i] = PortAssignment.unique(); + sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+"\n"); + } + String quorumCfgSection = sb.toString(); + + MainThread mt[] = new MainThread[SERVER_COUNT]; + ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT]; + for(int i = 0; i < SERVER_COUNT; i++) { + mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection); + mt[i].start(); + zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); + } + + waitForAll(zk, States.CONNECTED); + + // we need to shutdown and start back up to make sure that the create session isn't the first transaction since + // that is rather innocuous. 
+ for(int i = 0; i < SERVER_COUNT; i++) { + mt[i].shutdown(); + } + + waitForAll(zk, States.CONNECTING); + + for(int i = 0; i < SERVER_COUNT; i++) { + mt[i].start(); + } + + waitForAll(zk, States.CONNECTED); + + // ok lets find the leader and kill everything else, we have a few + // seconds, so it should be plenty of time + int leader = -1; + Map outstanding = null; + for(int i = 0; i < SERVER_COUNT; i++) { + if (mt[i].main.quorumPeer.leader == null) { + mt[i].shutdown(); + } else { + leader = i; + outstanding = mt[leader].main.quorumPeer.leader.outstandingProposals; + } + } + + try { + zk[leader].create("/zk"+leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + Assert.fail("create /zk" + leader + " should have failed"); + } catch(KeeperException e) {} + + // just make sure that we actually did get it in process at the + // leader + Assert.assertTrue(outstanding.size() == 1); + Assert.assertTrue(((Proposal)outstanding.values().iterator().next()).request.hdr.getType() == OpCode.create); + // make sure it has a chance to write it to disk + Thread.sleep(1000); + mt[leader].shutdown(); + waitForAll(zk, States.CONNECTING); + for(int i = 0; i < SERVER_COUNT; i++) { + if (i != leader) { + mt[i].start(); + } + } + for(int i = 0; i < SERVER_COUNT; i++) { + if (i != leader) { + waitForOne(zk[i], States.CONNECTED); + zk[i].create("/zk" + i, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } + + mt[leader].start(); + waitForAll(zk, States.CONNECTED); + // make sure everything is consistent + for(int i = 0; i < SERVER_COUNT; i++) { + for(int j = 0; j < SERVER_COUNT; j++) { + if (i == leader) { + Assert.assertTrue((j==leader?("Leader ("+leader+")"):("Follower "+j))+" should not have /zk" + i, zk[j].exists("/zk"+i, false) == null); + } else { + Assert.assertTrue((j==leader?("Leader ("+leader+")"):("Follower "+j))+" does not have /zk" + i, zk[j].exists("/zk"+i, false) != null); + } + } + } + for(int i = 0; i < SERVER_COUNT; i++) { + 
zk[i].close(); + } + for(int i = 0; i < SERVER_COUNT; i++) { + mt[i].shutdown(); + } + } + + private void waitForOne(ZooKeeper zk, States state) throws InterruptedException { + while(zk.getState() != state) { + Thread.sleep(500); + } + } + + private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException { + int iterations = 10; + boolean someoneNotConnected = true; + while(someoneNotConnected) { + if (iterations-- == 0) { + throw new RuntimeException("Waiting too long"); + } + + someoneNotConnected = false; + for(ZooKeeper zk: zks) { + if (zk.getState() != state) { + someoneNotConnected = true; + } + } + Thread.sleep(1000); + } + } + + /** * Verify handling of bad quorum address */ @Test Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java?rev=1135382&r1=1135381&r2=1135382&view=diff ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java (original) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java Tue Jun 14 05:14:05 2011 @@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.test.ClientBase; import org.apache.zookeeper.test.QuorumBase; @@ -54,20 +56,18 @@ public class QuorumPeerTestBase extends } } - public static class MainThread extends Thread { + public static class MainThread implements Runnable { final File confFile; - final TestQPMain main; + volatile TestQPMain main; public MainThread(int myid, int clientPort, String quorumCfgSection) throws IOException { - 
super("QuorumPeer with myid:" + myid - + " and clientPort:" + clientPort); File tmpDir = ClientBase.createTmpDir(); confFile = new File(tmpDir, "zoo.cfg"); FileWriter fwriter = new FileWriter(confFile); - fwriter.write("tickTime=2000\n"); + fwriter.write("tickTime=4000\n"); fwriter.write("initLimit=10\n"); fwriter.write("syncLimit=5\n"); @@ -94,10 +94,14 @@ public class QuorumPeerTestBase extends fwriter.write(Integer.toString(myid)); fwriter.flush(); fwriter.close(); - - main = new TestQPMain(); } + Thread currentThread; + synchronized public void start() { + main = new TestQPMain(); + currentThread = new Thread(this); + currentThread.start(); + } public void run() { String args[] = new String[1]; args[0] = confFile.toString(); @@ -106,11 +110,27 @@ public class QuorumPeerTestBase extends } catch (Exception e) { // test will still fail even though we just log/ignore LOG.error("unexpected exception in run", e); + } finally { + currentThread = null; } } - public void shutdown() { - main.shutdown(); + public void shutdown() throws InterruptedException { + Thread t = currentThread; + if (t != null && t.isAlive()) { + main.shutdown(); + t.join(500); + } } + public void join(long timeout) throws InterruptedException { + Thread t = currentThread; + if (t != null) { + t.join(timeout); + } + } + public boolean isAlive() { + Thread t = currentThread; + return t != null && t.isAlive(); + } } } Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=1135382&r1=1135381&r2=1135382&view=diff ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Tue Jun 14 05:14:05 2011 @@ -448,7 +448,7 @@ public abstract class ClientBase extends protected void tearDownAll() 
throws Exception { synchronized (this) { - for (ZooKeeper zk : allClients) { + if (allClients != null) for (ZooKeeper zk : allClients) { try { if (zk != null) zk.close(); Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java?rev=1135382&r1=1135381&r2=1135382&view=diff ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java (original) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java Tue Jun 14 05:14:05 2011 @@ -199,6 +199,7 @@ public class ReadOnlyModeTest extends Qu watcher.reset(); qu.start(2); qu.start(3); + ClientBase.waitForServerUp(qu.getConnString(), 2000); watcher.waitForConnected(CONNECTION_TIMEOUT); zk.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Modified: zookeeper/trunk/src/zookeeper.jute URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/zookeeper.jute?rev=1135382&r1=1135381&r2=1135382&view=diff ============================================================================== --- zookeeper/trunk/src/zookeeper.jute (original) +++ zookeeper/trunk/src/zookeeper.jute Tue Jun 14 05:14:05 2011 @@ -196,6 +196,10 @@ module org.apache.zookeeper.proto { } module org.apache.zookeeper.server.quorum { + class LearnerInfo { + long serverid; + int protocolVersion; + } class QuorumPacket { int type; // Request, Ack, Commit, Ping long zxid;