Author: tabish
Date: Mon Aug 30 15:47:16 2010
New Revision: 990827
URL: http://svn.apache.org/viewvc?rev=990827&view=rev
Log:
fix for: https://issues.apache.org/activemq/browse/AMQ-2883
readLength counter not reset after reading the content of the first BytesMessage.
Added new test that sends multiple BytesMessages to cover this case.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=990827&r1=990826&r2=990827&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
Mon Aug 30 15:47:16 2010
@@ -45,14 +45,14 @@ import org.apache.activemq.wireformat.Wi
/**
* An implementation of the {@link Transport} interface for using Stomp over NIO
- *
+ *
* @version $Revision$
*/
public class StompNIOTransport extends TcpTransport {
private SocketChannel channel;
private SelectorSelection selection;
-
+
private ByteBuffer inputBuffer;
ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
boolean processedHeaders = false;
@@ -94,10 +94,10 @@ public class StompNIOTransport extends T
this.dataOut = new DataOutputStream(outPutStream);
this.buffOut = outPutStream;
}
-
+
private void serviceRead() {
try {
-
+
while (true) {
// read channel
int readSize = channel.read(inputBuffer);
@@ -111,12 +111,12 @@ public class StompNIOTransport extends T
if (readSize == 0) {
break;
}
-
+
inputBuffer.flip();
-
+
int b;
ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
-
+
int i = 0;
while(i++ < readSize) {
b = input.read();
@@ -124,7 +124,7 @@ public class StompNIOTransport extends T
if (!processedHeaders && previousByte == 0 && b == 0)
{
continue;
}
-
+
if (!processedHeaders) {
currentCommand.write(b);
// end of headers section, parse action and header
@@ -144,7 +144,7 @@ public class StompNIOTransport extends T
currentCommand.reset();
}
} else {
-
+
if (contentLength == -1) {
// end of command reached, unmarshal
if (b == 0) {
@@ -156,31 +156,32 @@ public class StompNIOTransport extends T
// read desired content length
if (readLength++ == contentLength) {
processCommand();
+ readLength = 0;
} else {
currentCommand.write(b);
}
}
}
-
+
previousByte = b;
}
// clear the buffer
inputBuffer.clear();
-
+
}
} catch (IOException e) {
- onException(e);
+ onException(e);
} catch (Throwable e) {
onException(IOExceptionSupport.create(e));
}
}
-
+
private void processCommand() throws Exception {
StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
doConsume(frame);
processedHeaders = false;
currentCommand.reset();
- contentLength = -1;
+ contentLength = -1;
}
protected void doStart() throws Exception {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=990827&r1=990826&r2=990827&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Mon Aug 30 15:47:16 2010
@@ -397,7 +397,7 @@ public class StompTest extends Combinati
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
-
+
public void testBytesMessageWithNulls() throws Exception {
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
@@ -408,21 +408,53 @@ public class StompTest extends Combinati
frame = "SEND\n destination:/queue/" + getQueueName() + "\ncontent-length:5" + "
\n\n" + "\u0001\u0002\u0000\u0004\u0005" + Stomp.NULL;
stompConnection.sendFrame(frame);
-
+
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n"
+ Stomp.NULL;
- stompConnection.sendFrame(frame);
+ stompConnection.sendFrame(frame);
StompFrame message = stompConnection.receive();
assertTrue(message.getAction().startsWith("MESSAGE"));
String length = message.getHeaders().get("content-length");
assertEquals("5", length);
-
+
assertEquals(5, message.getContent().length);
-
+
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
- }
+ }
+
+ public void testSendMultipleBytesMessages() throws Exception {
+
+ final int MSG_COUNT = 50;
+
+ String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ for( int ix = 0; ix < MSG_COUNT; ix++) {
+ frame = "SEND\n destination:/queue/" + getQueueName() + "\ncontent-length:5"
+ " \n\n" + "\u0001\u0002\u0000\u0004\u0005" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ }
+
+ frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n"
+ Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ for( int ix = 0; ix < MSG_COUNT; ix++) {
+ StompFrame message = stompConnection.receive();
+ assertTrue(message.getAction().startsWith("MESSAGE"));
+
+ String length = message.getHeaders().get("content-length");
+ assertEquals("5", length);
+
+ assertEquals(5, message.getContent().length);
+ }
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ }
public void testSubscribeWithMessageSentWithProperties() throws Exception {
|