Return-Path: Delivered-To: apmail-incubator-directory-cvs-archive@www.apache.org Received: (qmail 19360 invoked from network); 19 Dec 2004 09:47:13 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur-2.apache.org with SMTP; 19 Dec 2004 09:47:13 -0000 Received: (qmail 23357 invoked by uid 500); 19 Dec 2004 09:47:12 -0000 Delivered-To: apmail-incubator-directory-cvs-archive@incubator.apache.org Received: (qmail 23305 invoked by uid 500); 19 Dec 2004 09:47:12 -0000 Mailing-List: contact directory-cvs-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: directory-dev@incubator.apache.org Delivered-To: mailing list directory-cvs@incubator.apache.org Received: (qmail 23290 invoked by uid 99); 19 Dec 2004 09:47:12 -0000 X-ASF-Spam-Status: No, hits=-9.8 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from minotaur.apache.org (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.28) with SMTP; Sun, 19 Dec 2004 01:47:11 -0800 Received: (qmail 19317 invoked by uid 65534); 19 Dec 2004 09:47:10 -0000 Date: 19 Dec 2004 09:47:10 -0000 Message-ID: <20041219094710.19309.qmail@minotaur.apache.org> From: trustin@apache.org To: directory-cvs@incubator.apache.org Subject: svn commit: r122749 - incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 X-Virus-Checked: Checked X-Spam-Rating: minotaur-2.apache.org 1.6.2 0/1000/N Author: trustin Date: Sun Dec 19 01:47:09 2004 New Revision: 122749 URL: http://svn.apache.org/viewcvs?view=rev&rev=122749 Log: * All TCP threads are now automatically started and ended Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java?view=diff&rev=122749&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java&r1=122748&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java&r2=122749 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java Sun Dec 19 01:47:09 2004 @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; @@ -34,6 +35,7 @@ import org.apache.mina.io.IoHandler; import org.apache.mina.io.IoHandlerFilter; import org.apache.mina.util.IoHandlerFilterManager; +import org.apache.mina.util.Queue; /** * TODO Insert type comment. @@ -53,6 +55,10 @@ private final Map channels = new HashMap(); + private final Queue registerQueue = new Queue(); + + private final Queue cancelQueue = new Queue(); + private Worker worker; /** @@ -71,9 +77,8 @@ this.bind( address, 50, defaultHandler ); } - public synchronized void bind( SocketAddress address, int backlog, - IoHandler defaultHandler ) - throws IOException + public void bind( SocketAddress address, int backlog, + IoHandler defaultHandler ) throws IOException { Validate.notNull( address ); Validate.notNull( defaultHandler ); @@ -85,30 +90,49 @@ ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking( false ); ssc.socket().bind( address, backlog ); - ssc.register( selector, SelectionKey.OP_ACCEPT, defaultHandler ); - channels.put( address, ssc ); - - if( worker == null ) + synchronized( this ) { - worker = new Worker(); - worker.start(); + synchronized( registerQueue ) + { + registerQueue.push( new RegistrationRequest( ssc, + defaultHandler ) ); + } + channels.put( address, ssc ); + + if( worker == null ) + { + worker = new Worker(); + worker.start(); + } } + + selector.wakeup(); } - public synchronized void unbind( SocketAddress address ) + public void unbind( SocketAddress address ) { Validate.notNull( address ); - ServerSocketChannel ssc = ( ServerSocketChannel ) channels - .get( address ); + ServerSocketChannel ssc; + + synchronized( this ) + { + ssc = ( ServerSocketChannel ) channels.get( address ); - if( ssc == null ) - throw new IllegalArgumentException( "Unknown address: " + address ); + if( ssc == null ) + throw new IllegalArgumentException( "Unknown address: " + + address ); + + SelectionKey key = ssc.keyFor( selector ); + channels.remove( address ); + synchronized( cancelQueue ) + { + cancelQueue.push( key ); + } + } - SelectionKey key = ssc.keyFor( selector ); - key.cancel(); - channels.remove( address ); + selector.wakeup(); try { @@ -135,6 +159,22 @@ { int nKeys = selector.select(); + registerNew(); + + if( selector.keys().isEmpty() ) + { + synchronized( TcpAcceptor.this ) + { + if( selector.keys().isEmpty() ) + { + worker = null; + break; + } + } + } + + cancelKeys(); + if( nKeys == 0 ) continue; @@ -179,6 +219,49 @@ } } + private void registerNew() throws ClosedChannelException + { + if( registerQueue.isEmpty() ) + return; + + for( ;; ) + { + RegistrationRequest req; + synchronized( registerQueue ) + { + req = ( RegistrationRequest ) registerQueue.pop(); + } + + if( req == null ) + break; + + req.channel.register( selector, SelectionKey.OP_ACCEPT, + req.handler ); + } + } + + private void cancelKeys() + { + if( cancelQueue.isEmpty() ) + return; + + for( ;; ) + { + SelectionKey key; + synchronized( cancelQueue ) + { + key = ( SelectionKey ) cancelQueue.pop(); + } + + if( key == null ) + break; + else { + key.cancel(); + selector.wakeup(); // wake up again to trigger thread death + } + } + } + public void addFilter( int priority, IoHandlerFilter filter ) { filterManager.addFilter( priority, filter ); @@ -187,5 +270,19 @@ public void removeFilter( IoHandlerFilter filter ) { filterManager.removeFilter( filter ); + } + + private static class RegistrationRequest + { + private final ServerSocketChannel channel; + + private final IoHandler handler; + + private RegistrationRequest( ServerSocketChannel channel, + IoHandler handler ) + { + this.channel = channel; + this.handler = handler; + } } } Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java?view=diff&rev=122749&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java&r1=122748&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java&r2=122749 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java Sun Dec 19 01:47:09 2004 @@ -36,7 +36,7 @@ import org.apache.mina.util.IoHandlerFilterManager; /** - * TODO Insert type comment. TODO Stop worker thread when not used. + * TODO Insert type comment. * * @author Trustin Lee (trustin@apache.org) * @version $Rev$, $Date$ @@ -92,17 +92,15 @@ else { ConnectEntry entry = new ConnectEntry( timeout, defaultHandler ); - ch.register( selector, SelectionKey.OP_CONNECT, entry ); - if( worker == null ) + synchronized( this ) { - synchronized( this ) + ch.register( selector, SelectionKey.OP_CONNECT, entry ); + + if( worker == null ) { - if( worker == null ) - { - worker = new Worker(); - worker.start(); - } + worker = new Worker(); + worker.start(); } } @@ -222,6 +220,15 @@ try { int nKeys = selector.select( 1000 ); + + if (selector.keys().isEmpty()) { + synchronized (TcpConnector.this) { + if (selector.keys().isEmpty()) { + worker = null; + break; + } + } + } if( nKeys > 0 ) processSessions( selector.selectedKeys() ); Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java?view=diff&rev=122749&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java&r1=122748&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java&r2=122749 ============================================================================== --- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java (original) +++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java Sun Dec 19 01:47:09 2004 @@ -32,7 +32,7 @@ import org.apache.mina.util.Queue; /** - * TODO Document me. TODO Stop worker thread if there is no session to process + * TODO Document me. * * @author Trustin Lee (trustin@apache.org) * @version $Rev$, $Date$, @@ -68,6 +68,8 @@ private final Queue flushingSessions = new Queue(); + private final Queue readableSessions = new Queue(); + private Worker worker; private long lastIdleCheckTime = System.currentTimeMillis(); @@ -84,20 +86,17 @@ public void addSession( TcpSession session ) { - synchronized( newSessions ) + synchronized( this ) { - newSessions.push( session ); - } + synchronized( newSessions ) + { + newSessions.push( session ); + } - if( worker == null ) - { - synchronized( this ) + if( worker == null ) { - if( worker == null ) - { - worker = new Worker(); - worker.start(); - } + worker = new Worker(); + worker.start(); } } @@ -121,12 +120,18 @@ SelectionKey key = session.getSelectionKey(); if( ( key.interestOps() & SelectionKey.OP_READ ) == 0 ) - key.interestOps( key.interestOps() | SelectionKey.OP_READ ); + { + synchronized( readableSessions ) + { + readableSessions.push( session ); + } + selector.wakeup(); + } } private void addSessions() { - if( newSessions.size() == 0 ) + if( newSessions.isEmpty() ) return; TcpSession session; @@ -167,9 +172,31 @@ } } + private void addReadableSessions() + { + if( readableSessions.isEmpty() ) + return; + + TcpSession session; + + for( ;; ) + { + synchronized( readableSessions ) + { + session = ( TcpSession ) readableSessions.pop(); + } + + if( session == null ) + break; + + SelectionKey key = session.getSelectionKey(); + key.interestOps( key.interestOps() | SelectionKey.OP_READ ); + } + } + private void removeSessions() { - if( removingSessions.size() == 0 ) + if( removingSessions.isEmpty() ) return; for( ;; ) @@ -485,7 +512,6 @@ public Worker() { super( "TcpIoProcessor" ); - setDaemon( true ); } public void run() @@ -496,6 +522,20 @@ { int nKeys = selector.select( 1000 ); addSessions(); + + if( selector.keys().isEmpty() ) + { + synchronized( TcpIoProcessor.this ) + { + if( selector.keys().isEmpty() ) + { + worker = null; + break; + } + } + } + + addReadableSessions(); if( nKeys > 0 ) {