tomcat-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cos...@apache.org
Subject svn commit: r707271 [1/2] - in /tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async: ./ http/ socks/
Date Thu, 23 Oct 2008 04:45:06 GMT
Author: costin
Date: Wed Oct 22 21:45:05 2008
New Revision: 707271

URL: http://svn.apache.org/viewvc?rev=707271&view=rev
Log:
Another refactoring of the async stuff - moved all pieces in the same package, a bit better names.

Added one of the test servers - a simple non-blocking socks server. 
I've been using it on my machine ( and phone ) for few weeks, it's a nice way to see the log of all
connections made, and seems a good test for the nio madness.

The code no longer depends on tomcat.util - just nio Buffers and plain java.


Added:
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/JsseSslSupport.java   (with props)
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorCallback.java   (contents, props changed)
      - copied, changed from r698781, tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorChannel.java   (with props)
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorPool.java   (contents, props changed)
      - copied, changed from r698781, tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorPool.java
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorThread.java   (contents, props changed)
      - copied, changed from r698781, tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorThreadNio.java   (contents, props changed)
      - copied, changed from r698781, tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SocketPool.java   (contents, props changed)
      - copied, changed from r698781, tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/SocketPool.java
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttp.java   (contents, props changed)
      - copied, changed from r698781, tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttp.java
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttpCallback.java   (contents, props changed)
      - copied, changed from r698781, tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttpCallback.java
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttpPool.java   (contents, props changed)
      - copied, changed from r698781, tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttpPool.java
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/BlockingHttp.java   (contents, props changed)
      - copied, changed from r698781, tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/BlockingHttp.java
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/Http11Parser.java   (contents, props changed)
      - copied, changed from r698781, tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/http/Http11Parser.java
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/socks/
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/socks/SocksServer.java   (with props)
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/socks/SocksServerMain.java   (with props)

Added: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/JsseSslSupport.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/JsseSslSupport.java?rev=707271&view=auto
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/JsseSslSupport.java (added)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/JsseSslSupport.java Wed Oct 22 21:45:05 2008
@@ -0,0 +1,399 @@
+/*
+ */
+package org.apache.tomcat.async;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.security.GeneralSecurityException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Logger;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLEngineResult.Status;
+
+import org.apache.tomcat.async.SelectorThread.SelectorFilter;
+
+
+public class JsseSslSupport implements SelectorFilter, Runnable {
+
+    static Logger log = Logger.getLogger("SSL");
+
+    SSLEngine sslEngine;
+    
+    public ByteBuffer sslRBuffer;
+    public ByteBuffer sslRBufferD;
+    public ByteBuffer sslWBuffer;
+    public ByteBuffer sslWBufferE;
+    
+    boolean handshakeDone = false;
+    boolean handshakeInProgress = false;
+    
+//    private ReentrantLock dataLock = new ReentrantLock();
+//    private Condition dataCond = dataLock.newCondition();
+
+//    private Object sda;
+
+    private SelectorChannel sdata;
+    SocketChannel socketChannel;
+    
+    //SelectorChannel sdata;
+    
+    public JsseSslSupport() {
+        try {
+            initSsl();
+        } catch (GeneralSecurityException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+    }
+    
+    public void initSsl() throws GeneralSecurityException {
+        SSLContext sslCtx = SSLContext.getInstance("tls");
+        sslCtx.init(null, new TrustManager[] {new BasicTrustManager()}, null);
+        
+        sslEngine = sslCtx.createSSLEngine();
+        
+        sslEngine.setUseClientMode(true);
+        //sslCtx.getServerSocketFactory().createServerSocket();
+        //sslCtx.getSocketFactory().createSocket(host, port)
+        // sslCtx.getSocketFactory...
+        
+        sslRBuffer = ByteBuffer.allocate(20480);
+        sslRBufferD = ByteBuffer.allocate(20480);
+        sslWBuffer = ByteBuffer.allocate(20480);
+        sslWBufferE = ByteBuffer.allocate(20480);
+        sslWBufferE.flip();
+    }
+    
+    /**
+     * The socket has some data. We need to consume it to our buffer, 
+     * and pass the notification. We need the intermediate buffer to 
+     * keep the encrypted data - SSLEngine uses 2 buffers. We still avoid
+     * a copy by decrypting directly to the destination buffer.
+     * 
+     * TODO: if buffer is full, stop read interest, when buffer is empty 
+     * re-enable.
+     */
+    public void dataReceived(SelectorChannel sdata) throws IOException {
+        if (handshakeInProgress) {
+            // shouldn't happen if blocking !
+            // need unwrap receives data
+        } else {
+            sdata.callback.dataReceived(sdata);
+        }
+    }
+    
+    public void dataWriteable(SelectorChannel sdata) throws IOException {
+        if (handshakeInProgress) {
+            // need to send buffer. 
+        } else {
+            sdata.callback.dataWriteable(sdata);
+        }
+    }
+
+    SSLEngineResult unwrapR;
+    
+    /**
+     * Typically called when a dataReceived callback is passed up.
+     * It's up to the higher layer to decide if it can handle more data 
+     * and disable read interest and manage its buffers.
+     * 
+     * We have to use one buffer.
+     * @throws IOException 
+     */
+    public int readNonBlocking(SelectorThread st, SelectorChannel sdata, 
+                               ByteBuffer bb) throws IOException {
+        
+        //sslRBuffer.clear();
+        int rd = 0;
+        boolean needsMore = true;
+        int size = bb.remaining();
+        
+        while (needsMore) {
+            // we may already have some data in sslRBuffer from last time.
+            if (unwrapR == null || 
+                    unwrapR.getStatus() == Status.BUFFER_UNDERFLOW) {
+                int sslrd = sdata.sel.readNonBlocking(sdata, sslRBuffer);
+                if (sslrd == 0) {
+                    return rd;
+                }
+                if (sslrd < 0) {
+                    // closed - but we may have read some
+                    if (rd > 0) {
+                        return rd;
+                    } else {
+                        return sslrd;
+                    }
+                }
+                //sslRBufferD.clear();
+                sslRBuffer.flip();
+            }
+            unwrapR = sslEngine.unwrap(sslRBuffer, bb);
+            if (bb == sslRBufferD) {
+               // bb.put(sslRBufferD, 0, 1);
+            }
+            
+            // TODO: what if bb is full ?
+            if (unwrapR != null) {
+                switch (unwrapR.getStatus()) {
+                case BUFFER_UNDERFLOW: {
+                    // Need more input - try to read again
+                    sslRBuffer.compact();
+                    break;
+                }
+                case OK: {
+                    if (unwrapR.getHandshakeStatus() != HandshakeStatus.FINISHED &&
+                        unwrapR.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
+                            // remote side may have asked for a handshake.
+                            // TODO
+                            throw new IOException("Handshake in the middle, TODO");
+                    }
+                    needsMore = false;
+                    break;
+                }
+                case BUFFER_OVERFLOW: {
+                    // no more space in the app-side buffer.
+                    bb = sslRBufferD;
+                    break;
+                }
+                }
+            }
+            
+            //rd += sslrd; // TODO: based on what is added to bb;
+        
+            log.info("Unwrapped " + unwrapR + " " + needsMore + " " + rd);
+        }
+        return size - bb.remaining();
+    }
+    
+
+    @Override
+    public void readInterest(SelectorThread st, SelectorChannel sdata, boolean b) throws IOException {
+        if (!handshakeInProgress) {
+            st.readInterest(sdata, b);
+        }
+    }
+
+    @Override
+    public void writeInterest(SelectorThread st, SelectorChannel sdata) {
+        if (!handshakeInProgress) {
+            st.writeInterest(sdata);
+        } else {
+            sdata.interest |= SelectionKey.OP_WRITE;
+        }
+    }
+
+    
+    public int writeNonBlocking(SelectorThread st, SelectorChannel sdata, 
+                       ByteBuffer bb) throws IOException {
+
+        if (handshakeInProgress) {
+            return 0; // don't bother me.
+        }
+        
+        if (!handshakeDone) {
+            handshakeInProgress = true;
+            handleHandshking(sslEngine, sdata);
+            return 0; // can't write yet.
+        }
+        
+        sslWBufferE.clear(); // TODO: it may have data, write it ?
+        
+        SSLEngineResult wrap = sslEngine.wrap(bb, sslWBufferE);
+        if (wrap != null) {
+            switch (wrap.getStatus()) {
+            case BUFFER_UNDERFLOW: {
+                // Need more input - try to read again
+                break;
+            }
+            case OK: {
+                if (wrap.getHandshakeStatus() != HandshakeStatus.FINISHED &&
+                    wrap.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
+                        // remote side may have asked for a handshake.
+                        // TODO
+                        throw new IOException("Handshake in the middle, TODO");
+                }
+            }
+            case BUFFER_OVERFLOW: {
+                // no more space in the app-side buffer.
+                //needsMore = false;
+            }
+            }
+        }
+        
+        sslWBufferE.flip();
+
+        // write what we have - more may be there
+        //int done = sdata.write(sslWBufferE);
+        int wrote = st.writeNonBlocking(sdata, sslWBufferE);
+
+        log.info("Encrypted " + wrote + " " + wrap);
+
+        
+        // TODO: how much was consummed from the bb
+        return wrote;
+    }
+
+    
+    // SSL handshake require slow tasks - that will need to be executed in a 
+    // thread anyways. Better to keep it simple ( the code is very complex ) - 
+    // and do the initial handshake in a thread, not in the IO thread.
+    // We'll need to unregister and register again from the selector.
+    private void handleHandshking(final SSLEngine sslEngine, 
+                                  final SelectorChannel sdata) {
+        log.info("Starting handshake");
+        handshakeInProgress = true;
+        SelectionKey sk = (SelectionKey) sdata.selKey;
+        this.sdata = sdata;
+        
+        //final int timeout = 2000;
+        try {
+            sdata.interest = sk.interestOps();
+            sk.cancel();
+            ((SocketChannel) sdata.channel).configureBlocking(true);
+            sdata.selKey = null;
+        } catch (IOException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+        
+        new Thread(new Runnable() {
+            public void run() {
+                doHandshake(sslEngine, sdata);
+            }}).start();
+        //
+    }
+    
+    public void run() {
+        endHandshakeSelT(sdata);
+    }
+    
+    public void endHandshakeSelT(SelectorChannel sdata) {
+        try {
+            ((SocketChannel) sdata.channel).configureBlocking(false);
+            // register.
+            SelectionKey sk = ((SocketChannel) sdata.channel).register(((SelectorThreadNio) sdata.sel).selector, 
+                    sdata.interest, sdata);
+            sdata.selKey = sk;
+            
+            handshakeInProgress = false;
+        } catch (IOException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+        //..
+    }
+
+    /** 
+     * Handshake done - return socket to normal operation
+     * Called from background thread.
+     */
+    void endHandshake(SelectorChannel sdata) {
+        log.info("Handshake done");
+        handshakeDone = true;
+        try {
+            sdata.sel.runInSelectorThread(this);
+        } catch (IOException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+    }
+    /**
+     * Actual handshake magic, in background thread.
+     */
+    private void doHandshake(final SSLEngine sslEngine, final SelectorChannel sdata) {
+        Runnable r;
+        try {
+
+            boolean initial = true;
+            SSLEngineResult wrap = null;
+            HandshakeStatus hstatus = sslEngine.getHandshakeStatus();
+            ByteBuffer bb = ByteBuffer.allocate(20480);
+            ByteBuffer rd = ByteBuffer.allocate(20480);
+            
+            while (hstatus != HandshakeStatus.FINISHED) {
+                log.info("doHandshake, status = " + hstatus + " " + sslEngine.getHandshakeStatus());
+                if (hstatus == HandshakeStatus.NEED_TASK) {
+                    while ((r = sslEngine.getDelegatedTask()) != null) {
+                        r.run();
+                    }
+                    log.info("Tasks done");
+                    hstatus = sslEngine.getHandshakeStatus();
+                } else if (hstatus == HandshakeStatus.NEED_WRAP 
+                        || initial) {
+                    initial = false;
+                    bb.clear();
+                    bb.flip();
+                    sslWBufferE.clear();
+                    wrap = sslEngine.wrap(bb, sslWBufferE);
+                    hstatus = wrap.getHandshakeStatus();
+
+                    sslWBufferE.flip();
+                    // blocking write
+                    int wcnt = sdata.sel.writeNonBlocking(sdata, sslWBufferE);
+                    log.info("Wrap done " + wcnt + " " + wrap);
+                } else if (hstatus == HandshakeStatus.NEED_UNWRAP) {
+                    log.info("Need handshake read " + hstatus);
+                    
+
+                    rd.clear();
+                    int rdB = 0;
+                    while (hstatus == HandshakeStatus.NEED_UNWRAP) {
+                        if (wrap.getStatus() == Status.BUFFER_UNDERFLOW || 
+                                bb.remaining() == 0) {
+                            bb.compact();
+                            // blocking read.
+                            rdB = sdata.sel.readNonBlocking(sdata, bb);
+                            bb.flip();
+                        }
+                        wrap = sslEngine.unwrap(bb, rd);
+                        hstatus = wrap.getHandshakeStatus();
+                        log.info("Unwrap part done " + rdB + " " + wrap);
+                    }
+                    
+                    log.info("Unwrap done " + sslEngine.getHandshakeStatus() );
+                    
+                    // rd may have some input bytes.
+                }
+            }
+            endHandshake(sdata);
+        } catch (Throwable t) {
+            t.printStackTrace();
+        }
+    }
+
+
+    public static class BasicTrustManager implements X509TrustManager {
+
+        private X509Certificate[] chain;
+        
+        public void checkClientTrusted(X509Certificate[] chain, String authType)
+                throws CertificateException {
+            this.chain = chain;
+        }
+
+        public void checkServerTrusted(X509Certificate[] chain, String authType)
+                throws CertificateException {
+            this.chain = chain;
+        }
+
+        public X509Certificate[] getAcceptedIssuers() {
+            return new X509Certificate[0];
+        }
+    }
+
+
+    
+    
+    
+}

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/JsseSslSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorCallback.java (from r698781, tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java)
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorCallback.java?p2=tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorCallback.java&p1=tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java&r1=698781&r2=707271&rev=707271&view=diff
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java (original)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorCallback.java Wed Oct 22 21:45:05 2008
@@ -13,12 +13,11 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.tomcat.util.net;
+package org.apache.tomcat.async;
 
 import java.io.IOException;
 import java.nio.channels.Channel;
 
-import org.apache.tomcat.util.net.SelectorThread.SelectorData;
 
 /**
  * Notiy user code of events. All methods are called from the selector thread,
@@ -35,44 +34,50 @@
     SelectorThread.DataEvents {
   
   /** 
-   * Called when the protocol is connected.
+   * Called when the protocol is connected, from the IO thread
+   * The channel is set with OP_READ interest by default.
    */
-  public void connected(SelectorData sdata) 
+  public void connected(SelectorChannel chdata) 
           throws IOException {
   }
 
   /**
-   * It is possible to write data. 
-   * For both read and write - re-enable interest if you want more data. 
+   * Called when it is possible to write data - typically the OS 
+   * buffer empty.
+   *
+   * The OP_WRITE interest is disabled before this method is called, you can
+   * enable it again when write() returns 0.
    */
-  public void dataWriteable(SelectorData sdata) throws IOException {
+  public void dataWriteable(SelectorChannel chdata) throws IOException {
   }
 
   /**
-   * Data available for read.
-   * For both read and write - re-enable interest if you want more data. 
+   * Data available for read, called from IO thread.
+   * You MUST read all data ( i.e. until read() returns 0).
+   *  
+   * OP_READ remain active - call readInterest(false) to disable.
    */
-  public void dataReceived(SelectorData sdata) throws IOException {
+  public void dataReceived(SelectorChannel chdata) throws IOException {
   }
   
   /** 
    * nextTimeEvent reached. 
    */
-  public void timeEvent(SelectorData sdata) {
+  public void timeEvent(SelectorChannel chdata) {
   }
 
   /**
  * @throws IOException  
    *  
    */
-  public void ioThreadRun(SelectorData sdata) throws IOException {
+  public void ioThreadRun(SelectorChannel chdata) throws IOException {
   }
 
   /** 
    * Close was detected, or an unhandled exception happened while processing
    * this callback.
    */
-  public void channelClosed(SelectorData sdata, Throwable ex) {
+  public void channelClosed(SelectorChannel chdata, Throwable ex) {
   }
   
   /**
@@ -84,7 +89,7 @@
    * TODO: is there any case where something else besides registering read
    * interest on the new connection is needed ? Maybe it could read some data ?
    */
-  public SelectorCallback connectionAccepted(SelectorData sdata, 
+  public SelectorCallback connectionAccepted(SelectorChannel chdata, 
                                              Channel sockC) {
     return null;
   }

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorCallback.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Added: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorChannel.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorChannel.java?rev=707271&view=auto
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorChannel.java (added)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorChannel.java Wed Oct 22 21:45:05 2008
@@ -0,0 +1,124 @@
+package org.apache.tomcat.async;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.Channel;
+
+import org.apache.tomcat.async.SelectorThread.SelectorFilter;
+
+/** 
+ * This is a wrapper around the real channel, with extra info to make it easier
+ * to deal with non-blocking - callback, interest, etc.
+ * 
+ * It is stored as an attachment in the selector.
+ */
+public class SelectorChannel implements ByteChannel {
+    SelectorChannel(SelectorThread sel) {
+        this.sel = sel;
+    }
+
+    // APR long is wrapped in a ByteChannel as well - with few other longs.
+    Channel channel;
+    
+    Object selKey;
+
+    SelectorThread sel;
+    SelectorCallback callback;
+
+    SelectorCallback pendingCallback;
+
+    // Current interest, used internally to avoid waking up if no change
+    // Also used for connect and accept.
+    int interest;
+
+    // True if the callback wants to be notified of read/write
+    boolean writeInterest;
+    boolean readInterest;
+
+    /** 
+     * If != 0 - the callback will be notified closely after this time.
+     * Used for timeouts. 
+     */
+    long nextTimeEvent = 0;
+
+    // Saved to allow debug messages for bad interest/looping
+    int lastReadResult;
+    int zeroReads = 0;
+    int lastWriteResult;
+
+    SelectorFilter ssl;
+
+    int suspended = 0;
+
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append("SelData: ")
+        .append(writeInterest ? "W/" : "")
+        .append(readInterest ? "R/" : "").append("/")
+        .append(channel);
+        //append(selKey).
+        return sb.toString();
+    }
+    
+    public Channel getChannel() {
+        return channel;
+    }
+    
+    public boolean isOpen() {
+        return channel.isOpen();
+    }
+    
+    public void close() throws IOException {
+        sel.close(this);
+    }
+    
+    public int read(ByteBuffer bb) throws IOException {
+        if (ssl != null) {
+            return ssl.readNonBlocking(sel, this, bb);
+        } else {
+            return sel.readNonBlocking(this, bb);
+        }
+    }
+
+    public int write(ByteBuffer bb) throws IOException {
+        if (ssl != null) {
+            return ssl.writeNonBlocking(sel, this, bb);
+        } else {
+            return sel.writeNonBlocking(this, bb);
+        }
+    }
+    
+    public void readInterest(boolean b) throws IOException {
+        if (ssl != null) {
+            ssl.readInterest(sel, this, b);
+        } else {
+            sel.readInterest(this, b);
+        }
+    }
+
+    public void writeInterest() throws IOException {
+        if (ssl != null ) {
+            ssl.writeInterest(sel, this);
+        } else {
+            sel.writeInterest(this);
+        }
+    }
+ 
+    public InetAddress getAddress(boolean remote) {
+        return sel.getAddress(this, remote);
+    }
+
+    public int getPort(boolean remote) {
+        return sel.getPort(this, remote);
+    }
+    
+    public void updateCallback(SelectorCallback newCallback) {
+        sel.updateCallback(this, callback, newCallback);
+    }
+    
+    public void runInSelectorThread(Runnable t) throws IOException {
+        sel.runInSelectorThread(t);
+    }
+}
\ No newline at end of file

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorPool.java (from r698781, tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorPool.java)
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorPool.java?p2=tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorPool.java&p1=tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorPool.java&r1=698781&r2=707271&rev=707271&view=diff
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorPool.java (original)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorPool.java Wed Oct 22 21:45:05 2008
@@ -13,7 +13,7 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.tomcat.util.net;
+package org.apache.tomcat.async;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -36,7 +36,7 @@
   int seq;
   
   SelectorThread selector;
-  private boolean daemon;
+  private boolean daemon = true;
   static SelectorPool defaultSPool = new SelectorPool();
   
   public SelectorPool() {

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorPool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorPool.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Copied: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorThread.java (from r698781, tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java)
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorThread.java?p2=tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorThread.java&p1=tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java&r1=698781&r2=707271&rev=707271&view=diff
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java (original)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorThread.java Wed Oct 22 21:45:05 2008
@@ -13,12 +13,12 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.tomcat.util.net;
+package org.apache.tomcat.async;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.channels.Channel;
 
 /**
  * Abstract NIO/APR to avoid some of the complexity and allow more code
@@ -38,60 +38,8 @@
     *  - twisted reactor 
     *  - mina IoProcessor - also has an Apr/nio impl, ProtocolCallback->IoSession,
     */
-  
-  /** 
-   * This is stored as the attachment in the selector.
-   */
-  public static class SelectorData {
-    public SelectorData(SelectorThread sel) {
-      this.sel = sel;
-    }
-    
-    // APR long is wrapped in a ByteChannel as well - with few other longs.
-    Channel channelData;
-    Object selKey;
-    
-    public SelectorThread sel;
-    public SelectorCallback callback;
-    
-    SelectorCallback pendingCallback;
-    
-    // Current interest, used internally to avoid waking up if no change
-    // Also used for connect and accept.
-    int interest;
-    
-    // True if the callback wants to be notified of read/write
-    boolean writeInterest;
-    boolean readInterest;
-    
-    /** 
-     * If != 0 - the callback will be notified closely after this time.
-     * Used for timeouts. 
-     */
-    long nextTimeEvent = 0;
-    
-    // Saved to allow debug messages for bad interest/looping
-    int lastReadResult;
-    int zeroReads = 0;
-    int lastWriteResult;
-
-    public SelectorFilter ssl;
-    
-    public int suspended = 0;
-
-    public String toString() {
-        StringBuffer sb = new StringBuffer();
-        sb.append("SelData: ")
-            .append(writeInterest ? "W/" : "")
-            .append(readInterest ? "R/" : "").append("/")
-            .append(channelData);
-        //append(selKey).
-        return sb.toString();
-    }
-  }
-  
   public static interface IOThreadRunnable {
-      public void ioThreadRun(SelectorData selThread) throws IOException;
+      public void ioThreadRun(SelectorChannel selThread) throws IOException;
   }
   
   public static interface DataEvents {
@@ -100,18 +48,22 @@
        * SSL may consume sending data for negotiation.
      * @throws IOException 
        */
-      public void dataWriteable(SelectorData sdata) throws IOException;
+      public void dataWriteable(SelectorChannel sdata) throws IOException;
       
-      public void dataReceived(SelectorData sdata) throws IOException;
+      public void dataReceived(SelectorChannel sdata) throws IOException;
   }
 
   public static interface DataChannel {
-      public int writeNonBlocking(SelectorThread st, SelectorData sdata, 
+      public int writeNonBlocking(SelectorThread st, SelectorChannel sdata, 
                                   ByteBuffer bb) throws IOException;
       
-      public int readNonBlocking(SelectorThread st, SelectorData sdata, 
+      public int readNonBlocking(SelectorThread st, SelectorChannel sdata, 
                                  ByteBuffer bb) throws IOException;
       
+      public void writeInterest(SelectorThread st, SelectorChannel sdata) throws IOException;
+      
+      public void readInterest(SelectorThread st, SelectorChannel sdata, boolean b) throws IOException;
+      
   }
   
   /**
@@ -147,6 +99,17 @@
   public void stop() {
   }
 
+  public void connect(SocketAddress sa, SelectorCallback cstate)
+      throws IOException {
+      connect(sa, cstate, null);
+  }
+  
+  public void connect(SocketAddress sa, SelectorCallback cstate, 
+                      SelectorFilter filter) 
+      throws IOException {
+
+  }
+  
   /**
    * This may be blocking - involves host resolution, connect.
    * If the IP address is provided - it shouldn't block.
@@ -163,16 +126,16 @@
    * @param sc
    * @param nextTimer time to call the timeEvent() callback
    */
-  public void setTimerEventTime(SelectorData sdata, long nextTimer) {
+  public void setTimerEventTime(SelectorChannel sdata, long nextTimer) {
       sdata.nextTimeEvent = nextTimer;
   }
   
-  public int readNonBlocking(SelectorData sc, ByteBuffer bb) 
+  public int readNonBlocking(SelectorChannel sc, ByteBuffer bb) 
       throws IOException {
     return 0;
   }
 
-  public int writeNonBlocking(SelectorData sc, ByteBuffer reqBuf) 
+  public int writeNonBlocking(SelectorChannel sc, ByteBuffer reqBuf) 
       throws IOException {
     return 0;
   }
@@ -180,7 +143,7 @@
   /** 
    * 
    */
-  public int close(SelectorData sc) throws IOException {
+  public int close(SelectorChannel sc) throws IOException {
     return 0;
   }
   
@@ -196,7 +159,7 @@
   { 
   }
 
-  public void runInSelectorThread(SelectorData sdata) throws IOException {
+  public void runInSelectorThread(SelectorChannel sdata) throws IOException {
   }
 
   public void runInSelectorThread(Runnable cb) throws IOException {
@@ -213,20 +176,30 @@
   /** 
    * Change the callback associated with the socket.
    */
-  public void updateCallback(SelectorData sdata, SelectorCallback old, SelectorCallback sc) {
+  public void updateCallback(SelectorChannel sdata, SelectorCallback old, SelectorCallback sc) {
   }
   
-  public void writeInterest(SelectorData sc, boolean writeInterest) {
+  /** 
+   * Request a callback whenever data can be written. 
+   * When the callback is invoked, the write interest is removed ( to avoid 
+   * looping ). If the write() operation doesn't complete, you must call
+   * writeInterest - AND stop writing, some implementations will throw
+   * exception. write() will actually attempt to detect this and avoid the 
+   * error.
+   * 
+   * @param sc
+   */
+  public void writeInterest(SelectorChannel sc) {
   }
 
-  public void readInterest(SelectorData sc, boolean readInterest) throws IOException {
+  public void readInterest(SelectorChannel sc, boolean readInterest) throws IOException {
   }
   
-  public int getPort(SelectorData sdata, boolean remote) {
+  public int getPort(SelectorChannel sdata, boolean remote) {
       return 0;
   }
   
-  public InetAddress getAddress(SelectorData sdata, boolean remote) {
+  public InetAddress getAddress(SelectorChannel sdata, boolean remote) {
       return null;
   }
 }
\ No newline at end of file

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorThread.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorThread.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Copied: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorThreadNio.java (from r698781, tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java)
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorThreadNio.java?p2=tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorThreadNio.java&p1=tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java&r1=698781&r2=707271&rev=707271&view=diff
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java (original)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorThreadNio.java Wed Oct 22 21:45:05 2008
@@ -13,7 +13,7 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.tomcat.util.net;
+package org.apache.tomcat.async;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -39,9 +39,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.tomcat.util.ObjectManager;
-import org.apache.tomcat.util.net.SelectorThread.IOThreadRunnable;
-import org.apache.tomcat.util.net.SelectorThread.SelectorData;
+import org.apache.tomcat.async.SelectorThread.SelectorFilter;
 
 /**
  * NIO implementation.
@@ -54,14 +52,14 @@
 
     Selector selector;
 
-    ArrayList<SelectorData> readInterest = new ArrayList<SelectorData>();
-    ArrayList<SelectorData> writeInterest = new ArrayList<SelectorData>();
-    ArrayList<SelectorData> connectAcceptInterest = 
-        new ArrayList<SelectorData>();
-    ArrayList<SelectorData> updateCallback = 
-        new ArrayList<SelectorData>();
-    ArrayList<SelectorData> runInterest = 
-        new ArrayList<SelectorData>();
+    ArrayList<SelectorChannel> readInterest = new ArrayList<SelectorChannel>();
+    ArrayList<SelectorChannel> writeInterest = new ArrayList<SelectorChannel>();
+    ArrayList<SelectorChannel> connectAcceptInterest = 
+        new ArrayList<SelectorChannel>();
+    ArrayList<SelectorChannel> updateCallback = 
+        new ArrayList<SelectorChannel>();
+    ArrayList<SelectorChannel> runInterest = 
+        new ArrayList<SelectorChannel>();
     ArrayList<Runnable> runnableInterest = 
         new ArrayList<Runnable>();
 
@@ -71,18 +69,19 @@
 
     // actives are also stored in the Selector. This is only updated in the main 
     // thread
-    ArrayList<SelectorData> active = new ArrayList<SelectorData>();
+    ArrayList<SelectorChannel> active = new ArrayList<SelectorChannel>();
 
-    boolean debug = true;
+    boolean debug = false;
+    boolean debugWakeup = false;
     boolean running = true;
 
     long lastWakeup = System.currentTimeMillis(); // last time we woke
-    long sleepTime;
     long nextWakeup; // next scheduled wakeup
 
     // Normally select will wait for the next time event - if it's 
     // too far in future, maxSleep will override it.
-    private long maxSleep = 10000;
+    private long maxSleep = 600000;
+    long sleepTime = maxSleep;
 
     // Never sleep less than minSleep. This defines the resulution for 
     // time events.
@@ -90,9 +89,9 @@
 
     boolean daemon = true;
 
-    public SelectorThreadNio() {
-        this(false);
-    }
+//    public SelectorThreadNio() {
+//        this(false);
+//    }
 
     public SelectorThreadNio(boolean daemon) {
         try {
@@ -109,8 +108,6 @@
 
     public void setName(String n) {
         selectorThread.setName(n);
-        ObjectManager.get().registerObject(this, "SelectorThread-" + n, 
-                "SelectorThread");
     }
 
     /**
@@ -170,16 +167,46 @@
                 
                 int selected = selector.select(sleepTime);
                 lastWakeup = System.currentTimeMillis();
-                if (debug && selected == 0) {
+                if (debugWakeup && selected == 0) {
                     long delta = lastWakeup - now;
                     if (delta < maxSleep - 1000) { // short wakeup
-                        log.info("Wakeup " + selected + " " + (delta));
+                        log.info("Wakeup " + selected + " " + delta
+                                + " " + sleepTime);
                     }
                 }
                 if (lastWakeup - now < 10 && selected == 0) {
                     if (sloops > 50) {
                         sloops = 0;
                         log.severe("Looping !");
+                        
+                        // Let's close all sockets - one is bad, but we can't do much.
+                        Set<SelectionKey> keys = selector.keys();
+                        //Set<SelectionKey> keys = selector.keys();
+                        ArrayList<SelectorChannel> oldCh = new ArrayList<SelectorChannel>();
+                        for (SelectionKey k : keys) {
+                            SelectorChannel cd = (SelectorChannel) k.attachment();
+                            cd.interest = k.interestOps();
+                            oldCh.add(cd);
+                            k.cancel();
+                        }
+                        
+                        selector.close();
+                        selector = Selector.open();
+                        for (SelectorChannel selectorData: oldCh) {
+                            if (selectorData.channel instanceof ServerSocketChannel) {
+                                ServerSocketChannel socketChannel = 
+                                    (ServerSocketChannel) selectorData.channel;
+                                selectorData.selKey = socketChannel.register(selector, SelectionKey.OP_ACCEPT);
+                            } else {
+                                SocketChannel socketChannel =
+                                    (SocketChannel) selectorData.channel;
+                                if (selectorData.interest != 0) {
+                                    selectorData.selKey = socketChannel.register(selector, 
+                                        selectorData.interest);
+                                }
+                                
+                            }
+                        }
                     }
                     sloops++;
                 }
@@ -205,16 +232,16 @@
                         //sk.interestOps(sk.interestOps() & ~readyOps);
                         // Find the request receiving the notification
                         
-                        SelectorData sdata = (SelectorData) sk.attachment();
-                        //synchronized (sdata) {
+                        SelectorChannel ch = (SelectorChannel) sk.attachment();
+                        //synchronized (ch) {
                         // either that or updateCallback in IO/Thread
-                        SelectorCallback cstate = sdata.callback;
+                        SelectorCallback cstate = ch.callback;
                             
                         //}
                         //checkChannelKey(cstate);
-                        if (sdata.selKey != sk || sdata.channelData != sk.channel()) {
-                            sdata.selKey = sk;
-                            sdata.channelData = sk.channel();
+                        if (ch.selKey != sk || ch.channel != sk.channel()) {
+                            ch.selKey = sk;
+                            ch.channel = sk.channel();
                         }
 
                         if (sk.isValid() && sk.isAcceptable()) {
@@ -233,7 +260,7 @@
                             if (debug) {
                                 log.info("!isValid, closed socket " + cstate);
                             }
-                            close(sdata, sk, cstate, sc, null, true);
+                            close(ch, sk, cstate, sc, null, true);
                             continue;
                         }
 
@@ -243,9 +270,12 @@
                                 // Only needed once
                                 sk.interestOps(sk.interestOps() 
                                         & ~SelectionKey.OP_CONNECT);
-                                handleConnect(sdata, cstate, sc);
+                                handleConnect(ch, cstate, sc);
+                                if (!sk.isValid()) {
+                                    continue;
+                                }
                                 if (debug) {
-                                    log.info("Wakeup done, connect" + selected 
+                                    log.info("Wakeup done, connect " + selected 
                                             + " " + (lastWakeup - now)   
                                             + " ready: " + readyOps + " " 
                                             + sk.readyOps() + " " + sk);
@@ -256,7 +286,8 @@
                                 // Needs to be explicitely re-enabled
                                 sk.interestOps(sk.interestOps() 
                                         & ~SelectionKey.OP_WRITE);
-                                sdata.lastWriteResult = 0;
+                                ch.writeInterest = false;
+                                ch.lastWriteResult = 0;
                                 if (debug) {
                                     log.info("dataWritable " + selected 
                                             + " " + (lastWakeup - now)   
@@ -264,21 +295,21 @@
                                             + sk.readyOps() + " " + cstate + 
                                             " " + sk);
                                 }
-                                if(sdata.ssl != null) {
-                                    sdata.ssl.dataWriteable(sdata);
+                                if(ch.ssl != null) {
+                                    ch.ssl.dataWriteable(ch);
                                 } else {
-                                    cstate.dataWriteable(sdata);
+                                    cstate.dataWriteable(ch);
                                 }
                                 
-                                if (sdata.lastWriteResult > 0 && 
-                                        sdata.writeInterest) {
+                                if (ch.lastWriteResult > 0 && 
+                                        ch.writeInterest) {
                                     log.warning("SelectorThread: write interest" +
                                                 " after incomplete write");
                                 }
                             }
 
                             if (sk.isValid() && sk.isReadable()) {
-                                sdata.lastReadResult = 0;
+                                ch.lastReadResult = 0;
                                 if (debug) {
                                     log.info("dataReceived " + selected 
                                             + " " + (lastWakeup - now)   
@@ -286,15 +317,15 @@
                                             + sk.readyOps() + " " + cstate + 
                                             " " + sk);
                                 }
-                                if (sdata.ssl != null) {
-                                    sdata.ssl.dataReceived(sdata);
+                                if (ch.ssl != null) {
+                                    ch.ssl.dataReceived(ch);
                                 } else {
-                                    cstate.dataReceived(sdata);
+                                    cstate.dataReceived(ch);
                                 }
                             }
                         } catch (Throwable t) {
                             t.printStackTrace();
-                            close(sdata, sk, cstate, sc, t, true);
+                            close(ch, sk, cstate, sc, t, true);
                         }
 
                     }
@@ -308,18 +339,20 @@
         } // while(running)
     }
 
-    private void handleConnect(SelectorData sdata, SelectorCallback cstate, SocketChannel sc)
+    private void handleConnect(SelectorChannel ch, SelectorCallback cstate, SocketChannel sc)
             throws IOException, SocketException {
         if (!sc.finishConnect()) {
             log.warning("handleConnected - finishConnect returns false");
         }
-        sdata.sel = this;
-        sc.socket().setSoLinger(true, 0);
+        ch.sel = this;
+        //sc.socket().setSoLinger(true, 0);
         if (debug) {
-            log.info("connected() " + cstate);
+            log.info("connected() " + cstate + " isConnected()=" + sc.isConnected() + " " + 
+                    sc.isConnectionPending());
         }
-        cstate.connected(sdata);
-        readInterest(sdata, true);
+        
+        readInterest(ch, true);
+        cstate.connected(ch);
     }
 
     private void handleAccept(SelectorCallback cstate, SelectionKey sk)
@@ -328,11 +361,11 @@
         ServerSocketChannel ssc=(ServerSocketChannel)selc;
         SocketChannel sockC = ssc.accept();
         sockC.configureBlocking(false);
-        SelectorData selectorData = new SelectorData(this);
+        SelectorChannel selectorData = new SelectorChannel(this);
         selectorData.selKey = sockC.register(selector, 
                 SelectionKey.OP_READ, 
                 selectorData);
-        selectorData.channelData = sockC;
+        selectorData.channel = sockC;
         
         // Find the callback for the new socket
         SelectorCallback acb = 
@@ -363,15 +396,15 @@
      *  
      */
     @Override
-    public void updateCallback(SelectorData sdata, 
+    public void updateCallback(SelectorChannel ch, 
                                SelectorCallback current, 
                                SelectorCallback cstate) {
-        sdata.pendingCallback = cstate;
+        ch.pendingCallback = cstate;
         if (isSelectorThread()) {
-            updateCallbackIOT(sdata);
+            updateCallbackIOT(ch);
         } else {
             synchronized (updateCallback) {
-                updateCallback.add(sdata);
+                updateCallback.add(ch);
             }
         }
     }
@@ -379,9 +412,9 @@
     private void processPendingUpdateCallback() {
         if (updateCallback.size() > 0) {
             synchronized (updateCallback) {
-                Iterator<SelectorData> ci = updateCallback.iterator();
+                Iterator<SelectorChannel> ci = updateCallback.iterator();
                 while (ci.hasNext()) {
-                    SelectorData cstate = ci.next();
+                    SelectorChannel cstate = ci.next();
                     updateCallbackIOT(cstate);
                 }
                 updateCallback.clear();
@@ -389,34 +422,38 @@
         }
     }
     
-    private void updateCallbackIOT(SelectorData sdata) {
+    private void updateCallbackIOT(SelectorChannel ch) {
         if (debug) {
-            log.info("Callback update " + sdata.pendingCallback + " old=" + sdata.callback);
+            log.info("Callback update " + ch.pendingCallback + " old=" + ch.callback);
         }
-        synchronized (sdata) {
-            sdata.callback = sdata.pendingCallback; 
-            sdata.pendingCallback = null;
+        synchronized (ch) {
+            ch.callback = ch.pendingCallback; 
+            ch.pendingCallback = null;
         }
     }
 
 
-    private void close(SelectorData sdata, 
+    private void close(SelectorChannel ch, 
                        SelectionKey sk,
                        SelectorCallback cstate,
-                       Channel ch, 
+                       Channel channel, 
                        Throwable ex, boolean remove) {
         try {
             if (debug) {
                 log.info("-------------> close: " + cstate + " t=" + ex);
             }
-            if (sk != null && sk.isValid()) {
-                sk.interestOps(0);
-            }
-            if (sk != null) { 
+            boolean callClose = false;
+            if (sk != null) {
+                if (sk.isValid()) {
+                    sk.interestOps(0);
+                }
                 sk.cancel();
+                ch.selKey = null;
+                callClose = true;
             }
-            if (ch instanceof SocketChannel) {
-                SocketChannel sc = (SocketChannel) ch;
+            
+            if (channel instanceof SocketChannel) {
+                SocketChannel sc = (SocketChannel) channel;
                 if (sc.isConnected()) {
                     int o = opened.decrementAndGet();
                     //System.err.println("Close socket, opened=" + o);
@@ -431,12 +468,14 @@
                     sc.socket().close();
                 }
             }
-            ch.close();
+            channel.close();
             closed.incrementAndGet();
-            cstate.channelClosed(sdata, ex);
-            if (remove) {
-                synchronized (active) {
-                    active.remove(cstate);
+            if (callClose && cstate != null) {
+                cstate.channelClosed(ch, ex);
+                if (remove) {
+                    synchronized (active) {
+                        boolean removed = active.remove(cstate);
+                    }
                 }
             }
         } catch (IOException ex2) {
@@ -448,19 +487,14 @@
     // --------------- Socket op abstractions ------------
 
     @Override
-    public int readNonBlocking(SelectorData selectorData, ByteBuffer bb) 
+    public int readNonBlocking(SelectorChannel selectorData, ByteBuffer bb) 
     throws IOException {
         try {
             int off = bb.position();
 
             int done = 0;
             
-            if (selectorData.ssl != null) {
-                done = selectorData.ssl.readNonBlocking(this, selectorData, bb);
-            } else {
-                done = ((SocketChannel) selectorData.channelData).read(bb);
-            }
-            
+            done = ((SocketChannel) selectorData.channel).read(bb);
             
             if (debug) {
                 log.info("-------------readNB rd=" + done + " bb.limit=" + 
@@ -469,9 +503,13 @@
             if (done > 0) {
             
                 if (debug) {
-                    String s = new String(bb.array(), off,
+                    if (!bb.isDirect()) {
+                        String s = new String(bb.array(), off,
                             bb.position() - off);
-                    log.info("Data:\n" + s);
+                        log.info("Data:\n" + s);
+                    } else {
+                        log.info("Data:\n" + bb.toString());
+                    }
                 }
                 if (done + off != bb.position()) {
                     System.err.println("XXX");
@@ -499,13 +537,17 @@
                 log.info("readNB error rd=" + -1 + " bblen=" + 
                         (bb.limit() - bb.position()) + " " + selectorData.callback + " " + ex);
             }
+            // other side closed the connection
+            if (ex.getMessage().indexOf("Connection reset by peer") < 0) {
+                ex.printStackTrace();
+            }
             close(selectorData);
             return -1;
         }
     }
 
     @Override
-    public int writeNonBlocking(SelectorData selectorData, ByteBuffer bb) 
+    public int writeNonBlocking(SelectorChannel selectorData, ByteBuffer bb) 
             throws IOException {
         try {
             if (selectorData.suspended != 0) {
@@ -516,19 +558,24 @@
             if (debug) {
                 log.info("writeNB pos=" + bb.position() + " len=" + 
                         (bb.limit() - bb.position()) + " " + selectorData.callback);
-            }
-            if (debug) {
-                String s = new String(bb.array(), bb.position(),
+               if (!bb.isDirect()) {
+                    String s = new String(bb.array(), bb.position(),
+                
                         bb.limit() - bb.position());
-                log.info("Data:\n" + s);
+                    log.info("Data:\n" + s);
+                }
+            }
+            if (selectorData.writeInterest) {
+                // writeInterest will be false after a callback, if it is 
+                // set it means we want to wait for the callback.
+                if (debug) {
+                    log.info("Prevent writeNB when writeInterest is set");
+                }
+                return 0;
             }
 
             int done = 0;
-            if (selectorData.ssl != null) {
-                done = selectorData.ssl.writeNonBlocking(this, selectorData, bb);
-            } else {
-                done = ((SocketChannel) selectorData.channelData).write(bb);
-            }
+            done = ((SocketChannel) selectorData.channel).write(bb);
             selectorData.lastWriteResult = done;
             return done;
         } catch(IOException ex) {
@@ -537,13 +584,15 @@
                         (bb.limit() - bb.position()) + " " + selectorData.callback + " " + 
                         ex);
             }
-            close(selectorData);
-            return -1;
+            ex.printStackTrace();
+            selectorData.close();
+            throw ex;
+            // return -1;
         }
     }
 
-    public int getPort(SelectorData sd, boolean remote) {
-        SocketChannel socketChannel = (SocketChannel) sd.channelData;        
+    public int getPort(SelectorChannel sd, boolean remote) {
+        SocketChannel socketChannel = (SocketChannel) sd.channel;        
         
         if (remote) {
             return socketChannel.socket().getPort();
@@ -552,8 +601,8 @@
         }
     }
     
-    public InetAddress getAddress(SelectorData sd, boolean remote) {
-        SocketChannel socketChannel = (SocketChannel) sd.channelData;        
+    public InetAddress getAddress(SelectorChannel sd, boolean remote) {
+        SocketChannel socketChannel = (SocketChannel) sd.channel;        
         
         if (remote) {
             return socketChannel.socket().getInetAddress();
@@ -564,20 +613,28 @@
 
     /** 
      */
+    
     @Override
     public void connect(String host, int port, SelectorCallback cstate) 
             throws IOException {
+        connect(new InetSocketAddress(host, port), cstate);
+    }
+    
+    @Override
+    public void connect(SocketAddress sa, SelectorCallback cstate, SelectorFilter filter) 
+            throws IOException {
+
         SocketChannel socketChannel = SocketChannel.open();
         socketChannel.configureBlocking(false);
-        SelectorData selectorData = new SelectorData(this);
+        SelectorChannel selectorData = new SelectorChannel(this);
         selectorData.sel = this;
         selectorData.callback = cstate;
-        selectorData.channelData = socketChannel;
-        selectorData.channelData = socketChannel; // no key
+        selectorData.channel = socketChannel;
+        selectorData.channel = socketChannel; // no key
         
-        // TODO: add SSL filter
+        selectorData.ssl = filter;
         
-        socketChannel.connect(new InetSocketAddress(host,  port));
+        socketChannel.connect(sa);
         opened.incrementAndGet();
         
         synchronized (connectAcceptInterest) {
@@ -594,14 +651,14 @@
     }
 
     // TODO
-    public void setSocketOptions(SelectorData selectorData,
+    public void setSocketOptions(SelectorChannel selectorData,
                                  int linger, 
                                  boolean tcpNoDelay,
                                  int socketTimeout)
     throws IOException {
 
         SocketChannel socketChannel = 
-            (SocketChannel) selectorData.channelData;
+            (SocketChannel) selectorData.channel;
         Socket socket = socketChannel.socket();
 
         if(linger >= 0 ) 
@@ -613,10 +670,10 @@
     }
 
     @Override
-    public int close(SelectorData selectorData) throws IOException {
+    public int close(SelectorChannel selectorData) throws IOException {
         close(selectorData,
                 (SelectionKey) selectorData.selKey, selectorData.callback, 
-                selectorData.channelData, null, true);
+                selectorData.channel, null, true);
         return 0;
     }
 
@@ -640,6 +697,8 @@
         }
         if (backlog > 0) {
             serverSocket.bind( sa , backlog);
+        } else {
+            serverSocket.bind(sa);
         }
         if( serverTimeout >= 0 ) {
             serverSocket.setSoTimeout( serverTimeout );
@@ -648,8 +707,8 @@
 
         ssc.configureBlocking(false);
 
-        SelectorData selectorData = new SelectorData(this);
-        selectorData.channelData = ssc; // no key yet
+        SelectorChannel selectorData = new SelectorChannel(this);
+        selectorData.channel = ssc; // no key yet
         selectorData.callback = cstate; 
         // key will be set in pending
 
@@ -663,12 +722,12 @@
     
 
     @Override
-    public void runInSelectorThread(SelectorData sdata) throws IOException {
+    public void runInSelectorThread(SelectorChannel ch) throws IOException {
         if (isSelectorThread()) {
-            sdata.callback.ioThreadRun(sdata);
+            ch.callback.ioThreadRun(ch);
         } else {
             synchronized (runInterest) {
-                runInterest.add(sdata);
+                runInterest.add(ch);
             }
             selector.wakeup();
         }
@@ -720,8 +779,8 @@
             ServerSocketChannel ssc=(ServerSocketChannel)ch;
             ssc.configureBlocking(false);
 
-            SelectorData selectorData = new SelectorData(this);
-            selectorData.channelData = ssc;
+            SelectorChannel selectorData = new SelectorChannel(this);
+            selectorData.channel = ssc;
             selectorData.callback = cstate;
             
             synchronized (connectAcceptInterest) {
@@ -745,19 +804,19 @@
         long min = Long.MAX_VALUE;
         // TODO: test with large sets, maybe sort
         synchronized (active) {
-            Iterator<SelectorData> activeIt = active.iterator();
+            Iterator<SelectorChannel> activeIt = active.iterator();
 
             while(activeIt.hasNext()) {
-                SelectorData selectorData = activeIt.next();
-                if (! selectorData.channelData.isOpen()) {
+                SelectorChannel selectorData = activeIt.next();
+                if (! selectorData.channel.isOpen()) {
                     if (debug) {
                         log.info("Found closed socket, removing " + 
-                                selectorData.channelData);
+                                selectorData.channel);
                     }
                     activeIt.remove();
                     close(selectorData, 
                             (SelectionKey) selectorData.selKey, selectorData.callback, 
-                            selectorData.channelData, null, false); // generate callback, increment counters.
+                            selectorData.channel, null, false); // generate callback, increment counters.
                 }
 
                 long t = selectorData.nextTimeEvent;
@@ -788,50 +847,31 @@
         } else {
             sleepTime = nextSleep;
         }
-        if (sleepTime == 0) {
-            System.err.println("XXX");
-        }
         nextWakeup = now + sleepTime;
     }
 
     @Override    
-    public void writeInterest(SelectorData selectorData, boolean b) {
+    public void writeInterest(SelectorChannel selectorData) {
         // TODO: suspended ? 
 
         SelectionKey sk = (SelectionKey) selectorData.selKey;
         if (!sk.isValid()) {
             return;
         }
+        selectorData.writeInterest = true;
         int interest = sk.interestOps();
-        if (b && (interest & SelectionKey.OP_WRITE) != 0) {
-            return;
-        }
-        if (!b && (interest & SelectionKey.OP_WRITE) == 0) {
+        if ((interest & SelectionKey.OP_WRITE) != 0) {
             return;
         }
         if (Thread.currentThread() == selectorThread) {
-            selectorData.writeInterest = b;
-            if (selectorData.writeInterest) {
-                interest = 
-                    interest | SelectionKey.OP_WRITE;
-            } else {
-                interest = 
-                    interest & ~SelectionKey.OP_WRITE;                
-            }
-            if (interest == 0) {
-                log.warning("No interest " + selectorData.callback);
-            } else {
-                sk.interestOps(interest);                
-            }
+            interest = 
+                interest | SelectionKey.OP_WRITE;
+            sk.interestOps(interest);                
             if (debug) {
                 log.info("Write interest " + selectorData.callback + " i=" + interest);
             }
             return;
         }
-        if (!b) {
-            return; // can't remove interest from regular thread
-        }
-        selectorData.writeInterest = b;
         if (debug) {
             log.info("Pending write interest " + selectorData.callback);
         }
@@ -843,7 +883,7 @@
     
     
     @Override
-    public void readInterest(SelectorData selectorData, boolean b) throws IOException {
+    public void readInterest(SelectorChannel selectorData, boolean b) throws IOException {
         if (Thread.currentThread() == selectorThread) {
             selectorData.readInterest = b; 
             selThreadReadInterest(selectorData);
@@ -869,7 +909,7 @@
     }
 
 
-    private void selThreadReadInterest(SelectorData selectorData) throws IOException {
+    private void selThreadReadInterest(SelectorChannel selectorData) throws IOException {
         SelectionKey sk = (SelectionKey) selectorData.selKey;
         if (sk == null) {
             if (selectorData.readInterest) {
@@ -877,13 +917,13 @@
                     log.info("Register again for read interest");
                 }
                 SocketChannel socketChannel = 
-                    (SocketChannel) selectorData.channelData;
+                    (SocketChannel) selectorData.channel;
                 if (socketChannel.isOpen()) {
                     selectorData.sel = this;
                     selectorData.selKey = 
                         socketChannel.register(selector, 
                                 SelectionKey.OP_READ, selectorData);
-                    selectorData.channelData = socketChannel;
+                    selectorData.channel = socketChannel;
                 }
             }
             return;
@@ -926,23 +966,23 @@
 
     private void processPendingConnectAccept() throws IOException {
         synchronized (connectAcceptInterest) {
-            Iterator<SelectorData> ci = connectAcceptInterest.iterator();
+            Iterator<SelectorChannel> ci = connectAcceptInterest.iterator();
 
             while (ci.hasNext()) {
-                SelectorData selectorData = ci.next();
+                SelectorChannel selectorData = ci.next();
                 
                 // Find host, port - initiate connection
                 try {
                     // Accept interest ?
-                    if (selectorData.channelData instanceof ServerSocketChannel) {
+                    if (selectorData.channel instanceof ServerSocketChannel) {
                         ServerSocketChannel socketChannel = 
-                            (ServerSocketChannel) selectorData.channelData;
+                            (ServerSocketChannel) selectorData.channel;
                         selectorData.sel = this;
                         selectorData.selKey = 
                           socketChannel.register(selector, 
                               SelectionKey.OP_ACCEPT, selectorData);
                         
-                        selectorData.channelData = socketChannel;
+                        selectorData.channel = socketChannel;
                         synchronized (active) {
                             active.add(selectorData);
                         }
@@ -951,7 +991,7 @@
                         }
                     } else {
                         SocketChannel socketChannel =
-                          (SocketChannel) selectorData.channelData;
+                            (SocketChannel) selectorData.channel;
                         selectorData.sel = this;
                         selectorData.selKey = 
                           socketChannel.register(selector, 
@@ -978,9 +1018,9 @@
         
         if (runInterest.size() > 0) {
             synchronized (runInterest) {
-                Iterator<SelectorData> ci = runInterest.iterator();
+                Iterator<SelectorChannel> ci = runInterest.iterator();
                 while (ci.hasNext()) {
-                    SelectorData cstate = ci.next();
+                    SelectorChannel cstate = ci.next();
                     try {
                         cstate.callback.ioThreadRun(cstate);
                     } catch (Throwable t) {
@@ -1017,9 +1057,9 @@
         // Update interest 
         if (readInterest.size() > 0) {
             synchronized (readInterest) {
-                Iterator<SelectorData> ci = readInterest.iterator();
+                Iterator<SelectorChannel> ci = readInterest.iterator();
                 while (ci.hasNext()) {
-                    SelectorData cstate = ci.next();
+                    SelectorChannel cstate = ci.next();
                     selThreadReadInterest(cstate);
                     if (debug) {
                         log.info("Read interest added: " + cstate);
@@ -1030,9 +1070,9 @@
         }
         if (writeInterest.size() > 0) {
             synchronized (writeInterest) {
-                Iterator<SelectorData> ci = writeInterest.iterator();
+                Iterator<SelectorChannel> ci = writeInterest.iterator();
                 while (ci.hasNext()) {
-                    SelectorData cstate = ci.next();
+                    SelectorChannel cstate = ci.next();
                     // Fake callback - will update as side effect
                     cstate.callback.dataWriteable(cstate);
                     if (debug) {

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorThreadNio.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SelectorThreadNio.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Copied: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SocketPool.java (from r698781, tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/SocketPool.java)
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SocketPool.java?p2=tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SocketPool.java&p1=tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/SocketPool.java&r1=698781&r2=707271&rev=707271&view=diff
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/SocketPool.java (original)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SocketPool.java Wed Oct 22 21:45:05 2008
@@ -1,6 +1,6 @@
 /*
  */
-package org.apache.coyote.client;
+package org.apache.tomcat.async;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -10,11 +10,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Logger;
 
-//import org.apache.tomcat.util.modeler.Registry;
-import org.apache.tomcat.util.ObjectManager;
-import org.apache.tomcat.util.net.SelectorCallback;
-import org.apache.tomcat.util.net.SelectorThread.SelectorData;
-
 /** 
  * Very simple socket pool. For each remote host holds a list 
  * of open connections, with a close callback. 
@@ -26,6 +21,7 @@
 public class SocketPool {
     static Logger log = Logger.getLogger(SocketPool.class.getName());
     static boolean debug = false;
+    
     /** 
      * Wait for close from the other end, remove the socket from the pool. 
      */
@@ -33,28 +29,28 @@
         RemoteServer target;
 
         @Override
-        public void dataReceived(SelectorData selThread) 
+        public void dataReceived(SelectorChannel ch) 
         throws IOException {
             ByteBuffer bb = ByteBuffer.allocate(1024);
-            int rd = selThread.sel.readNonBlocking(selThread, bb);
+            int rd = ch.read(bb);
             if (rd < 0) {
                 // closed
-                selThread.sel.close(selThread);
+                ch.close();
                 return;
             } 
-            log.warning("Unexpected data waiting close " + selThread);
-            selThread.sel.close(selThread);
+            log.warning("Unexpected data waiting close " + ch);
+            ch.close();
             return;
         }
 
         @Override
-        public void channelClosed(SelectorData selThread, Throwable ex) {
+        public void channelClosed(SelectorChannel ch, Throwable ex) {
             if (debug) {
-                log.fine("Closed..." + selThread);
+                log.fine("Closed..." + ch);
             }
             target.pool.closedSockets.incrementAndGet();
             synchronized (target) {
-                target.connections.remove(selThread);
+                target.connections.remove(ch);
             }
         }
     }
@@ -67,7 +63,7 @@
      */
     static class RemoteServer {
         SocketPool pool;
-        ArrayList<SelectorData> connections = new ArrayList<SelectorData>();
+        ArrayList<SelectorChannel> connections = new ArrayList<SelectorChannel>();
     }
 
 
@@ -84,8 +80,6 @@
     // TODO: for linux, find how many sockets we can have open, use as limit
     
     public SocketPool(String id) {
-        ObjectManager.get().registerObject(this, 
-                "SocketPool,id=" + id, "SocketPool");
     }
 
     public int getTargetCount() {
@@ -114,7 +108,7 @@
      * are connected to equivalent servers ( LB ) 
      * @throws IOException 
      */
-    public SelectorData getChannel(CharSequence key, 
+    public SelectorChannel getChannel(CharSequence key, 
                                    SelectorCallback cstate) throws IOException {
         RemoteServer t = null;
         synchronized (hosts) {
@@ -126,7 +120,7 @@
         if (t.connections.size() == 0) {
             return null;
         } // one may be added - no harm.
-        SelectorData res = null;
+        SelectorChannel res = null;
         synchronized (t) {
             res = t.connections.remove(t.connections.size() - 1);
 
@@ -134,13 +128,13 @@
             if (res == null) {
                 return null;
             }
-            res.sel.updateCallback(res, res.callback, cstate);
+            res.updateCallback(cstate);
         }
         return res;      
     }
 
     public void returnChannel(CharSequence key, 
-                              SelectorData sdata,
+                              SelectorChannel sdata,
                               SelectorCallback cstate) 
             throws IOException {
         RemoteServer t = null;
@@ -157,8 +151,8 @@
         waitingSockets.incrementAndGet();
         synchronized (t) {
             t.connections.add(sdata);      
-            sdata.sel.updateCallback(sdata, cstate, cc);
-            sdata.sel.readInterest(sdata, true);
+            sdata.updateCallback(cc);
+            sdata.readInterest(true);
         }
     }
 }

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SocketPool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/SocketPool.java
------------------------------------------------------------------------------
    svn:mergeinfo = 



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Mime
View raw message