tomcat-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cos...@apache.org
Subject svn commit: r707271 [2/2] - in /tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async: ./ http/ socks/
Date Thu, 23 Oct 2008 04:45:06 GMT
Copied: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttp.java (from r698781, tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttp.java)
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttp.java?p2=tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttp.java&p1=tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttp.java&r1=698781&r2=707271&rev=707271&view=diff
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttp.java (original)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttp.java Wed Oct 22 21:45:05 2008
@@ -13,9 +13,10 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.coyote.client;
+package org.apache.tomcat.async.http;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.util.LinkedList;
@@ -28,18 +29,19 @@
 import org.apache.coyote.Request;
 import org.apache.coyote.Response;
 import org.apache.coyote.http11.Constants;
+import org.apache.coyote.util.CoyoteUtils;
+import org.apache.tomcat.async.SelectorCallback;
+import org.apache.tomcat.async.SelectorChannel;
+import org.apache.tomcat.async.SelectorPool;
+import org.apache.tomcat.async.SelectorThread;
+import org.apache.tomcat.async.SelectorThread.SelectorFilter;
 import org.apache.tomcat.util.ObjectManager;
 import org.apache.tomcat.util.buf.ByteChunk;
 import org.apache.tomcat.util.buf.HexUtils;
 import org.apache.tomcat.util.buf.MessageBytes;
-import org.apache.tomcat.util.http.Http11Parser;
+import org.apache.tomcat.util.http.FastHttpDateFormat;
 import org.apache.tomcat.util.http.HttpMessages;
 import org.apache.tomcat.util.http.MimeHeaders;
-//import org.apache.tomcat.util.modeler.Registry;
-import org.apache.tomcat.util.net.SelectorCallback;
-import org.apache.tomcat.util.net.SelectorPool;
-import org.apache.tomcat.util.net.SelectorThread;
-import org.apache.tomcat.util.net.SelectorThread.SelectorData;
 
 /**
  * HTTP async client, using tomcat buffers and I/O.
@@ -192,7 +194,7 @@
     
     protected AsyncHttpCallback httpCallback = blockingHttp;
     
-    protected SelectorData selectorData;
+    protected SelectorChannel ch;
     
     protected Request req;
     protected Response res;
@@ -206,7 +208,8 @@
     /** To set when done, wait for next req */
     protected SelectorCallback keepAliveCallback;
 
-    protected boolean debug = false; 
+    protected boolean serverMode = false;
+    protected boolean debug = true; 
 
     // ---------- Client only ------------
     // Because Response uses String. They wrap data in headReadBuf
@@ -225,6 +228,13 @@
     // ------ JMX 
     protected int ser; // id - for jmx registration and logs
     
+    // Server side only 
+    protected String serverHeader = "ApacheTomcat";
+    protected boolean http11 = false;
+    protected boolean http09 = false;
+    protected boolean error = false;
+
+    
     public void recycle() {
         recycle(false);
     }
@@ -232,7 +242,7 @@
     protected void recycle(boolean keepHead) {
         if (!keepHead) {
             headRecvBuf.recycle();
-            selectorData = null;
+            ch = null;
         }
         headSendBuf.recycle();
         recvState = State.HEAD;
@@ -260,6 +270,9 @@
         noBodySend = false; 
         closeOnEndSend = false;
         
+        http11 = false;
+        http09 = false;
+        error = false;
         
         //protected LinkedList<ByteChunk> readBuffers = new LinkedList<ByteChunk>();
         rawRecvBuf.recycle();
@@ -299,6 +312,10 @@
             log.info(dbgName + ": recycle " + ser);
         }
     }
+    
+    public SelectorCallback getSelectorCallback() {
+        return selectorCallback;
+    }
 
     protected AsyncHttp(SelectorPool client, boolean init) {
         if (client == null) {
@@ -336,7 +353,7 @@
             log.info("Create new client ser=" + ser);
         }
     }
-
+    
     public AsyncHttp(SelectorPool client) {
         this(client, true);
     }
@@ -344,6 +361,16 @@
     protected AsyncHttp() {
     }
 
+    protected AsyncHttp serverMode() {
+        serverMode = true;
+        return this;
+    }
+
+    protected AsyncHttp keepAlive(SelectorCallback ka) {
+        this.keepAliveCallback = ka;
+        return this;
+    }
+    
     public void setClientPool(AsyncHttpPool pool) {
         this.pool = pool;
     }
@@ -353,22 +380,23 @@
         this.res = res;
     }
     
-    public void setSelectorData(SelectorData selData) {
-        this.selectorData = selData;
+    public void setSelectorData(SelectorChannel ch) {
+        this.ch = ch;
     }
 
     // ----- Callback methods --------
     
     /**
-     * Read
+     * Read data.
+     * 
      * @return NEED_MORE - read all available data
      * @return CLOSE - other side closed connection
      * @return > 0 - number of bytes received
      */
-    private int tryRead(SelectorData selT, ByteChunk buf)
+    private int tryRead(SelectorChannel ch, ByteChunk buf)
             throws IOException {
         ByteBuffer bb = buf.getWriteByteBuffer();
-        int done = selT.sel.readNonBlocking(selT, bb);
+        int done = ch.read(bb);
         buf.updateWriteByteBuffer();
         
         if (done < 0) {
@@ -394,16 +422,20 @@
     }
 
     /** 
-     * Read and process a chunk of head 
+     * Read and process a chunk of head, called from dataReceived() if 
+     * in HEAD mode.
      * 
+     * @return <= 0 - still in head mode; > 0 - moved to body mode, and
+     * part of the body may already have been received.
      */
-    protected int headDataReceived(SelectorData selT) throws IOException {
+    protected int headDataReceived(SelectorChannel ch) throws IOException {
         while (recvState == State.HEAD) {
-            if (headRecvBuf.length() == 0) {
-                int done = tryRead(selT, headRecvBuf);
+//            if (headRecvBuf.length() == 0) {
+//                // Why ? What if the buffer is pre-filled ?
+                int done = tryRead(ch, headRecvBuf);
                 if (done == CLOSE) {
                     // TODO: http/0.9 ? 
-                    selT.sel.close(selT); // too early - we don't have the head
+                    ch.close(); // too early - we don't have the head
                     return done;
                 } else if (done == NEED_MORE) {
                     // End of input - other side closed, no more data
@@ -415,12 +447,12 @@
                             done + " head=" + headRecvBuf.length() + 
                             " body=" + rawRecvBuf.length());
                 }
-            } else {
-                if (debug ) {
-                    log.info(dbgName + ": headDataReceived, existing data: " + 
-                            ser + " " +  " head=" + headRecvBuf.length());
-                }
-            }
+//            } else {
+//                if (debug ) {
+//                    log.info(dbgName + ": headDataReceived, existing data: " + 
+//                            ser + " " +  " head=" + headRecvBuf.length());
+//                }
+//            }
 
             //Parse the response
             parser.setBuffer(headRecvBuf.getBuffer(), headRecvBuf.getStart(), 
@@ -447,9 +479,17 @@
             headRecvBuf.setEnd(dataStart);
             
             headersReceived();
-            return DONE;
+            
+            recvState = State.BODY_DATA;
+            if (debug) {
+                log.info(dbgName + ": Head done, start body " + ser + 
+                        " buf=" + rawRecvBuf.length() + " r=" + remainingRecv + " c=" + 
+                        chunkedRecv + " rs=" + recvState);
+            }
+            
+            return dataStart;
         }
-        // TODO: if GET/HEAD - request received..
+        // Shouldn't happen ( no break )
         return NEED_MORE;
     }
 
@@ -458,13 +498,25 @@
      * @return NEED_MORE or number of extra body bytes in the head buffer 
      */
     protected int parseHead() throws IOException {
-        boolean ok = parser.parseResponseLine(proto, status, statusMsg);
-        if (!ok) {
-            return NEED_MORE;
-        }
+        if (serverMode) {
+            boolean ok = parser.parseRequestLine(req.method(),
+                    req.unparsedURI(), req.query(), 
+                    req.requestURI(), req.protocol());
+            if (!ok) {
+                return NEED_MORE;
+            }
 
-        int dataStart = parser.parseHeaders(res.getMimeHeaders());
-        return (dataStart < 0) ? NEED_MORE : dataStart;
+            int dataStart = parser.parseHeaders(req.getMimeHeaders());
+            return (dataStart < 0) ? NEED_MORE : dataStart;
+        } else {
+            boolean ok = parser.parseResponseLine(proto, status, statusMsg);
+            if (!ok) {
+                return NEED_MORE;
+            }
+
+            int dataStart = parser.parseHeaders(res.getMimeHeaders());
+            return (dataStart < 0) ? NEED_MORE : dataStart;
+        }
     }
 
     public boolean isReadContentDelimited() {
@@ -472,27 +524,42 @@
     }
     
     protected void headersReceived() throws IOException {
-        // We have the response head, now flush remaining data.
-        if (res != null) {
-            try {
-                res.setStatus(status.getInt());
-                res.setMessage(statusMsg.toString());
-            } catch (Throwable t) {
-                log.warning("Invalid status " + status + " " + statusMsg);
+        if (serverMode) {
+            processReadTransferHeaders(req.getMimeHeaders());
+
+            if (!isReadContentDelimited()) {
+                if (req.method().equals("GET") ||
+                        req.method().equalsIgnoreCase("HEAD")) {
+                    // No body.
+                    handleEndReceive(false);
+                } else {
+                    closeOnEndSend = true;
+                    closeOnEndRecv = true; // needBody
+                }
+            } // else: delimited, need body
+        } else {
+            // We have the response head, now flush remaining data.
+            if (res != null) {
+                try {
+                    res.setStatus(status.getInt());
+                    res.setMessage(statusMsg.toString());
+                } catch (Throwable t) {
+                    log.warning("Invalid status " + status + " " + statusMsg);
+                }
+            }
+            // Extract transfer data
+            processReadTransferHeaders(res.getMimeHeaders());
+            if (contentLengthRecv >= 0) {
+                // Update response - it's a cached value
+                res.setContentLength(contentLengthRecv);
+            }
+
+            if (!isReadContentDelimited()) {
+                closeOnEndRecv = true;
+                closeOnEndSend = true;
             }
         }
-        // Extract transfer data
-        processReadTransferHeaders(res.getMimeHeaders());
-        if (contentLengthRecv >= 0) {
-            // Update response - it's a cached value
-            res.setContentLength(contentLengthRecv);
-        }
 
-        if (!isReadContentDelimited()) {
-            closeOnEndRecv = true;
-            closeOnEndSend = true;
-        }
-        //Unlock head notification
         if (httpCallback != null) {
             httpCallback.headersReceived(this, req, res);
         }
@@ -507,12 +574,12 @@
      *  was a success and done in that thread ( write is not bound to IO thr)
      * 
      */
-    protected void handleEndSendReceive(SelectorData selT) throws IOException {
+    protected void handleEndSendReceive(SelectorChannel ch) throws IOException {
         if (httpCallback != null) {
             httpCallback.done(this, false, null);
         }
         if (closeOnEndSend) {
-            selT.sel.close(selT);
+            ch.close();
         }
         if (debug) {
             log.info("client send/receive done, close=" + 
@@ -524,10 +591,10 @@
     /** 
      * called from IO thread OR servlet thread when last block has been sent.
      * 
-     * @param selT 
+     * @param ch 
      * @throws IOException 
      */
-    protected void handleEndSent(SelectorData selT) throws IOException {
+    protected void handleEndSent(SelectorChannel ch) throws IOException {
         if (debug) {
             log.info("OUT body sent done " + ser + " " + sendState);
         }
@@ -540,7 +607,7 @@
         
         // TODO: close out stream ? 
         if (closeOnEndSend) {
-            selector.close(selT);
+            ch.close();
         }
         
         if (httpCallback != null) {
@@ -550,7 +617,7 @@
         synchronized(this) {
             sendState = State.DONE;
             if (recvState == State.DONE) {
-                handleEndSendReceive(selT);
+                handleEndSendReceive(ch);
             }
         }
     }
@@ -578,7 +645,7 @@
         synchronized(this) {
             recvState = State.DONE;
             if (sendState == State.DONE) {
-                handleEndSendReceive(selectorData);
+                handleEndSendReceive(ch);
             }
         }
     }
@@ -588,7 +655,7 @@
      * Callback should not consume past the end of the body.
      *  
      */
-    public int rawDataReceived(SelectorData selT) throws IOException {
+    public int rawDataReceived(SelectorChannel ch) throws IOException {
         int rc = NEED_MORE; // keep receiving
         if (rawRecvBuf.length() == 0) {
             // Only called from headers, if GET/HEAD
@@ -688,9 +755,9 @@
                 // read interest is only suspended if next req is sent
                 // while processing the current one - don't want to 
                 // receive the whole post body, but keep what was sent.
-                // TODO: selT.readInterest(this, false);
+                // TODO: ch.readInterest(this, false);
                 // We read too much, leave it there for next request.
-                handleReceivePastEnd(selT);
+                handleReceivePastEnd(ch);
             }
             // we must read the current buffer, or select() will be sad
         }
@@ -707,22 +774,22 @@
 //            recvState = State.DONE;
 //            boolean close = bodyReceived();
 //            // done reading - will be re-enabled when keep-alive is set
-//            //selT.readInterest(this, false);
+//            //ch.readInterest(this, false);
 //            return;
 //        } else {
 //            log.warning("Untested read suspend");
-//            //selT.readInterest(this, false);
+//            //ch.readInterest(this, false);
                 
     }
 
 
-    protected void handleReceivePastEnd(SelectorData selT) throws IOException {
+    protected void handleReceivePastEnd(SelectorChannel ch) throws IOException {
         log.info(this.toString() + " CLIENT Read past end\n" + rawRecvBuf + " " 
                 + rawRecvBuf.length());
         closeOnEndRecv = true;
         closeOnEndSend = true;
         // Force close
-        selector.close(selT);
+        ch.close();
     }
 
     protected int processReadTransferHeaders(MimeHeaders headers) {
@@ -821,7 +888,9 @@
 
             // Attempt to write - now, maybe we don't need to worry about 
             // interest
-            sendData(selectorData);
+            if (ch != null) {
+                sendData(ch);
+            }
             if (sendBrigade.size() == 0) {
                 if (debug) {
                     log.info("Sent all data in-thread");
@@ -835,6 +904,16 @@
         return false; // need to wait
     }
 
+    void sendHead() throws IOException {
+        synchronized (sendBrigade) {
+            sendBrigade.add(0, headSendBuf);
+            sendData(ch);
+        }
+        // TODO: for post ( or body ) - don't send end, let body to be sent
+        endSend(this);
+    }
+    
+    
     /** 
      * Called from selector thread when 'write' is possible, or from regular
      * threads when buffers can be written.
@@ -846,7 +925,7 @@
      * thread. Again: no other code in the selector thread or outside should
      * register or clear write interst except this code. 
      */
-    protected int sendData(SelectorData selT) throws IOException {
+    protected int sendData(SelectorChannel ch) throws IOException {
         int written = 0;
         synchronized (sendBrigade) {
             while (sendBrigade.size() > 0) {
@@ -872,14 +951,13 @@
                         current.setOffset(current.getEnd());
                     } else {
                         ByteBuffer bb = current.getReadByteBuffer();
-                        done = selT.sel.writeNonBlocking(selT, 
-                                bb);
+                        done = ch.write(bb);
                         current.updateReadByteBuffer();
                     }
                 }
                 if (done < 0) { // error
                     handleError("writeRequest");
-                    selT.sel.close(selT);
+                    ch.close();
                     return CLOSE;
                 }
                 
@@ -895,7 +973,7 @@
                             remainingSend < 0) {
                         // TODO: make sure we only write max.
                         log.severe(dbgName + ": write more than Content-Length");
-                        selT.sel.close(selT);
+                        ch.close();
                         return CLOSE;
                     }
                 }
@@ -908,7 +986,7 @@
                     //  - in sel thread: change at once.
                     //  - in regular thread - writeInterest updated, on next
                     // cycle dataWritable() will be called. 
-                    selT.sel.writeInterest(selT, true);
+                    ch.writeInterest();
                     return NEED_MORE;
                 } else {
                     // DONE !!!
@@ -920,7 +998,7 @@
                     // Next buffer 
                     sendBrigade.remove(0);
                     if (current == endSendBuffer) {
-                        handleEndSent(selT);
+                        handleEndSent(ch);
                         sendDone = true;
                         return DONE;
                     }
@@ -935,17 +1013,30 @@
                 }
             }
             // size == 0
-            // Only selector thread can remove interest
-            if (selT.sel.isSelectorThread()) {
-                selT.sel.writeInterest(selT, false);
-            }
             return SUSPEND;
         }
         
     }
     
+    /** 
+     * Called after the headers are sent.
+     */
     protected void headersSent() throws IOException {
-      sendState = State.BODY_DATA;      
+      sendState = State.BODY_DATA;
+      
+      if (serverMode) {
+          int statusCode = res.getStatus();
+          if ((statusCode == 204) || (statusCode == 205)
+                  || (statusCode == 304)) {
+              noBodySend = true;
+          }
+
+          MessageBytes methodMB = req.method();
+          if (methodMB.equals("HEAD")) {
+              // No entity body
+              noBodySend = true;
+          }
+      }
     }
     
  
@@ -959,6 +1050,7 @@
      */
     public boolean endSend(AsyncHttp cb) throws IOException {
         // Finished sending the last part of the response
+        // TODO: what if client and no data ?
         synchronized (this) {
             if (sendDone) {
                 // don't do this more than once
@@ -1123,50 +1215,6 @@
     }
 
     /** 
-     * Convert the request to bytes, ready to send.
-     */
-    public static void serializeRequest(Request req, 
-                                        ByteChunk reqBuf) throws IOException {
-        req.method().toBytes();
-        if (!req.unparsedURI().isNull()) {
-            req.unparsedURI().toBytes();
-        }
-        req.protocol().toBytes();
-
-        reqBuf.append(req.method().getByteChunk());
-        reqBuf.append(space);
-        if (req.unparsedURI().isNull()) {
-            req.requestURI().toBytes();
-
-            reqBuf.append(req.requestURI().getByteChunk());      
-        } else {
-            reqBuf.append(req.unparsedURI().getByteChunk());
-        }
-        reqBuf.append(space);
-        reqBuf.append(req.protocol().getByteChunk());
-        reqBuf.append(crlf);
-        // Headers
-        MimeHeaders mimeHeaders = req.getMimeHeaders();
-        boolean hasHost = false;
-        for (int i = 0; i < mimeHeaders.size(); i++) {
-            MessageBytes name = mimeHeaders.getName(i);
-            name.toBytes();
-            reqBuf.append(name.getByteChunk());
-            if (name.equalsIgnoreCase("host")) {
-                hasHost = true;
-            }
-            reqBuf.append(col);
-            mimeHeaders.getValue(i).toBytes();
-            reqBuf.append(mimeHeaders.getValue(i).getByteChunk());
-            reqBuf.append(crlf);
-        }
-        if (!hasHost) {
-            reqBuf.append("Host: localhost\r\n".getBytes(), 0, 17);
-        }
-        reqBuf.append(crlf);
-    }
-
-    /** 
      * Convert the response to bytes, ready to send.
      */
     public static void serializeResponse(Response res, 
@@ -1217,6 +1265,10 @@
         this.httpCallback = c;
     }
 
+    public AsyncHttpCallback getCallback() {
+        return httpCallback;
+    }
+    
     public String toString() {
         StringBuffer sb = new StringBuffer();
         sb.append(dbgName + "-").append(ser).append(",rs=").append(getState())
@@ -1235,9 +1287,13 @@
      * @throws IOException
      */
     public void connect(String host, int port) throws IOException {
+        connect(host, port, null);
+    }
+    
+    public void connect(String host, int port, SelectorFilter filter) throws IOException {
         this.host = host;
         this.port = port;
-        serializeRequest(req, headSendBuf);    
+        CoyoteUtils.serializeRequest(req, headSendBuf);    
         sendState = State.HEAD;
         if (rawRecvBuf.length() > 0) {
             log.severe("pre-existing data");
@@ -1245,16 +1301,18 @@
         // Get an existing channel, set it in 'write' mode.
         // TODO: only need 'WRITE' if we can't write.
         String target = host + ":" + port;
-        SelectorData sdata = (pool == null) ? null :
+        SelectorChannel ch = (pool == null) ? null :
             pool.getSocketPool().getChannel(target, 
                 selectorCallback);
-        if (sdata == null) {
+
+        if (ch == null) {
             if (debug) {
                 log.info("HTTP_CONNECT: New connection " + target + " " + this);
             }
-            selector.connect(host, port, selectorCallback);
+            selector.connect(new InetSocketAddress(host, port), 
+                    selectorCallback, filter);
         } else {
-            setSelectorData(sdata);
+            setSelectorData(ch);
             if (debug) {
                 log.info("HTTP_CONNECT: Reuse connection " + target + " " + this);
             }
@@ -1262,18 +1320,61 @@
         }
     }
     
-    void sendHead() throws IOException {
-        startWrite(headSendBuf);
-        // TODO: for post ( or body ) - don't send end, let body to be sent
-        endSend(this);
-    }
-    
+    /** 
+     * Finalize the sending. Indicates servlet processing is done, some
+     * IO may still be in flight.
+     *  
+     * MUST be called - either after service or from the comet app.
+     * 
+     * The tricky part is that this may be called before, after, or at the
+     * same time as endSendReceive, or while the next request is pipelined.
+     * 
+     * @throws IOException 
+     */
     public void release() throws IOException {
+        endSend(this); // make sure the final marker is sent (if we have 
+        //a POST body)
         serviceDone = true;
         maybeRelease();
     }
 
-    public void ioThreadRun(SelectorData selThread) throws IOException {
+    public void ioThreadRun(SelectorChannel ch) throws IOException {
+        if (serverMode) {
+            // Currently used only to set keep alive.
+            // If we need more, need to use state or pass a param.
+            
+            if (rawRecvBuf.length() > 0) {
+                // what if I'm not done ?
+                if (debug) {
+                    log.info("Done, pipelined next request \n" + rawRecvBuf + " " 
+                        + rawRecvBuf.length());
+                }
+                headRecvBuf.recycle();
+                headRecvBuf.append(rawRecvBuf);
+                recycle(true);
+                // No release - next req in process
+                
+                // In case it was disabled
+                selector.readInterest(ch, true);
+                // Notify that data was received. The callback must deal with
+                // pre-existing data.
+                selectorCallback.dataReceived(ch);
+                return;
+            }
+            
+            synchronized (this) {
+                if (keepAliveCallback != null) {
+                    selector.readInterest(ch, true);
+
+                    ch.updateCallback(keepAliveCallback);
+
+                    if (debug) {
+                        log.info("SERVER send/receive done, KEEP_ALIVE");
+                    }
+                }
+                // TODO: else: close
+            }
+        } 
         // Currently used only to set keep alive.
         // If we need more, need to use state or pass a param.
         returnToPool();
@@ -1295,7 +1396,7 @@
                 if (!allDone) {
                     allDone = true;
                     // pipeline or keepalive
-                    selector.runInSelectorThread(selectorData);
+                    selector.runInSelectorThread(ch);
                 }
             } else {
                 if (debug) {
@@ -1315,7 +1416,7 @@
         }
         String target = getTarget();
        
-        SelectorData sdata = selectorData;
+        SelectorChannel sdata = ch;
         recycle();
         
         if (pool != null) {
@@ -1367,33 +1468,41 @@
         return host + ":" + port;
     }
 
+    public void fromKeepAlive(SelectorChannel ch, SelectorCallback ka)
+            throws IOException {
+        synchronized (this) {
+            keepAliveCallback = ka;
+
+            ch.updateCallback(selectorCallback);
+        }
+        try {
+            selectorCallback.dataReceived(ch);
+        } catch (IOException ex) {
+            ch.close();
+        }
+    }
+
+
+    public BlockingHttp getBlockingHttp() {
+        return blockingHttp;
+    }
+    
     public class AsyncHttpSelectorCallback extends SelectorCallback { 
 
         @Override
-        public void dataReceived(SelectorData selT) throws IOException {
+        public void dataReceived(SelectorChannel ch) throws IOException {
             // Make sure it's correct. 
             // TODO: set it in 'fromKeepAlive'
-            AsyncHttp.this.selectorData = selT;
+            if (ch != null) {
+                AsyncHttp.this.ch = ch;
+            }
             
             if (recvState == State.HEAD) {
                 // Will read at least 1 byte to detect close
-                int res = headDataReceived(selT);
-                if (res == CLOSE) {
-                    selT.sel.close(selT);
-                    return;
-                } else if (res == NEED_MORE) { 
-                    // TODO: what happens if head is too large ( > headBuf ) ? 
-                    //selT.readInterest(this, true);
+                int res = headDataReceived(ch);
+                if (res < 0) {
                     return;
                 }
-                if (recvState != State.DONE) {
-                    recvState = State.BODY_DATA;
-                }
-                if (debug) {
-                    log.info(dbgName + ": Head done, start body " + ser + 
-                            " buf=" + rawRecvBuf.length() + " r=" + remainingRecv + " c=" + 
-                            chunkedRecv + " rs=" + recvState);
-                }
             } 
 
             // TODO: Make sure we don't process more than we need ( eat next req ).
@@ -1409,7 +1518,7 @@
                 if (rawRecvBuf.length() > 0) {
                     // we have something in the buffer ? Try to consume.
                     // Or at least make room for more.
-                    int state = rawDataReceived(selT);
+                    int state = rawDataReceived(ch);
                     // At this point we may have finished the req.
                     if (state == DONE) {
                         if (debug) {
@@ -1422,13 +1531,13 @@
 
                 // it should have been consumed or set to a new 
                 // buffer.
-                rc = tryRead(selT, rawRecvBuf);
+                rc = tryRead(ch, rawRecvBuf);
                 if (rc == CLOSE) {
                     closeOnEndSend = true;
                     recvState = State.DONE;
                     // Sender closed the recv stream - but he might have kept the 
                     // send stream open. Example: http1.0 post without keep-alive
-                    selT.sel.readInterest(selT, false);
+                    ch.readInterest(false);
                     return;
                 } 
                 if (rc == NEED_MORE) {
@@ -1440,17 +1549,14 @@
         }
 
         @Override
-        public void dataWriteable(SelectorData selT) 
+        public void dataWriteable(SelectorChannel ch) 
                 throws IOException {
-            if (!selT.sel.isSelectorThread()) {
-                throw new IOException("Not ST");
-            }
-            sendData(selT);
+            sendData(ch);
             return;
         }
 
         @Override
-        public void channelClosed(SelectorData selThread, Throwable ex) {
+        public void channelClosed(SelectorChannel ch, Throwable ex) {
             if (ex != null) {
                 System.err.println("Closed due to error: ");
                 ex.printStackTrace();
@@ -1458,24 +1564,142 @@
         }
 
         @Override
-        public void connected(SelectorData selThread) 
+        public void connected(SelectorChannel ch) 
                 throws IOException {
-            AsyncHttp.this.setSelectorData(selThread);
-            sendHead();            
+            AsyncHttp.this.setSelectorData(ch);
+            sendHead();
+           // sendData(ch); // if any
         }
         
         @Override
-        public void ioThreadRun(SelectorData selThread) throws IOException {
-            AsyncHttp.this.ioThreadRun(selThread);
+        public void ioThreadRun(SelectorChannel ch) throws IOException {
+            AsyncHttp.this.ioThreadRun(ch);
         }
         
         public String toString() {
             return AsyncHttp.this.toString();
         }
     }
+    
+    /**
+     * When committing the response, we have to validate the set of headers, as
+     * well as setup the response filters.
+     */
+    public void sendResponseHeaders() throws IOException {
 
-    public BlockingHttp getBlockingHttp() {
-        return blockingHttp;
+        boolean entityBody = true;
+        contentDelimitationRecv = false;
+
+        //List<OutputFilter> outputFilters = outputBuffer.getFilters();
+
+        if (http09 == true) {
+            // HTTP/0.9 - no headers
+            closeOnEndSend = true;
+            return;
+        }
+
+        int statusCode = res.getStatus();
+        if ((statusCode == 204) || (statusCode == 205)
+                || (statusCode == 304)) {
+            entityBody = false;
+            contentDelimitationRecv = true;
+        }
+
+        MessageBytes methodMB = req.method();
+        if (methodMB.equals("HEAD")) {
+            // No entity body
+            contentDelimitationRecv = true;
+        }
+
+//      // Sendfile support
+//      if (this.endpoint.getUseSendfile()) {
+//      String fileName = (String) request.getAttribute("org.apache.tomcat.sendfile.filename");
+//      if (fileName != null) {
+//      // No entity body sent here
+//      outputBuffer.addActiveFilter(outputFilters[Constants.VOID_FILTER]);
+//      contentDelimitation = true;
+//      sendfileData = new NioEndpoint.SendfileData();
+//      sendfileData.fileName = fileName;
+//      sendfileData.pos = ((Long) request.getAttribute("org.apache.tomcat.sendfile.start")).longValue();
+//      sendfileData.length = ((Long) request.getAttribute("org.apache.tomcat.sendfile.end")).longValue() - sendfileData.pos;
+//      }
+//      }
+
+        // Check for compression
+//      boolean useCompression = false;
+//      if (entityBody && (compressionLevel > 0) && (sendfileData == null)) {
+//      useCompression = isCompressable();
+//      // Change content-length to -1 to force chunking
+//      if (useCompression) {
+//      response.setContentLength(-1);
+//      }
+//      }
+
+        MimeHeaders headers = res.getMimeHeaders();
+        if (!entityBody) {
+            res.setContentLength(-1);
+        } else {
+            String contentType = res.getContentType();
+            if (contentType != null) {
+                headers.setValue("Content-Type").setString(contentType);
+            }
+            String contentLanguage = res.getContentLanguage();
+            if (contentLanguage != null) {
+                headers.setValue("Content-Language")
+                .setString(contentLanguage);
+            }
+        }
+
+        long contentLength = res.getContentLengthLong();
+        if (contentLength != -1) {
+            headers.setValue("Content-Length").setLong(contentLength);
+            remainingSend = contentLength;
+            contentDelimitationRecv = true;
+        } else {
+            if (entityBody && http11 && !closeOnEndSend) {
+                chunkedSend = true;
+                contentDelimitationRecv = true;
+                headers.addValue(Constants.TRANSFERENCODING).setString(Constants.CHUNKED);
+            } else {
+                closeOnEndSend = true;
+            }
+        }
+
+//      if (useCompression) {
+//      outputBuffer.addActiveFilter(outputFilters[Constants.GZIP_FILTER]);
+//      headers.setValue("Content-Encoding").setString("gzip");
+//      // Make Proxies happy via Vary (from mod_deflate)
+//      headers.setValue("Vary").setString("Accept-Encoding");
+//      }
+
+        // Add date header
+        headers.setValue("Date").setString(FastHttpDateFormat.getCurrentDate());
+
+        // FIXME: Add transfer encoding header
+
+        if ((entityBody) && (!contentDelimitationRecv)) {
+            // Mark as close the connection after the request, and add the
+            // connection: close header
+            closeOnEndSend = true;
+        }
+
+        // If we know that the request is bad this early, add the
+        // Connection: close header.
+        closeOnEndSend = closeOnEndSend || statusDropsConnection(statusCode);
+        if (closeOnEndSend) {
+            headers.addValue(Constants.CONNECTION).setString(Constants.CLOSE);
+        } else if (!http11 && !error) {
+            headers.addValue(Constants.CONNECTION).setString(Constants.KEEPALIVE);
+        }
+
+        // Add server header
+        if (serverHeader.length() > 0) {
+            headers.setValue("Server").setString(serverHeader);
+        }
+
+        serializeResponse(res, headSendBuf);
+
+        startWrite(headSendBuf);
     }
     
 }
\ No newline at end of file

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttp.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttp.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Copied: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttpCallback.java (from r698781, tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttpCallback.java)
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttpCallback.java?p2=tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttpCallback.java&p1=tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttpCallback.java&r1=698781&r2=707271&rev=707271&view=diff
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttpCallback.java (original)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttpCallback.java Wed Oct 22 21:45:05 2008
@@ -13,7 +13,7 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.coyote.client;
+package org.apache.tomcat.async.http;
 
 import java.io.IOException;
 import java.util.List;

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttpCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttpCallback.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Copied: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttpPool.java (from r698781, tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttpPool.java)
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttpPool.java?p2=tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttpPool.java&p1=tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttpPool.java&r1=698781&r2=707271&rev=707271&view=diff
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/AsyncHttpPool.java (original)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttpPool.java Wed Oct 22 21:45:05 2008
@@ -1,12 +1,14 @@
 /*
  */
-package org.apache.coyote.client;
+package org.apache.tomcat.async.http;
 
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.logging.Logger;
 
-import org.apache.tomcat.util.net.SelectorPool;
+import org.apache.tomcat.async.SelectorPool;
+import org.apache.tomcat.async.SocketPool;
+import org.apache.tomcat.util.ObjectManager;
 
 /**
  * Factory and pool for AsyncHttp clients.
@@ -28,6 +30,8 @@
     
     public AsyncHttpPool(String id, SelectorPool selectors) {
         socketPool = new SocketPool(id);
+        ObjectManager.get().registerObject(socketPool, 
+                "SocketPool,id=" + id, "SocketPool");
         if (selectors == null) {
             client = SelectorPool.defaultPool();
         } else {

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttpPool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/AsyncHttpPool.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Copied: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/BlockingHttp.java (from r698781, tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/BlockingHttp.java)
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/BlockingHttp.java?p2=tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/BlockingHttp.java&p1=tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/BlockingHttp.java&r1=698781&r2=707271&rev=707271&view=diff
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/client/BlockingHttp.java (original)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/BlockingHttp.java Wed Oct 22 21:45:05 2008
@@ -1,6 +1,6 @@
 /*
  */
-package org.apache.coyote.client;
+package org.apache.tomcat.async.http;
 
 import java.io.IOException;
 import java.util.LinkedList;
@@ -12,7 +12,7 @@
 
 import org.apache.coyote.Request;
 import org.apache.coyote.Response;
-import org.apache.coyote.client.AsyncHttp.State;
+import org.apache.tomcat.async.http.AsyncHttp.State;
 import org.apache.tomcat.util.buf.ByteChunk;
 
 /**

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/BlockingHttp.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/BlockingHttp.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Copied: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/Http11Parser.java (from r698781, tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/http/Http11Parser.java)
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/Http11Parser.java?p2=tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/Http11Parser.java&p1=tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/http/Http11Parser.java&r1=698781&r2=707271&rev=707271&view=diff
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/http/Http11Parser.java (original)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/Http11Parser.java Wed Oct 22 21:45:05 2008
@@ -11,12 +11,13 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.tomcat.util.http;
+package org.apache.tomcat.async.http;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.tomcat.util.buf.MessageBytes;
+import org.apache.tomcat.util.http.MimeHeaders;
 
 /**
  * Non-blocking parser for request and response line and headers. 

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/Http11Parser.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/http/Http11Parser.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Added: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/socks/SocksServer.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/socks/SocksServer.java?rev=707271&view=auto
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/socks/SocksServer.java (added)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/socks/SocksServer.java Wed Oct 22 21:45:05 2008
@@ -0,0 +1,511 @@
+/*
+ */
+package org.apache.tomcat.async.socks;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.channels.Channel;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Logger;
+
+import org.apache.tomcat.async.SelectorCallback;
+import org.apache.tomcat.async.SelectorChannel;
+import org.apache.tomcat.async.SelectorPool;
+
+/**
+ * A test for the selector package, and helper for the proxy - 
+ * a SOCKS4a server.
+ * 
+ * Besides the connection initialization, it's almost the 
+ *  same as the CONNECT method in http proxy.
+ * 
+ * http://ftp.icm.edu.pl/packages/socks/socks4/SOCKS4.protocol
+ * http://www.smartftp.com/Products/SmartFTP/RFC/socks4a.protocol
+ * http://www.faqs.org/rfcs/rfc1928.html
+ * https://svn.torproject.org/svn/tor/trunk/doc/spec/socks-extensions.txt
+ * 
+ * In firefox, set network.proxy.socks_remote_dns = true to do DNS via proxy.
+ * 
+ * Also interesting:
+ * http://transocks.sourceforge.net/
+ * 
+ * @author Costin Manolache
+ */
+public class SocksServer {
+    int port = 2080;
+    SelectorPool pool;
+    static Logger log = Logger.getLogger("SocksServer");
+    long idleTimeout = 60 * 1000; // 1 min
+    
+    long lastConnection = 0;
+    long totalConTime = 0;
+    int totalConnections;
+    AtomicInteger active = new AtomicInteger();
+    
+    long in;
+    long out;
+    
+    // TODO: buffer pool
+    public void setPort(int port) {
+        this.port = port;
+    }
+    
+    public void setIdleTimeout(long to) {
+        idleTimeout = to;
+    }
+
+    public void stop() {
+        System.err.println("Idle timeout");
+        System.exit(0);
+    }
+    
+    public void initServer() throws IOException {
+        pool = SelectorPool.defaultPool();
+        if (port < 0) {
+            pool.getSelector().inetdAcceptor(new SocksAcceptorCallback());            
+        } else {
+            pool.getSelector().acceptor(new SocksAcceptorCallback(), port, null, 
+                    0, 0);
+        }
+        
+        Timer timer = new Timer();
+        timer.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                // if lastConnection == 0 - it'll terminate on first timer
+                float avg = (totalConnections > 0) ? 
+                        totalConTime / totalConnections : 0;
+                System.err.println("Timer" 
+                        + "\ttotal=" + totalConnections
+                        + "\tin=" + in  
+                        + "\tout=" + out
+                        + "\tavg=" + (int) avg);
+                if (active.get() == 0 
+                        && System.currentTimeMillis() - lastConnection > idleTimeout) {
+                    stop();
+                }
+            }
+        }, 5 * 60 * 1000, 5 * 60 * 1000);
+    }
+
+    
+    public class SocksAcceptorCallback extends SelectorCallback {
+        @Override
+        public SelectorCallback connectionAccepted(SelectorChannel ch, 
+                                                   Channel sockC) {
+            lastConnection = System.currentTimeMillis();
+            active.incrementAndGet();
+            totalConnections++;
+            return new SocksServerCallback(ch, sockC);
+        }
+    }
+    
+    public static class CopyCallback extends SelectorCallback {
+        
+        protected CopyCallback pair;
+
+        protected int received;
+        protected List<ByteBuffer> sendBrigade = 
+            new LinkedList<ByteBuffer>();
+        protected SelectorChannel ch;
+            
+        public CopyCallback() {
+        }
+         
+        public CopyCallback(CopyCallback pair) {
+            this.pair = pair;
+        }
+        
+        @Override
+        public void dataReceived(SelectorChannel ch) throws IOException {
+            // body.
+            while (true) {
+                ByteBuffer bb = ByteBuffer.allocateDirect(2048);
+                int rd = ch.read(bb);
+                bb.flip();
+                
+                if (rd < 0) {
+                    close(ch, pair.ch); // TODO: close client
+                    return;
+                }
+                if (rd == 0) {
+                    return;
+                }
+                this.received += bb.remaining();
+                synchronized (pair.sendBrigade) {
+                    pair.sendBrigade.add(bb);
+                }
+                sendData(pair.ch, pair.sendBrigade, ch);
+            }
+        }   
+        
+        @Override
+        public void dataWriteable(SelectorChannel sdata) throws IOException {
+            sendData(sdata, sendBrigade, pair.ch);
+        }
+        
+        void close(SelectorChannel selData1, SelectorChannel selData2) throws IOException {
+            // Closes each channel that is non-null and still open, first argument first.
+            for (SelectorChannel c : new SelectorChannel[] { selData1, selData2 }) {
+                if (c == null || !c.isOpen()) {
+                    continue;
+                }
+                c.close();
+            }
+        }
+
+
+        void sendData(SelectorChannel sdata, List<ByteBuffer> sendBrigade,
+                             SelectorChannel other) 
+                throws IOException {
+            synchronized (sendBrigade) {
+                while (sendBrigade.size() > 0) {
+                    ByteBuffer current = sendBrigade.get(0);
+                    while (current.remaining() > 0) {
+                        // write
+                        int done = sdata.write(current);
+                        
+                        if (done < 0) {
+                            System.err.println("Closed while writting ");
+                            close(sdata, other);
+                            return;
+                        }
+                        
+                        if (done == 0) {
+                            sdata.writeInterest();
+                            return;
+                        }
+                    }
+                    sendBrigade.remove(0);
+                }
+            }
+        }
+        
+        /** 
+         * Close was detected, or an unhandled exception happened while processing
+         * this callback.
+         */
+        @Override
+        public void channelClosed(SelectorChannel sdata, Throwable ex) {
+            try {
+                close(null, pair.ch);
+            } catch (IOException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            } // TODO: close client
+        }
+        
+        @Override
+        public void connected(SelectorChannel ch) throws IOException {
+            this.ch = ch;
+
+            pair.clientConnected(ch);
+        }
+
+        /** Called when the other side is connected.
+         */
+        public void clientConnected(SelectorChannel ch) throws IOException {
+        }        
+    }
+    
+    public class SocksServerCallback extends CopyCallback {
+
+        boolean headReceived;
+        boolean head5Received = false;
+        
+        ByteBuffer headBuffer = ByteBuffer.allocate(256);
+        ByteBuffer headReadBuffer = headBuffer.duplicate();
+        
+        ByteBuffer headResBuffer = ByteBuffer.allocate(256);
+        
+        byte ver;
+        byte cmd;
+
+        static final int CMD_CONNECT = 0;
+
+        static final byte CMD_RESOLVE = (byte) 0xF0;
+        
+        int port;
+        byte[] hostB = new byte[4];
+        CharBuffer userId = CharBuffer.allocate(256);
+        CharBuffer hostName = CharBuffer.allocate(256);
+        
+        SocketAddress sa = null;
+        private byte atyp;
+        
+        long startTime = System.currentTimeMillis();
+
+        public SocksServerCallback(SelectorChannel ch, Channel sockC) {
+            this.ch = ch;
+            pair = new CopyCallback(this);
+        }
+        
+        public void clientConnected(SelectorChannel clientCh) throws IOException { // queues the SOCKS reply for ch
+            headResBuffer.clear();
+            if (ver == 4) {
+                headResBuffer.put((byte) 0);
+                headResBuffer.put((byte) 90); // presumably the SOCKS4 "request granted" code - confirm against the spec
+                for (int i = 0; i < 6; i++ ) { // remaining 6 reply bytes are sent as zeros
+                    headResBuffer.put((byte) 0);
+                }
+            } else {
+                headResBuffer.put((byte) 5); // any version other than 4 gets a 5-style reply
+                headResBuffer.put((byte) 0);
+                headResBuffer.put((byte) 0);
+                headResBuffer.put((byte) 1); // ip
+                
+                headResBuffer.put(hostB); // the 4 address bytes parsed from the request
+                int port2 = clientCh.getPort(true); // NOTE(review): meaning of the 'true' flag is not visible here
+                headResBuffer.putShort((short) port2);
+            }
+            headResBuffer.flip();
+            sendBrigade.add(headResBuffer); // NOTE(review): added without the sendBrigade lock that sendData takes
+            log.fine("Connected " + sa.toString());
+            if (headReadBuffer.remaining() > 0) { // bytes left in the head buffer after parsing
+                sendBrigade.add(headReadBuffer); // NOTE(review): this brigade is written to ch, not pair.ch - confirm intent
+            }
+            sendData(ch, sendBrigade, pair.ch);
+        }
+
+        protected int parseHead() throws IOException {
+            // First client message; the bytes read so far are headBuffer[0..position).
+            int pos = headBuffer.position();
+            headReadBuffer.clear();
+            headReadBuffer.limit(pos);
+            if (headReadBuffer.remaining() < 2) {
+                return -1; // incomplete - parsed again from the start after the next read
+            }
+            // Returns -1 when incomplete, parseHead5()'s result for version 5, else 4.
+            ByteBuffer bb = headReadBuffer;
+            ver = bb.get();
+            if (ver == 5) {
+                return parseHead5();
+            }
+            if (headReadBuffer.remaining() < 8) {
+                return -1;
+            }
+            cmd = bb.get();
+            // getShort() is signed: mask so that ports above 32767 do not become negative.
+            port = bb.getShort() & 0xFFFF;
+            bb.get(hostB);
+            int rc = readStringZ(bb, userId); // readStringZ clears its target before filling it
+            // Mozilla userid: MOZ ...
+            if (rc == -1) {
+                return rc;
+            }
+            if (hostB[0] == 0 && hostB[1] == 0 && hostB[2] == 0) {
+                // 0.0.0.x - a zero-terminated host name follows the userid
+                atyp = 3;
+                rc = readStringZ(bb, hostName);
+                if (rc == -1) {
+                    return rc;
+                }
+            } else {
+                // literal 4-byte address, already stored in hostB
+                atyp = 1;
+            }
+            
+            headReceived = true;
+            
+            return 4;
+        }
+
+        protected int parseHead5_2() throws IOException {
+            // Version-5 request in headBuffer[0..position); returns 5 when parsed, -1 when incomplete.
+            int pos = headBuffer.position();
+            
+            headReadBuffer.clear();
+            headReadBuffer.limit(pos);
+            
+            if (headReadBuffer.remaining() < 7) {
+                return -1;
+            }
+            ByteBuffer bb = headReadBuffer;
+            ver = bb.get();
+            cmd = bb.get();
+            bb.get(); // reserved
+            atyp = bb.get();
+            if (atyp == 1 && bb.remaining() < 4) { return -1; } // address not fully received yet
+            if (atyp == 1) {
+                bb.get(hostB);
+            } else if (atyp == 3) {
+                hostName.clear();
+                int rc = readStringN(bb, hostName);
+                if (rc == -1) {
+                    return rc;
+                }
+            } // ip6 not supported right now, easy to add
+            if (bb.remaining() < 2) { return -1; } // port not fully received yet
+            port = bb.getShort() & 0xFFFF; // getShort() is signed; keep ports above 32767 positive
+            
+            head5Received = true;
+            
+            return 5;
+        }
+
+        private int parseHead5() {
+            ByteBuffer bb = headReadBuffer;
+            int nrMethods = ((int)bb.get()) & 0xFF;
+            if (bb.remaining() < nrMethods) {
+                return -1;
+            }
+            for (int i = 0; i < nrMethods; i++) {
+                // ignore 
+                bb.get();
+            }
+            return 5;
+        }
+
+        private int readStringZ(ByteBuffer bb, CharBuffer bc) throws IOException {
+            bc.clear();
+            while (true) {
+                if (!bb.hasRemaining()) {
+                    return -1; // not complete
+                }
+                byte b = bb.get();
+                if (b == 0) {
+                    bc.flip();
+                    return 0;
+                } else {
+                    bc.put((char) b);
+                }
+            }
+        }
+
+        private int readStringN(ByteBuffer bb, CharBuffer bc) throws IOException {
+            bc.clear();
+            int len = ((int) bb.get()) & 0xff;
+            for (int i = 0; i < len; i++) {
+                if (!bb.hasRemaining()) {
+                    return -1; // not complete
+                }
+                byte b = bb.get();
+                bc.put((char) b);
+            }
+            bc.flip();
+            return len;
+        }
+        
+        boolean checkResult(SelectorChannel ch, int rd) throws IOException {
+            // True when the caller should stop handling this read result.
+            if (rd > 0) {
+                return false;
+            }
+            if (rd < 0) {
+                ch.close(); // negative read result: close the channel
+            }
+            // rd == 0 needs more data; rd < 0 was closed just above.
+            return true;
+        }
+        
+        @Override
+        public void dataReceived(SelectorChannel ch) throws IOException {
+            if (!headReceived) {
+                int rd = ch.read(headBuffer);
+                if (checkResult(ch, rd)) {
+                    return;
+                }
+                
+                rd = parseHead();
+                if (rd < 0) {
+                    return; // need more
+                }
+                if (rd == 5) {
+                    headResBuffer.clear();
+                    headResBuffer.put((byte) 5);
+                    headResBuffer.put((byte) 0);
+                    headResBuffer.flip();
+                    sendBrigade.add(headResBuffer);
+                    sendData(ch, sendBrigade, pair.ch);
+                    headReceived = true;
+                    headBuffer.clear();
+                    return;
+                } else {
+                    headReceived = true;
+                    head5Received = true;
+                    initConnection();
+                }
+            }
+            
+            if (!head5Received) {
+                int rd = ch.read(headBuffer);
+                if (checkResult(ch, rd)) {
+                    return;
+                }
+                
+                rd = parseHead5_2();
+                if (rd < 0) {
+                    return; // need more
+                }
+                
+                initConnection();                
+            }
+            
+            // body.
+            while (true) {
+                ByteBuffer bb = ByteBuffer.allocateDirect(2048);
+                int rd = ch.read(bb);
+                bb.flip();
+                
+                if (rd < 0) {
+                    ch.close(); // TODO: close client
+                    return;
+                }
+                if (rd == 0) {
+                    return;
+                }
+                received += bb.remaining();
+
+                synchronized (pair.sendBrigade) {
+                    pair.sendBrigade.add(bb);
+                }
+                sendData(pair.ch, pair.sendBrigade, ch);
+            }
+        }
+
+        public void channelClosed(SelectorChannel sdata, Throwable ex) {
+            try {
+                int a = active.decrementAndGet();
+                close(null, pair.ch);
+                long conTime = System.currentTimeMillis() - startTime;
+                System.err.println(sa + "\t" +
+                        received 
+                        + "\t" + pair.received
+                        + "\t" + a
+                        + "\t" + totalConnections
+                        + "\t" + conTime
+                        + "\t" + ((ex != null ) ? ex.toString() : ""));
+                in += received;
+                totalConTime += conTime;
+                out += pair.received;
+            } catch (IOException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            } // TODO: close client
+        }
+        
+        
+        void initConnection() throws IOException {
+            // TODO: use different thread ?
+            if (atyp == 3) {
+                sa = new InetSocketAddress(hostName.toString(), port);
+            } else {
+                InetAddress addr = InetAddress.getByAddress(hostB);
+                sa = new InetSocketAddress(addr, port);
+            } // TODO: ip6
+            
+            pool.getSelector().connect(sa, pair); 
+            // TODO: catch  DNS exception
+            // TODO: resolve DNS in thread pool
+        }
+    }
+    
+}

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/socks/SocksServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/socks/SocksServerMain.java
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/socks/SocksServerMain.java?rev=707271&view=auto
==============================================================================
--- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/socks/SocksServerMain.java (added)
+++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/socks/SocksServerMain.java Wed Oct 22 21:45:05 2008
@@ -0,0 +1,16 @@
+/*
+ */
+package org.apache.tomcat.async.socks;
+
+import org.apache.tomcat.util.IntrospectionUtils;
+
+public class SocksServerMain {
+
+    public static void main(String[] args) throws Exception {
+        SocksServer ss = new SocksServer();
+        
+        IntrospectionUtils.processArgs(ss, args);
+        
+        ss.initServer();
+    }
+}

Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/async/socks/SocksServerMain.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Mime
View raw message