Return-Path: X-Original-To: apmail-zookeeper-commits-archive@www.apache.org Delivered-To: apmail-zookeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3C212119CD for ; Tue, 8 Jul 2014 03:26:29 +0000 (UTC) Received: (qmail 18851 invoked by uid 500); 8 Jul 2014 03:26:29 -0000 Delivered-To: apmail-zookeeper-commits-archive@zookeeper.apache.org Received: (qmail 18817 invoked by uid 500); 8 Jul 2014 03:26:28 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 18803 invoked by uid 99); 8 Jul 2014 03:26:28 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Jul 2014 03:26:28 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Jul 2014 03:26:24 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 3BE152388860; Tue, 8 Jul 2014 03:25:58 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: svn commit: r1608648 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/ Date: Tue, 08 Jul 2014 03:25:57 -0000 To: commits@zookeeper.apache.org From: michim@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140708032558.3BE152388860@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: michim Date: Tue Jul 8 03:25:56 2014 New Revision: 1608648 URL: http://svn.apache.org/r1608648 Log: ZOOKEEPER-1810. Add version to FLE notifications for trunk Germán Blanco via michim) Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java Removed: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEBackwardElectionRoundTest.java zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLELostMessageTest.java zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETestUtils.java Modified: zookeeper/trunk/CHANGES.txt zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEPredicateTest.java zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEZeroWeightTest.java zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java Modified: zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1608648&r1=1608647&r2=1608648&view=diff ============================================================================== --- zookeeper/trunk/CHANGES.txt (original) +++ zookeeper/trunk/CHANGES.txt Tue Jul 8 03:25:56 2014 @@ -683,6 +683,9 @@ BUGFIXES: ZOOKEEPER-1835. dynamic configuration file renaming fails on Windows (Bruno Freudensprung via rakeshr) + ZOOKEEPER-1810. Add version to FLE notifications for trunk Germán Blanco via + michim) + IMPROVEMENTS: ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports, Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=1608648&r1=1608647&r2=1608648&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Tue Jul 8 03:25:56 2014 @@ -95,6 +95,13 @@ public class FastLeaderElection implemen static public class Notification { /* + * Format version, introduced in 3.4.6 + */ + + public final static int CURRENTVERSION = 0x2; + int version; + + /* * Proposed leader */ long leader; @@ -125,9 +132,9 @@ public class FastLeaderElection implemen */ long peerEpoch; } - + static byte[] dummyData = new byte[0]; - + /** * Messages that a peer wants to send to other peers. * These messages can be both Notifications and Acks @@ -142,16 +149,15 @@ public class FastLeaderElection implemen long electionEpoch, ServerState state, long sid, - long peerEpoch, + long peerEpoch, byte[] configData) { - this.leader = leader; this.zxid = zxid; this.electionEpoch = electionEpoch; this.state = state; this.sid = sid; - this.peerEpoch = peerEpoch; + this.peerEpoch = peerEpoch; this.configData = configData; } @@ -184,7 +190,7 @@ public class FastLeaderElection implemen * Used to send a QuorumVerifier (configuration info) */ byte[] configData = dummyData; - + /* * Leader epoch */ @@ -201,7 +207,7 @@ public class FastLeaderElection implemen * spawns a new thread. */ - private class Messenger { + protected class Messenger { /** * Receives messages from instance of QuorumCnxManager on @@ -223,78 +229,91 @@ public class FastLeaderElection implemen Message response; while (!stop) { // Sleeps on receive - try{ + try { response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS); if(response == null) continue; - + // The current protocol and two previous generations all send at least 28 bytes if (response.buffer.capacity() < 28) { LOG.error("Got a short response: " + response.buffer.capacity()); continue; } - + // this is the backwardCompatibility mode in place before ZK-107 // It is for a version of the protocol in which we didn't send peer epoch - // With peer epoch the message became 36 bytes + // With peer epoch and version the message became 40 bytes boolean backCompatibility28 = (response.buffer.capacity() == 28); - - // ZK-107 sends the configuration info in every message. - // So messages are 36 bytes + size of configuration info - // (variable length, shoulld be at the end of the message). - boolean backCompatibility36 = (response.buffer.capacity() == 36); + // this is the backwardCompatibility mode for no version information + boolean backCompatibility40 = (response.buffer.capacity() == 40); + response.buffer.clear(); + + // Instantiate Notification and set its attributes + Notification n = new Notification(); + int rstate = response.buffer.getInt(); long rleader = response.buffer.getLong(); long rzxid = response.buffer.getLong(); long relectionEpoch = response.buffer.getLong(); long rpeerepoch; - - if(!backCompatibility28){ - rpeerepoch = response.buffer.getLong(); - } else { - if(LOG.isInfoEnabled()){ - LOG.info("Backward compatibility mode (28 bits), server id: " + response.sid); + + int version = 0x0; + if (!backCompatibility28) { + rpeerepoch = response.buffer.getLong(); + if (!backCompatibility40) { + /* + * Version added in 3.4.6 + */ + + version = response.buffer.getInt(); + } else { + LOG.info("Backward compatibility mode (36 bits), server id: {}", response.sid); } + } else { + LOG.info("Backward compatibility mode (28 bits), server id: {}", response.sid); rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid); } - + QuorumVerifier rqv = null; - - // check if we have more than 36 bytes. If so extract config info from message. - if(!backCompatibility28 && !backCompatibility36){ - byte b[] = new byte[response.buffer.remaining()]; - response.buffer.get(b); + + // check if we have a version that includes config. If so extract config info from message. + if (version > 0x1) { + int configLength = response.buffer.getInt(); + byte b[] = new byte[configLength]; + + response.buffer.get(b); - synchronized(self){ - try { - rqv = self.configFromString(new String(b)); - QuorumVerifier curQV = self.getQuorumVerifier(); - if (rqv.getVersion() > curQV.getVersion()) { - LOG.info(self.getId() + " Received version: " + Long.toHexString(rqv.getVersion()) + " my version: " + Long.toHexString(self.getQuorumVerifier().getVersion())); - self.processReconfig(rqv, null, null, false); - if (!rqv.equals(curQV)) { - LOG.info("restarting leader election"); - self.shuttingDownLE = true; - self.getElectionAlg().shutdown(); - break; + synchronized(self) { + try { + rqv = self.configFromString(new String(b)); + QuorumVerifier curQV = self.getQuorumVerifier(); + if (rqv.getVersion() > curQV.getVersion()) { + LOG.info("{} Received version: {} my version: {}", self.getId(), + Long.toHexString(rqv.getVersion()), + Long.toHexString(self.getQuorumVerifier().getVersion())); + self.processReconfig(rqv, null, null, false); + if (!rqv.equals(curQV)) { + LOG.info("restarting leader election"); + self.shuttingDownLE = true; + self.getElectionAlg().shutdown(); + + break; } - } - } catch (IOException e) { - LOG.error("Something went wrong while processing config received from " + response.sid); + } + } catch (IOException e) { + LOG.error("Something went wrong while processing config received from {}", response.sid); } catch (ConfigException e) { - LOG.error("Something went wrong while processing config received from " + response.sid); - } - } + LOG.error("Something went wrong while processing config received from {}", response.sid); + } + } } else { - if(LOG.isInfoEnabled()){ - LOG.info("Backward compatibility mode (before reconfig), server id: " + response.sid); - } + LOG.info("Backward compatibility mode (before reconfig), server id: {}", response.sid); } /* - * If it is from a non-voting server (such as an observer or - * a non-voting follower), respond right away. + * If it is from a non-voting server (such as an observer or + * a non-voting follower), respond right away. */ if(!self.getVotingView().containsKey(response.sid)){ Vote current = self.getCurrentVote(); @@ -331,17 +350,18 @@ public class FastLeaderElection implemen case 3: ackstate = QuorumPeer.ServerState.OBSERVING; break; + default: + continue; } - // Instantiate Notification and set its attributes - Notification n = new Notification(); n.leader = rleader; n.zxid = rzxid; n.electionEpoch = relectionEpoch; n.state = ackstate; - n.sid = response.sid; + n.sid = response.sid; n.peerEpoch = rpeerepoch; - n.qv = rqv; + n.version = version; + n.qv = rqv; /* * Print notification info */ @@ -383,14 +403,14 @@ public class FastLeaderElection implemen Vote current = self.getCurrentVote(); if(ackstate == QuorumPeer.ServerState.LOOKING){ if(LOG.isDebugEnabled()){ - LOG.debug("Sending new notification. My id = " + - self.getId() + " recipient=" + - response.sid + " zxid=0x" + - Long.toHexString(current.getZxid()) + - " leader=" + current.getId() + " config version = " + + LOG.debug("Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}", + self.getId(), + response.sid, + Long.toHexString(current.getZxid()), + current.getId(), Long.toHexString(self.getQuorumVerifier().getVersion())); } - + QuorumVerifier qv = self.getQuorumVerifier(); ToSend notmsg = new ToSend( ToSend.mType.notification, @@ -399,7 +419,7 @@ public class FastLeaderElection implemen current.getElectionEpoch(), self.getPeerState(), response.sid, - current.getPeerEpoch(), + current.getPeerEpoch(), qv.toString().getBytes()); sendqueue.offer(notmsg); } @@ -414,9 +434,6 @@ public class FastLeaderElection implemen } } - - - /** * This worker simply dequeues a message to send and * and queues it on the manager's queue. @@ -451,21 +468,13 @@ public class FastLeaderElection implemen * * @param m message to send */ - private void process(ToSend m) { - byte requestBytes[] = new byte[36 + m.configData.length]; - ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); - - /* - * Building notification packet to send - */ - - requestBuffer.clear(); - requestBuffer.putInt(m.state.ordinal()); - requestBuffer.putLong(m.leader); - requestBuffer.putLong(m.zxid); - requestBuffer.putLong(m.electionEpoch); - requestBuffer.putLong(m.peerEpoch); - requestBuffer.put(m.configData); + void process(ToSend m) { + ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), + m.leader, + m.zxid, + m.electionEpoch, + m.peerEpoch, + m.configData); manager.toSend(m.sid, requestBuffer); @@ -474,6 +483,8 @@ public class FastLeaderElection implemen WorkerSender ws; WorkerReceiver wr; + Thread wsThread = null; + Thread wrThread = null; /** * Constructor of class Messenger. @@ -484,17 +495,23 @@ public class FastLeaderElection implemen this.ws = new WorkerSender(manager); - Thread t = new Thread(this.ws, + this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]"); - t.setDaemon(true); - t.start(); + this.wsThread.setDaemon(true); this.wr = new WorkerReceiver(manager); - t = new Thread(this.wr, + this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]"); - t.setDaemon(true); - t.start(); + this.wrThread.setDaemon(true); + } + + /** + * Starts instances of WorkerSender and WorkerReceiver + */ + void start(){ + this.wsThread.start(); + this.wrThread.start(); } /** @@ -522,6 +539,55 @@ public class FastLeaderElection implemen return logicalclock; } + static ByteBuffer buildMsg(int state, + long leader, + long zxid, + long electionEpoch, + long epoch) { + byte requestBytes[] = new byte[40]; + ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); + + /* + * Building notification packet to send, this is called directly only in tests + */ + + requestBuffer.clear(); + requestBuffer.putInt(state); + requestBuffer.putLong(leader); + requestBuffer.putLong(zxid); + requestBuffer.putLong(electionEpoch); + requestBuffer.putLong(epoch); + requestBuffer.putInt(0x1); + + return requestBuffer; + } + + static ByteBuffer buildMsg(int state, + long leader, + long zxid, + long electionEpoch, + long epoch, + byte[] configData) { + byte requestBytes[] = new byte[44 + configData.length]; + ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); + + /* + * Building notification packet to send + */ + + requestBuffer.clear(); + requestBuffer.putInt(state); + requestBuffer.putLong(leader); + requestBuffer.putLong(zxid); + requestBuffer.putLong(electionEpoch); + requestBuffer.putLong(epoch); + requestBuffer.putInt(Notification.CURRENTVERSION); + requestBuffer.putInt(configData.length); + requestBuffer.put(configData); + + return requestBuffer; + } + /** * Constructor of FastLeaderElection. It takes two parameters, one * is the QuorumPeer object that instantiated this object, and the other @@ -557,12 +623,17 @@ public class FastLeaderElection implemen this.messenger = new Messenger(manager); } + /** + * This method starts the sender and receiver threads. + */ + public void start() { + this.messenger.start(); + } + private void leaveInstance(Vote v) { if(LOG.isDebugEnabled()){ - LOG.debug("About to leave FLE instance: leader=" - + v.getId() + ", zxid=0x" + - Long.toHexString(v.getZxid()) + ", my id=" + self.getId() - + ", my state=" + self.getPeerState()); + LOG.debug("About to leave FLE instance: leader={}, zxid=0x{}, my id={}, my state={}", + v.getId(), Long.toHexString(v.getZxid()), self.getId(), self.getPeerState()); } recvqueue.clear(); } @@ -582,7 +653,6 @@ public class FastLeaderElection implemen messenger.halt(); LOG.debug("FLE is down"); } - /** * Send notifications to all peers upon a change in our vote @@ -608,14 +678,17 @@ public class FastLeaderElection implemen } private void printNotification(Notification n){ - LOG.info("Notification: " + n.leader + " (n.leader), 0x" + LOG.info("Notification: " + + Long.toHexString(n.version) + " (message format version), " + + n.leader + " (n.leader), 0x" + Long.toHexString(n.zxid) + " (n.zxid), 0x" + Long.toHexString(n.electionEpoch) + " (n.round), " + n.state + " (n.state), " + n.sid + " (n.sid), 0x" + Long.toHexString(n.peerEpoch) + " (n.peerEPoch), " - + self.getPeerState() + " (my state)" + (n.qv!=null ? (Long.toHexString(n.qv.getVersion()) + " (n.config version)"):"")); + + self.getPeerState() + " (my state)" + + (n.qv!=null ? (Long.toHexString(n.qv.getVersion()) + " (n.config version)"):"")); } - + /** * Check if a pair (server id, zxid) succeeds our @@ -630,7 +703,7 @@ public class FastLeaderElection implemen if(self.getQuorumVerifier().getWeight(newId) == 0){ return false; } - + /* * We return true if one of the following three cases hold: * 1- New epoch is higher @@ -638,8 +711,8 @@ public class FastLeaderElection implemen * 3- New epoch is the same as current epoch, new zxid is the same * as current zxid, but server id is higher. */ - - return ((newEpoch > curEpoch) || + + return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); } @@ -700,7 +773,7 @@ public class FastLeaderElection implemen else if(votes.get(leader).getState() != ServerState.LEADING) predicate = false; } else if(logicalclock != electionEpoch) { predicate = false; - } + } return predicate; } @@ -776,7 +849,7 @@ public class FastLeaderElection implemen } else return Long.MIN_VALUE; } - + /** * Starts a new round of leader election. Whenever our QuorumPeer * changes its state to LOOKING, this method is invoked, and it Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1608648&r1=1608647&r2=1608648&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Tue Jul 8 03:25:56 2014 @@ -70,7 +70,7 @@ public class QuorumCnxManager { // stale notifications to peers static final int SEND_CAPACITY = 1; - static final int PACKETMAXSIZE = 1024 * 1024; + static final int PACKETMAXSIZE = 1024 * 512; /* * Maximum number of attempts to connect to a peer */ Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1608648&r1=1608647&r2=1608648&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Tue Jul 8 03:25:56 2014 @@ -810,7 +810,9 @@ public class QuorumPeer extends ZooKeepe QuorumCnxManager.Listener listener = qcm.listener; if(listener != null){ listener.start(); - le = new FastLeaderElection(this, qcm); + FastLeaderElection fle = new FastLeaderElection(this, qcm); + fle.start(); + le = fle; } else { LOG.error("Null listener when initializing cnx manager"); } Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java?rev=1608648&r1=1608647&r2=1608648&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Vote.java Tue Jul 8 03:25:56 2014 @@ -23,7 +23,9 @@ import org.apache.zookeeper.server.quoru public class Vote { - public Vote(long id, long zxid) { + public Vote(long id, + long zxid) { + this.version = 0x0; this.id = id; this.zxid = zxid; this.electionEpoch = -1; @@ -31,7 +33,10 @@ public class Vote { this.state = ServerState.LOOKING; } - public Vote(long id, long zxid, long peerEpoch) { + public Vote(long id, + long zxid, + long peerEpoch) { + this.version = 0x0; this.id = id; this.zxid = zxid; this.electionEpoch = -1; @@ -39,7 +44,11 @@ public class Vote { this.state = ServerState.LOOKING; } - public Vote(long id, long zxid, long electionEpoch, long peerEpoch) { + public Vote(long id, + long zxid, + long electionEpoch, + long peerEpoch) { + this.version = 0x0; this.id = id; this.zxid = zxid; this.electionEpoch = electionEpoch; @@ -47,7 +56,13 @@ public class Vote { this.state = ServerState.LOOKING; } - public Vote(long id, long zxid, long electionEpoch, long peerEpoch, ServerState state) { + public Vote(int version, + long id, + long zxid, + long electionEpoch, + long peerEpoch, + ServerState state) { + this.version = version; this.id = id; this.zxid = zxid; this.electionEpoch = electionEpoch; @@ -55,6 +70,21 @@ public class Vote { this.peerEpoch = peerEpoch; } + public Vote(long id, + long zxid, + long electionEpoch, + long peerEpoch, + ServerState state) { + this.id = id; + this.zxid = zxid; + this.electionEpoch = electionEpoch; + this.state = state; + this.peerEpoch = peerEpoch; + this.version = 0x0; + } + + final private int version; + final private long id; final private long zxid; @@ -63,6 +93,10 @@ public class Vote { final private long peerEpoch; + public int getVersion() { + return version; + } + public long getId() { return id; } @@ -91,7 +125,10 @@ public class Vote { return false; } Vote other = (Vote) o; - return (id == other.id && zxid == other.zxid && electionEpoch == other.electionEpoch && peerEpoch == other.peerEpoch); + return (id == other.id + && zxid == other.zxid + && electionEpoch == other.electionEpoch + && peerEpoch == other.peerEpoch); } Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java?rev=1608648&view=auto ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java (added) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java Tue Jul 8 03:25:56 2014 @@ -0,0 +1,150 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.HashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.server.quorum.QuorumCnxManager; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.Vote; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; +import org.apache.zookeeper.test.ClientBase; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class FLEBackwardElectionRoundTest extends ZKTestCase { + protected static final Logger LOG = LoggerFactory.getLogger(FLELostMessageTest.class); + + int count; + HashMap peers; + File tmpdir[]; + int port[]; + + QuorumCnxManager cnxManagers[]; + + @Before + public void setUp() throws Exception { + count = 3; + + peers = new HashMap(count); + tmpdir = new File[count]; + port = new int[count]; + cnxManagers = new QuorumCnxManager[count - 1]; + } + + @After + public void tearDown() throws Exception { + for(int i = 0; i < (count - 1); i++){ + if(cnxManagers[i] != null){ + cnxManagers[i].halt(); + } + } + } + + /** + * This test is checking the following case. A server S is + * currently LOOKING and it receives notifications from + * a quorum indicating they are following S. The election + * round E of S is higher than the election round E' in the + * notification messages, so S becomes the leader and sets + * its epoch back to E'. In the meanwhile, one or more + * followers turn to LOOKING and elect S in election round E. + * Having leader and followers with different election rounds + * might prevent other servers from electing a leader because + * they can't get a consistent set of notifications from a + * quorum. + * + * {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1514} + * + * + * @throws Exception + */ + + @Test + public void testBackwardElectionRound() throws Exception { + LOG.info("TestLE: {}, {}", getTestName(), count); + for(int i = 0; i < count; i++) { + int clientport = PortAssignment.unique(); + peers.put(Long.valueOf(i), + new QuorumServer(i, + new InetSocketAddress(clientport), + new InetSocketAddress(PortAssignment.unique()))); + tmpdir[i] = ClientBase.createTmpDir(); + port[i] = clientport; + } + + ByteBuffer initialMsg = FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 0, 0, 1); + + /* + * Start server 0 + */ + QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2); + peer.startLeaderElection(); + FLETestUtils.LEThread thread = new FLETestUtils.LEThread(peer, 0); + thread.start(); + + /* + * Start mock server 1 + */ + QuorumPeer mockPeer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2); + cnxManagers[0] = new QuorumCnxManager(mockPeer); + cnxManagers[0].listener.start(); + + cnxManagers[0].toSend(0l, initialMsg); + + /* + * Start mock server 2 + */ + mockPeer = new QuorumPeer(peers, tmpdir[2], tmpdir[2], port[2], 3, 2, 1000, 2, 2); + cnxManagers[1] = new QuorumCnxManager(mockPeer); + cnxManagers[1].listener.start(); + + cnxManagers[1].toSend(0l, initialMsg); + + /* + * Run another instance of leader election. + */ + thread.join(5000); + thread = new FLETestUtils.LEThread(peer, 0); + thread.start(); + + /* + * Send the same messages, this time should not make 0 the leader. + */ + cnxManagers[0].toSend(0l, initialMsg); + cnxManagers[1].toSend(0l, initialMsg); + + thread.join(5000); + + if (!thread.isAlive()) { + Assert.fail("Should not have joined"); + } + + } +} Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java?rev=1608648&view=auto ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java (added) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java Tue Jul 8 03:25:56 2014 @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.server.quorum.FastLeaderElection; +import org.apache.zookeeper.server.quorum.QuorumCnxManager; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; +import org.apache.zookeeper.test.ClientBase; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class FLELostMessageTest extends ZKTestCase { + protected static final Logger LOG = LoggerFactory.getLogger(FLELostMessageTest.class); + + int count; + HashMap peers; + File tmpdir[]; + int port[]; + + QuorumCnxManager cnxManager; + + @Before + public void setUp() throws Exception { + count = 3; + + peers = new HashMap(count); + tmpdir = new File[count]; + port = new int[count]; + } + + @After + public void tearDown() throws Exception { + cnxManager.halt(); + } + + @Test + public void testLostMessage() throws Exception { + LOG.info("TestLE: {}, {}", getTestName(), count); + for(int i = 0; i < count; i++) { + int clientport = PortAssignment.unique(); + peers.put(Long.valueOf(i), + new QuorumServer(i, + new InetSocketAddress(clientport), + new InetSocketAddress(PortAssignment.unique()))); + tmpdir[i] = ClientBase.createTmpDir(); + port[i] = clientport; + } + + /* + * Start server 0 + */ + QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2); + peer.startLeaderElection(); + FLETestUtils.LEThread thread = new FLETestUtils.LEThread(peer, 1); + thread.start(); + + /* + * Start mock server 1 + */ + mockServer(); + thread.join(5000); + if (thread.isAlive()) { + Assert.fail("Threads didn't join"); + } + } + + void mockServer() throws InterruptedException, IOException { + QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2); + cnxManager = new QuorumCnxManager(peer); + cnxManager.listener.start(); + + cnxManager.toSend(1l, FLETestUtils.createMsg(ServerState.LOOKING.ordinal(), 0, 0, 0)); + cnxManager.recvQueue.take(); + cnxManager.toSend(1L, FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 1, 0, 0)); + } +} Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java?rev=1608648&view=auto ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java (added) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java Tue Jul 8 03:25:56 2014 @@ -0,0 +1,84 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import java.nio.ByteBuffer; + +import org.apache.zookeeper.server.quorum.FastLeaderElection; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.Vote; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.junit.Assert; + +import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; + +public class FLETestUtils { + protected static final Logger LOG = LoggerFactory.getLogger(FLETestUtils.class); + + /* + * Thread to run an instance of leader election for + * a given quorum peer. + */ + static class LEThread extends Thread { + private int i; + private QuorumPeer peer; + + LEThread(QuorumPeer peer, int i) { + this.i = i; + this.peer = peer; + LOG.info("Constructor: {}", getName()); + + } + + public void run() { + try { + Vote v = null; + peer.setPeerState(ServerState.LOOKING); + LOG.info("Going to call leader election: {}", i); + v = peer.getElectionAlg().lookForLeader(); + + if (v == null) { + Assert.fail("Thread " + i + " got a null vote"); + } + + /* + * A real zookeeper would take care of setting the current vote. Here + * we do it manually. + */ + peer.setCurrentVote(v); + + LOG.info("Finished election: {}, {}", i, v.getId()); + + Assert.assertTrue("State is not leading.", peer.getPeerState() == ServerState.LEADING); + } catch (Exception e) { + e.printStackTrace(); + } + LOG.info("Joining"); + } + } + + /* + * Creates a leader election notification message. + */ + static ByteBuffer createMsg(int state, long leader, long zxid, long epoch){ + return FastLeaderElection.buildMsg(state, leader, zxid, 1, epoch); + } + +} Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java?rev=1608648&r1=1608647&r2=1608648&view=diff ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java (original) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java Tue Jul 8 03:25:56 2014 @@ -152,8 +152,6 @@ public class FLENewEpochTest extends ZKT @Test public void testLENewEpoch() throws Exception { - FastLeaderElection le[] = new FastLeaderElection[count]; - LOG.info("TestLE: " + getTestName()+ ", " + count); for(int i = 0; i < count; i++) { peers.put(Long.valueOf(i), @@ -166,7 +164,7 @@ public class FLENewEpochTest extends ZKT port[i] = PortAssignment.unique(); } - for(int i = 1; i < le.length; i++) { + for(int i = 1; i < count; i++) { QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2); peer.startLeaderElection(); LEThread thread = new LEThread(peer, i); Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEPredicateTest.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEPredicateTest.java?rev=1608648&r1=1608647&r2=1608648&view=diff ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEPredicateTest.java (original) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEPredicateTest.java Tue Jul 8 03:25:56 2014 @@ -78,6 +78,7 @@ public class FLEPredicateTest extends ZK PortAssignment.unique(), 3, 0, 1000, 2, 2); MockFLE mock = new MockFLE(peer); + mock.start(); /* * Lower epoch must return false Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java?rev=1608648&r1=1608647&r2=1608648&view=diff ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java (original) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java Tue Jul 8 03:25:56 2014 @@ -297,7 +297,6 @@ public class FLETest extends ZKTestCase * @throws Exception */ private void runElection(int rounds) throws Exception { - FastLeaderElection le[] = new FastLeaderElection[count]; ConcurrentHashMap > quora = new ConcurrentHashMap >(); @@ -322,7 +321,7 @@ public class FLETest extends ZKTestCase /* * Start one LEThread for each peer we want to run. */ - for(int i = 0; i < le.length; i++) { + for(int i = 0; i < count; i++) { QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2); peer.startLeaderElection(); Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEZeroWeightTest.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEZeroWeightTest.java?rev=1608648&r1=1608647&r2=1608648&view=diff ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEZeroWeightTest.java (original) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEZeroWeightTest.java Tue Jul 8 03:25:56 2014 @@ -140,8 +140,6 @@ public class FLEZeroWeightTest extends Z @Test public void testZeroWeightQuorum() throws Exception { - FastLeaderElection le[] = new FastLeaderElection[count]; - LOG.info("TestZeroWeightQuorum: " + getTestName()+ ", " + count); for(int i = 0; i < count; i++) { InetSocketAddress addr1 = new InetSocketAddress("127.0.0.1",PortAssignment.unique()); @@ -153,7 +151,7 @@ public class FLEZeroWeightTest extends Z tmpdir[i] = ClientBase.createTmpDir(); } - for(int i = 0; i < le.length; i++) { + for(int i = 0; i < count; i++) { QuorumHierarchical hq = new QuorumHierarchical(qp); QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, hq); peer.startLeaderElection(); Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java?rev=1608648&r1=1608647&r2=1608648&view=diff ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java (original) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java Tue Jul 8 03:25:56 2014 @@ -37,6 +37,7 @@ import org.apache.zookeeper.PortAssignme import org.apache.zookeeper.ZKTestCase; import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.quorum.Election; +import org.apache.zookeeper.server.quorum.FLELostMessageTest; import org.apache.zookeeper.server.quorum.LeaderElection; import org.apache.zookeeper.server.quorum.QuorumPeer; import org.apache.zookeeper.server.quorum.Vote;