Author: breed
Date: Tue Dec 15 22:23:55 2009
New Revision: 891034
URL: http://svn.apache.org/viewvc?rev=891034&view=rev
Log:
ZOOKEEPER-599. Changes to FLE and QuorumCnxManager to support Observers
ZOOKEEPER-506. QuorumBase should use default leader election
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=891034&r1=891033&r2=891034&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Tue Dec 15 22:23:55 2009
@@ -192,6 +192,10 @@
ZOOKEEPER-425. Add OSGi metadata to zookeeper.jar (david bosschaert via breed)
+ ZOOKEEPER-599. Changes to FLE and QuorumCnxManager to support Observers (fpj via breed)
+
+ ZOOKEEPER-506. QuorumBase should use default leader election (fpj via breed)
+
NEW FEATURES:
ZOOKEEPER-539. generate eclipse project via ant target. (phunt via mahadev)
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=891034&r1=891033&r2=891034&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
(original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
Tue Dec 15 22:23:55 2009
@@ -29,6 +29,7 @@
import org.apache.log4j.Logger;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
@@ -188,81 +189,110 @@
try{
response = manager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
if(response == null) continue;
-
- // Receive new message
- LOG.debug("Receive new message.");
- if (response.buffer.capacity() < 28) {
- LOG.error("Got a short response: "
- + response.buffer.capacity());
- continue;
- }
- response.buffer.clear();
-
- // State of peer that sent this message
- QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
- switch (response.buffer.getInt()) {
- case 0:
- ackstate = QuorumPeer.ServerState.LOOKING;
- break;
- case 1:
- ackstate = QuorumPeer.ServerState.FOLLOWING;
- break;
- case 2:
- ackstate = QuorumPeer.ServerState.LEADING;
- break;
- }
-
- // Instantiate Notification and set its attributes
- Notification n = new Notification();
- n.leader = response.buffer.getLong();
- n.zxid = response.buffer.getLong();
- n.epoch = response.buffer.getLong();
- n.state = ackstate;
- n.sid = response.sid;
/*
- * If this server is looking, then send proposed leader
+ * If it is from an observer, respond right away.
+ * Note that the following predicate assumes that
+ * if a server is not a follower, then it must be
+ * an observer. If we ever have any other type of
+ * learner in the future, we'll have to change the
+ * way we check for observers.
*/
+ if(!self.getVotingView().containsKey(response.sid)){
+ Vote current = self.getCurrentVote();
+ ToSend notmsg = new ToSend(ToSend.mType.notification,
+ current.id,
+ current.zxid,
+ logicalclock,
+ self.getPeerState(),
+ response.sid);
+
+ sendqueue.offer(notmsg);
+ } else {
+ // Receive new message
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Receive new notification message. My id = "
+ + self.getId());
+ }
+
+ if (response.buffer.capacity() < 28) {
+ LOG.error("Got a short response: "
+ + response.buffer.capacity());
+ continue;
+ }
+ response.buffer.clear();
+
+ // State of peer that sent this message
+ QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
+ switch (response.buffer.getInt()) {
+ case 0:
+ ackstate = QuorumPeer.ServerState.LOOKING;
+ break;
+ case 1:
+ ackstate = QuorumPeer.ServerState.FOLLOWING;
+ break;
+ case 2:
+ ackstate = QuorumPeer.ServerState.LEADING;
+ break;
+ case 3:
+ ackstate = QuorumPeer.ServerState.OBSERVING;
+ break;
+ }
+
+ // Instantiate Notification and set its attributes
+ Notification n = new Notification();
+ n.leader = response.buffer.getLong();
+ n.zxid = response.buffer.getLong();
+ n.epoch = response.buffer.getLong();
+ n.state = ackstate;
+ n.sid = response.sid;
- if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
- recvqueue.offer(n);
- if(recvqueue.size() == 0) LOG.debug("Message: " + n.sid);
- /*
- * Send a notification back if the peer that sent this
- * message is also looking and its logical clock is
- * lagging behind.
- */
- if((ackstate == QuorumPeer.ServerState.LOOKING)
- && (n.epoch < logicalclock)){
- Vote v = getVote();
- ToSend notmsg = new ToSend(ToSend.mType.notification,
- v.id,
- v.zxid,
- logicalclock,
- self.getPeerState(),
- response.sid);
- sendqueue.offer(notmsg);
- }
- } else {
/*
- * If this server is not looking, but the one that sent the ack
- * is looking, then send back what it believes to be the leader.
+ * If this server is looking, then send proposed leader
*/
- Vote current = self.getCurrentVote();
- if(ackstate == QuorumPeer.ServerState.LOOKING){
- LOG.info("Sending new notification.");
- ToSend notmsg = new ToSend(
- ToSend.mType.notification,
- current.id,
- current.zxid,
- logicalclock,
- self.getPeerState(),
- response.sid);
- sendqueue.offer(notmsg);
- }
+ if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
+ recvqueue.offer(n);
+
+ /*
+ * Send a notification back if the peer that sent this
+ * message is also looking and its logical clock is
+ * lagging behind.
+ */
+ if((ackstate == QuorumPeer.ServerState.LOOKING)
+ && (n.epoch < logicalclock)){
+ Vote v = getVote();
+ ToSend notmsg = new ToSend(ToSend.mType.notification,
+ v.id,
+ v.zxid,
+ logicalclock,
+ self.getPeerState(),
+ response.sid);
+ sendqueue.offer(notmsg);
+ }
+ } else {
+ /*
+ * If this server is not looking, but the one that sent the ack
+ * is looking, then send back what it believes to be the leader.
+ */
+ Vote current = self.getCurrentVote();
+ if(ackstate == QuorumPeer.ServerState.LOOKING){
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Sending new notification. My id = " +
+ self.getId() + ", Recipient = " +
+ response.sid);
+ }
+ ToSend notmsg = new ToSend(
+ ToSend.mType.notification,
+ current.id,
+ current.zxid,
+ logicalclock,
+ self.getPeerState(),
+ response.sid);
+ sendqueue.offer(notmsg);
+ }
+ }
}
-
} catch (InterruptedException e) {
System.out.println("Interrupted Exception while waiting for new message" +
e.toString());
@@ -550,9 +580,49 @@
}
/**
+ * A learning state can be either FOLLOWING or OBSERVING.
+ * This method simply decides which one depending on the
+ * role of the server.
+ *
+ * @return ServerState
+ */
+ private ServerState learningState(){
+ if(self.getPeerType() == LearnerType.PARTICIPANT){
+ LOG.debug("I'm a participant: " + self.getId());
+ return ServerState.FOLLOWING;
+ }
+ else{
+ LOG.debug("I'm an observer: " + self.getId());
+ return ServerState.OBSERVING;
+ }
+ }
+
+ /**
+ * Returns the server identifier used in the initial vote: this
+ * server's id if it is a participant, Long.MIN_VALUE otherwise.
+ *
+ * @return long
+ */
+ private long getInitId(){
+ if(self.getPeerType() == LearnerType.PARTICIPANT)
+ return self.getId();
+ else return Long.MIN_VALUE;
+ }
+
+ /**
+ * Returns the zxid used in the initial vote: the last logged zxid
+ * if this server is a participant, Long.MIN_VALUE otherwise.
+ *
+ * @return long
+ */
+ private long getInitLastLoggedZxid(){
+ if(self.getPeerType() == LearnerType.PARTICIPANT)
+ return self.getLastLoggedZxid();
+ else return Long.MIN_VALUE;
+ }
+
+ /**
* Starts a new round of leader election. Whenever our QuorumPeer
* changes its state to LOOKING, this method is invoked, and it
- * sends notifications to al other peers.
+ * sends notifications to all other peers.
*/
public Vote lookForLeader() throws InterruptedException {
try {
@@ -573,10 +643,11 @@
synchronized(this){
logicalclock++;
- updateProposal(self.getId(), self.getLastLoggedZxid());
+ updateProposal(getInitId(), getInitLastLoggedZxid());
}
- LOG.info("New election: " + proposedZxid);
+ LOG.info("New election. My id = " + self.getId() +
+ ", Proposed zxid = " + proposedZxid);
sendNotifications();
/*
@@ -623,14 +694,17 @@
logicalclock = n.epoch;
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid,
- self.getId(), self.getLastLoggedZxid()))
+ getInitId(), getInitLastLoggedZxid()))
updateProposal(n.leader, n.zxid);
else
- updateProposal(self.getId(),
- self.getLastLoggedZxid());
+ updateProposal(getInitId(),
+ getInitLastLoggedZxid());
sendNotifications();
} else if (n.epoch < logicalclock) {
- LOG.info("n.epoch < logicalclock");
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Notification epoch is smaller than logicalclock.
n.epoch = " + n.epoch
+ + ", Logical clock" + logicalclock);
+ }
break;
} else if (totalOrderPredicate(n.leader, n.zxid,
proposedLeader, proposedZxid)) {
@@ -639,47 +713,66 @@
sendNotifications();
}
- LOG.info("Adding vote");
-
- recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Adding vote: From = " + n.sid +
+ ", Proposed leader = " + n.leader +
+ ", Porposed zxid = " + n.zxid +
+ ", Proposed epoch = " + n.epoch);
+ }
+
+ /*
+ * Only proceed if the vote comes from a replica in the
+ * voting view.
+ */
+ if(self.getVotingView().containsKey(n.sid)){
+ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
- //If have received from all nodes, then terminate
- if ((self.quorumPeers.size() == recvset.size()) &&
- (self.getQuorumVerifier().getWeight(proposedLeader) != 0)){
- self.setPeerState((proposedLeader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
- leaveInstance();
- return new Vote(proposedLeader, proposedZxid);
+ // If we have received votes from all nodes in the voting view, then terminate
+ if ((self.getVotingView().size() == recvset.size()) &&
+ (self.getQuorumVerifier().getWeight(proposedLeader) !=
0)){
+ self.setPeerState((proposedLeader == self.getId()) ?
+ ServerState.LEADING: learningState());
+ leaveInstance();
+ return new Vote(proposedLeader, proposedZxid);
- } else if (termPredicate(recvset,
- new Vote(proposedLeader, proposedZxid,
- logicalclock))) {
- //Otherwise, wait for a fixed amount of time
- LOG.debug("Passed predicate");
-
- // Verify if there is any change in the proposed leader
- while((n = recvqueue.poll(finalizeWait,
- TimeUnit.MILLISECONDS)) != null){
- if(totalOrderPredicate(n.leader, n.zxid,
- proposedLeader, proposedZxid)){
- recvqueue.put(n);
- break;
+ } else if (termPredicate(recvset,
+ new Vote(proposedLeader, proposedZxid,
+ logicalclock))) {
+
+ // Verify if there is any change in the proposed leader
+ while((n = recvqueue.poll(finalizeWait,
+ TimeUnit.MILLISECONDS)) != null){
+ if(totalOrderPredicate(n.leader, n.zxid,
+ proposedLeader, proposedZxid)){
+ recvqueue.put(n);
+ break;
+ }
}
- }
- if (n == null) {
- self.setPeerState((proposedLeader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
- LOG.info("About to leave instance:"
- + proposedLeader + ", " +
- proposedZxid + ", " + self.getId()
- + ", " + self.getPeerState());
- leaveInstance();
- return new Vote(proposedLeader,
- proposedZxid);
+ /*
+ * This predicate is true once we don't read any new
+ * relevant message from the reception queue
+ */
+ if (n == null) {
+ self.setPeerState((proposedLeader == self.getId()) ?
+ ServerState.LEADING: learningState());
+ if(LOG.isDebugEnabled()){
+ LOG.debug("About to leave FLE instance: Leader= "
+ + proposedLeader + ", Zxid = " +
+ proposedZxid + ", My id = " + self.getId()
+ + ", My state = " + self.getPeerState());
+ }
+
+ leaveInstance();
+ return new Vote(proposedLeader,
+ proposedZxid);
+ }
}
}
break;
+ case OBSERVING:
+ LOG.debug("Notification from observer: " + n.sid);
+ break;
default:
/*
* There is at most one leader for each epoch, so if a
@@ -696,7 +789,7 @@
n.zxid, n.epoch, n.state))
&& checkLeader(outofelection, n.leader,
n.epoch)) ){
self.setPeerState((n.leader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
+ ServerState.LEADING: learningState());
leaveInstance();
return new Vote(n.leader, n.zxid);
@@ -717,7 +810,7 @@
synchronized(this){
logicalclock = n.epoch;
self.setPeerState((n.leader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
+ ServerState.LEADING: learningState());
}
leaveInstance();
return new Vote(n.leader, n.zxid);
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=891034&r1=891033&r2=891034&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
(original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
Tue Dec 15 22:23:55 2009
@@ -26,6 +26,7 @@
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.Enumeration;
+import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -68,6 +69,12 @@
static final int MAX_CONNECTION_ATTEMPTS = 2;
/*
+ * Negative counter for observer server ids.
+ */
+
+ private long observerCounter = -1;
+
+ /*
* Local IP address
*/
final QuorumPeer self;
@@ -185,6 +192,8 @@
return false;
}
+
+
/**
* If this server receives a connection request, then it gives up on the new
* connection if it wins. Notice that it checks whether it has a connection
@@ -196,7 +205,6 @@
Long sid = null;
try {
- // Sending challenge and sid
byte[] msgBytes = new byte[8];
ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
@@ -205,8 +213,17 @@
// Read server id
sid = Long.valueOf(msgBuffer.getLong());
+ if(sid == QuorumPeer.OBSERVER_ID){
+ /*
+ * Assign the next value of the negative observer counter as
+ * the identifier. We need a value to identify the connection.
+ */
+
+ sid = observerCounter--;
+ LOG.info("Setting arbitrary identifier to observer: " + sid);
+ }
} catch (IOException e) {
- LOG.info("Exception reading or writing challenge: "
+ LOG.warn("Exception reading or writing challenge: "
+ e.toString());
return false;
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=891034&r1=891033&r2=891034&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
(original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
Tue Dec 15 22:23:55 2009
@@ -117,7 +117,7 @@
LOOKING, FOLLOWING, LEADING, OBSERVING;
}
- /**
+ /*
* A peer can either be participating, which implies that it is willing to
* both vote in instances of consensus and to elect or become a Leader, or
* it may be observing in which case it isn't.
@@ -129,6 +129,17 @@
PARTICIPANT, OBSERVER;
}
+ /*
+ * To enable observers to have no identifier, we need a generic identifier
+ * at least for QuorumCnxManager. We use the following constant as the
+ * value of such a generic identifier.
+ */
+
+ static final long OBSERVER_ID = Long.MAX_VALUE;
+
+ /*
+ * The default peer type is participant
+ */
private LearnerType peerType = LearnerType.PARTICIPANT;
public LearnerType getPeerType() {
@@ -352,9 +363,7 @@
@Override
public synchronized void start() {
cnxnFactory.start();
- if (getPeerType() == LearnerType.PARTICIPANT) {
- startLeaderElection();
- }
+ startLeaderElection();
super.start();
}
@@ -513,10 +522,9 @@
protected Election makeLEStrategy(){
LOG.debug("Initializing leader election protocol...");
- // LeaderElection is the only implementation that correctly
- // transitions between LOOKING and OBSERVER
- if(electionAlg==null)
+ if(electionAlg==null){
return new LeaderElection(this);
+ }
return electionAlg;
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java?rev=891034&r1=891033&r2=891034&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
(original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
Tue Dec 15 22:23:55 2009
@@ -199,10 +199,7 @@
System.setProperty("zookeeper." + key, value);
}
}
- if (observers.size() > 0 && electionAlg != 0) {
- throw new IllegalArgumentException("Observers must currently be used with simple
leader election" +
- " (set electionAlg=0)");
- }
+
if (dataDir == null) {
throw new IllegalArgumentException("dataDir is not set");
}
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java?rev=891034&r1=891033&r2=891034&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java Tue Dec
15 22:23:55 2009
@@ -27,6 +27,7 @@
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
@@ -63,18 +64,27 @@
@Test
public void testObserver() throws Exception {
ClientBase.setupTestEnv();
- final int CLIENT_PORT_QP1 = 3181;
- final int CLIENT_PORT_QP2 = CLIENT_PORT_QP1 + 3;
- final int CLIENT_PORT_OBS = CLIENT_PORT_QP2 + 3;
+
+ final int PORT_QP1 = PortAssignment.unique();
+ final int PORT_QP2 = PortAssignment.unique();
+ final int PORT_OBS = PortAssignment.unique();
+ final int PORT_QP_LE1 = PortAssignment.unique();
+ final int PORT_QP_LE2 = PortAssignment.unique();
+ final int PORT_OBS_LE = PortAssignment.unique();
+
+ final int CLIENT_PORT_QP1 = PortAssignment.unique();
+ final int CLIENT_PORT_QP2 = PortAssignment.unique();
+ final int CLIENT_PORT_OBS = PortAssignment.unique();
+
String quorumCfgSection =
- "electionAlg=0\n" +
- "server.1=localhost:" + (CLIENT_PORT_QP1 + 1)
- + ":" + (CLIENT_PORT_QP1 + 2)
- + "\nserver.2=localhost:" + (CLIENT_PORT_QP2 + 1)
- + ":" + (CLIENT_PORT_QP2 + 2)
+ "electionAlg=3\n" +
+ "server.1=localhost:" + (PORT_QP1)
+ + ":" + (PORT_QP_LE1)
+ + "\nserver.2=localhost:" + (PORT_QP2)
+ + ":" + (PORT_QP_LE2)
+ "\nserver.3=localhost:"
- + (CLIENT_PORT_OBS+1)+ ":" + (CLIENT_PORT_OBS + 2) + ":observer";
+ + (PORT_OBS)+ ":" + (PORT_OBS_LE) + ":observer";
String obsCfgSection = quorumCfgSection + "\npeerType=observer";
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
@@ -175,11 +185,12 @@
@Test
public void testSingleObserver() throws IOException{
ClientBase.setupTestEnv();
- final int CLIENT_PORT_QP1 = 3181;
-
+ final int CLIENT_PORT_QP1 = PortAssignment.unique();
+ final int CLIENT_PORT_QP2 = PortAssignment.unique();
+
String quorumCfgSection =
- "server.1=localhost:" + (CLIENT_PORT_QP1 + 1)
- + ":" + (CLIENT_PORT_QP1 + 2) + "\npeerType=observer";
+ "server.1=localhost:" + (CLIENT_PORT_QP1)
+ + ":" + (CLIENT_PORT_QP2) + "\npeerType=observer";
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
q1.start();
@@ -190,53 +201,4 @@
q1.shutdown();
}
- /**
- * Check that an attempt to instantiate an ensemble with observers and
- * electionAlg != 0 fails (this will be removed when the restriction is).
- * @throws Exception
- */
- @Test
- public void testLeaderElectionFail() throws Exception {
- ClientBase.setupTestEnv();
- final int CLIENT_PORT_QP1 = 3181;
- final int CLIENT_PORT_QP2 = CLIENT_PORT_QP1 + 3;
- final int CLIENT_PORT_OBS = CLIENT_PORT_QP2 + 3;
-
- String quorumCfgSection =
- "electionAlg=1\n" +
- "server.1=localhost:" + (CLIENT_PORT_QP1 + 1)
- + ":" + (CLIENT_PORT_QP1 + 2)
- + "\nserver.2=localhost:" + (CLIENT_PORT_QP2 + 1)
- + ":" + (CLIENT_PORT_QP2 + 2)
- + "\nserver.3=localhost:"
- + (CLIENT_PORT_OBS+1)+ ":" + (CLIENT_PORT_OBS + 2) + ":observer";
- QuorumPeerConfig qpc = new QuorumPeerConfig();
-
- File tmpDir = ClientBase.createTmpDir();
- File confFile = new File(tmpDir, "zoo.cfg");
-
- FileWriter fwriter = new FileWriter(confFile);
- fwriter.write("tickTime=2000\n");
- fwriter.write("initLimit=10\n");
- fwriter.write("syncLimit=5\n");
-
- File dataDir = new File(tmpDir, "data");
- if (!dataDir.mkdir()) {
- throw new IOException("Unable to mkdir " + dataDir);
- }
- fwriter.write("dataDir=" + dataDir.toString() + "\n");
-
- fwriter.write("clientPort=" + CLIENT_PORT_QP1 + "\n");
- fwriter.write(quorumCfgSection + "\n");
- fwriter.flush();
- fwriter.close();
- try {
- qpc.parse(confFile.toString());
- } catch (ConfigException e) {
- LOG.info("Config exception caught as expected: " + e.getCause());
- return;
- }
-
- assertTrue("Didn't get the expected config exception", false);
- }
}
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java?rev=891034&r1=891033&r2=891034&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java Tue Dec
15 22:23:55 2009
@@ -30,6 +30,7 @@
import org.apache.log4j.Logger;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.TestableZooKeeper;
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
@@ -47,6 +48,12 @@
private int port3;
private int port4;
private int port5;
+
+ private int portLE1;
+ private int portLE2;
+ private int portLE3;
+ private int portLE4;
+ private int portLE5;
@Override
protected void setUp() throws Exception {
@@ -66,11 +73,18 @@
port3 = PortAssignment.unique();
port4 = PortAssignment.unique();
port5 = PortAssignment.unique();
- hostPort = "127.0.0.1:" + port1
- + ",127.0.0.1:" + port2
- + ",127.0.0.1:" + port3
- + ",127.0.0.1:" + port4
- + ",127.0.0.1:" + port5;
+
+ portLE1 = PortAssignment.unique();
+ portLE2 = PortAssignment.unique();
+ portLE3 = PortAssignment.unique();
+ portLE4 = PortAssignment.unique();
+ portLE5 = PortAssignment.unique();
+
+ hostPort = "127.0.0.1:" + port1 + ":" + portLE1
+ + ",127.0.0.1:" + port2 + ":" + portLE2
+ + ",127.0.0.1:" + port3 + ":" + portLE3
+ + ",127.0.0.1:" + port4 + ":" + portLE4
+ + ",127.0.0.1:" + port5 + ":" + portLE5;
LOG.info("Ports are: " + hostPort);
s1dir = ClientBase.createTmpDir();
@@ -102,11 +116,26 @@
int initLimit = 3;
int syncLimit = 3;
HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
- peers.put(Long.valueOf(1), new QuorumServer(1, new InetSocketAddress("127.0.0.1",
port1 + 1000)));
- peers.put(Long.valueOf(2), new QuorumServer(2, new InetSocketAddress("127.0.0.1",
port2 + 1000)));
- peers.put(Long.valueOf(3), new QuorumServer(3, new InetSocketAddress("127.0.0.1",
port3 + 1000)));
- peers.put(Long.valueOf(4), new QuorumServer(4, new InetSocketAddress("127.0.0.1",
port4 + 1000)));
- peers.put(Long.valueOf(5), new QuorumServer(5, new InetSocketAddress("127.0.0.1",
port5 + 1000)));
+ peers.put(Long.valueOf(1), new QuorumServer(1,
+ new InetSocketAddress("127.0.0.1", port1 + 1000),
+ new InetSocketAddress("127.0.0.1", portLE1 + 1000),
+ LearnerType.PARTICIPANT));
+ peers.put(Long.valueOf(2), new QuorumServer(2,
+ new InetSocketAddress("127.0.0.1", port2 + 1000),
+ new InetSocketAddress("127.0.0.1", portLE2 + 1000),
+ LearnerType.PARTICIPANT));
+ peers.put(Long.valueOf(3), new QuorumServer(3,
+ new InetSocketAddress("127.0.0.1", port3 + 1000),
+ new InetSocketAddress("127.0.0.1", portLE3 + 1000),
+ LearnerType.PARTICIPANT));
+ peers.put(Long.valueOf(4), new QuorumServer(4,
+ new InetSocketAddress("127.0.0.1", port4 + 1000),
+ new InetSocketAddress("127.0.0.1", portLE4 + 1000),
+ LearnerType.PARTICIPANT));
+ peers.put(Long.valueOf(5), new QuorumServer(5,
+ new InetSocketAddress("127.0.0.1", port5 + 1000),
+ new InetSocketAddress("127.0.0.1", portLE5 + 1000),
+ LearnerType.PARTICIPANT));
if (withObservers) {
peers.get(Long.valueOf(4)).type = LearnerType.OBSERVER;
@@ -114,19 +143,19 @@
}
LOG.info("creating QuorumPeer 1 port " + port1);
- s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 0, 1, tickTime, initLimit, syncLimit);
+ s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 3, 1, tickTime, initLimit, syncLimit);
assertEquals(port1, s1.getClientPort());
LOG.info("creating QuorumPeer 2 port " + port2);
- s2 = new QuorumPeer(peers, s2dir, s2dir, port2, 0, 2, tickTime, initLimit, syncLimit);
+ s2 = new QuorumPeer(peers, s2dir, s2dir, port2, 3, 2, tickTime, initLimit, syncLimit);
assertEquals(port2, s2.getClientPort());
LOG.info("creating QuorumPeer 3 port " + port3);
- s3 = new QuorumPeer(peers, s3dir, s3dir, port3, 0, 3, tickTime, initLimit, syncLimit);
+ s3 = new QuorumPeer(peers, s3dir, s3dir, port3, 3, 3, tickTime, initLimit, syncLimit);
assertEquals(port3, s3.getClientPort());
LOG.info("creating QuorumPeer 4 port " + port4);
- s4 = new QuorumPeer(peers, s4dir, s4dir, port4, 0, 4, tickTime, initLimit, syncLimit);
+ s4 = new QuorumPeer(peers, s4dir, s4dir, port4, 3, 4, tickTime, initLimit, syncLimit);
assertEquals(port4, s4.getClientPort());
LOG.info("creating QuorumPeer 5 port " + port5);
- s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 0, 5, tickTime, initLimit, syncLimit);
+ s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 3, 5, tickTime, initLimit, syncLimit);
assertEquals(port5, s5.getClientPort());
if (withObservers) {
@@ -181,26 +210,42 @@
int initLimit = 3;
int syncLimit = 3;
HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
- peers.put(Long.valueOf(1), new QuorumServer(1, new InetSocketAddress("127.0.0.1",
port1 + 1000)));
- peers.put(Long.valueOf(2), new QuorumServer(2, new InetSocketAddress("127.0.0.1",
port2 + 1000)));
- peers.put(Long.valueOf(3), new QuorumServer(3, new InetSocketAddress("127.0.0.1",
port3 + 1000)));
- peers.put(Long.valueOf(4), new QuorumServer(4, new InetSocketAddress("127.0.0.1",
port4 + 1000)));
- peers.put(Long.valueOf(5), new QuorumServer(5, new InetSocketAddress("127.0.0.1",
port5 + 1000)));
+ peers.put(Long.valueOf(1), new QuorumServer(1,
+ new InetSocketAddress("127.0.0.1", port1 + 1000),
+ new InetSocketAddress("127.0.0.1", portLE1 + 1000),
+ LearnerType.PARTICIPANT));
+ peers.put(Long.valueOf(2), new QuorumServer(2,
+ new InetSocketAddress("127.0.0.1", port2 + 1000),
+ new InetSocketAddress("127.0.0.1", portLE2 + 1000),
+ LearnerType.PARTICIPANT));
+ peers.put(Long.valueOf(3), new QuorumServer(3,
+ new InetSocketAddress("127.0.0.1", port3 + 1000),
+ new InetSocketAddress("127.0.0.1", portLE3 + 1000),
+ LearnerType.PARTICIPANT));
+ peers.put(Long.valueOf(4), new QuorumServer(4,
+ new InetSocketAddress("127.0.0.1", port4 + 1000),
+ new InetSocketAddress("127.0.0.1", portLE4 + 1000),
+ LearnerType.PARTICIPANT));
+ peers.put(Long.valueOf(5), new QuorumServer(5,
+ new InetSocketAddress("127.0.0.1", port5 + 1000),
+ new InetSocketAddress("127.0.0.1", portLE5 + 1000),
+ LearnerType.PARTICIPANT));
+
LOG.info("creating QuorumPeer 1 port " + port1);
- s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 0, 1, tickTime, initLimit, syncLimit);
+ s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 3, 1, tickTime, initLimit, syncLimit);
assertEquals(port1, s1.getClientPort());
LOG.info("creating QuorumPeer 2 port " + port2);
- s2 = new QuorumPeer(peers, s2dir, s2dir, port2, 0, 2, tickTime, initLimit, syncLimit);
+ s2 = new QuorumPeer(peers, s2dir, s2dir, port2, 3, 2, tickTime, initLimit, syncLimit);
assertEquals(port2, s2.getClientPort());
LOG.info("creating QuorumPeer 3 port " + port3);
- s3 = new QuorumPeer(peers, s3dir, s3dir, port3, 0, 3, tickTime, initLimit, syncLimit);
+ s3 = new QuorumPeer(peers, s3dir, s3dir, port3, 3, 3, tickTime, initLimit, syncLimit);
assertEquals(port3, s3.getClientPort());
LOG.info("creating QuorumPeer 4 port " + port4);
- s4 = new QuorumPeer(peers, s4dir, s4dir, port4, 0, 4, tickTime, initLimit, syncLimit);
+ s4 = new QuorumPeer(peers, s4dir, s4dir, port4, 3, 4, tickTime, initLimit, syncLimit);
assertEquals(port4, s4.getClientPort());
LOG.info("creating QuorumPeer 5 port " + port5);
- s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 0, 5, tickTime, initLimit, syncLimit);
+ s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 3, 5, tickTime, initLimit, syncLimit);
assertEquals(port5, s5.getClientPort());
}
@@ -242,6 +287,7 @@
protected void shutdown(QuorumPeer qp) {
try {
qp.shutdown();
+ ((FastLeaderElection) qp.getElectionAlg()).shutdown();
qp.join(30000);
if (qp.isAlive()) {
fail("QP failed to shutdown in 30 seconds");
|