Return-Path: Delivered-To: apmail-tomcat-dev-archive@www.apache.org Received: (qmail 42323 invoked from network); 9 Aug 2006 17:13:11 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 9 Aug 2006 17:13:10 -0000 Received: (qmail 1337 invoked by uid 500); 9 Aug 2006 17:13:06 -0000 Delivered-To: apmail-tomcat-dev-archive@tomcat.apache.org Received: (qmail 1298 invoked by uid 500); 9 Aug 2006 17:13:05 -0000 Mailing-List: contact dev-help@tomcat.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: "Tomcat Developers List" Delivered-To: mailing list dev@tomcat.apache.org Received: (qmail 1287 invoked by uid 500); 9 Aug 2006 17:13:05 -0000 Delivered-To: apmail-jakarta-tomcat-dev@jakarta.apache.org Received: (qmail 1284 invoked by uid 99); 9 Aug 2006 17:13:05 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Aug 2006 10:13:05 -0700 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Aug 2006 10:13:03 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id B2C661A981A; Wed, 9 Aug 2006 10:12:43 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r430097 - in /tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net: NioChannel.java NioEndpoint.java SecureNioChannel.java Date: Wed, 09 Aug 2006 17:12:42 -0000 To: tomcat-dev@jakarta.apache.org From: fhanik@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20060809171243.B2C661A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: fhanik Date: Wed Aug 9 10:12:37 2006 New Revision: 430097 URL: http://svn.apache.org/viewvc?rev=430097&view=rev Log: Added in a cache for byte buffers Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java?rev=430097&r1=430096&r2=430097&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java Wed Aug 9 10:12:37 2006 @@ -46,6 +46,11 @@ this.sc = channel; this.bufHandler = bufHandler; } + + public void reset() throws IOException { + bufHandler.getReadBuffer().clear(); + bufHandler.getWriteBuffer().clear(); + } /** * returns true if the network buffer has @@ -119,7 +124,6 @@ public Poller getPoller() { return poller; } - /** * getIOChannel * @@ -156,6 +160,14 @@ public void setPoller(Poller poller) { this.poller = poller; + } + + public void setIOChannel(SocketChannel IOChannel) { + this.sc = IOChannel; + } + + public String toString() { + return super.toString()+":"+this.sc.toString(); } } Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=430097&r1=430096&r2=430097&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Wed Aug 9 10:12:37 2006 @@ -45,6 +45,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.LinkedBlockingQueue; +import java.net.Socket; /** * NIO tailored thread pool, providing the following services: @@ -149,11 +150,13 @@ protected ServerSocketChannel serverSock = null; - /** - * APR memory pool for the server socket. - */ - protected long serverSockPool = 0; - + protected ConcurrentLinkedQueue nioChannels = new ConcurrentLinkedQueue() { + public boolean offer(NioChannel o) { + //avoid over growing our cache or add after we have stopped + if ( running && (size() < curThreads) ) return super.offer(o); + else return false; + } + }; @@ -581,6 +584,7 @@ } pollers = null; } + nioChannels.clear(); } @@ -597,6 +601,7 @@ serverSock = null; sslContext = null; initialized = false; + nioChannels.clear(); } @@ -658,33 +663,36 @@ try { //disable blocking, APR style, we are gonna be polling it socket.configureBlocking(false); - + Socket sock = socket.socket(); // 1: Set socket options: timeout, linger, etc if (soLinger >= 0) - socket.socket().setSoLinger(true,soLinger); + sock.setSoLinger(true,soLinger); if (tcpNoDelay) - socket.socket().setTcpNoDelay(true); + sock.setTcpNoDelay(true); if (soTimeout > 0) - socket.socket().setSoTimeout(soTimeout); + sock.setSoTimeout(soTimeout); - NioChannel channel = null; - // 2: SSL setup - step = 2; - if (sslContext != null) { - SSLEngine engine = sslContext.createSSLEngine(); - engine.setNeedClientAuth(getClientAuth()); - engine.setUseClientMode(false); - int appbufsize = engine.getSession().getApplicationBufferSize(); - int bufsize = Math.max(Math.max(getReadBufSize(),getWriteBufSize()),appbufsize); - NioBufferHandler bufhandler = new NioBufferHandler(bufsize,bufsize); - channel = new SecureNioChannel(socket,engine,bufhandler); - + NioChannel channel = nioChannels.poll(); + if ( channel == null ) { + // 2: SSL setup + step = 2; + + if (sslContext != null) { + SSLEngine engine = sslContext.createSSLEngine(); + engine.setNeedClientAuth(getClientAuth()); + engine.setUseClientMode(false); + int appbufsize = engine.getSession().getApplicationBufferSize(); + int bufsize = Math.max(Math.max(getReadBufSize(), getWriteBufSize()), appbufsize); + NioBufferHandler bufhandler = new NioBufferHandler(bufsize, bufsize); + channel = new SecureNioChannel(socket, engine, bufhandler); + } else { + NioBufferHandler bufhandler = new NioBufferHandler(getReadBufSize(), getWriteBufSize()); + channel = new NioChannel(socket, bufhandler); + } } else { - NioBufferHandler bufhandler = new NioBufferHandler(getReadBufSize(),getWriteBufSize()); - channel = new NioChannel(socket,bufhandler); + channel.setIOChannel(socket); + channel.reset(); } - - getPoller0().register(channel); } catch (Throwable t) { @@ -779,6 +787,21 @@ } + protected boolean processSocket(SocketChannel socket) { + try { + if (executor == null) { + getWorkerThread().assign(socket); + } else { + executor.execute(new SocketOptionsProcessor(socket)); + } + } catch (Throwable t) { + // This means we got an OOM or similar creating a thread, or that + // the pool and its queue are full + log.error(sm.getString("endpoint.process.fail"), t); + return false; + } + return true; + } /** * Process given socket. */ @@ -849,13 +872,14 @@ try { // Accept the next incoming connection from the server socket SocketChannel socket = serverSock.accept(); + processSocket(socket); // Hand this socket off to an appropriate processor - if(!setSocketOptions(socket)) - { - // Close socket right away - socket.socket().close(); - socket.close(); - } +// if(!setSocketOptions(socket)) +// { +// // Close socket right away +// socket.socket().close(); +// socket.close(); +// } } catch (Throwable t) { log.error(sm.getString("endpoint.accept.fail"), t); } @@ -1187,7 +1211,7 @@ protected Thread thread = null; protected boolean available = false; - protected NioChannel socket = null; + protected Object socket = null; protected boolean event = false; protected boolean error = false; @@ -1201,7 +1225,7 @@ * * @param socket TCP socket to process */ - protected synchronized void assign(NioChannel socket) { + protected synchronized void assign(Object socket) { // Wait for the Processor to get the previous Socket while (available) { @@ -1210,7 +1234,6 @@ } catch (InterruptedException e) { } } - // Store the newly available Socket and notify our thread this.socket = socket; event = false; @@ -1221,7 +1244,7 @@ } - protected synchronized void assign(NioChannel socket, boolean error) { + protected synchronized void assign(Object socket, boolean error) { // Wait for the Processor to get the previous Socket while (available) { @@ -1244,7 +1267,7 @@ * Await a newly assigned Socket from our Connector, or null * if we are supposed to shut down. */ - protected synchronized NioChannel await() { + protected synchronized Object await() { // Wait for the Connector to provide a new Socket while (!available) { @@ -1255,7 +1278,7 @@ } // Notify the Connector that we have received this Socket - NioChannel socket = this.socket; + Object socket = this.socket; available = false; notifyAll(); @@ -1272,72 +1295,99 @@ // Process requests until we receive a shutdown signal while (running) { - // Wait for the next socket to be assigned - NioChannel socket = await(); - if (socket == null) - continue; - SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); - int handshake = -1; try { - handshake = socket.handshake(key.isReadable(), key.isWritable()); - }catch ( IOException x ) { - handshake = -1; - log.error("Error during SSL handshake",x); - }catch ( CancelledKeyException ckx ) { - handshake = -1; - } - if ( handshake == 0 ) { - // Process the request from this socket - if ((event) && (handler.event(socket, error) == Handler.SocketState.CLOSED)) { - // Close socket and pool - try { - try {socket.close();}catch (Exception ignore){} - if ( socket.isOpen() ) socket.close(true); - }catch ( Exception x ) { - log.error("",x); + // Wait for the next socket to be assigned + Object channel = await(); + if (channel == null) + continue; + + if ( channel instanceof SocketChannel) { + SocketChannel sc = (SocketChannel)channel; + if ( !setSocketOptions(sc) ) { + try { + sc.socket().close(); + sc.close(); + }catch ( IOException ix ) { + if ( log.isDebugEnabled() ) log.debug("",ix); + } + } else { + //now we have it registered, remove it from the cache + } - } else if ((!event) && (handler.process(socket) == Handler.SocketState.CLOSED)) { - // Close socket and pool + } else { + + NioChannel socket = (NioChannel)channel; + + SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); + int handshake = -1; try { - try {socket.close();}catch (Exception ignore){} - if ( socket.isOpen() ) socket.close(true); - }catch ( Exception x ) { - log.error("",x); + handshake = socket.handshake(key.isReadable(), key.isWritable()); + }catch ( IOException x ) { + handshake = -1; + log.error("Error during SSL handshake",x); + }catch ( CancelledKeyException ckx ) { + handshake = -1; } - } - } else if (handshake == -1 ) { - if ( key.isValid() ) key.cancel(); - try {socket.close(true);}catch (IOException ignore){} - } else { - final SelectionKey fk = key; - final int intops = handshake; - final KeyAttachment ka = (KeyAttachment)fk.attachment(); - //register for handshake ops - Runnable r = new Runnable() { - public void run() { - try { - fk.interestOps(intops); - ka.interestOps(intops); - } catch (CancelledKeyException ckx) { + if ( handshake == 0 ) { + // Process the request from this socket + if ((event) && (handler.event(socket, error) == Handler.SocketState.CLOSED)) { + // Close socket and pool try { - if ( fk != null && fk.attachment() != null ) { - - ka.setError(true); //set to collect this socket immediately - try {ka.getChannel().getIOChannel().socket().close();}catch(Exception ignore){} - try {ka.getChannel().close();}catch(Exception ignore){} - ka.setWakeUp(false); - } - } catch (Exception ignore) {} + + try {socket.close();}catch (Exception ignore){} + if ( socket.isOpen() ) socket.close(true); + nioChannels.offer(socket); + }catch ( Exception x ) { + log.error("",x); + } + } else if ((!event) && (handler.process(socket) == Handler.SocketState.CLOSED)) { + // Close socket and pool + try { + + try {socket.close();}catch (Exception ignore){} + if ( socket.isOpen() ) socket.close(true); + nioChannels.offer(socket); + }catch ( Exception x ) { + log.error("",x); + } } + } else if (handshake == -1 ) { + if ( key.isValid() ) key.cancel(); + try {socket.close(true);}catch (IOException ignore){} + nioChannels.offer(socket); + } else { + final SelectionKey fk = key; + final int intops = handshake; + final KeyAttachment ka = (KeyAttachment)fk.attachment(); + //register for handshake ops + Runnable r = new Runnable() { + public void run() { + try { + fk.interestOps(intops); + ka.interestOps(intops); + } catch (CancelledKeyException ckx) { + try { + if ( fk != null && fk.attachment() != null ) { + + ka.setError(true); //set to collect this socket immediately + try {ka.getChannel().getIOChannel().socket().close();}catch(Exception ignore){} + try {ka.getChannel().close();}catch(Exception ignore){} + ka.setWakeUp(false); + } + } catch (Exception ignore) {} + } + } + }; + ka.getPoller().addEvent(r); } - }; - ka.getPoller().addEvent(r); + } + } finally { + //dereference socket to let GC do its job + socket = null; + // Finish up this request + recycleWorkerThread(this); } - //dereference socket to let GC do its job - socket = null; - // Finish up this request - recycleWorkerThread(this); } } @@ -1446,6 +1496,32 @@ } + // ---------------------------------------------- SocketOptionsProcessor Inner Class + + + /** + * This class is the equivalent of the Worker, but will simply use in an + * external Executor thread pool. + */ + protected class SocketOptionsProcessor implements Runnable { + + protected SocketChannel sc = null; + + public SocketOptionsProcessor(SocketChannel socket) { + this.sc = socket; + } + + public void run() { + if ( !setSocketOptions(sc) ) { + try { + sc.socket().close(); + sc.close(); + }catch ( IOException ix ) { + if ( log.isDebugEnabled() ) log.debug("",ix); + } + } + } + } // ---------------------------------------------- SocketProcessor Inner Class Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java?rev=430097&r1=430096&r2=430097&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java Wed Aug 9 10:12:37 2006 @@ -31,28 +31,31 @@ public SecureNioChannel(SocketChannel channel, SSLEngine engine, ApplicationBufferHandler bufHandler) throws IOException { super(channel,bufHandler); - this.sslEngine = engine; - - int appBufSize = sslEngine.getSession().getApplicationBufferSize(); int netBufSize = sslEngine.getSession().getPacketBufferSize(); - + //allocate network buffers - TODO, add in optional direct non-direct buffers + if ( netInBuffer == null ) netInBuffer = ByteBuffer.allocateDirect(netBufSize); + if ( netOutBuffer == null ) netOutBuffer = ByteBuffer.allocateDirect(netBufSize); + //ensure that the application has a large enough read/write buffers //by doing this, we should not encounter any buffer overflow errors bufHandler.expand(bufHandler.getReadBuffer(), appBufSize); bufHandler.expand(bufHandler.getWriteBuffer(), appBufSize); - //allocate network buffers - TODO, add in optional direct buffers - this.netInBuffer = ByteBuffer.allocate(netBufSize); - this.netOutBuffer = ByteBuffer.allocate(netBufSize); - this.netOutBuffer.position(0); - this.netOutBuffer.limit(0); - this.netInBuffer.position(0); - this.netInBuffer.limit(0); + reset(); + } + + public void reset() throws IOException { + super.reset(); + netOutBuffer.position(0); + netOutBuffer.limit(0); + netInBuffer.position(0); + netInBuffer.limit(0); //initiate handshake sslEngine.beginHandshake(); initHandshakeStatus = sslEngine.getHandshakeStatus(); + } //=========================================================================================== --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org For additional commands, e-mail: dev-help@tomcat.apache.org