Author: rajdavies
Date: Fri Jul 20 09:47:40 2007
New Revision: 558044
URL: http://svn.apache.org/viewvc?view=rev&rev=558044
Log:
Applying patch for http://issues.apache.org/activemq/browse/AMQ-1323
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java
(with props)
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java?view=auto&rev=558044
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java Fri Jul 20 09:47:40 2007
@@ -0,0 +1,140 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+
+/**
+ * Minimal blocking STOMP client used by the STOMP transport tests: it exchanges
+ * whole frames as UTF-8 strings over a plain socket. No synchronization is
+ * performed, so an instance should be used from one thread at a time.
+ */
+public class StompConnection {
+
+ /** Default read timeout, in milliseconds, used by {@link #receiveFrame()}. */
+ public static final long RECEIVE_TIMEOUT = 10000;
+
+ private Socket stompSocket;
+ /** Bytes of the frame currently being read; survives a timed-out read. */
+ private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
+
+ /**
+ * Connects to the given host and port. A socket left over from an earlier
+ * open() is closed first so that it is not leaked.
+ *
+ * @param host host name or address to connect to
+ * @param port TCP port to connect to
+ * @throws IOException if closing the previous socket or connecting fails
+ */
+ public void open(String host, int port) throws IOException, UnknownHostException {
+ close();
+ stompSocket = new Socket(host, port);
+ }
+
+ /**
+ * Closes the socket, if any, and discards a partially received frame so it
+ * cannot be prepended to the first frame of the next connection. May be
+ * called repeatedly.
+ *
+ * @throws IOException if closing the socket fails
+ */
+ public void close() throws IOException {
+ Socket socket = stompSocket;
+ // Clear state before closing so a failing close() leaves this object reusable.
+ stompSocket = null;
+ inputBuffer.reset();
+ if (socket != null) {
+ socket.close();
+ }
+ }
+
+ /**
+ * Writes the frame text as UTF-8, appends the NUL frame terminator and
+ * flushes the stream.
+ *
+ * @param data frame text; a NUL byte is always appended after it
+ * @throws IOException if the connection is not open or the write fails
+ */
+ public void sendFrame(String data) throws Exception {
+ byte[] bytes = data.getBytes("UTF-8");
+ OutputStream outputStream = connectedSocket().getOutputStream();
+ outputStream.write(bytes);
+ outputStream.write(0);
+ outputStream.flush();
+ }
+
+ /**
+ * Reads the next frame, waiting at most {@link #RECEIVE_TIMEOUT} for each byte.
+ *
+ * @return the frame text without its NUL/newline terminator
+ * @throws java.net.SocketTimeoutException if a read times out
+ * @throws IOException if the connection is not open, the peer closed it, or
+ * the NUL terminator is not followed by a newline
+ */
+ public String receiveFrame() throws Exception {
+ return receiveFrame(RECEIVE_TIMEOUT);
+ }
+
+ /**
+ * Reads bytes until a NUL terminator, which must be followed by a newline.
+ *
+ * @param timeOut read timeout in milliseconds; values above
+ * Integer.MAX_VALUE are clamped rather than wrapped by the int cast
+ */
+ private String receiveFrame(long timeOut) throws Exception {
+ Socket socket = connectedSocket();
+ socket.setSoTimeout((int) Math.min(timeOut, Integer.MAX_VALUE));
+ InputStream is = socket.getInputStream();
+ for (;;) {
+ int c = is.read();
+ if (c < 0) {
+ throw new IOException("socket closed.");
+ }
+ else if (c == 0) {
+ c = is.read();
+ if (c != '\n') {
+ // Drop the malformed frame so it does not corrupt the next read.
+ inputBuffer.reset();
+ throw new IOException("Expecting stomp frame to terminate with \0\n");
+ }
+ byte[] ba = inputBuffer.toByteArray();
+ inputBuffer.reset();
+ return new String(ba, "UTF-8");
+ }
+ else {
+ inputBuffer.write(c);
+ }
+ }
+ }
+
+ /** Returns the open socket, or fails with an IOException instead of a NullPointerException. */
+ private Socket connectedSocket() throws IOException {
+ if (stompSocket == null) {
+ throw new IOException("not connected");
+ }
+ return stompSocket;
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java?view=diff&rev=558044&r1=558043&r2=558044
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java Fri Jul 20 09:47:40 2007
@@ -18,13 +18,9 @@
package org.apache.activemq.transport.stomp;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
import javax.jms.Connection;
import javax.jms.Message;
@@ -45,10 +41,11 @@
*/
public class StompSubscriptionRemoveTest extends TestCase {
private static final Log log = LogFactory.getLog(StompSubscriptionRemoveTest.class);
+ private static final String COMMAND_MESSAGE = "MESSAGE";
+ private static final String HEADER_MESSAGE_ID = "message-id";
+ private static final int STOMP_PORT = 61613;
- private Socket stompSocket;
- private ByteArrayOutputStream inputBuffer;
-
+ private StompConnection stompConnection = new StompConnection();
public void testRemoveSubscriber() throws Exception {
BrokerService broker = new BrokerService();
@@ -68,104 +65,64 @@
log.debug("Sending: " + idx);
}
producer.close();
- // consumer.close();
session.close();
connection.close();
- stompSocket = new Socket("localhost", 61613);
- inputBuffer = new ByteArrayOutputStream();
+ stompConnection.open("localhost", STOMP_PORT);
String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" +
"\n";
- sendFrame(connect_frame);
+ stompConnection.sendFrame(connect_frame);
- String f = receiveFrame(100000);
+ stompConnection.receiveFrame();
String frame = "SUBSCRIBE\n" + "destination:/queue/" + getDestinationName() + "\n"
+ "ack:client\n\n";
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
+
int messagesCount = 0;
int count = 0;
while (count < 2) {
- String receiveFrame = receiveFrame(10000);
- DataInput input = new DataInputStream(new ByteArrayInputStream(receiveFrame.getBytes()));
- String line;
- while (true) {
- line = input.readLine();
- if (line == null) {
- throw new IOException("connection was closed");
- }
- else {
- line = line.trim();
- if (line.length() > 0) {
- break;
- }
- }
- }
- line = input.readLine();
- if (line == null) {
- throw new IOException("connection was closed");
- }
- String messageId = line.substring(line.indexOf(':') + 1);
- messageId = messageId.trim();
- String ackmessage = "ACK\n" + "message-id:" + messageId + "\n\n";
- sendFrame(ackmessage);
- log.debug(receiveFrame);
- //Thread.sleep(1000);
+ String receiveFrame = stompConnection.receiveFrame();
+ log.debug("Received: " + receiveFrame);
+ assertEquals("Unexpected frame received", COMMAND_MESSAGE, getCommand(receiveFrame));
+ String messageId = getHeaderValue(receiveFrame, HEADER_MESSAGE_ID);
+ String ackmessage = "ACK\n" + HEADER_MESSAGE_ID + ":" + messageId + "\n\n";
+ stompConnection.sendFrame(ackmessage);
+ // Thread.sleep(1000);
++messagesCount;
++count;
}
- sendFrame("DISCONNECT\n\n");
+ stompConnection.sendFrame("DISCONNECT\n\n");
Thread.sleep(1000);
- stompSocket.close();
+ stompConnection.close();
- stompSocket = new Socket("localhost", 61613);
- inputBuffer = new ByteArrayOutputStream();
+ stompConnection.open("localhost", STOMP_PORT);
connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n";
- sendFrame(connect_frame);
+ stompConnection.sendFrame(connect_frame);
- f = receiveFrame(5000);
+ stompConnection.receiveFrame();
frame = "SUBSCRIBE\n" + "destination:/queue/" + getDestinationName() + "\n" + "ack:client\n\n";
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
try {
while (count != 2000) {
- String receiveFrame = receiveFrame(5000);
- DataInput input = new DataInputStream(new ByteArrayInputStream(receiveFrame.getBytes()));
- String line;
- while (true) {
- line = input.readLine();
- if (line == null) {
- throw new IOException("connection was closed");
- }
- else {
- line = line.trim();
- if (line.length() > 0) {
- break;
- }
- }
- }
-
- line = input.readLine();
- if (line == null) {
- throw new IOException("connection was closed");
- }
- String messageId = line.substring(line.indexOf(':') + 1);
- messageId = messageId.trim();
- String ackmessage = "ACK\n" + "message-id:" + messageId + "\n\n";
- sendFrame(ackmessage);
+ String receiveFrame = stompConnection.receiveFrame();
log.debug("Received: " + receiveFrame);
+ assertEquals("Unexpected frame received", COMMAND_MESSAGE, getCommand(receiveFrame));
+ String messageId = getHeaderValue(receiveFrame, HEADER_MESSAGE_ID);
+ String ackmessage = "ACK\n" + HEADER_MESSAGE_ID + ":" + messageId.trim()
+ "\n\n";
+ stompConnection.sendFrame(ackmessage);
//Thread.sleep(1000);
++messagesCount;
++count;
}
-
}
catch (IOException ex) {
ex.printStackTrace();
}
- sendFrame("DISCONNECT\n\n");
- stompSocket.close();
+ stompConnection.sendFrame("DISCONNECT\n\n");
+ stompConnection.close();
broker.stop();
log.info("Total messages received: " + messagesCount);
@@ -178,36 +135,56 @@
// Subscription without any connections
}
- public void sendFrame(String data) throws Exception {
- byte[] bytes = data.getBytes("UTF-8");
- OutputStream outputStream = stompSocket.getOutputStream();
- outputStream.write(bytes);
- outputStream.write(0);
- outputStream.flush();
+ protected String getDestinationName() {
+ return getClass().getName() + "." + getName();
}
- public String receiveFrame(long timeOut) throws Exception {
- stompSocket.setSoTimeout((int) timeOut);
- InputStream is = stompSocket.getInputStream();
- int c = 0;
- for (;;) {
- c = is.read();
- if (c < 0) {
- throw new IOException("socket closed.");
- }
- else if (c == 0) {
- c = is.read();
- byte[] ba = inputBuffer.toByteArray();
- inputBuffer.reset();
- return new String(ba, "UTF-8");
- }
- else {
- inputBuffer.write(c);
- }
- }
+ // These two methods could move to a utility class
+ /**
+ * Returns the command of a STOMP frame, i.e. its first line with surrounding
+ * whitespace removed. A frame without any newline is treated as command only.
+ */
+ protected String getCommand(String frame) {
+ int eol = frame.indexOf('\n');
+ if (eol < 0) {
+ // No newline at all: substring(0, 0) would yield "" and hide the command.
+ return frame.trim();
+ }
+ return frame.substring(0, eol + 1).trim();
}
- protected String getDestinationName() {
- return getClass().getName() + "." + getName();
+ /**
+ * Returns the trimmed value of the first header with the given name, or
+ * null when the header section ends without a match.
+ *
+ * @param frame complete frame text; its first line is skipped as the command
+ * @param header exact header name to look for
+ * @throws IOException declared because the line reader may throw it
+ */
+ protected String getHeaderValue (String frame, String header) throws IOException {
+ DataInput input = new DataInputStream(new ByteArrayInputStream(frame.getBytes()));
+ String line;
+ for (int idx = 0; /*forever, sort of*/; ++idx) {
+ line = input.readLine();
+ if (line == null) {
+ // end of message, no headers
+ return null;
+ }
+ line = line.trim();
+ if (line.length() == 0) {
+ // start body, no headers from here on
+ return null;
+ }
+ if (idx > 0) { // Ignore command line
+ int pos = line.indexOf(':');
+ if (pos < 0) {
+ // Not a "name:value" line; substring(0, -1) would throw.
+ continue;
+ }
+ if (header.equals(line.substring(0, pos))) {
+ return line.substring(pos + 1).trim();
+ }
+ }
+ }
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?view=diff&rev=558044&r1=558043&r2=558044
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Fri Jul 20 09:47:40 2007
@@ -22,14 +22,10 @@
import org.apache.activemq.broker.*;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.transport.stomp.Stomp;
import javax.jms.*;
import javax.jms.Connection;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.URI;
@@ -40,8 +36,7 @@
private BrokerService broker;
private TransportConnector connector;
- private Socket stompSocket;
- private ByteArrayOutputStream inputBuffer;
+ private StompConnection stompConnection = new StompConnection();
private Connection connection;
private Session session;
private ActiveMQQueue queue;
@@ -55,8 +50,7 @@
broker.start();
URI connectUri = connector.getConnectUri();
- stompSocket = createSocket(connectUri);
- inputBuffer = new ByteArrayOutputStream();
+ stompConnection.open("127.0.0.1", connectUri.getPort());
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
connection = cf.createConnection();
@@ -66,7 +60,7 @@
}
protected Socket createSocket(URI connectUri) throws IOException {
- return new Socket("127.0.0.1", connectUri.getPort());
+ return new Socket();
}
protected String getQueueName() {
@@ -75,43 +69,10 @@
protected void tearDown() throws Exception {
connection.close();
- if (stompSocket != null) {
- stompSocket.close();
- }
+ stompConnection.close();
broker.stop();
}
- public void sendFrame(String data) throws Exception {
- byte[] bytes = data.getBytes("UTF-8");
- OutputStream outputStream = stompSocket.getOutputStream();
- for (int i = 0; i < bytes.length; i++) {
- outputStream.write(bytes[i]);
- }
- outputStream.flush();
- }
-
- public String receiveFrame(long timeOut) throws Exception {
- stompSocket.setSoTimeout((int) timeOut);
- InputStream is = stompSocket.getInputStream();
- int c=0;
- for(;;) {
- c = is.read();
- if( c < 0 ) {
- throw new IOException("socket closed.");
- } else if( c == 0 ) {
- c = is.read();
- assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
- byte[] ba = inputBuffer.toByteArray();
- inputBuffer.reset();
- return new String(ba, "UTF-8");
- } else {
- inputBuffer.write(c);
- }
- }
- }
-
-
-
public void sendMessage(String msg) throws Exception {
sendMessage(msg, "foo", "xyz");
}
@@ -128,15 +89,14 @@
BytesMessage message = session.createBytesMessage();
message.writeBytes(msg);
producer.send(message);
-
}
public void testConnect() throws Exception {
String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" +
"request-id: 1\n" + "\n" + Stomp.NULL;
- sendFrame(connect_frame);
+ stompConnection.sendFrame(connect_frame);
- String f = receiveFrame(10000);
+ String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("response-id:1") >= 0);
@@ -151,9 +111,9 @@
"login: brianm\n" +
"passcode: wombats\n\n"+
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
- frame = receiveFrame(10000);
+ frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame =
@@ -162,7 +122,7 @@
"Hello World" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
TextMessage message = (TextMessage) consumer.receive(1000);
assertNotNull(message);
@@ -184,9 +144,9 @@
"login: brianm\n" +
"passcode: wombats\n\n"+
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
- frame = receiveFrame(10000);
+ frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame =
@@ -196,7 +156,7 @@
"Hello World" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
TextMessage message = (TextMessage) consumer.receive(1000);
assertNotNull(message);
@@ -213,9 +173,9 @@
"login: brianm\n" +
"passcode: wombats\n\n"+
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
- frame = receiveFrame(10000);
+ frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame =
@@ -226,7 +186,7 @@
"Hello World" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
TextMessage message = (TextMessage) consumer.receive(1000);
assertNotNull(message);
@@ -244,9 +204,9 @@
"login: brianm\n" +
"passcode: wombats\n\n"+
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
- frame = receiveFrame(10000);
+ frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame =
@@ -261,7 +221,7 @@
"Hello World" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
TextMessage message = (TextMessage) consumer.receive(1000);
assertNotNull(message);
@@ -284,9 +244,9 @@
"login: brianm\n" +
"passcode: wombats\n\n"+
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
- frame = receiveFrame(100000);
+ frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame =
@@ -294,18 +254,18 @@
"destination:/queue/" + getQueueName() + "\n" +
"ack:auto\n\n" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
sendMessage(getName());
- frame = receiveFrame(10000);
+ frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
frame =
"DISCONNECT\n" +
"\n\n"+
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
}
public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
@@ -315,9 +275,9 @@
"login: brianm\n" +
"passcode: wombats\n\n"+
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
- frame = receiveFrame(100000);
+ frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame =
@@ -325,11 +285,11 @@
"destination:/queue/" + getQueueName() + "\n" +
"ack:auto\n\n" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
sendBytesMessage(new byte[] {1,2,3,4,5});
- frame = receiveFrame(10000);
+ frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
@@ -343,7 +303,7 @@
"DISCONNECT\n" +
"\n\n"+
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
}
public void testSubscribeWithMessageSentWithProperties() throws Exception {
@@ -353,9 +313,9 @@
"login: brianm\n" +
"passcode: wombats\n\n"+
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
- frame = receiveFrame(100000);
+ frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame =
@@ -363,7 +323,7 @@
"destination:/queue/" + getQueueName() + "\n" +
"ack:auto\n\n" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
MessageProducer producer = session.createProducer(queue);
@@ -378,7 +338,7 @@
message.setShortProperty("s", (short) 12);
producer.send(message);
- frame = receiveFrame(10000);
+ frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
// System.out.println("out: "+frame);
@@ -387,7 +347,7 @@
"DISCONNECT\n" +
"\n\n"+
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
}
public void testMessagesAreInOrder() throws Exception {
@@ -399,9 +359,9 @@
"login: brianm\n" +
"passcode: wombats\n\n" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
- frame = receiveFrame(100000);
+ frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame =
@@ -409,7 +369,7 @@
"destination:/queue/" + getQueueName() + "\n" +
"ack:auto\n\n" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
for (int i = 0; i < ctr; ++i) {
data[i] = getName() + i;
@@ -417,7 +377,7 @@
}
for (int i = 0; i < ctr; ++i) {
- frame = receiveFrame(1000);
+ frame = stompConnection.receiveFrame();
assertTrue("Message not in order", frame.indexOf(data[i]) >=0 );
}
@@ -430,7 +390,7 @@
}
for (int i = 0; i < ctr; ++i) {
- frame = receiveFrame(1000);
+ frame = stompConnection.receiveFrame();
assertTrue("Message not in order", frame.indexOf(data[i]) >=0 );
}
@@ -438,7 +398,7 @@
"DISCONNECT\n" +
"\n\n" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
}
@@ -449,9 +409,9 @@
"login: brianm\n" +
"passcode: wombats\n\n"+
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
- frame = receiveFrame(100000);
+ frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame =
@@ -460,12 +420,12 @@
"selector: foo = 'zzz'\n" +
"ack:auto\n\n" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
sendMessage("Ignored message", "foo", "1234");
sendMessage("Real message", "foo", "zzz");
- frame = receiveFrame(10000);
+ frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0);
@@ -473,7 +433,7 @@
"DISCONNECT\n" +
"\n\n"+
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
}
@@ -484,9 +444,9 @@
"login: brianm\n" +
"passcode: wombats\n\n"+
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
- frame = receiveFrame(10000);
+ frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
@@ -497,25 +457,22 @@
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
sendMessage(getName());
- frame = receiveFrame(10000);
+ frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
frame =
"DISCONNECT\n" +
"\n\n"+
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
// message should be received since message was not acknowledged
MessageConsumer consumer = session.createConsumer(queue);
TextMessage message = (TextMessage) consumer.receive(1000);
assertNotNull(message);
assertTrue(message.getJMSRedelivered());
-
-
-
}
public void testUnsubscribe() throws Exception {
@@ -525,8 +482,8 @@
"login: brianm\n" +
"passcode: wombats\n\n"+
Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(100000);
+ stompConnection.sendFrame(frame);
+ frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame =
@@ -534,14 +491,13 @@
"destination:/queue/" + getQueueName() + "\n" +
"ack:auto\n\n" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
//send a message to our queue
sendMessage("first message");
-
//receive message from socket
- frame = receiveFrame(1000);
+ frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
//remove suscription
@@ -550,7 +506,7 @@
"destination:/queue/" + getQueueName() + "\n" +
"\n\n" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
waitForFrameToTakeEffect();
@@ -559,7 +515,7 @@
try {
- frame = receiveFrame(1000);
+ frame = stompConnection.receiveFrame();
log.info("Received frame: " + frame);
fail("No message should have been received since subscription was removed");
}catch (SocketTimeoutException e){
@@ -577,9 +533,9 @@
"login: brianm\n" +
"passcode: wombats\n\n"+
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
- String f = receiveFrame(1000);
+ String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("CONNECTED"));
frame =
@@ -587,7 +543,7 @@
"transaction: tx1\n" +
"\n\n" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
frame =
"SEND\n" +
@@ -596,14 +552,14 @@
"\n\n" +
"Hello World" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
frame =
"COMMIT\n" +
"transaction: tx1\n" +
"\n\n" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
waitForFrameToTakeEffect();
@@ -619,9 +575,9 @@
"login: brianm\n" +
"passcode: wombats\n\n"+
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
- String f = receiveFrame(1000);
+ String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("CONNECTED"));
frame =
@@ -629,7 +585,7 @@
"transaction: tx1\n" +
"\n\n" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
frame =
"SEND\n" +
@@ -638,7 +594,7 @@
"\n" +
"first message" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
//rollback first message
frame =
@@ -646,14 +602,14 @@
"transaction: tx1\n" +
"\n\n" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
frame =
"BEGIN\n" +
"transaction: tx1\n" +
"\n\n" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
frame =
"SEND\n" +
@@ -662,14 +618,14 @@
"\n" +
"second message" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
frame =
"COMMIT\n" +
"transaction: tx1\n" +
"\n\n" +
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
// This test case is currently failing
waitForFrameToTakeEffect();
@@ -688,16 +644,15 @@
"passcode: wombats\n\n"+
Stomp.NULL;
- sendFrame(frame);
+ stompConnection.sendFrame(frame);
// This test case is currently failing
waitForFrameToTakeEffect();
assertClients(2);
- // now lets kill the socket
- stompSocket.close();
- stompSocket = null;
+ // now lets kill the stomp connection
+ stompConnection.close();
Thread.sleep(2000);
|