Return-Path: Delivered-To: apmail-directory-commits-archive@www.apache.org Received: (qmail 16937 invoked from network); 28 Nov 2005 22:20:21 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 28 Nov 2005 22:20:21 -0000 Received: (qmail 97021 invoked by uid 500); 28 Nov 2005 22:19:55 -0000 Delivered-To: apmail-directory-commits-archive@directory.apache.org Received: (qmail 96891 invoked by uid 500); 28 Nov 2005 22:19:54 -0000 Mailing-List: contact commits-help@directory.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@directory.apache.org Delivered-To: mailing list commits@directory.apache.org Delivered-To: moderator for commits@directory.apache.org Received: (qmail 48787 invoked by uid 99); 28 Nov 2005 16:16:59 -0000 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Message-ID: <20051128161635.54357.qmail@minotaur.apache.org> Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r349414 - in /directory/network/trunk/src: java/org/apache/mina/transport/socket/nio/support/ java/org/apache/mina/transport/vmpipe/ java/org/apache/mina/transport/vmpipe/support/ test/org/apache/mina/transport/ test/org/apache/mina/transpo... Date: Mon, 28 Nov 2005 16:15:36 -0000 To: commits@directory.apache.org From: niklas@apache.org X-Mailer: svnmailer-1.0.5 X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: niklas Date: Mon Nov 28 08:14:55 2005 New Revision: 349414 URL: http://svn.apache.org/viewcvs?rev=349414&view=rev Log: Added support for traffic control for vmpipe and datagram transports Added: directory/network/trunk/src/test/org/apache/mina/transport/AbstractTrafficControlTest.java (with props) directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/DatagramTrafficControlTest.java (with props) directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/SocketTrafficControlTest.java (with props) directory/network/trunk/src/test/org/apache/mina/transport/vmpipe/ directory/network/trunk/src/test/org/apache/mina/transport/vmpipe/VmPipeTrafficControlTest.java (with props) Removed: directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeFilter.java Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManager.java directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManagerFilterChain.java directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionManagerFilterChain.java Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java?rev=349414&r1=349413&r2=349414&view=diff ============================================================================== --- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java (original) +++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java Mon Nov 28 08:14:55 2005 @@ -574,6 +574,13 @@ } } + public void updateTrafficMask( DatagramSessionImpl session ) + { + // There's no point in changing the traffic mask for sessions originating + // from this acceptor since new sessions are created every time data is + // received. + } + public IoFilterChain getFilterChain() { return filters; Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?rev=349414&r1=349413&r2=349414&view=diff ============================================================================== --- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original) +++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Mon Nov 28 08:14:55 2005 @@ -60,6 +60,8 @@ private final Queue flushingSessions = new Queue(); + private final Queue trafficControllingSessions = new Queue(); + private Worker worker; /** @@ -178,6 +180,70 @@ } } + public void updateTrafficMask( DatagramSessionImpl session ) + { + scheduleTrafficControl( session ); + selector.wakeup(); + } + + private void scheduleTrafficControl( DatagramSessionImpl session ) + { + synchronized( trafficControllingSessions ) + { + trafficControllingSessions.push( session ); + } + } + + private void doUpdateTrafficMask() + { + if( trafficControllingSessions.isEmpty() ) + return; + + for( ;; ) + { + DatagramSessionImpl session; + + synchronized( trafficControllingSessions ) + { + session = ( DatagramSessionImpl ) trafficControllingSessions.pop(); + } + + if( session == null ) + break; + + SelectionKey key = session.getSelectionKey(); + // Retry later if session is not yet fully initialized. + // (In case that Session.suspend??() or session.resume??() is + // called before addSession() is processed) + if( key == null ) + { + scheduleTrafficControl( session ); + break; + } + // skip if channel is already closed + if( !key.isValid() ) + { + continue; + } + + // The normal is OP_READ and, if there are write requests in the + // session's write queue, set OP_WRITE to trigger flushing. + int ops = SelectionKey.OP_READ; + Queue writeRequestQueue = session.getWriteRequestQueue(); + synchronized( writeRequestQueue ) + { + if( !writeRequestQueue.isEmpty() ) + { + ops |= SelectionKey.OP_WRITE; + } + } + + // Now mask the preferred ops with the mask of the current session + int mask = session.getTrafficMask().getInterestOps(); + key.interestOps( ops & mask ); + } + } + private class Worker extends Thread { public Worker() @@ -194,6 +260,7 @@ int nKeys = selector.select(); registerNew(); + doUpdateTrafficMask(); if( nKeys > 0 ) { @@ -256,12 +323,12 @@ DatagramSessionImpl session = ( DatagramSessionImpl ) key.attachment(); - if( key.isReadable() ) + if( key.isReadable() && session.getTrafficMask().isReadable() ) { readSession( session ); } - if( key.isWritable() ) + if( key.isWritable() && session.getTrafficMask().isWritable() ) { scheduleFlush( session ); } Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java?rev=349414&r1=349413&r2=349414&view=diff ============================================================================== --- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java (original) +++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java Mon Nov 28 08:14:55 2005 @@ -56,7 +56,7 @@ * Creates a new instance. */ DatagramSessionImpl( IoSessionManagerFilterChain managerFilterChain, - DatagramChannel ch, IoHandler defaultHandler ) + DatagramChannel ch, IoHandler defaultHandler ) { this.managerFilterChain = managerFilterChain; this.filterChain = new IoSessionFilterChain( this, managerFilterChain ); @@ -167,6 +167,8 @@ protected void updateTrafficMask() { - // TODO: Implement me. + DatagramSessionManager sessionManager = + ( DatagramSessionManager) managerFilterChain.getManager(); + sessionManager.updateTrafficMask( this ); } } Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManager.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManager.java?rev=349414&r1=349413&r2=349414&view=diff ============================================================================== --- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManager.java (original) +++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManager.java Mon Nov 28 08:14:55 2005 @@ -39,4 +39,10 @@ * This method is invoked by MINA internally. */ void closeSession( DatagramSessionImpl session ); + + /** + * Requests this processor to update the traffic mask for the specified + * session. This method is invoked by MINA internally. + */ + void updateTrafficMask( DatagramSessionImpl session ); } Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManagerFilterChain.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManagerFilterChain.java?rev=349414&r1=349413&r2=349414&view=diff ============================================================================== --- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManagerFilterChain.java (original) +++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManagerFilterChain.java Mon Nov 28 08:14:55 2005 @@ -28,7 +28,7 @@ synchronized( writeRequestQueue ) { writeRequestQueue.push( writeRequest ); - if( writeRequestQueue.size() == 1 ) + if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() ) { // Notify DatagramSessionManager only when writeRequestQueue was empty. ( ( DatagramSessionManager ) getManager() ).flushSession( s ); Modified: directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java?rev=349414&r1=349413&r2=349414&view=diff ============================================================================== --- directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java (original) +++ directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java Mon Nov 28 08:14:55 2005 @@ -12,7 +12,6 @@ import org.apache.mina.common.IoHandler; import org.apache.mina.common.support.BaseIoAcceptor; import org.apache.mina.transport.vmpipe.support.VmPipe; -import org.apache.mina.transport.vmpipe.support.VmPipeFilter; import org.apache.mina.transport.vmpipe.support.VmPipeSessionManagerFilterChain; /** @@ -29,21 +28,6 @@ private final VmPipeSessionManagerFilterChain filterChain = new VmPipeSessionManagerFilterChain( this ); - /** - * Creates a new instance. - */ - public VmPipeAcceptor() - { - try - { - filterChain.addFirst( "VMPipe", new VmPipeFilter() ); - } - catch( Exception e ) - { - throw ( InternalError ) new InternalError( "Unexpected exception caught." ).initCause( e ); - } - } - public void bind( SocketAddress address, IoHandler handler ) throws IOException { if( address == null ) Modified: directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?rev=349414&r1=349413&r2=349414&view=diff ============================================================================== --- directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java (original) +++ directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java Mon Nov 28 08:14:55 2005 @@ -11,8 +11,6 @@ import org.apache.mina.common.IoHandler; import org.apache.mina.common.support.BaseIoConnector; import org.apache.mina.transport.vmpipe.support.VmPipe; -import org.apache.mina.transport.vmpipe.support.VmPipeFilter; -import org.apache.mina.transport.vmpipe.support.VmPipeIdleStatusChecker; import org.apache.mina.transport.vmpipe.support.VmPipeSessionImpl; import org.apache.mina.transport.vmpipe.support.VmPipeSessionManagerFilterChain; import org.apache.mina.util.AnonymousSocketAddress; @@ -29,21 +27,6 @@ private final VmPipeSessionManagerFilterChain filterChain = new VmPipeSessionManagerFilterChain( this ); - /** - * Creates a new instance. - */ - public VmPipeConnector() - { - try - { - filterChain.addFirst( "VMPipe", new VmPipeFilter() ); - } - catch( Exception e ) - { - throw ( InternalError ) new InternalError( "Unexpected exception caught." ).initCause( e ); - } - } - public IoFilterChain getFilterChain() { return filterChain; @@ -74,7 +57,6 @@ handler, entry ); - VmPipeIdleStatusChecker.getInstance().addSession( session ); ConnectFuture future = new ConnectFuture(); future.setSession( session ); return future; Modified: directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?rev=349414&r1=349413&r2=349414&view=diff ============================================================================== --- directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original) +++ directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Mon Nov 28 08:14:55 2005 @@ -18,6 +18,7 @@ import org.apache.mina.filter.codec.ProtocolEncoder; import org.apache.mina.transport.vmpipe.VmPipeSession; import org.apache.mina.util.ExceptionUtil; +import org.apache.mina.util.Queue; /** * A {@link IoSession} for in-VM transport (VM_PIPE). @@ -34,6 +35,7 @@ private final VmPipeSessionManagerFilterChain managerFilterChain; final VmPipeSessionImpl remoteSession; final Object lock; + final Queue pendingDataQueue; /** * Constructor for client-side session. @@ -49,6 +51,7 @@ this.handler = handler; this.filterChain = new IoSessionFilterChain( this, managerFilterChain ); this.managerFilterChain = managerFilterChain; + this.pendingDataQueue = new Queue(); remoteSession = new VmPipeSessionImpl( this, remoteEntry ); @@ -75,6 +78,9 @@ ExceptionUtil.throwException( t ); } + VmPipeIdleStatusChecker.getInstance().addSession( remoteSession ); + VmPipeIdleStatusChecker.getInstance().addSession( this ); + remoteEntry.getManagerFilterChain().sessionOpened( remoteSession ); managerFilterChain.sessionOpened( this ); } @@ -91,6 +97,7 @@ this.managerFilterChain = entry.getManagerFilterChain(); this.filterChain = new IoSessionFilterChain( this, entry.getManagerFilterChain() ); this.remoteSession = remoteSession; + this.pendingDataQueue = new Queue(); } VmPipeSessionManagerFilterChain getManagerFilterChain() @@ -150,6 +157,27 @@ protected void updateTrafficMask() { - // TODO: Implement me. + if( getTrafficMask().isReadable() || getTrafficMask().isWritable()) + { + Object[] data = null; + synchronized( pendingDataQueue ) + { + data = pendingDataQueue.toArray(); + pendingDataQueue.clear(); + } + + for( int i = 0; i < data.length; i++ ) + { + if( data[ i ] instanceof WriteRequest ) + { + WriteRequest wr = ( WriteRequest ) data[ i ]; + managerFilterChain.doWrite( this, wr ); + } + else + { + managerFilterChain.messageReceived( this, data[ i ] ); + } + } + } } -} \ No newline at end of file +} Modified: directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionManagerFilterChain.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionManagerFilterChain.java?rev=349414&r1=349413&r2=349414&view=diff ============================================================================== --- directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionManagerFilterChain.java (original) +++ directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionManagerFilterChain.java Mon Nov 28 08:14:55 2005 @@ -1,5 +1,6 @@ package org.apache.mina.transport.vmpipe.support; +import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.CloseFuture; import org.apache.mina.common.IoSession; import org.apache.mina.common.IoSessionManager; @@ -13,6 +14,33 @@ super( manager ); } + public void messageReceived( IoSession session, Object message ) + { + VmPipeSessionImpl s = ( VmPipeSessionImpl ) session; + synchronized( s.lock ) + { + if( !s.getTrafficMask().isReadable() ) + { + synchronized( s.pendingDataQueue ) + { + s.pendingDataQueue.push( message ); + } + } + else + { + int byteCount = 1; + if( message instanceof ByteBuffer ) + { + byteCount = ( ( ByteBuffer ) message ).remaining(); + } + + s.increaseReadBytes( byteCount ); + + super.messageReceived( s, message ); + } + } + } + protected void doWrite( IoSession session, WriteRequest writeRequest ) { VmPipeSessionImpl s = ( VmPipeSessionImpl ) session; @@ -20,8 +48,40 @@ { if( s.isConnected() ) { - s.remoteSession.getManagerFilterChain().messageReceived( s.remoteSession, writeRequest.getMessage() ); - writeRequest.getFuture().setWritten( true ); + + if( !s.getTrafficMask().isWritable() ) + { + synchronized( s.pendingDataQueue ) + { + s.pendingDataQueue.push( writeRequest ); + } + } + else + { + + Object message = writeRequest.getMessage(); + + int byteCount = 1; + Object messageCopy = message; + if( message instanceof ByteBuffer ) + { + ByteBuffer rb = ( ByteBuffer ) message; + byteCount = rb.remaining(); + ByteBuffer wb = ByteBuffer.allocate( rb.remaining() ); + wb.put( rb ); + wb.flip(); + messageCopy = wb; + } + + s.increaseWrittenBytes( byteCount ); + s.increaseWrittenWriteRequests(); + + s.getManagerFilterChain().messageSent( s, message ); + s.remoteSession.getManagerFilterChain() + .messageReceived( s.remoteSession, messageCopy ); + + writeRequest.getFuture().setWritten( true ); + } } else { @@ -43,4 +103,5 @@ } } } + } Added: directory/network/trunk/src/test/org/apache/mina/transport/AbstractTrafficControlTest.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/test/org/apache/mina/transport/AbstractTrafficControlTest.java?rev=349414&view=auto ============================================================================== --- directory/network/trunk/src/test/org/apache/mina/transport/AbstractTrafficControlTest.java (added) +++ directory/network/trunk/src/test/org/apache/mina/transport/AbstractTrafficControlTest.java Mon Nov 28 08:14:55 2005 @@ -0,0 +1,197 @@ +/* + * @(#) $Id$ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.mina.transport; + +import java.net.SocketAddress; + +import junit.framework.TestCase; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.TransportType; +import org.apache.mina.registry.Service; +import org.apache.mina.registry.SimpleServiceRegistry; +import org.apache.mina.util.AvailablePortFinder; + +/** + * Abstract base class for testing suspending and resuming reads and + * writes. + * + * @author The Apache Directory Project (dev@directory.apache.org) + * @version $Rev$, $Date$ + */ +public abstract class AbstractTrafficControlTest extends TestCase +{ + protected int port = 0; + protected SimpleServiceRegistry registry; + protected TransportType transportType; + + public AbstractTrafficControlTest( TransportType transportType ) + { + this.transportType = transportType; + } + + protected void setUp() throws Exception + { + super.setUp(); + + port = AvailablePortFinder.getNextAvailable(); + + registry = new SimpleServiceRegistry(); + registry.bind( new Service( "traffic", transportType, + createServerSocketAddress( port ) ), + new ServerIoHandler() ); + + } + + protected void tearDown() throws Exception + { + super.tearDown(); + + registry.unbindAll(); + } + + protected abstract ConnectFuture connect( int port, IoHandler handler) throws Exception; + protected abstract SocketAddress createServerSocketAddress( int port ); + + public void testSuspendResumeReadWrite() throws Exception + { + ConnectFuture future = connect( port, new ClientIoHandler() ); + future.join(); + IoSession session = future.getSession(); + + write( session, "1" ); + Thread.sleep( 250 ); + assertEquals( "1", getReceived( session ) ); + assertEquals( "1", getSent( session ) ); + + session.suspendRead(); + + write( session, "2" ); + Thread.sleep( 250 ); + assertEquals( "1", getReceived( session ) ); + assertEquals( "12", getSent( session ) ); + + session.suspendWrite(); + + write( session, "3" ); + Thread.sleep( 250 ); + assertEquals( "1", getReceived( session ) ); + assertEquals( "12", getSent( session ) ); + + session.resumeRead(); + + write( session, "4" ); + Thread.sleep( 250 ); + assertEquals( "12", getReceived( session ) ); + assertEquals( "12", getSent( session ) ); + + session.resumeWrite(); + + write( session, "5" ); + Thread.sleep( 250 ); + assertEquals( "12345", getReceived( session ) ); + assertEquals( "12345", getSent( session ) ); + + session.suspendWrite(); + + write( session, "6" ); + Thread.sleep( 250 ); + assertEquals( "12345", getReceived( session ) ); + assertEquals( "12345", getSent( session ) ); + + session.suspendRead(); + session.resumeWrite(); + + write( session, "7" ); + Thread.sleep( 250 ); + assertEquals( "12345", getReceived( session ) ); + assertEquals( "1234567", getSent( session ) ); + + session.resumeRead(); + + Thread.sleep( 250 ); + assertEquals( "1234567", getReceived( session ) ); + assertEquals( "1234567", getSent( session ) ); + + session.close().join(); + } + + private void write( IoSession session, String s ) throws Exception + { + session.write( ByteBuffer.wrap( s.getBytes( "ASCII" ) ) ); + } + + private static String getReceived( IoSession session ) + { + return session.getAttribute( "received" ).toString(); + } + + private static String getSent( IoSession session ) + { + return session.getAttribute( "sent" ).toString(); + } + + public static class ClientIoHandler extends IoHandlerAdapter + { + public void sessionCreated( IoSession session ) throws Exception + { + super.sessionCreated( session ); + session.setAttribute( "received", new StringBuffer() ); + session.setAttribute( "sent", new StringBuffer() ); + } + + public void messageReceived( IoSession session, Object message ) throws Exception + { + ByteBuffer buffer = ( ByteBuffer ) message; + byte[] data = new byte[ buffer.remaining() ]; + buffer.get( data ); + StringBuffer sb = ( StringBuffer ) session.getAttribute( "received" ); + sb.append( new String( data, "ASCII" ) ); + } + + public void messageSent( IoSession session, Object message ) throws Exception + { + ByteBuffer buffer = ( ByteBuffer ) message; + buffer.rewind(); + byte[] data = new byte[ buffer.remaining() ]; + buffer.get( data ); + StringBuffer sb = ( StringBuffer ) session.getAttribute( "sent" ); + sb.append( new String( data, "ASCII" ) ); + } + + } + + public static class ServerIoHandler extends IoHandlerAdapter + { + public void messageReceived( IoSession session, Object message ) + throws Exception + { + // Just echo the received bytes. + ByteBuffer rb = ( ByteBuffer ) message; + ByteBuffer wb = ByteBuffer.allocate( rb.remaining() ); + wb.put( rb ); + wb.flip(); + session.write( wb ); + } + } +} Propchange: directory/network/trunk/src/test/org/apache/mina/transport/AbstractTrafficControlTest.java ------------------------------------------------------------------------------ svn:keywords = Id Added: directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/DatagramTrafficControlTest.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/DatagramTrafficControlTest.java?rev=349414&view=auto ============================================================================== --- directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/DatagramTrafficControlTest.java (added) +++ directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/DatagramTrafficControlTest.java Mon Nov 28 08:14:55 2005 @@ -0,0 +1,60 @@ +/* + * @(#) $Id$ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.TransportType; +import org.apache.mina.transport.AbstractTrafficControlTest; + +/** + * Tests suspending and resuming reads and writes for the + * {@link org.apache.mina.common.TransportType#DATAGRAM} transport type. + * + * @author The Apache Directory Project (dev@directory.apache.org) + * @version $Id$ + */ +public class DatagramTrafficControlTest extends AbstractTrafficControlTest +{ + + public DatagramTrafficControlTest() + { + super( TransportType.DATAGRAM ); + } + + protected ConnectFuture connect( int port, IoHandler handler ) + throws Exception + { + IoConnector connector = new DatagramConnector(); + SocketAddress addr = new InetSocketAddress( InetAddress.getLocalHost(), + port ); + return connector.connect( addr, handler ); + } + + protected SocketAddress createServerSocketAddress( int port ) + { + return new InetSocketAddress( port ); + } + +} Propchange: directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/DatagramTrafficControlTest.java ------------------------------------------------------------------------------ svn:keywords = Id Added: directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/SocketTrafficControlTest.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/SocketTrafficControlTest.java?rev=349414&view=auto ============================================================================== --- directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/SocketTrafficControlTest.java (added) +++ directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/SocketTrafficControlTest.java Mon Nov 28 08:14:55 2005 @@ -0,0 +1,60 @@ +/* + * @(#) $Id$ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.TransportType; +import org.apache.mina.transport.AbstractTrafficControlTest; + +/** + * Tests suspending and resuming reads and writes for the + * {@link org.apache.mina.common.TransportType#SOCKET} transport type. + * + * @author The Apache Directory Project (dev@directory.apache.org) + * @version $Id$ + */ +public class SocketTrafficControlTest extends AbstractTrafficControlTest +{ + + public SocketTrafficControlTest() + { + super( TransportType.SOCKET ); + } + + protected ConnectFuture connect( int port, IoHandler handler ) + throws Exception + { + IoConnector connector = new SocketConnector(); + SocketAddress addr = new InetSocketAddress( InetAddress.getLocalHost(), + port ); + return connector.connect( addr, handler ); + } + + protected SocketAddress createServerSocketAddress( int port ) + { + return new InetSocketAddress( port ); + } + +} Propchange: directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/SocketTrafficControlTest.java ------------------------------------------------------------------------------ svn:keywords = Id Added: directory/network/trunk/src/test/org/apache/mina/transport/vmpipe/VmPipeTrafficControlTest.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/test/org/apache/mina/transport/vmpipe/VmPipeTrafficControlTest.java?rev=349414&view=auto ============================================================================== --- directory/network/trunk/src/test/org/apache/mina/transport/vmpipe/VmPipeTrafficControlTest.java (added) +++ directory/network/trunk/src/test/org/apache/mina/transport/vmpipe/VmPipeTrafficControlTest.java Mon Nov 28 08:14:55 2005 @@ -0,0 +1,57 @@ +/* + * @(#) $Id$ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.mina.transport.vmpipe; + +import java.net.SocketAddress; + +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.TransportType; +import org.apache.mina.transport.AbstractTrafficControlTest; + +/** + * Tests suspending and resuming reads and writes for the + * {@link org.apache.mina.common.TransportType#VM_PIPE} transport type. + * + * @author The Apache Directory Project (dev@directory.apache.org) + * @version $Id$ + */ +public class VmPipeTrafficControlTest extends AbstractTrafficControlTest +{ + + public VmPipeTrafficControlTest() + { + super( TransportType.VM_PIPE ); + } + + protected ConnectFuture connect( int port, IoHandler handler ) + throws Exception + { + IoConnector connector = new VmPipeConnector(); + SocketAddress addr = new VmPipeAddress( port ); + return connector.connect( addr, handler ); + } + + protected SocketAddress createServerSocketAddress( int port ) + { + return new VmPipeAddress( port ); + } + +} Propchange: directory/network/trunk/src/test/org/apache/mina/transport/vmpipe/VmPipeTrafficControlTest.java ------------------------------------------------------------------------------ svn:keywords = Id