Return-Path: Delivered-To: apmail-hadoop-zookeeper-commits-archive@minotaur.apache.org Received: (qmail 39960 invoked from network); 4 May 2010 21:46:05 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 4 May 2010 21:46:05 -0000 Received: (qmail 37446 invoked by uid 500); 4 May 2010 21:46:05 -0000 Delivered-To: apmail-hadoop-zookeeper-commits-archive@hadoop.apache.org Received: (qmail 37399 invoked by uid 500); 4 May 2010 21:46:05 -0000 Mailing-List: contact zookeeper-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: zookeeper-dev@ Delivered-To: mailing list zookeeper-commits@hadoop.apache.org Received: (qmail 37391 invoked by uid 99); 4 May 2010 21:46:05 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 May 2010 21:46:05 +0000 X-ASF-Spam-Status: No, hits=-1323.5 required=10.0 tests=ALL_TRUSTED,AWL X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 May 2010 21:46:03 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 729F4238899B; Tue, 4 May 2010 21:45:13 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r941056 - in /hadoop/zookeeper/trunk: CHANGES.txt src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java src/java/test/org/apache/zookeeper/test/ClientBase.java Date: Tue, 04 May 2010 21:45:13 -0000 To: zookeeper-commits@hadoop.apache.org From: mahadev@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100504214513.729F4238899B@eris.apache.org> Author: mahadev Date: Tue May 4 21:45:13 2010 New Revision: 941056 URL: http://svn.apache.org/viewvc?rev=941056&view=rev Log: ZOOKEEPER-737. some 4 letter words may fail with netcat (nc). (mahadev) Modified: hadoop/zookeeper/trunk/CHANGES.txt hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Modified: hadoop/zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=941056&r1=941055&r2=941056&view=diff ============================================================================== --- hadoop/zookeeper/trunk/CHANGES.txt (original) +++ hadoop/zookeeper/trunk/CHANGES.txt Tue May 4 21:45:13 2010 @@ -44,6 +44,8 @@ BUGFIXES: ZOOKEEPER-758. zkpython segfaults on invalid acl with missing key (Kapil Thangavelu via henryr) + ZOOKEEPER-737. some 4 letter words may fail with netcat (nc). (mahadev) + IMPROVEMENTS: ZOOKEEPER-724. Improve junit test integration - log harness information (phunt via mahadev) Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=941056&r1=941055&r2=941056&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Tue May 4 21:45:13 2010 @@ -39,7 +39,6 @@ import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; @@ -386,6 +385,29 @@ public class NIOServerCnxn implements Wa sendBuffer(closeConn); } + /** + * send buffer without using the asynchronous + * calls to selector and then close the socket + * @param bb + */ + void sendBufferSync(ByteBuffer bb) { + try { + /* configure socket to be blocking + * so that we dont have to do write in + * a tight while loop + */ + sock.configureBlocking(true); + if (bb != closeConn) { + if (sock != null) { + sock.write(bb); + } + packetSent(); + } + } catch (IOException ie) { + LOG.error("Error sending data synchronously ", ie); + } + } + void sendBuffer(ByteBuffer bb) { try { if (bb != closeConn) { @@ -497,6 +519,11 @@ public class NIOServerCnxn implements Wa if (isPayload) { // not the case for 4letterword readPayload(); } + else { + // four letter words take care + // need not do anything else + return; + } } } if (k.isWritable()) { @@ -886,6 +913,30 @@ public class NIOServerCnxn implements Wa } /** + * clean up the socket related to a command and also make sure we flush the + * data before we do that + * + * @param pwriter + * the pwriter for a command socket + */ + private void cleanupWriterSocket(PrintWriter pwriter) { + try { + if (pwriter != null) { + pwriter.flush(); + pwriter.close(); + } + } catch (Exception e) { + LOG.info("Error closing PrintWriter ", e); + } finally { + try { + close(); + } catch (Exception e) { + LOG.error("Error closing a command socket ", e); + } + } + } + + /** * This class wraps the sendBuffer method of NIOServerCnxn. It is * responsible for chunking up the response to a client. Rather * than cons'ing up a response fully in memory, which may be large @@ -893,50 +944,23 @@ public class NIOServerCnxn implements Wa */ private class SendBufferWriter extends Writer { private StringBuffer sb = new StringBuffer(); - - /* FYI: clearing the READ interestOps on the key results in - * the cnxn being closed in doIO. - */ - + /** * Check if we are ready to send another chunk. * @param force force sending, even if not a full chunk */ private void checkFlush(boolean force) { if ((force && sb.length() > 0) || sb.length() > 2048) { - sendBuffer(ByteBuffer.wrap(sb.toString().getBytes())); - // including op_read keeps doio from closing the conn - wakeup(SelectionKey.OP_READ - | SelectionKey.OP_WRITE); - + sendBufferSync(ByteBuffer.wrap(sb.toString().getBytes())); // clear our internal buffer sb.setLength(0); } } - /** - * Wakeup the selector. This is necessary as the cnxn is - * waiting for interestOps to be satisfied. If we want the - * selector to wakeup immediately (rather than the last - * select(timeout) period) we need to force a wakeup. - * @param sel the new interest ops - */ - private void wakeup(int sel) { - synchronized(factory) { - sk.selector().wakeup(); - sk.interestOps(sel); - } - } - @Override public void close() throws IOException { if (sb == null) return; - checkFlush(true); - - // nothing left, please close - wakeup(SelectionKey.OP_WRITE); - sb = null; // clear out the ref to ensure no reuse } @@ -954,10 +978,254 @@ public class NIOServerCnxn implements Wa private static final String ZK_NOT_SERVING = "This ZooKeeper instance is not currently serving requests"; + + /** + * Set of threads for commmand ports. All the 4 + * letter commands are run via a thread. Each class + * maps to a correspoding 4 letter command. CommandThread + * is the abstract class from which all the others inherit. + */ + private abstract class CommandThread extends Thread { + PrintWriter pw; + + CommandThread(PrintWriter pw) { + this.pw = pw; + } + + public void run() { + try { + commandRun(); + } catch (IOException ie) { + LOG.error("Error in running command ", ie); + } finally { + cleanupWriterSocket(pw); + } + } + + public abstract void commandRun() throws IOException; + } + + private class RuokCommand extends CommandThread { + public RuokCommand(PrintWriter pw) { + super(pw); + } + + @Override + public void commandRun() { + pw.print("imok"); + + } + } + + private class TraceMaskCommand extends CommandThread { + TraceMaskCommand(PrintWriter pw) { + super(pw); + } + + @Override + public void commandRun() { + long traceMask = ZooTrace.getTextTraceLevel(); + pw.print(traceMask); + } + } + + private class SetTraceMaskCommand extends CommandThread { + long trace = 0; + SetTraceMaskCommand(PrintWriter pw, long trace) { + super(pw); + this.trace = trace; + } + + @Override + public void commandRun() { + pw.print(trace); + } + } + + private class EnvCommand extends CommandThread { + EnvCommand(PrintWriter pw) { + super(pw); + } + + @Override + public void commandRun() { + List env = Environment.list(); + + pw.println("Environment:"); + for(Environment.Entry e : env) { + pw.print(e.getKey()); + pw.print("="); + pw.println(e.getValue()); + } + + } + } + + private class ConfCommand extends CommandThread { + ConfCommand(PrintWriter pw) { + super(pw); + } + + @Override + public void commandRun() { + if (zk == null) { + pw.println(ZK_NOT_SERVING); + } else { + zk.dumpConf(pw); + } + } + } + + private class StatResetCommand extends CommandThread { + public StatResetCommand(PrintWriter pw) { + super(pw); + } + + @Override + public void commandRun() { + if (zk == null) { + pw.println(ZK_NOT_SERVING); + } + else { + zk.serverStats().reset(); + pw.println("Server stats reset."); + } + } + } + + private class CnxnStatResetCommand extends CommandThread { + public CnxnStatResetCommand(PrintWriter pw) { + super(pw); + } + + @Override + public void commandRun() { + if (zk == null) { + pw.println(ZK_NOT_SERVING); + } else { + synchronized(factory.cnxns){ + for(NIOServerCnxn c : factory.cnxns){ + c.getStats().reset(); + } + } + pw.println("Connection stats reset."); + } + } + } + + private class DumpCommand extends CommandThread { + public DumpCommand(PrintWriter pw) { + super(pw); + } + + @Override + public void commandRun() { + if (zk == null) { + pw.println(ZK_NOT_SERVING); + } + else { + pw.println("SessionTracker dump:"); + zk.sessionTracker.dumpSessions(pw); + pw.println("ephemeral nodes dump:"); + zk.dumpEphemerals(pw); + } + } + } + + private class StatCommand extends CommandThread { + int len; + public StatCommand(PrintWriter pw, int len) { + super(pw); + this.len = len; + } + + @SuppressWarnings("unchecked") + @Override + public void commandRun() { + if (zk == null) { + pw.println(ZK_NOT_SERVING); + } + else { + pw.print("Zookeeper version: "); + pw.println(Version.getFullVersion()); + if (len == statCmd) { + LOG.info("Stat command output"); + pw.println("Clients:"); + // clone should be faster than iteration + // ie give up the cnxns lock faster + HashSet cnxnset; + synchronized(factory.cnxns){ + cnxnset = (HashSet)factory + .cnxns.clone(); + } + for(NIOServerCnxn c : cnxnset){ + ((CnxnStats)c.getStats()) + .dumpConnectionInfo(pw, true); + } + pw.println(); + } + pw.print(zk.serverStats().toString()); + pw.print("Node count: "); + pw.println(zk.getZKDatabase().getNodeCount()); + } + + } + } + + private class ConsCommand extends CommandThread { + public ConsCommand(PrintWriter pw) { + super(pw); + } + + @SuppressWarnings("unchecked") + @Override + public void commandRun() { + if (zk == null) { + pw.println(ZK_NOT_SERVING); + } else { + // clone should be faster than iteration + // ie give up the cnxns lock faster + HashSet cnxns; + synchronized (factory.cnxns) { + cnxns = (HashSet) factory.cnxns.clone(); + } + for (NIOServerCnxn c : cnxns) { + ((CnxnStats) c.getStats()).dumpConnectionInfo(pw, false); + } + pw.println(); + } + } + } + + private class WatchCommand extends CommandThread { + int len = 0; + public WatchCommand(PrintWriter pw, int len) { + super(pw); + this.len = len; + } + @Override + public void commandRun() { + if (zk == null) { + pw.println(ZK_NOT_SERVING); + } else { + DataTree dt = zk.getZKDatabase().getDataTree(); + if (len == wchsCmd) { + dt.dumpWatchesSummary(pw); + } else if (len == wchpCmd) { + dt.dumpWatches(pw, true); + } else { + dt.dumpWatches(pw, false); + } + pw.println(); + } + } + } + + /** Return if four letter word found and responded to, otw false **/ private boolean checkFourLetterWord(final SelectionKey k, final int len) - throws IOException + throws IOException { // We take advantage of the limited size of the length to look // for cmds. They are all 4-bytes which fits inside of an int @@ -969,197 +1237,77 @@ public class NIOServerCnxn implements Wa + sock.socket().getRemoteSocketAddress()); packetReceived(); + /** cancel the selection key to remove the socket handling + * from selector. This is to prevent netcat problem wherein + * netcat immediately closes the sending side after sending the + * commands and still keeps the receiving channel open. + * The idea is to remove the selectionkey from the selector + * so that the selector does not notice the closed read on the + * socket channel and keep the socket alive to write the data to + * and makes sure to close the socket after its done writing the data + */ + if (k != null) { + try { + k.cancel(); + } catch(Exception e) { + LOG.error("Error cancelling command selection key ", e); + } + } + final PrintWriter pwriter = new PrintWriter( new BufferedWriter(new SendBufferWriter())); - boolean threadWillClosePWriter = false; - try { - if (len == ruokCmd) { - pwriter.print("imok"); - return true; - } else if (len == getTraceMaskCmd) { - long traceMask = ZooTrace.getTextTraceLevel(); - pwriter.print(traceMask); - return true; - } else if (len == setTraceMaskCmd) { - int rc = sock.read(incomingBuffer); - if (rc < 0) { - throw new IOException("Read error"); - } - - incomingBuffer.flip(); - long traceMask = incomingBuffer.getLong(); - ZooTrace.setTextTraceLevel(traceMask); - pwriter.print(traceMask); - return true; - } else if (len == enviCmd) { - List env = Environment.list(); - - pwriter.println("Environment:"); - for(Environment.Entry e : env) { - pwriter.print(e.getKey()); - pwriter.print("="); - pwriter.println(e.getValue()); - } - return true; - } else if (len == confCmd) { - if (zk == null) { - pwriter.println(ZK_NOT_SERVING); - return true; - } - zk.dumpConf(pwriter); - return true; - } else if (len == srstCmd) { - if (zk == null) { - pwriter.println(ZK_NOT_SERVING); - return true; - } - zk.serverStats().reset(); - pwriter.println("Server stats reset."); - return true; - } else if (len == crstCmd) { - if (zk == null) { - pwriter.println(ZK_NOT_SERVING); - return true; - } - synchronized(factory.cnxns){ - for(NIOServerCnxn c : factory.cnxns){ - c.getStats().reset(); - } - } - pwriter.println("Connection stats reset."); - return true; - } else if (len == dumpCmd) { - if (zk == null) { - pwriter.println(ZK_NOT_SERVING); - return true; - } - // this could be a long running task, spawn a thread so - // that we don't block the processing of other requests - threadWillClosePWriter = true; - new Thread() { - @Override - public void run() { - try { - pwriter.println("SessionTracker dump:"); - zk.sessionTracker.dumpSessions(pwriter); - pwriter.println("ephemeral nodes dump:"); - zk.dumpEphemerals(pwriter); - } finally { - pwriter.flush(); - pwriter.close(); - } - } - }.start(); - - return true; - } else if (len == statCmd || len == srvrCmd) { - if (zk == null) { - pwriter.println(ZK_NOT_SERVING); - return true; - } - // this could be a long running task, spawn a thread so - // that we don't block the processing of other requests - threadWillClosePWriter = true; - new Thread() { - @SuppressWarnings("unchecked") - @Override - public void run() { - try { - pwriter.print("Zookeeper version: "); - pwriter.println(Version.getFullVersion()); - if (len == statCmd) { - pwriter.println("Clients:"); - // clone should be faster than iteration - // ie give up the cnxns lock faster - HashSet cnxns; - synchronized(factory.cnxns){ - cnxns = (HashSet)factory - .cnxns.clone(); - } - for(NIOServerCnxn c : cnxns){ - ((CnxnStats)c.getStats()) - .dumpConnectionInfo(pwriter, true); - } - pwriter.println(); - } - pwriter.print(zk.serverStats().toString()); - pwriter.print("Node count: "); - pwriter.println(zk.getZKDatabase().getNodeCount()); - } finally { - pwriter.flush(); - pwriter.close(); - } - } - }.start(); - return true; - } else if (len == consCmd) { - if (zk == null) { - pwriter.println(ZK_NOT_SERVING); - return true; - } - // this could be a long running task, spawn a thread so - // that we don't block the processing of other requests - threadWillClosePWriter = true; - new Thread() { - @SuppressWarnings("unchecked") - @Override - public void run() { - try { - // clone should be faster than iteration - // ie give up the cnxns lock faster - HashSet cnxns; - synchronized(factory.cnxns){ - cnxns = (HashSet)factory - .cnxns.clone(); - } - for(NIOServerCnxn c : cnxns){ - ((CnxnStats)c.getStats()) - .dumpConnectionInfo(pwriter, false); - } - pwriter.println(); - } finally { - pwriter.flush(); - pwriter.close(); - } - } - }.start(); - return true; - } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) { - if (zk == null) { - pwriter.println(ZK_NOT_SERVING); - return true; - } - // this could be a long running task, spawn a thread so - // that we don't block the processing of other requests - threadWillClosePWriter = true; - new Thread() { - @Override - public void run() { - try { - DataTree dt = zk.getZKDatabase().getDataTree(); - if (len == wchsCmd) { - dt.dumpWatchesSummary(pwriter); - } else if (len == wchpCmd) { - dt.dumpWatches(pwriter, true); - } else { - dt.dumpWatches(pwriter, false); - } - pwriter.println(); - } finally { - pwriter.flush(); - pwriter.close(); - } - } - }.start(); - return true; - } - } finally { - // if we spawned a thread it is responsible for eventually - // flushing and closeing the writer - if (!threadWillClosePWriter) { - pwriter.flush(); - pwriter.close(); + if (len == ruokCmd) { + RuokCommand ruok = new RuokCommand(pwriter); + ruok.start(); + return true; + } else if (len == getTraceMaskCmd) { + TraceMaskCommand tmask = new TraceMaskCommand(pwriter); + tmask.start(); + return true; + } else if (len == setTraceMaskCmd) { + int rc = sock.read(incomingBuffer); + if (rc < 0) { + throw new IOException("Read error"); } + + incomingBuffer.flip(); + long traceMask = incomingBuffer.getLong(); + ZooTrace.setTextTraceLevel(traceMask); + SetTraceMaskCommand setMask = new SetTraceMaskCommand(pwriter, traceMask); + setMask.start(); + return true; + } else if (len == enviCmd) { + EnvCommand env = new EnvCommand(pwriter); + env.start(); + return true; + } else if (len == confCmd) { + ConfCommand ccmd = new ConfCommand(pwriter); + ccmd.start(); + return true; + } else if (len == srstCmd) { + StatResetCommand strst = new StatResetCommand(pwriter); + strst.start(); + return true; + } else if (len == crstCmd) { + CnxnStatResetCommand crst = new CnxnStatResetCommand(pwriter); + crst.start(); + return true; + } else if (len == dumpCmd) { + DumpCommand dump = new DumpCommand(pwriter); + dump.start(); + return true; + } else if (len == statCmd || len == srvrCmd) { + StatCommand stat = new StatCommand(pwriter, len); + stat.start(); + return true; + } else if (len == consCmd) { + ConsCommand cons = new ConsCommand(pwriter); + cons.start(); + return true; + } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) { + WatchCommand wcmd = new WatchCommand(pwriter, len); + wcmd.start(); + return true; } return false; } @@ -1618,7 +1766,8 @@ public class NIOServerCnxn implements Wa pwriter.print(((SocketChannel)channel).socket() .getRemoteSocketAddress()); pwriter.print("["); - pwriter.print(Integer.toHexString(sk.interestOps())); + pwriter.print(sk.isValid() ? Integer.toHexString(sk.interestOps()) + : "0"); pwriter.print("](queued="); pwriter.print(getOutstandingRequests()); pwriter.print(",recved="); Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=941056&r1=941055&r2=941056&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Tue May 4 21:45:13 2010 @@ -226,6 +226,8 @@ public abstract class ClientBase extends OutputStream outstream = sock.getOutputStream(); outstream.write(cmd.getBytes()); outstream.flush(); + // this replicates NC - close the output stream before reading + sock.shutdownOutput(); reader = new BufferedReader(