Author: mahadev
Date: Fri Apr 3 22:01:18 2009
New Revision: 761816
URL: http://svn.apache.org/viewvc?rev=761816&view=rev
Log:
ZOOKEEPER-362. Issues with FLENewEpochTest. (fix bug in Fast leader election) (flavio via
mahadev)
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=761816&r1=761815&r2=761816&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Fri Apr 3 22:01:18 2009
@@ -35,7 +35,9 @@
tickTime from config is lost, cannot start quorum (phunt via mahadev)
ZOOKEEPER-360. WeakHashMap in Bookie.java causes NPE (flavio via mahadev)
-
+
+ ZOOKEEPER-362. Issues with FLENewEpochTest. (fix bug in Fast leader election)
+(flavio via mahadev)
IMPROVEMENTS:
ZOOKEEPER-308. improve the atomic broadcast performance 3x.
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=761816&r1=761815&r2=761816&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
(original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
Fri Apr 3 22:01:18 2009
@@ -562,6 +562,7 @@
n.epoch + ", " + self.getId() + ", " + self.getPeerState() +
", " + n.state + ", " + n.sid);
if (n.epoch > logicalclock) {
+ LOG.debug("Increasing logical clock: " + n.epoch);
logicalclock = n.epoch;
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, self.getId(), self.getLastLoggedZxid()))
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=761816&r1=761815&r2=761816&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
(original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
Fri Apr 3 22:01:18 2009
@@ -238,13 +238,8 @@
SendWorker sw = senderWorkerMap.get(sid);
LOG.info("Create new connection to server: " + sid);
- //sw.connect();
s.socket().close();
- if(sw != null) sw.finish();
- SocketChannel channel = SocketChannel.open(self.quorumPeers.get(sid).electionAddr);
- if (channel.isConnected()) {
- initiateConnection(channel, sid);
- }
+ connectOne(sid);
} catch (IOException e) {
LOG.info("Error when closing socket or trying to reopen connection: "
@@ -329,7 +324,7 @@
* @param sid server id
*/
- void connectOne(long sid){
+ synchronized void connectOne(long sid){
if ((senderWorkerMap.get(sid) == null)) {
SocketChannel channel;
try {
@@ -395,13 +390,13 @@
*/
class Listener extends Thread {
- ServerSocketChannel ss = null;
+ volatile ServerSocketChannel ss = null;
/**
* Sleeps on accept().
*/
@Override
public void run() {
- ServerSocketChannel ss = null;
+ //ss = null;
try {
ss = ServerSocketChannel.open();
int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
@@ -421,13 +416,17 @@
receiveConnection(client);
}
} catch (IOException e) {
- System.err.println("Listener.run: " + e.getMessage());
+ LOG.error("Listener.run: " + e.getMessage());
}
}
void halt(){
try{
- if((ss != null) && (ss.isOpen())) ss.close();
+ LOG.debug("Trying to close listener: " + ss);
+ if(ss != null)/* && (ss.isOpen()))*/{
+ LOG.debug("Closing listener: " + self.getId());
+ ss.close();
+ }
} catch (IOException e){
LOG.warn("Exception when shutting down listener: " + e);
}
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java?rev=761816&r1=761815&r2=761816&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java Fri
Apr 3 22:01:18 2009
@@ -47,7 +47,7 @@
volatile int [] round;
Semaphore start0;
- Semaphore finish3;
+ Semaphore finish3, finish0;
@Override
public void setUp() throws Exception {
@@ -66,6 +66,7 @@
round[2] = 0;
start0 = new Semaphore(0);
+ finish0 = new Semaphore(0);
finish3 = new Semaphore(0);
LOG.info("SetUp " + getName());
@@ -117,11 +118,18 @@
switch(i){
case 0:
LOG.info("First peer, do nothing, just join");
- flag = false;
+ if(finish0.tryAcquire(1000, java.util.concurrent.TimeUnit.MILLISECONDS)){
+ //if(threads.get(0).peer.getPeerState() == ServerState.LEADING ){
+ LOG.info("Setting flag to false");
+ flag = false;
+ }
break;
case 1:
LOG.info("Second entering case");
- if(round[1] != 0) flag = false;
+ if(round[1] != 0){
+ finish0.release();
+ flag = false;
+ }
else{
finish3.acquire();
start0.release();
@@ -167,7 +175,8 @@
thread.start();
threads.add(thread);
}
- start0.acquire();
+ if(!start0.tryAcquire(4000, java.util.concurrent.TimeUnit.MILLISECONDS))
+ fail("First leader election failed");
QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 2,
2, 2);
peer.startLeaderElection();
|