Return-Path: Delivered-To: apmail-tomcat-dev-archive@www.apache.org Received: (qmail 84928 invoked from network); 22 Jun 2006 19:09:42 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 22 Jun 2006 19:09:42 -0000 Received: (qmail 64279 invoked by uid 500); 22 Jun 2006 19:07:33 -0000 Delivered-To: apmail-tomcat-dev-archive@tomcat.apache.org Received: (qmail 63010 invoked by uid 500); 22 Jun 2006 19:07:25 -0000 Mailing-List: contact dev-help@tomcat.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: "Tomcat Developers List" Delivered-To: mailing list dev@tomcat.apache.org Received: (qmail 60668 invoked by uid 500); 22 Jun 2006 19:07:12 -0000 Delivered-To: apmail-jakarta-tomcat-dev@jakarta.apache.org Received: (qmail 56783 invoked by uid 99); 22 Jun 2006 19:05:27 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 22 Jun 2006 12:05:27 -0700 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 22 Jun 2006 12:05:24 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 897421A9842; Thu, 22 Jun 2006 12:05:04 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r416444 [2/2] - in /tomcat/tc6.0.x/trunk/java/org/apache: coyote/http11/Http11NioProcessor.java coyote/http11/InternalNioInputBuffer.java tomcat/util/net/NioEndpoint.java Date: Thu, 22 Jun 2006 19:05:03 -0000 To: tomcat-dev@jakarta.apache.org From: fhanik@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20060622190504.897421A9842@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 
0/1000/N Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=416444&r1=416443&r2=416444&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu Jun 22 12:05:02 2006 @@ -1,21 +1,21 @@ -/* - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tomcat.util.net; - +/* + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tomcat.util.net; + import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -38,1808 +38,1821 @@ import org.apache.tomcat.jni.Poll; import org.apache.tomcat.jni.SSL; import org.apache.tomcat.jni.Status; -import org.apache.tomcat.util.res.StringManager; - -/** - * NIO tailored thread pool, providing the following services: - *
    - *
  • Socket acceptor thread
  • - *
  • Socket poller thread
  • - *
  • Sendfile thread
  • - *
  • Worker threads pool
  • - *
- * - * When switching to Java 5, there's an opportunity to use the virtual - * machine's thread pool. - * - * @author Mladen Turk - * @author Remy Maucherat - * @author Filip Hanik - */ -public class NioEndpoint { - - - // -------------------------------------------------------------- Constants - - - protected static Log log = LogFactory.getLog(NioEndpoint.class); - - protected static StringManager sm = - StringManager.getManager("org.apache.tomcat.util.net.res"); - - - /** - * The Request attribute key for the cipher suite. - */ - public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite"; - - /** - * The Request attribute key for the key size. - */ - public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size"; - - /** - * The Request attribute key for the client certificate chain. - */ - public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate"; - - /** - * The Request attribute key for the session id. - * This one is a Tomcat extension to the Servlet spec. - */ - public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session"; - - - // ----------------------------------------------------------------- Fields - - - /** - * Available workers. - */ - protected WorkerStack workers = null; - - - /** - * Running state of the endpoint. - */ - protected volatile boolean running = false; - - - /** - * Will be set to true whenever the endpoint is paused. - */ - protected volatile boolean paused = false; - - - /** - * Track the initialization state of the endpoint. - */ - protected boolean initialized = false; - - - /** - * Current worker threads busy count. - */ - protected int curThreadsBusy = 0; - - - /** - * Current worker threads count. - */ - protected int curThreads = 0; - - - /** - * Sequence number used to generate thread names. - */ - protected int sequence = 0; - - - /** - * Root APR memory pool. - */ - protected long rootPool = 0; - - - /** - * Server socket "pointer". 
- */ - protected ServerSocketChannel serverSock = null; - - - /** - * APR memory pool for the server socket. - */ - protected long serverSockPool = 0; - - - /** - * SSL context. - */ - protected long sslContext = 0; - - - // ------------------------------------------------------------- Properties - - - /** - * External Executor based thread pool. - */ - protected Executor executor = null; - public void setExecutor(Executor executor) { this.executor = executor; } - public Executor getExecutor() { return executor; } - - - /** - * Maximum amount of worker threads. - */ - protected int maxThreads = 40; - public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; } - public int getMaxThreads() { return maxThreads; } - - - /** - * Priority of the acceptor and poller threads. - */ - protected int threadPriority = Thread.NORM_PRIORITY; - public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; } - public int getThreadPriority() { return threadPriority; } - - - /** - * Size of the socket poller. - */ - protected int pollerSize = 8 * 1024; - public void setPollerSize(int pollerSize) { this.pollerSize = pollerSize; } - public int getPollerSize() { return pollerSize; } - - - /** - * Size of the sendfile (= concurrent files which can be served). - */ - protected int sendfileSize = 1 * 1024; - public void setSendfileSize(int sendfileSize) { this.sendfileSize = sendfileSize; } - public int getSendfileSize() { return sendfileSize; } - - - /** - * Server socket port. - */ - protected int port; - public int getPort() { return port; } - public void setPort(int port ) { this.port=port; } - - - /** - * Address for the server socket. - */ - protected InetAddress address; - public InetAddress getAddress() { return address; } - public void setAddress(InetAddress address) { this.address = address; } - - - /** - * Handling of accepted sockets. 
- */ - protected Handler handler = null; - public void setHandler(Handler handler ) { this.handler = handler; } - public Handler getHandler() { return handler; } - - - /** - * Allows the server developer to specify the backlog that - * should be used for server sockets. By default, this value - * is 100. - */ - protected int backlog = 100; - public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; } - public int getBacklog() { return backlog; } - - - /** - * Socket TCP no delay. - */ - protected boolean tcpNoDelay = false; - public boolean getTcpNoDelay() { return tcpNoDelay; } - public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; } - - - /** - * Socket linger. - */ - protected int soLinger = 100; - public int getSoLinger() { return soLinger; } - public void setSoLinger(int soLinger) { this.soLinger = soLinger; } - - - /** - * Socket timeout. - */ - protected int soTimeout = -1; - public int getSoTimeout() { return soTimeout; } - public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; } - - - /** - * Timeout on first request read before going to the poller, in ms. - */ - protected int firstReadTimeout = 60000; - public int getFirstReadTimeout() { return firstReadTimeout; } - public void setFirstReadTimeout(int firstReadTimeout) { this.firstReadTimeout = firstReadTimeout; } - - - /** - * Poll interval, in microseconds. The smaller the value, the more CPU the poller - * will use, but the more responsive to activity it will be. - */ - protected int pollTime = 2000; - public int getPollTime() { return pollTime; } - public void setPollTime(int pollTime) { if (pollTime > 0) { this.pollTime = pollTime; } } - - - /** - * The default is true - the created threads will be - * in daemon mode. If set to false, the control thread - * will not be daemon - and will keep the process alive. 
- */ - protected boolean daemon = true; - public void setDaemon(boolean b) { daemon = b; } - public boolean getDaemon() { return daemon; } - - - /** - * Name of the thread pool, which will be used for naming child threads. - */ - protected String name = "TP"; - public void setName(String name) { this.name = name; } - public String getName() { return name; } - - - /** - * Use endfile for sending static files. - */ - protected boolean useSendfile = Library.APR_HAS_SENDFILE; - public void setUseSendfile(boolean useSendfile) { this.useSendfile = useSendfile; } - public boolean getUseSendfile() { return useSendfile; } - - - /** - * Allow comet request handling. - */ - protected boolean useComet = true; - public void setUseComet(boolean useComet) { this.useComet = useComet; } - public boolean getUseComet() { return useComet; } - - - /** - * Acceptor thread count. - */ - protected int acceptorThreadCount = 0; - public void setAcceptorThreadCount(int acceptorThreadCount) { this.acceptorThreadCount = acceptorThreadCount; } - public int getAcceptorThreadCount() { return acceptorThreadCount; } - - - /** - * Sendfile thread count. - */ - protected int sendfileThreadCount = 0; - public void setSendfileThreadCount(int sendfileThreadCount) { this.sendfileThreadCount = sendfileThreadCount; } - public int getSendfileThreadCount() { return sendfileThreadCount; } - - - /** - * Poller thread count. - */ - protected int pollerThreadCount = 0; - public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; } - public int getPollerThreadCount() { return pollerThreadCount; } - - protected long selectorTimeout = 5000; - public void setSelectorTimeout(long timeout){ this.selectorTimeout = timeout;} - public long getSelectorTimeout(){ return this.selectorTimeout; } - /** - * The socket poller. 
- */ - protected Poller[] pollers = null; - protected int pollerRoundRobin = 0; - public Poller getPoller() { - pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length; - Poller poller = pollers[pollerRoundRobin]; - poller.comet = false; - return poller; - } - - - /** - * The socket poller used for Comet support. - */ - public Poller getCometPoller() { - Poller poller = getPoller(); - poller.comet = true; - return poller; - } - - - /** - * The static file sender. - */ - protected Sendfile[] sendfiles = null; - protected int sendfileRoundRobin = 0; - public Sendfile getSendfile() { - sendfileRoundRobin = (sendfileRoundRobin + 1) % sendfiles.length; - return sendfiles[sendfileRoundRobin]; - } - - - /** - * Dummy maxSpareThreads property. - */ - public int getMaxSpareThreads() { return 0; } - - - /** - * Dummy minSpareThreads property. - */ - public int getMinSpareThreads() { return 0; } - - - /** - * SSL engine. - */ - protected String SSLEngine = "off"; - public String getSSLEngine() { return SSLEngine; } - public void setSSLEngine(String SSLEngine) { this.SSLEngine = SSLEngine; } - - - /** - * SSL protocols. - */ - protected String SSLProtocol = "all"; - public String getSSLProtocol() { return SSLProtocol; } - public void setSSLProtocol(String SSLProtocol) { this.SSLProtocol = SSLProtocol; } - - - /** - * SSL password (if a cert is encrypted, and no password has been provided, a callback - * will ask for a password). - */ - protected String SSLPassword = null; - public String getSSLPassword() { return SSLPassword; } - public void setSSLPassword(String SSLPassword) { this.SSLPassword = SSLPassword; } - - - /** - * SSL cipher suite. - */ - protected String SSLCipherSuite = "ALL"; - public String getSSLCipherSuite() { return SSLCipherSuite; } - public void setSSLCipherSuite(String SSLCipherSuite) { this.SSLCipherSuite = SSLCipherSuite; } - - - /** - * SSL certificate file. 
- */ - protected String SSLCertificateFile = null; - public String getSSLCertificateFile() { return SSLCertificateFile; } - public void setSSLCertificateFile(String SSLCertificateFile) { this.SSLCertificateFile = SSLCertificateFile; } - - - /** - * SSL certificate key file. - */ - protected String SSLCertificateKeyFile = null; - public String getSSLCertificateKeyFile() { return SSLCertificateKeyFile; } - public void setSSLCertificateKeyFile(String SSLCertificateKeyFile) { this.SSLCertificateKeyFile = SSLCertificateKeyFile; } - - - /** - * SSL certificate chain file. - */ - protected String SSLCertificateChainFile = null; - public String getSSLCertificateChainFile() { return SSLCertificateChainFile; } - public void setSSLCertificateChainFile(String SSLCertificateChainFile) { this.SSLCertificateChainFile = SSLCertificateChainFile; } - - - /** - * SSL CA certificate path. - */ - protected String SSLCACertificatePath = null; - public String getSSLCACertificatePath() { return SSLCACertificatePath; } - public void setSSLCACertificatePath(String SSLCACertificatePath) { this.SSLCACertificatePath = SSLCACertificatePath; } - - - /** - * SSL CA certificate file. - */ - protected String SSLCACertificateFile = null; - public String getSSLCACertificateFile() { return SSLCACertificateFile; } - public void setSSLCACertificateFile(String SSLCACertificateFile) { this.SSLCACertificateFile = SSLCACertificateFile; } - - - /** - * SSL CA revocation path. - */ - protected String SSLCARevocationPath = null; - public String getSSLCARevocationPath() { return SSLCARevocationPath; } - public void setSSLCARevocationPath(String SSLCARevocationPath) { this.SSLCARevocationPath = SSLCARevocationPath; } - - - /** - * SSL CA revocation file. 
- */ - protected String SSLCARevocationFile = null; - public String getSSLCARevocationFile() { return SSLCARevocationFile; } - public void setSSLCARevocationFile(String SSLCARevocationFile) { this.SSLCARevocationFile = SSLCARevocationFile; } - - - /** - * SSL verify client. - */ - protected String SSLVerifyClient = "none"; - public String getSSLVerifyClient() { return SSLVerifyClient; } - public void setSSLVerifyClient(String SSLVerifyClient) { this.SSLVerifyClient = SSLVerifyClient; } - - - /** - * SSL verify depth. - */ - protected int SSLVerifyDepth = 10; - public int getSSLVerifyDepth() { return SSLVerifyDepth; } - public void setSSLVerifyDepth(int SSLVerifyDepth) { this.SSLVerifyDepth = SSLVerifyDepth; } - - - // --------------------------------------------------------- Public Methods - - - /** - * Number of keepalive sockets. - */ - public int getKeepAliveCount() { - if (pollers == null) { - return 0; - } else { - int keepAliveCount = 0; - for (int i = 0; i < pollers.length; i++) { - keepAliveCount += pollers[i].getKeepAliveCount(); - } - return keepAliveCount; - } - } - - - /** - * Number of sendfile sockets. - */ - public int getSendfileCount() { - if (sendfiles == null) { - return 0; - } else { - int sendfileCount = 0; - for (int i = 0; i < sendfiles.length; i++) { - sendfileCount += sendfiles[i].getSendfileCount(); - } - return sendfileCount; - } - } - - - /** - * Return the amount of threads that are managed by the pool. - * - * @return the amount of threads that are managed by the pool - */ - public int getCurrentThreadCount() { - return curThreads; - } - - - /** - * Return the amount of threads currently busy. - * - * @return the amount of threads currently busy - */ - public int getCurrentThreadsBusy() { - return curThreadsBusy; - } - - - /** - * Return the state of the endpoint. - * - * @return true if the endpoint is running, false otherwise - */ - public boolean isRunning() { - return running; - } - - - /** - * Return the state of the endpoint. 
- * - * @return true if the endpoint is paused, false otherwise - */ - public boolean isPaused() { - return paused; - } - - - // ----------------------------------------------- Public Lifecycle Methods - - - /** - * Initialize the endpoint. - */ - public void init() - throws Exception { - - if (initialized) - return; - - serverSock = ServerSocketChannel.open(); - InetSocketAddress addr = (address!=null?new InetSocketAddress(address,port):new InetSocketAddress(port)); - serverSock.socket().bind(addr,100); //todo, set backlog value - serverSock.configureBlocking(true); //mimic APR behavior - // Sendfile usage on systems which don't support it cause major problems - if (useSendfile) { - log.warn(sm.getString("endpoint.sendfile.nosupport")); - useSendfile = false; - } - - // Initialize thread count defaults for acceptor, poller and sendfile - if (acceptorThreadCount == 0) { - // FIXME: Doesn't seem to work that well with multiple accept threads - acceptorThreadCount = 1; - } - if (pollerThreadCount != 1) { - // limit to one poller, no need for others - pollerThreadCount = 1; - } - if (sendfileThreadCount != 0) { - sendfileThreadCount = 0; - } - - // Initialize SSL if needed - if (!"off".equalsIgnoreCase(SSLEngine)) { - // Initialize SSL - // FIXME: one per VM call ? 
- if ("on".equalsIgnoreCase(SSLEngine)) { - SSL.initialize(null); - } else { - SSL.initialize(SSLEngine); - } - // SSL protocol - int value = SSL.SSL_PROTOCOL_ALL; - if ("SSLv2".equalsIgnoreCase(SSLProtocol)) { - value = SSL.SSL_PROTOCOL_SSLV2; - } else if ("SSLv3".equalsIgnoreCase(SSLProtocol)) { - value = SSL.SSL_PROTOCOL_SSLV3; - } else if ("TLSv1".equalsIgnoreCase(SSLProtocol)) { - value = SSL.SSL_PROTOCOL_TLSV1; - } else if ("SSLv2+SSLv3".equalsIgnoreCase(SSLProtocol)) { - value = SSL.SSL_PROTOCOL_SSLV2 | SSL.SSL_PROTOCOL_SSLV3; - } -// // Create SSL Context -// sslContext = SSLContext.make(rootPool, value, SSL.SSL_MODE_SERVER); -// // List the ciphers that the client is permitted to negotiate -// SSLContext.setCipherSuite(sslContext, SSLCipherSuite); -// // Load Server key and certificate -// SSLContext.setCertificate(sslContext, SSLCertificateFile, SSLCertificateKeyFile, SSLPassword, SSL.SSL_AIDX_RSA); -// // Set certificate chain file -// SSLContext.setCertificateChainFile(sslContext, SSLCertificateChainFile, false); -// // Support Client Certificates -// SSLContext.setCACertificate(sslContext, SSLCACertificateFile, SSLCACertificatePath); -// // Set revocation -// SSLContext.setCARevocation(sslContext, SSLCARevocationFile, SSLCARevocationPath); -// // Client certificate verification -// value = SSL.SSL_CVERIFY_NONE; -// if ("optional".equalsIgnoreCase(SSLVerifyClient)) { -// value = SSL.SSL_CVERIFY_OPTIONAL; -// } else if ("require".equalsIgnoreCase(SSLVerifyClient)) { -// value = SSL.SSL_CVERIFY_REQUIRE; -// } else if ("optionalNoCA".equalsIgnoreCase(SSLVerifyClient)) { -// value = SSL.SSL_CVERIFY_OPTIONAL_NO_CA; -// } -// SSLContext.setVerify(sslContext, value, SSLVerifyDepth); - // For now, sendfile is not supported with SSL - useSendfile = false; - } - - initialized = true; - - } - - - /** - * Start the APR endpoint, creating acceptor, poller and sendfile threads. 
- */ - public void start() - throws Exception { - // Initialize socket if not done before - if (!initialized) { - init(); - } - if (!running) { - running = true; - paused = false; - - // Create worker collection - if (executor == null) { - workers = new WorkerStack(maxThreads); - } - - // Start acceptor threads - for (int i = 0; i < acceptorThreadCount; i++) { - Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i); - acceptorThread.setPriority(threadPriority); - acceptorThread.setDaemon(daemon); - acceptorThread.start(); - } - - // Start poller threads - pollers = new Poller[pollerThreadCount]; - for (int i = 0; i < pollerThreadCount; i++) { - pollers[i] = new Poller(false); - pollers[i].init(); - Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i); - pollerThread.setPriority(threadPriority); - pollerThread.setDaemon(true); - pollerThread.start(); - } - - // Start sendfile threads - if (useSendfile) { - sendfiles = new Sendfile[sendfileThreadCount]; - for (int i = 0; i < sendfileThreadCount; i++) { - sendfiles[i] = new Sendfile(); - sendfiles[i].init(); - Thread sendfileThread = new Thread(sendfiles[i], getName() + "-Sendfile-" + i); - sendfileThread.setPriority(threadPriority); - sendfileThread.setDaemon(true); - sendfileThread.start(); - } - } - } - } - - - /** - * Pause the endpoint, which will make it stop accepting new sockets. - */ - public void pause() { - if (running && !paused) { - paused = true; - unlockAccept(); - } - } - - - /** - * Resume the endpoint, which will make it start accepting new sockets - * again. - */ - public void resume() { - if (running) { - paused = false; - } - } - - - /** - * Stop the endpoint. This will cause all processing threads to stop. 
- */ - public void stop() { - if (running) { - running = false; - unlockAccept(); - for (int i = 0; i < pollers.length; i++) { - pollers[i].destroy(); - } - pollers = null; - if (useSendfile) { - for (int i = 0; i < sendfiles.length; i++) { - sendfiles[i].destroy(); - } - sendfiles = null; - } - } - } - - - /** - * Deallocate APR memory pools, and close server socket. - */ - public void destroy() throws Exception { - if (running) { - stop(); - } - // Close server socket - serverSock.socket().close(); - serverSock.close(); - serverSock = null; - sslContext = 0; - initialized = false; - } - - - // ------------------------------------------------------ Protected Methods - - - /** - * Get a sequence number used for thread naming. - */ - protected int getSequence() { - return sequence++; - } - - - /** - * Unlock the server socket accept using a bugus connection. - */ - protected void unlockAccept() { - java.net.Socket s = null; - try { - // Need to create a connection to unlock the accept(); - if (address == null) { - s = new java.net.Socket("127.0.0.1", port); - } else { - s = new java.net.Socket(address, port); - // setting soLinger to a small value will help shutdown the - // connection quicker - s.setSoLinger(true, 0); - } - } catch(Exception e) { - if (log.isDebugEnabled()) { - log.debug(sm.getString("endpoint.debug.unlock", "" + port), e); - } - } finally { - if (s != null) { - try { - s.close(); - } catch (Exception e) { - // Ignore - } - } - } - } - - - /** - * Process the specified connection. 
- */ - protected boolean setSocketOptions(SocketChannel socket) { - // Process the connection - int step = 1; - try { - //disable blocking, APR style, we are gonna be polling it - socket.configureBlocking(false); - - // 1: Set socket options: timeout, linger, etc - if (soLinger >= 0) - socket.socket().setSoLinger(true,soLinger); - if (tcpNoDelay) - socket.socket().setTcpNoDelay(true); - if (soTimeout > 0) - socket.socket().setSoTimeout(soTimeout); - - - // 2: SSL handshake - step = 2; - if (sslContext != 0) { -// SSLSocket.attach(sslContext, socket); -// if (SSLSocket.handshake(socket) != 0) { -// if (log.isDebugEnabled()) { -// log.debug(sm.getString("endpoint.err.handshake") + ": " + SSL.getLastError()); -// } -// return false; -// } - } - - getPoller().register(socket); - - } catch (Throwable t) { - if (log.isDebugEnabled()) { - if (step == 2) { - log.debug(sm.getString("endpoint.err.handshake"), t); - } else { - log.debug(sm.getString("endpoint.err.unexpected"), t); - } - } - // Tell to close the socket - return false; - } - return true; - } - - - /** - * Create (or allocate) and return an available processor for use in - * processing a specific HTTP request, if possible. If the maximum - * allowed processors have already been created and are in use, return - * null instead. - */ - protected Worker createWorkerThread() { - - synchronized (workers) { - if (workers.size() > 0) { - curThreadsBusy++; - return (workers.pop()); - } - if ((maxThreads > 0) && (curThreads < maxThreads)) { - curThreadsBusy++; - return (newWorkerThread()); - } else { - if (maxThreads < 0) { - curThreadsBusy++; - return (newWorkerThread()); - } else { - return (null); - } - } - } - - } - - - /** - * Create and return a new processor suitable for processing HTTP - * requests and returning the corresponding responses. 
- */ - protected Worker newWorkerThread() { - - Worker workerThread = new Worker(); - workerThread.start(); - return (workerThread); - - } - - - /** - * Return a new worker thread, and block while to worker is available. - */ - protected Worker getWorkerThread() { - // Allocate a new worker thread - Worker workerThread = createWorkerThread(); - while (workerThread == null) { - try { - synchronized (workers) { - workers.wait(); - } - } catch (InterruptedException e) { - // Ignore - } - workerThread = createWorkerThread(); - } - return workerThread; - } - - - /** - * Recycle the specified Processor so that it can be used again. - * - * @param workerThread The processor to be recycled - */ - protected void recycleWorkerThread(Worker workerThread) { - synchronized (workers) { - workers.push(workerThread); - curThreadsBusy--; - workers.notify(); - } - } - - - /** - * Allocate a new poller of the specified size. - */ - protected long allocatePoller(int size, long pool, int timeout) { - try { - return Poll.create(size, pool, 0, timeout * 1000); - } catch (Error e) { - if (Status.APR_STATUS_IS_EINVAL(e.getError())) { - log.info(sm.getString("endpoint.poll.limitedpollsize", "" + size)); - return 0; - } else { - log.error(sm.getString("endpoint.poll.initfail"), e); - return -1; - } - } - } - - - /** - * Process given socket. - */ - protected boolean processSocket(SocketChannel socket) { - try { - if (executor == null) { - getWorkerThread().assign(socket); - } else { - executor.execute(new SocketProcessor(socket)); - } - } catch (Throwable t) { - // This means we got an OOM or similar creating a thread, or that - // the pool and its queue are full - log.error(sm.getString("endpoint.process.fail"), t); - return false; - } - return true; - } - - - /** - * Process given socket for an event. 
- */ - protected boolean processSocket(SocketChannel socket, boolean error) { - try { - if (executor == null) { - getWorkerThread().assign(socket, error); - } else { - executor.execute(new SocketEventProcessor(socket, error)); - } - } catch (Throwable t) { - // This means we got an OOM or similar creating a thread, or that - // the pool and its queue are full - log.error(sm.getString("endpoint.process.fail"), t); - return false; - } - return true; - } - - - // --------------------------------------------------- Acceptor Inner Class - - - /** - * Server socket acceptor thread. - */ - protected class Acceptor implements Runnable { - - - /** - * The background thread that listens for incoming TCP/IP connections and - * hands them off to an appropriate processor. - */ - public void run() { - - // Loop until we receive a shutdown command - while (running) { - - // Loop if endpoint is paused - while (paused) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore - } - } - - try { - // Accept the next incoming connection from the server socket - SocketChannel socket = serverSock.accept(); - // Hand this socket off to an appropriate processor - if(!setSocketOptions(socket)) - { - // Close socket right away - socket.socket().close(); - socket.close(); - } - } catch (Throwable t) { - log.error(sm.getString("endpoint.accept.fail"), t); - } - - // The processor will recycle itself when it finishes - - } - - } - - } - - - // ----------------------------------------------------- Poller Inner Class - - - /** - * Poller class. 
- */ - public class Poller implements Runnable { - - protected Selector selector; - protected LinkedList events = new LinkedList(); - protected boolean close = false; - protected boolean comet = true; - - protected int keepAliveCount = 0; - public int getKeepAliveCount() { return keepAliveCount; } - - - - public Poller(boolean comet) throws IOException { - this.comet = comet; - this.selector = Selector.open(); - } - - public Selector getSelector() { return selector;} - - /** - * Create the poller. With some versions of APR, the maximum poller size will - * be 62 (reocmpiling APR is necessary to remove this limitation). - */ - protected void init() { - keepAliveCount = 0; - } - - /** - * Destroy the poller. - */ - protected void destroy() { - // Wait for polltime before doing anything, so that the poller threads - // exit, otherwise parallel descturction of sockets which are still - // in the poller can cause problems - try { - synchronized (this) { - this.wait(pollTime / 1000); - } - } catch (InterruptedException e) { - // Ignore - } - close = true; - } - - /** - * Add specified socket and associated pool to the poller. The socket will - * be added to a temporary array, and polled first after a maximum amount - * of time equal to pollTime (in most cases, latency will be much lower, - * however). 
- * - * @param socket to add to the poller - */ - public void add(final SocketChannel socket) { - final SelectionKey key = socket.keyFor(selector); - Runnable r = new Runnable() { - public void run() { - if ( key != null ) key.interestOps(SelectionKey.OP_READ); - } - }; - synchronized (events) { - events.add(r); - } - selector.wakeup(); - } - - public void events() { - synchronized (events) { - Runnable r = null; - while ( (events.size() > 0) && (r = events.removeFirst()) != null ) { - try { - r.run(); - } catch ( Exception x ) { - log.error("",x); - } - } - events.clear(); - } - } - - public void register(final SocketChannel socket) - { - SelectionKey key = socket.keyFor(selector); - Runnable r = new Runnable() { - public void run() { - try { - socket.register(selector, SelectionKey.OP_READ, new KeyAttachment()); - } catch (Exception x) { - log.error("", x); - } - } - - }; - synchronized (events) { - events.add(r); - } - selector.wakeup(); - } - - public void cancelledKey(SelectionKey key) { - try { - KeyAttachment ka = (KeyAttachment) key.attachment(); - key.cancel(); - if (ka.getComet()) processSocket( (SocketChannel) key.channel(), true); - key.channel().close(); - } catch (IOException e) { - if ( log.isDebugEnabled() ) log.debug("",e); - // Ignore - } - } - /** - * The background thread that listens for incoming TCP/IP connections and - * hands them off to an appropriate processor. - */ - public void run() { - - // Loop until we receive a shutdown command - while (running) { - // Loop if endpoint is paused - while (paused) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore - } - } - - events(); - // Time to terminate? 
- if (close) return; - - int keyCount = 0; - try { - keyCount = selector.select(selectorTimeout); - } catch (IOException x) { - log.error("",x); - continue; - } - //timeout - Set keys = selector.keys(); - long now = System.currentTimeMillis(); - for (Iterator iter = keys.iterator(); iter.hasNext(); ) { - SelectionKey key = (SelectionKey) iter.next(); - try { - if (key.interestOps() == SelectionKey.OP_READ) { - //only timeout sockets that we are waiting for a read from - KeyAttachment ka = (KeyAttachment) key.attachment(); - long delta = now - ka.getLastAccess(); - if (delta > (long) soTimeout) { - cancelledKey(key); - } - } - }catch ( CancelledKeyException ckx ) { - cancelledKey(key); - } - } - - - if (keyCount == 0) continue; - - Iterator iterator = selector.selectedKeys().iterator(); - // Walk through the collection of ready keys and dispatch - // any active event. - while (iterator.hasNext()) { - SelectionKey sk = (SelectionKey) iterator.next(); - iterator.remove(); - KeyAttachment attachment = (KeyAttachment)sk.attachment(); - try { - if(attachment == null) attachment = new KeyAttachment(); - attachment.access(); - sk.attach(attachment); - - int readyOps = sk.readyOps(); - sk.interestOps(sk.interestOps() & ~readyOps); - SocketChannel channel = (SocketChannel)sk.channel(); - boolean read = sk.isReadable(); - if (read) { - if ( comet ) { - if (!processSocket(channel,false)) processSocket(channel,true); - } else { - boolean close = (!processSocket(channel)); - if ( close ) { - channel.socket().close(); - channel.close(); - } - } - } - if (sk.isValid() && sk.isWritable()) { - } - } catch ( CancelledKeyException ckx ) { - if (attachment!=null && attachment.getComet()) processSocket( (SocketChannel) sk.channel(), true); - try { - sk.channel().close(); - }catch ( Exception ignore){} - } catch (Throwable t) { - log.error("",t); - } - }//while - - - } - synchronized (this) { - this.notifyAll(); - } - - } - - } - - public static class KeyAttachment { - - public long 
getLastAccess() { return lastAccess; } - public void access() { access(System.currentTimeMillis()); } - public void access(long access) { lastAccess = access; } - public void setComet(boolean comet) { this.comet = comet; } - public boolean getComet() { return comet; } - public boolean getCurrentAccess() { return currentAccess; } - public void setCurrentAccess(boolean access) { currentAccess = access; } - - protected long lastAccess = System.currentTimeMillis(); - protected boolean currentAccess = false; - protected boolean comet = false; - - } - - - - // ----------------------------------------------------- Worker Inner Class - - - /** - * Server processor class. - */ - protected class Worker implements Runnable { - - - protected Thread thread = null; - protected boolean available = false; - protected SocketChannel socket = null; - protected boolean event = false; - protected boolean error = false; - - - /** - * Process an incoming TCP/IP connection on the specified socket. Any - * exception that occurs during processing must be logged and swallowed. - * NOTE: This method is called from our Connector's thread. We - * must assign it to our own thread so that multiple simultaneous - * requests can be handled. 
- * - * @param socket TCP socket to process - */ - protected synchronized void assign(SocketChannel socket) { - - // Wait for the Processor to get the previous Socket - while (available) { - try { - wait(); - } catch (InterruptedException e) { - } - } - - // Store the newly available Socket and notify our thread - this.socket = socket; - event = false; - error = false; - available = true; - notifyAll(); - - } - - - protected synchronized void assign(SocketChannel socket, boolean error) { - - // Wait for the Processor to get the previous Socket - while (available) { - try { - wait(); - } catch (InterruptedException e) { - } - } - - // Store the newly available Socket and notify our thread - this.socket = socket; - event = true; - this.error = error; - available = true; - notifyAll(); - } - - - /** - * Await a newly assigned Socket from our Connector, or null - * if we are supposed to shut down. - */ - protected synchronized SocketChannel await() { - - // Wait for the Connector to provide a new Socket - while (!available) { - try { - wait(); - } catch (InterruptedException e) { - } - } - - // Notify the Connector that we have received this Socket - SocketChannel socket = this.socket; - available = false; - notifyAll(); - - return (socket); - - } - - - /** - * The background thread that listens for incoming TCP/IP connections and - * hands them off to an appropriate processor. 
- */ - public void run() { - - // Process requests until we receive a shutdown signal - while (running) { - - // Wait for the next socket to be assigned - SocketChannel socket = await(); - if (socket == null) - continue; - - // Process the request from this socket - if ((event) && (handler.event(socket, error) == Handler.SocketState.CLOSED)) { - // Close socket and pool - try { - socket.socket().close(); - socket.close(); - }catch ( Exception x ) { - log.error("",x); - } - } else if ((!event) && (handler.process(socket) == Handler.SocketState.CLOSED)) { - // Close socket and pool - try { - socket.socket().close(); - socket.close(); - }catch ( Exception x ) { - log.error("",x); - } - } - - // Finish up this request - recycleWorkerThread(this); - - } - - } - - - /** - * Start the background processing thread. - */ - public void start() { - thread = new Thread(this); - thread.setName(getName() + "-" + (++curThreads)); - thread.setDaemon(true); - thread.start(); - } - - - } - - - // ----------------------------------------------- SendfileData Inner Class - - - /** - * SendfileData class. - */ - public static class SendfileData { - // File - public String fileName; - public long fd; - public long fdpool; - // Range information - public long start; - public long end; - // Socket and socket pool - public SocketChannel socket; - // Position - public long pos; - // KeepAlive flag - public boolean keepAlive; - } - - - // --------------------------------------------------- Sendfile Inner Class - - - /** - * Sendfile class. - */ - public class Sendfile implements Runnable { - - protected long sendfilePollset = 0; - protected long pool = 0; - protected long[] desc; - protected HashMap sendfileData; - - protected int sendfileCount; - public int getSendfileCount() { return sendfileCount; } - - protected ArrayList addS; - - /** - * Create the sendfile poller. With some versions of APR, the maximum poller size will - * be 62 (reocmpiling APR is necessary to remove this limitation). 
- */ - protected void init() { -// pool = Pool.create(serverSockPool); -// int size = sendfileSize / sendfileThreadCount; -// sendfilePollset = allocatePoller(size, pool, soTimeout); -// if (sendfilePollset == 0 && size > 1024) { -// size = 1024; -// sendfilePollset = allocatePoller(size, pool, soTimeout); -// } -// if (sendfilePollset == 0) { -// size = 62; -// sendfilePollset = allocatePoller(size, pool, soTimeout); -// } -// desc = new long[size * 2]; -// sendfileData = new HashMap(size); -// addS = new ArrayList(); - } - - /** - * Destroy the poller. - */ - protected void destroy() { -// // Wait for polltime before doing anything, so that the poller threads -// // exit, otherwise parallel descturction of sockets which are still -// // in the poller can cause problems -// try { -// synchronized (this) { -// this.wait(pollTime / 1000); -// } -// } catch (InterruptedException e) { -// // Ignore -// } -// // Close any socket remaining in the add queue -// for (int i = (addS.size() - 1); i >= 0; i--) { -// SendfileData data = addS.get(i); -// Socket.destroy(data.socket); -// } -// // Close all sockets still in the poller -// int rv = Poll.pollset(sendfilePollset, desc); -// if (rv > 0) { -// for (int n = 0; n < rv; n++) { -// Socket.destroy(desc[n*2+1]); -// } -// } -// Pool.destroy(pool); -// sendfileData.clear(); - } - - /** - * Add the sendfile data to the sendfile poller. Note that in most cases, - * the initial non blocking calls to sendfile will return right away, and - * will be handled asynchronously inside the kernel. As a result, - * the poller will never be used. 
- * - * @param data containing the reference to the data which should be snet - * @return true if all the data has been sent right away, and false - * otherwise - */ - public boolean add(SendfileData data) { -// // Initialize fd from data given -// try { -// data.fdpool = Socket.pool(data.socket); -// data.fd = File.open -// (data.fileName, File.APR_FOPEN_READ -// | File.APR_FOPEN_SENDFILE_ENABLED | File.APR_FOPEN_BINARY, -// 0, data.fdpool); -// data.pos = data.start; -// // Set the socket to nonblocking mode -// Socket.timeoutSet(data.socket, 0); -// while (true) { -// long nw = Socket.sendfilen(data.socket, data.fd, -// data.pos, data.end - data.pos, 0); -// if (nw < 0) { -// if (!(-nw == Status.EAGAIN)) { -// Socket.destroy(data.socket); -// data.socket = 0; -// return false; -// } else { -// // Break the loop and add the socket to poller. -// break; -// } -// } else { -// data.pos = data.pos + nw; -// if (data.pos >= data.end) { -// // Entire file has been sent -// Pool.destroy(data.fdpool); -// // Set back socket to blocking mode -// Socket.timeoutSet(data.socket, soTimeout * 1000); -// return true; -// } -// } -// } -// } catch (Exception e) { -// log.error(sm.getString("endpoint.sendfile.error"), e); -// return false; -// } -// // Add socket to the list. Newly added sockets will wait -// // at most for pollTime before being polled -// synchronized (this) { -// addS.add(data); -// this.notify(); -// } - return false; - } - - /** - * Remove socket from the poller. - * - * @param data the sendfile data which should be removed - */ - protected void remove(SendfileData data) { -// int rv = Poll.remove(sendfilePollset, data.socket); -// if (rv == Status.APR_SUCCESS) { -// sendfileCount--; -// } -// sendfileData.remove(data); - } - - /** - * The background thread that listens for incoming TCP/IP connections and - * hands them off to an appropriate processor. 
- */ - public void run() { - -// // Loop until we receive a shutdown command -// while (running) { -// -// // Loop if endpoint is paused -// while (paused) { -// try { -// Thread.sleep(1000); -// } catch (InterruptedException e) { -// // Ignore -// } -// } -// -// while (sendfileCount < 1 && addS.size() < 1) { -// try { -// synchronized (this) { -// this.wait(); -// } -// } catch (InterruptedException e) { -// // Ignore -// } -// } -// -// try { -// // Add socket to the poller -// if (addS.size() > 0) { -// synchronized (this) { -// for (int i = (addS.size() - 1); i >= 0; i--) { -// SendfileData data = addS.get(i); -// int rv = Poll.add(sendfilePollset, data.socket, Poll.APR_POLLOUT); -// if (rv == Status.APR_SUCCESS) { -// sendfileData.put(new Long(data.socket), data); -// sendfileCount++; -// } else { -// log.warn(sm.getString("endpoint.sendfile.addfail", "" + rv, Error.strerror(rv))); -// // Can't do anything: close the socket right away -// Socket.destroy(data.socket); -// } -// } -// addS.clear(); -// } -// } -// // Pool for the specified interval -// int rv = Poll.poll(sendfilePollset, pollTime, desc, false); -// if (rv > 0) { -// for (int n = 0; n < rv; n++) { -// // Get the sendfile state -// SendfileData state = -// sendfileData.get(new Long(desc[n*2+1])); -// // Problem events -// if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP) -// || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) { -// // Close socket and clear pool -// remove(state); -// // Destroy file descriptor pool, which should close the file -// // Close the socket, as the reponse would be incomplete -// Socket.destroy(state.socket); -// continue; -// } -// // Write some data using sendfile -// long nw = Socket.sendfilen(state.socket, state.fd, -// state.pos, -// state.end - state.pos, 0); -// if (nw < 0) { -// // Close socket and clear pool -// remove(state); -// // Close the socket, as the reponse would be incomplete -// // This will close the file too. 
-// Socket.destroy(state.socket); -// continue; -// } -// -// state.pos = state.pos + nw; -// if (state.pos >= state.end) { -// remove(state); -// if (state.keepAlive) { -// // Destroy file descriptor pool, which should close the file -// Pool.destroy(state.fdpool); -// Socket.timeoutSet(state.socket, soTimeout * 1000); -// // If all done hand this socket off to a worker for -// // processing of further requests -// if (!processSocket(state.socket)) { -// Socket.destroy(state.socket); -// } -// } else { -// // Close the socket since this is -// // the end of not keep-alive request. -// Socket.destroy(state.socket); -// } -// } -// } -// } else if (rv < 0) { -// int errn = -rv; -// /* Any non timeup or interrupted error is critical */ -// if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) { -// if (errn > Status.APR_OS_START_USERERR) { -// errn -= Status.APR_OS_START_USERERR; -// } -// log.error(sm.getString("endpoint.poll.fail", "" + errn, Error.strerror(errn))); -// // Handle poll critical failure -// synchronized (this) { -// destroy(); -// init(); -// } -// continue; -// } -// } -// /* TODO: See if we need to call the maintain for sendfile poller */ -// } catch (Throwable t) { -// log.error(sm.getString("endpoint.poll.error"), t); -// } -// } -// -// synchronized (this) { -// this.notifyAll(); -// } - - } - - } - - - // ------------------------------------------------ Handler Inner Interface - - - /** - * Bare bones interface used for socket processing. Per thread data is to be - * stored in the ThreadWithAttributes extra folders, or alternately in - * thread local fields. 
- */ - public interface Handler { - public enum SocketState { - OPEN, CLOSED, LONG - } - public SocketState process(SocketChannel socket); - public SocketState event(SocketChannel socket, boolean error); - } - - - // ------------------------------------------------- WorkerStack Inner Class - - - public class WorkerStack { - - protected Worker[] workers = null; - protected int end = 0; - - public WorkerStack(int size) { - workers = new Worker[size]; - } - - /** - * Put the object into the queue. - * - * @param object the object to be appended to the queue (first element). - */ - public void push(Worker worker) { - workers[end++] = worker; - } - - /** - * Get the first object out of the queue. Return null if the queue - * is empty. - */ - public Worker pop() { - if (end > 0) { - return workers[--end]; - } - return null; - } - - /** - * Get the first object out of the queue, Return null if the queue - * is empty. - */ - public Worker peek() { - return workers[end]; - } - - /** - * Is the queue empty? - */ - public boolean isEmpty() { - return (end == 0); - } - - /** - * How many elements are there in this queue? - */ - public int size() { - return (end); - } - } - - - // ---------------------------------------------- SocketProcessor Inner Class - - - /** - * This class is the equivalent of the Worker, but will simply use in an - * external Executor thread pool. 
- */ - protected class SocketProcessor implements Runnable { - - protected SocketChannel socket = null; - - public SocketProcessor(SocketChannel socket) { - this.socket = socket; - } - - public void run() { - - // Process the request from this socket - if (handler.process(socket) == Handler.SocketState.CLOSED) { - // Close socket and pool - try { - socket.socket().close(); - socket.close(); - } catch ( Exception x ) { - log.error("",x); - } - socket = null; - } - - } - - } - - - // --------------------------------------- SocketEventProcessor Inner Class - - - /** - * This class is the equivalent of the Worker, but will simply use in an - * external Executor thread pool. - */ - protected class SocketEventProcessor implements Runnable { - - protected SocketChannel socket = null; - protected boolean error = false; - - public SocketEventProcessor(SocketChannel socket, boolean error) { - this.socket = socket; - this.error = error; - } - - public void run() { - - // Process the request from this socket - if (handler.event(socket, error) == Handler.SocketState.CLOSED) { - // Close socket and pool - try { - socket.socket().close(); - socket.close(); - } catch ( Exception x ) { - log.error("",x); - } - socket = null; - } - - } - - } - - -} +import org.apache.tomcat.util.res.StringManager; + +/** + * NIO tailored thread pool, providing the following services: + *
<ul>
+ * <li>Socket acceptor thread</li>
+ * <li>Socket poller thread</li>
+ * <li>Sendfile thread</li>
+ * <li>Worker threads pool</li>
+ * </ul>
+ * + * When switching to Java 5, there's an opportunity to use the virtual + * machine's thread pool. + * + * @author Mladen Turk + * @author Remy Maucherat + * @author Filip Hanik + */ +public class NioEndpoint { + + + // -------------------------------------------------------------- Constants + + + protected static Log log = LogFactory.getLog(NioEndpoint.class); + + protected static StringManager sm = + StringManager.getManager("org.apache.tomcat.util.net.res"); + + + /** + * The Request attribute key for the cipher suite. + */ + public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite"; + + /** + * The Request attribute key for the key size. + */ + public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size"; + + /** + * The Request attribute key for the client certificate chain. + */ + public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate"; + + /** + * The Request attribute key for the session id. + * This one is a Tomcat extension to the Servlet spec. + */ + public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session"; + + + // ----------------------------------------------------------------- Fields + + + /** + * Available workers. + */ + protected WorkerStack workers = null; + + + /** + * Running state of the endpoint. + */ + protected volatile boolean running = false; + + + /** + * Will be set to true whenever the endpoint is paused. + */ + protected volatile boolean paused = false; + + + /** + * Track the initialization state of the endpoint. + */ + protected boolean initialized = false; + + + /** + * Current worker threads busy count. + */ + protected int curThreadsBusy = 0; + + + /** + * Current worker threads count. + */ + protected int curThreads = 0; + + + /** + * Sequence number used to generate thread names. + */ + protected int sequence = 0; + + + /** + * Root APR memory pool. + */ + protected long rootPool = 0; + + + /** + * Server socket "pointer". 
+ */ + protected ServerSocketChannel serverSock = null; + + + /** + * APR memory pool for the server socket. + */ + protected long serverSockPool = 0; + + + /** + * SSL context. + */ + protected long sslContext = 0; + + + // ------------------------------------------------------------- Properties + + + /** + * External Executor based thread pool. + */ + protected Executor executor = null; + public void setExecutor(Executor executor) { this.executor = executor; } + public Executor getExecutor() { return executor; } + + + /** + * Maximum amount of worker threads. + */ + protected int maxThreads = 40; + public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; } + public int getMaxThreads() { return maxThreads; } + + + /** + * Priority of the acceptor and poller threads. + */ + protected int threadPriority = Thread.NORM_PRIORITY; + public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; } + public int getThreadPriority() { return threadPriority; } + + + /** + * Size of the socket poller. + */ + protected int pollerSize = 8 * 1024; + public void setPollerSize(int pollerSize) { this.pollerSize = pollerSize; } + public int getPollerSize() { return pollerSize; } + + + /** + * Size of the sendfile (= concurrent files which can be served). + */ + protected int sendfileSize = 1 * 1024; + public void setSendfileSize(int sendfileSize) { this.sendfileSize = sendfileSize; } + public int getSendfileSize() { return sendfileSize; } + + + /** + * Server socket port. + */ + protected int port; + public int getPort() { return port; } + public void setPort(int port ) { this.port=port; } + + + /** + * Address for the server socket. + */ + protected InetAddress address; + public InetAddress getAddress() { return address; } + public void setAddress(InetAddress address) { this.address = address; } + + + /** + * Handling of accepted sockets. 
+ */ + protected Handler handler = null; + public void setHandler(Handler handler ) { this.handler = handler; } + public Handler getHandler() { return handler; } + + + /** + * Allows the server developer to specify the backlog that + * should be used for server sockets. By default, this value + * is 100. + */ + protected int backlog = 100; + public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; } + public int getBacklog() { return backlog; } + + + /** + * Socket TCP no delay. + */ + protected boolean tcpNoDelay = false; + public boolean getTcpNoDelay() { return tcpNoDelay; } + public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; } + + + /** + * Socket linger. + */ + protected int soLinger = 100; + public int getSoLinger() { return soLinger; } + public void setSoLinger(int soLinger) { this.soLinger = soLinger; } + + + /** + * Socket timeout. + */ + protected int soTimeout = -1; + public int getSoTimeout() { return soTimeout; } + public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; } + + + /** + * Timeout on first request read before going to the poller, in ms. + */ + protected int firstReadTimeout = 60000; + public int getFirstReadTimeout() { return firstReadTimeout; } + public void setFirstReadTimeout(int firstReadTimeout) { this.firstReadTimeout = firstReadTimeout; } + + + /** + * Poll interval, in microseconds. The smaller the value, the more CPU the poller + * will use, but the more responsive to activity it will be. + */ + protected int pollTime = 2000; + public int getPollTime() { return pollTime; } + public void setPollTime(int pollTime) { if (pollTime > 0) { this.pollTime = pollTime; } } + + + /** + * The default is true - the created threads will be + * in daemon mode. If set to false, the control thread + * will not be daemon - and will keep the process alive. 
+ */ + protected boolean daemon = true; + public void setDaemon(boolean b) { daemon = b; } + public boolean getDaemon() { return daemon; } + + + /** + * Name of the thread pool, which will be used for naming child threads. + */ + protected String name = "TP"; + public void setName(String name) { this.name = name; } + public String getName() { return name; } + + + /** + * Use endfile for sending static files. + */ + protected boolean useSendfile = Library.APR_HAS_SENDFILE; + public void setUseSendfile(boolean useSendfile) { this.useSendfile = useSendfile; } + public boolean getUseSendfile() { return useSendfile; } + + + /** + * Allow comet request handling. + */ + protected boolean useComet = true; + public void setUseComet(boolean useComet) { this.useComet = useComet; } + public boolean getUseComet() { return useComet; } + + + /** + * Acceptor thread count. + */ + protected int acceptorThreadCount = 0; + public void setAcceptorThreadCount(int acceptorThreadCount) { this.acceptorThreadCount = acceptorThreadCount; } + public int getAcceptorThreadCount() { return acceptorThreadCount; } + + + /** + * Sendfile thread count. + */ + protected int sendfileThreadCount = 0; + public void setSendfileThreadCount(int sendfileThreadCount) { this.sendfileThreadCount = sendfileThreadCount; } + public int getSendfileThreadCount() { return sendfileThreadCount; } + + + /** + * Poller thread count. + */ + protected int pollerThreadCount = 0; + public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; } + public int getPollerThreadCount() { return pollerThreadCount; } + + protected long selectorTimeout = 5000; + public void setSelectorTimeout(long timeout){ this.selectorTimeout = timeout;} + public long getSelectorTimeout(){ return this.selectorTimeout; } + /** + * The socket poller. 
+ */ + protected Poller[] pollers = null; + protected int pollerRoundRobin = 0; + public Poller getPoller() { + pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length; + Poller poller = pollers[pollerRoundRobin]; + poller.comet = false; + return poller; + } + + + /** + * The socket poller used for Comet support. + */ + public Poller getCometPoller() { + Poller poller = getPoller(); + poller.comet = true; + return poller; + } + + + /** + * The static file sender. + */ + protected Sendfile[] sendfiles = null; + protected int sendfileRoundRobin = 0; + public Sendfile getSendfile() { + sendfileRoundRobin = (sendfileRoundRobin + 1) % sendfiles.length; + return sendfiles[sendfileRoundRobin]; + } + + + /** + * Dummy maxSpareThreads property. + */ + public int getMaxSpareThreads() { return 0; } + + + /** + * Dummy minSpareThreads property. + */ + public int getMinSpareThreads() { return 0; } + + + /** + * SSL engine. + */ + protected String SSLEngine = "off"; + public String getSSLEngine() { return SSLEngine; } + public void setSSLEngine(String SSLEngine) { this.SSLEngine = SSLEngine; } + + + /** + * SSL protocols. + */ + protected String SSLProtocol = "all"; + public String getSSLProtocol() { return SSLProtocol; } + public void setSSLProtocol(String SSLProtocol) { this.SSLProtocol = SSLProtocol; } + + + /** + * SSL password (if a cert is encrypted, and no password has been provided, a callback + * will ask for a password). + */ + protected String SSLPassword = null; + public String getSSLPassword() { return SSLPassword; } + public void setSSLPassword(String SSLPassword) { this.SSLPassword = SSLPassword; } + + + /** + * SSL cipher suite. + */ + protected String SSLCipherSuite = "ALL"; + public String getSSLCipherSuite() { return SSLCipherSuite; } + public void setSSLCipherSuite(String SSLCipherSuite) { this.SSLCipherSuite = SSLCipherSuite; } + + + /** + * SSL certificate file. 
+ */ + protected String SSLCertificateFile = null; + public String getSSLCertificateFile() { return SSLCertificateFile; } + public void setSSLCertificateFile(String SSLCertificateFile) { this.SSLCertificateFile = SSLCertificateFile; } + + + /** + * SSL certificate key file. + */ + protected String SSLCertificateKeyFile = null; + public String getSSLCertificateKeyFile() { return SSLCertificateKeyFile; } + public void setSSLCertificateKeyFile(String SSLCertificateKeyFile) { this.SSLCertificateKeyFile = SSLCertificateKeyFile; } + + + /** + * SSL certificate chain file. + */ + protected String SSLCertificateChainFile = null; + public String getSSLCertificateChainFile() { return SSLCertificateChainFile; } + public void setSSLCertificateChainFile(String SSLCertificateChainFile) { this.SSLCertificateChainFile = SSLCertificateChainFile; } + + + /** + * SSL CA certificate path. + */ + protected String SSLCACertificatePath = null; + public String getSSLCACertificatePath() { return SSLCACertificatePath; } + public void setSSLCACertificatePath(String SSLCACertificatePath) { this.SSLCACertificatePath = SSLCACertificatePath; } + + + /** + * SSL CA certificate file. + */ + protected String SSLCACertificateFile = null; + public String getSSLCACertificateFile() { return SSLCACertificateFile; } + public void setSSLCACertificateFile(String SSLCACertificateFile) { this.SSLCACertificateFile = SSLCACertificateFile; } + + + /** + * SSL CA revocation path. + */ + protected String SSLCARevocationPath = null; + public String getSSLCARevocationPath() { return SSLCARevocationPath; } + public void setSSLCARevocationPath(String SSLCARevocationPath) { this.SSLCARevocationPath = SSLCARevocationPath; } + + + /** + * SSL CA revocation file. 
+ */ + protected String SSLCARevocationFile = null; + public String getSSLCARevocationFile() { return SSLCARevocationFile; } + public void setSSLCARevocationFile(String SSLCARevocationFile) { this.SSLCARevocationFile = SSLCARevocationFile; } + + + /** + * SSL verify client. + */ + protected String SSLVerifyClient = "none"; + public String getSSLVerifyClient() { return SSLVerifyClient; } + public void setSSLVerifyClient(String SSLVerifyClient) { this.SSLVerifyClient = SSLVerifyClient; } + + + /** + * SSL verify depth. + */ + protected int SSLVerifyDepth = 10; + public int getSSLVerifyDepth() { return SSLVerifyDepth; } + public void setSSLVerifyDepth(int SSLVerifyDepth) { this.SSLVerifyDepth = SSLVerifyDepth; } + + + // --------------------------------------------------------- Public Methods + + + /** + * Number of keepalive sockets. + */ + public int getKeepAliveCount() { + if (pollers == null) { + return 0; + } else { + int keepAliveCount = 0; + for (int i = 0; i < pollers.length; i++) { + keepAliveCount += pollers[i].getKeepAliveCount(); + } + return keepAliveCount; + } + } + + + /** + * Number of sendfile sockets. + */ + public int getSendfileCount() { + if (sendfiles == null) { + return 0; + } else { + int sendfileCount = 0; + for (int i = 0; i < sendfiles.length; i++) { + sendfileCount += sendfiles[i].getSendfileCount(); + } + return sendfileCount; + } + } + + + /** + * Return the amount of threads that are managed by the pool. + * + * @return the amount of threads that are managed by the pool + */ + public int getCurrentThreadCount() { + return curThreads; + } + + + /** + * Return the amount of threads currently busy. + * + * @return the amount of threads currently busy + */ + public int getCurrentThreadsBusy() { + return curThreadsBusy; + } + + + /** + * Return the state of the endpoint. + * + * @return true if the endpoint is running, false otherwise + */ + public boolean isRunning() { + return running; + } + + + /** + * Return the state of the endpoint. 
+ * + * @return true if the endpoint is paused, false otherwise + */ + public boolean isPaused() { + return paused; + } + + + // ----------------------------------------------- Public Lifecycle Methods + + + /** + * Initialize the endpoint. + */ + public void init() + throws Exception { + + if (initialized) + return; + + serverSock = ServerSocketChannel.open(); + InetSocketAddress addr = (address!=null?new InetSocketAddress(address,port):new InetSocketAddress(port)); + serverSock.socket().bind(addr,100); //todo, set backlog value + serverSock.configureBlocking(true); //mimic APR behavior + // Sendfile usage on systems which don't support it cause major problems + if (useSendfile) { + log.warn(sm.getString("endpoint.sendfile.nosupport")); + useSendfile = false; + } + + // Initialize thread count defaults for acceptor, poller and sendfile + if (acceptorThreadCount == 0) { + // FIXME: Doesn't seem to work that well with multiple accept threads + acceptorThreadCount = 1; + } + if (pollerThreadCount != 1) { + // limit to one poller, no need for others + pollerThreadCount = 1; + } + if (sendfileThreadCount != 0) { + sendfileThreadCount = 0; + } + + // Initialize SSL if needed + if (!"off".equalsIgnoreCase(SSLEngine)) { + // Initialize SSL + // FIXME: one per VM call ? 
+ if ("on".equalsIgnoreCase(SSLEngine)) { + SSL.initialize(null); + } else { + SSL.initialize(SSLEngine); + } + // SSL protocol + int value = SSL.SSL_PROTOCOL_ALL; + if ("SSLv2".equalsIgnoreCase(SSLProtocol)) { + value = SSL.SSL_PROTOCOL_SSLV2; + } else if ("SSLv3".equalsIgnoreCase(SSLProtocol)) { + value = SSL.SSL_PROTOCOL_SSLV3; + } else if ("TLSv1".equalsIgnoreCase(SSLProtocol)) { + value = SSL.SSL_PROTOCOL_TLSV1; + } else if ("SSLv2+SSLv3".equalsIgnoreCase(SSLProtocol)) { + value = SSL.SSL_PROTOCOL_SSLV2 | SSL.SSL_PROTOCOL_SSLV3; + } +// // Create SSL Context +// sslContext = SSLContext.make(rootPool, value, SSL.SSL_MODE_SERVER); +// // List the ciphers that the client is permitted to negotiate +// SSLContext.setCipherSuite(sslContext, SSLCipherSuite); +// // Load Server key and certificate +// SSLContext.setCertificate(sslContext, SSLCertificateFile, SSLCertificateKeyFile, SSLPassword, SSL.SSL_AIDX_RSA); +// // Set certificate chain file +// SSLContext.setCertificateChainFile(sslContext, SSLCertificateChainFile, false); +// // Support Client Certificates +// SSLContext.setCACertificate(sslContext, SSLCACertificateFile, SSLCACertificatePath); +// // Set revocation +// SSLContext.setCARevocation(sslContext, SSLCARevocationFile, SSLCARevocationPath); +// // Client certificate verification +// value = SSL.SSL_CVERIFY_NONE; +// if ("optional".equalsIgnoreCase(SSLVerifyClient)) { +// value = SSL.SSL_CVERIFY_OPTIONAL; +// } else if ("require".equalsIgnoreCase(SSLVerifyClient)) { +// value = SSL.SSL_CVERIFY_REQUIRE; +// } else if ("optionalNoCA".equalsIgnoreCase(SSLVerifyClient)) { +// value = SSL.SSL_CVERIFY_OPTIONAL_NO_CA; +// } +// SSLContext.setVerify(sslContext, value, SSLVerifyDepth); + // For now, sendfile is not supported with SSL + useSendfile = false; + } + + initialized = true; + + } + + + /** + * Start the APR endpoint, creating acceptor, poller and sendfile threads. 
+ */ + public void start() + throws Exception { + // Initialize socket if not done before + if (!initialized) { + init(); + } + if (!running) { + running = true; + paused = false; + + // Create worker collection + if (executor == null) { + workers = new WorkerStack(maxThreads); + } + + // Start acceptor threads + for (int i = 0; i < acceptorThreadCount; i++) { + Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i); + acceptorThread.setPriority(threadPriority); + acceptorThread.setDaemon(daemon); + acceptorThread.start(); + } + + // Start poller threads + pollers = new Poller[pollerThreadCount]; + for (int i = 0; i < pollerThreadCount; i++) { + pollers[i] = new Poller(false); + pollers[i].init(); + Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i); + pollerThread.setPriority(threadPriority); + pollerThread.setDaemon(true); + pollerThread.start(); + } + + // Start sendfile threads + if (useSendfile) { + sendfiles = new Sendfile[sendfileThreadCount]; + for (int i = 0; i < sendfileThreadCount; i++) { + sendfiles[i] = new Sendfile(); + sendfiles[i].init(); + Thread sendfileThread = new Thread(sendfiles[i], getName() + "-Sendfile-" + i); + sendfileThread.setPriority(threadPriority); + sendfileThread.setDaemon(true); + sendfileThread.start(); + } + } + } + } + + + /** + * Pause the endpoint, which will make it stop accepting new sockets. + */ + public void pause() { + if (running && !paused) { + paused = true; + unlockAccept(); + } + } + + + /** + * Resume the endpoint, which will make it start accepting new sockets + * again. + */ + public void resume() { + if (running) { + paused = false; + } + } + + + /** + * Stop the endpoint. This will cause all processing threads to stop. 
+ */ + public void stop() { + if (running) { + running = false; + unlockAccept(); + for (int i = 0; i < pollers.length; i++) { + pollers[i].destroy(); + } + pollers = null; + if (useSendfile) { + for (int i = 0; i < sendfiles.length; i++) { + sendfiles[i].destroy(); + } + sendfiles = null; + } + } + } + + + /** + * Deallocate APR memory pools, and close server socket. + */ + public void destroy() throws Exception { + if (running) { + stop(); + } + // Close server socket + serverSock.socket().close(); + serverSock.close(); + serverSock = null; + sslContext = 0; + initialized = false; + } + + + // ------------------------------------------------------ Protected Methods + + + /** + * Get a sequence number used for thread naming. + */ + protected int getSequence() { + return sequence++; + } + + + /** + * Unlock the server socket accept using a bugus connection. + */ + protected void unlockAccept() { + java.net.Socket s = null; + try { + // Need to create a connection to unlock the accept(); + if (address == null) { + s = new java.net.Socket("127.0.0.1", port); + } else { + s = new java.net.Socket(address, port); + // setting soLinger to a small value will help shutdown the + // connection quicker + s.setSoLinger(true, 0); + } + } catch(Exception e) { + if (log.isDebugEnabled()) { + log.debug(sm.getString("endpoint.debug.unlock", "" + port), e); + } + } finally { + if (s != null) { + try { + s.close(); + } catch (Exception e) { + // Ignore + } + } + } + } + + + /** + * Process the specified connection. 
+ */ + protected boolean setSocketOptions(SocketChannel socket) { + // Process the connection + int step = 1; + try { + //disable blocking, APR style, we are gonna be polling it + socket.configureBlocking(false); + + // 1: Set socket options: timeout, linger, etc + if (soLinger >= 0) + socket.socket().setSoLinger(true,soLinger); + if (tcpNoDelay) + socket.socket().setTcpNoDelay(true); + if (soTimeout > 0) + socket.socket().setSoTimeout(soTimeout); + + + // 2: SSL handshake + step = 2; + if (sslContext != 0) { +// SSLSocket.attach(sslContext, socket); +// if (SSLSocket.handshake(socket) != 0) { +// if (log.isDebugEnabled()) { +// log.debug(sm.getString("endpoint.err.handshake") + ": " + SSL.getLastError()); +// } +// return false; +// } + } + + getPoller().register(socket); + + } catch (Throwable t) { + if (log.isDebugEnabled()) { + if (step == 2) { + log.debug(sm.getString("endpoint.err.handshake"), t); + } else { + log.debug(sm.getString("endpoint.err.unexpected"), t); + } + } + // Tell to close the socket + return false; + } + return true; + } + + + /** + * Create (or allocate) and return an available processor for use in + * processing a specific HTTP request, if possible. If the maximum + * allowed processors have already been created and are in use, return + * null instead. + */ + protected Worker createWorkerThread() { + + synchronized (workers) { + if (workers.size() > 0) { + curThreadsBusy++; + return (workers.pop()); + } + if ((maxThreads > 0) && (curThreads < maxThreads)) { + curThreadsBusy++; + return (newWorkerThread()); + } else { + if (maxThreads < 0) { + curThreadsBusy++; + return (newWorkerThread()); + } else { + return (null); + } + } + } + + } + + + /** + * Create and return a new processor suitable for processing HTTP + * requests and returning the corresponding responses. 
+ */ + protected Worker newWorkerThread() { + + Worker workerThread = new Worker(); + workerThread.start(); + return (workerThread); + + } + + + /** + * Return a new worker thread, and block while to worker is available. + */ + protected Worker getWorkerThread() { + // Allocate a new worker thread + Worker workerThread = createWorkerThread(); + while (workerThread == null) { + try { + synchronized (workers) { + workers.wait(); + } + } catch (InterruptedException e) { + // Ignore + } + workerThread = createWorkerThread(); + } + return workerThread; + } + + + /** + * Recycle the specified Processor so that it can be used again. + * + * @param workerThread The processor to be recycled + */ + protected void recycleWorkerThread(Worker workerThread) { + synchronized (workers) { [... 937 lines stripped ...] --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org For additional commands, e-mail: dev-help@tomcat.apache.org