tomcat-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cos...@apache.org
Subject svn commit: r1292133 - in /tomcat/trunk/java/org/apache: coyote/spdy/ tomcat/spdy/
Date Wed, 22 Feb 2012 05:47:22 GMT
Author: costin
Date: Wed Feb 22 05:47:22 2012
New Revision: 1292133

URL: http://svn.apache.org/viewvc?rev=1292133&view=rev
Log:
First part of spdy support - the low-level protocol and a protocol implementation.



Added:
    tomcat/trunk/java/org/apache/coyote/spdy/
    tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
    tomcat/trunk/java/org/apache/coyote/spdy/SpdyProxyProtocol.java
    tomcat/trunk/java/org/apache/tomcat/spdy/
    tomcat/trunk/java/org/apache/tomcat/spdy/SpdyConnection.java
    tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContext.java
    tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextProxy.java
    tomcat/trunk/java/org/apache/tomcat/spdy/SpdyFrame.java
    tomcat/trunk/java/org/apache/tomcat/spdy/SpdyStream.java

Added: tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java?rev=1292133&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java (added)
+++ tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java Wed Feb 22 05:47:22 2012
@@ -0,0 +1,586 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.spdy;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.InetAddress;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.coyote.AbstractProcessor;
+import org.apache.coyote.ActionCode;
+import org.apache.coyote.AsyncContextCallback;
+import org.apache.coyote.InputBuffer;
+import org.apache.coyote.OutputBuffer;
+import org.apache.coyote.Request;
+import org.apache.coyote.RequestInfo;
+import org.apache.coyote.Response;
+import org.apache.coyote.http11.upgrade.UpgradeInbound;
+import org.apache.tomcat.spdy.SpdyFrame;
+import org.apache.tomcat.spdy.SpdyConnection;
+import org.apache.tomcat.spdy.SpdyStream;
+import org.apache.tomcat.util.ExceptionUtils;
+import org.apache.tomcat.util.buf.Ascii;
+import org.apache.tomcat.util.buf.ByteChunk;
+import org.apache.tomcat.util.buf.MessageBytes;
+import org.apache.tomcat.util.http.HttpMessages;
+import org.apache.tomcat.util.http.MimeHeaders;
+import org.apache.tomcat.util.net.AbstractEndpoint;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.SSLSupport;
+import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
+
+/**
+ * A spdy stream ( multiplexed over a spdy tcp connection ) processed by a
+ * tomcat servlet.
+ * 
+ * Based on the AJP processor.
+ * 
+ */
+public class SpdyProcessor extends AbstractProcessor<Object> implements
+        Runnable {
+
+    // TODO: handle input
+    // TODO: recycle
+    // TODO: swallow input ( recycle only after input close )
+    // TODO: find a way to inject an OutputBuffer, or intercept close() -
+    // so we can send FIN in the last data packet.
+
+    // Connection this processor belongs to; used to obtain frames, reach the
+    // spdy context and send the reply frame.
+    private SpdyConnection spdy;
+
+    // Associated spdy stream. Assigned in onSynStream(); request data is read
+    // from it and response data frames are sent on it.
+    SpdyStream spdyStream;
+
+    // Scratch chunk wrapped around each header name while parsing the request
+    // frame in onSynStream().
+    ByteChunk keyBuffer = new ByteChunk();
+
+    // Set when processing or writing fails; run() forces status 500 when set.
+    boolean error = false;
+
+    // Set by finish() so the response is finished only once.
+    private boolean finished;
+
+    // Data frame currently being consumed by LiteInputBuffer, or null when the
+    // next read has to fetch one from the stream.
+    SpdyFrame inFrame = null;
+
+    // Set when the CLOSE action has been handled, so the final data frame is
+    // sent only once.
+    boolean outClosed = false;
+
+    // Set by sendResponseHead() once the reply frame has been sent.
+    boolean outCommit = false;
+
+    public SpdyProcessor(SpdyConnection spdy, AbstractEndpoint endpoint) {
+        super(endpoint);
+
+        this.spdy = spdy;
+        request.setInputBuffer(new LiteInputBuffer());
+        response.setOutputBuffer(new LiteOutputBuffer());
+
+    }
+
+    /**
+     * Input buffer that hands request body bytes to the servlet from the data
+     * frames received on the associated spdy stream.
+     */
+    class LiteInputBuffer implements InputBuffer {
+        /**
+         * Copies unread bytes of the current data frame into the chunk,
+         * blocking on the stream for a frame when none is pending.
+         *
+         * @param bchunk destination; bytes are written at its start offset
+         *        and its end is advanced by the number of bytes copied
+         * @param request the request being read (not used here)
+         * @return the number of bytes copied, or -1 when the stream returned
+         *         no frame
+         * @throws IOException propagated from the stream while waiting for a
+         *         frame
+         */
+        @Override
+        public int doRead(ByteChunk bchunk, Request request) throws IOException {
+            if (inFrame == null) {
+                // blocking
+                inFrame = spdyStream.getIn(endpoint.getSoTimeout());
+            }
+            if (inFrame == null) {
+                return -1;
+            }
+
+            // Only the bytes between off and endData are still unread, and the
+            // copy lands at the chunk's start offset - bound the length by
+            // both so neither array is overrun.
+            int unread = inFrame.endData - inFrame.off;
+            int room = bchunk.getBytes().length - bchunk.getStart();
+            int rd = Math.min(unread, room);
+            System.arraycopy(inFrame.data, inFrame.off, bchunk.getBytes(),
+                    bchunk.getStart(), rd);
+            inFrame.advance(rd);
+            if (inFrame.off == inFrame.endData) {
+                spdy.getSpdyContext().releaseFrame(inFrame);
+                // The frame has been handed back to the context; drop our
+                // reference so the next read fetches a new frame instead of
+                // re-reading a released one.
+                inFrame = null;
+            }
+            bchunk.setEnd(bchunk.getEnd() + rd);
+            return rd;
+        }
+    }
+
+    final class LiteOutputBuffer implements OutputBuffer {
+        long byteCount;
+
+        @Override
+        public int doWrite(org.apache.tomcat.util.buf.ByteChunk chunk,
+                Response response) throws IOException {
+            if (!response.isCommitted()) {
+
+                // Send the connector a request for commit. The connector should
+                // then validate the headers, send them (using sendHeader) and
+                // set the filters accordingly.
+                response.action(ActionCode.COMMIT, null);
+
+            }
+            spdyStream.sendDataFrame(chunk.getBuffer(), chunk.getStart(),
+                    chunk.getLength(), false);
+            byteCount += chunk.getLength();
+            return chunk.getLength();
+        }
+
+        @Override
+        public long getBytesWritten() {
+            return byteCount;
+        }
+    }
+
+    /**
+     * Schedules this processor on the executor of the spdy context; the
+     * request is then serviced by run().
+     */
+    void onRequest() {
+        spdy.getSpdyContext().getExecutor().execute(this);
+    }
+
+    /**
+     * Services the request through the adapter, finishes the response if the
+     * servlet did not, and updates the request counters.
+     *
+     * An InterruptedIOException only raises the error flag; any other failure
+     * is also printed, logged through the adapter and reported as status 500.
+     */
+    @Override
+    public void run() {
+        final RequestInfo info = request.getRequestProcessor();
+        try {
+            info.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
+            adapter.service(request, response);
+        } catch (InterruptedIOException interrupted) {
+            error = true;
+        } catch (Throwable failure) {
+            ExceptionUtils.handleThrowable(failure);
+            // log.error(sm.getString("ajpprocessor.request.process"), t);
+            // 500 - Internal Server Error
+            failure.printStackTrace();
+            response.setStatus(500);
+            adapter.log(request, response, 0);
+            error = true;
+        }
+
+        // TODO: async, etc ( detached mode - use a special light protocol)
+
+        // The servlet may have returned without closing the response.
+        if (!finished) {
+            try {
+                finish();
+            } catch (Throwable failure) {
+                ExceptionUtils.handleThrowable(failure);
+                error = true;
+            }
+        }
+
+        if (error) {
+            response.setStatus(500);
+        }
+
+        request.updateCounters();
+        info.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
+        // TODO: recycle
+    }
+
+    private void finish() {
+        if (!response.isCommitted()) {
+            response.action(ActionCode.COMMIT, response);
+        }
+
+        if (finished)
+            return;
+
+        finished = true;
+
+        response.finish();
+    }
+
+    // Zero-length payload used for the final data frame sent by the CLOSE
+    // action.
+    static final byte[] EMPTY = new byte[0];
+
+    // Processor implementation
+
+    /**
+     * Sends the reply headers unless the reply frame has already gone out or
+     * the response is already marked committed. An I/O failure is printed and
+     * recorded in the error flag rather than propagated.
+     */
+    private void maybeCommit() {
+        if (outCommit || response.isCommitted()) {
+            return;
+        }
+
+        // Validate and write response headers
+        try {
+            sendSynReply();
+        } catch (IOException e) {
+            e.printStackTrace();
+            // Set error flag
+            error = true;
+        }
+    }
+
+    /**
+     * Handles a callback from the coyote request/response.
+     *
+     * COMMIT and CLIENT_FLUSH send the reply headers if needed; CLOSE also
+     * sends the final (empty) data frame; the REQ_* codes fill in request
+     * attributes from the endpoint configuration; the ASYNC_* codes delegate
+     * to the async state machine. SSL attributes and body replay are not
+     * implemented, and any other code is ignored.
+     *
+     * @param actionCode the requested action
+     * @param param action-specific argument; cast to AsyncContextCallback,
+     *        Runnable or AtomicBoolean by the ASYNC_* branches
+     */
+    @Override
+    public void action(ActionCode actionCode, Object param) {
+        // NOTE(review): the debug branch is currently a no-op.
+        if (spdy.getSpdyContext().debug) {
+            // System.err.println(actionCode);
+        }
+
+        // TODO: async
+
+        if (actionCode == ActionCode.COMMIT) {
+            maybeCommit();
+        } else if (actionCode == ActionCode.CLIENT_FLUSH) {
+            maybeCommit();
+
+            // try {
+            // flush(true);
+            // } catch (IOException e) {
+            // // Set error flag
+            // error = true;
+            // }
+
+        } else if (actionCode == ActionCode.DISABLE_SWALLOW_INPUT) {
+            // TODO: Do not swallow request input but
+            // make sure we are closing the connection
+            error = true;
+
+        } else if (actionCode == ActionCode.CLOSE) {
+            // Only the first CLOSE sends the final frame.
+            if (outClosed) {
+                return;
+            }
+            outClosed = true;
+            // Close
+            // End the processing of the current request, and stop any further
+            // transactions with the client
+            maybeCommit();
+
+            try {
+                // Empty data frame with the last argument set to true
+                // (presumably the FIN flag - see the class-level TODO).
+                spdyStream.sendDataFrame(EMPTY, 0, 0, true);
+            } catch (IOException e) {
+                // Set error flag
+                e.printStackTrace();
+                error = true;
+            }
+
+        } else if (actionCode == ActionCode.REQ_SSL_ATTRIBUTE) {
+
+            // Not implemented: client certificates are not exposed yet.
+
+            // if (!certificates.isNull()) {
+            // ByteChunk certData = certificates.getByteChunk();
+            // X509Certificate jsseCerts[] = null;
+            // ByteArrayInputStream bais =
+            // new ByteArrayInputStream(certData.getBytes(),
+            // certData.getStart(),
+            // certData.getLength());
+            // // Fill the elements.
+            // try {
+            // CertificateFactory cf;
+            // if (clientCertProvider == null) {
+            // cf = CertificateFactory.getInstance("X.509");
+            // } else {
+            // cf = CertificateFactory.getInstance("X.509",
+            // clientCertProvider);
+            // }
+            // while(bais.available() > 0) {
+            // X509Certificate cert = (X509Certificate)
+            // cf.generateCertificate(bais);
+            // if(jsseCerts == null) {
+            // jsseCerts = new X509Certificate[1];
+            // jsseCerts[0] = cert;
+            // } else {
+            // X509Certificate [] temp = new
+            // X509Certificate[jsseCerts.length+1];
+            // System.arraycopy(jsseCerts,0,temp,0,jsseCerts.length);
+            // temp[jsseCerts.length] = cert;
+            // jsseCerts = temp;
+            // }
+            // }
+            // } catch (java.security.cert.CertificateException e) {
+            // getLog().error(sm.getString("ajpprocessor.certs.fail"), e);
+            // return;
+            // } catch (NoSuchProviderException e) {
+            // getLog().error(sm.getString("ajpprocessor.certs.fail"), e);
+            // return;
+            // }
+            // request.setAttribute(SSLSupport.CERTIFICATE_KEY, jsseCerts);
+            // }
+
+        } else if (actionCode == ActionCode.REQ_HOST_ATTRIBUTE) {
+
+            // Get remote host name using a DNS resolution
+            if (request.remoteHost().isNull()) {
+                try {
+                    request.remoteHost().setString(
+                            InetAddress.getByName(
+                                    request.remoteAddr().toString())
+                                    .getHostName());
+                } catch (IOException iex) {
+                    // Ignore
+                }
+            }
+        } else if (actionCode == ActionCode.REQ_LOCALPORT_ATTRIBUTE) {
+            // A configured "proxyPort" attribute overrides the endpoint port.
+            // NOTE(review): a non-numeric value makes parseInt throw
+            // NumberFormatException here.
+            String configured = (String) endpoint.getAttribute("proxyPort");
+            int localPort = 0;
+            if (configured != null) {
+                localPort = Integer.parseInt(configured);
+            } else {
+                localPort = endpoint.getPort();
+            }
+            request.setLocalPort(localPort);
+
+        } else if (actionCode == ActionCode.REQ_LOCAL_ADDR_ATTRIBUTE) {
+            // Endpoint address first, then the "proxyIP" attribute, then
+            // loopback as the last resort.
+            InetAddress localAddress = endpoint.getAddress();
+            String localIp = localAddress == null ? null : localAddress
+                    .getHostAddress();
+            if (localIp == null) {
+                localIp = (String) endpoint.getAttribute("proxyIP");
+            }
+            if (localIp == null) {
+                localIp = "127.0.0.1";
+            }
+            request.localAddr().setString(localIp);
+
+        } else if (actionCode == ActionCode.REQ_HOST_ADDR_ATTRIBUTE) {
+            // Endpoint host name first, then the "proxyName" attribute, then
+            // "localhost".
+            // NOTE(review): this branch stores a host name into localAddr(),
+            // the same field the REQ_LOCAL_ADDR_ATTRIBUTE branch fills with an
+            // IP address - confirm this is the intended target.
+            InetAddress localAddress = endpoint.getAddress();
+            String localH = localAddress == null ? null : localAddress
+                    .getCanonicalHostName();
+            if (localH == null) {
+                localH = (String) endpoint.getAttribute("proxyName");
+            }
+            if (localH == null) {
+                localH = "localhost";
+            }
+
+            request.localAddr().setString(localH);
+
+        } else if (actionCode == ActionCode.REQ_SET_BODY_REPLAY) {
+
+            // Not implemented: the request body cannot be replayed yet.
+
+            // // Set the given bytes as the content
+            // ByteChunk bc = (ByteChunk) param;
+            // int length = bc.getLength();
+            // bodyBytes.setBytes(bc.getBytes(), bc.getStart(), length);
+            // request.setContentLength(length);
+            // first = false;
+            // empty = false;
+            // replay = true;
+
+        } else if (actionCode == ActionCode.ASYNC_START) {
+            asyncStateMachine.asyncStart((AsyncContextCallback) param);
+        } else if (actionCode == ActionCode.ASYNC_DISPATCHED) {
+            asyncStateMachine.asyncDispatched();
+        } else if (actionCode == ActionCode.ASYNC_TIMEOUT) {
+            AtomicBoolean result = (AtomicBoolean) param;
+            result.set(asyncStateMachine.asyncTimeout());
+        } else if (actionCode == ActionCode.ASYNC_RUN) {
+            asyncStateMachine.asyncRun((Runnable) param);
+        } else if (actionCode == ActionCode.ASYNC_ERROR) {
+            asyncStateMachine.asyncError();
+        } else if (actionCode == ActionCode.ASYNC_IS_STARTED) {
+            ((AtomicBoolean) param).set(asyncStateMachine.isAsyncStarted());
+        } else if (actionCode == ActionCode.ASYNC_IS_DISPATCHING) {
+            ((AtomicBoolean) param).set(asyncStateMachine.isAsyncDispatching());
+        } else if (actionCode == ActionCode.ASYNC_IS_ASYNC) {
+            ((AtomicBoolean) param).set(asyncStateMachine.isAsync());
+        } else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) {
+            ((AtomicBoolean) param).set(asyncStateMachine.isAsyncTimingOut());
+        } else {
+            // TODO:
+            // actionInternal(actionCode, param);
+        }
+
+    }
+
+    private static byte[] STATUS = "status".getBytes();
+
+    private static byte[] VERSION = "version".getBytes();
+
+    private static byte[] HTTP11 = "HTTP/1.1".getBytes();
+
+    private static byte[] OK200 = "200 OK".getBytes();
+
+    /**
+     * When committing the response, we have to validate the set of headers, as
+     * well as setup the response filters.
+     */
+    protected void sendSynReply() throws IOException {
+
+        response.setCommitted(true);
+
+        // Special headers
+        MimeHeaders headers = response.getMimeHeaders();
+        String contentType = response.getContentType();
+        if (contentType != null) {
+            headers.setValue("Content-Type").setString(contentType);
+        }
+        String contentLanguage = response.getContentLanguage();
+        if (contentLanguage != null) {
+            headers.setValue("Content-Language").setString(contentLanguage);
+        }
+        long contentLength = response.getContentLengthLong();
+        if (contentLength >= 0) {
+            headers.setValue("Content-Length").setLong(contentLength);
+        }
+
+        sendResponseHead();
+    }
+
+    private void sendResponseHead() throws IOException {
+        SpdyFrame rframe = spdy.getFrame(SpdyConnection.TYPE_SYN_REPLY);
+        // TODO: is closed ?
+        rframe.streamId = spdyStream.reqFrame.streamId;
+        rframe.associated = 0;
+
+        MimeHeaders headers = response.getMimeHeaders();
+        for (int i = 0; i < headers.size(); i++) {
+            MessageBytes mb = headers.getName(i);
+            mb.toBytes();
+            ByteChunk bc = mb.getByteChunk();
+            byte[] bb = bc.getBuffer();
+            for (int j = bc.getStart(); j < bc.getEnd(); j++) {
+                bb[j] = (byte) Ascii.toLower(bb[j]);
+            }
+            // TODO: filter headers: Connection, Keep-Alive, Proxy-Connection,
+            rframe.headerName(bc.getBuffer(), bc.getStart(), bc.getLength());
+            mb = headers.getValue(i);
+            mb.toBytes();
+            bc = mb.getByteChunk();
+            rframe.headerValue(bc.getBuffer(), bc.getStart(), bc.getLength());
+        }
+        rframe.headerName(STATUS, 0, STATUS.length);
+
+        if (response.getStatus() == 0) {
+            rframe.headerValue(OK200, 0, OK200.length);
+        } else {
+            // HTTP header contents
+            String message = null;
+            if (org.apache.coyote.Constants.USE_CUSTOM_STATUS_MSG_IN_HEADER
+                    && HttpMessages.isSafeInHttpHeader(response.getMessage())) {
+                message = response.getMessage();
+            }
+            if (message == null) {
+                message = HttpMessages.getMessage(response.getStatus());
+            }
+            if (message == null) {
+                // mod_jk + httpd 2.x fails with a null status message - bug
+                // 45026
+                message = Integer.toString(response.getStatus());
+            }
+            // TODO: optimize
+            String status = response.getStatus() + " " + message;
+            byte[] statusB = status.getBytes();
+            rframe.headerValue(statusB, 0, statusB.length);
+        }
+        rframe.headerName(VERSION, 0, VERSION.length);
+        rframe.headerValue(HTTP11, 0, HTTP11.length);
+
+        spdy.sendFrameBlocking(rframe, spdyStream);
+        // we can't reuse the frame - it'll be queued, the coyote processor
+        // may be reused as well.
+        outCommit = true;
+    }
+
+    /**
+     * @return always false
+     */
+    @Override
+    public boolean isComet() {
+        return false;
+    }
+
+    /**
+     * Not implemented: requests reach this processor through onSynStream(),
+     * not through socket-level processing.
+     *
+     * @throws IOException always
+     */
+    @Override
+    public SocketState process(SocketWrapper socket) throws IOException {
+        throw new IOException("Unimplemented");
+    }
+
+    /**
+     * Placeholder: prints the status to stderr.
+     *
+     * @return always null
+     */
+    @Override
+    public SocketState event(SocketStatus status) throws IOException {
+        System.err.println("EVENT: " + status);
+        return null;
+    }
+
+    /**
+     * Placeholder: prints the status to stderr.
+     *
+     * @return always null
+     */
+    @Override
+    public SocketState asyncDispatch(SocketStatus status) {
+        System.err.println("ASYNC DISPATCH: " + status);
+        return null;
+    }
+
+    /**
+     * @return always false
+     */
+    @Override
+    public boolean isUpgrade() {
+        return false;
+    }
+
+    /**
+     * No-op.
+     *
+     * @return always null
+     */
+    @Override
+    public SocketState upgradeDispatch() throws IOException {
+        return null;
+    }
+
+    /**
+     * Populates the coyote request from the name/value pairs of the stream's
+     * request frame and schedules the request for execution.
+     *
+     * The "method" and "url" entries become the request method and URI,
+     * "version" is skipped, and every other entry is added as a request
+     * header.
+     *
+     * @param str the stream whose request frame is parsed; it becomes the
+     *        stream associated with this processor
+     * @throws IOException if a name or value length exceeds the bytes left in
+     *         the frame
+     */
+    public void onSynStream(SpdyStream str) throws IOException {
+        this.spdyStream = str;
+        SpdyFrame frame = str.reqFrame;
+        // We need to make a copy - the frame buffer will be reused.
+        // We use the 'wrap' methods of MimeHeaders - which should be
+        // lighter on mem in some cases.
+        RequestInfo rp = request.getRequestProcessor();
+        rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
+
+        // Request received.
+        MimeHeaders mimeHeaders = request.getMimeHeaders();
+
+        for (int i = 0; i < frame.nvCount; i++) {
+            int nameLen = frame.read16();
+            if (nameLen > frame.remaining()) {
+                throw new IOException("Name too long");
+            }
+
+            // Wrap the name in place so it can be compared without copying.
+            keyBuffer.setBytes(frame.data, frame.off, nameLen);
+            if (keyBuffer.equals("method")) {
+                frame.advance(nameLen);
+                int valueLen = readValueLength(frame);
+                request.method().setBytes(frame.data, frame.off, valueLen);
+                frame.advance(valueLen);
+            } else if (keyBuffer.equals("url")) {
+                frame.advance(nameLen);
+                int valueLen = readValueLength(frame);
+                request.requestURI().setBytes(frame.data, frame.off, valueLen);
+                if (spdy.getSpdyContext().debug) {
+                    System.err.println("URL= " + request.requestURI());
+                }
+                frame.advance(valueLen);
+            } else if (keyBuffer.equals("version")) {
+                // The version is validated for length but otherwise ignored.
+                frame.advance(nameLen);
+                int valueLen = readValueLength(frame);
+                frame.advance(valueLen);
+            } else {
+                MessageBytes value = mimeHeaders.addValue(frame.data,
+                        frame.off, nameLen);
+                frame.advance(nameLen);
+                int valueLen = readValueLength(frame);
+                value.setBytes(frame.data, frame.off, valueLen);
+                frame.advance(valueLen);
+            }
+        }
+
+        onRequest();
+    }
+
+    /**
+     * Reads the 16-bit length of the next header value and checks that the
+     * frame still holds that many bytes.
+     *
+     * @param frame the request frame, positioned at a value length
+     * @return the length of the value that follows
+     * @throws IOException if the length exceeds the bytes left in the frame
+     */
+    private int readValueLength(SpdyFrame frame) throws IOException {
+        int valueLen = frame.read16();
+        if (valueLen > frame.remaining()) {
+            throw new IOException("Value too long");
+        }
+        return valueLen;
+    }
+
+    /**
+     * No-op; recycling is not implemented yet (see the class-level TODO).
+     */
+    @Override
+    public void recycle(boolean socketClosing) {
+    }
+
+    /**
+     * No-op; the argument is ignored.
+     */
+    @Override
+    public void setSslSupport(SSLSupport sslSupport) {
+    }
+
+    /**
+     * @return always null
+     */
+    @Override
+    public UpgradeInbound getUpgradeInbound() {
+        return null;
+    }
+
+}

Added: tomcat/trunk/java/org/apache/coyote/spdy/SpdyProxyProtocol.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/spdy/SpdyProxyProtocol.java?rev=1292133&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/spdy/SpdyProxyProtocol.java (added)
+++ tomcat/trunk/java/org/apache/coyote/spdy/SpdyProxyProtocol.java Wed Feb 22 05:47:22 2012
@@ -0,0 +1,122 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.spdy;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import org.apache.coyote.AbstractProtocol;
+import org.apache.coyote.ajp.Constants;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.spdy.SpdyConnection;
+import org.apache.tomcat.spdy.SpdyContextProxy;
+import org.apache.tomcat.spdy.SpdyStream;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler;
+import org.apache.tomcat.util.net.JIoEndpoint;
+import org.apache.tomcat.util.net.SSLImplementation;
+import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
+
+/**
+ * SPDY in 'proxy' mode - no SSL and no header compression.
+ * This doesn't require JNI libraries, SSL/compression are off-loaded to
+ * a reverse proxy ( apache, etc ).
+ *
+ * To configure:
+ * <pre>{@code
+ * <Connector port="8011" protocol="org.apache.coyote.spdy.SpdyProxyProtocol"/>
+ * }</pre>
+ *
+ * To test, use
+ *   chrome  --use-spdy=no-compress,no-ssl [--enable-websocket-over-spdy]
+ *
+ * TODO: Remote information (client ip, certs, etc ) will be sent in X- headers.
+ * TODO: if spdy->spdy proxy, info about original spdy stream for pushes.
+ *
+ */
+public class SpdyProxyProtocol extends AbstractProtocol {
+    private static final Log log = LogFactory.getLog(SpdyProxyProtocol.class);
+
+    // Handler installed on the blocking endpoint; passes each accepted socket
+    // to the spdy context.
+    JIoEndpoint.Handler cHandler = new TomcatJioHandler();
+
+    // Created in start(); creates a SpdyProcessor for every incoming stream.
+    SpdyContextProxy spdyContext;
+
+    /**
+     * Creates the protocol with a blocking endpoint, reusing the AJP defaults
+     * for linger, timeout and TCP no-delay.
+     */
+    public SpdyProxyProtocol() {
+        endpoint = new JIoEndpoint();
+        ((JIoEndpoint) endpoint).setHandler(cHandler);
+        setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
+        setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
+        setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
+    }
+
+    @Override
+    protected Log getLog() {
+        return log;
+    }
+
+    @Override
+    protected String getNamePrefix() {
+        return "spdy2-jio";
+    }
+
+    @Override
+    protected String getProtocolName() {
+        return "spdy2";
+    }
+
+    @Override
+    protected Handler getHandler() {
+        return cHandler;
+    }
+
+    /**
+     * Creates the spdy context and starts the protocol.
+     *
+     * The context is created before the endpoint is started: the handler
+     * dereferences it for every accepted socket, so it must be in place
+     * before the first connection can arrive. The executor is still taken
+     * from the endpoint after the start, as before.
+     *
+     * @throws Exception if the underlying protocol fails to start
+     */
+    @Override
+    public void start() throws Exception {
+        spdyContext = new SpdyContextProxy() {
+            @Override
+            protected void onSynStream(SpdyConnection con, SpdyStream ch) throws IOException {
+                SpdyProcessor sp = new SpdyProcessor(con, endpoint);
+                sp.setAdapter(adapter);
+                sp.onSynStream(ch);
+            }
+        };
+        super.start();
+        spdyContext.setExecutor(endpoint.getExecutor());
+    }
+
+    /**
+     * Endpoint handler that runs a whole spdy connection on the thread that
+     * accepted the socket.
+     */
+    public class TomcatJioHandler implements JIoEndpoint.Handler {
+
+        @Override
+        public Object getGlobal() {
+            return null;
+        }
+
+        @Override
+        public void recycle() {
+        }
+
+        /**
+         * Wraps the socket in a spdy connection and runs it; once
+         * onBlockingSocket() returns the socket is reported as closed.
+         */
+        @Override
+        public SocketState process(SocketWrapper<Socket> socket,
+                SocketStatus status) {
+            SpdyConnection ch = spdyContext.getConnection(socket.getSocket());
+            ch.onBlockingSocket();
+            return SocketState.CLOSED;
+        }
+
+        @Override
+        public SSLImplementation getSslImplementation() {
+            return null;
+        }
+
+    }
+}

Added: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyConnection.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyConnection.java?rev=1292133&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyConnection.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyConnection.java Wed Feb 22 05:47:22 2012
@@ -0,0 +1,738 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.spdy;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Main class implementing SPDY protocol. Works with both blocking and
+ * non-blocking sockets. To simplify integration in various endpoints there is
+ * no 'socket' layer/abstraction, but read/write abstract methods.
+ *
+ * Because SPDY is multiplexing, a blocking socket needs a second thread to
+ * handle writes ( the read thread may be blocked while a servlet is writing ).
+ * The intended use of SPDY with blocking sockets is for frontend(load-balancer)
+ * to tomcat, where each tomcat will have a few spdy connections.
+ *
+ * Locking: the output queues are guarded by framerLock; the stream table is
+ * guarded by synchronizing on the channels map.
+ */
+public abstract class SpdyConnection { // implements Runnable {
+
+    // TODO: this can be pooled, to avoid allocation on idle connections
+    // TODO: override socket timeout
+
+    // Frame currently being read; set to null by handleFrame() once a stream
+    // has taken ownership of it.
+    protected volatile SpdyFrame inFrame;
+
+    protected CompressSupport compressSupport;
+
+    // Streams of this connection, keyed by stream id. Accessed from both the
+    // input thread and the drain thread, always under synchronized (channels).
+    Map<Integer, SpdyStream> channels = new HashMap<Integer, SpdyStream>();
+
+    // --------------
+    protected static final Logger log = Logger.getLogger(SpdyConnection.class
+            .getName());
+
+    public static final int TYPE_SYN_STREAM = 1;
+
+    public static final int TYPE_SYN_REPLY = 2;
+
+    public static final int TYPE_RST_STREAM = 3;
+
+    public static final int TYPE_SETTINGS = 4;
+
+    public static final int TYPE_PING = 6;
+
+    public static final int TYPE_GOAWAY = 7;
+
+    public static final int TYPE_HEADERS = 8;
+
+    // WINDOW_UPDATE follows HEADERS ( see its position in TYPES ), so it must
+    // not share the value 8 with TYPE_HEADERS.
+    public static final int TYPE_WINDOW = 9;
+
+    // Frame type names: the name of frame type t is TYPES[t - 1].
+    public static String[] TYPES = { "SYN_STREAM", "SYN_REPLY", "RST_STREAM",
+            "SETTINGS", "5", "PING", "GOAWAY", "HEADERS", "WINDOW_UPDATE" };
+
+    static int FLAG_HALF_CLOSE = 1;
+
+    // RST_STREAM status names: status codes start at 1, so the name of status
+    // code n is RST_ERRORS[n - 1].
+    // NOTE(review): the numbering follows the original inline comments - verify
+    // the entries from 6 on against the SPDY draft in use.
+    public static String[] RST_ERRORS = {
+            // 1 This is a generic error, and should only be used if a more
+            // specific error is not available.
+            "PROTOCOL_ERROR",
+            // 2 This is returned when a frame is received for a stream which
+            // is not active.
+            "INVALID_STREAM",
+            // 3 Indicates that the stream was refused before any processing
+            // has been done on the stream.
+            "REFUSED_STREAM",
+            // 4 Indicates that the recipient of a stream does not support the
+            // SPDY version requested.
+            "UNSUPPORTED_VERSION",
+            // 5 Used by the creator of a stream to indicate that the stream is
+            // no longer needed.
+            "CANCEL",
+            // 6 The endpoint detected that its peer violated the flow control
+            // protocol.
+            "FLOW_CONTROL_ERROR",
+            // 7 The endpoint received a SYN_REPLY for a stream already open.
+            "STREAM_IN_USE",
+            // 8 The endpoint received a data or SYN_REPLY frame for a stream
+            // which is half closed.
+            "STREAM_ALREADY_CLOSED" };
+
+    // protected SpdyFrame currentOutFrame = new SpdyFrame();
+
+    protected SpdyContext spdyContext;
+
+    // Set by abort(). Volatile because sendFrameBlocking() polls it without
+    // holding a lock.
+    protected volatile boolean inClosed;
+
+    int lastChannel;
+
+    int outStreamId = 1;
+
+    // TODO: finer handling of priorities
+    LinkedList<SpdyFrame> prioriyQueue = new LinkedList<SpdyFrame>();
+
+    LinkedList<SpdyFrame> outQueue = new LinkedList<SpdyFrame>();
+
+    Lock framerLock = new ReentrantLock();
+
+    // --------------
+
+    public static byte[] NPN = "spdy/2".getBytes();
+
+    // Signalled ( under framerLock ) when a frame is queued, when a frame has
+    // been fully written, and when the connection is aborted.
+    private Condition outCondition;
+
+    public SpdyConnection(SpdyContext spdyContext) {
+        this.setSpdyContext(spdyContext);
+        outCondition = framerLock.newCondition();
+    }
+
+    /**
+     * Write up to len bytes from data, starting at off.
+     *
+     * @return the number of bytes written. A negative value stops the drain
+     *         loop; a value smaller than len is treated as a partial write
+     *         on a non-blocking connection.
+     */
+    public abstract int write(byte[] data, int off, int len) throws IOException;
+
+    /**
+     * Like read, but may return 0 if no data is available and the channel
+     * supports polling. A negative return value means the channel is closed.
+     */
+    public abstract int read(byte[] data, int off, int len) throws IOException;
+
+    public void setCompressSupport(CompressSupport cs) {
+        compressSupport = cs;
+    }
+
+    /**
+     * Get a frame from the context, marked as a control frame of the given
+     * type.
+     */
+    public SpdyFrame getFrame(int type) {
+        SpdyFrame frame = getSpdyContext().getFrame();
+        frame.c = true;
+        frame.type = type;
+        return frame;
+    }
+
+    /**
+     * Get a frame from the context, to be used as a data frame.
+     */
+    public SpdyFrame getDataFrame() throws IOException {
+        return getSpdyContext().getFrame();
+    }
+
+    /*
+     * Output requirements: - common case: sendFrame called from a thread ( like
+     * servlets ), wants to be blocked anyways
+     *
+     * - but also need to support 'non-blocking' mode ( ping )
+     *
+     * - we need to queue frames when write would block, so we can prioritize.
+     *
+     * - for fully non-blocking write: there will be a drain callback.
+     */
+
+    /**
+     * Frame currently being written by drain(), null if none.
+     */
+    SpdyFrame out;
+
+    // True while a drain task is running; guarded by synchronized (nbDrain).
+    boolean draining = false;
+
+    /**
+     * Write the queued frames, priority queue first. Non blocking if the
+     * socket is not blocking.
+     *
+     * @return true if a partial write left the current frame pending, false
+     *         if the queues are empty or the connection failed.
+     */
+    private boolean drain() {
+        while (true) {
+            framerLock.lock();
+
+            try {
+                if (out == null) {
+                    out = prioriyQueue.poll();
+                    if (out == null) {
+                        out = outQueue.poll();
+                    }
+                    if (out == null) {
+                        return false;
+                    }
+                    SpdyFrame oframe = out;
+                    try {
+                        if (oframe.type == TYPE_SYN_STREAM) {
+                            oframe.fixNV(18);
+                            if (compressSupport != null) {
+                                compressSupport.compress(oframe, 18);
+                            }
+                        } else if (oframe.type == TYPE_SYN_REPLY
+                                || oframe.type == TYPE_HEADERS) {
+                            oframe.fixNV(14);
+                            if (compressSupport != null) {
+                                compressSupport.compress(oframe, 14);
+                            }
+                        }
+                    } catch (IOException ex) {
+                        abort("Compress error");
+                        return false;
+                    }
+                    if (oframe.type == TYPE_SYN_STREAM) {
+                        // Stream ids are assigned only now, in send order.
+                        oframe.streamId = outStreamId;
+                        outStreamId += 2;
+                        synchronized (channels) {
+                            channels.put(oframe.streamId, oframe.stream);
+                        }
+                    }
+
+                    oframe.serializeHead();
+
+                }
+                if (out.endData == out.off) {
+                    out = null;
+                    continue;
+                }
+            } finally {
+                framerLock.unlock();
+            }
+
+            if (getSpdyContext().debug) {
+                trace("> " + out);
+            }
+
+            try {
+                int toWrite = out.endData - out.off;
+                int wr = write(out.data, out.off, toWrite);
+                if (wr < 0) {
+                    return false;
+                }
+                if (wr < toWrite) {
+                    out.off += wr;
+                    return true; // non blocking connection
+                }
+                out.off += wr;
+                // Frame was sent
+                framerLock.lock();
+                try {
+                    outCondition.signalAll();
+                } finally {
+                    framerLock.unlock();
+                }
+                out = null;
+            } catch (IOException e) {
+                // connection closed - abort all streams
+                e.printStackTrace();
+                onClose();
+                // Mark the connection closed so that threads blocked in
+                // sendFrameBlocking() are released instead of waiting forever.
+                abort("Write error");
+                return false;
+            }
+        }
+    }
+
+    /**
+     * Blocking call for sendFrame: must be called from a thread pool.
+     *
+     * Will wait until the actual frame is sent or the connection is closed.
+     * An interrupt does not end the wait, but the interrupt status of the
+     * calling thread is restored before returning.
+     */
+    public void sendFrameBlocking(SpdyFrame oframe, SpdyStream proc)
+            throws IOException {
+        queueFrame(oframe, proc, oframe.pri == 0 ? outQueue : prioriyQueue);
+
+        nonBlockingDrain();
+
+        boolean interrupted = false;
+        try {
+            while (!inClosed) {
+                framerLock.lock();
+                try {
+                    if (oframe.off == oframe.endData) {
+                        return; // was sent
+                    }
+                    outCondition.await();
+                } catch (InterruptedException e) {
+                    // Keep waiting for the frame, remember the interrupt.
+                    interrupted = true;
+                } finally {
+                    framerLock.unlock();
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Send as much as possible without blocking.
+     *
+     * With a nb transport it should call drain directly.
+     */
+    public void nonBlockingDrain() {
+        // TODO: if (nonBlocking()) { drain() }
+        getSpdyContext().getExecutor().execute(nbDrain);
+    }
+
+    // Number of drain tasks started ( debug counter, not synchronized ).
+    static int drainCnt = 0;
+
+    // Drain task: at most one runs at a time, the others return at once.
+    // NOTE(review): a frame queued after drain() found the queues empty, but
+    // before 'draining' is cleared, stays queued until the next drain request.
+    Runnable nbDrain = new Runnable() {
+        public void run() {
+            drainCnt++;
+            synchronized (nbDrain) {
+                if (draining) {
+                    return;
+                }
+                draining = true;
+            }
+
+            try {
+                drain();
+            } finally {
+                // Clear the flag even if drain() throws, or no later task
+                // would ever drain again.
+                synchronized (nbDrain) {
+                    draining = false;
+                }
+            }
+        }
+    };
+
+    /**
+     * Queue the frame and request a drain, without waiting for it to be sent.
+     */
+    public void sendFrameNonBlocking(SpdyFrame oframe, SpdyStream proc)
+            throws IOException {
+        queueFrame(oframe, proc, oframe.pri == 0 ? outQueue : prioriyQueue);
+        nonBlockingDrain();
+    }
+
+    // Switch the frame from 'being filled' to 'being sent' ( endData marks the
+    // end of the content, off restarts at 0 ) and append it to the given queue.
+    private void queueFrame(SpdyFrame oframe, SpdyStream proc,
+            LinkedList<SpdyFrame> queue) throws IOException {
+
+        oframe.endData = oframe.off;
+        oframe.off = 0;
+        // We can't assign a stream ID until it is sent - priorities
+        // we can't compress either - it's stateful.
+        oframe.stream = proc;
+
+        framerLock.lock();
+        try {
+            queue.add(oframe);
+            outCondition.signalAll();
+        } finally {
+            framerLock.unlock();
+        }
+    }
+
+    public void onClose() {
+        // TODO: abort
+    }
+
+    private void trace(String s) {
+        System.err.println(s);
+    }
+
+    public SpdyFrame inFrame() {
+        return inFrame;
+    }
+
+    /**
+     * Process a SPDY connection using a blocking socket.
+     *
+     * @return the result of processInput(), or CLOSE if it failed.
+     */
+    public int onBlockingSocket() {
+        try {
+            if (getSpdyContext().debug) {
+                trace("< onConnection() " + lastChannel);
+            }
+            int rc = processInput();
+
+            if (getSpdyContext().debug) {
+                trace("< onConnection() " + rc + " " + lastChannel);
+            }
+            return rc;
+        } catch (Throwable t) {
+            t.printStackTrace();
+            trace("< onData-ERROR() " + lastChannel);
+            abort("Error processing socket" + t);
+            return CLOSE;
+        }
+    }
+
+    public static final int LONG = 1;
+
+    public static final int CLOSE = -1;
+
+    // Bytes read past the end of the current frame, moved to their own frame.
+    private SpdyFrame nextFrame;
+
+    /**
+     * Non-blocking method, read as much as possible and return.
+     *
+     * @return LONG when read() reports that no data is available, CLOSE when
+     *         the connection was closed or aborted.
+     */
+    public int processInput() throws IOException {
+        while (true) {
+            if (inFrame == null) {
+                inFrame = getSpdyContext().getFrame();
+            }
+
+            if (inFrame.data == null) {
+                inFrame.data = new byte[16 * 1024];
+            }
+            // we might already have data from previous frame
+            if (inFrame.endData < 8 || // we don't have the header
+                    inFrame.endData < inFrame.endFrame) { // size != 0 - we
+                                                          // parsed the header
+
+                int rd = read(inFrame.data, inFrame.endData,
+                        inFrame.data.length - inFrame.endData);
+                if (rd < 0) {
+                    abort("Closed");
+                    return CLOSE;
+                }
+                if (rd == 0) {
+                    return LONG;
+                    // Non-blocking channel - will resume reading at off
+                }
+                inFrame.endData += rd;
+            }
+            if (inFrame.endData < 8) {
+                continue; // keep reading
+            }
+            // We got the frame head
+            if (inFrame.endFrame == 0) {
+                inFrame.parse();
+                if (inFrame.version != 2) {
+                    abort("Wrong version");
+                    return CLOSE;
+                }
+
+                // MAx_FRAME_SIZE
+                if (inFrame.endFrame < 0 || inFrame.endFrame > 32000) {
+                    abort("Framing error, size = " + inFrame.endFrame);
+                    return CLOSE;
+                }
+
+                // grow the buffer if needed. The first read may already have
+                // pulled in part of the payload, so everything read so far
+                // must be carried over to the new buffer.
+                if (inFrame.data.length < inFrame.endFrame) {
+                    byte[] grown = new byte[inFrame.endFrame];
+                    System.arraycopy(inFrame.data, 0, grown, 0,
+                            inFrame.endData);
+                    inFrame.data = grown;
+                }
+            }
+
+            if (inFrame.endData < inFrame.endFrame) {
+                continue; // keep reading to fill current frame
+            }
+            // else: we have at least the current frame
+            int extra = inFrame.endData - inFrame.endFrame;
+            if (extra > 0) {
+                // and a bit more - to keep things simple for now we
+                // copy them to next frame, at least we saved reads.
+                // it is possible to avoid copy - but later.
+                nextFrame = getSpdyContext().getFrame();
+                nextFrame.makeSpace(extra);
+                System.arraycopy(inFrame.data, inFrame.endFrame,
+                        nextFrame.data, 0, extra);
+                nextFrame.endData = extra;
+                inFrame.endData = inFrame.endFrame;
+            }
+
+            // decompress
+            if (inFrame.type == TYPE_SYN_STREAM) {
+                inFrame.streamId = inFrame.readInt(); // 4
+                lastChannel = inFrame.streamId;
+                inFrame.associated = inFrame.readInt(); // 8
+                inFrame.pri = inFrame.read16(); // 10 pri and unused
+                if (compressSupport != null) {
+                    compressSupport.decompress(inFrame, 18);
+                }
+                inFrame.nvCount = inFrame.read16();
+
+            } else if (inFrame.type == TYPE_SYN_REPLY
+                    || inFrame.type == TYPE_HEADERS) {
+                inFrame.streamId = inFrame.readInt(); // 4
+                inFrame.read16();
+                if (compressSupport != null) {
+                    compressSupport.decompress(inFrame, 14);
+                }
+                inFrame.nvCount = inFrame.read16();
+            }
+
+            if (getSpdyContext().debug) {
+                trace("< " + inFrame);
+            }
+
+            try {
+                int state = handleFrame();
+                if (state == CLOSE) {
+                    return state;
+                }
+            } catch (Throwable t) {
+                abort("Error handling frame");
+                t.printStackTrace();
+                return CLOSE;
+            }
+
+            // handleFrame() sets inFrame to null when a stream kept the frame;
+            // otherwise the frame is reused for the next read.
+            if (inFrame != null) {
+                inFrame.recyle();
+                if (nextFrame != null) {
+                    getSpdyContext().releaseFrame(inFrame);
+                    inFrame = nextFrame;
+                    nextFrame = null;
+                }
+            } else {
+                inFrame = nextFrame;
+                nextFrame = null;
+                if (inFrame == null) {
+                    inFrame = getSpdyContext().getFrame();
+                }
+            }
+        }
+    }
+
+    // Framing error or shutdown- close all streams.
+    public void abort(String msg) {
+        System.err.println(msg);
+        framerLock.lock();
+        try {
+            inClosed = true;
+            // Wake up the threads waiting in sendFrameBlocking(), so they
+            // notice the connection is closed.
+            outCondition.signalAll();
+        } finally {
+            framerLock.unlock();
+        }
+        // TODO: close all streams
+    }
+
+    /**
+     * Dispatch the current input frame: control frames are handled according
+     * to their type, data frames are passed to their stream.
+     *
+     * @return CLOSE if the connection was aborted, LONG otherwise
+     * @throws IOException
+     */
+    public int handleFrame() throws IOException {
+        if (inFrame.c) {
+            switch (inFrame.type) {
+            case TYPE_SETTINGS: {
+                int cnt = inFrame.readInt();
+                for (int i = 0; i < cnt; i++) {
+                    // The settings are not used yet, only skipped.
+                    inFrame.readByte(); // flag
+                    inFrame.read24(); // id
+                    inFrame.readInt(); // value
+                }
+                break;
+                // receivedHello = currentInFrame;
+            }
+            case TYPE_GOAWAY: {
+                int lastStream = inFrame.readInt();
+                log.info("GOAWAY last=" + lastStream);
+                abort("GOAWAY");
+                return CLOSE;
+            }
+            case TYPE_RST_STREAM: {
+                inFrame.streamId = inFrame.read32();
+                int errCode = inFrame.read32();
+                trace("> RST " + inFrame.streamId + " "
+                        + rstErrorName(errCode));
+                SpdyStream sch = findStream(inFrame.streamId);
+                if (sch == null) {
+                    abort("Missing channel " + inFrame.streamId);
+                    return CLOSE;
+                }
+                sch.onCtlFrame(inFrame);
+                inFrame = null;
+                break;
+            }
+            case TYPE_SYN_STREAM: {
+
+                SpdyStream ch = getSpdyContext().getStream(this);
+
+                synchronized (channels) {
+                    channels.put(inFrame.streamId, ch);
+                }
+
+                try {
+                    ch.onCtlFrame(inFrame);
+                    inFrame = null;
+                } catch (Throwable t) {
+                    log.log(Level.SEVERE, "Error parsing head SYN_STREAM", t);
+                    abort("Error reading headers " + t);
+                    return CLOSE;
+                }
+                spdyContext.onSynStream(this, ch);
+                break;
+            }
+            case TYPE_SYN_REPLY: {
+                SpdyStream sch = findStream(inFrame.streamId);
+                if (sch == null) {
+                    abort("Missing channel");
+                    return CLOSE;
+                }
+                try {
+                    sch.onCtlFrame(inFrame);
+                    inFrame = null;
+                } catch (Throwable t) {
+                    log.info("Error parsing head SYN_REPLY " + t);
+                    abort("Error reading headers " + t);
+                    return CLOSE;
+                }
+                break;
+            }
+            case TYPE_PING: {
+
+                // Echo the ping id back, ahead of the regular frames.
+                SpdyFrame oframe = getSpdyContext().getFrame();
+                oframe.type = TYPE_PING;
+                oframe.c = true;
+
+                oframe.append32(inFrame.read32());
+                oframe.pri = 0x80;
+
+                sendFrameNonBlocking(oframe, null);
+                break;
+            }
+            }
+        } else {
+            // Data frame
+            SpdyStream sch = findStream(inFrame.streamId);
+            if (sch == null) {
+                abort("Missing channel");
+                return CLOSE;
+            }
+            sch.onDataFrame(inFrame);
+            inFrame = null;
+        }
+        return LONG;
+    }
+
+    // Look up a stream by id, under the same lock used to register streams.
+    private SpdyStream findStream(int streamId) {
+        synchronized (channels) {
+            return channels.get(streamId);
+        }
+    }
+
+    // Name of a RST_STREAM status code ( codes start at 1 ), or the number
+    // itself when the code is outside the RST_ERRORS table.
+    private static String rstErrorName(int errCode) {
+        if (errCode >= 1 && errCode <= RST_ERRORS.length) {
+            return RST_ERRORS[errCode - 1];
+        }
+        return String.valueOf(errCode);
+    }
+
+    public SpdyContext getSpdyContext() {
+        return spdyContext;
+    }
+
+    public void setSpdyContext(SpdyContext spdyContext) {
+        this.spdyContext = spdyContext;
+    }
+
+    /**
+     * Create a stream with the given host and url headers and send it.
+     */
+    public SpdyStream get(String host, String url) throws IOException {
+        SpdyStream sch = new SpdyStream(this);
+        sch.addHeader("host", host);
+        sch.addHeader("url", url);
+
+        sch.send();
+
+        return sch;
+    }
+
+    /**
+     * Abstract compression support. When using spdy on intranet ( between load
+     * balancer and tomcat) there is no need for the compression overhead. There
+     * are also multiple possible implementations.
+     */
+    public static interface CompressSupport {
+        public void compress(SpdyFrame frame, int start) throws IOException;
+
+        public void decompress(SpdyFrame frame, int start) throws IOException;
+    }
+}

Added: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContext.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContext.java?rev=1292133&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContext.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContext.java Wed Feb 22 05:47:22 2012
@@ -0,0 +1,121 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.spdy;
+
+import java.io.IOException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * Will implement pooling/reuse of heavy objects, allow additional
+ * configuration.
+ *
+ * The abstract methods allow integration with different libraries (
+ * compression, request handling )
+ *
+ * In 'external' mode it must be used with APR library and compression.
+ *
+ * In 'intranet' mode - it is supposed to be used behind a load balancer that
+ * handles SSL and compression. Test with: --user-data-dir=/tmp/test
+ * --use-spdy=no-compress,no-ssl
+ */
+public class SpdyContext {
+
+    // The protocol name, "spdy/2", as bytes.
+    public static final byte[] SPDY_NPN;
+
+    // SPDY_NPN prefixed with its length; the array has one more byte after
+    // the name, left at 0.
+    public static final byte[] SPDY_NPN_OUT;
+    static {
+        SPDY_NPN = "spdy/2".getBytes();
+        SPDY_NPN_OUT = new byte[SPDY_NPN.length + 2];
+        System.arraycopy(SPDY_NPN, 0, SPDY_NPN_OUT, 1, SPDY_NPN.length);
+        SPDY_NPN_OUT[0] = (byte) SPDY_NPN.length;
+    }
+
+    // Guarded by synchronized (this), see getExecutor().
+    private Executor executor;
+
+    // Size of the buffer of new frames.
+    // NOTE(review): 8196 looks like a typo for 8192 - confirm before changing.
+    int defaultFrameSize = 8196;
+
+    public static boolean debug = true;
+
+    /**
+     * Get a frame - frames are heavy buffers, may be reused.
+     */
+    public SpdyFrame getFrame() {
+        return new SpdyFrame(defaultFrameSize);
+    }
+
+    /**
+     * Return a frame that is no longer used. Nothing is pooled yet, so this
+     * does nothing.
+     */
+    public void releaseFrame(SpdyFrame done) {
+    }
+
+    /**
+     * Override for server side to return a custom stream.
+     */
+    public SpdyStream getStream(SpdyConnection framer) {
+        return new SpdyStream(framer);
+    }
+
+    public synchronized void setExecutor(Executor executor) {
+        this.executor = executor;
+    }
+
+    /**
+     * SPDY is a multiplexed protocol - the SpdyProcessors will be executed on
+     * this executor.
+     *
+     * If the context returns null - we'll assume the SpdyProcessors are fully
+     * non blocking, and will execute them in the spdy thread.
+     *
+     * This implementation never returns null: if no executor was set, a
+     * cached thread pool is created on first use. The method is synchronized
+     * so that concurrent first calls cannot create more than one pool.
+     */
+    public synchronized Executor getExecutor() {
+        if (executor == null) {
+            executor = Executors.newCachedThreadPool();
+        }
+        return executor;
+    }
+
+    /**
+     * Override for servers.
+     * @throws IOException
+     */
+    protected void onSynStream(SpdyConnection spdyCon, SpdyStream ch) throws IOException {
+    }
+
+    /**
+     * Client mode: return a connection for host/port. This base
+     * implementation has no transport and returns null.
+     * @throws IOException
+     */
+    public SpdyConnection getConnection(String host, int port) throws IOException {
+        return null;
+    }
+
+    public void releaseConnection(SpdyConnection con) {
+    }
+}

Added: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextProxy.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextProxy.java?rev=1292133&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextProxy.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyContextProxy.java Wed Feb 22 05:47:22 2012
@@ -0,0 +1,109 @@
+/*
+ */
+package org.apache.tomcat.spdy;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+
+/**
+ * Spdy context for 'proxy' or test mode spdy - no NPN, no SSL, no compression.
+ *
+ * This can be supported without JNI dependencies.
+ * It can be modified to support SSL and compression - but so far the only way
+ * to use NPN is via JNI.
+ */
+public class SpdyContextProxy extends SpdyContext {
+
+    /**
+     * Client mode: open a socket to host/port and start reading from it on
+     * the executor.
+     *
+     * @throws IOException if the connection could not be set up; the socket
+     *         is closed in that case.
+     */
+    @Override
+    public SpdyConnection getConnection(String host, int port) throws IOException {
+        Socket sock = null;
+        boolean ok = false;
+        try {
+            sock = new Socket(host, port);
+
+            sock.getInputStream();
+            SpdyConnectionSocket con = new SpdyConnectionSocket(this, sock);
+
+            getExecutor().execute(con.inputThread);
+            ok = true;
+            return con;
+        } catch (IOException ex) {
+            ex.printStackTrace();
+            throw ex;
+        } finally {
+            // Do not leak the socket if the setup failed half way ( including
+            // the executor rejecting the input task ).
+            if (!ok && sock != null) {
+                try {
+                    sock.close();
+                } catch (IOException ignored) {
+                    // the original failure is the one to report
+                }
+            }
+        }
+    }
+
+    /**
+     * Wrap an already connected socket. No input task is started here, the
+     * caller has to process the input itself.
+     */
+    public SpdyConnection getConnection(Socket socket) {
+        return new SpdyConnectionSocket(this, socket);
+    }
+
+    /**
+     * SpdyConnection using the blocking streams of a socket.
+     */
+    public static class SpdyConnectionSocket extends SpdyConnection {
+        Socket socket;
+
+        // Reads the connection until it ends, then closes the socket.
+        Runnable inputThread = new Runnable() {
+            @Override
+            public void run() {
+                onBlockingSocket();
+                try {
+                    inClosed = true;
+                    socket.close();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        public SpdyConnectionSocket(SpdyContext spdyContext) {
+            super(spdyContext);
+        }
+
+        public SpdyConnectionSocket(SpdyContext spdyContext, Socket socket) {
+            super(spdyContext);
+            this.socket = socket;
+        }
+
+        @Override
+        public synchronized int write(byte[] data, int off, int len) throws IOException {
+            socket.getOutputStream().write(data, off, len);
+            return len;
+        }
+
+        /**
+         * Blocking read; a socket timeout is reported as 0 ( no data ).
+         */
+        @Override
+        public int read(byte[] data, int off, int len) throws IOException {
+            try {
+                return socket.getInputStream().read(data, off, len);
+            } catch (SocketTimeoutException ex) {
+                return 0;
+            }
+        }
+    }
+}

Added: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyFrame.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyFrame.java?rev=1292133&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyFrame.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyFrame.java Wed Feb 22 05:47:22 2012
@@ -0,0 +1,304 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.spdy;
+
+public class SpdyFrame {
+    // This is a bit more complicated, to avoid multiple reads/writes.
+    // We'll read as much as possible - possible past frame end. This may
+    // cost an extra copy - or even more complexity for dealing with slices
+    // if we want to save the copy.
+    public byte[] data;
+
+    public int off = 8; // used when reading - current offset
+
+    public int endFrame; // end of frame == size + 8
+
+    // On write it is incremented.
+
+    public int endData; // end of data in the buffer (may be past frame end)
+
+    // Processed data from the frame
+    boolean c; // for control
+
+    int version;
+
+    private int flags;
+
+    public int type;
+
+    // For control frames
+    public int streamId;
+
+    public int pri;
+
+    public int associated;
+
+    public int nvCount;
+
+    public SpdyStream stream;
+
+    public SpdyFrame(int size) {
+        data = new byte[size];
+    }
+
+    public int getDataSize() {
+        return endData - 8;
+    }
+
+    public void recyle() {
+        type = 0;
+        c = false;
+        endFrame = 0;
+        off = 8;
+        streamId = 0;
+        nvCount = 0;
+        endData = 0;
+    }
+
+    public String toString() {
+        if (c) {
+            if (type == 6) {
+                return "C PING " + read32(data, 0);
+            }
+            return "C" + " S=" + streamId + (flags != 0 ? " F=" + flags : "")
+                    + (version != 2 ? "  v" + version : "") + " t=" + type
+                    + " L=" + endFrame + "/" + off;
+        } else {
+            return "D" + " S=" + streamId + (flags != 0 ? " F=" + flags : "")
+                    + " L=" + endFrame + "/" + off;
+        }
+    }
+
+    public int serializeHead() {
+        if (c) {
+            data[0] = (byte) 0x80;
+            data[1] = 2;
+            data[2] = 0;
+            data[3] = (byte) type;
+            data[4] = (byte) flags;
+            append24(data, 5, endData - 8);
+            if (type == SpdyConnection.TYPE_SYN_STREAM) {
+                // nvcount is added before
+                append32(data, 8, streamId);
+                append32(data, 12, associated);
+                data[16] = 0; // TODO: priority
+                data[17] = 0;
+                return 18;
+            } else if (type == SpdyConnection.TYPE_SYN_REPLY) {
+                append32(data, 8, streamId);
+                data[12] = 0;
+                data[13] = 0;
+                return 14;
+            } else if (type == SpdyConnection.TYPE_HEADERS) {
+                append32(data, 8, streamId);
+                data[12] = 0;
+                data[13] = 0;
+                return 14;
+            }
+        } else {
+            append32(data, 0, streamId);
+            data[4] = (byte) flags;
+            append24(data, 5, endData - 8);
+        }
+        return 8;
+    }
+
+    public boolean parse() {
+        endFrame = 0;
+        streamId = 0;
+        nvCount = 0;
+
+        int b0 = data[0] & 0xFF;
+        if (b0 < 128) {
+            // data frame
+            c = false;
+            streamId = read32(data, 0);
+            version = 2;
+        } else {
+            c = true;
+            b0 -= 128;
+            version = ((b0 << 8) | data[1] & 0xFF);
+            if (version > 2) {
+                return false;
+            }
+            b0 = data[2] & 0xFF;
+            type = ((b0 << 8) | (data[3] & 0xFF));
+        }
+
+        flags = data[4] & 0xFF;
+        for (int i = 5; i < 8; i++) {
+            b0 = data[i] & 0xFF;
+            endFrame = endFrame << 8 | b0;
+        }
+
+        // size will represent the end of the data ( header is held in same
+        // buffer)
+        endFrame += 8;
+
+        return true;
+    }
+
+    public boolean isHalfClose() {
+        return (flags & SpdyConnection.FLAG_HALF_CLOSE) != 0;
+    }
+
+    public void halfClose() {
+        flags = SpdyConnection.FLAG_HALF_CLOSE;
+    }
+
+    public boolean closed() {
+        return (flags & SpdyConnection.FLAG_HALF_CLOSE) != 0;
+    }
+
+    static void append24(byte[] buff, int off, int v) {
+        buff[off++] = (byte) ((v & 0xFF0000) >> 16);
+        buff[off++] = (byte) ((v & 0xFF00) >> 8);
+        buff[off++] = (byte) ((v & 0xFF));
+    }
+
+    static void append32(byte[] buff, int off, int v) {
+        buff[off++] = (byte) ((v & 0xFF000000) >> 24);
+        buff[off++] = (byte) ((v & 0xFF0000) >> 16);
+        buff[off++] = (byte) ((v & 0xFF00) >> 8);
+        buff[off++] = (byte) ((v & 0xFF));
+    }
+
+    public void append32(int v) {
+        makeSpace(4);
+        data[off++] = (byte) ((v & 0xFF000000) >> 24);
+        data[off++] = (byte) ((v & 0xFF0000) >> 16);
+        data[off++] = (byte) ((v & 0xFF00) >> 8);
+        data[off++] = (byte) ((v & 0xFF));
+    }
+
+    public void append16(int v) {
+        makeSpace(2);
+        data[off++] = (byte) ((v & 0xFF00) >> 8);
+        data[off++] = (byte) ((v & 0xFF));
+    }
+
+    void fixNV(int nvPos) {
+        data[nvPos++] = (byte) ((nvCount & 0xFF00) >> 8);
+        data[nvPos] = (byte) ((nvCount & 0xFF));
+    }
+
+    public void append(byte[] buf, int soff, int len) {
+        makeSpace(len + off);
+        System.arraycopy(buf, soff, data, off, len);
+        off += len;
+    }
+
+    public void headerValue(byte[] buf, int soff, int len) {
+        makeSpace(len + 4);
+        append16(len);
+        System.arraycopy(buf, soff, data, off, len);
+        off += len;
+    }
+
+    public void headerName(byte[] buf, int soff, int len) {
+        // if it's the first header, leave space for extra params and NV count.
+        // they'll be filled in by send.
+        if (off == 8) {
+            if (type == SpdyConnection.TYPE_SYN_REPLY) {
+                off = 16;
+            } else if (type == SpdyConnection.TYPE_SYN_STREAM) {
+                off = 20;
+            } else if (type != SpdyConnection.TYPE_HEADERS) {
+                off = 16;
+            } else {
+                throw new RuntimeException("Wrong frame type");
+            }
+        }
+        nvCount++;
+        headerValue(buf, soff, len);
+    }
+
+    // TODO: instead of that, use byte[][]
+    void makeSpace(int len) {
+        if (len < 256) {
+            len = 256;
+        }
+        if (data == null) {
+            data = new byte[len];
+            return;
+        }
+        int newEnd = off + len;
+
+        if (data.length < newEnd) {
+            byte[] tmp = new byte[newEnd];
+            System.err.println("cp " + off + " " + data.length + " " + len
+                    + " " + tmp.length);
+            System.arraycopy(data, 0, tmp, 0, off);
+            data = tmp;
+        }
+
+    }
+
+    public int read16() {
+        int res = data[off++] & 0xFF;
+        return res << 8 | (data[off++] & 0xFF);
+    }
+
+    int readInt() {
+        int res = 0;
+        for (int i = 0; i < 4; i++) {
+            int b0 = data[off++] & 0xFF;
+            res = res << 8 | b0;
+        }
+        return res;
+    }
+
+    int read24() {
+        int res = 0;
+        for (int i = 0; i < 3; i++) {
+            int b0 = data[off++] & 0xFF;
+            res = res << 8 | b0;
+        }
+        return res;
+    }
+
+    int read32(byte[] data, int off) {
+        int res = 0;
+        for (int i = 0; i < 4; i++) {
+            int b0 = data[off++] & 0xFF;
+            res = res << 8 | b0;
+        }
+        return res;
+    }
+
+    int read32() {
+        int res = 0;
+        for (int i = 0; i < 4; i++) {
+            int b0 = data[off++] & 0xFF;
+            res = res << 8 | b0;
+        }
+        return res;
+    }
+
+    public int readByte() {
+        return data[off++] & 0xFF;
+    }
+
+    public int remaining() {
+        return endFrame - off;
+    }
+
+    public void advance(int cnt) {
+        off += cnt;
+    }
+
+}
\ No newline at end of file

Added: tomcat/trunk/java/org/apache/tomcat/spdy/SpdyStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/spdy/SpdyStream.java?rev=1292133&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/spdy/SpdyStream.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/spdy/SpdyStream.java Wed Feb 22 05:47:22 2012
@@ -0,0 +1,196 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.spdy;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * One SPDY stream.
+ * 
+ * Created by SpdyContext.getProcessor(framer).
+ * 
+ * The methods are called in a IO thread when the framer received a frame for
+ * this stream.
+ * 
+ * They should not block.
+ * 
+ * The frame must be either consumed or popInFrame must be called, after the
+ * call is done the frame will be reused.
+ */
+public class SpdyStream {
+    public static final Charset UTF8 = Charset.forName("UTF-8");
+
+    protected SpdyConnection spdy;
+
+    // SYN_STREAM frame: received in onCtlFrame() or created by getRequest().
+    public SpdyFrame reqFrame;
+
+    // SYN_REPLY frame received in onCtlFrame().
+    SpdyFrame resFrame;
+
+    // Data frames received so far and not yet taken by getIn().
+    BlockingQueue<SpdyFrame> inData = new LinkedBlockingQueue<SpdyFrame>();
+
+    // Marker queued after the last data frame; getIn() maps it to null.
+    public static final SpdyFrame END_FRAME = new SpdyFrame(16);
+
+    boolean finSent;
+
+    protected boolean finRcvd;
+
+    /**
+     * @param spdy the connection this stream sends its frames on
+     */
+    public SpdyStream(SpdyConnection spdy) {
+        this.spdy = spdy;
+    }
+
+    /**
+     * Non-blocking, called when a data frame is received.
+     *
+     * The processor must consume the data, or set frame.data to null or a fresh
+     * buffer ( to avoid a copy ).
+     */
+    public void onDataFrame(SpdyFrame inFrame) {
+        inData.add(inFrame);
+        if (inFrame.closed()) {
+            finRcvd = true;
+            inData.add(END_FRAME);
+        }
+    }
+
+    /**
+     * Non-blocking - handles a syn stream package. The processor must consume
+     * frame.data or set it to null.
+     *
+     * The base method is for client implementation - servers need to override
+     * and process the frame as a request.
+     */
+    public void onCtlFrame(SpdyFrame frame) throws IOException {
+        // TODO: handle RST
+        if (frame.type == SpdyConnection.TYPE_SYN_STREAM) {
+            reqFrame = frame;
+        } else if (frame.type == SpdyConnection.TYPE_SYN_REPLY) {
+            resFrame = frame;
+        }
+        if (frame.isHalfClose()) {
+            finRcvd = true;
+        }
+    }
+
+    /**
+     * True if the channel both received and sent FIN frames.
+     *
+     * This is tracked by the processor, to avoid extra storage in framer.
+     */
+    public boolean isFinished() {
+        return finSent && finRcvd;
+    }
+
+    /**
+     * Takes the next received data frame, waiting up to {@code to}
+     * milliseconds for one to arrive.
+     *
+     * @param to maximum time to wait, in milliseconds
+     * @return the next data frame, or {@code null} if the wait timed out or
+     *         no more data will arrive on this stream
+     * @throws IOException if the thread is interrupted while waiting; the
+     *         interrupt status is restored before throwing
+     */
+    public SpdyFrame getIn(long to) throws IOException {
+        if (inData.isEmpty() && finRcvd) {
+            return null;
+        }
+        SpdyFrame in;
+        try {
+            in = inData.poll(to, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the code further up the stack.
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+        return (in == END_FRAME) ? null : in;
+    }
+
+    /**
+     * Decodes the name/value pairs of the received SYN_REPLY frame into the
+     * given map.
+     *
+     * Reading starts at the frame's current offset and advances it, so the
+     * pairs can be decoded only once per frame. Nothing is added when no
+     * SYN_REPLY frame has been received yet.
+     *
+     * @param resHeaders map that receives the decoded headers
+     */
+    public void getHeaders(Map<String, String> resHeaders) {
+        SpdyFrame f = resFrame;
+        if (f == null) {
+            return;
+        }
+        int nvCount = f.nvCount;
+        for (int i = 0; i < nvCount; i++) {
+            int len = f.read16();
+            String n = new String(f.data, f.off, len, UTF8);
+            f.advance(len);
+            len = f.read16();
+            String v = new String(f.data, f.off, len, UTF8);
+            f.advance(len);
+            resHeaders.put(n, v);
+        }
+    }
+
+    /**
+     * Returns the request frame, asking the connection for a new SYN_STREAM
+     * frame on first use.
+     */
+    public SpdyFrame getRequest() {
+        if (reqFrame == null) {
+            reqFrame = spdy.getFrame(SpdyConnection.TYPE_SYN_STREAM);
+        }
+        return reqFrame;
+    }
+
+    /**
+     * Appends a header to the request frame. Name and value are encoded as
+     * UTF-8, the same charset {@link #getHeaders(Map)} decodes with, rather
+     * than with the platform default charset.
+     */
+    public void addHeader(String name, String value) {
+        SpdyFrame request = getRequest();
+        byte[] nameB = name.getBytes(UTF8);
+        request.headerName(nameB, 0, nameB.length);
+        byte[] valueB = value.getBytes(UTF8);
+        request.headerValue(valueB, 0, valueB.length);
+    }
+
+
+    /**
+     * Copies the given bytes into a data frame for this stream and sends it
+     * through the connection's {@code sendFrameBlocking}.
+     *
+     * @param data buffer holding the bytes to send
+     * @param start offset of the first byte to send
+     * @param length number of bytes to send
+     * @param close if {@code true} the frame is flagged as half-close
+     * @throws IOException if sending fails
+     */
+    public synchronized void sendDataFrame(byte[] data, int start,
+            int length, boolean close) throws IOException {
+
+        SpdyFrame oframe = spdy.getDataFrame();
+
+        // Options:
+        // 1. wrap the byte[] data, use a separate header[], wait frame sent
+        // -> 2 socket writes
+        // 2. copy the data to frame byte[] -> non-blocking queue
+        // 3. copy the data, blocking drain -> like 1, trade one copy to
+        // avoid
+        // 1 tcp packet. That's the current choice, seems closer to rest of
+        // tomcat
+
+        oframe.streamId = reqFrame.streamId;
+        if (close) {
+            oframe.halfClose();
+        }
+
+        oframe.append(data, start, length);
+        spdy.sendFrameBlocking(oframe, this);
+    }
+
+    /**
+     * Sends the request as a GET over the "http" scheme.
+     */
+    public void send() throws IOException {
+        send("http", "GET");
+    }
+
+    /**
+     * Adds the "host" and "url" headers and sends the request.
+     */
+    public void send(String host, String url, String scheme, String method) throws IOException {
+        addHeader("host", host);
+        addHeader("url", url);
+
+        send(scheme, method);
+    }
+
+    /**
+     * Adds the "scheme", "method" and "version" headers and sends the request
+     * frame. A GET request is flagged as half-close, because no data frames
+     * follow it.
+     *
+     * @param scheme value of the "scheme" header; "http" is used when
+     *        {@code null}
+     * @param method the request method, e.g. "GET"
+     * @throws IOException if sending fails
+     */
+    public void send(String scheme, String method) throws IOException {
+        getRequest();
+        if ("GET".equalsIgnoreCase(method)) {
+            // TODO: add the others
+            reqFrame.halfClose();
+        }
+        // The scheme used to be hard-coded to "http", ignoring the argument.
+        addHeader("scheme", scheme != null ? scheme : "http");
+        addHeader("method", method);
+        addHeader("version", "HTTP/1.1");
+        if (reqFrame.isHalfClose()) {
+            finSent = true;
+        }
+        spdy.sendFrameBlocking(reqFrame, this);
+    }
+
+}
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Mime
View raw message