Author: fpj
Date: Wed Nov 17 14:59:29 2010
New Revision: 1036071
URL: http://svn.apache.org/viewvc?rev=1036071&view=rev
Log:
ZOOKEEPER-900. FLE implementation should be improved to use non-blocking sockets (vishal via fpj)
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=1036071&r1=1036070&r2=1036071&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Wed Nov 17 14:59:29 2010
@@ -153,6 +153,8 @@ BUGFIXES:
ZOOKEEPER-930. Hedwig c++ client uses a non thread safe logging library (ivan via breed)
+ ZOOKEEPER-900. FLE implementation should be improved to use non-blocking sockets (vishal via fpj)
+
IMPROVEMENTS:
ZOOKEEPER-724. Improve junit test integration - log harness information
(phunt via mahadev)
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java?rev=1036071&r1=1036070&r2=1036071&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java Wed Nov 17 14:59:29 2010
@@ -41,7 +41,7 @@ import org.apache.zookeeper.server.quoru
public class LeaderElection implements Election {
private static final Logger LOG = Logger.getLogger(LeaderElection.class);
- protected static Random epochGen = new Random();
+ protected static final Random epochGen = new Random();
protected QuorumPeer self;
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1036071&r1=1036070&r2=1036071&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Wed Nov 17 14:59:29 2010
@@ -18,15 +18,17 @@
package org.apache.zookeeper.server.quorum;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.ServerSocket;
import java.net.Socket;
+import java.net.SocketException;
+import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.Enumeration;
-import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -141,49 +143,41 @@ public class QuorumCnxManager {
* @param sid
*/
public void testInitiateConnection(long sid) throws Exception {
- SocketChannel channel;
- if(LOG.isDebugEnabled()){
- LOG.debug("Opening channel to server " + sid);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Opening channel to server " + sid);
}
-
- channel = SocketChannel.open();
- channel.socket().connect(self.getVotingView().get(sid).electionAddr, cnxTO);
- channel.socket().setTcpNoDelay(true);
- initiateConnection(channel, sid);
+ Socket sock = new Socket();
+ setSockOpts(sock);
+ sock.connect(self.getVotingView().get(sid).electionAddr, cnxTO);
+ initiateConnection(sock, sid);
}
/**
* If this server has initiated the connection, then it gives up on the
* connection if it loses challenge. Otherwise, it keeps the connection.
*/
-
- public boolean initiateConnection(SocketChannel s, Long sid) {
- try {
+ public boolean initiateConnection(Socket sock, Long sid) {
+ DataOutputStream dout = null;
+ try {
// Sending id and challenge
- byte[] msgBytes = new byte[8];
- ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
- msgBuffer.putLong(self.getId());
- msgBuffer.position(0);
- s.write(msgBuffer);
+ dout = new DataOutputStream(sock.getOutputStream());
+ dout.writeLong(self.getId());
+ dout.flush();
} catch (IOException e) {
- LOG.warn("Exception reading or writing challenge: ", e);
+ LOG.warn("Ignoring exception reading or writing challenge: ", e);
+ closeSocket(sock);
return false;
}
// If lost the challenge, then drop the new connection
if (sid > self.getId()) {
- try {
- LOG.info("Have smaller server identifier, so dropping the connection: (" +
- sid + ", " + self.getId() + ")");
- s.socket().close();
- } catch (IOException e) {
- LOG.warn("Ignoring exception when closing socket or trying to "
- + "reopen connection: ", e);
- }
- // Otherwise proceed with the connection
- } else {
- SendWorker sw = new SendWorker(s, sid);
- RecvWorker rw = new RecvWorker(s, sid);
+ LOG.info("Have smaller server identifier, so dropping the " +
+ "connection: (" + sid + ", " + self.getId() + ")");
+ closeSocket(sock);
+ // Otherwise proceed with the connection
+ } else {
+ SendWorker sw = new SendWorker(sock, sid);
+ RecvWorker rw = new RecvWorker(sock, sid);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
@@ -215,19 +209,14 @@ public class QuorumCnxManager {
* possible long value to lose the challenge.
*
*/
- boolean receiveConnection(SocketChannel s) {
+ public boolean receiveConnection(Socket sock) {
Long sid = null;
try {
- byte[] msgBytes = new byte[8];
- ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
-
- s.read(msgBuffer);
- msgBuffer.position(0);
-
// Read server id
- sid = Long.valueOf(msgBuffer.getLong());
- if(sid == QuorumPeer.OBSERVER_ID){
+ DataInputStream din = new DataInputStream(sock.getInputStream());
+ sid = din.readLong();
+ if (sid == QuorumPeer.OBSERVER_ID) {
/*
* Choose identifier at random. We need a value to identify
* the connection.
@@ -237,38 +226,34 @@ public class QuorumCnxManager {
LOG.info("Setting arbitrary identifier to observer: " + sid);
}
} catch (IOException e) {
- LOG.warn("Exception reading or writing challenge: "
- + e.toString());
+ closeSocket(sock);
+ LOG.warn("Exception reading or writing challenge: " + e.toString());
return false;
}
//If wins the challenge, then close the new connection.
if (sid < self.getId()) {
- try {
- /*
- * This replica might still believe that the connection to sid
- * is up, so we have to shut down the workers before trying to
- * open a new connection.
- */
- SendWorker sw = senderWorkerMap.get(sid);
- if(sw != null)
- sw.finish();
-
- /*
- * Now we start a new connection
- */
- LOG.debug("Create new connection to server: " + sid);
- s.socket().close();
- connectOne(sid);
-
- } catch (IOException e) {
- LOG.info("Error when closing socket or trying to reopen connection: "
- + e.toString());
+ /*
+ * This replica might still believe that the connection to sid is
+ * up, so we have to shut down the workers before trying to open a
+ * new connection.
+ */
+ SendWorker sw = senderWorkerMap.get(sid);
+ if (sw != null) {
+ sw.finish();
}
- //Otherwise start worker threads to receive data.
+
+ /*
+ * Now we start a new connection
+ */
+ LOG.debug("Create new connection to server: " + sid);
+ closeSocket(sock);
+ connectOne(sid);
+
+ // Otherwise start worker threads to receive data.
} else {
- SendWorker sw = new SendWorker(s, sid);
- RecvWorker rw = new RecvWorker(s, sid);
+ SendWorker sw = new SendWorker(sock, sid);
+ RecvWorker rw = new RecvWorker(sock, sid);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
@@ -306,10 +291,10 @@ public class QuorumCnxManager {
} catch (InterruptedException e) {
LOG.warn("Exception when loopbacking", e);
}
- /*
- * Otherwise send to the corresponding thread to send.
- */
- } else
+ /*
+ * Otherwise send to the corresponding thread to send.
+ */
+ } else {
try {
/*
* Start a new connection if doesn't have one already.
@@ -330,14 +315,15 @@ public class QuorumCnxManager {
} else {
LOG.error("No queue for server " + sid);
}
- }
-
+ }
+
connectOne(sid);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting to put message in queue.",
e);
- }
+ }
+ }
}
/**
@@ -349,31 +335,31 @@ public class QuorumCnxManager {
synchronized void connectOne(long sid){
if (senderWorkerMap.get(sid) == null){
InetSocketAddress electionAddr;
- if(self.quorumPeers.containsKey(sid))
- electionAddr =
- self.quorumPeers.get(sid).electionAddr;
- else{
+ if (self.quorumPeers.containsKey(sid)) {
+ electionAddr = self.quorumPeers.get(sid).electionAddr;
+ } else {
LOG.warn("Invalid server id: " + sid);
return;
}
try {
- SocketChannel channel;
- if(LOG.isDebugEnabled()){
- LOG.debug("Opening channel to server " + sid);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Opening channel to server " + sid);
}
-
- channel = SocketChannel.open();
- channel.socket().connect(self.getView().get(sid).electionAddr, cnxTO);
- channel.socket().setTcpNoDelay(true);
- initiateConnection(channel, sid);
+ Socket sock = new Socket();
+ setSockOpts(sock);
+ sock.connect(self.getView().get(sid).electionAddr, cnxTO);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connected to server " + sid);
+ }
+ initiateConnection(sock, sid);
} catch (UnresolvedAddressException e) {
// Sun doesn't include the address that causes this
// exception to be thrown, also UAE cannot be wrapped cleanly
// so we log the exception in order to capture this critical
// detail.
LOG.warn("Cannot open channel to " + sid
- + " at election address " + electionAddr,
- e);
+ + " at election address " + electionAddr, e);
throw e;
} catch (IOException e) {
LOG.warn("Cannot open channel to " + sid
@@ -407,8 +393,9 @@ public class QuorumCnxManager {
boolean haveDelivered() {
for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
LOG.debug("Queue size: " + queue.size());
- if (queue.size() == 0)
+ if (queue.size() == 0) {
return true;
+ }
}
return false;
@@ -428,11 +415,36 @@ public class QuorumCnxManager {
/**
* A soft halt simply finishes workers.
*/
- public void softHalt(){
- for(SendWorker sw: senderWorkerMap.values()){
- LOG.debug("Halting sender: " + sw);
- sw.finish();
- }
+ public void softHalt() {
+ for (SendWorker sw : senderWorkerMap.values()) {
+ LOG.debug("Halting sender: " + sw);
+ sw.finish();
+ }
+ }
+
+ /**
+ * Helper method to set socket options.
+ *
+ * @param sock
+ * Reference to socket
+ */
+ private void setSockOpts(Socket sock) throws SocketException {
+ sock.setTcpNoDelay(true);
+ sock.setSoTimeout(self.tickTime * self.syncLimit);
+ }
+
+ /**
+ * Helper method to close a socket.
+ *
+ * @param sock
+ * Reference to socket
+ */
+ private void closeSocket(Socket sock) {
+ try {
+ sock.close();
+ } catch (IOException ie) {
+ LOG.error("Exception while closing", ie);
+ }
}
/**
@@ -440,7 +452,8 @@ public class QuorumCnxManager {
*/
public class Listener extends Thread {
- volatile ServerSocketChannel ss = null;
+ volatile ServerSocket ss = null;
+
/**
* Sleeps on accept().
*/
@@ -449,37 +462,44 @@ public class QuorumCnxManager {
int numRetries = 0;
while((!shutdown) && (numRetries < 3)){
try {
- ss = ServerSocketChannel.open();
- int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
- ss.socket().setReuseAddress(true);
+ ss = new ServerSocket();
+ ss.setReuseAddress(true);
+ int port = self.quorumPeers.get(self.getId()).electionAddr
+ .getPort();
InetSocketAddress addr = new InetSocketAddress(port);
LOG.info("My election bind port: " + addr.toString());
- setName(addr.toString());
- ss.socket().bind(addr);
-
+ setName(self.quorumPeers.get(self.getId()).electionAddr
+ .toString());
+ ss.bind(addr);
while (!shutdown) {
- SocketChannel client = ss.accept();
- Socket sock = client.socket();
- sock.setTcpNoDelay(true);
-
- LOG.debug("Connection request "
- + sock.getRemoteSocketAddress());
-
- LOG.debug("Connection request: " + self.getId());
+ Socket client = ss.accept();
+ setSockOpts(client);
+ LOG.info("Received connection request "
+ + client.getRemoteSocketAddress());
receiveConnection(client);
numRetries = 0;
}
} catch (IOException e) {
LOG.error("Exception while listening", e);
numRetries++;
+ try {
+ ss.close();
+ Thread.sleep(1000);
+ } catch (IOException ie) {
+ LOG.error("Error closing server socket", ie);
+ } catch (InterruptedException ie) {
+ LOG.error("Interrupted while sleeping. " +
+ "Ignoring exception", ie);
+ }
}
}
LOG.info("Leaving listener");
- if(!shutdown)
- LOG.fatal("As I'm leaving the listener thread, " +
- "I won't be able to participate in leader " +
- "election any longer: " +
- self.quorumPeers.get(self.getId()).electionAddr);
+ if (!shutdown) {
+ LOG.fatal("As I'm leaving the listener thread, "
+ + "I won't be able to participate in leader "
+ + "election any longer: "
+ + self.quorumPeers.get(self.getId()).electionAddr);
+ }
}
/**
@@ -505,22 +525,31 @@ public class QuorumCnxManager {
*/
class SendWorker extends Thread {
Long sid;
- SocketChannel channel;
+ Socket sock;
RecvWorker recvWorker;
volatile boolean running = true;
+ DataOutputStream dout;
/**
* An instance of this thread receives messages to send
* through a queue and sends them to the server sid.
*
- * @param channel SocketChannel
- * @param sid Server identifier
+ * @param sock
+ * Socket to remote peer
+ * @param sid
+ * Server identifier of remote peer
*/
- SendWorker(SocketChannel channel, Long sid) {
+ SendWorker(Socket sock, Long sid) {
this.sid = sid;
- this.channel = channel;
+ this.sock = sock;
recvWorker = null;
-
+ try {
+ dout = new DataOutputStream(sock.getOutputStream());
+ } catch (IOException e) {
+ LOG.error("Unable to access socket output stream", e);
+ closeSocket(sock);
+ running = false;
+ }
LOG.debug("Address of remote peer: " + this.sid);
}
@@ -538,8 +567,8 @@ public class QuorumCnxManager {
}
synchronized boolean finish() {
- if(LOG.isDebugEnabled()){
- LOG.debug("Calling finish");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Calling finish for " + sid);
}
if(!running){
@@ -550,59 +579,59 @@ public class QuorumCnxManager {
}
running = false;
-
- try{
- channel.close();
- } catch (IOException e) {
- LOG.warn("Exception while closing socket");
- }
- //channel = null;
+ closeSocket(sock);
+ // channel = null;
this.interrupt();
- if (recvWorker != null)
+ if (recvWorker != null) {
recvWorker.finish();
-
- if(LOG.isDebugEnabled()){
+ }
+
+ if (LOG.isDebugEnabled()) {
LOG.debug("Removing entry from senderWorkerMap sid=" + sid);
}
- senderWorkerMap.remove(sid);
+ senderWorkerMap.remove(sid, this);
return running;
}
synchronized void send(ByteBuffer b) throws IOException {
- byte[] msgBytes = new byte[b.capacity()
- + (Integer.SIZE / 8)];
- ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
- msgBuffer.putInt(b.capacity());
-
- msgBuffer.put(b.array(), 0, b.capacity());
- msgBuffer.position(0);
- if(channel != null)
- channel.write(msgBuffer);
- else
- throw new IOException("SocketChannel is null");
+ byte[] msgBytes = new byte[b.capacity()];
+ try {
+ b.position(0);
+ b.get(msgBytes);
+ } catch (BufferUnderflowException be) {
+ LOG.fatal("BufferUnderflowException ", be);
+ return;
+ }
+ dout.writeInt(b.capacity());
+ dout.write(b.array());
+ dout.flush();
}
@Override
public void run() {
- try{
- ByteBuffer b = lastMessageSent.get(sid);
- if(b != null) send(b);
+ try {
+ ByteBuffer b = lastMessageSent.get(sid);
+ if (b != null) {
+ send(b);
+ }
} catch (IOException e) {
LOG.error("Failed to send last message. Shutting down thread.", e);
this.finish();
}
try {
- while (running && !shutdown && channel != null) {
+ while (running && !shutdown && sock != null) {
ByteBuffer b = null;
try {
- ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
- if(bq != null)
+ ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
+ .get(sid);
+ if (bq != null) {
b = bq.poll(1000, TimeUnit.MILLISECONDS);
- else {
- LOG.error("No queue of incoming messages for server " + sid);
+ } else {
+ LOG.error("No queue of incoming messages for " +
+ "server " + sid);
break;
}
@@ -630,12 +659,22 @@ public class QuorumCnxManager {
*/
class RecvWorker extends Thread {
Long sid;
- SocketChannel channel;
+ Socket sock;
volatile boolean running = true;
+ DataInputStream din;
- RecvWorker(SocketChannel channel, Long sid) {
+ RecvWorker(Socket sock, Long sid) {
this.sid = sid;
- this.channel = channel;
+ this.sock = sock;
+ try {
+ din = new DataInputStream(sock.getInputStream());
+ // OK to wait until socket disconnects while reading.
+ sock.setSoTimeout(0);
+ } catch (IOException e) {
+ LOG.error("Error while accessing socket for " + sid, e);
+ closeSocket(sock);
+ running = false;
+ }
}
/**
@@ -660,54 +699,33 @@ public class QuorumCnxManager {
public void run() {
try {
byte[] size = new byte[4];
- ByteBuffer msgLength = ByteBuffer.wrap(size);
- while (running && !shutdown && channel != null) {
+ while (running && !shutdown && sock != null) {
/**
* Reads the first int to determine the length of the
* message
*/
- while (msgLength.hasRemaining()) {
- if (channel.read(msgLength) < 0) {
- throw new IOException("Channel eof");
- }
- }
- msgLength.position(0);
- int length = msgLength.getInt();
- if(length <= 0) {
- throw new IOException("Invalid packet length:" + length);
+ int length = din.readInt();
+ if (length <= 0 || length > PACKETMAXSIZE) {
+ throw new IOException(
+ "Received packet with invalid packet: "
+ + length);
}
/**
* Allocates a new ByteBuffer to receive the message
*/
- if (length > PACKETMAXSIZE) {
- throw new IOException("Invalid packet of length " + length);
- }
byte[] msgArray = new byte[length];
+ din.readFully(msgArray, 0, length);
ByteBuffer message = ByteBuffer.wrap(msgArray);
- int numbytes = 0;
- int temp_numbytes = 0;
- while (message.hasRemaining()) {
- temp_numbytes = channel.read(message);
- if(temp_numbytes < 0) {
- throw new IOException("Channel eof before end");
- }
- numbytes += temp_numbytes;
- }
- message.position(0);
synchronized (recvQueue) {
- recvQueue
- .put(new Message(message.duplicate(), sid));
+ recvQueue.put(new Message(message.duplicate(), sid));
}
- msgLength.position(0);
}
} catch (Exception e) {
LOG.warn("Connection broken for id " + sid + ", my id = " +
self.getId() + ", error = " + e);
} finally {
- try{
- channel.socket().close();
- } catch (IOException e) {
- LOG.warn("Exception while trying to close channel");
+ if (sock != null) {
+ closeSocket(sock);
}
}
}
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=1036071&r1=1036070&r2=1036071&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Wed Nov 17 14:59:29 2010
@@ -22,9 +22,11 @@ import java.io.File;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
+import java.net.Socket;
import java.util.HashMap;
import java.util.Random;
import java.util.concurrent.TimeUnit;
+import java.io.*;
import org.apache.log4j.Logger;
import org.apache.zookeeper.PortAssignment;
@@ -165,6 +167,7 @@ public class CnxManagerTest extends ZKTe
}
}
+
@Test
public void testCnxManagerTimeout() throws Exception {
Random rand = new Random();
@@ -256,4 +259,31 @@ public class CnxManagerTest extends ZKTe
LOG.info("Socket has been closed as expected");
}
}
-}
\ No newline at end of file
+
+ /*
+ * Test if a receiveConnection is able to timeout on socket errors
+ */
+ @Test
+ public void testSocketTimeout() throws Exception {
+ QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2000, 2, 2);
+ QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+ QuorumCnxManager.Listener listener = cnxManager.listener;
+ if(listener != null){
+ listener.start();
+ } else {
+ LOG.error("Null listener when initializing cnx manager");
+ }
+ int port = peers.get(peer.getId()).electionAddr.getPort();
+ LOG.info("Election port: " + port);
+ InetSocketAddress addr = new InetSocketAddress(port);
+ Thread.sleep(1000);
+
+ Socket sock = new Socket();
+ sock.connect(peers.get(new Long(1)).electionAddr, 5000);
+ long begin = System.currentTimeMillis();
+ // Read without sending data. Verify timeout.
+ cnxManager.receiveConnection(sock);
+ long end = System.currentTimeMillis();
+ if((end - begin) > ((peer.getSyncLimit() * peer.getTickTime()) + 500)) Assert.fail("Waited more than necessary");
+ }
+}
|