Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1B3DB200BFA for ; Wed, 7 Dec 2016 19:18:11 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 19EC7160B42; Wed, 7 Dec 2016 18:18:11 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9FAFC160B3C for ; Wed, 7 Dec 2016 19:18:08 +0100 (CET) Received: (qmail 36085 invoked by uid 500); 7 Dec 2016 18:18:07 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 34233 invoked by uid 99); 7 Dec 2016 18:18:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Dec 2016 18:18:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AB828E0C09; Wed, 7 Dec 2016 18:18:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Wed, 07 Dec 2016 18:18:27 -0000 Message-Id: <2fa3aee50f5c4f33a8649c8558341183@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [24/55] [abbrv] activemq-artemis git commit: Stomp refactor + track autocreation for addresses archived-at: Wed, 07 Dec 2016 18:18:11 -0000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/58934fed/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 90d18ae..e7dcc91 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -25,6 +25,7 @@ import javax.jms.MessageProducer; import javax.jms.TextMessage; import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; @@ -40,50 +41,73 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; +import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.mqtt.imported.FuseMQTTClientProvider; import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider; +import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.RandomUtil; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; public class StompTest extends StompTestBase { private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER; + protected StompClientConnection conn; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); + } + + @Override + @After + public void tearDown() throws Exception { + try { + boolean connected = conn != null && conn.isConnected(); + log.debug("Connection 1.0 connected: " + connected); + if (connected) { + conn.disconnect(); + } + } finally { + super.tearDown(); + } + } @Test public void testConnectionTTL() throws Exception { - int index = 1; int port = 61614; server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?connectionTtl=1000").start(); - createBootstrap(index, port); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(index, frame); - frame = receiveFrame(index, 10000); - - Assert.assertTrue(frame.startsWith("CONNECTED")); + conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); + conn.connect("brianm", "wombats"); Thread.sleep(5000); - assertTrue(receiveFrame(index, 10000).indexOf(Stomp.Responses.ERROR) != -1); + ClientStompFrame frame = conn.receiveFrame(); + + assertEquals(Stomp.Responses.ERROR, frame.getCommand()); - assertChannelClosed(index); + assertFalse(conn.isConnected()); } @Test public void testSendManyMessages() throws Exception { - MessageConsumer consumer = session.createConsumer(queue); + conn.connect(defUser, defPass); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - frame = receiveFrame(10000); + MessageConsumer consumer = session.createConsumer(queue); - Assert.assertTrue(frame.startsWith("CONNECTED")); int count = 1000; final CountDownLatch latch = new CountDownLatch(count); consumer.setMessageListener(new MessageListener() { @@ -94,11 +118,8 @@ public class StompTest extends StompTestBase { } }); - frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; for (int i = 1; i <= count; i++) { - // Thread.sleep(1); - // System.out.println(">>> " + i); - sendFrame(frame); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!"); } assertTrue(latch.await(60, TimeUnit.SECONDS)); @@ -110,11 +131,7 @@ public class StompTest extends StompTestBase { try { MessageConsumer consumer = session.createConsumer(queue); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - frame = receiveFrame(10000); - - Assert.assertTrue(frame.startsWith("CONNECTED")); + conn.connect(defUser, defPass); int count = 1000; final CountDownLatch latch = new CountDownLatch(count); consumer.setMessageListener(new MessageListener() { @@ -125,13 +142,14 @@ public class StompTest extends StompTestBase { } }); - ((ActiveMQServerImpl) server.getActiveMQServer()).getMonitor().setMaxUsage(0).tick(); + ((ActiveMQServerImpl) server.getActiveMQServer()).getMonitor() + .setMaxUsage(0) + .tick(); - frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; for (int i = 1; i <= count; i++) { // Thread.sleep(1); - // System.out.println(">>> " + i); - sendFrame(frame); + // log.info(">>> " + i); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!"); } // It should encounter the exception on logs @@ -140,52 +158,44 @@ public class StompTest extends StompTestBase { AssertionLoggerHandler.clear(); AssertionLoggerHandler.stopCapture(); } - } @Test public void testConnect() throws Exception { - - String connect_frame = "CONNECT\n" + "login: brianm\n" + - "passcode: wombats\n" + - "request-id: 1\n" + - "\n" + - Stomp.NULL; - sendFrame(connect_frame); - - String f = receiveFrame(10000); - Assert.assertTrue(f.startsWith("CONNECTED")); - Assert.assertTrue(f.indexOf("response-id:1") >= 0); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.LOGIN, defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, defPass) + .addHeader(Stomp.Headers.Connect.REQUEST_ID, "1"); + ClientStompFrame response = conn.sendFrame(frame); + + Assert.assertTrue(response.getCommand() + .equals(Stomp.Responses.CONNECTED)); + Assert.assertTrue(response.getHeader(Stomp.Headers.Connected.RESPONSE_ID) + .equals("1")); } @Test public void testDisconnectAndError() throws Exception { + ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.LOGIN, defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, defPass) + .addHeader(Stomp.Headers.Connect.REQUEST_ID, "1"); + ClientStompFrame response = conn.sendFrame(frame); - String connectFrame = "CONNECT\n" + "login: brianm\n" + - "passcode: wombats\n" + - "request-id: 1\n" + - "\n" + - Stomp.NULL; - sendFrame(connectFrame); - - String f = receiveFrame(10000); - Assert.assertTrue(f.startsWith("CONNECTED")); - Assert.assertTrue(f.indexOf("response-id:1") >= 0); + Assert.assertTrue(response.getCommand() + .equals(Stomp.Responses.CONNECTED)); + Assert.assertTrue(response.getHeader(Stomp.Headers.Connected.RESPONSE_ID) + .equals("1")); - String disconnectFrame = "DISCONNECT\n\n" + Stomp.NULL; - sendFrame(disconnectFrame); - - waitForFrameToTakeEffect(); + conn.disconnect(); // sending a message will result in an error - String frame = "SEND\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n\n" + - "Hello World" + - Stomp.NULL; - - assertChannelClosed(); + try { + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!"); + fail("Should have thrown an exception since connection is disconnected"); + } catch (Exception e) { + // ignore + } } @Test @@ -193,15 +203,9 @@ public class StompTest extends StompTestBase { MessageConsumer consumer = session.createConsumer(queue); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; + conn.connect(defUser, defPass); - sendFrame(frame); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World"); TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); @@ -218,21 +222,16 @@ public class StompTest extends StompTestBase { @Test public void sendSTOMPReceiveMQTT() throws Exception { - String address = "myTestAddress"; - // Set up MQTT Subscription MQTTClientProvider clientProvider = new FuseMQTTClientProvider(); - clientProvider.connect("tcp://localhost:61616"); - clientProvider.subscribe(address, 0); + clientProvider.connect("tcp://" + hostname + ":" + port); + clientProvider.subscribe(getTopicPrefix() + getTopicName(), 0); String stompPayload = "This is a test message"; // Set up STOMP connection and send STOMP Message - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = "SEND\n" + "destination:" + address + "\n\n" + stompPayload + Stomp.NULL; - sendFrame(frame); + conn.connect(defUser, defPass); + send(conn, getTopicPrefix() + getTopicName(), null, stompPayload); // Receive MQTT Message byte[] mqttPayload = clientProvider.receive(10000); @@ -244,44 +243,30 @@ public class StompTest extends StompTestBase { @Test public void sendMQTTReceiveSTOMP() throws Exception { - String address = "myTestAddress"; String payload = "This is a test message"; - server.getActiveMQServer().createQueue(new SimpleString(address), new SimpleString(address), null, false, false); - // Set up STOMP subscription - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = "SUBSCRIBE\n" + "destination:" + address + "\n" + "ack:auto\n\n" + Stomp.NULL; - sendFrame(frame); - receiveFrame(1000); + conn.connect(defUser, defPass); + subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO); // Send MQTT Message MQTTClientProvider clientProvider = new FuseMQTTClientProvider(); - clientProvider.connect("tcp://localhost:61616"); - clientProvider.publish(address, payload.getBytes(), 0); + clientProvider.connect("tcp://" + hostname + ":" + port); + clientProvider.publish(getQueuePrefix() + getQueueName(), payload.getBytes(), 0); clientProvider.disconnect(); // Receive STOMP Message - frame = receiveFrame(1000); - assertTrue(frame.contains(payload)); + ClientStompFrame frame = conn.receiveFrame(); + assertTrue(frame.getBody() + .contains(payload)); } @Test public void testSendMessageToNonExistentQueue() throws Exception { String nonExistentQueue = RandomUtil.randomString(); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "destination:" + getQueuePrefix() + nonExistentQueue + "\n\n" + "Hello World" + Stomp.NULL; - - sendFrame(frame); - receiveFrame(1000); + conn.connect(defUser, defPass); + send(conn, getQueuePrefix() + nonExistentQueue, null, "Hello World", true, AddressInfo.RoutingType.ANYCAST); MessageConsumer consumer = session.createConsumer(ActiveMQJMSClient.createQueue(nonExistentQueue)); TextMessage message = (TextMessage) consumer.receive(1000); @@ -297,29 +282,26 @@ public class StompTest extends StompTestBase { Assert.assertTrue(Math.abs(tnow - tmsg) < 1500); // closing the consumer here should trigger auto-deletion - assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(nonExistentQueue))); + assertNotNull(server.getActiveMQServer() + .getPostOffice() + .getBinding(new SimpleString(nonExistentQueue))); consumer.close(); - assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(nonExistentQueue))); + assertNull(server.getActiveMQServer() + .getPostOffice() + .getBinding(new SimpleString(nonExistentQueue))); } @Test public void testSendMessageToNonExistentTopic() throws Exception { String nonExistentTopic = RandomUtil.randomString(); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("CONNECTED")); + conn.connect(defUser, defPass); // first send a message to ensure that sending to a non-existent topic won't throw an error - frame = "SEND\n" + "destination:" + getTopicPrefix() + nonExistentTopic + "\n\n" + "Hello World" + Stomp.NULL; - sendFrame(frame); - receiveFrame(1000); + send(conn, getTopicPrefix() + nonExistentTopic, null, "Hello World", true, AddressInfo.RoutingType.MULTICAST); // create a subscription on the topic and send/receive another message MessageConsumer consumer = session.createConsumer(ActiveMQJMSClient.createTopic(nonExistentTopic)); - sendFrame(frame); - receiveFrame(1000); + send(conn, getTopicPrefix() + nonExistentTopic, null, "Hello World", true); TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); Assert.assertEquals("Hello World", message.getText()); @@ -332,11 +314,14 @@ public class StompTest extends StompTestBase { long tmsg = message.getJMSTimestamp(); Assert.assertTrue(Math.abs(tnow - tmsg) < 1500); - assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(nonExistentTopic))); + assertNotNull(server.getActiveMQServer() + .getAddressInfo(new SimpleString(nonExistentTopic))); - // closing the consumer here should trigger auto-deletion of the topic + // closing the consumer here should trigger auto-deletion of the subscription queue and address consumer.close(); - assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(nonExistentTopic))); + Thread.sleep(200); + assertNull(server.getActiveMQServer() + .getAddressInfo(new SimpleString(nonExistentTopic))); } /* @@ -346,29 +331,22 @@ public class StompTest extends StompTestBase { */ @Test public void testSendMessageWithLeadingNewLine() throws Exception { + conn.connect(defUser, defPass); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()) + .setBody("Hello World"); + conn.sendWickedFrame(frame); MessageConsumer consumer = session.createConsumer(queue); - - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL + "\n"; - sendFrame(frame); - - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n\n" + - "Hello World" + - Stomp.NULL + - "\n"; - - sendFrame(frame); - TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); Assert.assertEquals("Hello World", message.getText()); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World"); + message = (TextMessage) consumer.receive(1000); + Assert.assertNotNull(message); + Assert.assertEquals("Hello World", message.getText()); + // Make sure that the timestamp is valid - should // be very close to the current time. long tnow = System.currentTimeMillis(); @@ -380,25 +358,9 @@ public class StompTest extends StompTestBase { public void testSendMessageWithReceipt() throws Exception { MessageConsumer consumer = session.createConsumer(queue); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "receipt: 1234\n\n" + - "Hello World" + - Stomp.NULL; + conn.connect(defUser, defPass); - sendFrame(frame); - - String f = receiveFrame(10000); - Assert.assertTrue(f.startsWith("RECEIPT")); - Assert.assertTrue(f.indexOf("receipt-id:1234") >= 0); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true); TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); @@ -413,30 +375,19 @@ public class StompTest extends StompTestBase { @Test public void testSendMessageWithContentLength() throws Exception { - MessageConsumer consumer = session.createConsumer(queue); - - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("CONNECTED")); + conn.connect(defUser, defPass); byte[] data = new byte[]{1, 0, 0, 4}; - - frame = "SEND\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "content-length:" + - data.length + - "\n\n"; ByteArrayOutputStream baos = new ByteArrayOutputStream(); - baos.write(frame.getBytes(StandardCharsets.UTF_8)); baos.write(data); - baos.write('\0'); baos.flush(); - sendFrame(new String(baos.toByteArray())); + + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader(Stomp.Headers.CONTENT_LENGTH, Integer.toString(data.length)) + .setBody(new String(baos.toByteArray())); + conn.sendFrame(frame); BytesMessage message = (BytesMessage) consumer.receive(10000); Assert.assertNotNull(message); @@ -449,30 +400,22 @@ public class StompTest extends StompTestBase { @Test public void testJMSXGroupIdCanBeSet() throws Exception { - + final String jmsxGroupID = "JMSXGroupID"; MessageConsumer consumer = session.createConsumer(queue); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "JMSXGroupID: TEST\n\n" + - "Hello World" + - Stomp.NULL; + conn.connect(defUser, defPass); - sendFrame(frame); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader("JMSXGroupID", jmsxGroupID) + .setBody("Hello World"); + conn.sendFrame(frame); TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); Assert.assertEquals("Hello World", message.getText()); // differ from StompConnect - Assert.assertEquals("TEST", message.getStringProperty("JMSXGroupID")); + Assert.assertEquals(jmsxGroupID, message.getStringProperty("JMSXGroupID")); } @Test @@ -480,22 +423,14 @@ public class StompTest extends StompTestBase { MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'"); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); + conn.connect(defUser, defPass); - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "foo:abc\n" + - "bar:123\n" + - "destination:" + - getQueuePrefix() + - getQueueName() + - "\n\n" + - "Hello World" + - Stomp.NULL; - - sendFrame(frame); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader("foo", "abc") + .addHeader("bar", "123") + .setBody("Hello World"); + conn.sendFrame(frame); TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); @@ -509,22 +444,14 @@ public class StompTest extends StompTestBase { MessageConsumer consumer = session.createConsumer(queue, "hyphenated_props:b-ar = '123'"); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "foo:abc\n" + - "b-ar:123\n" + - "destination:" + - getQueuePrefix() + - getQueueName() + - "\n\n" + - "Hello World" + - Stomp.NULL; + conn.connect(defUser, defPass); - sendFrame(frame); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader("foo", "abc") + .addHeader("b-ar", "123") + .setBody("Hello World"); + conn.sendFrame(frame); TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); @@ -538,27 +465,19 @@ public class StompTest extends StompTestBase { MessageConsumer consumer = session.createConsumer(queue); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); + conn.connect(defUser, defPass); - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SEND\n" + "correlation-id:c123\n" + - "persistent:true\n" + - "priority:3\n" + - "type:t345\n" + - "JMSXGroupID:abc\n" + - "foo:abc\n" + - "bar:123\n" + - "destination:" + - getQueuePrefix() + - getQueueName() + - "\n\n" + - "Hello World" + - Stomp.NULL; - - sendFrame(frame); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader("foo", "abc") + .addHeader("bar", "123") + .addHeader("correlation-id", "c123") + .addHeader("persistent", "true") + .addHeader("type", "t345") + .addHeader("JMSXGroupID", "abc") + .addHeader("priority", "3") + .setBody("Hello World"); + conn.sendFrame(frame); TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); @@ -579,33 +498,25 @@ public class StompTest extends StompTestBase { public void testSendMessageWithLongHeaders() throws Exception { MessageConsumer consumer = session.createConsumer(queue); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("CONNECTED")); + conn.connect(defUser, defPass); StringBuffer buffer = new StringBuffer(); for (int i = 0; i < 1024; i++) { buffer.append("a"); } - String longHeader = "longHeader:" + buffer.toString() + "\n"; - - frame = "SEND\n" + "correlation-id:c123\n" + - "persistent:true\n" + - "priority:3\n" + - "type:t345\n" + - "JMSXGroupID:abc\n" + - "foo:abc\n" + - longHeader + - "destination:" + - getQueuePrefix() + - getQueueName() + - "\n\n" + - "Hello World" + - Stomp.NULL; - - sendFrame(frame); + + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader("foo", "abc") + .addHeader("bar", "123") + .addHeader("correlation-id", "c123") + .addHeader("persistent", "true") + .addHeader("type", "t345") + .addHeader("JMSXGroupID", "abc") + .addHeader("priority", "3") + .addHeader("longHeader", buffer.toString()) + .setBody("Hello World"); + conn.sendFrame(frame); TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); @@ -615,37 +526,30 @@ public class StompTest extends StompTestBase { Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority()); Assert.assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode()); Assert.assertEquals("foo", "abc", message.getStringProperty("foo")); - Assert.assertEquals("longHeader", 1024, message.getStringProperty("longHeader").length()); + Assert.assertEquals("longHeader", 1024, message.getStringProperty("longHeader") + .length()); Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID")); } @Test public void testSubscribeWithAutoAck() throws Exception { + conn.connect(defUser, defPass); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL; - sendFrame(frame); + subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO); - sendMessage(getName()); + sendJmsMessage(getName()); - frame = receiveFrame(10000); - System.out.println("-------- frame received: " + frame); - Assert.assertTrue(frame.startsWith("MESSAGE")); - Assert.assertTrue(frame.indexOf("destination:") > 0); - Assert.assertTrue(frame.indexOf(getName()) > 0); + ClientStompFrame frame = conn.receiveFrame(10000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Send.DESTINATION)); + Assert.assertEquals(getName(), frame.getBody()); - Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE); - Matcher cl_matcher = cl.matcher(frame); + Pattern cl = Pattern.compile(Stomp.Headers.CONTENT_LENGTH + ":\\s*(\\d+)", Pattern.CASE_INSENSITIVE); + Matcher cl_matcher = cl.matcher(frame.toString()); Assert.assertFalse(cl_matcher.find()); - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + conn.disconnect(); // message should not be received as it was auto-acked MessageConsumer consumer = session.createConsumer(queue); @@ -656,48 +560,32 @@ public class StompTest extends StompTestBase { @Test public void testSubscribeWithAutoAckAndBytesMessage() throws Exception { - - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; - sendFrame(frame); + conn.connect(defUser, defPass); + subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO); byte[] payload = new byte[]{1, 2, 3, 4, 5}; - sendMessage(payload, queue); - frame = receiveFrame(10000); + sendJmsMessage(payload, queue); - System.out.println("Message: " + frame); + ClientStompFrame frame = conn.receiveFrame(10000); - Assert.assertTrue(frame.startsWith("MESSAGE")); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); - Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE); - Matcher cl_matcher = cl.matcher(frame); + Pattern cl = Pattern.compile(Stomp.Headers.CONTENT_LENGTH + ":\\s*(\\d+)", Pattern.CASE_INSENSITIVE); + Matcher cl_matcher = cl.matcher(frame.toString()); Assert.assertTrue(cl_matcher.find()); Assert.assertEquals("5", cl_matcher.group(1)); - Assert.assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find()); - Assert.assertTrue(frame.indexOf(new String(payload)) > -1); + Assert.assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame.toString()).find()); + Assert.assertTrue(frame.getBody().toString().indexOf(new String(payload)) > -1); - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + conn.disconnect(); } @Test public void testSubscribeWithMessageSentWithProperties() throws Exception { - - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; - sendFrame(frame); + conn.connect(defUser, defPass); + subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO); MessageProducer producer = session.createProducer(queue); BytesMessage message = session.createBytesMessage(); @@ -712,79 +600,55 @@ public class StompTest extends StompTestBase { message.writeBytes("Hello World".getBytes(StandardCharsets.UTF_8)); producer.send(message); - frame = receiveFrame(10000); + ClientStompFrame frame = conn.receiveFrame(10000); Assert.assertNotNull(frame); - Assert.assertTrue(frame.startsWith("MESSAGE")); - Assert.assertTrue(frame.indexOf("S:") > 0); - Assert.assertTrue(frame.indexOf("n:") > 0); - Assert.assertTrue(frame.indexOf("byte:") > 0); - Assert.assertTrue(frame.indexOf("d:") > 0); - Assert.assertTrue(frame.indexOf("f:") > 0); - Assert.assertTrue(frame.indexOf("i:") > 0); - Assert.assertTrue(frame.indexOf("l:") > 0); - Assert.assertTrue(frame.indexOf("s:") > 0); - Assert.assertTrue(frame.indexOf("Hello World") > 0); - - // System.out.println("out: "+frame); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertEquals("value", frame.getHeader("S")); + Assert.assertEquals("false", frame.getHeader("n")); + Assert.assertEquals("9", frame.getHeader("byte")); + Assert.assertEquals("2.0", frame.getHeader("d")); + Assert.assertEquals("6.0", frame.getHeader("f")); + Assert.assertEquals("10", frame.getHeader("i")); + Assert.assertEquals("121", frame.getHeader("l")); + Assert.assertEquals("12", frame.getHeader("s")); + Assert.assertEquals("Hello World", frame.getBody()); + + conn.disconnect(); } @Test public void testSubscribeWithID() throws Exception { + conn.connect(defUser, defPass); + subscribe(conn, "mysubid", Stomp.Headers.Subscribe.AckModeValues.AUTO); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "ack:auto\n" + - "id: mysubid\n\n" + - Stomp.NULL; - sendFrame(frame); + sendJmsMessage(getName()); - sendMessage(getName()); + ClientStompFrame frame = conn.receiveFrame(10000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Message.DESTINATION)); + Assert.assertEquals("mysubid", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION)); + Assert.assertEquals(getName(), frame.getBody()); - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("MESSAGE")); - Assert.assertTrue(frame.indexOf("destination:") > 0); - Assert.assertTrue(frame.indexOf("subscription:") > 0); - Assert.assertTrue(frame.indexOf(getName()) > 0); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + conn.disconnect(); } + // @Test public void testBodyWithUTF8() throws Exception { - - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; - sendFrame(frame); + conn.connect(defUser, defPass); + subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO); String text = "A" + "\u00ea" + "\u00f1" + "\u00fc" + "C"; - System.out.println(text); - sendMessage(text); + log.info(text); + sendJmsMessage(text); - frame = receiveFrame(10000); - System.out.println(frame); - Assert.assertTrue(frame.startsWith("MESSAGE")); - Assert.assertTrue(frame.indexOf("destination:") > 0); - Assert.assertTrue(frame.indexOf(text) > 0); + ClientStompFrame frame = conn.receiveFrame(10000); + log.info(frame); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Message.DESTINATION)); + Assert.assertEquals(text, frame.getBody()); - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + conn.disconnect(); } @Test @@ -792,88 +656,54 @@ public class StompTest extends StompTestBase { int ctr = 10; String[] data = new String[ctr]; - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; - sendFrame(frame); + conn.connect(defUser, defPass); + subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO); for (int i = 0; i < ctr; ++i) { data[i] = getName() + i; - sendMessage(data[i]); + sendJmsMessage(data[i]); } for (int i = 0; i < ctr; ++i) { - frame = receiveFrame(1000); - Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0); + ClientStompFrame frame = conn.receiveFrame(1000); + Assert.assertTrue("Message not in order", frame.getBody().equals(data[i])); } // sleep a while before publishing another set of messages - waitForFrameToTakeEffect(); + Thread.sleep(200); for (int i = 0; i < ctr; ++i) { - data[i] = getName() + ":second:" + i; - sendMessage(data[i]); + data[i] = getName() + Stomp.Headers.SEPARATOR + "second:" + i; + sendJmsMessage(data[i]); } for (int i = 0; i < ctr; ++i) { - frame = receiveFrame(1000); - Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0); + ClientStompFrame frame = conn.receiveFrame(1000); + Assert.assertTrue("Message not in order", frame.getBody().equals(data[i])); } - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + conn.disconnect(); } @Test public void testSubscribeWithAutoAckAndSelector() throws Exception { + conn.connect(defUser, defPass); + subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, "foo = 'zzz'"); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); + sendJmsMessage("Ignored message", "foo", "1234"); + sendJmsMessage("Real message", "foo", "zzz"); - frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); + ClientStompFrame frame = conn.receiveFrame(10000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertTrue("Should have received the real message but got: " + frame, frame.getBody().equals("Real message")); - frame = "SUBSCRIBE\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "selector: foo = 'zzz'\n" + - "ack:auto\n\n" + - Stomp.NULL; - sendFrame(frame); - - sendMessage("Ignored message", "foo", "1234"); - sendMessage("Real message", "foo", "zzz"); - - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("MESSAGE")); - Assert.assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + conn.disconnect(); } @Test public void testSubscribeWithAutoAckAndHyphenatedSelector() throws Exception { - - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "selector: hyphenated_props:foo-bar = 'zzz'\n" + - "ack:auto\n\n" + - Stomp.NULL; - sendFrame(frame); + conn.connect(defUser, defPass); + subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, "hyphenated_props:foo-bar = 'zzz'"); ServerLocator serverLocator = addServerLocator(ActiveMQClient.createServerLocator("vm://0")); ClientSessionFactory clientSessionFactory = serverLocator.createSessionFactory(); @@ -891,40 +721,26 @@ public class StompTest extends StompTestBase { producer.send(ignoredMessage); producer.send(realMessage); - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("MESSAGE")); - Assert.assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0); - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + ClientStompFrame frame = conn.receiveFrame(10000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertTrue("Should have received the real message but got: " + frame, frame.getBody().equals("Real message")); + + conn.disconnect(); } @Test public void testSubscribeWithClientAck() throws Exception { + conn.connect(defUser, defPass); + subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.CLIENT); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL; - - sendFrame(frame); - - sendMessage(getName()); - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("MESSAGE")); - Pattern cl = Pattern.compile("message-id:\\s*(\\S+)", Pattern.CASE_INSENSITIVE); - Matcher cl_matcher = cl.matcher(frame); - Assert.assertTrue(cl_matcher.find()); - String messageID = cl_matcher.group(1); - - frame = "ACK\n" + "message-id: " + messageID + "\n\n" + Stomp.NULL; - sendFrame(frame); + sendJmsMessage(getName()); + ClientStompFrame frame = conn.receiveFrame(10000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.MESSAGE_ID)); + ack(conn, null, frame); - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + conn.disconnect(); // message should not be received since message was acknowledged by the client MessageConsumer consumer = session.createConsumer(queue); @@ -934,23 +750,14 @@ public class StompTest extends StompTestBase { @Test public void testRedeliveryWithClientAck() throws Exception { + conn.connect(defUser, defPass); + subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.CLIENT); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); + sendJmsMessage(getName()); + ClientStompFrame frame = conn.receiveFrame(10000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL; - - sendFrame(frame); - - sendMessage(getName()); - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("MESSAGE")); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + conn.disconnect(); // message should be received since message was not acknowledged MessageConsumer consumer = session.createConsumer(queue); @@ -970,150 +777,93 @@ public class StompTest extends StompTestBase { } protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception { + conn.connect(defUser, defPass); + subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.CLIENT); + sendJmsMessage(getName()); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL; - - sendFrame(frame); - sendMessage(getName()); - - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("MESSAGE")); + ClientStompFrame frame = conn.receiveFrame(10000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); log.info("Reconnecting!"); if (sendDisconnect) { - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); - waitForFrameToTakeEffect(); - reconnect(); + conn.disconnect(); + conn.destroy(); + conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); } else { - reconnect(100); - waitForFrameToTakeEffect(); + conn.destroy(); + conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); } // message should be received since message was not acknowledged - frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("CONNECTED")); + conn.connect(defUser, defPass); - frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + Stomp.NULL; + subscribe(conn, null); - sendFrame(frame); + frame = conn.receiveFrame(10000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("MESSAGE")); + conn.disconnect(); + conn.destroy(); - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); - waitForFrameToTakeEffect(); + conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); // now let's make sure we don't see the message again - reconnect(); - - frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("CONNECTED")); + conn.connect(defUser, defPass); - frame = "SUBSCRIBE\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "receipt: 1234\n\n" + - Stomp.NULL; + subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, true); - sendFrame(frame); - // wait for SUBSCRIBE's receipt - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("RECEIPT")); + sendJmsMessage("shouldBeNextMessage"); - sendMessage("shouldBeNextMessage"); - - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("MESSAGE")); - System.out.println(frame); - Assert.assertTrue(frame.contains("shouldBeNextMessage")); + frame = conn.receiveFrame(10000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertEquals("shouldBeNextMessage", frame.getBody()); } + @Test public void testUnsubscribe() throws Exception { - - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; - sendFrame(frame); + conn.connect(defUser, defPass); + subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO); // send a message to our queue - sendMessage("first message"); + sendJmsMessage("first message"); - // receive message from socket - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("MESSAGE")); + // receive message + ClientStompFrame frame = conn.receiveFrame(10000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); // remove suscription - frame = "UNSUBSCRIBE\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "receipt:567\n" + - "\n\n" + - Stomp.NULL; - sendFrame(frame); - waitForReceipt(); + unsubscribe(conn, null, getQueuePrefix() + getQueueName(), true, false); // send a message to our queue - sendMessage("second message"); + sendJmsMessage("second message"); - frame = receiveFrame(1000); + frame = conn.receiveFrame(1000); log.info("Received frame: " + frame); Assert.assertNull("No message should have been received since subscription was removed", frame); } @Test public void testUnsubscribeWithID() throws Exception { - - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "id: mysubid\n" + - "ack:auto\n\n" + - Stomp.NULL; - sendFrame(frame); + conn.connect(defUser, defPass); + subscribe(conn, "mysubid", Stomp.Headers.Subscribe.AckModeValues.AUTO); // send a message to our queue - sendMessage("first message"); + sendJmsMessage("first message"); // receive message from socket - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("MESSAGE")); + ClientStompFrame frame = conn.receiveFrame(10000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); // remove suscription - frame = "UNSUBSCRIBE\n" + "id:mysubid\n" + "receipt: 345\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); - waitForReceipt(); + unsubscribe(conn, "mysubid", null, true, false); // send a message to our queue - sendMessage("second message"); + sendJmsMessage("second message"); - frame = receiveFrame(1000); + frame = conn.receiveFrame(1000); log.info("Received frame: " + frame); Assert.assertNull("No message should have been received since subscription was removed", frame); @@ -1122,34 +872,15 @@ public class StompTest extends StompTestBase { @Test public void testTransactionCommit() throws Exception { MessageConsumer consumer = session.createConsumer(queue); + conn.connect(defUser, defPass); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - String f = receiveFrame(1000); - Assert.assertTrue(f.startsWith("CONNECTED")); - - frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = "SEND\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "transaction: tx1\n" + - "receipt: 123\n" + - "\n\n" + - "Hello World" + - Stomp.NULL; - sendFrame(frame); - waitForReceipt(); + beginTransaction(conn, "tx1"); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true, null, "tx1"); // check the message is not committed assertNull(consumer.receive(100)); - frame = "COMMIT\n" + "transaction: tx1\n" + "receipt:456\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); - waitForReceipt(); + commitTransaction(conn, "tx1", true); Message message = consumer.receive(1000); Assert.assertNotNull("Should have received a message", message); @@ -1158,49 +889,20 @@ public class StompTest extends StompTestBase { @Test public void testSuccessiveTransactionsWithSameID() throws Exception { MessageConsumer consumer = session.createConsumer(queue); - - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - String f = receiveFrame(1000); - Assert.assertTrue(f.startsWith("CONNECTED")); + conn.connect(defUser, defPass); // first tx - frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = "SEND\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "transaction: tx1\n" + - "\n\n" + - "Hello World" + - Stomp.NULL; - sendFrame(frame); - - frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + beginTransaction(conn, "tx1"); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true, null, "tx1"); + commitTransaction(conn, "tx1"); Message message = consumer.receive(1000); Assert.assertNotNull("Should have received a message", message); // 2nd tx with same tx ID - frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = "SEND\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "transaction: tx1\n" + - "\n\n" + - "Hello World" + - Stomp.NULL; - sendFrame(frame); - - frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + beginTransaction(conn, "tx1"); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true, null, "tx1"); + commitTransaction(conn, "tx1"); message = consumer.receive(1000); Assert.assertNotNull("Should have received a message", message); @@ -1208,67 +910,27 @@ public class StompTest extends StompTestBase { @Test public void testBeginSameTransactionTwice() throws Exception { - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - String f = receiveFrame(1000); - Assert.assertTrue(f.startsWith("CONNECTED")); - - frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); - - // begin the tx a 2nd time - frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); - - f = receiveFrame(1000); - Assert.assertTrue(f.startsWith("ERROR")); + conn.connect(defUser, defPass); + beginTransaction(conn, "tx1"); + beginTransaction(conn, "tx1"); + ClientStompFrame frame = conn.receiveFrame(1000); + Assert.assertEquals(Stomp.Responses.ERROR, frame.getCommand()); } @Test public void testTransactionRollback() throws Exception { MessageConsumer consumer = session.createConsumer(queue); + String txId = "tx1"; - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - String f = receiveFrame(1000); - Assert.assertTrue(f.startsWith("CONNECTED")); - - frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = "SEND\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "transaction: tx1\n" + - "\n" + - "first message" + - Stomp.NULL; - sendFrame(frame); - - // rollback first message - frame = "ABORT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = "SEND\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "transaction: tx1\n" + - "\n" + - "second message" + - Stomp.NULL; - sendFrame(frame); - - frame = "COMMIT\n" + "transaction: tx1\n" + "receipt:789\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); - waitForReceipt(); + conn.connect(defUser, defPass); + beginTransaction(conn, txId); + send(conn, getQueuePrefix() + getQueueName(), null, "first message", true, null, txId); + abortTransaction(conn, txId); + + beginTransaction(conn, txId); + send(conn, getQueuePrefix() + getQueueName(), null, "second message", true, null, txId); + commitTransaction(conn, txId); // only second msg should be received since first msg was rolled back TextMessage message = (TextMessage) consumer.receive(1000); @@ -1280,91 +942,52 @@ public class StompTest extends StompTestBase { public void testSubscribeToTopic() throws Exception { final int baselineQueueCount = server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length; - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); + conn.connect(defUser, defPass); - frame = "SUBSCRIBE\n" + "destination:" + - getTopicPrefix() + - getTopicName() + - "\n" + - "receipt: 12\n" + - "\n\n" + - Stomp.NULL; - sendFrame(frame); - // wait for SUBSCRIBE's receipt - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("RECEIPT")); + subscribeTopic(conn, null, null, null, true); assertTrue("Subscription queue should be created here", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisfied() throws Exception { - if (server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length - baselineQueueCount == 1) { + int length = server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length; + if (length - baselineQueueCount == 1) { return true; } else { + log.info("Queue count: " + (length - baselineQueueCount)); return false; } } - }, TimeUnit.SECONDS.toMillis(1000), TimeUnit.MILLISECONDS.toMillis(100))); - - sendMessage(getName(), topic); - - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("MESSAGE")); - Assert.assertTrue(frame.indexOf("destination:") > 0); - Assert.assertTrue(frame.indexOf(getName()) > 0); - - frame = "UNSUBSCRIBE\n" + "destination:" + - getTopicPrefix() + - getTopicName() + - "\n" + - "receipt: 1234\n" + - "\n\n" + - Stomp.NULL; - sendFrame(frame); - // wait for UNSUBSCRIBE's receipt - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("RECEIPT")); - - sendMessage(getName(), topic); - - frame = receiveFrame(1000); + }, TimeUnit.SECONDS.toMillis(10), TimeUnit.MILLISECONDS.toMillis(100))); + + sendJmsMessage(getName(), topic); + + ClientStompFrame frame = conn.receiveFrame(1000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertEquals(getTopicPrefix() + getTopicName(), frame.getHeader(Stomp.Headers.Send.DESTINATION)); + Assert.assertEquals(getName(), frame.getBody()); + + unsubscribe(conn, null, getTopicPrefix() + getTopicName(), true, false); + + sendJmsMessage(getName(), topic); + + frame = conn.receiveFrame(1000); log.info("Received frame: " + frame); Assert.assertNull("No message should have been received since subscription was removed", frame); assertEquals("Subscription queue should be deleted", 0, server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length - baselineQueueCount); - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + conn.disconnect(); } @Test public void testSubscribeToQueue() throws Exception { final int baselineQueueCount = server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length; - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "receipt: 12\n" + - "\n\n" + - Stomp.NULL; - sendFrame(frame); - // wait for SUBSCRIBE's receipt - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("RECEIPT")); + conn.connect(defUser, defPass); + subscribe(conn, null, null, null, true); assertFalse("Queue should not be created here", Wait.waitFor(new Wait.Condition() { - @Override public boolean isSatisfied() throws Exception { if (server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length - baselineQueueCount == 1) { @@ -1375,65 +998,39 @@ public class StompTest extends StompTestBase { } }, TimeUnit.MILLISECONDS.toMillis(1000), TimeUnit.MILLISECONDS.toMillis(100))); - sendMessage(getName(), queue); + sendJmsMessage(getName(), queue); - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("MESSAGE")); - Assert.assertTrue(frame.indexOf("destination:") > 0); - Assert.assertTrue(frame.indexOf(getName()) > 0); + ClientStompFrame frame = conn.receiveFrame(1000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Send.DESTINATION)); + Assert.assertEquals(getName(), frame.getBody()); - frame = "UNSUBSCRIBE\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "receipt: 1234\n" + - "\n\n" + - Stomp.NULL; - sendFrame(frame); - // wait for UNSUBSCRIBE's receipt - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("RECEIPT")); + unsubscribe(conn, null, getQueuePrefix() + getQueueName(), true, false); - sendMessage(getName(), queue); + sendJmsMessage(getName(), queue); - frame = receiveFrame(1000); + frame = conn.receiveFrame(1000); log.info("Received frame: " + frame); Assert.assertNull("No message should have been received since subscription was removed", frame); assertEquals("Subscription queue should not be deleted", baselineQueueCount, server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length); - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + conn.disconnect(); } @Test public void testSubscribeToNonExistentQueue() throws Exception { String nonExistentQueue = RandomUtil.randomString(); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + - getQueuePrefix() + - nonExistentQueue + - "\n" + - "receipt: 12\n" + - "\n\n" + - Stomp.NULL; - sendFrame(frame); - // wait for SUBSCRIBE's receipt - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("RECEIPT")); + conn.connect(defUser, defPass); + subscribe(conn, null, null, null, null, getQueuePrefix() + nonExistentQueue, true); - sendMessage(getName(), ActiveMQJMSClient.createQueue(nonExistentQueue)); + sendJmsMessage(getName(), ActiveMQJMSClient.createQueue(nonExistentQueue)); - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("MESSAGE")); - Assert.assertTrue(frame.indexOf("destination:") > 0); - Assert.assertTrue(frame.indexOf(getName()) > 0); + ClientStompFrame frame = conn.receiveFrame(1000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertEquals(getQueuePrefix() + nonExistentQueue, frame.getHeader(Stomp.Headers.Send.DESTINATION)); + Assert.assertEquals(getName(), frame.getBody()); assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(nonExistentQueue))); @@ -1449,322 +1046,160 @@ public class StompTest extends StompTestBase { } }, 1000, 50)); - frame = "UNSUBSCRIBE\n" + "destination:" + - getQueuePrefix() + - nonExistentQueue + - "\n" + - "receipt: 1234\n" + - "\n\n" + - Stomp.NULL; - sendFrame(frame); - // wait for UNSUBSCRIBE's receipt - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("RECEIPT")); + unsubscribe(conn, null, getQueuePrefix() + nonExistentQueue, true, false); assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(nonExistentQueue))); - sendMessage(getName(), ActiveMQJMSClient.createQueue(nonExistentQueue)); + sendJmsMessage(getName(), ActiveMQJMSClient.createQueue(nonExistentQueue)); - frame = receiveFrame(1000); + frame = conn.receiveFrame(1000); log.info("Received frame: " + frame); Assert.assertNull("No message should have been received since subscription was removed", frame); - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + conn.disconnect(); } @Test public void testDurableSubscriberWithReconnection() throws Exception { + conn.connect(defUser, defPass, "myclientid"); + subscribeTopic(conn, null, null, getName()); - String connectFame = "CONNECT\n" + "login: brianm\n" + - "passcode: wombats\n" + - "client-id: myclientid\n\n" + - Stomp.NULL; - sendFrame(connectFame); - - String frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - String subscribeFrame = "SUBSCRIBE\n" + "destination:" + - getTopicPrefix() + - getTopicName() + - "\n" + - "durable-subscriber-name: " + - getName() + - "\n" + - "\n\n" + - Stomp.NULL; - sendFrame(subscribeFrame); - waitForFrameToTakeEffect(); - - String disconnectFrame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(disconnectFrame); - waitForFrameToTakeEffect(); + conn.disconnect(); + + Thread.sleep(500); // send the message when the durable subscriber is disconnected - sendMessage(getName(), topic); + sendJmsMessage(getName(), topic); - reconnect(100); - sendFrame(connectFame); - frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); + conn.destroy(); + conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); + conn.connect(defUser, defPass, "myclientid"); - sendFrame(subscribeFrame); + subscribeTopic(conn, null, null, getName()); - // we must have received the message - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("MESSAGE")); - Assert.assertTrue(frame.indexOf("destination:") > 0); - Assert.assertTrue(frame.indexOf(getName()) > 0); + ClientStompFrame frame = conn.receiveFrame(3000); + assertNotNull("Should have received a message from the durable subscription", frame); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertEquals(getTopicPrefix() + getTopicName(), frame.getHeader(Stomp.Headers.Send.DESTINATION)); + Assert.assertEquals(getName(), frame.getBody()); - String unsubscribeFrame = "UNSUBSCRIBE\n" + "destination:" + - getTopicPrefix() + - getTopicName() + - "\n" + - "receipt: 1234\n" + - "\n\n" + - Stomp.NULL; - sendFrame(unsubscribeFrame); - // wait for UNSUBSCRIBE's receipt - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("RECEIPT")); + unsubscribe(conn, null, getTopicPrefix() + getTopicName(), true, true); - sendFrame(disconnectFrame); + conn.disconnect(); } @Test public void testDurableSubscriber() throws Exception { - - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "client-id: myclientid\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - String subscribeFrame = "SUBSCRIBE\n" + "destination:" + - getTopicPrefix() + - getTopicName() + - "\n" + - "receipt: 12\n" + - "durable-subscriber-name: " + - getName() + - "\n" + - "\n\n" + - Stomp.NULL; - sendFrame(subscribeFrame); - // wait for SUBSCRIBE's receipt - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("RECEIPT")); + conn.connect(defUser, defPass, "myclientid"); + subscribeTopic(conn, null, null, getName(), true); + ClientStompFrame response = subscribeTopic(conn, null, null, getName(), true); // creating a subscriber with the same durable-subscriber-name must fail - sendFrame(subscribeFrame); - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("ERROR")); + Assert.assertEquals(Stomp.Responses.ERROR, response.getCommand()); - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + conn.disconnect(); } @Test public void testDurableUnSubscribe() throws Exception { - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "client-id: myclientid\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(1000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - String subscribeFrame = "SUBSCRIBE\n" + "destination:" + - getTopicPrefix() + - getTopicName() + - "\n" + - "receipt: 12\n" + - "durable-subscriber-name: " + - getName() + - "\n" + - "\n\n" + - Stomp.NULL; - sendFrame(subscribeFrame); - // wait for SUBSCRIBE's receipt - frame = receiveFrame(1000); - Assert.assertTrue(frame.startsWith("RECEIPT")); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); - waitForFrameToTakeEffect(); + conn.connect(defUser, defPass, "myclientid"); + subscribeTopic(conn, null, null, getName(), true); + conn.disconnect(); + Thread.sleep(500); assertNotNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName()))); - reconnect(100); - frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "client-id: myclientid\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(1000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - String unsubscribeFrame = "UNSUBSCRIBE\n" + "destination:" + - getTopicPrefix() + - getTopicName() + - "\n" + - "durable-subscriber-name: " + - getName() + - "\n" + - "\n\n" + - Stomp.NULL; - sendFrame(unsubscribeFrame); + conn.destroy(); + conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); - waitForFrameToTakeEffect(); + conn.connect(defUser, defPass, "myclientid"); + unsubscribe(conn, getName(), getTopicPrefix() + getTopicName(), false, true); + conn.disconnect(); + Thread.sleep(500); assertNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName()))); } @Test public void testSubscribeToTopicWithNoLocal() throws Exception { + conn.connect(defUser, defPass); + subscribeTopic(conn, null, null, null, true, true); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + - getTopicPrefix() + - getTopicName() + - "\n" + - "receipt: 12\n" + - "no-local: true\n" + - "\n\n" + - Stomp.NULL; - sendFrame(frame); - // wait for SUBSCRIBE's receipt - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("RECEIPT")); - - // send a message on the same connection => it should not be received - frame = "SEND\n" + "destination:" + getTopicPrefix() + getTopicName() + "\n\n" + "Hello World" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(2000); + // send a message on the same connection => it should not be received is noLocal = true on subscribe + send(conn, getTopicPrefix() + getTopicName(), null, "Hello World"); + + ClientStompFrame frame = conn.receiveFrame(2000); log.info("Received frame: " + frame); Assert.assertNull("No message should have been received since subscription was removed", frame); // send message on another JMS connection => it should be received - sendMessage(getName(), topic); - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("MESSAGE")); - Assert.assertTrue(frame.indexOf("destination:") > 0); - Assert.assertTrue(frame.indexOf(getName()) > 0); + sendJmsMessage(getName(), topic); + frame = conn.receiveFrame(10000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertEquals(getTopicPrefix() + getTopicName(), frame.getHeader(Stomp.Headers.Send.DESTINATION)); + Assert.assertEquals(getName(), frame.getBody()); - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + conn.disconnect(); } @Test public void testTopicExistsAfterNoUnsubscribeDisconnect() throws Exception { + conn.connect(defUser, defPass); + subscribeTopic(conn, null, null, null, true); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); + // disconnect, _without unsubscribing_ + conn.disconnect(); - frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); + Thread.sleep(500); - frame = "SUBSCRIBE\n" + "destination:" + - getTopicPrefix() + - getTopicName() + - "\n" + - "receipt: 12\n" + - "\n\n" + - Stomp.NULL; - sendFrame(frame); - // wait for SUBSCRIBE's receipt - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("RECEIPT")); - - // disconnect, _without unsubscribing_ - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); - waitForFrameToTakeEffect(); + conn.destroy(); // connect again - reconnect(); - frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); + conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); + conn.connect(defUser, defPass); // send a receipted message to the topic - frame = "SEND\n" + "destination:" + getTopicPrefix() + getTopicName() + "\nreceipt:42\n\n\n" + "Hello World" + Stomp.NULL; - sendFrame(frame); - - // the topic should exist and receive the message, and we should get the requested receipt - frame = receiveFrame(2000); - log.info("Received frame: " + frame); - Assert.assertTrue(frame.startsWith("RECEIPT")); + ClientStompFrame response = send(conn, getTopicPrefix() + getTopicName(), null, "Hello World", true); + assertEquals(Stomp.Responses.RECEIPT, response.getCommand()); // ...and nothing else - frame = receiveFrame(2000); + ClientStompFrame frame = conn.receiveFrame(2000); log.info("Received frame: " + frame); Assert.assertNull(frame); - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + conn.disconnect(); } @Test public void testClientAckNotPartOfTransaction() throws Exception { + conn.connect(defUser, defPass); + subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.CLIENT); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "ack:client\n" + - "\n\n" + - Stomp.NULL; - sendFrame(frame); - - sendMessage(getName()); - - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("MESSAGE")); - Assert.assertTrue(frame.indexOf("destination:") > 0); - Assert.assertTrue(frame.indexOf(getName()) > 0); - Assert.assertTrue(frame.indexOf("message-id:") > 0); - Pattern cl = Pattern.compile("message-id:\\s*(\\S+)", Pattern.CASE_INSENSITIVE); - Matcher cl_matcher = cl.matcher(frame); - Assert.assertTrue(cl_matcher.find()); - String messageID = cl_matcher.group(1); - - frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + sendJmsMessage(getName()); - frame = "ACK\n" + "message-id:" + messageID + "\n" + "transaction: tx1\n" + "\n" + "second message" + Stomp.NULL; - sendFrame(frame); + ClientStompFrame frame = conn.receiveFrame(10000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Send.DESTINATION)); + String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID); + Assert.assertNotNull(messageID); + Assert.assertEquals(getName(), frame.getBody()); - frame = "ABORT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + beginTransaction(conn, "tx1"); + ack(conn, null, messageID, "tx1"); + abortTransaction(conn, "tx1"); - frame = receiveFrame(1000); + frame = conn.receiveFrame(1000); Assert.assertNull("No message should have been received as the message was acked even though the transaction has been aborted", frame); - frame = "UNSUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + Stomp.NULL; - sendFrame(frame); + unsubscribe(conn, null, getQueuePrefix() + getQueueName(), false, false); - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + conn.disconnect(); } // HORNETQ-1007 @Test public void testMultiProtocolConsumers() throws Exception { - final int TIME_OUT = 5000; + final int TIME_OUT = 2000; int count = 1000; @@ -1773,21 +1208,8 @@ public class StompTest extends StompTestBase { MessageConsumer consumer2 = session.createConsumer(topic); // connect and subscribe STOMP consumer - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - frame = receiveFrame(TIME_OUT); - Assert.assertTrue(frame.startsWith("CONNECTED")); - frame = "SUBSCRIBE\n" + "destination:" + - getTopicPrefix() + - getTopicName() + - "\n" + - "receipt: 12\n" + - "\n\n" + - Stomp.NULL; - sendFrame(frame); - // wait for SUBSCRIBE's receipt - frame = receiveFrame(TIME_OUT); - Assert.assertTrue(frame.startsWith("RECEIPT")); + conn.connect(defUser, defPass); + subscribeTopic(conn, null, null, null, true); MessageProducer producer = session.createProducer(topic); TextMessage message = session.createTextMessage(getName()); @@ -1796,58 +1218,192 @@ public class StompTest extends StompTestBase { producer.send(message); Assert.assertNotNull(consumer1.receive(TIME_OUT)); Assert.assertNotNull(consumer2.receive(TIME_OUT)); - frame = receiveFrame(TIME_OUT); - Assert.assertTrue(frame.startsWith("MESSAGE")); - Assert.assertTrue(frame.indexOf("destination:") > 0); - Assert.assertTrue(frame.indexOf(getName()) > 0); + ClientStompFrame frame = conn.receiveFrame(TIME_OUT); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertEquals(getTopicPrefix() + getTopicName(), frame.getHeader(Stomp.Headers.Send.DESTINATION)); + Assert.assertEquals(getName(), frame.getBody()); } consumer1.close(); consumer2.close(); - frame = "UNSUBSCRIBE\n" + "destination:" + - getTopicPrefix() + - getTopicName() + - "\n" + - "receipt: 1234\n" + - "\n\n" + - Stomp.NULL; - sendFrame(frame); - // wait for UNSUBSCRIBE's receipt - frame = receiveFrame(TIME_OUT); - Assert.assertTrue(frame.startsWith("RECEIPT")); - - sendMessage(getName(), topic); - - frame = receiveFrame(TIME_OUT); + unsubscribe(conn, null, getTopicPrefix() + getTopicName(), true, false); + + sendJmsMessage(getName(), topic); + + ClientStompFrame frame = conn.receiveFrame(TIME_OUT); log.info("Received frame: " + frame); Assert.assertNull("No message should have been received since subscription was removed", frame); - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); + conn.disconnect(); } @Test //stomp should return an ERROR when acking a non-existent message public void testUnexpectedAck() throws Exception { + String messageID = "888888"; - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); + conn.connect(defUser, defPass); + ack(conn, null, messageID, null); - frame = receiveFrame(100000); - Assert.assertTrue(frame.startsWith("CONNECTED")); + ClientStompFrame frame = conn.receiveFrame(1000); + assertNotNull(frame); + assertEquals(Stomp.Responses.ERROR, frame.getCommand()); - String messageID = "888888"; - frame = "ACK\n" + "message-id:" + messageID + "\n" + "\n" + Stomp.NULL; - sendFrame(frame); + conn.disconnect(); + } - frame = receiveFrame(100000); - assertNotNull(frame); + @Test + public void testDotAnycastPrefixOnSend() throws Exception { + testPrefix("jms.queue.", AddressInfo.RoutingType.ANYCAST, true); + } + + @Test + public void testDotMulticastPrefixOnSend() throws Exception { + testPrefix("jms.topic.", AddressInfo.RoutingType.MULTICAST, true); + } + + @Test + public void testDotAnycastPrefixOnSubscribe() throws Exception { + testPrefix("jms.queue.", AddressInfo.RoutingType.ANYCAST, false); + } + + @Test + public void testDotMulticastPrefixOnSubscribe() throws Exception { + testPrefix("jms.topic.", AddressInfo.RoutingType.MULTICAST, false); + } + + @Test + public void testSlashAnycastPrefixOnSend() throws Exception { + testPrefix("/queue/", AddressInfo.RoutingType.ANYCAST, true); + } + + @Test + public void testSlashMulticastPrefixOnSend() throws Exception { + testPrefix("/topic/", AddressInfo.RoutingType.MULTICAST, true); + } + + @Test + public void testSlashAnycastPrefixOnSubscribe() throws Exception { + testPrefix("/queue/", AddressInfo.RoutingType.ANYCAST, false); + } + + @Test + public void testSlashMulticastPrefixOnSubscribe() throws Exception { + testPrefix("/topic/", AddressInfo.RoutingType.MULTICAST, false); + } + + public void testPrefix(final String prefix, final AddressInfo.RoutingType routingType, final boolean send) throws Exception { + int port = 61614; + final String ADDRESS = UUID.randomUUID().toString(); + final String PREFIXED_ADDRESS = prefix + ADDRESS; + String param = routingType.toString(); + String urlParam = "stomp" + param.substring(0, 1) + param.substring(1).toLowerCase() + "Prefix"; + server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + "&" + urlParam + "=" + prefix).start(); + conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); + conn.connect(defUser, defPass); + + // since this queue doesn't exist the broker should create a new address using the routing type matching the prefix + if (send) { + send(conn, PREFIXED_ADDRESS, null, "Hello World", true); + } else { + String uuid = UUID.randomUUID().toString(); + + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, PREFIXED_ADDRESS) + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid); + + frame = conn.sendFrame(frame); + + assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); + } + + AddressInfo addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS)); + assertNotNull("No address was created with the name " + ADDRESS, addressInfo); + assertEquals(AddressInfo.RoutingType.valueOf(param), addressInfo.getRoutingType()); + + conn.disconnect(); + } + + @Test + public void testDotPrefixedSendAndRecieveAnycast() throws Exception { + testPrefixedSendAndRecieve("jms.queue.", AddressInfo.RoutingType.ANYCAST); + } + + @Test + public void testDotPrefixedSendAndRecieveMulticast() throws Exception { + testPrefixedSendAndRecieve("jms.topic.", AddressInfo.RoutingType.MULTICAST); + } + + @Test + public void testSlashPrefixedSendAndRecieveAnycast() throws Exception { + testPrefixedSendAndRecieve("/queue/", AddressInfo.RoutingType.ANYCAST); + } + + @Test + public void testSlashPrefixedSendAndRecieveMulticast() throws Exception { + testPrefixedSendAndRecieve("/topic/", AddressInfo.RoutingType.MULTICAST); + } - System.out.println("received frame: " + frame); - assertTrue(frame.startsWith("ERROR")); + public void testPrefixedSendAndRecieve(final String prefix, AddressInfo.RoutingType routingType) throws Exception { + int port = 61614; + final String ADDRESS = UUID.randomUUID().toString(); + final String PREFIXED_ADDRESS = prefix + ADDRESS; + String param = routingType.toString(); + String urlParam = "stomp" + param.substring(0, 1) + param.substring(1).toLowerCase() + "Prefix"; + server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + "&" + urlParam + "=" + prefix).start(); + conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); + conn.connect(defUser, defPass); + String uuid = UUID.randomUUID().toString(); + + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.DESTINATION,