directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nik...@apache.org
Subject svn commit: r349414 - in /directory/network/trunk/src: java/org/apache/mina/transport/socket/nio/support/ java/org/apache/mina/transport/vmpipe/ java/org/apache/mina/transport/vmpipe/support/ test/org/apache/mina/transport/ test/org/apache/mina/transpo...
Date Mon, 28 Nov 2005 16:15:36 GMT
Author: niklas
Date: Mon Nov 28 08:14:55 2005
New Revision: 349414

URL: http://svn.apache.org/viewcvs?rev=349414&view=rev
Log:
Added support for traffic control for vmpipe and datagram transports

Added:
    directory/network/trunk/src/test/org/apache/mina/transport/AbstractTrafficControlTest.java
  (with props)
    directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/DatagramTrafficControlTest.java
  (with props)
    directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/SocketTrafficControlTest.java
  (with props)
    directory/network/trunk/src/test/org/apache/mina/transport/vmpipe/
    directory/network/trunk/src/test/org/apache/mina/transport/vmpipe/VmPipeTrafficControlTest.java
  (with props)
Removed:
    directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeFilter.java
Modified:
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManager.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManagerFilterChain.java
    directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
    directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
    directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
    directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionManagerFilterChain.java

Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java?rev=349414&r1=349413&r2=349414&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
(original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
Mon Nov 28 08:14:55 2005
@@ -574,6 +574,13 @@
         }
     }
     
+    public void updateTrafficMask( DatagramSessionImpl session )
+    {
+        // There's no point in changing the traffic mask for sessions originating
+        // from this acceptor since new sessions are created every time data is
+        // received.
+    }
+
     public IoFilterChain getFilterChain()
     {
         return filters;

Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?rev=349414&r1=349413&r2=349414&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
(original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
Mon Nov 28 08:14:55 2005
@@ -60,6 +60,8 @@
 
     private final Queue flushingSessions = new Queue();
 
+    private final Queue trafficControllingSessions = new Queue();
+    
     private Worker worker;
 
     /**
@@ -178,6 +180,70 @@
         }
     }
 
+    public void updateTrafficMask( DatagramSessionImpl session )
+    {
+        scheduleTrafficControl( session );
+        selector.wakeup();
+    }
+    
+    private void scheduleTrafficControl( DatagramSessionImpl session )
+    {
+        synchronized( trafficControllingSessions )
+        {
+            trafficControllingSessions.push( session );
+        }
+    }
+    
+    private void doUpdateTrafficMask() 
+    {
+        if( trafficControllingSessions.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            DatagramSessionImpl session;
+
+            synchronized( trafficControllingSessions )
+            {
+                session = ( DatagramSessionImpl ) trafficControllingSessions.pop();
+            }
+
+            if( session == null )
+                break;
+
+            SelectionKey key = session.getSelectionKey();
+            // Retry later if session is not yet fully initialized.
+            // (In case that Session.suspend??() or session.resume??() is 
+            // called before addSession() is processed)
+            if( key == null )
+            {
+                scheduleTrafficControl( session );
+                break;
+            }
+            // skip if channel is already closed
+            if( !key.isValid() )
+            {
+                continue;
+            }
+
+            // The normal is OP_READ and, if there are write requests in the
+            // session's write queue, set OP_WRITE to trigger flushing.
+            int ops = SelectionKey.OP_READ;
+            Queue writeRequestQueue = session.getWriteRequestQueue();
+            synchronized( writeRequestQueue )
+            {
+                if( !writeRequestQueue.isEmpty() )
+                {
+                    ops |= SelectionKey.OP_WRITE;
+                }
+            }
+
+            // Now mask the preferred ops with the mask of the current session
+            int mask = session.getTrafficMask().getInterestOps();
+            key.interestOps( ops & mask );
+        }
+    }
+    
     private class Worker extends Thread
     {
         public Worker()
@@ -194,6 +260,7 @@
                     int nKeys = selector.select();
 
                     registerNew();
+                    doUpdateTrafficMask();
 
                     if( nKeys > 0 )
                     {
@@ -256,12 +323,12 @@
 
             DatagramSessionImpl session = ( DatagramSessionImpl ) key.attachment();
 
-            if( key.isReadable() )
+            if( key.isReadable() && session.getTrafficMask().isReadable() )
             {
                 readSession( session );
             }
 
-            if( key.isWritable() )
+            if( key.isWritable() && session.getTrafficMask().isWritable() )
             {
                 scheduleFlush( session );
             }

Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java?rev=349414&r1=349413&r2=349414&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
(original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
Mon Nov 28 08:14:55 2005
@@ -56,7 +56,7 @@
      * Creates a new instance.
      */
     DatagramSessionImpl( IoSessionManagerFilterChain managerFilterChain,
-                     DatagramChannel ch, IoHandler defaultHandler )
+                         DatagramChannel ch, IoHandler defaultHandler )
     {
         this.managerFilterChain = managerFilterChain;
         this.filterChain = new IoSessionFilterChain( this, managerFilterChain );
@@ -167,6 +167,8 @@
 
     protected void updateTrafficMask()
     {
-        // TODO: Implement me.
+        DatagramSessionManager sessionManager = 
+            ( DatagramSessionManager) managerFilterChain.getManager();
+        sessionManager.updateTrafficMask( this );
     }
 }

Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManager.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManager.java?rev=349414&r1=349413&r2=349414&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManager.java
(original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManager.java
Mon Nov 28 08:14:55 2005
@@ -39,4 +39,10 @@
      * This method is invoked by MINA internally.
      */
     void closeSession( DatagramSessionImpl session );
+    
+    /**
+     * Requests this processor to update the traffic mask for the specified
+     * session. This method is invoked by MINA internally.
+     */
+    void updateTrafficMask( DatagramSessionImpl session );    
 }

Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManagerFilterChain.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManagerFilterChain.java?rev=349414&r1=349413&r2=349414&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManagerFilterChain.java
(original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManagerFilterChain.java
Mon Nov 28 08:14:55 2005
@@ -28,7 +28,7 @@
         synchronized( writeRequestQueue )
         {
             writeRequestQueue.push( writeRequest );
-            if( writeRequestQueue.size() == 1 )
+            if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable()
)
             {
                 // Notify DatagramSessionManager only when writeRequestQueue was empty.
                 ( ( DatagramSessionManager ) getManager() ).flushSession( s );

Modified: directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java?rev=349414&r1=349413&r2=349414&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
(original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
Mon Nov 28 08:14:55 2005
@@ -12,7 +12,6 @@
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.support.BaseIoAcceptor;
 import org.apache.mina.transport.vmpipe.support.VmPipe;
-import org.apache.mina.transport.vmpipe.support.VmPipeFilter;
 import org.apache.mina.transport.vmpipe.support.VmPipeSessionManagerFilterChain;
 
 /**
@@ -29,21 +28,6 @@
     private final VmPipeSessionManagerFilterChain filterChain =
         new VmPipeSessionManagerFilterChain( this );
 
-    /**
-     * Creates a new instance.
-     */
-    public VmPipeAcceptor()
-    {
-        try
-        {
-            filterChain.addFirst( "VMPipe", new VmPipeFilter() );
-        }
-        catch( Exception e ) 
-        {
-            throw ( InternalError ) new InternalError( "Unexpected exception caught." ).initCause(
e );
-        }
-    }
-    
     public void bind( SocketAddress address, IoHandler handler ) throws IOException
     {
         if( address == null )

Modified: directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?rev=349414&r1=349413&r2=349414&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
(original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
Mon Nov 28 08:14:55 2005
@@ -11,8 +11,6 @@
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.support.BaseIoConnector;
 import org.apache.mina.transport.vmpipe.support.VmPipe;
-import org.apache.mina.transport.vmpipe.support.VmPipeFilter;
-import org.apache.mina.transport.vmpipe.support.VmPipeIdleStatusChecker;
 import org.apache.mina.transport.vmpipe.support.VmPipeSessionImpl;
 import org.apache.mina.transport.vmpipe.support.VmPipeSessionManagerFilterChain;
 import org.apache.mina.util.AnonymousSocketAddress;
@@ -29,21 +27,6 @@
     private final VmPipeSessionManagerFilterChain filterChain =
         new VmPipeSessionManagerFilterChain( this );
 
-    /**
-     * Creates a new instance.
-     */
-    public VmPipeConnector()
-    {
-        try
-        {
-            filterChain.addFirst( "VMPipe", new VmPipeFilter() );
-        }
-        catch( Exception e ) 
-        {
-            throw ( InternalError ) new InternalError( "Unexpected exception caught." ).initCause(
e );
-        }
-    }
-    
     public IoFilterChain getFilterChain()
     {
         return filterChain;
@@ -74,7 +57,6 @@
                                                    handler,
                                                    entry );
 
-        VmPipeIdleStatusChecker.getInstance().addSession( session );
         ConnectFuture future = new ConnectFuture();
         future.setSession( session );
         return future;

Modified: directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?rev=349414&r1=349413&r2=349414&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
(original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
Mon Nov 28 08:14:55 2005
@@ -18,6 +18,7 @@
 import org.apache.mina.filter.codec.ProtocolEncoder;
 import org.apache.mina.transport.vmpipe.VmPipeSession;
 import org.apache.mina.util.ExceptionUtil;
+import org.apache.mina.util.Queue;
 
 /**
  * A {@link IoSession} for in-VM transport (VM_PIPE).
@@ -34,6 +35,7 @@
     private final VmPipeSessionManagerFilterChain managerFilterChain;
     final VmPipeSessionImpl remoteSession;
     final Object lock;
+    final Queue pendingDataQueue;
 
     /**
      * Constructor for client-side session.
@@ -49,6 +51,7 @@
         this.handler = handler;
         this.filterChain = new IoSessionFilterChain( this, managerFilterChain );
         this.managerFilterChain = managerFilterChain;
+        this.pendingDataQueue = new Queue();
 
         remoteSession = new VmPipeSessionImpl( this, remoteEntry );
         
@@ -75,6 +78,9 @@
             ExceptionUtil.throwException( t );
         }
 
+        VmPipeIdleStatusChecker.getInstance().addSession( remoteSession );
+        VmPipeIdleStatusChecker.getInstance().addSession( this );
+        
         remoteEntry.getManagerFilterChain().sessionOpened( remoteSession );
         managerFilterChain.sessionOpened( this );
     }
@@ -91,6 +97,7 @@
         this.managerFilterChain = entry.getManagerFilterChain();
         this.filterChain = new IoSessionFilterChain( this, entry.getManagerFilterChain()
);
         this.remoteSession = remoteSession;
+        this.pendingDataQueue = new Queue();
     }
     
     VmPipeSessionManagerFilterChain getManagerFilterChain()
@@ -150,6 +157,27 @@
 
     protected void updateTrafficMask()
     {
-        // TODO: Implement me.
+        if( getTrafficMask().isReadable() || getTrafficMask().isWritable())
+        {
+            Object[] data = null;
+            synchronized( pendingDataQueue )
+            {
+                data = pendingDataQueue.toArray();
+                pendingDataQueue.clear();
+            }
+            
+            for( int i = 0; i < data.length; i++ )
+            {
+                if( data[ i ] instanceof WriteRequest )
+                {
+                    WriteRequest wr = ( WriteRequest ) data[ i ];
+                    managerFilterChain.doWrite( this, wr );
+                }
+                else
+                {
+                    managerFilterChain.messageReceived( this, data[ i ] );
+                }
+            }
+        }
     }
-}
\ No newline at end of file
+}

Modified: directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionManagerFilterChain.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionManagerFilterChain.java?rev=349414&r1=349413&r2=349414&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionManagerFilterChain.java
(original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionManagerFilterChain.java
Mon Nov 28 08:14:55 2005
@@ -1,5 +1,6 @@
 package org.apache.mina.transport.vmpipe.support;
 
+import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.CloseFuture;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionManager;
@@ -13,6 +14,33 @@
         super( manager );
     }
 
+    public void messageReceived( IoSession session, Object message )
+    {
+        VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
+        synchronized( s.lock )
+        {
+            if( !s.getTrafficMask().isReadable() )
+            {
+                synchronized( s.pendingDataQueue )
+                {
+                    s.pendingDataQueue.push( message );
+                }
+            }
+            else
+            {
+                int byteCount = 1;
+                if( message instanceof ByteBuffer )
+                {
+                    byteCount = ( ( ByteBuffer ) message ).remaining();
+                }
+                
+                s.increaseReadBytes( byteCount );
+                
+                super.messageReceived( s, message );
+            }
+        }
+    }
+
     protected void doWrite( IoSession session, WriteRequest writeRequest )
     {
         VmPipeSessionImpl s = ( VmPipeSessionImpl ) session;
@@ -20,8 +48,40 @@
         {
             if( s.isConnected() )
             {
-                s.remoteSession.getManagerFilterChain().messageReceived( s.remoteSession,
writeRequest.getMessage() );
-                writeRequest.getFuture().setWritten( true );
+                
+                if( !s.getTrafficMask().isWritable() )
+                {
+                    synchronized( s.pendingDataQueue )
+                    {
+                        s.pendingDataQueue.push( writeRequest );
+                    }
+                }
+                else
+                {
+                
+                    Object message = writeRequest.getMessage();
+                    
+                    int byteCount = 1;
+                    Object messageCopy = message;
+                    if( message instanceof ByteBuffer )
+                    {
+                        ByteBuffer rb = ( ByteBuffer ) message;
+                        byteCount = rb.remaining();
+                        ByteBuffer wb = ByteBuffer.allocate( rb.remaining() );
+                        wb.put( rb );
+                        wb.flip();
+                        messageCopy = wb;
+                    }
+                    
+                    s.increaseWrittenBytes( byteCount );
+                    s.increaseWrittenWriteRequests();
+    
+                    s.getManagerFilterChain().messageSent( s, message );
+                    s.remoteSession.getManagerFilterChain()
+                                .messageReceived( s.remoteSession, messageCopy );
+                    
+                    writeRequest.getFuture().setWritten( true );
+                }
             }
             else 
             {
@@ -43,4 +103,5 @@
             }
         }
     }
+    
 }

Added: directory/network/trunk/src/test/org/apache/mina/transport/AbstractTrafficControlTest.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/test/org/apache/mina/transport/AbstractTrafficControlTest.java?rev=349414&view=auto
==============================================================================
--- directory/network/trunk/src/test/org/apache/mina/transport/AbstractTrafficControlTest.java
(added)
+++ directory/network/trunk/src/test/org/apache/mina/transport/AbstractTrafficControlTest.java
Mon Nov 28 08:14:55 2005
@@ -0,0 +1,197 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport;
+
+import java.net.SocketAddress;
+
+import junit.framework.TestCase;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.TransportType;
+import org.apache.mina.registry.Service;
+import org.apache.mina.registry.SimpleServiceRegistry;
+import org.apache.mina.util.AvailablePortFinder;
+
+/**
+ * Abstract base class for testing suspending and resuming reads and
+ * writes.
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public abstract class AbstractTrafficControlTest extends TestCase
+{
+    protected int port = 0;
+    protected SimpleServiceRegistry registry;
+    protected TransportType transportType;
+    
+    public AbstractTrafficControlTest( TransportType transportType )
+    {
+        this.transportType = transportType;
+    }
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        
+        port = AvailablePortFinder.getNextAvailable();
+        
+        registry = new SimpleServiceRegistry();
+        registry.bind( new Service( "traffic", transportType, 
+                                    createServerSocketAddress( port ) ), 
+                       new ServerIoHandler() );
+        
+    }
+
+    protected void tearDown() throws Exception
+    {
+        super.tearDown();
+        
+        registry.unbindAll();
+    }
+
+    protected abstract ConnectFuture connect( int port, IoHandler handler) throws Exception;
+    protected abstract SocketAddress createServerSocketAddress( int port );
+    
+    public void testSuspendResumeReadWrite() throws Exception
+    {
+        ConnectFuture future = connect( port, new ClientIoHandler() );
+        future.join();
+        IoSession session = future.getSession();
+        
+        write( session, "1" );
+        Thread.sleep( 250 );
+        assertEquals( "1", getReceived( session ) );
+        assertEquals( "1", getSent( session ) );
+        
+        session.suspendRead();
+        
+        write( session, "2" );
+        Thread.sleep( 250 );
+        assertEquals( "1", getReceived( session ) );
+        assertEquals( "12", getSent( session ) );
+        
+        session.suspendWrite();
+        
+        write( session, "3" );
+        Thread.sleep( 250 );
+        assertEquals( "1", getReceived( session ) );
+        assertEquals( "12", getSent( session ) );
+        
+        session.resumeRead();
+        
+        write( session, "4" );
+        Thread.sleep( 250 );
+        assertEquals( "12", getReceived( session ) );
+        assertEquals( "12", getSent( session ) );
+        
+        session.resumeWrite();
+        
+        write( session, "5" );
+        Thread.sleep( 250 );
+        assertEquals( "12345", getReceived( session ) );
+        assertEquals( "12345", getSent( session ) );
+        
+        session.suspendWrite();
+        
+        write( session, "6" );
+        Thread.sleep( 250 );
+        assertEquals( "12345", getReceived( session ) );
+        assertEquals( "12345", getSent( session ) );
+        
+        session.suspendRead();
+        session.resumeWrite();
+        
+        write( session, "7" );
+        Thread.sleep( 250 );
+        assertEquals( "12345", getReceived( session ) );
+        assertEquals( "1234567", getSent( session ) );
+        
+        session.resumeRead();
+        
+        Thread.sleep( 250 );
+        assertEquals( "1234567", getReceived( session ) );
+        assertEquals( "1234567", getSent( session ) );
+        
+        session.close().join();
+    }
+
+    private void write( IoSession session, String s ) throws Exception
+    {
+        session.write( ByteBuffer.wrap( s.getBytes( "ASCII" ) ) );
+    }
+    
+    private static String getReceived( IoSession session ) 
+    {
+        return session.getAttribute( "received" ).toString();
+    }
+    
+    private static String getSent( IoSession session ) 
+    {
+        return session.getAttribute( "sent" ).toString();
+    }
+    
+    public static class ClientIoHandler extends IoHandlerAdapter
+    {
+        public void sessionCreated( IoSession session ) throws Exception
+        {
+            super.sessionCreated( session );
+            session.setAttribute( "received", new StringBuffer() );
+            session.setAttribute( "sent", new StringBuffer() );
+        }
+
+        public void messageReceived( IoSession session, Object message ) throws Exception
+        {
+            ByteBuffer buffer = ( ByteBuffer ) message;
+            byte[] data = new byte[ buffer.remaining() ];
+            buffer.get( data );
+            StringBuffer sb = ( StringBuffer ) session.getAttribute( "received" );
+            sb.append( new String( data, "ASCII" ) );
+        }
+
+        public void messageSent( IoSession session, Object message ) throws Exception
+        {
+            ByteBuffer buffer = ( ByteBuffer ) message;
+            buffer.rewind();
+            byte[] data = new byte[ buffer.remaining() ];
+            buffer.get( data );
+            StringBuffer sb = ( StringBuffer ) session.getAttribute( "sent" );
+            sb.append( new String( data, "ASCII" ) );
+        }
+        
+    }
+    
+    public static class ServerIoHandler extends IoHandlerAdapter
+    {
+        public void messageReceived( IoSession session, Object message )
+                throws Exception
+        {
+            // Just echo the received bytes.
+            ByteBuffer rb = ( ByteBuffer ) message;
+            ByteBuffer wb = ByteBuffer.allocate( rb.remaining() );
+            wb.put( rb );
+            wb.flip();
+            session.write( wb );                
+        }    
+    }    
+}

Propchange: directory/network/trunk/src/test/org/apache/mina/transport/AbstractTrafficControlTest.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/DatagramTrafficControlTest.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/DatagramTrafficControlTest.java?rev=349414&view=auto
==============================================================================
--- directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/DatagramTrafficControlTest.java
(added)
+++ directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/DatagramTrafficControlTest.java
Mon Nov 28 08:14:55 2005
@@ -0,0 +1,60 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.TransportType;
+import org.apache.mina.transport.AbstractTrafficControlTest;
+
+/**
+ * Tests suspending and resuming reads and writes for the
+ * {@link org.apache.mina.common.TransportType#DATAGRAM} transport type. 
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Id$
+ */
+public class DatagramTrafficControlTest extends AbstractTrafficControlTest
+{
+
+    public DatagramTrafficControlTest()
+    {
+        super( TransportType.DATAGRAM );
+    }
+
+    protected ConnectFuture connect( int port, IoHandler handler )
+            throws Exception
+    {
+        IoConnector connector = new DatagramConnector();
+        SocketAddress addr = new InetSocketAddress( InetAddress.getLocalHost(), 
+                                                    port );
+        return connector.connect( addr, handler );
+    }
+
+    protected SocketAddress createServerSocketAddress( int port )
+    {
+        return new InetSocketAddress( port );
+    }
+
+}

Propchange: directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/DatagramTrafficControlTest.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/SocketTrafficControlTest.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/SocketTrafficControlTest.java?rev=349414&view=auto
==============================================================================
--- directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/SocketTrafficControlTest.java
(added)
+++ directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/SocketTrafficControlTest.java
Mon Nov 28 08:14:55 2005
@@ -0,0 +1,60 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.TransportType;
+import org.apache.mina.transport.AbstractTrafficControlTest;
+
+/**
+ * Tests suspending and resuming reads and writes for the
+ * {@link org.apache.mina.common.TransportType#SOCKET} transport type. 
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Id$
+ */
+public class SocketTrafficControlTest extends AbstractTrafficControlTest
+{
+
+    public SocketTrafficControlTest()
+    {
+        super( TransportType.SOCKET );
+    }
+
+    protected ConnectFuture connect( int port, IoHandler handler )
+            throws Exception
+    {
+        IoConnector connector = new SocketConnector();
+        SocketAddress addr = new InetSocketAddress( InetAddress.getLocalHost(), 
+                                                    port );
+        return connector.connect( addr, handler );
+    }
+
+    protected SocketAddress createServerSocketAddress( int port )
+    {
+        return new InetSocketAddress( port );
+    }
+
+}

Propchange: directory/network/trunk/src/test/org/apache/mina/transport/socket/nio/SocketTrafficControlTest.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: directory/network/trunk/src/test/org/apache/mina/transport/vmpipe/VmPipeTrafficControlTest.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/test/org/apache/mina/transport/vmpipe/VmPipeTrafficControlTest.java?rev=349414&view=auto
==============================================================================
--- directory/network/trunk/src/test/org/apache/mina/transport/vmpipe/VmPipeTrafficControlTest.java
(added)
+++ directory/network/trunk/src/test/org/apache/mina/transport/vmpipe/VmPipeTrafficControlTest.java
Mon Nov 28 08:14:55 2005
@@ -0,0 +1,57 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.vmpipe;
+
+import java.net.SocketAddress;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.TransportType;
+import org.apache.mina.transport.AbstractTrafficControlTest;
+
+/**
+ * Tests suspending and resuming reads and writes for the
+ * {@link org.apache.mina.common.TransportType#VM_PIPE} transport type. 
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Id$
+ */
+public class VmPipeTrafficControlTest extends AbstractTrafficControlTest
+{
+
+    public VmPipeTrafficControlTest()
+    {
+        super( TransportType.VM_PIPE );
+    }
+
+    protected ConnectFuture connect( int port, IoHandler handler )
+            throws Exception
+    {
+        IoConnector connector = new VmPipeConnector();
+        SocketAddress addr = new VmPipeAddress( port );
+        return connector.connect( addr, handler );
+    }
+
+    protected SocketAddress createServerSocketAddress( int port )
+    {
+        return new VmPipeAddress( port );
+    }
+
+}

Propchange: directory/network/trunk/src/test/org/apache/mina/transport/vmpipe/VmPipeTrafficControlTest.java
------------------------------------------------------------------------------
    svn:keywords = Id



Mime
View raw message