directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nik...@apache.org
Subject svn commit: r377213 - in /directory/sandbox/trustin/dirmina-158/core/src: main/java/org/apache/mina/filter/StreamWriteFilter.java test/java/org/apache/mina/filter/StreamWriteFilterTest.java
Date Sun, 12 Feb 2006 18:57:05 GMT
Author: niklas
Date: Sun Feb 12 10:57:03 2006
New Revision: 377213

URL: http://svn.apache.org/viewcvs?rev=377213&view=rev
Log:
Updated StreamWriteFilter so that it queues write requests while processing a 
stream instead of throwing an IllegalStateException.

Modified:
    directory/sandbox/trustin/dirmina-158/core/src/main/java/org/apache/mina/filter/StreamWriteFilter.java
    directory/sandbox/trustin/dirmina-158/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java

Modified: directory/sandbox/trustin/dirmina-158/core/src/main/java/org/apache/mina/filter/StreamWriteFilter.java
URL: http://svn.apache.org/viewcvs/directory/sandbox/trustin/dirmina-158/core/src/main/java/org/apache/mina/filter/StreamWriteFilter.java?rev=377213&r1=377212&r2=377213&view=diff
==============================================================================
--- directory/sandbox/trustin/dirmina-158/core/src/main/java/org/apache/mina/filter/StreamWriteFilter.java
(original)
+++ directory/sandbox/trustin/dirmina-158/core/src/main/java/org/apache/mina/filter/StreamWriteFilter.java
Sun Feb 12 10:57:03 2006
@@ -25,6 +25,7 @@
 import org.apache.mina.common.IoFilterAdapter;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.WriteFuture;
+import org.apache.mina.util.Queue;
 
 /**
  * Filter implementation which makes it possible to write {@link InputStream}
@@ -63,6 +64,7 @@
      */
     public static final String CURRENT_STREAM = StreamWriteFilter.class.getName() + ".stream";
 
+    protected static final String WRITE_REQUEST_QUEUE = StreamWriteFilter.class.getName()
+ ".queue";
     protected static final String INITIAL_WRITE_FUTURE = StreamWriteFilter.class.getName()
+ ".future";
 
     private int writeBufferSize = DEFAULT_STREAM_BUFFER_SIZE;
@@ -70,10 +72,17 @@
     public void filterWrite( NextFilter nextFilter, IoSession session, 
                             WriteRequest writeRequest ) throws Exception 
     {
-        // Make sure we're not already processing a stream.
+        // If we're already processing a stream we need to queue the WriteRequest.
         if( session.getAttribute( CURRENT_STREAM ) != null )
         {
-            throw new IllegalStateException( "Already processing a stream" );
+            Queue queue = ( Queue ) session.getAttribute( WRITE_REQUEST_QUEUE );
+            if( queue == null )
+            {
+                queue = new Queue();
+                session.setAttribute( WRITE_REQUEST_QUEUE, queue );
+            }
+            queue.push( writeRequest );
+            return;
         }
         
         Object message = writeRequest.getMessage();
@@ -122,6 +131,19 @@
                 // End of stream reached.
                 session.removeAttribute( CURRENT_STREAM );
                 WriteFuture writeFuture = ( WriteFuture ) session.removeAttribute( INITIAL_WRITE_FUTURE
);
+                
+                // Write queued WriteRequests.
+                Queue queue = ( Queue ) session.removeAttribute( WRITE_REQUEST_QUEUE );
+                if( queue != null )
+                {
+                    WriteRequest wr = ( WriteRequest ) queue.pop();
+                    while( wr != null )
+                    {
+                        filterWrite( nextFilter, session, wr );
+                        wr = ( WriteRequest ) queue.pop();
+                    }
+                }
+                
                 writeFuture.setWritten( true );
                 nextFilter.messageSent( session, inputStream );
             }

Modified: directory/sandbox/trustin/dirmina-158/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java
URL: http://svn.apache.org/viewcvs/directory/sandbox/trustin/dirmina-158/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java?rev=377213&r1=377212&r2=377213&view=diff
==============================================================================
--- directory/sandbox/trustin/dirmina-158/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java
(original)
+++ directory/sandbox/trustin/dirmina-158/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java
Sun Feb 12 10:57:03 2006
@@ -34,12 +34,14 @@
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
 import org.apache.mina.common.IoFilter.NextFilter;
 import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.transport.socket.nio.SocketAcceptor;
 import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
 import org.apache.mina.transport.socket.nio.SocketConnector;
 import org.apache.mina.util.AvailablePortFinder;
+import org.apache.mina.util.Queue;
 import org.easymock.AbstractMatcher;
 import org.easymock.MockControl;
 
@@ -165,6 +167,8 @@
         mockSession.setReturnValue( stream );
         session.removeAttribute( StreamWriteFilter.INITIAL_WRITE_FUTURE );
         mockSession.setReturnValue( writeRequest.getFuture() );
+        session.removeAttribute( StreamWriteFilter.WRITE_REQUEST_QUEUE );
+        mockSession.setReturnValue( null );
         nextFilter.messageSent( session, stream );
         
         /*
@@ -225,6 +229,8 @@
         mockSession.setReturnValue( stream );
         session.removeAttribute( StreamWriteFilter.INITIAL_WRITE_FUTURE );
         mockSession.setReturnValue( writeRequest.getFuture() );
+        session.removeAttribute( StreamWriteFilter.WRITE_REQUEST_QUEUE );
+        mockSession.setReturnValue( null );
         nextFilter.messageSent( session, stream );
         
         /*
@@ -251,6 +257,7 @@
     {
         StreamWriteFilter filter = new StreamWriteFilter();
         
+        Queue queue = new Queue();
         InputStream stream = new ByteArrayInputStream( new byte[ 5 ] );
         
         /*
@@ -259,6 +266,8 @@
         mockSession.reset();
         session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
         mockSession.setReturnValue( stream );
+        session.getAttribute( StreamWriteFilter.WRITE_REQUEST_QUEUE );
+        mockSession.setReturnValue( queue );
         
         /*
          * Replay.
@@ -266,14 +275,10 @@
         mockNextFilter.replay();
         mockSession.replay();
 
-        try
-        {
-            filter.filterWrite( nextFilter, session, new WriteRequest( new Object() ) );
-            fail( "Alreday processing a stream. IllegalStateException expected." );
-        }
-        catch ( IllegalStateException ise )
-        {
-        }
+        WriteRequest wr = new WriteRequest( new Object() );
+        filter.filterWrite( nextFilter, session, wr );
+        assertEquals( 1, queue.size() );
+        assertSame( wr, queue.pop() );
         
         /*
          * Verify.
@@ -282,6 +287,63 @@
         mockSession.verify();
     }
     
+    public void testWritesWriteRequestQueueWhenFinished() throws Exception
+    {
+        StreamWriteFilter filter = new StreamWriteFilter();
+
+        WriteRequest wrs[] = new WriteRequest[] { 
+                new WriteRequest( new Object() ),
+                new WriteRequest( new Object() ),
+                new WriteRequest( new Object() )
+        };
+        Queue queue = new Queue();
+        queue.push( wrs[ 0 ] );
+        queue.push( wrs[ 1 ] );
+        queue.push( wrs[ 2 ] );
+        InputStream stream = new ByteArrayInputStream( new byte[ 0 ] );
+        
+        /*
+         * Record expectations
+         */
+        mockSession.reset();
+        
+        session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
+        mockSession.setReturnValue( stream );
+        session.removeAttribute( StreamWriteFilter.CURRENT_STREAM );
+        mockSession.setReturnValue( stream );
+        session.removeAttribute( StreamWriteFilter.INITIAL_WRITE_FUTURE );
+        mockSession.setReturnValue( new WriteFuture() );
+        session.removeAttribute( StreamWriteFilter.WRITE_REQUEST_QUEUE );
+        mockSession.setReturnValue( queue );
+        
+        nextFilter.filterWrite( session, wrs[ 0 ] );
+        session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
+        mockSession.setReturnValue( null );
+        nextFilter.filterWrite( session, wrs[ 1 ] );
+        session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
+        mockSession.setReturnValue( null );
+        nextFilter.filterWrite( session, wrs[ 2 ] );
+        session.getAttribute( StreamWriteFilter.CURRENT_STREAM );
+        mockSession.setReturnValue( null );
+        
+        nextFilter.messageSent( session, stream );
+        
+        /*
+         * Replay.
+         */
+        mockNextFilter.replay();
+        mockSession.replay();
+
+        filter.messageSent( nextFilter, session, new Object() );
+        assertEquals( 0, queue.size() );
+        
+        /*
+         * Verify.
+         */
+        mockNextFilter.verify();
+        mockSession.verify();
+    }    
+    
     /**
      * Tests that {@link StreamWriteFilter#setWriteBufferSize(int)} checks the
      * specified size.
@@ -404,12 +466,12 @@
             this.inputStream = inputStream;
         }
 
-        public void sessionCreated(IoSession session) throws Exception {
+        public void sessionCreated( IoSession session ) throws Exception {
             super.sessionCreated( session );
             session.getFilterChain().addLast( "codec", streamWriteFilter );
         }
 
-        public void sessionOpened(IoSession session) throws Exception {
+        public void sessionOpened( IoSession session ) throws Exception {
             session.write( inputStream );
         }
 



Mime
View raw message