directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trus...@apache.org
Subject svn commit: r122749 - incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket
Date Sun, 19 Dec 2004 09:47:10 GMT
Author: trustin
Date: Sun Dec 19 01:47:09 2004
New Revision: 122749

URL: http://svn.apache.org/viewcvs?view=rev&rev=122749
Log:
 * All TCP threads are now automatically started and ended
Modified:
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java
   incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java?view=diff&rev=122749&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java&r1=122748&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java&r2=122749
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java
(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpAcceptor.java
Sun Dec 19 01:47:09 2004
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
@@ -34,6 +35,7 @@
 import org.apache.mina.io.IoHandler;
 import org.apache.mina.io.IoHandlerFilter;
 import org.apache.mina.util.IoHandlerFilterManager;
+import org.apache.mina.util.Queue;
 
 /**
  * TODO Insert type comment.
@@ -53,6 +55,10 @@
 
     private final Map channels = new HashMap();
 
+    private final Queue registerQueue = new Queue();
+
+    private final Queue cancelQueue = new Queue();
+
     private Worker worker;
 
     /**
@@ -71,9 +77,8 @@
         this.bind( address, 50, defaultHandler );
     }
 
-    public synchronized void bind( SocketAddress address, int backlog,
-                                  IoHandler defaultHandler )
-            throws IOException
+    public void bind( SocketAddress address, int backlog,
+                     IoHandler defaultHandler ) throws IOException
     {
         Validate.notNull( address );
         Validate.notNull( defaultHandler );
@@ -85,30 +90,49 @@
         ServerSocketChannel ssc = ServerSocketChannel.open();
         ssc.configureBlocking( false );
         ssc.socket().bind( address, backlog );
-        ssc.register( selector, SelectionKey.OP_ACCEPT, defaultHandler );
 
-        channels.put( address, ssc );
-
-        if( worker == null )
+        synchronized( this )
         {
-            worker = new Worker();
-            worker.start();
+            synchronized( registerQueue )
+            {
+                registerQueue.push( new RegistrationRequest( ssc,
+                                                             defaultHandler ) );
+            }
+            channels.put( address, ssc );
+
+            if( worker == null )
+            {
+                worker = new Worker();
+                worker.start();
+            }
         }
+
+        selector.wakeup();
     }
 
-    public synchronized void unbind( SocketAddress address )
+    public void unbind( SocketAddress address )
     {
         Validate.notNull( address );
 
-        ServerSocketChannel ssc = ( ServerSocketChannel ) channels
-                .get( address );
+        ServerSocketChannel ssc;
+
+        synchronized( this )
+        {
+            ssc = ( ServerSocketChannel ) channels.get( address );
 
-        if( ssc == null )
-            throw new IllegalArgumentException( "Unknown address: " + address );
+            if( ssc == null )
+                throw new IllegalArgumentException( "Unknown address: "
+                                                    + address );
+
+            SelectionKey key = ssc.keyFor( selector );
+            channels.remove( address );
+            synchronized( cancelQueue )
+            {
+                cancelQueue.push( key );
+            }
+        }
 
-        SelectionKey key = ssc.keyFor( selector );
-        key.cancel();
-        channels.remove( address );
+        selector.wakeup();
 
         try
         {
@@ -135,6 +159,22 @@
                 {
                     int nKeys = selector.select();
 
+                    registerNew();
+
+                    if( selector.keys().isEmpty() )
+                    {
+                        synchronized( TcpAcceptor.this )
+                        {
+                            if( selector.keys().isEmpty() )
+                            {
+                                worker = null;
+                                break;
+                            }
+                        }
+                    }
+
+                    cancelKeys();
+
                     if( nKeys == 0 )
                         continue;
 
@@ -179,6 +219,49 @@
         }
     }
 
+    private void registerNew() throws ClosedChannelException
+    {
+        if( registerQueue.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            RegistrationRequest req;
+            synchronized( registerQueue )
+            {
+                req = ( RegistrationRequest ) registerQueue.pop();
+            }
+
+            if( req == null )
+                break;
+
+            req.channel.register( selector, SelectionKey.OP_ACCEPT,
+                                  req.handler );
+        }
+    }
+
+    private void cancelKeys()
+    {
+        if( cancelQueue.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            SelectionKey key;
+            synchronized( cancelQueue )
+            {
+                key = ( SelectionKey ) cancelQueue.pop();
+            }
+
+            if( key == null )
+                break;
+            else {
+                key.cancel();
+                selector.wakeup(); // wake up again to trigger thread death
+            }
+        }
+    }
+
     public void addFilter( int priority, IoHandlerFilter filter )
     {
         filterManager.addFilter( priority, filter );
@@ -187,5 +270,19 @@
     public void removeFilter( IoHandlerFilter filter )
     {
         filterManager.removeFilter( filter );
+    }
+
+    private static class RegistrationRequest
+    {
+        private final ServerSocketChannel channel;
+
+        private final IoHandler handler;
+
+        private RegistrationRequest( ServerSocketChannel channel,
+                                    IoHandler handler )
+        {
+            this.channel = channel;
+            this.handler = handler;
+        }
     }
 }

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java?view=diff&rev=122749&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java&r1=122748&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java&r2=122749
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java
(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpConnector.java
Sun Dec 19 01:47:09 2004
@@ -36,7 +36,7 @@
 import org.apache.mina.util.IoHandlerFilterManager;
 
 /**
- * TODO Insert type comment. TODO Stop worker thread when not used.
+ * TODO Insert type comment.
  * 
  * @author Trustin Lee (trustin@apache.org)
  * @version $Rev$, $Date$
@@ -92,17 +92,15 @@
         else
         {
             ConnectEntry entry = new ConnectEntry( timeout, defaultHandler );
-            ch.register( selector, SelectionKey.OP_CONNECT, entry );
 
-            if( worker == null )
+            synchronized( this )
             {
-                synchronized( this )
+                ch.register( selector, SelectionKey.OP_CONNECT, entry );
+
+                if( worker == null )
                 {
-                    if( worker == null )
-                    {
-                        worker = new Worker();
-                        worker.start();
-                    }
+                    worker = new Worker();
+                    worker.start();
                 }
             }
 
@@ -222,6 +220,15 @@
                 try
                 {
                     int nKeys = selector.select( 1000 );
+
+                    if (selector.keys().isEmpty()) {
+                        synchronized (TcpConnector.this) {
+                            if (selector.keys().isEmpty()) {
+	                            worker = null;
+	                            break;
+                            }
+                        }
+                    }
 
                     if( nKeys > 0 )
                         processSessions( selector.selectedKeys() );

Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java?view=diff&rev=122749&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java&r1=122748&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java&r2=122749
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java
(original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/socket/TcpIoProcessor.java
Sun Dec 19 01:47:09 2004
@@ -32,7 +32,7 @@
 import org.apache.mina.util.Queue;
 
 /**
- * TODO Document me. TODO Stop worker thread if there is no session to process
+ * TODO Document me.
  * 
  * @author Trustin Lee (trustin@apache.org)
  * @version $Rev$, $Date$,
@@ -68,6 +68,8 @@
 
     private final Queue flushingSessions = new Queue();
 
+    private final Queue readableSessions = new Queue();
+
     private Worker worker;
 
     private long lastIdleCheckTime = System.currentTimeMillis();
@@ -84,20 +86,17 @@
 
     public void addSession( TcpSession session )
     {
-        synchronized( newSessions )
+        synchronized( this )
         {
-            newSessions.push( session );
-        }
+            synchronized( newSessions )
+            {
+                newSessions.push( session );
+            }
 
-        if( worker == null )
-        {
-            synchronized( this )
+            if( worker == null )
             {
-                if( worker == null )
-                {
-                    worker = new Worker();
-                    worker.start();
-                }
+                worker = new Worker();
+                worker.start();
             }
         }
 
@@ -121,12 +120,18 @@
         SelectionKey key = session.getSelectionKey();
 
         if( ( key.interestOps() & SelectionKey.OP_READ ) == 0 )
-            key.interestOps( key.interestOps() | SelectionKey.OP_READ );
+        {
+            synchronized( readableSessions )
+            {
+                readableSessions.push( session );
+            }
+            selector.wakeup();
+        }
     }
 
     private void addSessions()
     {
-        if( newSessions.size() == 0 )
+        if( newSessions.isEmpty() )
             return;
 
         TcpSession session;
@@ -167,9 +172,31 @@
         }
     }
 
+    private void addReadableSessions()
+    {
+        if( readableSessions.isEmpty() )
+            return;
+
+        TcpSession session;
+
+        for( ;; )
+        {
+            synchronized( readableSessions )
+            {
+                session = ( TcpSession ) readableSessions.pop();
+            }
+
+            if( session == null )
+                break;
+
+            SelectionKey key = session.getSelectionKey();
+            key.interestOps( key.interestOps() | SelectionKey.OP_READ );
+        }
+    }
+
     private void removeSessions()
     {
-        if( removingSessions.size() == 0 )
+        if( removingSessions.isEmpty() )
             return;
 
         for( ;; )
@@ -485,7 +512,6 @@
         public Worker()
         {
             super( "TcpIoProcessor" );
-            setDaemon( true );
         }
 
         public void run()
@@ -496,6 +522,20 @@
                 {
                     int nKeys = selector.select( 1000 );
                     addSessions();
+
+                    if( selector.keys().isEmpty() )
+                    {
+                        synchronized( TcpIoProcessor.this )
+                        {
+                            if( selector.keys().isEmpty() )
+                            {
+                                worker = null;
+                                break;
+                            }
+                        }
+                    }
+
+                    addReadableSessions();
 
                     if( nKeys > 0 )
                     {

Mime
View raw message