activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r990827 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java test/java/org/apache/activemq/transport/stomp/StompTest.java
Date Mon, 30 Aug 2010 15:47:17 GMT
Author: tabish
Date: Mon Aug 30 15:47:16 2010
New Revision: 990827

URL: http://svn.apache.org/viewvc?rev=990827&view=rev
Log:
fix for: https://issues.apache.org/activemq/browse/AMQ-2883

readLength counter not reset after reading the content of the first BytesMessage.

Added new test that sends multiple BytesMessages to cover this case.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=990827&r1=990826&r2=990827&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java Mon Aug 30 15:47:16 2010
@@ -45,14 +45,14 @@ import org.apache.activemq.wireformat.Wi
 
 /**
  * An implementation of the {@link Transport} interface for using Stomp over NIO
- * 
+ *
  * @version $Revision$
  */
 public class StompNIOTransport extends TcpTransport {
 
     private SocketChannel channel;
     private SelectorSelection selection;
-    
+
     private ByteBuffer inputBuffer;
     ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
     boolean processedHeaders = false;
@@ -94,10 +94,10 @@ public class StompNIOTransport extends T
         this.dataOut = new DataOutputStream(outPutStream);
         this.buffOut = outPutStream;
     }
-    
+
     private void serviceRead() {
         try {
-            
+
            while (true) {
                // read channel
                int readSize = channel.read(inputBuffer);
@@ -111,12 +111,12 @@ public class StompNIOTransport extends T
                if (readSize == 0) {
                    break;
                }
-               
+
                inputBuffer.flip();
-               
+
                int b;
                ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
-               
+
                int i = 0;
                while(i++ < readSize) {
                    b = input.read();
@@ -124,7 +124,7 @@ public class StompNIOTransport extends T
                    if (!processedHeaders && previousByte == 0 && b == 0) {
                        continue;
                    }
-                   
+
                    if (!processedHeaders) {
                        currentCommand.write(b);
                        // end of headers section, parse action and header
@@ -144,7 +144,7 @@ public class StompNIOTransport extends T
                            currentCommand.reset();
                        }
                    } else {
-                       
+
                        if (contentLength == -1) {
                            // end of command reached, unmarshal
                            if (b == 0) {
@@ -156,31 +156,32 @@ public class StompNIOTransport extends T
                            // read desired content length
                            if (readLength++ == contentLength) {
                                processCommand();
+                               readLength = 0;
                            } else {
                                currentCommand.write(b);
                            }
                        }
                    }
-                   
+
                    previousByte = b;
                }
                // clear the buffer
                inputBuffer.clear();
-               
+
            }
         } catch (IOException e) {
-            onException(e);  
+            onException(e);
         } catch (Throwable e) {
             onException(IOExceptionSupport.create(e));
         }
     }
-    
+
     private void processCommand() throws Exception {
         StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
         doConsume(frame);
         processedHeaders = false;
         currentCommand.reset();
-        contentLength = -1;       
+        contentLength = -1;
     }
 
     protected void doStart() throws Exception {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=990827&r1=990826&r2=990827&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Mon Aug 30 15:47:16 2010
@@ -397,7 +397,7 @@ public class StompTest extends Combinati
         frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
     }
-    
+
     public void testBytesMessageWithNulls() throws Exception {
 
         String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
@@ -408,21 +408,53 @@ public class StompTest extends Combinati
 
         frame = "SEND\n destination:/queue/" + getQueueName() + "\ncontent-length:5" + " \n\n" + "\u0001\u0002\u0000\u0004\u0005" + Stomp.NULL;
         stompConnection.sendFrame(frame);
-        
+
         frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
-        stompConnection.sendFrame(frame);        
+        stompConnection.sendFrame(frame);
 
         StompFrame message = stompConnection.receive();
         assertTrue(message.getAction().startsWith("MESSAGE"));
 
         String length = message.getHeaders().get("content-length");
         assertEquals("5", length);
-        
+
         assertEquals(5, message.getContent().length);
-        
+
         frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
-    }    
+    }
+
+    public void testSendMultipleBytesMessages() throws Exception {
+
+    	final int MSG_COUNT = 50;
+
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        for( int ix = 0; ix < MSG_COUNT; ix++) {
+            frame = "SEND\n destination:/queue/" + getQueueName() + "\ncontent-length:5" + " \n\n" + "\u0001\u0002\u0000\u0004\u0005" + Stomp.NULL;
+            stompConnection.sendFrame(frame);
+        }
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        for( int ix = 0; ix < MSG_COUNT; ix++) {
+            StompFrame message = stompConnection.receive();
+            assertTrue(message.getAction().startsWith("MESSAGE"));
+
+            String length = message.getHeaders().get("content-length");
+            assertEquals("5", length);
+
+            assertEquals(5, message.getContent().length);
+        }
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
 
     public void testSubscribeWithMessageSentWithProperties() throws Exception {
 



Mime
View raw message