From commits-return-7131-apmail-directory-commits-archive=directory.apache.org@directory.apache.org Sat Dec 03 04:21:27 2005 Return-Path: Delivered-To: apmail-directory-commits-archive@www.apache.org Received: (qmail 29315 invoked from network); 3 Dec 2005 04:21:26 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 3 Dec 2005 04:21:26 -0000 Received: (qmail 56131 invoked by uid 500); 3 Dec 2005 04:21:25 -0000 Delivered-To: apmail-directory-commits-archive@directory.apache.org Received: (qmail 56058 invoked by uid 500); 3 Dec 2005 04:21:25 -0000 Mailing-List: contact commits-help@directory.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@directory.apache.org Delivered-To: mailing list commits@directory.apache.org Received: (qmail 56005 invoked by uid 99); 3 Dec 2005 04:21:24 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 02 Dec 2005 20:21:23 -0800 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 02 Dec 2005 20:22:49 -0800 Received: (qmail 28605 invoked by uid 65534); 3 Dec 2005 04:20:59 -0000 Message-ID: <20051203042059.28604.qmail@minotaur.apache.org> Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r351884 [3/3] - in /directory/network/trunk/src: java/org/apache/mina/common/ java/org/apache/mina/common/support/ java/org/apache/mina/integration/spring/support/ java/org/apache/mina/transport/socket/nio/ java/org/apache/mina/transport/so... Date: Sat, 03 Dec 2005 04:20:55 -0000 To: commits@directory.apache.org From: trustin@apache.org X-Mailer: svnmailer-1.0.5 X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java?rev=351884&r1=351883&r2=351884&view=diff ============================================================================== --- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java (original) +++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java Fri Dec 2 20:20:27 2005 @@ -1,378 +1,378 @@ -/* - * @(#) $Id$ - * - * Copyright 2004 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.mina.transport.socket.nio.support; - -import java.io.IOException; -import java.net.ConnectException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.Set; - -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoFilterChainBuilder; -import org.apache.mina.common.IoHandler; -import org.apache.mina.common.support.BaseIoConnector; -import org.apache.mina.transport.socket.nio.SocketSessionManager; -import org.apache.mina.util.ExceptionUtil; -import org.apache.mina.util.Queue; - -/** - * {@link IoConnector} for socket transport (TCP/IP). - * - * @author The Apache Directory Project (dev@directory.apache.org) - * @version $Rev$, $Date$ - */ -public class SocketConnectorDelegate extends BaseIoConnector implements SocketSessionManager -{ - private static volatile int nextId = 0; - - private final IoConnector wrapper; - private final int id = nextId++; - private final String threadName = "SocketConnector-" + id; - private Selector selector; - private final Queue connectQueue = new Queue(); - private Worker worker; - private final SocketIoProcessor ioProcessor = new SocketIoProcessor( this, threadName + "-1" ); - - /** - * Creates a new instance. - */ - public SocketConnectorDelegate( IoConnector wrapper ) - { - this.wrapper = wrapper; - } - - public ConnectFuture connect( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder ) - { - return connect( address, null, handler, filterChainBuilder ); - } - - public ConnectFuture connect( SocketAddress address, SocketAddress localAddress, - IoHandler handler, IoFilterChainBuilder filterChainBuilder ) - { - if( address == null ) - throw new NullPointerException( "address" ); - if( handler == null ) - throw new NullPointerException( "handler" ); - - if( ! ( address instanceof InetSocketAddress ) ) - throw new IllegalArgumentException( "Unexpected address type: " - + address.getClass() ); - - if( localAddress != null && !( localAddress instanceof InetSocketAddress ) ) - throw new IllegalArgumentException( "Unexpected local address type: " - + localAddress.getClass() ); - - if( filterChainBuilder == null ) - { - filterChainBuilder = IoFilterChainBuilder.NOOP; - } - - SocketChannel ch = null; - boolean success = false; - try - { - ch = SocketChannel.open(); - ch.socket().setReuseAddress( true ); - if( localAddress != null ) - { - ch.socket().bind( localAddress ); - } - - ch.configureBlocking( false ); - - if( ch.connect( address ) ) - { - SocketSessionImpl session = newSession( ch, handler, filterChainBuilder ); - success = true; - ConnectFuture future = new ConnectFuture(); - future.setSession( session ); - return future; - } - - success = true; - } - catch( IOException e ) - { - return ConnectFuture.newFailedFuture( e ); - } - finally - { - if( !success && ch != null ) - { - try - { - ch.close(); - } - catch( IOException e ) - { - exceptionMonitor.exceptionCaught( this, e ); - } - } - } - - ConnectionRequest request = new ConnectionRequest( ch, getConnectTimeout(), handler, filterChainBuilder ); - synchronized( this ) - { - try - { - startupWorker(); - } - catch( IOException e ) - { - try - { - ch.close(); - } - catch( IOException e2 ) - { - exceptionMonitor.exceptionCaught( this, e2 ); - } - - return ConnectFuture.newFailedFuture( e ); - } - synchronized( connectQueue ) - { - connectQueue.push( request ); - } - selector.wakeup(); - } - - return request; - } - - private synchronized void startupWorker() throws IOException - { - if( worker == null ) - { - selector = Selector.open(); - worker = new Worker(); - worker.start(); - } - } - - private void registerNew() - { - if( connectQueue.isEmpty() ) - return; - - for( ;; ) - { - ConnectionRequest req; - synchronized( connectQueue ) - { - req = ( ConnectionRequest ) connectQueue.pop(); - } - - if( req == null ) - break; - - SocketChannel ch = req.channel; - try - { - ch.register( selector, SelectionKey.OP_CONNECT, req ); - } - catch( IOException e ) - { - req.setException( e ); - } - } - } - - private void processSessions( Set keys ) - { - Iterator it = keys.iterator(); - - while( it.hasNext() ) - { - SelectionKey key = ( SelectionKey ) it.next(); - - if( !key.isConnectable() ) - continue; - - SocketChannel ch = ( SocketChannel ) key.channel(); - ConnectionRequest entry = ( ConnectionRequest ) key.attachment(); - - boolean success = false; - try - { - ch.finishConnect(); - SocketSessionImpl session = newSession( ch, entry.handler, entry.filterChainBuilder ); - entry.setSession( session ); - success = true; - } - catch( Throwable e ) - { - entry.setException( e ); - } - finally - { - key.cancel(); - if( !success ) - { - try - { - ch.close(); - } - catch( IOException e ) - { - exceptionMonitor.exceptionCaught( this, e ); - } - } - } - } - - keys.clear(); - } - - private void processTimedOutSessions( Set keys ) - { - long currentTime = System.currentTimeMillis(); - Iterator it = keys.iterator(); - - while( it.hasNext() ) - { - SelectionKey key = ( SelectionKey ) it.next(); - - if( !key.isValid() ) - continue; - - ConnectionRequest entry = ( ConnectionRequest ) key.attachment(); - - if( currentTime >= entry.deadline ) - { - entry.setException( new ConnectException() ); - key.cancel(); - } - } - } - - private SocketSessionImpl newSession( SocketChannel ch, IoHandler handler, IoFilterChainBuilder filterChainBuilder ) throws IOException - { - SocketSessionImpl session = new SocketSessionImpl( wrapper, ioProcessor, ch, handler ); - try - { - this.filterChainBuilder.buildFilterChain( session.getFilterChain() ); - filterChainBuilder.buildFilterChain( session.getFilterChain() ); - ( ( SocketFilterChain ) session.getFilterChain() ).sessionCreated( session ); - } - catch( Throwable e ) - { - ExceptionUtil.throwException( e ); - } - session.getIoProcessor().addNew( session ); - return session; - } - - public int getProcessors() - { - throw new UnsupportedOperationException(); - } - - public void setProcessors( int nProcessor ) - { - throw new UnsupportedOperationException(); - } - - private class Worker extends Thread - { - public Worker() - { - super( SocketConnectorDelegate.this.threadName ); - } - - public void run() - { - for( ;; ) - { - try - { - int nKeys = selector.select( 1000 ); - - registerNew(); - - if( nKeys > 0 ) - { - processSessions( selector.selectedKeys() ); - } - - processTimedOutSessions( selector.keys() ); - - if( selector.keys().isEmpty() ) - { - synchronized( SocketConnectorDelegate.this ) - { - if( selector.keys().isEmpty() && - connectQueue.isEmpty() ) - { - worker = null; - try - { - selector.close(); - } - catch( IOException e ) - { - exceptionMonitor.exceptionCaught( SocketConnectorDelegate.this, e ); - } - finally - { - selector = null; - } - break; - } - } - } - } - catch( IOException e ) - { - exceptionMonitor.exceptionCaught( SocketConnectorDelegate.this, e ); - - try - { - Thread.sleep( 1000 ); - } - catch( InterruptedException e1 ) - { - } - } - } - } - } - - private static class ConnectionRequest extends ConnectFuture - { - private final SocketChannel channel; - private final long deadline; - private final IoHandler handler; - private final IoFilterChainBuilder filterChainBuilder; - - private ConnectionRequest( SocketChannel channel, int timeout, IoHandler handler, IoFilterChainBuilder filterChainBuilder ) - { - this.channel = channel; - this.deadline = System.currentTimeMillis() + timeout * 1000L; - this.handler = handler; - this.filterChainBuilder = filterChainBuilder; - } - } +/* + * @(#) $Id$ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.mina.transport.socket.nio.support; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Set; + +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoFilterChainBuilder; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.support.BaseIoConnector; +import org.apache.mina.transport.socket.nio.SocketSessionManager; +import org.apache.mina.util.ExceptionUtil; +import org.apache.mina.util.Queue; + +/** + * {@link IoConnector} for socket transport (TCP/IP). + * + * @author The Apache Directory Project (dev@directory.apache.org) + * @version $Rev$, $Date$ + */ +public class SocketConnectorDelegate extends BaseIoConnector implements SocketSessionManager +{ + private static volatile int nextId = 0; + + private final IoConnector wrapper; + private final int id = nextId++; + private final String threadName = "SocketConnector-" + id; + private Selector selector; + private final Queue connectQueue = new Queue(); + private Worker worker; + + /** + * Creates a new instance. + */ + public SocketConnectorDelegate( IoConnector wrapper ) + { + this.wrapper = wrapper; + } + + public ConnectFuture connect( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder ) + { + return connect( address, null, handler, filterChainBuilder ); + } + + public ConnectFuture connect( SocketAddress address, SocketAddress localAddress, + IoHandler handler, IoFilterChainBuilder filterChainBuilder ) + { + if( address == null ) + throw new NullPointerException( "address" ); + if( handler == null ) + throw new NullPointerException( "handler" ); + + if( ! ( address instanceof InetSocketAddress ) ) + throw new IllegalArgumentException( "Unexpected address type: " + + address.getClass() ); + + if( localAddress != null && !( localAddress instanceof InetSocketAddress ) ) + throw new IllegalArgumentException( "Unexpected local address type: " + + localAddress.getClass() ); + + if( filterChainBuilder == null ) + { + filterChainBuilder = IoFilterChainBuilder.NOOP; + } + + SocketChannel ch = null; + boolean success = false; + try + { + ch = SocketChannel.open(); + ch.socket().setReuseAddress( true ); + if( localAddress != null ) + { + ch.socket().bind( localAddress ); + } + + ch.configureBlocking( false ); + + if( ch.connect( address ) ) + { + SocketSessionImpl session = newSession( ch, handler, filterChainBuilder ); + success = true; + ConnectFuture future = new ConnectFuture(); + future.setSession( session ); + return future; + } + + success = true; + } + catch( IOException e ) + { + return ConnectFuture.newFailedFuture( e ); + } + finally + { + if( !success && ch != null ) + { + try + { + ch.close(); + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + } + } + + ConnectionRequest request = new ConnectionRequest( ch, getConnectTimeout(), handler, filterChainBuilder ); + synchronized( this ) + { + try + { + startupWorker(); + } + catch( IOException e ) + { + try + { + ch.close(); + } + catch( IOException e2 ) + { + ExceptionMonitor.getInstance().exceptionCaught( e2 ); + } + + return ConnectFuture.newFailedFuture( e ); + } + synchronized( connectQueue ) + { + connectQueue.push( request ); + } + selector.wakeup(); + } + + return request; + } + + private synchronized void startupWorker() throws IOException + { + if( worker == null ) + { + selector = Selector.open(); + worker = new Worker(); + worker.start(); + } + } + + private void registerNew() + { + if( connectQueue.isEmpty() ) + return; + + for( ;; ) + { + ConnectionRequest req; + synchronized( connectQueue ) + { + req = ( ConnectionRequest ) connectQueue.pop(); + } + + if( req == null ) + break; + + SocketChannel ch = req.channel; + try + { + ch.register( selector, SelectionKey.OP_CONNECT, req ); + } + catch( IOException e ) + { + req.setException( e ); + } + } + } + + private void processSessions( Set keys ) + { + Iterator it = keys.iterator(); + + while( it.hasNext() ) + { + SelectionKey key = ( SelectionKey ) it.next(); + + if( !key.isConnectable() ) + continue; + + SocketChannel ch = ( SocketChannel ) key.channel(); + ConnectionRequest entry = ( ConnectionRequest ) key.attachment(); + + boolean success = false; + try + { + ch.finishConnect(); + SocketSessionImpl session = newSession( ch, entry.handler, entry.filterChainBuilder ); + entry.setSession( session ); + success = true; + } + catch( Throwable e ) + { + entry.setException( e ); + } + finally + { + key.cancel(); + if( !success ) + { + try + { + ch.close(); + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + } + } + } + + keys.clear(); + } + + private void processTimedOutSessions( Set keys ) + { + long currentTime = System.currentTimeMillis(); + Iterator it = keys.iterator(); + + while( it.hasNext() ) + { + SelectionKey key = ( SelectionKey ) it.next(); + + if( !key.isValid() ) + continue; + + ConnectionRequest entry = ( ConnectionRequest ) key.attachment(); + + if( currentTime >= entry.deadline ) + { + entry.setException( new ConnectException() ); + key.cancel(); + } + } + } + + private SocketSessionImpl newSession( SocketChannel ch, IoHandler handler, IoFilterChainBuilder filterChainBuilder ) throws IOException + { + SocketSessionImpl session = new SocketSessionImpl( wrapper, ch, handler ); + try + { + this.filterChainBuilder.buildFilterChain( session.getFilterChain() ); + filterChainBuilder.buildFilterChain( session.getFilterChain() ); + ( ( SocketFilterChain ) session.getFilterChain() ).sessionCreated( session ); + } + catch( Throwable e ) + { + ExceptionUtil.throwException( e ); + } + session.getIoProcessor().addNew( session ); + return session; + } + + public int getProcessors() + { + throw new UnsupportedOperationException(); + } + + public void setProcessors( int nProcessor ) + { + throw new UnsupportedOperationException(); + } + + private class Worker extends Thread + { + public Worker() + { + super( SocketConnectorDelegate.this.threadName ); + } + + public void run() + { + for( ;; ) + { + try + { + int nKeys = selector.select( 1000 ); + + registerNew(); + + if( nKeys > 0 ) + { + processSessions( selector.selectedKeys() ); + } + + processTimedOutSessions( selector.keys() ); + + if( selector.keys().isEmpty() ) + { + synchronized( SocketConnectorDelegate.this ) + { + if( selector.keys().isEmpty() && + connectQueue.isEmpty() ) + { + worker = null; + try + { + selector.close(); + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + finally + { + selector = null; + } + break; + } + } + } + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + + try + { + Thread.sleep( 1000 ); + } + catch( InterruptedException e1 ) + { + } + } + } + } + } + + private static class ConnectionRequest extends ConnectFuture + { + private final SocketChannel channel; + private final long deadline; + private final IoHandler handler; + private final IoFilterChainBuilder filterChainBuilder; + + private ConnectionRequest( SocketChannel channel, int timeout, IoHandler handler, IoFilterChainBuilder filterChainBuilder ) + { + this.channel = channel; + this.deadline = System.currentTimeMillis() + timeout * 1000L; + this.handler = handler; + this.filterChainBuilder = filterChainBuilder; + } + } } Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java?rev=351884&r1=351883&r2=351884&view=diff ============================================================================== --- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java (original) +++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java Fri Dec 2 20:20:27 2005 @@ -1,599 +1,663 @@ -/* - * @(#) $Id$ - * - * Copyright 2004 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.mina.transport.socket.nio.support; - -import java.io.IOException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.Set; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoSessionManager; -import org.apache.mina.common.WriteTimeoutException; -import org.apache.mina.common.IoFilter.WriteRequest; -import org.apache.mina.util.Queue; - -/** - * Performs all I/O operations for sockets which is connected or bound. - * This class is used by MINA internally. - * - * @author The Apache Directory Project (dev@directory.apache.org) - * @version $Rev$, $Date$, - */ -class SocketIoProcessor -{ - private final IoSessionManager parent; - private final String threadName; - private Selector selector; - - private final Queue newSessions = new Queue(); - private final Queue removingSessions = new Queue(); - private final Queue flushingSessions = new Queue(); - private final Queue trafficControllingSessions = new Queue(); - - private Worker worker; - - private long lastIdleCheckTime = System.currentTimeMillis(); - - SocketIoProcessor( IoSessionManager parent, String threadName ) - { - this.parent = parent; - this.threadName = threadName; - } - - void addNew( SocketSessionImpl session ) throws IOException - { - synchronized( this ) - { - synchronized( newSessions ) - { - newSessions.push( session ); - } - startupWorker(); - } - - selector.wakeup(); - } - - void remove( SocketSessionImpl session ) throws IOException - { - scheduleRemove( session ); - startupWorker(); - selector.wakeup(); - } - - private synchronized void startupWorker() throws IOException - { - if( worker == null ) - { - selector = Selector.open(); - worker = new Worker(); - worker.start(); - } - } - - void flush( SocketSessionImpl session ) - { - scheduleFlush( session ); - selector.wakeup(); - } - - void updateTrafficMask( SocketSessionImpl session ) - { - scheduleTrafficControl( session ); - selector.wakeup(); - } - - private void scheduleRemove( SocketSessionImpl session ) - { - synchronized( removingSessions ) - { - removingSessions.push( session ); - } - } - - private void scheduleFlush( SocketSessionImpl session ) - { - synchronized( flushingSessions ) - { - flushingSessions.push( session ); - } - } - - private void scheduleTrafficControl( SocketSessionImpl session ) - { - synchronized( trafficControllingSessions ) - { - trafficControllingSessions.push( session ); - } - } - - private void doAddNew() - { - if( newSessions.isEmpty() ) - return; - - SocketSessionImpl session; - - for( ;; ) - { - synchronized( newSessions ) - { - session = ( SocketSessionImpl ) newSessions.pop(); - } - - if( session == null ) - break; - - SocketChannel ch = session.getChannel(); - boolean registered; - - try - { - ch.configureBlocking( false ); - session.setSelectionKey( ch.register( selector, - SelectionKey.OP_READ, - session ) ); - registered = true; - } - catch( IOException e ) - { - registered = false; - ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e ); - } - - if( registered ) - { - ( ( SocketFilterChain ) session.getFilterChain() ).sessionOpened( session ); - } - } - } - - private void doRemove() - { - if( removingSessions.isEmpty() ) - return; - - for( ;; ) - { - SocketSessionImpl session; - - synchronized( removingSessions ) - { - session = ( SocketSessionImpl ) removingSessions.pop(); - } - - if( session == null ) - break; - - SocketChannel ch = session.getChannel(); - SelectionKey key = session.getSelectionKey(); - // Retry later if session is not yet fully initialized. - // (In case that Session.close() is called before addSession() is processed) - if( key == null ) - { - scheduleRemove( session ); - break; - } - // skip if channel is already closed - if( !key.isValid() ) - { - continue; - } - - try - { - key.cancel(); - ch.close(); - } - catch( IOException e ) - { - ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e ); - } - finally - { - releaseWriteBuffers( session ); - - ( ( SocketFilterChain ) session.getFilterChain() ).sessionClosed( session ); - session.getCloseFuture().setClosed(); - } - } - } - - private void process( Set selectedKeys ) - { - Iterator it = selectedKeys.iterator(); - - while( it.hasNext() ) - { - SelectionKey key = ( SelectionKey ) it.next(); - SocketSessionImpl session = ( SocketSessionImpl ) key.attachment(); - - if( key.isReadable() && session.getTrafficMask().isReadable() ) - { - read( session ); - } - - if( key.isWritable() && session.getTrafficMask().isWritable() ) - { - scheduleFlush( session ); - } - } - - selectedKeys.clear(); - } - - private void read( SocketSessionImpl session ) - { - ByteBuffer buf = ByteBuffer.allocate( session.getSessionReceiveBufferSize() ); - SocketChannel ch = session.getChannel(); - - try - { - int readBytes = 0; - int ret; - - buf.clear(); - - try - { - while( ( ret = ch.read( buf.buf() ) ) > 0 ) - { - readBytes += ret; - } - } - finally - { - buf.flip(); - } - - session.increaseReadBytes( readBytes ); - - if( readBytes > 0 ) - { - ByteBuffer newBuf = ByteBuffer.allocate( readBytes ); - newBuf.put( buf ); - newBuf.flip(); - ( ( SocketFilterChain ) session.getFilterChain() ).messageReceived( session, newBuf ); - } - if( ret < 0 ) - { - scheduleRemove( session ); - } - } - catch( Throwable e ) - { - if( e instanceof IOException ) - scheduleRemove( session ); - ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e ); - } - finally - { - buf.release(); - } - } - - private void notifyIdleness() - { - // process idle sessions - long currentTime = System.currentTimeMillis(); - if( ( currentTime - lastIdleCheckTime ) >= 1000 ) - { - lastIdleCheckTime = currentTime; - Set keys = selector.keys(); - if( keys != null ) - { - for( Iterator it = keys.iterator(); it.hasNext(); ) - { - SelectionKey key = ( SelectionKey ) it.next(); - SocketSessionImpl session = ( SocketSessionImpl ) key.attachment(); - notifyIdleness( session, currentTime ); - } - } - } - } - - private void notifyIdleness( SocketSessionImpl session, long currentTime ) - { - notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ), - IdleStatus.BOTH_IDLE, - Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) ); - notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis( IdleStatus.READER_IDLE ), - IdleStatus.READER_IDLE, - Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) ); - notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ), - IdleStatus.WRITER_IDLE, - Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) ); - - notifyWriteTimeout( session, currentTime, session - .getWriteTimeoutInMillis(), session.getLastWriteTime() ); - } - - private void notifyIdleness0( SocketSessionImpl session, long currentTime, - long idleTime, IdleStatus status, - long lastIoTime ) - { - if( idleTime > 0 && lastIoTime != 0 - && ( currentTime - lastIoTime ) >= idleTime ) - { - session.increaseIdleCount( status ); - ( ( SocketFilterChain ) session.getFilterChain() ).sessionIdle( session, status ); - } - } - - private void notifyWriteTimeout( SocketSessionImpl session, - long currentTime, - long writeTimeout, long lastIoTime ) - { - SelectionKey key = session.getSelectionKey(); - if( writeTimeout > 0 - && ( currentTime - lastIoTime ) >= writeTimeout - && key != null && key.isValid() - && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 ) - { - ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, new WriteTimeoutException() ); - } - } - - private void doFlush() - { - if( flushingSessions.size() == 0 ) - return; - - for( ;; ) - { - SocketSessionImpl session; - - synchronized( flushingSessions ) - { - session = ( SocketSessionImpl ) flushingSessions.pop(); - } - - if( session == null ) - break; - - if( !session.isConnected() ) - { - releaseWriteBuffers( session ); - continue; - } - - SelectionKey key = session.getSelectionKey(); - // Retry later if session is not yet fully initialized. - // (In case that Session.write() is called before addSession() is processed) - if( key == null ) - { - scheduleFlush( session ); - break; - } - // skip if channel is already closed - if( !key.isValid() ) - { - continue; - } - - try - { - doFlush( session ); - } - catch( IOException e ) - { - scheduleRemove( session ); - ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e ); - } - } - } - - private void releaseWriteBuffers( SocketSessionImpl session ) - { - Queue writeRequestQueue = session.getWriteRequestQueue(); - WriteRequest req; - - while( ( req = ( WriteRequest ) writeRequestQueue.pop() ) != null ) - { - try - { - ( ( ByteBuffer ) req.getMessage() ).release(); - } - catch( IllegalStateException e ) - { - ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e ); - } - finally - { - req.getFuture().setWritten( false ); - } - } - } - - private void doFlush( SocketSessionImpl session ) throws IOException - { - // Clear OP_WRITE - SelectionKey key = session.getSelectionKey(); - key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) ); - - SocketChannel ch = session.getChannel(); - Queue writeRequestQueue = session.getWriteRequestQueue(); - - WriteRequest req; - for( ;; ) - { - synchronized( writeRequestQueue ) - { - req = ( WriteRequest ) writeRequestQueue.first(); - } - - if( req == null ) - break; - - ByteBuffer buf = ( ByteBuffer ) req.getMessage(); - if( buf.remaining() == 0 ) - { - synchronized( writeRequestQueue ) - { - writeRequestQueue.pop(); - } - - req.getFuture().setWritten( true ); - session.increaseWrittenWriteRequests(); - ( ( SocketFilterChain ) session.getFilterChain() ).messageSent( session, buf.reset() ); - continue; - } - - int writtenBytes = ch.write( buf.buf() ); - if( writtenBytes > 0 ) - { - session.increaseWrittenBytes( writtenBytes ); - } - - if( buf.hasRemaining() ) - { - // Kernel buffer is full - key.interestOps( key.interestOps() | SelectionKey.OP_WRITE ); - break; - } - } - } - - private void doUpdateTrafficMask() - { - if( trafficControllingSessions.isEmpty() ) - return; - - for( ;; ) - { - SocketSessionImpl session; - - synchronized( trafficControllingSessions ) - { - session = ( SocketSessionImpl ) trafficControllingSessions.pop(); - } - - if( session == null ) - break; - - SelectionKey key = session.getSelectionKey(); - // Retry later if session is not yet fully initialized. - // (In case that Session.suspend??() or session.resume??() is - // called before addSession() is processed) - if( key == null ) - { - scheduleTrafficControl( session ); - break; - } - // skip if channel is already closed - if( !key.isValid() ) - { - continue; - } - - // The normal is OP_READ and, if there are write requests in the - // session's write queue, set OP_WRITE to trigger flushing. - int ops = SelectionKey.OP_READ; - Queue writeRequestQueue = session.getWriteRequestQueue(); - synchronized( writeRequestQueue ) - { - if( !writeRequestQueue.isEmpty() ) - { - ops |= SelectionKey.OP_WRITE; - } - } - - // Now mask the preferred ops with the mask of the current session - int mask = session.getTrafficMask().getInterestOps(); - key.interestOps( ops & mask ); - } - } - - private class Worker extends Thread - { - public Worker() - { - super( SocketIoProcessor.this.threadName ); - } - - public void run() - { - for( ;; ) - { - try - { - int nKeys = selector.select( 1000 ); - doAddNew(); - doUpdateTrafficMask(); - - if( nKeys > 0 ) - { - process( selector.selectedKeys() ); - } - - doFlush(); - doRemove(); - notifyIdleness(); - - if( selector.keys().isEmpty() ) - { - synchronized( SocketIoProcessor.this ) - { - if( selector.keys().isEmpty() && - newSessions.isEmpty() ) - { - worker = null; - try - { - selector.close(); - } - catch( IOException e ) - { - parent.getExceptionMonitor().exceptionCaught( parent, e ); - } - finally - { - selector = null; - } - break; - } - } - } - } - catch( Throwable t ) - { - parent.getExceptionMonitor().exceptionCaught( parent, t ); - - try - { - Thread.sleep( 1000 ); - } - catch( InterruptedException e1 ) - { - } - } - } - } - } +/* + * @(#) $Id$ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.mina.transport.socket.nio.support; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Set; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.WriteTimeoutException; +import org.apache.mina.common.IoFilter.WriteRequest; +import org.apache.mina.util.Queue; + +/** + * Performs all I/O operations for sockets which is connected or bound. + * This class is used by MINA internally. + * + * @author The Apache Directory Project (dev@directory.apache.org) + * @version $Rev$, $Date$, + */ +class SocketIoProcessor +{ + private static final String PROCESSORS_PROPERTY = "mina.socket.processors"; + private static final String THREAD_PREFIX = "SocketIoProcessor-"; + private static final int DEFAULT_PROCESSORS = 1; + private static final int PROCESSOR_COUNT; + private static final SocketIoProcessor[] PROCESSORS; + + private static int nextId; + + static + { + PROCESSOR_COUNT = configureProcessorCount(); + PROCESSORS = createProcessors(); + } + + /** + * Returns the {@link SocketIoProcessor} to be used for a newly + * created session + * + * @return The processor to be employed + */ + static synchronized SocketIoProcessor getInstance() + { + SocketIoProcessor processor = PROCESSORS[nextId++]; + nextId %= PROCESSOR_COUNT; + return processor; + } + + private final String threadName; + private Selector selector; + + private final Queue newSessions = new Queue(); + private final Queue removingSessions = new Queue(); + private final Queue flushingSessions = new Queue(); + private final Queue trafficControllingSessions = new Queue(); + + private Worker worker; + private long lastIdleCheckTime = System.currentTimeMillis(); + + private SocketIoProcessor( String threadName ) + { + this.threadName = threadName; + } + + void addNew( SocketSessionImpl session ) throws IOException + { + synchronized( this ) + { + synchronized( newSessions ) + { + newSessions.push( session ); + } + startupWorker(); + } + + selector.wakeup(); + } + + void remove( SocketSessionImpl session ) throws IOException + { + scheduleRemove( session ); + startupWorker(); + selector.wakeup(); + } + + private synchronized void startupWorker() throws IOException + { + if( worker == null ) + { + selector = Selector.open(); + worker = new Worker(); + worker.start(); + } + } + + void flush( SocketSessionImpl session ) + { + scheduleFlush( session ); + selector.wakeup(); + } + + void updateTrafficMask( SocketSessionImpl session ) + { + scheduleTrafficControl( session ); + selector.wakeup(); + } + + private void scheduleRemove( SocketSessionImpl session ) + { + synchronized( removingSessions ) + { + removingSessions.push( session ); + } + } + + private void scheduleFlush( SocketSessionImpl session ) + { + synchronized( flushingSessions ) + { + flushingSessions.push( session ); + } + } + + private void scheduleTrafficControl( SocketSessionImpl session ) + { + synchronized( trafficControllingSessions ) + { + trafficControllingSessions.push( session ); + } + } + + private void doAddNew() + { + if( newSessions.isEmpty() ) + return; + + SocketSessionImpl session; + + for( ;; ) + { + synchronized( newSessions ) + { + session = ( SocketSessionImpl ) newSessions.pop(); + } + + if( session == null ) + break; + + SocketChannel ch = session.getChannel(); + boolean registered; + + try + { + ch.configureBlocking( false ); + session.setSelectionKey( ch.register( selector, + SelectionKey.OP_READ, + session ) ); + registered = true; + } + catch( IOException e ) + { + registered = false; + ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e ); + } + + if( registered ) + { + ( ( SocketFilterChain ) session.getFilterChain() ).sessionOpened( session ); + } + } + } + + private void doRemove() + { + if( removingSessions.isEmpty() ) + return; + + for( ;; ) + { + SocketSessionImpl session; + + synchronized( removingSessions ) + { + session = ( SocketSessionImpl ) removingSessions.pop(); + } + + if( session == null ) + break; + + SocketChannel ch = session.getChannel(); + SelectionKey key = session.getSelectionKey(); + // Retry later if session is not yet fully initialized. + // (In case that Session.close() is called before addSession() is processed) + if( key == null ) + { + scheduleRemove( session ); + break; + } + // skip if channel is already closed + if( !key.isValid() ) + { + continue; + } + + try + { + key.cancel(); + ch.close(); + } + catch( IOException e ) + { + ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e ); + } + finally + { + releaseWriteBuffers( session ); + + ( ( SocketFilterChain ) session.getFilterChain() ).sessionClosed( session ); + session.getCloseFuture().setClosed(); + } + } + } + + private void process( Set selectedKeys ) + { + Iterator it = selectedKeys.iterator(); + + while( it.hasNext() ) + { + SelectionKey key = ( SelectionKey ) it.next(); + SocketSessionImpl session = ( SocketSessionImpl ) key.attachment(); + + if( key.isReadable() && session.getTrafficMask().isReadable() ) + { + read( session ); + } + + if( key.isWritable() && session.getTrafficMask().isWritable() ) + { + scheduleFlush( session ); + } + } + + selectedKeys.clear(); + } + + private void read( SocketSessionImpl session ) + { + ByteBuffer buf = ByteBuffer.allocate( session.getSessionReceiveBufferSize() ); + SocketChannel ch = session.getChannel(); + + try + { + int readBytes = 0; + int ret; + + buf.clear(); + + try + { + while( ( ret = ch.read( buf.buf() ) ) > 0 ) + { + readBytes += ret; + } + } + finally + { + buf.flip(); + } + + session.increaseReadBytes( readBytes ); + + if( readBytes > 0 ) + { + ByteBuffer newBuf = ByteBuffer.allocate( readBytes ); + newBuf.put( buf ); + newBuf.flip(); + ( ( SocketFilterChain ) session.getFilterChain() ).messageReceived( session, newBuf ); + } + if( ret < 0 ) + { + scheduleRemove( session ); + } + } + catch( Throwable e ) + { + if( e instanceof IOException ) + scheduleRemove( session ); + ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e ); + } + finally + { + buf.release(); + } + } + + private void notifyIdleness() + { + // process idle sessions + long currentTime = System.currentTimeMillis(); + if( ( currentTime - lastIdleCheckTime ) >= 1000 ) + { + lastIdleCheckTime = currentTime; + Set keys = selector.keys(); + if( keys != null ) + { + for( Iterator it = keys.iterator(); it.hasNext(); ) + { + SelectionKey key = ( SelectionKey ) it.next(); + SocketSessionImpl session = ( SocketSessionImpl ) key.attachment(); + notifyIdleness( session, currentTime ); + } + } + } + } + + private void notifyIdleness( SocketSessionImpl session, long currentTime ) + { + notifyIdleness0( + session, currentTime, + session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ), + IdleStatus.BOTH_IDLE, + Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) ); + notifyIdleness0( + session, currentTime, + session.getIdleTimeInMillis( IdleStatus.READER_IDLE ), + IdleStatus.READER_IDLE, + Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) ); + notifyIdleness0( + session, currentTime, + session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ), + IdleStatus.WRITER_IDLE, + Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) ); + + notifyWriteTimeout( session, currentTime, session + .getWriteTimeoutInMillis(), session.getLastWriteTime() ); + } + + private void notifyIdleness0( SocketSessionImpl session, long currentTime, + long idleTime, IdleStatus status, + long lastIoTime ) + { + if( idleTime > 0 && lastIoTime != 0 + && ( currentTime - lastIoTime ) >= idleTime ) + { + session.increaseIdleCount( status ); + ( ( SocketFilterChain ) session.getFilterChain() ).sessionIdle( session, status ); + } + } + + private void notifyWriteTimeout( SocketSessionImpl session, + long currentTime, + long writeTimeout, long lastIoTime ) + { + SelectionKey key = session.getSelectionKey(); + if( writeTimeout > 0 + && ( currentTime - lastIoTime ) >= writeTimeout + && key != null && key.isValid() + && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 ) + { + ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, new WriteTimeoutException() ); + } + } + + private void doFlush() + { + if( flushingSessions.size() == 0 ) + return; + + for( ;; ) + { + SocketSessionImpl session; + + synchronized( flushingSessions ) + { + session = ( SocketSessionImpl ) flushingSessions.pop(); + } + + if( session == null ) + break; + + if( !session.isConnected() ) + { + releaseWriteBuffers( session ); + continue; + } + + SelectionKey key = session.getSelectionKey(); + // Retry later if session is not yet fully initialized. + // (In case that Session.write() is called before addSession() is processed) + if( key == null ) + { + scheduleFlush( session ); + break; + } + // skip if channel is already closed + if( !key.isValid() ) + { + continue; + } + + try + { + doFlush( session ); + } + catch( IOException e ) + { + scheduleRemove( session ); + ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e ); + } + } + } + + private void releaseWriteBuffers( SocketSessionImpl session ) + { + Queue writeRequestQueue = session.getWriteRequestQueue(); + WriteRequest req; + + while( ( req = ( WriteRequest ) writeRequestQueue.pop() ) != null ) + { + try + { + ( ( ByteBuffer ) req.getMessage() ).release(); + } + catch( IllegalStateException e ) + { + ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e ); + } + finally + { + req.getFuture().setWritten( false ); + } + } + } + + private void doFlush( SocketSessionImpl session ) throws IOException + { + // Clear OP_WRITE + SelectionKey key = session.getSelectionKey(); + key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) ); + + SocketChannel ch = session.getChannel(); + Queue writeRequestQueue = session.getWriteRequestQueue(); + + WriteRequest req; + for( ;; ) + { + synchronized( writeRequestQueue ) + { + req = ( WriteRequest ) writeRequestQueue.first(); + } + + if( req == null ) + break; + + ByteBuffer buf = ( ByteBuffer ) req.getMessage(); + if( buf.remaining() == 0 ) + { + synchronized( writeRequestQueue ) + { + writeRequestQueue.pop(); + } + + req.getFuture().setWritten( true ); + session.increaseWrittenWriteRequests(); + ( ( SocketFilterChain ) session.getFilterChain() ).messageSent( session, buf.reset() ); + continue; + } + + int writtenBytes = ch.write( buf.buf() ); + if( writtenBytes > 0 ) + { + session.increaseWrittenBytes( writtenBytes ); + } + + if( buf.hasRemaining() ) + { + // Kernel buffer is full + key.interestOps( key.interestOps() | SelectionKey.OP_WRITE ); + break; + } + } + } + + private void doUpdateTrafficMask() + { + if( trafficControllingSessions.isEmpty() ) + return; + + for( ;; ) + { + SocketSessionImpl session; + + synchronized( trafficControllingSessions ) + { + session = ( SocketSessionImpl ) trafficControllingSessions.pop(); + } + + if( session == null ) + break; + + SelectionKey key = session.getSelectionKey(); + // Retry later if session is not yet fully initialized. + // (In case that Session.suspend??() or session.resume??() is + // called before addSession() is processed) + if( key == null ) + { + scheduleTrafficControl( session ); + break; + } + // skip if channel is already closed + if( !key.isValid() ) + { + continue; + } + + // The normal is OP_READ and, if there are write requests in the + // session's write queue, set OP_WRITE to trigger flushing. + int ops = SelectionKey.OP_READ; + Queue writeRequestQueue = session.getWriteRequestQueue(); + synchronized( writeRequestQueue ) + { + if( !writeRequestQueue.isEmpty() ) + { + ops |= SelectionKey.OP_WRITE; + } + } + + // Now mask the preferred ops with the mask of the current session + int mask = session.getTrafficMask().getInterestOps(); + key.interestOps( ops & mask ); + } + } + + /** + * Configures the number of processors employed. + * We first check for a system property "mina.IoProcessors". If this + * property is present and can be interpreted as an integer value greater + * or equal to 1, this value is used as the number of processors. + * Otherwise a default of 1 processor is employed. + * + * @return The nubmer of processors to employ + */ + private static int configureProcessorCount() + { + int processors = DEFAULT_PROCESSORS; + String processorProperty = System.getProperty( PROCESSORS_PROPERTY ); + if ( processorProperty != null ) + { + try + { + processors = Integer.parseInt( processorProperty ); + } + catch ( NumberFormatException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + processors = Math.max( processors, 1 ); + } + + return processors; + } + + private static SocketIoProcessor[] createProcessors() + { + SocketIoProcessor[] processors = new SocketIoProcessor[ PROCESSOR_COUNT ]; + for ( int i = 0; i < PROCESSOR_COUNT; i ++ ) + { + processors[i] = new SocketIoProcessor( THREAD_PREFIX + i ); + } + return processors; + } + + private class Worker extends Thread + { + public Worker() + { + super( SocketIoProcessor.this.threadName ); + } + + public void run() + { + for( ;; ) + { + try + { + int nKeys = selector.select( 1000 ); + doAddNew(); + doUpdateTrafficMask(); + + if( nKeys > 0 ) + { + process( selector.selectedKeys() ); + } + + doFlush(); + doRemove(); + notifyIdleness(); + + if( selector.keys().isEmpty() ) + { + synchronized( SocketIoProcessor.this ) + { + if( selector.keys().isEmpty() && + newSessions.isEmpty() ) + { + worker = null; + try + { + selector.close(); + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + finally + { + selector = null; + } + break; + } + } + } + } + catch( Throwable t ) + { + ExceptionMonitor.getInstance().exceptionCaught( t ); + + try + { + Thread.sleep( 1000 ); + } + catch( InterruptedException e1 ) + { + } + } + } + } + } + } Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java?rev=351884&r1=351883&r2=351884&view=diff ============================================================================== --- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java (original) +++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java Fri Dec 2 20:20:27 2005 @@ -1,253 +1,252 @@ -/* - * @(#) $Id$ - * - * Copyright 2004 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.mina.transport.socket.nio.support; - -import java.net.SocketAddress; -import java.net.SocketException; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; - -import org.apache.mina.common.CloseFuture; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoHandler; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.IoSessionManager; -import org.apache.mina.common.TransportType; -import org.apache.mina.common.IoFilter.WriteRequest; -import org.apache.mina.common.support.BaseIoSession; -import org.apache.mina.transport.socket.nio.SocketSession; -import org.apache.mina.util.Queue; - -/** - * An {@link IoSession} for socket transport (TCP/IP). - * - * @author The Apache Directory Project (dev@directory.apache.org) - * @version $Rev$, $Date$ - */ -class SocketSessionImpl extends BaseIoSession implements SocketSession -{ - private static final int DEFAULT_READ_BUFFER_SIZE = 1024; - - private final IoSessionManager manager; - private final SocketIoProcessor ioProcessor; - private final SocketFilterChain filterChain; - private final SocketChannel ch; - private final Queue writeRequestQueue; - private final IoHandler handler; - private final SocketAddress remoteAddress; - private final SocketAddress localAddress; - private SelectionKey key; - private int readBufferSize = DEFAULT_READ_BUFFER_SIZE; - - /** - * Creates a new instance. - */ - public SocketSessionImpl( - IoSessionManager manager, - SocketIoProcessor ioProcessor, - SocketChannel ch, IoHandler defaultHandler ) - { - this.manager = manager; - this.ioProcessor = ioProcessor; - this.filterChain = new SocketFilterChain( this ); - this.ch = ch; - this.writeRequestQueue = new Queue(); - this.handler = defaultHandler; - this.remoteAddress = ch.socket().getRemoteSocketAddress(); - this.localAddress = ch.socket().getLocalSocketAddress(); - } - - public IoSessionManager getManager() - { - return manager; - } - - SocketIoProcessor getIoProcessor() - { - return ioProcessor; - } - - public IoFilterChain getFilterChain() - { - return filterChain; - } - - SocketChannel getChannel() - { - return ch; - } - - SelectionKey getSelectionKey() - { - return key; - } - - void setSelectionKey( SelectionKey key ) - { - this.key = key; - } - - public IoHandler getHandler() - { - return handler; - } - - protected void close0( CloseFuture closeFuture ) - { - filterChain.filterClose( this, closeFuture ); - } - - Queue getWriteRequestQueue() - { - return writeRequestQueue; - } - - public int getScheduledWriteRequests() - { - synchronized( writeRequestQueue ) - { - return writeRequestQueue.size(); - } - } - - protected void write0( WriteRequest writeRequest ) - { - filterChain.filterWrite( this, writeRequest ); - } - - public TransportType getTransportType() - { - return TransportType.SOCKET; - } - - public boolean isConnected() - { - return ch.isConnected(); - } - - public SocketAddress getRemoteAddress() - { - return remoteAddress; - } - - public SocketAddress getLocalAddress() - { - return localAddress; - } - - public boolean getKeepAlive() throws SocketException - { - return ch.socket().getKeepAlive(); - } - - public void setKeepAlive( boolean on ) throws SocketException - { - ch.socket().setKeepAlive( on ); - } - - public boolean getOOBInline() throws SocketException - { - return ch.socket().getOOBInline(); - } - - public void setOOBInline( boolean on ) throws SocketException - { - ch.socket().setOOBInline( on ); - } - - public boolean getReuseAddress() throws SocketException - { - return ch.socket().getReuseAddress(); - } - - public void setReuseAddress( boolean on ) throws SocketException - { - ch.socket().setReuseAddress( on ); - } - - public int getSoLinger() throws SocketException - { - return ch.socket().getSoLinger(); - } - - public void setSoLinger( boolean on, int linger ) throws SocketException - { - ch.socket().setSoLinger( on, linger ); - } - - public boolean getTcpNoDelay() throws SocketException - { - return ch.socket().getTcpNoDelay(); - } - - public void setTcpNoDelay( boolean on ) throws SocketException - { - ch.socket().setTcpNoDelay( on ); - } - - public int getTrafficClass() throws SocketException - { - return ch.socket().getTrafficClass(); - } - - public void setTrafficClass( int tc ) throws SocketException - { - ch.socket().setTrafficClass( tc ); - } - - public int getSendBufferSize() throws SocketException - { - return ch.socket().getSendBufferSize(); - } - - public void setSendBufferSize( int size ) throws SocketException - { - ch.socket().setSendBufferSize( size ); - } - - public int getReceiveBufferSize() throws SocketException - { - return ch.socket().getReceiveBufferSize(); - } - - public void setReceiveBufferSize( int size ) throws SocketException - { - ch.socket().setReceiveBufferSize( size ); - } - - public int getSessionReceiveBufferSize() - { - return readBufferSize; - } - - public void setSessionReceiveBufferSize( int size ) - { - if( size <= 0 ) - { - throw new IllegalArgumentException( "Invalid session receive buffer size: " + size ); - } - - this.readBufferSize = size; - } - - protected void updateTrafficMask() - { - this.ioProcessor.updateTrafficMask( this ); - } -} +/* + * @(#) $Id$ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.mina.transport.socket.nio.support; + +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; + +import org.apache.mina.common.CloseFuture; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.IoSessionManager; +import org.apache.mina.common.TransportType; +import org.apache.mina.common.IoFilter.WriteRequest; +import org.apache.mina.common.support.BaseIoSession; +import org.apache.mina.transport.socket.nio.SocketSession; +import org.apache.mina.util.Queue; + +/** + * An {@link IoSession} for socket transport (TCP/IP). + * + * @author The Apache Directory Project (dev@directory.apache.org) + * @version $Rev$, $Date$ + */ +class SocketSessionImpl extends BaseIoSession implements SocketSession +{ + private static final int DEFAULT_READ_BUFFER_SIZE = 1024; + + private final IoSessionManager manager; + private final SocketIoProcessor ioProcessor; + private final SocketFilterChain filterChain; + private final SocketChannel ch; + private final Queue writeRequestQueue; + private final IoHandler handler; + private final SocketAddress remoteAddress; + private final SocketAddress localAddress; + private SelectionKey key; + private int readBufferSize = DEFAULT_READ_BUFFER_SIZE; + + /** + * Creates a new instance. + */ + public SocketSessionImpl( + IoSessionManager manager, + SocketChannel ch, IoHandler defaultHandler ) + { + this.manager = manager; + this.ioProcessor = SocketIoProcessor.getInstance(); + this.filterChain = new SocketFilterChain( this ); + this.ch = ch; + this.writeRequestQueue = new Queue(); + this.handler = defaultHandler; + this.remoteAddress = ch.socket().getRemoteSocketAddress(); + this.localAddress = ch.socket().getLocalSocketAddress(); + } + + public IoSessionManager getManager() + { + return manager; + } + + SocketIoProcessor getIoProcessor() + { + return ioProcessor; + } + + public IoFilterChain getFilterChain() + { + return filterChain; + } + + SocketChannel getChannel() + { + return ch; + } + + SelectionKey getSelectionKey() + { + return key; + } + + void setSelectionKey( SelectionKey key ) + { + this.key = key; + } + + public IoHandler getHandler() + { + return handler; + } + + protected void close0( CloseFuture closeFuture ) + { + filterChain.filterClose( this, closeFuture ); + } + + Queue getWriteRequestQueue() + { + return writeRequestQueue; + } + + public int getScheduledWriteRequests() + { + synchronized( writeRequestQueue ) + { + return writeRequestQueue.size(); + } + } + + protected void write0( WriteRequest writeRequest ) + { + filterChain.filterWrite( this, writeRequest ); + } + + public TransportType getTransportType() + { + return TransportType.SOCKET; + } + + public boolean isConnected() + { + return ch.isConnected(); + } + + public SocketAddress getRemoteAddress() + { + return remoteAddress; + } + + public SocketAddress getLocalAddress() + { + return localAddress; + } + + public boolean getKeepAlive() throws SocketException + { + return ch.socket().getKeepAlive(); + } + + public void setKeepAlive( boolean on ) throws SocketException + { + ch.socket().setKeepAlive( on ); + } + + public boolean getOOBInline() throws SocketException + { + return ch.socket().getOOBInline(); + } + + public void setOOBInline( boolean on ) throws SocketException + { + ch.socket().setOOBInline( on ); + } + + public boolean getReuseAddress() throws SocketException + { + return ch.socket().getReuseAddress(); + } + + public void setReuseAddress( boolean on ) throws SocketException + { + ch.socket().setReuseAddress( on ); + } + + public int getSoLinger() throws SocketException + { + return ch.socket().getSoLinger(); + } + + public void setSoLinger( boolean on, int linger ) throws SocketException + { + ch.socket().setSoLinger( on, linger ); + } + + public boolean getTcpNoDelay() throws SocketException + { + return ch.socket().getTcpNoDelay(); + } + + public void setTcpNoDelay( boolean on ) throws SocketException + { + ch.socket().setTcpNoDelay( on ); + } + + public int getTrafficClass() throws SocketException + { + return ch.socket().getTrafficClass(); + } + + public void setTrafficClass( int tc ) throws SocketException + { + ch.socket().setTrafficClass( tc ); + } + + public int getSendBufferSize() throws SocketException + { + return ch.socket().getSendBufferSize(); + } + + public void setSendBufferSize( int size ) throws SocketException + { + ch.socket().setSendBufferSize( size ); + } + + public int getReceiveBufferSize() throws SocketException + { + return ch.socket().getReceiveBufferSize(); + } + + public void setReceiveBufferSize( int size ) throws SocketException + { + ch.socket().setReceiveBufferSize( size ); + } + + public int getSessionReceiveBufferSize() + { + return readBufferSize; + } + + public void setSessionReceiveBufferSize( int size ) + { + if( size <= 0 ) + { + throw new IllegalArgumentException( "Invalid session receive buffer size: " + size ); + } + + this.readBufferSize = size; + } + + protected void updateTrafficMask() + { + this.ioProcessor.updateTrafficMask( this ); + } +} Modified: directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?rev=351884&r1=351883&r2=351884&view=diff ============================================================================== --- directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original) +++ directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Fri Dec 2 20:20:27 2005 @@ -7,6 +7,7 @@ import java.net.SocketAddress; import org.apache.mina.common.CloseFuture; +import org.apache.mina.common.ExceptionMonitor; import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoFilterChainBuilder; import org.apache.mina.common.IoHandler; @@ -64,7 +65,7 @@ } catch( Throwable t ) { - remoteEntry.getAcceptor().getExceptionMonitor().exceptionCaught( remoteEntry.getAcceptor(), t ); + ExceptionMonitor.getInstance().exceptionCaught( t ); IOException e = new IOException( "Failed to initialize remote session." ); e.initCause( t ); throw e; Modified: directory/network/trunk/src/test/org/apache/mina/integration/spring/support/AbstractIoSessionManagerFactoryBeanTest.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/test/org/apache/mina/integration/spring/support/AbstractIoSessionManagerFactoryBeanTest.java?rev=351884&r1=351883&r2=351884&view=diff ============================================================================== --- directory/network/trunk/src/test/org/apache/mina/integration/spring/support/AbstractIoSessionManagerFactoryBeanTest.java (original) +++ directory/network/trunk/src/test/org/apache/mina/integration/spring/support/AbstractIoSessionManagerFactoryBeanTest.java Fri Dec 2 20:20:27 2005 @@ -21,7 +21,6 @@ import junit.framework.TestCase; import org.apache.mina.common.DefaultIoFilterChainBuilder; -import org.apache.mina.common.ExceptionMonitor; import org.apache.mina.common.IoFilter; import org.apache.mina.common.IoSessionManager; import org.apache.mina.integration.spring.IoFilterMapping; @@ -187,22 +186,18 @@ .getMock(); DefaultIoFilterChainBuilder ioFilterChainBuilder = ( DefaultIoFilterChainBuilder ) MockClassControl .createControl( DefaultIoFilterChainBuilder.class ).getMock(); - ExceptionMonitor exceptionMonitor = ( ExceptionMonitor ) MockControl - .createControl( ExceptionMonitor.class ).getMock(); /* * Record expectations. */ ioSessionManager.getFilterChain(); mockIoSessionManager.setReturnValue( ioFilterChainBuilder ); - ioSessionManager.setExceptionMonitor( exceptionMonitor ); /* * Replay. */ mockIoSessionManager.replay(); - factory.setExceptionMonitor( exceptionMonitor ); factory.initIoSessionManager( ioSessionManager ); /* @@ -286,22 +281,4 @@ { } } - - /** - * Tests that - * {@link AbstractIoSessionManagerFactoryBean#setExceptionMonitor(ExceptionMonitor)} - * validates the method arguments. - */ - public void testSetExceptionMonitor() - { - try - { - factory.setExceptionMonitor( null ); - fail( "null exceptionMonitor set. IllegalArgumentException expected." ); - } - catch( IllegalArgumentException iae ) - { - } - } - }