Return-Path: Delivered-To: apmail-directory-commits-archive@www.apache.org Received: (qmail 29136 invoked from network); 3 Dec 2005 04:21:24 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 3 Dec 2005 04:21:24 -0000 Received: (qmail 55967 invoked by uid 500); 3 Dec 2005 04:21:23 -0000 Delivered-To: apmail-directory-commits-archive@directory.apache.org Received: (qmail 55879 invoked by uid 500); 3 Dec 2005 04:21:23 -0000 Mailing-List: contact commits-help@directory.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@directory.apache.org Delivered-To: mailing list commits@directory.apache.org Received: (qmail 55847 invoked by uid 99); 3 Dec 2005 04:21:22 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 02 Dec 2005 20:21:22 -0800 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 02 Dec 2005 20:22:49 -0800 Received: (qmail 28602 invoked by uid 65534); 3 Dec 2005 04:20:59 -0000 Message-ID: <20051203042059.28600.qmail@minotaur.apache.org> Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r351884 [2/3] - in /directory/network/trunk/src: java/org/apache/mina/common/ java/org/apache/mina/common/support/ java/org/apache/mina/integration/spring/support/ java/org/apache/mina/transport/socket/nio/ java/org/apache/mina/transport/so... Date: Sat, 03 Dec 2005 04:20:55 -0000 To: commits@directory.apache.org From: trustin@apache.org X-Mailer: svnmailer-1.0.5 X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?rev=351884&r1=351883&r2=351884&view=diff ============================================================================== --- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original) +++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Fri Dec 2 20:20:27 2005 @@ -1,594 +1,594 @@ -/* - * @(#) $Id$ - * - * Copyright 2004 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.mina.transport.socket.nio.support; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.DatagramChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.util.Iterator; -import java.util.Set; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoFilterChainBuilder; -import org.apache.mina.common.IoHandler; -import org.apache.mina.common.IoFilter.WriteRequest; -import org.apache.mina.common.support.BaseIoConnector; -import org.apache.mina.util.Queue; - -/** - * {@link IoConnector} for datagram transport (UDP/IP). - * - * @author The Apache Directory Project (dev@directory.apache.org) - * @version $Rev$, $Date$ - */ -public class DatagramConnectorDelegate extends BaseIoConnector implements DatagramSessionManager -{ - private static volatile int nextId = 0; - - private final IoConnector wrapper; - private final int id = nextId ++ ; - private Selector selector; - private final Queue registerQueue = new Queue(); - private final Queue cancelQueue = new Queue(); - private final Queue flushingSessions = new Queue(); - private final Queue trafficControllingSessions = new Queue(); - private Worker worker; - - /** - * Creates a new instance. - */ - public DatagramConnectorDelegate( IoConnector wrapper ) - { - this.wrapper = wrapper; - } - - public ConnectFuture connect( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder ) - { - return connect( address, null, handler, filterChainBuilder ); - } - - public ConnectFuture connect( SocketAddress address, SocketAddress localAddress, - IoHandler handler, IoFilterChainBuilder filterChainBuilder ) - { - if( address == null ) - throw new NullPointerException( "address" ); - if( handler == null ) - throw new NullPointerException( "handler" ); - - if( !( address instanceof InetSocketAddress ) ) - throw new IllegalArgumentException( "Unexpected address type: " - + address.getClass() ); - - if( localAddress != null && !( localAddress instanceof InetSocketAddress ) ) - { - throw new IllegalArgumentException( "Unexpected local address type: " - + localAddress.getClass() ); - } - - if( filterChainBuilder == null ) - { - filterChainBuilder = IoFilterChainBuilder.NOOP; - } - - DatagramChannel ch = null; - boolean initialized = false; - try - { - ch = DatagramChannel.open(); - ch.socket().setReuseAddress( true ); - if( localAddress != null ) - { - ch.socket().bind( localAddress ); - } - ch.connect( address ); - ch.configureBlocking( false ); - initialized = true; - } - catch( IOException e ) - { - return ConnectFuture.newFailedFuture( e ); - } - finally - { - if( !initialized && ch != null ) - { - try - { - ch.close(); - } - catch( IOException e ) - { - exceptionMonitor.exceptionCaught( this, e ); - } - } - } - - RegistrationRequest request = new RegistrationRequest( ch, handler, filterChainBuilder ); - synchronized( this ) - { - try - { - startupWorker(); - } - catch( IOException e ) - { - try - { - ch.close(); - } - catch( IOException e2 ) - { - exceptionMonitor.exceptionCaught( this, e2 ); - } - - return ConnectFuture.newFailedFuture( e ); - } - - synchronized( registerQueue ) - { - registerQueue.push( request ); - } - } - - selector.wakeup(); - return request; - } - - private synchronized void startupWorker() throws IOException - { - if( worker == null ) - { - selector = Selector.open(); - worker = new Worker(); - worker.start(); - } - } - - public void closeSession( DatagramSessionImpl session ) - { - synchronized( this ) - { - try - { - startupWorker(); - } - catch( IOException e ) - { - // IOException is thrown only when Worker thread is not - // running and failed to open a selector. We simply return - // silently here because it we can simply conclude that - // this session is not managed by this connector or - // already closed. - return; - } - - synchronized( cancelQueue ) - { - cancelQueue.push( session ); - } - } - - selector.wakeup(); - } - - public void flushSession( DatagramSessionImpl session ) - { - scheduleFlush( session ); - selector.wakeup(); - } - - private void scheduleFlush( DatagramSessionImpl session ) - { - synchronized( flushingSessions ) - { - flushingSessions.push( session ); - } - } - - public void updateTrafficMask( DatagramSessionImpl session ) - { - scheduleTrafficControl( session ); - selector.wakeup(); - } - - private void scheduleTrafficControl( DatagramSessionImpl session ) - { - synchronized( trafficControllingSessions ) - { - trafficControllingSessions.push( session ); - } - } - - private void doUpdateTrafficMask() - { - if( trafficControllingSessions.isEmpty() ) - return; - - for( ;; ) - { - DatagramSessionImpl session; - - synchronized( trafficControllingSessions ) - { - session = ( DatagramSessionImpl ) trafficControllingSessions.pop(); - } - - if( session == null ) - break; - - SelectionKey key = session.getSelectionKey(); - // Retry later if session is not yet fully initialized. - // (In case that Session.suspend??() or session.resume??() is - // called before addSession() is processed) - if( key == null ) - { - scheduleTrafficControl( session ); - break; - } - // skip if channel is already closed - if( !key.isValid() ) - { - continue; - } - - // The normal is OP_READ and, if there are write requests in the - // session's write queue, set OP_WRITE to trigger flushing. - int ops = SelectionKey.OP_READ; - Queue writeRequestQueue = session.getWriteRequestQueue(); - synchronized( writeRequestQueue ) - { - if( !writeRequestQueue.isEmpty() ) - { - ops |= SelectionKey.OP_WRITE; - } - } - - // Now mask the preferred ops with the mask of the current session - int mask = session.getTrafficMask().getInterestOps(); - key.interestOps( ops & mask ); - } - } - - private class Worker extends Thread - { - public Worker() - { - super( "DatagramConnector-" + id ); - } - - public void run() - { - for( ;; ) - { - try - { - int nKeys = selector.select(); - - registerNew(); - doUpdateTrafficMask(); - - if( nKeys > 0 ) - { - processReadySessions( selector.selectedKeys() ); - } - - flushSessions(); - cancelKeys(); - - if( selector.keys().isEmpty() ) - { - synchronized( DatagramConnectorDelegate.this ) - { - if( selector.keys().isEmpty() && - registerQueue.isEmpty() && - cancelQueue.isEmpty() ) - { - worker = null; - try - { - selector.close(); - } - catch( IOException e ) - { - exceptionMonitor.exceptionCaught( DatagramConnectorDelegate.this, e ); - } - finally - { - selector = null; - } - break; - } - } - } - } - catch( IOException e ) - { - exceptionMonitor.exceptionCaught( DatagramConnectorDelegate.this, - e ); - - try - { - Thread.sleep( 1000 ); - } - catch( InterruptedException e1 ) - { - } - } - } - } - } - - private void processReadySessions( Set keys ) - { - Iterator it = keys.iterator(); - while( it.hasNext() ) - { - SelectionKey key = ( SelectionKey ) it.next(); - it.remove(); - - DatagramSessionImpl session = ( DatagramSessionImpl ) key.attachment(); - - if( key.isReadable() && session.getTrafficMask().isReadable() ) - { - readSession( session ); - } - - if( key.isWritable() && session.getTrafficMask().isWritable() ) - { - scheduleFlush( session ); - } - } - } - - private void readSession( DatagramSessionImpl session ) - { - - ByteBuffer readBuf = ByteBuffer.allocate( 2048 ); - try - { - int readBytes = session.getChannel().read( readBuf.buf() ); - if( readBytes > 0 ) - { - readBuf.flip(); - ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() ); - newBuf.put( readBuf ); - newBuf.flip(); - - session.increaseReadBytes( readBytes ); - ( ( DatagramFilterChain ) session.getFilterChain() ).messageReceived( session, newBuf ); - } - } - catch( IOException e ) - { - ( ( DatagramFilterChain ) session.getFilterChain() ).exceptionCaught( session, e ); - } - finally - { - readBuf.release(); - } - } - - private void flushSessions() - { - if( flushingSessions.size() == 0 ) - return; - - for( ;; ) - { - DatagramSessionImpl session; - - synchronized( flushingSessions ) - { - session = ( DatagramSessionImpl ) flushingSessions.pop(); - } - - if( session == null ) - break; - - try - { - flush( session ); - } - catch( IOException e ) - { - ( ( DatagramFilterChain ) session.getFilterChain() ).exceptionCaught( session, e ); - } - } - } - - private void flush( DatagramSessionImpl session ) throws IOException - { - DatagramChannel ch = session.getChannel(); - - Queue writeRequestQueue = session.getWriteRequestQueue(); - - WriteRequest req; - for( ;; ) - { - synchronized( writeRequestQueue ) - { - req = ( WriteRequest ) writeRequestQueue.first(); - } - - if( req == null ) - break; - - ByteBuffer buf = ( ByteBuffer ) req.getMessage(); - if( buf.remaining() == 0 ) - { - // pop and fire event - synchronized( writeRequestQueue ) - { - writeRequestQueue.pop(); - } - - req.getFuture().setWritten( true ); - session.increaseWrittenWriteRequests(); - ( ( DatagramFilterChain ) session.getFilterChain() ).messageSent( session, buf ); - continue; - } - - SelectionKey key = session.getSelectionKey(); - if( key == null ) - { - scheduleFlush( session ); - break; - } - if( !key.isValid() ) - { - continue; - } - - int pos = buf.position(); - int writtenBytes = ch.write( buf.buf() ); - - if( writtenBytes == 0 ) - { - // Kernel buffer is full - key.interestOps( key.interestOps() | SelectionKey.OP_WRITE ); - } - else if( writtenBytes > 0 ) - { - key.interestOps( key.interestOps() - & ( ~SelectionKey.OP_WRITE ) ); - - // pop and fire event - synchronized( writeRequestQueue ) - { - writeRequestQueue.pop(); - } - - session.increaseWrittenBytes( writtenBytes ); - req.getFuture().setWritten( true ); - session.increaseWrittenWriteRequests(); - ( ( DatagramFilterChain ) session.getFilterChain() ).messageSent( session, buf.position( pos ) ); - } - } - } - - private void registerNew() - { - if( registerQueue.isEmpty() ) - return; - - for( ;; ) - { - RegistrationRequest req; - synchronized( registerQueue ) - { - req = ( RegistrationRequest ) registerQueue.pop(); - } - - if( req == null ) - break; - - DatagramSessionImpl session = - new DatagramSessionImpl( wrapper, this, req.channel, req.handler ); - - boolean success = false; - try - { - this.filterChainBuilder.buildFilterChain( session.getFilterChain() ); - req.filterChainBuilder.buildFilterChain( session.getFilterChain() ); - ( ( DatagramFilterChain ) session.getFilterChain() ).sessionCreated( session ); - - SelectionKey key = req.channel.register( selector, - SelectionKey.OP_READ, session ); - - session.setSelectionKey( key ); - - req.setSession( session ); - success = true; - } - catch( Throwable t ) - { - req.setException( t ); - } - finally - { - if( !success ) - { - try - { - req.channel.close(); - } - catch (IOException e) - { - exceptionMonitor.exceptionCaught( this, e ); - } - } - } - } - } - - private void cancelKeys() - { - if( cancelQueue.isEmpty() ) - return; - - for( ;; ) - { - DatagramSessionImpl session; - synchronized( cancelQueue ) - { - session = ( DatagramSessionImpl ) cancelQueue.pop(); - } - - if( session == null ) - break; - else - { - SelectionKey key = session.getSelectionKey(); - DatagramChannel ch = ( DatagramChannel ) key.channel(); - try - { - ch.close(); - } - catch( IOException e ) - { - exceptionMonitor.exceptionCaught( this, e ); - } - session.getCloseFuture().setClosed(); - key.cancel(); - selector.wakeup(); // wake up again to trigger thread death - } - } - } - - private static class RegistrationRequest extends ConnectFuture - { - private final DatagramChannel channel; - private final IoHandler handler; - private final IoFilterChainBuilder filterChainBuilder; - - private RegistrationRequest( DatagramChannel channel, - IoHandler handler, - IoFilterChainBuilder filterChainBuilder ) - { - this.channel = channel; - this.handler = handler; - this.filterChainBuilder = filterChainBuilder; - } - } -} +/* + * @(#) $Id$ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.mina.transport.socket.nio.support; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.Iterator; +import java.util.Set; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoFilterChainBuilder; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoFilter.WriteRequest; +import org.apache.mina.common.support.BaseIoConnector; +import org.apache.mina.util.Queue; + +/** + * {@link IoConnector} for datagram transport (UDP/IP). + * + * @author The Apache Directory Project (dev@directory.apache.org) + * @version $Rev$, $Date$ + */ +public class DatagramConnectorDelegate extends BaseIoConnector implements DatagramSessionManager +{ + private static volatile int nextId = 0; + + private final IoConnector wrapper; + private final int id = nextId ++ ; + private Selector selector; + private final Queue registerQueue = new Queue(); + private final Queue cancelQueue = new Queue(); + private final Queue flushingSessions = new Queue(); + private final Queue trafficControllingSessions = new Queue(); + private Worker worker; + + /** + * Creates a new instance. + */ + public DatagramConnectorDelegate( IoConnector wrapper ) + { + this.wrapper = wrapper; + } + + public ConnectFuture connect( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder ) + { + return connect( address, null, handler, filterChainBuilder ); + } + + public ConnectFuture connect( SocketAddress address, SocketAddress localAddress, + IoHandler handler, IoFilterChainBuilder filterChainBuilder ) + { + if( address == null ) + throw new NullPointerException( "address" ); + if( handler == null ) + throw new NullPointerException( "handler" ); + + if( !( address instanceof InetSocketAddress ) ) + throw new IllegalArgumentException( "Unexpected address type: " + + address.getClass() ); + + if( localAddress != null && !( localAddress instanceof InetSocketAddress ) ) + { + throw new IllegalArgumentException( "Unexpected local address type: " + + localAddress.getClass() ); + } + + if( filterChainBuilder == null ) + { + filterChainBuilder = IoFilterChainBuilder.NOOP; + } + + DatagramChannel ch = null; + boolean initialized = false; + try + { + ch = DatagramChannel.open(); + ch.socket().setReuseAddress( true ); + if( localAddress != null ) + { + ch.socket().bind( localAddress ); + } + ch.connect( address ); + ch.configureBlocking( false ); + initialized = true; + } + catch( IOException e ) + { + return ConnectFuture.newFailedFuture( e ); + } + finally + { + if( !initialized && ch != null ) + { + try + { + ch.close(); + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + } + } + + RegistrationRequest request = new RegistrationRequest( ch, handler, filterChainBuilder ); + synchronized( this ) + { + try + { + startupWorker(); + } + catch( IOException e ) + { + try + { + ch.close(); + } + catch( IOException e2 ) + { + ExceptionMonitor.getInstance().exceptionCaught( e2 ); + } + + return ConnectFuture.newFailedFuture( e ); + } + + synchronized( registerQueue ) + { + registerQueue.push( request ); + } + } + + selector.wakeup(); + return request; + } + + private synchronized void startupWorker() throws IOException + { + if( worker == null ) + { + selector = Selector.open(); + worker = new Worker(); + worker.start(); + } + } + + public void closeSession( DatagramSessionImpl session ) + { + synchronized( this ) + { + try + { + startupWorker(); + } + catch( IOException e ) + { + // IOException is thrown only when Worker thread is not + // running and failed to open a selector. We simply return + // silently here because it we can simply conclude that + // this session is not managed by this connector or + // already closed. + return; + } + + synchronized( cancelQueue ) + { + cancelQueue.push( session ); + } + } + + selector.wakeup(); + } + + public void flushSession( DatagramSessionImpl session ) + { + scheduleFlush( session ); + selector.wakeup(); + } + + private void scheduleFlush( DatagramSessionImpl session ) + { + synchronized( flushingSessions ) + { + flushingSessions.push( session ); + } + } + + public void updateTrafficMask( DatagramSessionImpl session ) + { + scheduleTrafficControl( session ); + selector.wakeup(); + } + + private void scheduleTrafficControl( DatagramSessionImpl session ) + { + synchronized( trafficControllingSessions ) + { + trafficControllingSessions.push( session ); + } + } + + private void doUpdateTrafficMask() + { + if( trafficControllingSessions.isEmpty() ) + return; + + for( ;; ) + { + DatagramSessionImpl session; + + synchronized( trafficControllingSessions ) + { + session = ( DatagramSessionImpl ) trafficControllingSessions.pop(); + } + + if( session == null ) + break; + + SelectionKey key = session.getSelectionKey(); + // Retry later if session is not yet fully initialized. + // (In case that Session.suspend??() or session.resume??() is + // called before addSession() is processed) + if( key == null ) + { + scheduleTrafficControl( session ); + break; + } + // skip if channel is already closed + if( !key.isValid() ) + { + continue; + } + + // The normal is OP_READ and, if there are write requests in the + // session's write queue, set OP_WRITE to trigger flushing. + int ops = SelectionKey.OP_READ; + Queue writeRequestQueue = session.getWriteRequestQueue(); + synchronized( writeRequestQueue ) + { + if( !writeRequestQueue.isEmpty() ) + { + ops |= SelectionKey.OP_WRITE; + } + } + + // Now mask the preferred ops with the mask of the current session + int mask = session.getTrafficMask().getInterestOps(); + key.interestOps( ops & mask ); + } + } + + private class Worker extends Thread + { + public Worker() + { + super( "DatagramConnector-" + id ); + } + + public void run() + { + for( ;; ) + { + try + { + int nKeys = selector.select(); + + registerNew(); + doUpdateTrafficMask(); + + if( nKeys > 0 ) + { + processReadySessions( selector.selectedKeys() ); + } + + flushSessions(); + cancelKeys(); + + if( selector.keys().isEmpty() ) + { + synchronized( DatagramConnectorDelegate.this ) + { + if( selector.keys().isEmpty() && + registerQueue.isEmpty() && + cancelQueue.isEmpty() ) + { + worker = null; + try + { + selector.close(); + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + finally + { + selector = null; + } + break; + } + } + } + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + + try + { + Thread.sleep( 1000 ); + } + catch( InterruptedException e1 ) + { + } + } + } + } + } + + private void processReadySessions( Set keys ) + { + Iterator it = keys.iterator(); + while( it.hasNext() ) + { + SelectionKey key = ( SelectionKey ) it.next(); + it.remove(); + + DatagramSessionImpl session = ( DatagramSessionImpl ) key.attachment(); + + if( key.isReadable() && session.getTrafficMask().isReadable() ) + { + readSession( session ); + } + + if( key.isWritable() && session.getTrafficMask().isWritable() ) + { + scheduleFlush( session ); + } + } + } + + private void readSession( DatagramSessionImpl session ) + { + + ByteBuffer readBuf = ByteBuffer.allocate( 2048 ); + try + { + int readBytes = session.getChannel().read( readBuf.buf() ); + if( readBytes > 0 ) + { + readBuf.flip(); + ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() ); + newBuf.put( readBuf ); + newBuf.flip(); + + session.increaseReadBytes( readBytes ); + ( ( DatagramFilterChain ) session.getFilterChain() ).messageReceived( session, newBuf ); + } + } + catch( IOException e ) + { + ( ( DatagramFilterChain ) session.getFilterChain() ).exceptionCaught( session, e ); + } + finally + { + readBuf.release(); + } + } + + private void flushSessions() + { + if( flushingSessions.size() == 0 ) + return; + + for( ;; ) + { + DatagramSessionImpl session; + + synchronized( flushingSessions ) + { + session = ( DatagramSessionImpl ) flushingSessions.pop(); + } + + if( session == null ) + break; + + try + { + flush( session ); + } + catch( IOException e ) + { + ( ( DatagramFilterChain ) session.getFilterChain() ).exceptionCaught( session, e ); + } + } + } + + private void flush( DatagramSessionImpl session ) throws IOException + { + DatagramChannel ch = session.getChannel(); + + Queue writeRequestQueue = session.getWriteRequestQueue(); + + WriteRequest req; + for( ;; ) + { + synchronized( writeRequestQueue ) + { + req = ( WriteRequest ) writeRequestQueue.first(); + } + + if( req == null ) + break; + + ByteBuffer buf = ( ByteBuffer ) req.getMessage(); + if( buf.remaining() == 0 ) + { + // pop and fire event + synchronized( writeRequestQueue ) + { + writeRequestQueue.pop(); + } + + req.getFuture().setWritten( true ); + session.increaseWrittenWriteRequests(); + ( ( DatagramFilterChain ) session.getFilterChain() ).messageSent( session, buf ); + continue; + } + + SelectionKey key = session.getSelectionKey(); + if( key == null ) + { + scheduleFlush( session ); + break; + } + if( !key.isValid() ) + { + continue; + } + + int pos = buf.position(); + int writtenBytes = ch.write( buf.buf() ); + + if( writtenBytes == 0 ) + { + // Kernel buffer is full + key.interestOps( key.interestOps() | SelectionKey.OP_WRITE ); + } + else if( writtenBytes > 0 ) + { + key.interestOps( key.interestOps() + & ( ~SelectionKey.OP_WRITE ) ); + + // pop and fire event + synchronized( writeRequestQueue ) + { + writeRequestQueue.pop(); + } + + session.increaseWrittenBytes( writtenBytes ); + req.getFuture().setWritten( true ); + session.increaseWrittenWriteRequests(); + ( ( DatagramFilterChain ) session.getFilterChain() ).messageSent( session, buf.position( pos ) ); + } + } + } + + private void registerNew() + { + if( registerQueue.isEmpty() ) + return; + + for( ;; ) + { + RegistrationRequest req; + synchronized( registerQueue ) + { + req = ( RegistrationRequest ) registerQueue.pop(); + } + + if( req == null ) + break; + + DatagramSessionImpl session = + new DatagramSessionImpl( wrapper, this, req.channel, req.handler ); + + boolean success = false; + try + { + this.filterChainBuilder.buildFilterChain( session.getFilterChain() ); + req.filterChainBuilder.buildFilterChain( session.getFilterChain() ); + ( ( DatagramFilterChain ) session.getFilterChain() ).sessionCreated( session ); + + SelectionKey key = req.channel.register( selector, + SelectionKey.OP_READ, session ); + + session.setSelectionKey( key ); + + req.setSession( session ); + success = true; + } + catch( Throwable t ) + { + req.setException( t ); + } + finally + { + if( !success ) + { + try + { + req.channel.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + } + } + } + } + + private void cancelKeys() + { + if( cancelQueue.isEmpty() ) + return; + + for( ;; ) + { + DatagramSessionImpl session; + synchronized( cancelQueue ) + { + session = ( DatagramSessionImpl ) cancelQueue.pop(); + } + + if( session == null ) + break; + else + { + SelectionKey key = session.getSelectionKey(); + DatagramChannel ch = ( DatagramChannel ) key.channel(); + try + { + ch.close(); + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + session.getCloseFuture().setClosed(); + key.cancel(); + selector.wakeup(); // wake up again to trigger thread death + } + } + } + + private static class RegistrationRequest extends ConnectFuture + { + private final DatagramChannel channel; + private final IoHandler handler; + private final IoFilterChainBuilder filterChainBuilder; + + private RegistrationRequest( DatagramChannel channel, + IoHandler handler, + IoFilterChainBuilder filterChainBuilder ) + { + this.channel = channel; + this.handler = handler; + this.filterChainBuilder = filterChainBuilder; + } + } +} Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java?rev=351884&r1=351883&r2=351884&view=diff ============================================================================== --- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java (original) +++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java Fri Dec 2 20:20:27 2005 @@ -1,524 +1,524 @@ -/* - * @(#) $Id$ - * - * Copyright 2004 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.mina.transport.socket.nio.support; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; - -import org.apache.mina.common.IoAcceptor; -import org.apache.mina.common.IoFilterChainBuilder; -import org.apache.mina.common.IoHandler; -import org.apache.mina.common.support.BaseIoAcceptor; -import org.apache.mina.transport.socket.nio.SocketSessionManager; -import org.apache.mina.util.Queue; - -/** - * {@link IoAcceptor} for socket transport (TCP/IP). - * - * @author The Apache Directory Project (dev@directory.apache.org) - * @version $Rev$, $Date$ - */ -public class SocketAcceptorDelegate extends BaseIoAcceptor implements SocketSessionManager -{ - private static volatile int nextId = 0; - - private final IoAcceptor wrapper; - private final int id = nextId ++ ; - private final String threadName = "SocketAcceptor-" + id; - private boolean reuseAddress = false; - private int backlog = 50; - private int receiveBufferSize = -1; - private Selector selector; - private final Map channels = new HashMap(); - - private final Queue registerQueue = new Queue(); - private final Queue cancelQueue = new Queue(); - - private Worker worker; - private final SocketIoProcessor ioProcessor = new SocketIoProcessor( this, threadName + "-1" ); - - /** - * Creates a new instance. - */ - public SocketAcceptorDelegate( IoAcceptor wrapper ) - { - this.wrapper = wrapper; - } - - /** - * Binds to the specified address and handles incoming - * connections with the specified handler. Backlog value - * is configured to the value of backlog property. - * - * @throws IOException if failed to bind - */ - public void bind( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder ) throws IOException - { - if( address == null ) - { - throw new NullPointerException( "address" ); - } - - if( handler == null ) - { - throw new NullPointerException( "handler" ); - } - - if( !( address instanceof InetSocketAddress ) ) - { - throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() ); - } - - if( ( ( InetSocketAddress ) address ).getPort() == 0 ) - { - throw new IllegalArgumentException( "Unsupported port number: 0" ); - } - - if( filterChainBuilder == null ) - { - filterChainBuilder = IoFilterChainBuilder.NOOP; - } - - RegistrationRequest request = new RegistrationRequest( address, handler, filterChainBuilder ); - - synchronized( this ) - { - synchronized( registerQueue ) - { - registerQueue.push( request ); - } - startupWorker(); - } - - selector.wakeup(); - - synchronized( request ) - { - while( !request.done ) - { - try - { - request.wait(); - } - catch( InterruptedException e ) - { - } - } - } - - if( request.exception != null ) - { - throw request.exception; - } - } - - - private synchronized void startupWorker() throws IOException - { - if( worker == null ) - { - selector = Selector.open(); - worker = new Worker(); - - worker.start(); - } - } - - public void unbind( SocketAddress address ) - { - // TODO: DIRMINA-93 - if( address == null ) - { - throw new NullPointerException( "address" ); - } - - CancellationRequest request = new CancellationRequest( address ); - synchronized( this ) - { - try - { - startupWorker(); - } - catch( IOException e ) - { - // IOException is thrown only when Worker thread is not - // running and failed to open a selector. We simply throw - // IllegalArgumentException here because we can simply - // conclude that nothing is bound to the selector. - throw new IllegalArgumentException( "Address not bound: " + address ); - } - - synchronized( cancelQueue ) - { - cancelQueue.push( request ); - } - } - - selector.wakeup(); - - synchronized( request ) - { - while( !request.done ) - { - try - { - request.wait(); - } - catch( InterruptedException e ) - { - } - } - } - - if( request.exception != null ) - { - request.exception.fillInStackTrace(); - - throw request.exception; - } - } - - public int getProcessors() - { - throw new UnsupportedOperationException(); - } - - public void setProcessors( int nProcessor ) - { - throw new UnsupportedOperationException(); - } - - private class Worker extends Thread - { - public Worker() - { - super( SocketAcceptorDelegate.this.threadName ); - } - - public void run() - { - for( ;; ) - { - try - { - int nKeys = selector.select(); - - registerNew(); - cancelKeys(); - - if( nKeys > 0 ) - { - processSessions( selector.selectedKeys() ); - } - - if( selector.keys().isEmpty() ) - { - synchronized( SocketAcceptorDelegate.this ) - { - if( selector.keys().isEmpty() && - registerQueue.isEmpty() && - cancelQueue.isEmpty() ) - { - worker = null; - try - { - selector.close(); - } - catch( IOException e ) - { - exceptionMonitor.exceptionCaught( SocketAcceptorDelegate.this, e ); - } - finally - { - selector = null; - } - break; - } - } - } - } - catch( IOException e ) - { - exceptionMonitor.exceptionCaught( SocketAcceptorDelegate.this, e ); - - try - { - Thread.sleep( 1000 ); - } - catch( InterruptedException e1 ) - { - } - } - } - } - - private void processSessions( Set keys ) throws IOException - { - Iterator it = keys.iterator(); - while( it.hasNext() ) - { - SelectionKey key = ( SelectionKey ) it.next(); - - it.remove(); - - if( !key.isAcceptable() ) - { - continue; - } - - ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel(); - - SocketChannel ch = ssc.accept(); - - if( ch == null ) - { - continue; - } - - boolean success = false; - try - { - RegistrationRequest req = ( RegistrationRequest ) key.attachment(); - SocketSessionImpl session = new SocketSessionImpl( SocketAcceptorDelegate.this.wrapper, ioProcessor, ch, req.handler ); - SocketAcceptorDelegate.this.filterChainBuilder.buildFilterChain( session.getFilterChain() ); - req.filterChainBuilder.buildFilterChain( session.getFilterChain() ); - ( ( SocketFilterChain ) session.getFilterChain() ).sessionCreated( session ); - session.getIoProcessor().addNew( session ); - success = true; - } - catch( Throwable t ) - { - exceptionMonitor.exceptionCaught( SocketAcceptorDelegate.this, t ); - } - finally - { - if( !success ) - { - ch.close(); - } - } - } - } - } - - - private void registerNew() - { - if( registerQueue.isEmpty() ) - { - return; - } - - for( ;; ) - { - RegistrationRequest req; - - synchronized( registerQueue ) - { - req = ( RegistrationRequest ) registerQueue.pop(); - } - - if( req == null ) - { - break; - } - - ServerSocketChannel ssc = null; - - try - { - ssc = ServerSocketChannel.open(); - ssc.configureBlocking( false ); - - // Configure the server socket, - ssc.socket().setReuseAddress( isReuseAddress() ); - if( getReceiveBufferSize() > 0 ) - { - ssc.socket().setReceiveBufferSize( getReceiveBufferSize() ); - } - - // and bind. - ssc.socket().bind( req.address, getBacklog() ); - ssc.register( selector, SelectionKey.OP_ACCEPT, req ); - - channels.put( req.address, ssc ); - } - catch( IOException e ) - { - req.exception = e; - } - finally - { - synchronized( req ) - { - req.done = true; - - req.notify(); - } - - if( ssc != null && req.exception != null ) - { - try - { - ssc.close(); - } - catch( IOException e ) - { - exceptionMonitor.exceptionCaught( this, e ); - } - } - } - } - } - - - private void cancelKeys() - { - if( cancelQueue.isEmpty() ) - { - return; - } - - for( ;; ) - { - CancellationRequest request; - - synchronized( cancelQueue ) - { - request = ( CancellationRequest ) cancelQueue.pop(); - } - - if( request == null ) - { - break; - } - - ServerSocketChannel ssc = ( ServerSocketChannel ) channels.remove( request.address ); - - // close the channel - try - { - if( ssc == null ) - { - request.exception = new IllegalArgumentException( "Address not bound: " + request.address ); - } - else - { - SelectionKey key = ssc.keyFor( selector ); - - key.cancel(); - - selector.wakeup(); // wake up again to trigger thread death - - ssc.close(); - } - } - catch( IOException e ) - { - exceptionMonitor.exceptionCaught( this, e ); - } - finally - { - synchronized( request ) - { - request.done = true; - - request.notify(); - } - } - } - } - - public int getReceiveBufferSize() - { - return receiveBufferSize; - } - - /** - * @param receiveBufferSize -1 to use the default value. - */ - public void setReceiveBufferSize( int receiveBufferSize ) - { - this.receiveBufferSize = receiveBufferSize; - } - - public boolean isReuseAddress() - { - return reuseAddress; - } - - public void setReuseAddress( boolean reuseAddress ) - { - this.reuseAddress = reuseAddress; - } - - public int getBacklog() - { - return backlog; - } - - public void setBacklog( int backlog ) - { - if( backlog <= 0 ) - { - throw new IllegalArgumentException( "backlog: " + backlog ); - } - this.backlog = backlog; - } - - private static class RegistrationRequest - { - private final SocketAddress address; - private final IoHandler handler; - private final IoFilterChainBuilder filterChainBuilder; - private IOException exception; - private boolean done; - - private RegistrationRequest( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder ) - { - this.address = address; - this.handler = handler; - this.filterChainBuilder = filterChainBuilder; - } - } - - - private static class CancellationRequest - { - private final SocketAddress address; - - private boolean done; - - private RuntimeException exception; - - private CancellationRequest( SocketAddress address ) - { - this.address = address; - } - } -} +/* + * @(#) $Id$ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.mina.transport.socket.nio.support; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IoAcceptor; +import org.apache.mina.common.IoFilterChainBuilder; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.support.BaseIoAcceptor; +import org.apache.mina.transport.socket.nio.SocketSessionManager; +import org.apache.mina.util.Queue; + +/** + * {@link IoAcceptor} for socket transport (TCP/IP). + * + * @author The Apache Directory Project (dev@directory.apache.org) + * @version $Rev$, $Date$ + */ +public class SocketAcceptorDelegate extends BaseIoAcceptor implements SocketSessionManager +{ + private static volatile int nextId = 0; + + private final IoAcceptor wrapper; + private final int id = nextId ++ ; + private final String threadName = "SocketAcceptor-" + id; + private boolean reuseAddress = false; + private int backlog = 50; + private int receiveBufferSize = -1; + private Selector selector; + private final Map channels = new HashMap(); + + private final Queue registerQueue = new Queue(); + private final Queue cancelQueue = new Queue(); + + private Worker worker; + + /** + * Creates a new instance. + */ + public SocketAcceptorDelegate( IoAcceptor wrapper ) + { + this.wrapper = wrapper; + } + + /** + * Binds to the specified address and handles incoming + * connections with the specified handler. Backlog value + * is configured to the value of backlog property. + * + * @throws IOException if failed to bind + */ + public void bind( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder ) throws IOException + { + if( address == null ) + { + throw new NullPointerException( "address" ); + } + + if( handler == null ) + { + throw new NullPointerException( "handler" ); + } + + if( !( address instanceof InetSocketAddress ) ) + { + throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() ); + } + + if( ( ( InetSocketAddress ) address ).getPort() == 0 ) + { + throw new IllegalArgumentException( "Unsupported port number: 0" ); + } + + if( filterChainBuilder == null ) + { + filterChainBuilder = IoFilterChainBuilder.NOOP; + } + + RegistrationRequest request = new RegistrationRequest( address, handler, filterChainBuilder ); + + synchronized( this ) + { + synchronized( registerQueue ) + { + registerQueue.push( request ); + } + startupWorker(); + } + + selector.wakeup(); + + synchronized( request ) + { + while( !request.done ) + { + try + { + request.wait(); + } + catch( InterruptedException e ) + { + } + } + } + + if( request.exception != null ) + { + throw request.exception; + } + } + + + private synchronized void startupWorker() throws IOException + { + if( worker == null ) + { + selector = Selector.open(); + worker = new Worker(); + + worker.start(); + } + } + + public void unbind( SocketAddress address ) + { + // TODO: DIRMINA-93 + if( address == null ) + { + throw new NullPointerException( "address" ); + } + + CancellationRequest request = new CancellationRequest( address ); + synchronized( this ) + { + try + { + startupWorker(); + } + catch( IOException e ) + { + // IOException is thrown only when Worker thread is not + // running and failed to open a selector. We simply throw + // IllegalArgumentException here because we can simply + // conclude that nothing is bound to the selector. + throw new IllegalArgumentException( "Address not bound: " + address ); + } + + synchronized( cancelQueue ) + { + cancelQueue.push( request ); + } + } + + selector.wakeup(); + + synchronized( request ) + { + while( !request.done ) + { + try + { + request.wait(); + } + catch( InterruptedException e ) + { + } + } + } + + if( request.exception != null ) + { + request.exception.fillInStackTrace(); + + throw request.exception; + } + } + + public int getProcessors() + { + throw new UnsupportedOperationException(); + } + + public void setProcessors( int nProcessor ) + { + throw new UnsupportedOperationException(); + } + + private class Worker extends Thread + { + public Worker() + { + super( SocketAcceptorDelegate.this.threadName ); + } + + public void run() + { + for( ;; ) + { + try + { + int nKeys = selector.select(); + + registerNew(); + cancelKeys(); + + if( nKeys > 0 ) + { + processSessions( selector.selectedKeys() ); + } + + if( selector.keys().isEmpty() ) + { + synchronized( SocketAcceptorDelegate.this ) + { + if( selector.keys().isEmpty() && + registerQueue.isEmpty() && + cancelQueue.isEmpty() ) + { + worker = null; + try + { + selector.close(); + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + finally + { + selector = null; + } + break; + } + } + } + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + + try + { + Thread.sleep( 1000 ); + } + catch( InterruptedException e1 ) + { + } + } + } + } + + private void processSessions( Set keys ) throws IOException + { + Iterator it = keys.iterator(); + while( it.hasNext() ) + { + SelectionKey key = ( SelectionKey ) it.next(); + + it.remove(); + + if( !key.isAcceptable() ) + { + continue; + } + + ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel(); + + SocketChannel ch = ssc.accept(); + + if( ch == null ) + { + continue; + } + + boolean success = false; + try + { + RegistrationRequest req = ( RegistrationRequest ) key.attachment(); + SocketSessionImpl session = new SocketSessionImpl( SocketAcceptorDelegate.this.wrapper, ch, req.handler ); + SocketAcceptorDelegate.this.filterChainBuilder.buildFilterChain( session.getFilterChain() ); + req.filterChainBuilder.buildFilterChain( session.getFilterChain() ); + ( ( SocketFilterChain ) session.getFilterChain() ).sessionCreated( session ); + session.getIoProcessor().addNew( session ); + success = true; + } + catch( Throwable t ) + { + ExceptionMonitor.getInstance().exceptionCaught( t ); + } + finally + { + if( !success ) + { + ch.close(); + } + } + } + } + } + + + private void registerNew() + { + if( registerQueue.isEmpty() ) + { + return; + } + + for( ;; ) + { + RegistrationRequest req; + + synchronized( registerQueue ) + { + req = ( RegistrationRequest ) registerQueue.pop(); + } + + if( req == null ) + { + break; + } + + ServerSocketChannel ssc = null; + + try + { + ssc = ServerSocketChannel.open(); + ssc.configureBlocking( false ); + + // Configure the server socket, + ssc.socket().setReuseAddress( isReuseAddress() ); + if( getReceiveBufferSize() > 0 ) + { + ssc.socket().setReceiveBufferSize( getReceiveBufferSize() ); + } + + // and bind. + ssc.socket().bind( req.address, getBacklog() ); + ssc.register( selector, SelectionKey.OP_ACCEPT, req ); + + channels.put( req.address, ssc ); + } + catch( IOException e ) + { + req.exception = e; + } + finally + { + synchronized( req ) + { + req.done = true; + + req.notify(); + } + + if( ssc != null && req.exception != null ) + { + try + { + ssc.close(); + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + } + } + } + } + + + private void cancelKeys() + { + if( cancelQueue.isEmpty() ) + { + return; + } + + for( ;; ) + { + CancellationRequest request; + + synchronized( cancelQueue ) + { + request = ( CancellationRequest ) cancelQueue.pop(); + } + + if( request == null ) + { + break; + } + + ServerSocketChannel ssc = ( ServerSocketChannel ) channels.remove( request.address ); + + // close the channel + try + { + if( ssc == null ) + { + request.exception = new IllegalArgumentException( "Address not bound: " + request.address ); + } + else + { + SelectionKey key = ssc.keyFor( selector ); + + key.cancel(); + + selector.wakeup(); // wake up again to trigger thread death + + ssc.close(); + } + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + finally + { + synchronized( request ) + { + request.done = true; + + request.notify(); + } + } + } + } + + public int getReceiveBufferSize() + { + return receiveBufferSize; + } + + /** + * @param receiveBufferSize -1 to use the default value. + */ + public void setReceiveBufferSize( int receiveBufferSize ) + { + this.receiveBufferSize = receiveBufferSize; + } + + public boolean isReuseAddress() + { + return reuseAddress; + } + + public void setReuseAddress( boolean reuseAddress ) + { + this.reuseAddress = reuseAddress; + } + + public int getBacklog() + { + return backlog; + } + + public void setBacklog( int backlog ) + { + if( backlog <= 0 ) + { + throw new IllegalArgumentException( "backlog: " + backlog ); + } + this.backlog = backlog; + } + + private static class RegistrationRequest + { + private final SocketAddress address; + private final IoHandler handler; + private final IoFilterChainBuilder filterChainBuilder; + private IOException exception; + private boolean done; + + private RegistrationRequest( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder ) + { + this.address = address; + this.handler = handler; + this.filterChainBuilder = filterChainBuilder; + } + } + + + private static class CancellationRequest + { + private final SocketAddress address; + + private boolean done; + + private RuntimeException exception; + + private CancellationRequest( SocketAddress address ) + { + this.address = address; + } + } +}