Author: mahadev
Date: Tue May 4 21:49:50 2010
New Revision: 941061
URL: http://svn.apache.org/viewvc?rev=941061&view=rev
Log:
ZOOKEEPER-737. some 4 letter words may fail with netcat (nc) (mahadev)
Modified:
hadoop/zookeeper/branches/branch-3.3/CHANGES.txt
hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/ClientBase.java
Modified: hadoop/zookeeper/branches/branch-3.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/CHANGES.txt?rev=941061&r1=941060&r2=941061&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.3/CHANGES.txt (original)
+++ hadoop/zookeeper/branches/branch-3.3/CHANGES.txt Tue May 4 21:49:50 2010
@@ -36,6 +36,8 @@ BUGFIXES:
ZOOKEEPER-758. zkpython segfaults on invalid acl with missing key
(Kapil Thangavelu via henryr)
+ ZOOKEEPER-737. some 4 letter words may fail with netcat (nc) (mahadev)
+
Release 3.3.0 - 2010-03-24
Non-backward compatible changes:
Modified: hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=941061&r1=941060&r2=941061&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
(original)
+++ hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
Tue May 4 21:49:50 2010
@@ -39,7 +39,6 @@ import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
@@ -386,6 +385,29 @@ public class NIOServerCnxn implements Wa
sendBuffer(closeConn);
}
+ /**
+ * send buffer without using the asynchronous
+ * calls to selector and then close the socket
+ * @param bb
+ */
+ void sendBufferSync(ByteBuffer bb) {
+ try {
+ /* configure socket to be blocking
+ * so that we dont have to do write in
+ * a tight while loop
+ */
+ sock.configureBlocking(true);
+ if (bb != closeConn) {
+ if (sock != null) {
+ sock.write(bb);
+ }
+ packetSent();
+ }
+ } catch (IOException ie) {
+ LOG.error("Error sending data synchronously ", ie);
+ }
+ }
+
void sendBuffer(ByteBuffer bb) {
try {
if (bb != closeConn) {
@@ -497,6 +519,11 @@ public class NIOServerCnxn implements Wa
if (isPayload) { // not the case for 4letterword
readPayload();
}
+ else {
+ // four letter words take care
+ // need not do anything else
+ return;
+ }
}
}
if (k.isWritable()) {
@@ -886,6 +913,30 @@ public class NIOServerCnxn implements Wa
}
/**
+ * clean up the socket related to a command and also make sure we flush the
+ * data before we do that
+ *
+ * @param pwriter
+ * the pwriter for a command socket
+ */
+ private void cleanupWriterSocket(PrintWriter pwriter) {
+ try {
+ if (pwriter != null) {
+ pwriter.flush();
+ pwriter.close();
+ }
+ } catch (Exception e) {
+ LOG.info("Error closing PrintWriter ", e);
+ } finally {
+ try {
+ close();
+ } catch (Exception e) {
+ LOG.error("Error closing a command socket ", e);
+ }
+ }
+ }
+
+ /**
* This class wraps the sendBuffer method of NIOServerCnxn. It is
* responsible for chunking up the response to a client. Rather
* than cons'ing up a response fully in memory, which may be large
@@ -893,50 +944,23 @@ public class NIOServerCnxn implements Wa
*/
private class SendBufferWriter extends Writer {
private StringBuffer sb = new StringBuffer();
-
- /* FYI: clearing the READ interestOps on the key results in
- * the cnxn being closed in doIO.
- */
-
+
/**
* Check if we are ready to send another chunk.
* @param force force sending, even if not a full chunk
*/
private void checkFlush(boolean force) {
if ((force && sb.length() > 0) || sb.length() > 2048) {
- sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
- // including op_read keeps doio from closing the conn
- wakeup(SelectionKey.OP_READ
- | SelectionKey.OP_WRITE);
-
+ sendBufferSync(ByteBuffer.wrap(sb.toString().getBytes()));
// clear our internal buffer
sb.setLength(0);
}
}
- /**
- * Wakeup the selector. This is necessary as the cnxn is
- * waiting for interestOps to be satisfied. If we want the
- * selector to wakeup immediately (rather than the last
- * select(timeout) period) we need to force a wakeup.
- * @param sel the new interest ops
- */
- private void wakeup(int sel) {
- synchronized(factory) {
- sk.selector().wakeup();
- sk.interestOps(sel);
- }
- }
-
@Override
public void close() throws IOException {
if (sb == null) return;
-
checkFlush(true);
-
- // nothing left, please close
- wakeup(SelectionKey.OP_WRITE);
-
sb = null; // clear out the ref to ensure no reuse
}
@@ -954,10 +978,254 @@ public class NIOServerCnxn implements Wa
private static final String ZK_NOT_SERVING =
"This ZooKeeper instance is not currently serving requests";
+
+ /**
+ * Set of threads for commmand ports. All the 4
+ * letter commands are run via a thread. Each class
+ * maps to a correspoding 4 letter command. CommandThread
+ * is the abstract class from which all the others inherit.
+ */
+ private abstract class CommandThread extends Thread {
+ PrintWriter pw;
+
+ CommandThread(PrintWriter pw) {
+ this.pw = pw;
+ }
+
+ public void run() {
+ try {
+ commandRun();
+ } catch (IOException ie) {
+ LOG.error("Error in running command ", ie);
+ } finally {
+ cleanupWriterSocket(pw);
+ }
+ }
+
+ public abstract void commandRun() throws IOException;
+ }
+
+ private class RuokCommand extends CommandThread {
+ public RuokCommand(PrintWriter pw) {
+ super(pw);
+ }
+
+ @Override
+ public void commandRun() {
+ pw.print("imok");
+
+ }
+ }
+
+ private class TraceMaskCommand extends CommandThread {
+ TraceMaskCommand(PrintWriter pw) {
+ super(pw);
+ }
+
+ @Override
+ public void commandRun() {
+ long traceMask = ZooTrace.getTextTraceLevel();
+ pw.print(traceMask);
+ }
+ }
+
+ private class SetTraceMaskCommand extends CommandThread {
+ long trace = 0;
+ SetTraceMaskCommand(PrintWriter pw, long trace) {
+ super(pw);
+ this.trace = trace;
+ }
+
+ @Override
+ public void commandRun() {
+ pw.print(trace);
+ }
+ }
+
+ private class EnvCommand extends CommandThread {
+ EnvCommand(PrintWriter pw) {
+ super(pw);
+ }
+
+ @Override
+ public void commandRun() {
+ List<Environment.Entry> env = Environment.list();
+
+ pw.println("Environment:");
+ for(Environment.Entry e : env) {
+ pw.print(e.getKey());
+ pw.print("=");
+ pw.println(e.getValue());
+ }
+
+ }
+ }
+
+ private class ConfCommand extends CommandThread {
+ ConfCommand(PrintWriter pw) {
+ super(pw);
+ }
+
+ @Override
+ public void commandRun() {
+ if (zk == null) {
+ pw.println(ZK_NOT_SERVING);
+ } else {
+ zk.dumpConf(pw);
+ }
+ }
+ }
+
+ private class StatResetCommand extends CommandThread {
+ public StatResetCommand(PrintWriter pw) {
+ super(pw);
+ }
+
+ @Override
+ public void commandRun() {
+ if (zk == null) {
+ pw.println(ZK_NOT_SERVING);
+ }
+ else {
+ zk.serverStats().reset();
+ pw.println("Server stats reset.");
+ }
+ }
+ }
+
+ private class CnxnStatResetCommand extends CommandThread {
+ public CnxnStatResetCommand(PrintWriter pw) {
+ super(pw);
+ }
+
+ @Override
+ public void commandRun() {
+ if (zk == null) {
+ pw.println(ZK_NOT_SERVING);
+ } else {
+ synchronized(factory.cnxns){
+ for(NIOServerCnxn c : factory.cnxns){
+ c.getStats().reset();
+ }
+ }
+ pw.println("Connection stats reset.");
+ }
+ }
+ }
+
+ private class DumpCommand extends CommandThread {
+ public DumpCommand(PrintWriter pw) {
+ super(pw);
+ }
+
+ @Override
+ public void commandRun() {
+ if (zk == null) {
+ pw.println(ZK_NOT_SERVING);
+ }
+ else {
+ pw.println("SessionTracker dump:");
+ zk.sessionTracker.dumpSessions(pw);
+ pw.println("ephemeral nodes dump:");
+ zk.dumpEphemerals(pw);
+ }
+ }
+ }
+
+ private class StatCommand extends CommandThread {
+ int len;
+ public StatCommand(PrintWriter pw, int len) {
+ super(pw);
+ this.len = len;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void commandRun() {
+ if (zk == null) {
+ pw.println(ZK_NOT_SERVING);
+ }
+ else {
+ pw.print("Zookeeper version: ");
+ pw.println(Version.getFullVersion());
+ if (len == statCmd) {
+ LOG.info("Stat command output");
+ pw.println("Clients:");
+ // clone should be faster than iteration
+ // ie give up the cnxns lock faster
+ HashSet<NIOServerCnxn> cnxnset;
+ synchronized(factory.cnxns){
+ cnxnset = (HashSet<NIOServerCnxn>)factory
+ .cnxns.clone();
+ }
+ for(NIOServerCnxn c : cnxnset){
+ ((CnxnStats)c.getStats())
+ .dumpConnectionInfo(pw, true);
+ }
+ pw.println();
+ }
+ pw.print(zk.serverStats().toString());
+ pw.print("Node count: ");
+ pw.println(zk.getZKDatabase().getNodeCount());
+ }
+
+ }
+ }
+
+ private class ConsCommand extends CommandThread {
+ public ConsCommand(PrintWriter pw) {
+ super(pw);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void commandRun() {
+ if (zk == null) {
+ pw.println(ZK_NOT_SERVING);
+ } else {
+ // clone should be faster than iteration
+ // ie give up the cnxns lock faster
+ HashSet<NIOServerCnxn> cnxns;
+ synchronized (factory.cnxns) {
+ cnxns = (HashSet<NIOServerCnxn>) factory.cnxns.clone();
+ }
+ for (NIOServerCnxn c : cnxns) {
+ ((CnxnStats) c.getStats()).dumpConnectionInfo(pw, false);
+ }
+ pw.println();
+ }
+ }
+ }
+
+ private class WatchCommand extends CommandThread {
+ int len = 0;
+ public WatchCommand(PrintWriter pw, int len) {
+ super(pw);
+ this.len = len;
+ }
+ @Override
+ public void commandRun() {
+ if (zk == null) {
+ pw.println(ZK_NOT_SERVING);
+ } else {
+ DataTree dt = zk.getZKDatabase().getDataTree();
+ if (len == wchsCmd) {
+ dt.dumpWatchesSummary(pw);
+ } else if (len == wchpCmd) {
+ dt.dumpWatches(pw, true);
+ } else {
+ dt.dumpWatches(pw, false);
+ }
+ pw.println();
+ }
+ }
+ }
+
+
/** Return if four letter word found and responded to, otw false **/
private boolean checkFourLetterWord(final SelectionKey k, final int len)
- throws IOException
+ throws IOException
{
// We take advantage of the limited size of the length to look
// for cmds. They are all 4-bytes which fits inside of an int
@@ -969,197 +1237,77 @@ public class NIOServerCnxn implements Wa
+ sock.socket().getRemoteSocketAddress());
packetReceived();
+ /** cancel the selection key to remove the socket handling
+ * from selector. This is to prevent netcat problem wherein
+ * netcat immediately closes the sending side after sending the
+ * commands and still keeps the receiving channel open.
+ * The idea is to remove the selectionkey from the selector
+ * so that the selector does not notice the closed read on the
+ * socket channel and keep the socket alive to write the data to
+ * and makes sure to close the socket after its done writing the data
+ */
+ if (k != null) {
+ try {
+ k.cancel();
+ } catch(Exception e) {
+ LOG.error("Error cancelling command selection key ", e);
+ }
+ }
+
final PrintWriter pwriter = new PrintWriter(
new BufferedWriter(new SendBufferWriter()));
- boolean threadWillClosePWriter = false;
- try {
- if (len == ruokCmd) {
- pwriter.print("imok");
- return true;
- } else if (len == getTraceMaskCmd) {
- long traceMask = ZooTrace.getTextTraceLevel();
- pwriter.print(traceMask);
- return true;
- } else if (len == setTraceMaskCmd) {
- int rc = sock.read(incomingBuffer);
- if (rc < 0) {
- throw new IOException("Read error");
- }
-
- incomingBuffer.flip();
- long traceMask = incomingBuffer.getLong();
- ZooTrace.setTextTraceLevel(traceMask);
- pwriter.print(traceMask);
- return true;
- } else if (len == enviCmd) {
- List<Environment.Entry> env = Environment.list();
-
- pwriter.println("Environment:");
- for(Environment.Entry e : env) {
- pwriter.print(e.getKey());
- pwriter.print("=");
- pwriter.println(e.getValue());
- }
- return true;
- } else if (len == confCmd) {
- if (zk == null) {
- pwriter.println(ZK_NOT_SERVING);
- return true;
- }
- zk.dumpConf(pwriter);
- return true;
- } else if (len == srstCmd) {
- if (zk == null) {
- pwriter.println(ZK_NOT_SERVING);
- return true;
- }
- zk.serverStats().reset();
- pwriter.println("Server stats reset.");
- return true;
- } else if (len == crstCmd) {
- if (zk == null) {
- pwriter.println(ZK_NOT_SERVING);
- return true;
- }
- synchronized(factory.cnxns){
- for(NIOServerCnxn c : factory.cnxns){
- c.getStats().reset();
- }
- }
- pwriter.println("Connection stats reset.");
- return true;
- } else if (len == dumpCmd) {
- if (zk == null) {
- pwriter.println(ZK_NOT_SERVING);
- return true;
- }
- // this could be a long running task, spawn a thread so
- // that we don't block the processing of other requests
- threadWillClosePWriter = true;
- new Thread() {
- @Override
- public void run() {
- try {
- pwriter.println("SessionTracker dump:");
- zk.sessionTracker.dumpSessions(pwriter);
- pwriter.println("ephemeral nodes dump:");
- zk.dumpEphemerals(pwriter);
- } finally {
- pwriter.flush();
- pwriter.close();
- }
- }
- }.start();
-
- return true;
- } else if (len == statCmd || len == srvrCmd) {
- if (zk == null) {
- pwriter.println(ZK_NOT_SERVING);
- return true;
- }
- // this could be a long running task, spawn a thread so
- // that we don't block the processing of other requests
- threadWillClosePWriter = true;
- new Thread() {
- @SuppressWarnings("unchecked")
- @Override
- public void run() {
- try {
- pwriter.print("Zookeeper version: ");
- pwriter.println(Version.getFullVersion());
- if (len == statCmd) {
- pwriter.println("Clients:");
- // clone should be faster than iteration
- // ie give up the cnxns lock faster
- HashSet<NIOServerCnxn> cnxns;
- synchronized(factory.cnxns){
- cnxns = (HashSet<NIOServerCnxn>)factory
- .cnxns.clone();
- }
- for(NIOServerCnxn c : cnxns){
- ((CnxnStats)c.getStats())
- .dumpConnectionInfo(pwriter, true);
- }
- pwriter.println();
- }
- pwriter.print(zk.serverStats().toString());
- pwriter.print("Node count: ");
- pwriter.println(zk.getZKDatabase().getNodeCount());
- } finally {
- pwriter.flush();
- pwriter.close();
- }
- }
- }.start();
- return true;
- } else if (len == consCmd) {
- if (zk == null) {
- pwriter.println(ZK_NOT_SERVING);
- return true;
- }
- // this could be a long running task, spawn a thread so
- // that we don't block the processing of other requests
- threadWillClosePWriter = true;
- new Thread() {
- @SuppressWarnings("unchecked")
- @Override
- public void run() {
- try {
- // clone should be faster than iteration
- // ie give up the cnxns lock faster
- HashSet<NIOServerCnxn> cnxns;
- synchronized(factory.cnxns){
- cnxns = (HashSet<NIOServerCnxn>)factory
- .cnxns.clone();
- }
- for(NIOServerCnxn c : cnxns){
- ((CnxnStats)c.getStats())
- .dumpConnectionInfo(pwriter, false);
- }
- pwriter.println();
- } finally {
- pwriter.flush();
- pwriter.close();
- }
- }
- }.start();
- return true;
- } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) {
- if (zk == null) {
- pwriter.println(ZK_NOT_SERVING);
- return true;
- }
- // this could be a long running task, spawn a thread so
- // that we don't block the processing of other requests
- threadWillClosePWriter = true;
- new Thread() {
- @Override
- public void run() {
- try {
- DataTree dt = zk.getZKDatabase().getDataTree();
- if (len == wchsCmd) {
- dt.dumpWatchesSummary(pwriter);
- } else if (len == wchpCmd) {
- dt.dumpWatches(pwriter, true);
- } else {
- dt.dumpWatches(pwriter, false);
- }
- pwriter.println();
- } finally {
- pwriter.flush();
- pwriter.close();
- }
- }
- }.start();
- return true;
- }
- } finally {
- // if we spawned a thread it is responsible for eventually
- // flushing and closeing the writer
- if (!threadWillClosePWriter) {
- pwriter.flush();
- pwriter.close();
+ if (len == ruokCmd) {
+ RuokCommand ruok = new RuokCommand(pwriter);
+ ruok.start();
+ return true;
+ } else if (len == getTraceMaskCmd) {
+ TraceMaskCommand tmask = new TraceMaskCommand(pwriter);
+ tmask.start();
+ return true;
+ } else if (len == setTraceMaskCmd) {
+ int rc = sock.read(incomingBuffer);
+ if (rc < 0) {
+ throw new IOException("Read error");
}
+
+ incomingBuffer.flip();
+ long traceMask = incomingBuffer.getLong();
+ ZooTrace.setTextTraceLevel(traceMask);
+ SetTraceMaskCommand setMask = new SetTraceMaskCommand(pwriter, traceMask);
+ setMask.start();
+ return true;
+ } else if (len == enviCmd) {
+ EnvCommand env = new EnvCommand(pwriter);
+ env.start();
+ return true;
+ } else if (len == confCmd) {
+ ConfCommand ccmd = new ConfCommand(pwriter);
+ ccmd.start();
+ return true;
+ } else if (len == srstCmd) {
+ StatResetCommand strst = new StatResetCommand(pwriter);
+ strst.start();
+ return true;
+ } else if (len == crstCmd) {
+ CnxnStatResetCommand crst = new CnxnStatResetCommand(pwriter);
+ crst.start();
+ return true;
+ } else if (len == dumpCmd) {
+ DumpCommand dump = new DumpCommand(pwriter);
+ dump.start();
+ return true;
+ } else if (len == statCmd || len == srvrCmd) {
+ StatCommand stat = new StatCommand(pwriter, len);
+ stat.start();
+ return true;
+ } else if (len == consCmd) {
+ ConsCommand cons = new ConsCommand(pwriter);
+ cons.start();
+ return true;
+ } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) {
+ WatchCommand wcmd = new WatchCommand(pwriter, len);
+ wcmd.start();
+ return true;
}
return false;
}
@@ -1618,7 +1766,8 @@ public class NIOServerCnxn implements Wa
pwriter.print(((SocketChannel)channel).socket()
.getRemoteSocketAddress());
pwriter.print("[");
- pwriter.print(Integer.toHexString(sk.interestOps()));
+ pwriter.print(sk.isValid() ? Integer.toHexString(sk.interestOps())
+ : "0");
pwriter.print("](queued=");
pwriter.print(getOutstandingRequests());
pwriter.print(",recved=");
Modified: hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=941061&r1=941060&r2=941061&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/ClientBase.java
(original)
+++ hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/ClientBase.java
Tue May 4 21:49:50 2010
@@ -224,6 +224,8 @@ public abstract class ClientBase extends
OutputStream outstream = sock.getOutputStream();
outstream.write(cmd.getBytes());
outstream.flush();
+ // this replicates NC - close the output stream before reading
+ sock.shutdownOutput();
reader =
new BufferedReader(
|