directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trus...@apache.org
Subject svn commit: r351884 [3/3] - in /directory/network/trunk/src: java/org/apache/mina/common/ java/org/apache/mina/common/support/ java/org/apache/mina/integration/spring/support/ java/org/apache/mina/transport/socket/nio/ java/org/apache/mina/transport/so...
Date Sat, 03 Dec 2005 04:20:55 GMT
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java?rev=351884&r1=351883&r2=351884&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java Fri Dec  2 20:20:27 2005
@@ -1,378 +1,378 @@
-/*
- *   @(#) $Id$
- *
- *   Copyright 2004 The Apache Software Foundation
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- *
- */
-package org.apache.mina.transport.socket.nio.support;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoFilterChainBuilder;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.support.BaseIoConnector;
-import org.apache.mina.transport.socket.nio.SocketSessionManager;
-import org.apache.mina.util.ExceptionUtil;
-import org.apache.mina.util.Queue;
-
-/**
- * {@link IoConnector} for socket transport (TCP/IP).
- * 
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev$, $Date$
- */
-public class SocketConnectorDelegate extends BaseIoConnector implements SocketSessionManager
-{
-    private static volatile int nextId = 0;
-
-    private final IoConnector wrapper;
-    private final int id = nextId++;
-    private final String threadName = "SocketConnector-" + id;
-    private Selector selector;
-    private final Queue connectQueue = new Queue();
-    private Worker worker;
-    private final SocketIoProcessor ioProcessor = new SocketIoProcessor( this, threadName + "-1" );
-
-    /**
-     * Creates a new instance.
-     */
-    public SocketConnectorDelegate( IoConnector wrapper )
-    {
-        this.wrapper = wrapper;
-    }
-
-    public ConnectFuture connect( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder )
-    {
-        return connect( address, null, handler, filterChainBuilder );
-    }
-
-    public ConnectFuture connect( SocketAddress address, SocketAddress localAddress,
-                                  IoHandler handler, IoFilterChainBuilder filterChainBuilder )
-    {
-        if( address == null )
-            throw new NullPointerException( "address" );
-        if( handler == null )
-            throw new NullPointerException( "handler" );
-
-        if( ! ( address instanceof InetSocketAddress ) )
-            throw new IllegalArgumentException( "Unexpected address type: "
-                                                + address.getClass() );
-
-        if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
-            throw new IllegalArgumentException( "Unexpected local address type: "
-                                                + localAddress.getClass() );
-
-        if( filterChainBuilder == null )
-        {
-            filterChainBuilder = IoFilterChainBuilder.NOOP;
-        }
-        
-        SocketChannel ch = null;
-        boolean success = false;
-        try
-        {
-            ch = SocketChannel.open();
-            ch.socket().setReuseAddress( true );
-            if( localAddress != null )
-            {
-                ch.socket().bind( localAddress );
-            }
-    
-            ch.configureBlocking( false );
-
-            if( ch.connect( address ) )
-            {
-                SocketSessionImpl session = newSession( ch, handler, filterChainBuilder );
-                success = true;
-                ConnectFuture future = new ConnectFuture();
-                future.setSession( session );
-                return future;
-            }
-            
-            success = true;
-        }
-        catch( IOException e )
-        {
-            return ConnectFuture.newFailedFuture( e );
-        }
-        finally
-        {
-            if( !success && ch != null )
-            {
-                try
-                {
-                    ch.close();
-                }
-                catch( IOException e )
-                {
-                    exceptionMonitor.exceptionCaught( this, e );
-                }
-            }
-        }
-        
-        ConnectionRequest request = new ConnectionRequest( ch, getConnectTimeout(), handler, filterChainBuilder );
-        synchronized( this )
-        {
-            try
-            {
-                startupWorker();
-            }
-            catch( IOException e )
-            {
-                try
-                {
-                    ch.close();
-                }
-                catch( IOException e2 )
-                {
-                    exceptionMonitor.exceptionCaught( this, e2 );
-                }
-
-                return ConnectFuture.newFailedFuture( e );
-            }
-            synchronized( connectQueue )
-            {
-                connectQueue.push( request );
-            }
-            selector.wakeup();
-        }
-        
-        return request;
-    }
-    
-    private synchronized void startupWorker() throws IOException
-    {
-        if( worker == null )
-        {
-            selector = Selector.open();
-            worker = new Worker();
-            worker.start();
-        }
-    }
-
-    private void registerNew()
-    {
-        if( connectQueue.isEmpty() )
-            return;
-
-        for( ;; )
-        {
-            ConnectionRequest req;
-            synchronized( connectQueue )
-            {
-                req = ( ConnectionRequest ) connectQueue.pop();
-            }
-
-            if( req == null )
-                break;
-            
-            SocketChannel ch = req.channel;
-            try
-            {
-                ch.register( selector, SelectionKey.OP_CONNECT, req );
-            }
-            catch( IOException e )
-            {
-                req.setException( e );
-            }
-        }
-    }
-    
-    private void processSessions( Set keys )
-    {
-        Iterator it = keys.iterator();
-
-        while( it.hasNext() )
-        {
-            SelectionKey key = ( SelectionKey ) it.next();
-
-            if( !key.isConnectable() )
-                continue;
-
-            SocketChannel ch = ( SocketChannel ) key.channel();
-            ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
-
-            boolean success = false;
-            try
-            {
-                ch.finishConnect();
-                SocketSessionImpl session = newSession( ch, entry.handler, entry.filterChainBuilder );
-                entry.setSession( session );
-                success = true;
-            }
-            catch( Throwable e )
-            {
-                entry.setException( e );
-            }
-            finally
-            {
-                key.cancel();
-                if( !success )
-                {
-                    try
-                    {
-                        ch.close();
-                    }
-                    catch( IOException e )
-                    {
-                        exceptionMonitor.exceptionCaught( this, e );
-                    }
-                }
-            }
-        }
-
-        keys.clear();
-    }
-
-    private void processTimedOutSessions( Set keys )
-    {
-        long currentTime = System.currentTimeMillis();
-        Iterator it = keys.iterator();
-
-        while( it.hasNext() )
-        {
-            SelectionKey key = ( SelectionKey ) it.next();
-
-            if( !key.isValid() )
-                continue;
-
-            ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
-
-            if( currentTime >= entry.deadline )
-            {
-                entry.setException( new ConnectException() );
-                key.cancel();
-            }
-        }
-    }
-
-    private SocketSessionImpl newSession( SocketChannel ch, IoHandler handler, IoFilterChainBuilder filterChainBuilder ) throws IOException
-    {
-        SocketSessionImpl session = new SocketSessionImpl( wrapper, ioProcessor, ch, handler );
-        try
-        {
-            this.filterChainBuilder.buildFilterChain( session.getFilterChain() );
-            filterChainBuilder.buildFilterChain( session.getFilterChain() );
-            ( ( SocketFilterChain ) session.getFilterChain() ).sessionCreated( session );
-        }
-        catch( Throwable e )
-        {
-            ExceptionUtil.throwException( e );
-        }
-        session.getIoProcessor().addNew( session );
-        return session;
-    }
-
-    public int getProcessors()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public void setProcessors( int nProcessor )
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    private class Worker extends Thread
-    {
-        public Worker()
-        {
-            super( SocketConnectorDelegate.this.threadName );
-        }
-
-        public void run()
-        {
-            for( ;; )
-            {
-                try
-                {
-                    int nKeys = selector.select( 1000 );
-
-                    registerNew();
-                    
-                    if( nKeys > 0 )
-                    {
-                        processSessions( selector.selectedKeys() );
-                    }
-
-                    processTimedOutSessions( selector.keys() );
-
-                    if( selector.keys().isEmpty() )
-                    {
-                        synchronized( SocketConnectorDelegate.this )
-                        {
-                            if( selector.keys().isEmpty() &&
-                                connectQueue.isEmpty() )
-                            {
-                                worker = null;
-                                try
-                                {
-                                    selector.close();
-                                }
-                                catch( IOException e )
-                                {
-                                    exceptionMonitor.exceptionCaught( SocketConnectorDelegate.this, e );
-                                }
-                                finally
-                                {
-                                    selector = null;
-                                }
-                                break;
-                            }
-                        }
-                    }
-                }
-                catch( IOException e )
-                {
-                    exceptionMonitor.exceptionCaught( SocketConnectorDelegate.this, e );
-
-                    try
-                    {
-                        Thread.sleep( 1000 );
-                    }
-                    catch( InterruptedException e1 )
-                    {
-                    }
-                }
-            }
-        }
-    }
-
-    private static class ConnectionRequest extends ConnectFuture
-    {
-        private final SocketChannel channel;
-        private final long deadline;
-        private final IoHandler handler;
-        private final IoFilterChainBuilder filterChainBuilder;
-
-        private ConnectionRequest( SocketChannel channel, int timeout, IoHandler handler, IoFilterChainBuilder filterChainBuilder )
-        {
-            this.channel = channel;
-            this.deadline = System.currentTimeMillis() + timeout * 1000L;
-            this.handler = handler;
-            this.filterChainBuilder = filterChainBuilder;
-        }
-    }
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio.support;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilterChainBuilder;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.support.BaseIoConnector;
+import org.apache.mina.transport.socket.nio.SocketSessionManager;
+import org.apache.mina.util.ExceptionUtil;
+import org.apache.mina.util.Queue;
+
+/**
+ * {@link IoConnector} for socket transport (TCP/IP).
+ * 
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class SocketConnectorDelegate extends BaseIoConnector implements SocketSessionManager
+{
+    private static volatile int nextId = 0;
+
+    private final IoConnector wrapper;
+    private final int id = nextId++;
+    private final String threadName = "SocketConnector-" + id;
+    private Selector selector;
+    private final Queue connectQueue = new Queue();
+    private Worker worker;
+
+    /**
+     * Creates a new instance.
+     */
+    public SocketConnectorDelegate( IoConnector wrapper )
+    {
+        this.wrapper = wrapper;
+    }
+
+    public ConnectFuture connect( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder )
+    {
+        return connect( address, null, handler, filterChainBuilder );
+    }
+
+    public ConnectFuture connect( SocketAddress address, SocketAddress localAddress,
+                                  IoHandler handler, IoFilterChainBuilder filterChainBuilder )
+    {
+        if( address == null )
+            throw new NullPointerException( "address" );
+        if( handler == null )
+            throw new NullPointerException( "handler" );
+
+        if( ! ( address instanceof InetSocketAddress ) )
+            throw new IllegalArgumentException( "Unexpected address type: "
+                                                + address.getClass() );
+
+        if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
+            throw new IllegalArgumentException( "Unexpected local address type: "
+                                                + localAddress.getClass() );
+
+        if( filterChainBuilder == null )
+        {
+            filterChainBuilder = IoFilterChainBuilder.NOOP;
+        }
+        
+        SocketChannel ch = null;
+        boolean success = false;
+        try
+        {
+            ch = SocketChannel.open();
+            ch.socket().setReuseAddress( true );
+            if( localAddress != null )
+            {
+                ch.socket().bind( localAddress );
+            }
+    
+            ch.configureBlocking( false );
+
+            if( ch.connect( address ) )
+            {
+                SocketSessionImpl session = newSession( ch, handler, filterChainBuilder );
+                success = true;
+                ConnectFuture future = new ConnectFuture();
+                future.setSession( session );
+                return future;
+            }
+            
+            success = true;
+        }
+        catch( IOException e )
+        {
+            return ConnectFuture.newFailedFuture( e );
+        }
+        finally
+        {
+            if( !success && ch != null )
+            {
+                try
+                {
+                    ch.close();
+                }
+                catch( IOException e )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught( e );
+                }
+            }
+        }
+        
+        ConnectionRequest request = new ConnectionRequest( ch, getConnectTimeout(), handler, filterChainBuilder );
+        synchronized( this )
+        {
+            try
+            {
+                startupWorker();
+            }
+            catch( IOException e )
+            {
+                try
+                {
+                    ch.close();
+                }
+                catch( IOException e2 )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught( e2 );
+                }
+
+                return ConnectFuture.newFailedFuture( e );
+            }
+            synchronized( connectQueue )
+            {
+                connectQueue.push( request );
+            }
+            selector.wakeup();
+        }
+        
+        return request;
+    }
+    
+    private synchronized void startupWorker() throws IOException
+    {
+        if( worker == null )
+        {
+            selector = Selector.open();
+            worker = new Worker();
+            worker.start();
+        }
+    }
+
+    private void registerNew()
+    {
+        if( connectQueue.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            ConnectionRequest req;
+            synchronized( connectQueue )
+            {
+                req = ( ConnectionRequest ) connectQueue.pop();
+            }
+
+            if( req == null )
+                break;
+            
+            SocketChannel ch = req.channel;
+            try
+            {
+                ch.register( selector, SelectionKey.OP_CONNECT, req );
+            }
+            catch( IOException e )
+            {
+                req.setException( e );
+            }
+        }
+    }
+    
+    private void processSessions( Set keys )
+    {
+        Iterator it = keys.iterator();
+
+        while( it.hasNext() )
+        {
+            SelectionKey key = ( SelectionKey ) it.next();
+
+            if( !key.isConnectable() )
+                continue;
+
+            SocketChannel ch = ( SocketChannel ) key.channel();
+            ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
+
+            boolean success = false;
+            try
+            {
+                ch.finishConnect();
+                SocketSessionImpl session = newSession( ch, entry.handler, entry.filterChainBuilder );
+                entry.setSession( session );
+                success = true;
+            }
+            catch( Throwable e )
+            {
+                entry.setException( e );
+            }
+            finally
+            {
+                key.cancel();
+                if( !success )
+                {
+                    try
+                    {
+                        ch.close();
+                    }
+                    catch( IOException e )
+                    {
+                        ExceptionMonitor.getInstance().exceptionCaught( e );
+                    }
+                }
+            }
+        }
+
+        keys.clear();
+    }
+
+    private void processTimedOutSessions( Set keys )
+    {
+        long currentTime = System.currentTimeMillis();
+        Iterator it = keys.iterator();
+
+        while( it.hasNext() )
+        {
+            SelectionKey key = ( SelectionKey ) it.next();
+
+            if( !key.isValid() )
+                continue;
+
+            ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
+
+            if( currentTime >= entry.deadline )
+            {
+                entry.setException( new ConnectException() );
+                key.cancel();
+            }
+        }
+    }
+
+    private SocketSessionImpl newSession( SocketChannel ch, IoHandler handler, IoFilterChainBuilder filterChainBuilder ) throws IOException
+    {
+        SocketSessionImpl session = new SocketSessionImpl( wrapper, ch, handler );
+        try
+        {
+            this.filterChainBuilder.buildFilterChain( session.getFilterChain() );
+            filterChainBuilder.buildFilterChain( session.getFilterChain() );
+            ( ( SocketFilterChain ) session.getFilterChain() ).sessionCreated( session );
+        }
+        catch( Throwable e )
+        {
+            ExceptionUtil.throwException( e );
+        }
+        session.getIoProcessor().addNew( session );
+        return session;
+    }
+
+    public int getProcessors()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public void setProcessors( int nProcessor )
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    private class Worker extends Thread
+    {
+        public Worker()
+        {
+            super( SocketConnectorDelegate.this.threadName );
+        }
+
+        public void run()
+        {
+            for( ;; )
+            {
+                try
+                {
+                    int nKeys = selector.select( 1000 );
+
+                    registerNew();
+                    
+                    if( nKeys > 0 )
+                    {
+                        processSessions( selector.selectedKeys() );
+                    }
+
+                    processTimedOutSessions( selector.keys() );
+
+                    if( selector.keys().isEmpty() )
+                    {
+                        synchronized( SocketConnectorDelegate.this )
+                        {
+                            if( selector.keys().isEmpty() &&
+                                connectQueue.isEmpty() )
+                            {
+                                worker = null;
+                                try
+                                {
+                                    selector.close();
+                                }
+                                catch( IOException e )
+                                {
+                                    ExceptionMonitor.getInstance().exceptionCaught( e );
+                                }
+                                finally
+                                {
+                                    selector = null;
+                                }
+                                break;
+                            }
+                        }
+                    }
+                }
+                catch( IOException e )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught( e );
+
+                    try
+                    {
+                        Thread.sleep( 1000 );
+                    }
+                    catch( InterruptedException e1 )
+                    {
+                    }
+                }
+            }
+        }
+    }
+
+    private static class ConnectionRequest extends ConnectFuture
+    {
+        private final SocketChannel channel;
+        private final long deadline;
+        private final IoHandler handler;
+        private final IoFilterChainBuilder filterChainBuilder;
+
+        private ConnectionRequest( SocketChannel channel, int timeout, IoHandler handler, IoFilterChainBuilder filterChainBuilder )
+        {
+            this.channel = channel;
+            this.deadline = System.currentTimeMillis() + timeout * 1000L;
+            this.handler = handler;
+            this.filterChainBuilder = filterChainBuilder;
+        }
+    }
 }

Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java?rev=351884&r1=351883&r2=351884&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java Fri Dec  2 20:20:27 2005
@@ -1,599 +1,663 @@
-/*
- *   @(#) $Id$
- *
- *   Copyright 2004 The Apache Software Foundation
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- *
- */
-package org.apache.mina.transport.socket.nio.support;
-
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoSessionManager;
-import org.apache.mina.common.WriteTimeoutException;
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.util.Queue;
-
-/**
- * Performs all I/O operations for sockets which is connected or bound.
- * This class is used by MINA internally.
- * 
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev$, $Date$,
- */
-class SocketIoProcessor
-{
-    private final IoSessionManager parent;
-    private final String threadName;
-    private Selector selector;
-
-    private final Queue newSessions = new Queue();
-    private final Queue removingSessions = new Queue();
-    private final Queue flushingSessions = new Queue();
-    private final Queue trafficControllingSessions = new Queue();
-
-    private Worker worker;
-
-    private long lastIdleCheckTime = System.currentTimeMillis();
-
-    SocketIoProcessor( IoSessionManager parent, String threadName )
-    {
-        this.parent = parent;
-        this.threadName = threadName;
-    }
-
-    void addNew( SocketSessionImpl session ) throws IOException
-    {
-        synchronized( this )
-        {
-            synchronized( newSessions )
-            {
-                newSessions.push( session );
-            }
-            startupWorker();
-        }
-
-        selector.wakeup();
-    }
-
-    void remove( SocketSessionImpl session ) throws IOException
-    {
-        scheduleRemove( session );
-        startupWorker();
-        selector.wakeup();
-    }
-
-    private synchronized void startupWorker() throws IOException
-    {
-        if( worker == null )
-        {
-            selector = Selector.open();
-            worker = new Worker();
-            worker.start();
-        }
-    }
-
-    void flush( SocketSessionImpl session )
-    {
-        scheduleFlush( session );
-        selector.wakeup();
-    }
-
-    void updateTrafficMask( SocketSessionImpl session )
-    {
-        scheduleTrafficControl( session );
-        selector.wakeup();
-    }
-
-    private void scheduleRemove( SocketSessionImpl session )
-    {
-        synchronized( removingSessions )
-        {
-            removingSessions.push( session );
-        }
-    }
-
-    private void scheduleFlush( SocketSessionImpl session )
-    {
-        synchronized( flushingSessions )
-        {
-            flushingSessions.push( session );
-        }
-    }
-
-    private void scheduleTrafficControl( SocketSessionImpl session )
-    {
-        synchronized( trafficControllingSessions )
-        {
-            trafficControllingSessions.push( session );
-        }
-    }
-
-    private void doAddNew()
-    {
-        if( newSessions.isEmpty() )
-            return;
-
-        SocketSessionImpl session;
-
-        for( ;; )
-        {
-            synchronized( newSessions )
-            {
-                session = ( SocketSessionImpl ) newSessions.pop();
-            }
-
-            if( session == null )
-                break;
-
-            SocketChannel ch = session.getChannel();
-            boolean registered;
-
-            try
-            {
-                ch.configureBlocking( false );
-                session.setSelectionKey( ch.register( selector,
-                                                      SelectionKey.OP_READ,
-                                                      session ) );
-                registered = true;
-            }
-            catch( IOException e )
-            {
-                registered = false;
-                ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
-            }
-
-            if( registered )
-            {
-                ( ( SocketFilterChain ) session.getFilterChain() ).sessionOpened( session );
-            }
-        }
-    }
-
-    private void doRemove()
-    {
-        if( removingSessions.isEmpty() )
-            return;
-
-        for( ;; )
-        {
-            SocketSessionImpl session;
-
-            synchronized( removingSessions )
-            {
-                session = ( SocketSessionImpl ) removingSessions.pop();
-            }
-
-            if( session == null )
-                break;
-
-            SocketChannel ch = session.getChannel();
-            SelectionKey key = session.getSelectionKey();
-            // Retry later if session is not yet fully initialized.
-            // (In case that Session.close() is called before addSession() is processed)
-            if( key == null )
-            {
-                scheduleRemove( session );
-                break;
-            }
-            // skip if channel is already closed
-            if( !key.isValid() )
-            {
-                continue;
-            }
-
-            try
-            {
-                key.cancel();
-                ch.close();
-            }
-            catch( IOException e )
-            {
-                ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
-            }
-            finally
-            {
-                releaseWriteBuffers( session );
-
-                ( ( SocketFilterChain ) session.getFilterChain() ).sessionClosed( session );
-                session.getCloseFuture().setClosed();
-            }
-        }
-    }
-
-    private void process( Set selectedKeys )
-    {
-        Iterator it = selectedKeys.iterator();
-
-        while( it.hasNext() )
-        {
-            SelectionKey key = ( SelectionKey ) it.next();
-            SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
-
-            if( key.isReadable() && session.getTrafficMask().isReadable() )
-            {
-                read( session );
-            }
-
-            if( key.isWritable() && session.getTrafficMask().isWritable() )
-            {
-                scheduleFlush( session );
-            }
-        }
-
-        selectedKeys.clear();
-    }
-
-    private void read( SocketSessionImpl session )
-    {
-        ByteBuffer buf = ByteBuffer.allocate( session.getSessionReceiveBufferSize() ); 
-        SocketChannel ch = session.getChannel();
-
-        try
-        {
-            int readBytes = 0;
-            int ret;
-
-            buf.clear();
-
-            try
-            {
-                while( ( ret = ch.read( buf.buf() ) ) > 0 )
-                {
-                    readBytes += ret;
-                }
-            }
-            finally
-            {
-                buf.flip();
-            }
-
-            session.increaseReadBytes( readBytes );
-
-            if( readBytes > 0 )
-            {
-                ByteBuffer newBuf = ByteBuffer.allocate( readBytes );
-                newBuf.put( buf );
-                newBuf.flip();
-                ( ( SocketFilterChain ) session.getFilterChain() ).messageReceived( session, newBuf );
-            }
-            if( ret < 0 )
-            {
-                scheduleRemove( session );
-            }
-        }
-        catch( Throwable e )
-        {
-            if( e instanceof IOException )
-                scheduleRemove( session );
-            ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
-        }
-        finally
-        {
-            buf.release();
-        }
-    }
-
-    private void notifyIdleness()
-    {
-        // process idle sessions
-        long currentTime = System.currentTimeMillis();
-        if( ( currentTime - lastIdleCheckTime ) >= 1000 )
-        {
-            lastIdleCheckTime = currentTime;
-            Set keys = selector.keys();
-            if( keys != null )
-            {
-                for( Iterator it = keys.iterator(); it.hasNext(); )
-                {
-                    SelectionKey key = ( SelectionKey ) it.next();
-                    SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
-                    notifyIdleness( session, currentTime );
-                }
-            }
-        }
-    }
-
-    private void notifyIdleness( SocketSessionImpl session, long currentTime )
-    {
-        notifyIdleness0(
-                session, currentTime,
-                session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
-                IdleStatus.BOTH_IDLE,
-                Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) );
-        notifyIdleness0(
-                session, currentTime,
-                session.getIdleTimeInMillis( IdleStatus.READER_IDLE ),
-                IdleStatus.READER_IDLE,
-                Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) );
-        notifyIdleness0(
-                session, currentTime,
-                session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
-                IdleStatus.WRITER_IDLE,
-                Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) );
-
-        notifyWriteTimeout( session, currentTime, session
-                .getWriteTimeoutInMillis(), session.getLastWriteTime() );
-    }
-
-    private void notifyIdleness0( SocketSessionImpl session, long currentTime,
-                                    long idleTime, IdleStatus status,
-                                    long lastIoTime )
-    {
-        if( idleTime > 0 && lastIoTime != 0
-            && ( currentTime - lastIoTime ) >= idleTime )
-        {
-            session.increaseIdleCount( status );
-            ( ( SocketFilterChain ) session.getFilterChain() ).sessionIdle( session, status );
-        }
-    }
-
-    private void notifyWriteTimeout( SocketSessionImpl session,
-                                           long currentTime,
-                                           long writeTimeout, long lastIoTime )
-    {
-        SelectionKey key = session.getSelectionKey();
-        if( writeTimeout > 0
-            && ( currentTime - lastIoTime ) >= writeTimeout
-            && key != null && key.isValid()
-            && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 )
-        {
-            ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, new WriteTimeoutException() );
-        }
-    }
-
-    private void doFlush()
-    {
-        if( flushingSessions.size() == 0 )
-            return;
-
-        for( ;; )
-        {
-            SocketSessionImpl session;
-
-            synchronized( flushingSessions )
-            {
-                session = ( SocketSessionImpl ) flushingSessions.pop();
-            }
-
-            if( session == null )
-                break;
-
-            if( !session.isConnected() )
-            {
-                releaseWriteBuffers( session );
-                continue;
-            }
-            
-            SelectionKey key = session.getSelectionKey();
-            // Retry later if session is not yet fully initialized.
-            // (In case that Session.write() is called before addSession() is processed)
-            if( key == null )
-            {
-                scheduleFlush( session );
-                break;
-            }
-            // skip if channel is already closed
-            if( !key.isValid() )
-            {
-                continue;
-            }
-
-            try
-            {
-                doFlush( session );
-            }
-            catch( IOException e )
-            {
-                scheduleRemove( session );
-                ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
-            }
-        }
-    }
-    
-    private void releaseWriteBuffers( SocketSessionImpl session )
-    {
-        Queue writeRequestQueue = session.getWriteRequestQueue();
-        WriteRequest req;
-        
-        while( ( req = ( WriteRequest ) writeRequestQueue.pop() ) != null )
-        {
-            try
-            {
-                ( ( ByteBuffer ) req.getMessage() ).release();
-            }
-            catch( IllegalStateException e )
-            {
-                ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
-            }
-            finally
-            {
-                req.getFuture().setWritten( false );
-            }
-        }
-    }
-
-    private void doFlush( SocketSessionImpl session ) throws IOException
-    {
-        // Clear OP_WRITE
-        SelectionKey key = session.getSelectionKey();
-        key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) );
-
-        SocketChannel ch = session.getChannel();
-        Queue writeRequestQueue = session.getWriteRequestQueue();
-
-        WriteRequest req;
-        for( ;; )
-        {
-            synchronized( writeRequestQueue )
-            {
-                req = ( WriteRequest ) writeRequestQueue.first();
-            }
-
-            if( req == null )
-                break;
-
-            ByteBuffer buf = ( ByteBuffer ) req.getMessage();
-            if( buf.remaining() == 0 )
-            {
-                synchronized( writeRequestQueue )
-                {
-                    writeRequestQueue.pop();
-                }
-                
-                req.getFuture().setWritten( true );
-                session.increaseWrittenWriteRequests();
-                ( ( SocketFilterChain ) session.getFilterChain() ).messageSent( session, buf.reset() );
-                continue;
-            }
-
-            int writtenBytes = ch.write( buf.buf() );
-            if( writtenBytes > 0 )
-            {
-                session.increaseWrittenBytes( writtenBytes );
-            }
-
-            if( buf.hasRemaining() )
-            {
-                // Kernel buffer is full
-                key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
-                break;
-            }
-        }
-    }
-
-    private void doUpdateTrafficMask() 
-    {
-        if( trafficControllingSessions.isEmpty() )
-            return;
-
-        for( ;; )
-        {
-            SocketSessionImpl session;
-
-            synchronized( trafficControllingSessions )
-            {
-                session = ( SocketSessionImpl ) trafficControllingSessions.pop();
-            }
-
-            if( session == null )
-                break;
-
-            SelectionKey key = session.getSelectionKey();
-            // Retry later if session is not yet fully initialized.
-            // (In case that Session.suspend??() or session.resume??() is 
-            // called before addSession() is processed)
-            if( key == null )
-            {
-                scheduleTrafficControl( session );
-                break;
-            }
-            // skip if channel is already closed
-            if( !key.isValid() )
-            {
-                continue;
-            }
-
-            // The normal is OP_READ and, if there are write requests in the
-            // session's write queue, set OP_WRITE to trigger flushing.
-            int ops = SelectionKey.OP_READ;
-            Queue writeRequestQueue = session.getWriteRequestQueue();
-            synchronized( writeRequestQueue )
-            {
-                if( !writeRequestQueue.isEmpty() )
-                {
-                    ops |= SelectionKey.OP_WRITE;
-                }
-            }
-
-            // Now mask the preferred ops with the mask of the current session
-            int mask = session.getTrafficMask().getInterestOps();
-            key.interestOps( ops & mask );
-        }
-    }
-    
-    private class Worker extends Thread
-    {
-        public Worker()
-        {
-            super( SocketIoProcessor.this.threadName );
-        }
-
-        public void run()
-        {
-            for( ;; )
-            {
-                try
-                {
-                    int nKeys = selector.select( 1000 );
-                    doAddNew();
-                    doUpdateTrafficMask();
-                    
-                    if( nKeys > 0 )
-                    {
-                        process( selector.selectedKeys() );
-                    }
-
-                    doFlush();
-                    doRemove();
-                    notifyIdleness();
-
-                    if( selector.keys().isEmpty() )
-                    {
-                        synchronized( SocketIoProcessor.this )
-                        {
-                            if( selector.keys().isEmpty() &&
-                                newSessions.isEmpty() )
-                            {
-                                worker = null;
-                                try
-                                {
-                                    selector.close();
-                                }
-                                catch( IOException e )
-                                {
-                                    parent.getExceptionMonitor().exceptionCaught( parent, e );
-                                }
-                                finally
-                                {
-                                    selector = null;
-                                }
-                                break;
-                            }
-                        }
-                    }
-                }
-                catch( Throwable t )
-                {
-                    parent.getExceptionMonitor().exceptionCaught( parent, t );
-
-                    try
-                    {
-                        Thread.sleep( 1000 );
-                    }
-                    catch( InterruptedException e1 )
-                    {
-                    }
-                }
-            }
-        }
-    }
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio.support;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.WriteTimeoutException;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.util.Queue;
+
+/**
+ * Performs all I/O operations for sockets which are connected or bound.
+ * This class is used by MINA internally.
+ * 
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+class SocketIoProcessor
+{
+    private static final String PROCESSORS_PROPERTY = "mina.socket.processors";
+    private static final String THREAD_PREFIX = "SocketIoProcessor-";
+    private static final int DEFAULT_PROCESSORS = 1;
+    private static final int PROCESSOR_COUNT;
+    private static final SocketIoProcessor[] PROCESSORS;
+    
+    private static int nextId;
+    
+    static
+    {
+        PROCESSOR_COUNT = configureProcessorCount(); 
+        PROCESSORS = createProcessors();
+    }
+      
+    /**
+     * Returns the {@link SocketIoProcessor} to be used for a newly
+     * created session
+     * 
+     * @return  The processor to be employed
+     */
+    static synchronized SocketIoProcessor getInstance()
+    {
+        SocketIoProcessor processor = PROCESSORS[nextId++];
+        nextId %= PROCESSOR_COUNT;
+        return processor;
+    }
+      
+    private final String threadName;
+    private Selector selector;
+
+    private final Queue newSessions = new Queue();
+    private final Queue removingSessions = new Queue();
+    private final Queue flushingSessions = new Queue();
+    private final Queue trafficControllingSessions = new Queue();
+
+    private Worker worker;
+    private long lastIdleCheckTime = System.currentTimeMillis();
+
+    private SocketIoProcessor( String threadName )
+    {
+        this.threadName = threadName;
+    }
+
+    void addNew( SocketSessionImpl session ) throws IOException
+    {
+        synchronized( this )
+        {
+            synchronized( newSessions )
+            {
+                newSessions.push( session );
+            }
+            startupWorker();
+        }
+
+        selector.wakeup();
+    }
+
+    void remove( SocketSessionImpl session ) throws IOException
+    {
+        scheduleRemove( session );
+        startupWorker();
+        selector.wakeup();
+    }
+
+    private synchronized void startupWorker() throws IOException
+    {
+        if( worker == null )
+        {
+            selector = Selector.open();
+            worker = new Worker();
+            worker.start();
+        }
+    }
+
+    void flush( SocketSessionImpl session )
+    {
+        scheduleFlush( session );
+        selector.wakeup();
+    }
+
+    void updateTrafficMask( SocketSessionImpl session )
+    {
+        scheduleTrafficControl( session );
+        selector.wakeup();
+    }
+
+    private void scheduleRemove( SocketSessionImpl session )
+    {
+        synchronized( removingSessions )
+        {
+            removingSessions.push( session );
+        }
+    }
+
+    private void scheduleFlush( SocketSessionImpl session )
+    {
+        synchronized( flushingSessions )
+        {
+            flushingSessions.push( session );
+        }
+    }
+
+    private void scheduleTrafficControl( SocketSessionImpl session )
+    {
+        synchronized( trafficControllingSessions )
+        {
+            trafficControllingSessions.push( session );
+        }
+    }
+
+    private void doAddNew()
+    {
+        if( newSessions.isEmpty() )
+            return;
+
+        SocketSessionImpl session;
+
+        for( ;; )
+        {
+            synchronized( newSessions )
+            {
+                session = ( SocketSessionImpl ) newSessions.pop();
+            }
+
+            if( session == null )
+                break;
+
+            SocketChannel ch = session.getChannel();
+            boolean registered;
+
+            try
+            {
+                ch.configureBlocking( false );
+                session.setSelectionKey( ch.register( selector,
+                                                      SelectionKey.OP_READ,
+                                                      session ) );
+                registered = true;
+            }
+            catch( IOException e )
+            {
+                registered = false;
+                ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
+            }
+
+            if( registered )
+            {
+                ( ( SocketFilterChain ) session.getFilterChain() ).sessionOpened( session );
+            }
+        }
+    }
+
+    private void doRemove()
+    {
+        if( removingSessions.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            SocketSessionImpl session;
+
+            synchronized( removingSessions )
+            {
+                session = ( SocketSessionImpl ) removingSessions.pop();
+            }
+
+            if( session == null )
+                break;
+
+            SocketChannel ch = session.getChannel();
+            SelectionKey key = session.getSelectionKey();
+            // Retry later if session is not yet fully initialized.
+            // (This happens when Session.close() is called before addNew() has been processed.)
+            if( key == null )
+            {
+                scheduleRemove( session );
+                break;
+            }
+            // skip if channel is already closed
+            if( !key.isValid() )
+            {
+                continue;
+            }
+
+            try
+            {
+                key.cancel();
+                ch.close();
+            }
+            catch( IOException e )
+            {
+                ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
+            }
+            finally
+            {
+                releaseWriteBuffers( session );
+
+                ( ( SocketFilterChain ) session.getFilterChain() ).sessionClosed( session );
+                session.getCloseFuture().setClosed();
+            }
+        }
+    }
+
+    private void process( Set selectedKeys )
+    {
+        Iterator it = selectedKeys.iterator();
+
+        while( it.hasNext() )
+        {
+            SelectionKey key = ( SelectionKey ) it.next();
+            SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
+
+            if( key.isReadable() && session.getTrafficMask().isReadable() )
+            {
+                read( session );
+            }
+
+            if( key.isWritable() && session.getTrafficMask().isWritable() )
+            {
+                scheduleFlush( session );
+            }
+        }
+
+        selectedKeys.clear();
+    }
+
+    private void read( SocketSessionImpl session )
+    {
+        ByteBuffer buf = ByteBuffer.allocate( session.getSessionReceiveBufferSize() ); 
+        SocketChannel ch = session.getChannel();
+
+        try
+        {
+            int readBytes = 0;
+            int ret;
+
+            buf.clear();
+
+            try
+            {
+                while( ( ret = ch.read( buf.buf() ) ) > 0 )
+                {
+                    readBytes += ret;
+                }
+            }
+            finally
+            {
+                buf.flip();
+            }
+
+            session.increaseReadBytes( readBytes );
+
+            if( readBytes > 0 )
+            {
+                ByteBuffer newBuf = ByteBuffer.allocate( readBytes );
+                newBuf.put( buf );
+                newBuf.flip();
+                ( ( SocketFilterChain ) session.getFilterChain() ).messageReceived( session, newBuf );
+            }
+            if( ret < 0 )
+            {
+                scheduleRemove( session );
+            }
+        }
+        catch( Throwable e )
+        {
+            if( e instanceof IOException )
+                scheduleRemove( session );
+            ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
+        }
+        finally
+        {
+            buf.release();
+        }
+    }
+
+    private void notifyIdleness()
+    {
+        // process idle sessions
+        long currentTime = System.currentTimeMillis();
+        if( ( currentTime - lastIdleCheckTime ) >= 1000 )
+        {
+            lastIdleCheckTime = currentTime;
+            Set keys = selector.keys();
+            if( keys != null )
+            {
+                for( Iterator it = keys.iterator(); it.hasNext(); )
+                {
+                    SelectionKey key = ( SelectionKey ) it.next();
+                    SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
+                    notifyIdleness( session, currentTime );
+                }
+            }
+        }
+    }
+
+    private void notifyIdleness( SocketSessionImpl session, long currentTime )
+    {
+        notifyIdleness0(
+                session, currentTime,
+                session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
+                IdleStatus.BOTH_IDLE,
+                Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) );
+        notifyIdleness0(
+                session, currentTime,
+                session.getIdleTimeInMillis( IdleStatus.READER_IDLE ),
+                IdleStatus.READER_IDLE,
+                Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) );
+        notifyIdleness0(
+                session, currentTime,
+                session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
+                IdleStatus.WRITER_IDLE,
+                Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) );
+
+        notifyWriteTimeout( session, currentTime, session
+                .getWriteTimeoutInMillis(), session.getLastWriteTime() );
+    }
+
+    private void notifyIdleness0( SocketSessionImpl session, long currentTime,
+                                    long idleTime, IdleStatus status,
+                                    long lastIoTime )
+    {
+        if( idleTime > 0 && lastIoTime != 0
+            && ( currentTime - lastIoTime ) >= idleTime )
+        {
+            session.increaseIdleCount( status );
+            ( ( SocketFilterChain ) session.getFilterChain() ).sessionIdle( session, status );
+        }
+    }
+
+    private void notifyWriteTimeout( SocketSessionImpl session,
+                                           long currentTime,
+                                           long writeTimeout, long lastIoTime )
+    {
+        SelectionKey key = session.getSelectionKey();
+        if( writeTimeout > 0
+            && ( currentTime - lastIoTime ) >= writeTimeout
+            && key != null && key.isValid()
+            && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 )
+        {
+            ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, new WriteTimeoutException() );
+        }
+    }
+
+    private void doFlush()
+    {
+        if( flushingSessions.size() == 0 )
+            return;
+
+        for( ;; )
+        {
+            SocketSessionImpl session;
+
+            synchronized( flushingSessions )
+            {
+                session = ( SocketSessionImpl ) flushingSessions.pop();
+            }
+
+            if( session == null )
+                break;
+
+            if( !session.isConnected() )
+            {
+                releaseWriteBuffers( session );
+                continue;
+            }
+            
+            SelectionKey key = session.getSelectionKey();
+            // Retry later if session is not yet fully initialized.
+            // (This happens when Session.write() is called before addNew() has been processed.)
+            if( key == null )
+            {
+                scheduleFlush( session );
+                break;
+            }
+            // skip if channel is already closed
+            if( !key.isValid() )
+            {
+                continue;
+            }
+
+            try
+            {
+                doFlush( session );
+            }
+            catch( IOException e )
+            {
+                scheduleRemove( session );
+                ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
+            }
+        }
+    }
+    
+    private void releaseWriteBuffers( SocketSessionImpl session )
+    {
+        Queue writeRequestQueue = session.getWriteRequestQueue();
+        WriteRequest req;
+        
+        while( ( req = ( WriteRequest ) writeRequestQueue.pop() ) != null )
+        {
+            try
+            {
+                ( ( ByteBuffer ) req.getMessage() ).release();
+            }
+            catch( IllegalStateException e )
+            {
+                ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
+            }
+            finally
+            {
+                req.getFuture().setWritten( false );
+            }
+        }
+    }
+
+    private void doFlush( SocketSessionImpl session ) throws IOException
+    {
+        // Clear OP_WRITE
+        SelectionKey key = session.getSelectionKey();
+        key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) );
+
+        SocketChannel ch = session.getChannel();
+        Queue writeRequestQueue = session.getWriteRequestQueue();
+
+        WriteRequest req;
+        for( ;; )
+        {
+            synchronized( writeRequestQueue )
+            {
+                req = ( WriteRequest ) writeRequestQueue.first();
+            }
+
+            if( req == null )
+                break;
+
+            ByteBuffer buf = ( ByteBuffer ) req.getMessage();
+            if( buf.remaining() == 0 )
+            {
+                synchronized( writeRequestQueue )
+                {
+                    writeRequestQueue.pop();
+                }
+                
+                req.getFuture().setWritten( true );
+                session.increaseWrittenWriteRequests();
+                ( ( SocketFilterChain ) session.getFilterChain() ).messageSent( session, buf.reset() );
+                continue;
+            }
+
+            int writtenBytes = ch.write( buf.buf() );
+            if( writtenBytes > 0 )
+            {
+                session.increaseWrittenBytes( writtenBytes );
+            }
+
+            if( buf.hasRemaining() )
+            {
+                // Kernel buffer is full
+                key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
+                break;
+            }
+        }
+    }
+
+    private void doUpdateTrafficMask() 
+    {
+        if( trafficControllingSessions.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            SocketSessionImpl session;
+
+            synchronized( trafficControllingSessions )
+            {
+                session = ( SocketSessionImpl ) trafficControllingSessions.pop();
+            }
+
+            if( session == null )
+                break;
+
+            SelectionKey key = session.getSelectionKey();
+            // Retry later if session is not yet fully initialized.
+            // (This happens when Session.suspend*() or Session.resume*() is
+            // called before addNew() has been processed.)
+            if( key == null )
+            {
+                scheduleTrafficControl( session );
+                break;
+            }
+            // skip if channel is already closed
+            if( !key.isValid() )
+            {
+                continue;
+            }
+
+            // The default is OP_READ and, if there are write requests in the
+            // session's write queue, set OP_WRITE to trigger flushing.
+            int ops = SelectionKey.OP_READ;
+            Queue writeRequestQueue = session.getWriteRequestQueue();
+            synchronized( writeRequestQueue )
+            {
+                if( !writeRequestQueue.isEmpty() )
+                {
+                    ops |= SelectionKey.OP_WRITE;
+                }
+            }
+
+            // Now mask the preferred ops with the mask of the current session
+            int mask = session.getTrafficMask().getInterestOps();
+            key.interestOps( ops & mask );
+        }
+    }
+    
+    /**
+     * Configures the number of processors employed.
+     * We first check for a system property "mina.socket.processors". If this
+     * property is present and can be interpreted as an integer value greater 
+     * or equal to 1, this value is used as the number of processors.
+     * Otherwise a default of 1 processor is employed.
+     * 
+     * @return  The number of processors to employ
+     */
+    private static int configureProcessorCount() 
+    {
+        int processors = DEFAULT_PROCESSORS;
+        String processorProperty = System.getProperty( PROCESSORS_PROPERTY );
+        if ( processorProperty != null ) 
+        {
+            try 
+            {
+                processors = Integer.parseInt( processorProperty );
+            } 
+            catch ( NumberFormatException e )
+            {
+                ExceptionMonitor.getInstance().exceptionCaught( e );
+            }
+            processors = Math.max( processors, 1 );
+        }
+
+        return processors;
+    }
+    
+    private static SocketIoProcessor[] createProcessors()
+    {
+        SocketIoProcessor[] processors = new SocketIoProcessor[ PROCESSOR_COUNT ];
+        for ( int i = 0; i < PROCESSOR_COUNT; i ++ )
+        {
+            processors[i] = new SocketIoProcessor( THREAD_PREFIX + i );
+        }
+        return processors;
+    }
+    
+    private class Worker extends Thread
+    {
+        public Worker()
+        {
+            super( SocketIoProcessor.this.threadName );
+        }
+
+        public void run()
+        {
+            for( ;; )
+            {
+                try
+                {
+                    int nKeys = selector.select( 1000 );
+                    doAddNew();
+                    doUpdateTrafficMask();
+                    
+                    if( nKeys > 0 )
+                    {
+                        process( selector.selectedKeys() );
+                    }
+
+                    doFlush();
+                    doRemove();
+                    notifyIdleness();
+
+                    if( selector.keys().isEmpty() )
+                    {
+                        synchronized( SocketIoProcessor.this )
+                        {
+                            if( selector.keys().isEmpty() &&
+                                newSessions.isEmpty() )
+                            {
+                                worker = null;
+                                try
+                                {
+                                    selector.close();
+                                }
+                                catch( IOException e )
+                                {
+                                    ExceptionMonitor.getInstance().exceptionCaught( e );
+                                }
+                                finally
+                                {
+                                    selector = null;
+                                }
+                                break;
+                            }
+                        }
+                    }
+                }
+                catch( Throwable t )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught( t );
+
+                    try
+                    {
+                        Thread.sleep( 1000 );
+                    }
+                    catch( InterruptedException e1 )
+                    {
+                    }
+                }
+            }
+        }
+    }
+    
 }

Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java?rev=351884&r1=351883&r2=351884&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java Fri Dec  2 20:20:27 2005
@@ -1,253 +1,252 @@
-/*
- *   @(#) $Id$
- *
- *   Copyright 2004 The Apache Software Foundation
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- *
- */
-package org.apache.mina.transport.socket.nio.support;
-
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoSessionManager;
-import org.apache.mina.common.TransportType;
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.common.support.BaseIoSession;
-import org.apache.mina.transport.socket.nio.SocketSession;
-import org.apache.mina.util.Queue;
-
-/**
- * An {@link IoSession} for socket transport (TCP/IP).
- * 
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev$, $Date$
- */
-class SocketSessionImpl extends BaseIoSession implements SocketSession
-{
-    private static final int DEFAULT_READ_BUFFER_SIZE = 1024;
-
-    private final IoSessionManager manager;
-    private final SocketIoProcessor ioProcessor;
-    private final SocketFilterChain filterChain;
-    private final SocketChannel ch;
-    private final Queue writeRequestQueue;
-    private final IoHandler handler;
-    private final SocketAddress remoteAddress;
-    private final SocketAddress localAddress;
-    private SelectionKey key;
-    private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
-
-    /**
-     * Creates a new instance.
-     */
-    public SocketSessionImpl(
-            IoSessionManager manager,
-            SocketIoProcessor ioProcessor,
-            SocketChannel ch, IoHandler defaultHandler )
-    {
-        this.manager = manager;
-        this.ioProcessor = ioProcessor;
-        this.filterChain = new SocketFilterChain( this );
-        this.ch = ch;
-        this.writeRequestQueue = new Queue();
-        this.handler = defaultHandler;
-        this.remoteAddress = ch.socket().getRemoteSocketAddress();
-        this.localAddress = ch.socket().getLocalSocketAddress();
-    }
-    
-    public IoSessionManager getManager()
-    {
-        return manager;
-    }
-    
-    SocketIoProcessor getIoProcessor()
-    {
-        return ioProcessor;
-    }
-    
-    public IoFilterChain getFilterChain()
-    {
-        return filterChain;
-    }
-
-    SocketChannel getChannel()
-    {
-        return ch;
-    }
-
-    SelectionKey getSelectionKey()
-    {
-        return key;
-    }
-
-    void setSelectionKey( SelectionKey key )
-    {
-        this.key = key;
-    }
-
-    public IoHandler getHandler()
-    {
-        return handler;
-    }
-    
-    protected void close0( CloseFuture closeFuture )
-    {
-        filterChain.filterClose( this, closeFuture );
-    }
-    
-    Queue getWriteRequestQueue()
-    {
-        return writeRequestQueue;
-    }
-
-    public int getScheduledWriteRequests()
-    {
-        synchronized( writeRequestQueue )
-        {
-            return writeRequestQueue.size();
-        }
-    }
-
-    protected void write0( WriteRequest writeRequest )
-    {
-        filterChain.filterWrite( this, writeRequest );
-    }
-
-    public TransportType getTransportType()
-    {
-        return TransportType.SOCKET;
-    }
-
-    public boolean isConnected()
-    {
-        return ch.isConnected();
-    }
-
-    public SocketAddress getRemoteAddress()
-    {
-        return remoteAddress;
-    }
-
-    public SocketAddress getLocalAddress()
-    {
-        return localAddress;
-    }
-
-    public boolean getKeepAlive() throws SocketException
-    {
-        return ch.socket().getKeepAlive();
-    }
-
-    public void setKeepAlive( boolean on ) throws SocketException
-    {
-        ch.socket().setKeepAlive( on );
-    }
-
-    public boolean getOOBInline() throws SocketException
-    {
-        return ch.socket().getOOBInline();
-    }
-
-    public void setOOBInline( boolean on ) throws SocketException
-    {
-        ch.socket().setOOBInline( on );
-    }
-
-    public boolean getReuseAddress() throws SocketException
-    {
-        return ch.socket().getReuseAddress();
-    }
-
-    public void setReuseAddress( boolean on ) throws SocketException
-    {
-        ch.socket().setReuseAddress( on );
-    }
-
-    public int getSoLinger() throws SocketException
-    {
-        return ch.socket().getSoLinger();
-    }
-
-    public void setSoLinger( boolean on, int linger ) throws SocketException
-    {
-        ch.socket().setSoLinger( on, linger );
-    }
-
-    public boolean getTcpNoDelay() throws SocketException
-    {
-        return ch.socket().getTcpNoDelay();
-    }
-
-    public void setTcpNoDelay( boolean on ) throws SocketException
-    {
-        ch.socket().setTcpNoDelay( on );
-    }
-
-    public int getTrafficClass() throws SocketException
-    {
-        return ch.socket().getTrafficClass();
-    }
-
-    public void setTrafficClass( int tc ) throws SocketException
-    {
-        ch.socket().setTrafficClass( tc );
-    }
-
-    public int getSendBufferSize() throws SocketException
-    {
-        return ch.socket().getSendBufferSize();
-    }
-
-    public void setSendBufferSize( int size ) throws SocketException
-    {
-        ch.socket().setSendBufferSize( size );
-    }
-
-    public int getReceiveBufferSize() throws SocketException
-    {
-        return ch.socket().getReceiveBufferSize();
-    }
-
-    public void setReceiveBufferSize( int size ) throws SocketException
-    {
-        ch.socket().setReceiveBufferSize( size );
-    }
-    
-    public int getSessionReceiveBufferSize()
-    {
-        return readBufferSize;
-    }
-    
-    public void setSessionReceiveBufferSize( int size )
-    {
-        if( size <= 0 )
-        {
-            throw new IllegalArgumentException( "Invalid session receive buffer size: " + size );
-        }
-        
-        this.readBufferSize = size;
-    }
-
-    protected void updateTrafficMask()
-    {
-        this.ioProcessor.updateTrafficMask( this );
-    }
-}
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio.support;
+
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.mina.common.CloseFuture;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionManager;
+import org.apache.mina.common.TransportType;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.support.BaseIoSession;
+import org.apache.mina.transport.socket.nio.SocketSession;
+import org.apache.mina.util.Queue;
+
+/**
+ * An {@link IoSession} for socket transport (TCP/IP).
+ * 
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+class SocketSessionImpl extends BaseIoSession implements SocketSession
+{
+    private static final int DEFAULT_READ_BUFFER_SIZE = 1024;
+
+    private final IoSessionManager manager;
+    private final SocketIoProcessor ioProcessor;
+    private final SocketFilterChain filterChain;
+    private final SocketChannel ch;
+    private final Queue writeRequestQueue;
+    private final IoHandler handler;
+    private final SocketAddress remoteAddress;
+    private final SocketAddress localAddress;
+    private SelectionKey key;
+    private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
+
+    /**
+     * Creates a new instance.
+     */
+    public SocketSessionImpl(
+            IoSessionManager manager,
+            SocketChannel ch, IoHandler defaultHandler )
+    {
+        this.manager = manager;
+        this.ioProcessor = SocketIoProcessor.getInstance();
+        this.filterChain = new SocketFilterChain( this );
+        this.ch = ch;
+        this.writeRequestQueue = new Queue();
+        this.handler = defaultHandler;
+        this.remoteAddress = ch.socket().getRemoteSocketAddress();
+        this.localAddress = ch.socket().getLocalSocketAddress();
+    }
+    
+    public IoSessionManager getManager()
+    {
+        return manager;
+    }
+    
+    SocketIoProcessor getIoProcessor()
+    {
+        return ioProcessor;
+    }
+    
+    public IoFilterChain getFilterChain()
+    {
+        return filterChain;
+    }
+
+    SocketChannel getChannel()
+    {
+        return ch;
+    }
+
+    SelectionKey getSelectionKey()
+    {
+        return key;
+    }
+
+    void setSelectionKey( SelectionKey key )
+    {
+        this.key = key;
+    }
+
+    public IoHandler getHandler()
+    {
+        return handler;
+    }
+    
+    protected void close0( CloseFuture closeFuture )
+    {
+        filterChain.filterClose( this, closeFuture );
+    }
+    
+    Queue getWriteRequestQueue()
+    {
+        return writeRequestQueue;
+    }
+
+    public int getScheduledWriteRequests()
+    {
+        synchronized( writeRequestQueue )
+        {
+            return writeRequestQueue.size();
+        }
+    }
+
+    protected void write0( WriteRequest writeRequest )
+    {
+        filterChain.filterWrite( this, writeRequest );
+    }
+
+    public TransportType getTransportType()
+    {
+        return TransportType.SOCKET;
+    }
+
+    public boolean isConnected()
+    {
+        return ch.isConnected();
+    }
+
+    public SocketAddress getRemoteAddress()
+    {
+        return remoteAddress;
+    }
+
+    public SocketAddress getLocalAddress()
+    {
+        return localAddress;
+    }
+
+    public boolean getKeepAlive() throws SocketException
+    {
+        return ch.socket().getKeepAlive();
+    }
+
+    public void setKeepAlive( boolean on ) throws SocketException
+    {
+        ch.socket().setKeepAlive( on );
+    }
+
+    public boolean getOOBInline() throws SocketException
+    {
+        return ch.socket().getOOBInline();
+    }
+
+    public void setOOBInline( boolean on ) throws SocketException
+    {
+        ch.socket().setOOBInline( on );
+    }
+
+    public boolean getReuseAddress() throws SocketException
+    {
+        return ch.socket().getReuseAddress();
+    }
+
+    public void setReuseAddress( boolean on ) throws SocketException
+    {
+        ch.socket().setReuseAddress( on );
+    }
+
+    public int getSoLinger() throws SocketException
+    {
+        return ch.socket().getSoLinger();
+    }
+
+    public void setSoLinger( boolean on, int linger ) throws SocketException
+    {
+        ch.socket().setSoLinger( on, linger );
+    }
+
+    public boolean getTcpNoDelay() throws SocketException
+    {
+        return ch.socket().getTcpNoDelay();
+    }
+
+    public void setTcpNoDelay( boolean on ) throws SocketException
+    {
+        ch.socket().setTcpNoDelay( on );
+    }
+
+    public int getTrafficClass() throws SocketException
+    {
+        return ch.socket().getTrafficClass();
+    }
+
+    public void setTrafficClass( int tc ) throws SocketException
+    {
+        ch.socket().setTrafficClass( tc );
+    }
+
+    public int getSendBufferSize() throws SocketException
+    {
+        return ch.socket().getSendBufferSize();
+    }
+
+    public void setSendBufferSize( int size ) throws SocketException
+    {
+        ch.socket().setSendBufferSize( size );
+    }
+
+    public int getReceiveBufferSize() throws SocketException
+    {
+        return ch.socket().getReceiveBufferSize();
+    }
+
+    public void setReceiveBufferSize( int size ) throws SocketException
+    {
+        ch.socket().setReceiveBufferSize( size );
+    }
+    
+    public int getSessionReceiveBufferSize()
+    {
+        return readBufferSize;
+    }
+    
+    public void setSessionReceiveBufferSize( int size )
+    {
+        if( size <= 0 )
+        {
+            throw new IllegalArgumentException( "Invalid session receive buffer size: " + size );
+        }
+        
+        this.readBufferSize = size;
+    }
+
+    protected void updateTrafficMask()
+    {
+        this.ioProcessor.updateTrafficMask( this );
+    }
+}

Modified: directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?rev=351884&r1=351883&r2=351884&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Fri Dec  2 20:20:27 2005
@@ -7,6 +7,7 @@
 import java.net.SocketAddress;
 
 import org.apache.mina.common.CloseFuture;
+import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoFilterChainBuilder;
 import org.apache.mina.common.IoHandler;
@@ -64,7 +65,7 @@
         }
         catch( Throwable t )
         {
-            remoteEntry.getAcceptor().getExceptionMonitor().exceptionCaught( remoteEntry.getAcceptor(), t );
+            ExceptionMonitor.getInstance().exceptionCaught( t );
             IOException e = new IOException( "Failed to initialize remote session." );
             e.initCause( t );
             throw e;

Modified: directory/network/trunk/src/test/org/apache/mina/integration/spring/support/AbstractIoSessionManagerFactoryBeanTest.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/test/org/apache/mina/integration/spring/support/AbstractIoSessionManagerFactoryBeanTest.java?rev=351884&r1=351883&r2=351884&view=diff
==============================================================================
--- directory/network/trunk/src/test/org/apache/mina/integration/spring/support/AbstractIoSessionManagerFactoryBeanTest.java (original)
+++ directory/network/trunk/src/test/org/apache/mina/integration/spring/support/AbstractIoSessionManagerFactoryBeanTest.java Fri Dec  2 20:20:27 2005
@@ -21,7 +21,6 @@
 import junit.framework.TestCase;
 
 import org.apache.mina.common.DefaultIoFilterChainBuilder;
-import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoFilter;
 import org.apache.mina.common.IoSessionManager;
 import org.apache.mina.integration.spring.IoFilterMapping;
@@ -187,22 +186,18 @@
                 .getMock();
         DefaultIoFilterChainBuilder ioFilterChainBuilder = ( DefaultIoFilterChainBuilder ) MockClassControl
                 .createControl( DefaultIoFilterChainBuilder.class ).getMock();
-        ExceptionMonitor exceptionMonitor = ( ExceptionMonitor ) MockControl
-                .createControl( ExceptionMonitor.class ).getMock();
 
         /*
          * Record expectations.
          */
         ioSessionManager.getFilterChain();
         mockIoSessionManager.setReturnValue( ioFilterChainBuilder );
-        ioSessionManager.setExceptionMonitor( exceptionMonitor );
 
         /*
          * Replay.
          */
         mockIoSessionManager.replay();
 
-        factory.setExceptionMonitor( exceptionMonitor );
         factory.initIoSessionManager( ioSessionManager );
 
         /*
@@ -286,22 +281,4 @@
         {
         }
     }
-
-    /**
-     * Tests that
-     * {@link AbstractIoSessionManagerFactoryBean#setExceptionMonitor(ExceptionMonitor)}
-     * validates the method arguments.
-     */
-    public void testSetExceptionMonitor()
-    {
-        try
-        {
-            factory.setExceptionMonitor( null );
-            fail( "null exceptionMonitor set. IllegalArgumentException expected." );
-        }
-        catch( IllegalArgumentException iae )
-        {
-        }
-    }
-
 }



Mime
View raw message