Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AE723200BD9 for ; Fri, 9 Dec 2016 20:48:50 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id AD2F5160B2A; Fri, 9 Dec 2016 19:48:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EA835160B36 for ; Fri, 9 Dec 2016 20:48:47 +0100 (CET) Received: (qmail 89315 invoked by uid 500); 9 Dec 2016 19:48:46 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 88518 invoked by uid 99); 9 Dec 2016 19:48:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Dec 2016 19:48:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EFE7AE36E0; Fri, 9 Dec 2016 19:48:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Fri, 09 Dec 2016 19:48:56 -0000 Message-Id: <0d7bb69b36e043c99df619a2aa78ea8d@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [12/50] [abbrv] activemq-artemis git commit: ARTEMIS-788 Stomp refactor + track autocreation for addresses archived-at: Fri, 09 Dec 2016 19:48:50 -0000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java index fcb9884..9b1ad93 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java @@ -33,14 +33,15 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase; import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV11; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV12; -import org.apache.activemq.artemis.tests.integration.stomp.v11.StompV11TestBase; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -49,28 +50,28 @@ import org.junit.Test; /** * Testing Stomp version 1.2 functionalities */ -public class StompV12Test extends StompV11TestBase { +public class StompV12Test extends StompTestBase { private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER; public static final String CLIENT_ID = "myclientid"; - private StompClientConnectionV12 connV12; + private StompClientConnectionV12 conn; @Override @Before public void setUp() throws Exception { super.setUp(); - connV12 = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port); + conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port); } @Override @After public void tearDown() throws Exception { try { - boolean connected = connV12 != null && connV12.isConnected(); + boolean connected = conn != null && conn.isConnected(); log.debug("Connection 1.2 : " + connected); if (connected) { - connV12.disconnect(); + conn.disconnect(); } } finally { super.tearDown(); @@ -119,32 +120,32 @@ public class StompV12Test extends StompV11TestBase { public void testConnectionAsInSpec() throws Exception { StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); - ClientStompFrame frame = conn.createFrame("CONNECT"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("accept-version", "1.2"); - frame.addHeader("host", "127.0.0.1"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT); + frame.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser); + frame.addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass); + frame.addHeader(Stomp.Headers.ACCEPT_VERSION, "1.2"); + frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1"); ClientStompFrame reply = conn.sendFrame(frame); - Assert.assertEquals("CONNECTED", reply.getCommand()); - Assert.assertEquals("1.2", reply.getHeader("version")); + Assert.assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); + Assert.assertEquals("1.2", reply.getHeader(Stomp.Headers.Error.VERSION)); conn.disconnect(); //need 1.2 client conn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); - frame = conn.createFrame("STOMP"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("accept-version", "1.2"); - frame.addHeader("host", "127.0.0.1"); + frame = conn.createFrame(Stomp.Commands.STOMP); + frame.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser); + frame.addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass); + frame.addHeader(Stomp.Headers.ACCEPT_VERSION, "1.2"); + frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1"); reply = conn.sendFrame(frame); - Assert.assertEquals("CONNECTED", reply.getCommand()); - Assert.assertEquals("1.2", reply.getHeader("version")); + Assert.assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); + Assert.assertEquals("1.2", reply.getHeader(Stomp.Headers.Error.VERSION)); conn.disconnect(); } @@ -153,82 +154,82 @@ public class StompV12Test extends StompV11TestBase { public void testNegotiation() throws Exception { StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); // case 1 accept-version absent. It is a 1.0 connect - ClientStompFrame frame = conn.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT); + frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1"); + frame.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser); + frame.addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass); ClientStompFrame reply = conn.sendFrame(frame); - Assert.assertEquals("CONNECTED", reply.getCommand()); + Assert.assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); //reply headers: version, session, server - Assert.assertEquals(null, reply.getHeader("version")); + Assert.assertEquals(null, reply.getHeader(Stomp.Headers.Error.VERSION)); conn.disconnect(); // case 2 accept-version=1.0, result: 1.0 conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); - frame = conn.createFrame("CONNECT"); - frame.addHeader("accept-version", "1.0"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); + frame = conn.createFrame(Stomp.Commands.CONNECT); + frame.addHeader(Stomp.Headers.ACCEPT_VERSION, "1.0"); + frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1"); + frame.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser); + frame.addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass); reply = conn.sendFrame(frame); - Assert.assertEquals("CONNECTED", reply.getCommand()); + Assert.assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); //reply headers: version, session, server - Assert.assertEquals("1.0", reply.getHeader("version")); + Assert.assertEquals("1.0", reply.getHeader(Stomp.Headers.Error.VERSION)); conn.disconnect(); // case 3 accept-version=1.1, result: 1.1 conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); - frame = conn.createFrame("CONNECT"); - frame.addHeader("accept-version", "1.1"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); + frame = conn.createFrame(Stomp.Commands.CONNECT); + frame.addHeader(Stomp.Headers.ACCEPT_VERSION, "1.1"); + frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1"); + frame.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser); + frame.addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass); reply = conn.sendFrame(frame); - Assert.assertEquals("CONNECTED", reply.getCommand()); + Assert.assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); //reply headers: version, session, server - Assert.assertEquals("1.1", reply.getHeader("version")); + Assert.assertEquals("1.1", reply.getHeader(Stomp.Headers.Error.VERSION)); conn.disconnect(); // case 4 accept-version=1.0,1.1,1.3, result 1.2 conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); - frame = conn.createFrame("CONNECT"); - frame.addHeader("accept-version", "1.0,1.1,1.3"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); + frame = conn.createFrame(Stomp.Commands.CONNECT); + frame.addHeader(Stomp.Headers.ACCEPT_VERSION, "1.0,1.1,1.3"); + frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1"); + frame.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser); + frame.addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass); reply = conn.sendFrame(frame); - Assert.assertEquals("CONNECTED", reply.getCommand()); + Assert.assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); //reply headers: version, session, server - Assert.assertEquals("1.1", reply.getHeader("version")); + Assert.assertEquals("1.1", reply.getHeader(Stomp.Headers.Error.VERSION)); conn.disconnect(); // case 5 accept-version=1.3, result error conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); - frame = conn.createFrame("CONNECT"); - frame.addHeader("accept-version", "1.3"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); + frame = conn.createFrame(Stomp.Commands.CONNECT); + frame.addHeader(Stomp.Headers.ACCEPT_VERSION, "1.3"); + frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1"); + frame.addHeader(Stomp.Headers.Connect.LOGIN, this.defUser); + frame.addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass); reply = conn.sendFrame(frame); - Assert.assertEquals("ERROR", reply.getCommand()); + Assert.assertEquals(Stomp.Responses.ERROR, reply.getCommand()); System.out.println("Got error frame " + reply); @@ -236,50 +237,31 @@ public class StompV12Test extends StompV11TestBase { @Test public void testSendAndReceive() throws Exception { - connV12.connect(defUser, defPass); - ClientStompFrame frame = connV12.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "text/plain"); - frame.setBody("Hello World 1!"); + conn.connect(defUser, defPass); - ClientStompFrame response = connV12.sendFrame(frame); + ClientStompFrame response = send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 1!"); Assert.assertNull(response); - frame.addHeader("receipt", "1234"); - frame.setBody("Hello World 2!"); - - response = connV12.sendFrame(frame); - - Assert.assertNotNull(response); - - Assert.assertEquals("RECEIPT", response.getCommand()); - - Assert.assertEquals("1234", response.getHeader("receipt-id")); + send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 2!", true); //subscribe StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); newConn.connect(defUser, defPass); + subscribe(newConn, "a-sub"); - ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - newConn.sendFrame(subFrame); - - frame = newConn.receiveFrame(); + ClientStompFrame frame = newConn.receiveFrame(); System.out.println("received " + frame); - Assert.assertEquals("MESSAGE", frame.getCommand()); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); - Assert.assertEquals("a-sub", frame.getHeader("subscription")); + Assert.assertEquals("a-sub", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION)); //'auto' ack mode doesn't require 'ack' header - Assert.assertNull(frame.getHeader("ack")); + Assert.assertNull(frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE)); - Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination")); + Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Subscribe.DESTINATION)); Assert.assertEquals("Hello World 1!", frame.getBody()); @@ -288,90 +270,70 @@ public class StompV12Test extends StompV11TestBase { System.out.println("received " + frame); //unsub - ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); - newConn.sendFrame(unsubFrame); + unsubscribe(newConn, "a-sub"); newConn.disconnect(); } @Test public void testHeaderContentType() throws Exception { - connV12.connect(defUser, defPass); - ClientStompFrame frame = connV12.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "application/xml"); - frame.setBody("Hello World 1!"); + conn.connect(defUser, defPass); - connV12.sendFrame(frame); + send(conn, getQueuePrefix() + getQueueName(), "application/xml", "Hello World 1!"); //subscribe StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); newConn.connect(defUser, defPass); + subscribe(newConn, "a-sub"); - ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - newConn.sendFrame(subFrame); - - frame = newConn.receiveFrame(); + ClientStompFrame frame = newConn.receiveFrame(); System.out.println("received " + frame); - Assert.assertEquals("MESSAGE", frame.getCommand()); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); - Assert.assertEquals("application/xml", frame.getHeader("content-type")); + Assert.assertEquals("application/xml", frame.getHeader(Stomp.Headers.CONTENT_TYPE)); //unsub - ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); + unsubscribe(newConn, "a-sub"); newConn.disconnect(); } @Test public void testHeaderContentLength() throws Exception { - connV12.connect(defUser, defPass); - ClientStompFrame frame = connV12.createFrame("SEND"); + conn.connect(defUser, defPass); String body = "Hello World 1!"; String cLen = String.valueOf(body.getBytes(StandardCharsets.UTF_8).length); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "application/xml"); - frame.addHeader("content-length", cLen); - frame.setBody(body + "extra"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader(Stomp.Headers.CONTENT_TYPE, "application/xml") + .addHeader(Stomp.Headers.CONTENT_LENGTH, cLen) + .setBody(body + "extra"); - connV12.sendFrame(frame); + conn.sendFrame(frame); //subscribe StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); newConn.connect(defUser, defPass); - - ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - newConn.sendFrame(subFrame); + subscribe(newConn, "a-sub"); frame = newConn.receiveFrame(); - Assert.assertEquals("MESSAGE", frame.getCommand()); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); Assert.assertEquals(body, frame.getBody()); - Assert.assertEquals(cLen, frame.getHeader("content-length")); + Assert.assertEquals(cLen, frame.getHeader(Stomp.Headers.CONTENT_LENGTH)); //send again without content-length header - frame = connV12.createFrame("SEND"); + frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader(Stomp.Headers.CONTENT_TYPE, "application/xml") + .setBody(body + "extra"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "application/xml"); - frame.setBody(body + "extra"); - - connV12.sendFrame(frame); + conn.sendFrame(frame); //receive again. extra should received. frame = newConn.receiveFrame(); @@ -380,11 +342,10 @@ public class StompV12Test extends StompV11TestBase { //although sender didn't send the content-length header, //the server should add it anyway - Assert.assertEquals((body + "extra").getBytes(StandardCharsets.UTF_8).length, Integer.valueOf(frame.getHeader("content-length")).intValue()); + Assert.assertEquals((body + "extra").getBytes(StandardCharsets.UTF_8).length, Integer.valueOf(frame.getHeader(Stomp.Headers.CONTENT_LENGTH)).intValue()); //unsub - ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); + unsubscribe(newConn, "a-sub"); newConn.disconnect(); } @@ -396,104 +357,90 @@ public class StompV12Test extends StompV11TestBase { addressSettings.setAutoCreateJmsQueues(false); server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings); - connV12.connect(defUser, defPass); - ClientStompFrame frame = connV12.createFrame("SEND"); + conn.connect(defUser, defPass); String body = "Hello World!"; String cLen = String.valueOf(body.getBytes(StandardCharsets.UTF_8).length); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("destination", "aNonexistentQueue"); - frame.addHeader("content-type", "application/xml"); - frame.addHeader("content-length", cLen); - frame.addHeader("foo", "value1"); - frame.addHeader("foo", "value2"); - frame.addHeader("foo", "value3"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, "aNonexistentQueue") + .addHeader(Stomp.Headers.CONTENT_TYPE, "application/xml") + .addHeader(Stomp.Headers.CONTENT_LENGTH, cLen) + .addHeader("foo", "value1") + .addHeader("foo", "value2") + .addHeader("foo", "value3"); frame.setBody(body); - connV12.sendFrame(frame); + conn.sendFrame(frame); //subscribe StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); newConn.connect(defUser, defPass); - - ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - newConn.sendFrame(subFrame); + subscribe(newConn, "a-sub"); frame = newConn.receiveFrame(); - Assert.assertEquals("MESSAGE", frame.getCommand()); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); Assert.assertEquals(body, frame.getBody()); System.out.println("received: " + frame); Assert.assertEquals("value1", frame.getHeader("foo")); //unsub - ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); + unsubscribe(newConn, "a-sub"); newConn.disconnect(); //should get error - frame = connV12.createFrame("SEND"); body = "Hello World!"; cLen = String.valueOf(body.getBytes(StandardCharsets.UTF_8).length); - frame.addHeader("destination", "aNonexistentQueue"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "application/xml"); - frame.addHeader("content-length", cLen); - frame.addHeader("receipt", "1234"); - - frame.setBody(body); + frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, "aNonexistentQueue") + .addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader(Stomp.Headers.CONTENT_TYPE, "application/xml") + .addHeader(Stomp.Headers.CONTENT_LENGTH, cLen) + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, "1234") + .setBody(body); - ClientStompFrame reply = connV12.sendFrame(frame); - Assert.assertEquals("ERROR", reply.getCommand()); + ClientStompFrame reply = conn.sendFrame(frame); + // TODO this is broken because queue auto-creation is always on + Assert.assertEquals(Stomp.Responses.ERROR, reply.getCommand()); } //padding shouldn't be trimmed @Test public void testHeadersPadding() throws Exception { - connV12.connect(defUser, defPass); - ClientStompFrame frame = connV12.createFrame("SEND"); + conn.connect(defUser, defPass); String body = "

Hello World!

"; String cLen = String.valueOf(body.getBytes(StandardCharsets.UTF_8).length); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "application/xml"); - frame.addHeader("content-length", cLen); - frame.addHeader(" header1", "value1 "); - frame.addHeader(" header2", "value2 "); - frame.addHeader("header3 ", " value3"); - frame.addHeader(" header4 ", " value4 "); - frame.addHeader(" header 5 ", " value 5 "); - frame.addHeader("header6", "\t value\t 6 \t"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader(Stomp.Headers.CONTENT_TYPE, "application/xml") + .addHeader(Stomp.Headers.CONTENT_LENGTH, cLen) + .addHeader(" header1", "value1 ") + .addHeader(" header2", "value2 ") + .addHeader("header3 ", " value3") + .addHeader(" header4 ", " value4 ") + .addHeader(" header 5 ", " value 5 ") + .addHeader("header6", "\t value\t 6 \t") + .setBody(body); - frame.setBody(body); - - connV12.sendFrame(frame); + conn.sendFrame(frame); //subscribe StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); newConn.connect(defUser, defPass); - - ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - newConn.sendFrame(subFrame); + subscribe(newConn, "a-sub"); frame = newConn.receiveFrame(); - Assert.assertEquals("MESSAGE", frame.getCommand()); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); Assert.assertEquals(body, frame.getBody()); System.out.println("received: " + frame); @@ -506,8 +453,7 @@ public class StompV12Test extends StompV11TestBase { Assert.assertEquals("\t value\t 6 \t", frame.getHeader("header6")); //unsub - ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); + unsubscribe(newConn, "a-sub"); newConn.disconnect(); } @@ -517,49 +463,41 @@ public class StompV12Test extends StompV11TestBase { */ @Test public void testHeaderEncoding() throws Exception { - connV12.connect(defUser, defPass); - ClientStompFrame frame = connV12.createFrame("SEND"); - + conn.connect(defUser, defPass); String body = "Hello World 1!"; String cLen = String.valueOf(body.getBytes(StandardCharsets.UTF_8).length); - - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "application/xml"); - frame.addHeader("content-length", cLen); String hKey = "\\rspecial-header\\\\\\n\\c\\r\\n"; String hVal = "\\c\\\\\\ngood\\n\\r"; - frame.addHeader(hKey, hVal); - System.out.println("key: |" + hKey + "| val: |" + hVal + "|"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader(Stomp.Headers.CONTENT_TYPE, "application/xml") + .addHeader(Stomp.Headers.CONTENT_LENGTH, cLen) + .addHeader(hKey, hVal) + .setBody(body); - frame.setBody(body); + System.out.println("key: |" + hKey + "| val: |" + hVal + "|"); - connV12.sendFrame(frame); + conn.sendFrame(frame); //subscribe StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); newConn.connect(defUser, defPass); - ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - newConn.sendFrame(subFrame); + subscribe(newConn, "a-sub"); frame = newConn.receiveFrame(); System.out.println("received " + frame); - Assert.assertEquals("MESSAGE", frame.getCommand()); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); String value = frame.getHeader("\r" + "special-header" + "\\" + "\n" + ":" + "\r\n"); Assert.assertEquals(":" + "\\" + "\n" + "good\n\r", value); //unsub - ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); + unsubscribe(newConn, "a-sub"); newConn.disconnect(); } @@ -569,8 +507,8 @@ public class StompV12Test extends StompV11TestBase { */ @Test public void testHeaderUndefinedEscape() throws Exception { - connV12.connect(defUser, defPass); - ClientStompFrame frame = connV12.createFrame("SEND"); + conn.connect(defUser, defPass); + ClientStompFrame frame = conn.createFrame("SEND"); String body = "Hello World 1!"; String cLen = String.valueOf(body.getBytes(StandardCharsets.UTF_8).length); @@ -586,9 +524,9 @@ public class StompV12Test extends StompV11TestBase { frame.setBody(body); - connV12.sendFrame(frame); + conn.sendFrame(frame); - ClientStompFrame error = connV12.receiveFrame(); + ClientStompFrame error = conn.receiveFrame(); System.out.println("received " + error); @@ -596,22 +534,22 @@ public class StompV12Test extends StompV11TestBase { Assert.assertNotNull(desc, error); Assert.assertEquals(desc, "ERROR", error.getCommand()); - waitDisconnect(connV12); - Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected()); + waitDisconnect(conn); + Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", conn.isConnected()); } @Test public void testHeartBeat() throws Exception { StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); //no heart beat at all if heat-beat absent - ClientStompFrame frame = conn.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass); ClientStompFrame reply = conn.sendFrame(frame); - Assert.assertEquals("CONNECTED", reply.getCommand()); + Assert.assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); Thread.sleep(5000); @@ -621,18 +559,18 @@ public class StompV12Test extends StompV11TestBase { //no heart beat for (0,0) conn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); - frame = conn.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "0,0"); - frame.addHeader("accept-version", "1.0,1.1,1.2"); + frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "0,0") + .addHeader(Stomp.Headers.ACCEPT_VERSION, "1.0,1.1,1.2"); reply = conn.sendFrame(frame); - Assert.assertEquals("CONNECTED", reply.getCommand()); + Assert.assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); - Assert.assertEquals("0,30000", reply.getHeader("heart-beat")); + Assert.assertEquals("0,30000", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT)); Thread.sleep(5000); @@ -642,30 +580,25 @@ public class StompV12Test extends StompV11TestBase { //heart-beat (1,0), should receive a min client ping accepted by server conn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); - frame = conn.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "1,0"); - frame.addHeader("accept-version", "1.0,1.2"); + frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,0") + .addHeader(Stomp.Headers.ACCEPT_VERSION, "1.0,1.2"); reply = conn.sendFrame(frame); - Assert.assertEquals("CONNECTED", reply.getCommand()); + Assert.assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); - Assert.assertEquals("0,500", reply.getHeader("heart-beat")); + Assert.assertEquals("0,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT)); Thread.sleep(2000); //now server side should be disconnected because we didn't send ping for 2 sec - frame = conn.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "text/plain"); - frame.setBody("Hello World"); - //send will fail try { - conn.sendFrame(frame); + send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World"); Assert.fail("connection should have been destroyed by now"); } catch (IOException e) { //ignore @@ -673,128 +606,106 @@ public class StompV12Test extends StompV11TestBase { //heart-beat (1,0), start a ping, then send a message, should be ok. conn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); - frame = conn.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "1,0"); - frame.addHeader("accept-version", "1.0,1.1,1.2"); + frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,0") + .addHeader(Stomp.Headers.ACCEPT_VERSION, "1.0,1.1,1.2"); reply = conn.sendFrame(frame); - Assert.assertEquals("CONNECTED", reply.getCommand()); + Assert.assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); - Assert.assertEquals("0,500", reply.getHeader("heart-beat")); + Assert.assertEquals("0,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT)); conn.startPinger(500); Thread.sleep(2000); - frame = conn.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "text/plain"); - frame.setBody("Hello World"); - //send will be ok - conn.sendFrame(frame); + send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World"); conn.stopPinger(); conn.disconnect(); - } //server ping @Test public void testHeartBeat2() throws Exception { //heart-beat (1,1) - ClientStompFrame frame = connV12.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "1,1"); - frame.addHeader("accept-version", "1.0,1.1,1.2"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,1") + .addHeader(Stomp.Headers.ACCEPT_VERSION, "1.0,1.1,1.2"); - ClientStompFrame reply = connV12.sendFrame(frame); + ClientStompFrame reply = conn.sendFrame(frame); - Assert.assertEquals("CONNECTED", reply.getCommand()); - Assert.assertEquals("500,500", reply.getHeader("heart-beat")); + Assert.assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); + Assert.assertEquals("500,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT)); - connV12.disconnect(); + conn.disconnect(); //heart-beat (500,1000) - connV12 = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port); - frame = connV12.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "500,1000"); - frame.addHeader("accept-version", "1.0,1.1,1.2"); + conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port); + frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000") + .addHeader(Stomp.Headers.ACCEPT_VERSION, "1.0,1.1,1.2"); - reply = connV12.sendFrame(frame); + reply = conn.sendFrame(frame); - Assert.assertEquals("CONNECTED", reply.getCommand()); + Assert.assertEquals(Stomp.Responses.CONNECTED, reply.getCommand()); - Assert.assertEquals("1000,500", reply.getHeader("heart-beat")); + Assert.assertEquals("1000,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT)); - connV12.startPinger(500); + conn.startPinger(500); Thread.sleep(10000); //now check the frame size - int size = connV12.getServerPingNumber(); + int size = conn.getServerPingNumber(); System.out.println("ping received: " + size); Assert.assertTrue("size: " + size, size > 5); //now server side should be disconnected because we didn't send ping for 2 sec - frame = connV12.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "text/plain"); - frame.setBody("Hello World"); - //send will be ok - connV12.sendFrame(frame); + send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World"); - connV12.disconnect(); + conn.disconnect(); } @Test public void testSendWithHeartBeatsAndReceive() throws Exception { StompClientConnection newConn = null; try { - ClientStompFrame frame = connV12.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "500,1000"); - frame.addHeader("accept-version", "1.0,1.1,1.2"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT); + frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000") + .addHeader(Stomp.Headers.ACCEPT_VERSION, "1.0,1.1,1.2"); - connV12.sendFrame(frame); - - connV12.startPinger(500); + conn.sendFrame(frame); - frame = connV12.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "text/plain"); + conn.startPinger(500); for (int i = 0; i < 10; i++) { - frame.setBody("Hello World " + i + "!"); - connV12.sendFrame(frame); + send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World " + i + "!"); Thread.sleep(500); } // subscribe newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); newConn.connect(defUser, defPass); - - ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - newConn.sendFrame(subFrame); + subscribe(newConn, "a-sub"); int cnt = 0; @@ -809,38 +720,32 @@ public class StompV12Test extends StompV11TestBase { Assert.assertEquals(10, cnt); // unsub - ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); - newConn.sendFrame(unsubFrame); + unsubscribe(newConn, "a-sub"); } finally { if (newConn != null) newConn.disconnect(); - connV12.disconnect(); + conn.disconnect(); } } @Test public void testSendAndReceiveWithHeartBeats() throws Exception { - connV12.connect(defUser, defPass); - ClientStompFrame frame = connV12.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "text/plain"); + conn.connect(defUser, defPass); for (int i = 0; i < 10; i++) { - frame.setBody("Hello World " + i + "!"); - connV12.sendFrame(frame); + send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World " + i + "!"); Thread.sleep(500); } //subscribe StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); try { - frame = newConn.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "500,1000"); - frame.addHeader("accept-version", "1.0,1.1"); + ClientStompFrame frame = newConn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000") + .addHeader(Stomp.Headers.ACCEPT_VERSION, "1.0,1.1"); newConn.sendFrame(frame); @@ -848,12 +753,7 @@ public class StompV12Test extends StompV11TestBase { Thread.sleep(500); - ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - newConn.sendFrame(subFrame); + subscribe(newConn, "a-sub"); int cnt = 0; @@ -868,9 +768,7 @@ public class StompV12Test extends StompV11TestBase { Assert.assertEquals(10, cnt); // unsub - ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); - newConn.sendFrame(unsubFrame); + unsubscribe(newConn, "a-sub"); } finally { newConn.disconnect(); } @@ -880,35 +778,30 @@ public class StompV12Test extends StompV11TestBase { public void testSendWithHeartBeatsAndReceiveWithHeartBeats() throws Exception { StompClientConnection newConn = null; try { - ClientStompFrame frame = connV12.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "500,1000"); - frame.addHeader("accept-version", "1.0,1.1,1.2"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000") + .addHeader(Stomp.Headers.ACCEPT_VERSION, "1.0,1.1,1.2"); - connV12.sendFrame(frame); - - connV12.startPinger(500); + conn.sendFrame(frame); - frame = connV12.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-type", "text/plain"); + conn.startPinger(500); for (int i = 0; i < 10; i++) { - frame.setBody("Hello World " + i + "!"); - connV12.sendFrame(frame); + send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World " + i + "!"); Thread.sleep(500); } // subscribe newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); - frame = newConn.createFrame("CONNECT"); - frame.addHeader("host", "127.0.0.1"); - frame.addHeader("login", this.defUser); - frame.addHeader("passcode", this.defPass); - frame.addHeader("heart-beat", "500,1000"); - frame.addHeader("accept-version", "1.0,1.1,1.2"); + frame = newConn.createFrame(Stomp.Commands.CONNECT) + .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1") + .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser) + .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass) + .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000") + .addHeader(Stomp.Headers.ACCEPT_VERSION, "1.0,1.1,1.2"); newConn.sendFrame(frame); @@ -916,12 +809,7 @@ public class StompV12Test extends StompV11TestBase { Thread.sleep(500); - ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - newConn.sendFrame(subFrame); + subscribe(newConn, "a-sub"); int cnt = 0; @@ -935,33 +823,31 @@ public class StompV12Test extends StompV11TestBase { Assert.assertEquals(10, cnt); // unsub - ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); - newConn.sendFrame(unsubFrame); + unsubscribe(newConn, "a-sub"); } finally { if (newConn != null) newConn.disconnect(); - connV12.disconnect(); + conn.disconnect(); } } @Test public void testNack() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV12, "sub1", "client"); + subscribe(conn, "sub1", "client"); - sendMessage(getName()); + sendJmsMessage(getName()); - ClientStompFrame frame = connV12.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); - String messageID = frame.getHeader("message-id"); + String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID); - nack(connV12, messageID); + nack(conn, messageID); - unsubscribe(connV12, "sub1"); + unsubscribe(conn, "sub1"); - connV12.disconnect(); + conn.disconnect(); //Nack makes the message be dropped. MessageConsumer consumer = session.createConsumer(queue); @@ -971,24 +857,24 @@ public class StompV12Test extends StompV11TestBase { @Test public void testNackWithWrongSubId() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV12, "sub1", "client"); + subscribe(conn, "sub1", "client"); - sendMessage(getName()); + sendJmsMessage(getName()); - ClientStompFrame frame = connV12.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); - String messageID = frame.getHeader("ack"); + String messageID = frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE); - nack(connV12, messageID + "0"); + nack(conn, messageID + "0"); - ClientStompFrame error = connV12.receiveFrame(); + ClientStompFrame error = conn.receiveFrame(); - Assert.assertEquals("ERROR", error.getCommand()); + Assert.assertEquals(Stomp.Responses.ERROR, error.getCommand()); - waitDisconnect(connV12); - Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected()); + waitDisconnect(conn); + Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", conn.isConnected()); //message should be still there MessageConsumer consumer = session.createConsumer(queue); @@ -998,26 +884,26 @@ public class StompV12Test extends StompV11TestBase { @Test public void testNackWithWrongMessageId() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV12, "sub1", "client"); + subscribe(conn, "sub1", "client"); - sendMessage(getName()); + sendJmsMessage(getName()); - ClientStompFrame frame = connV12.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); Assert.assertNotNull(frame); - Assert.assertNotNull(frame.getHeader("ack")); + Assert.assertNotNull(frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE)); - nack(connV12, "someother"); + nack(conn, "someother"); - ClientStompFrame error = connV12.receiveFrame(); + ClientStompFrame error = conn.receiveFrame(); System.out.println("Receiver error: " + error); - waitDisconnect(connV12); - Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected()); + waitDisconnect(conn); + Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", conn.isConnected()); //message should still there MessageConsumer consumer = session.createConsumer(queue); @@ -1027,23 +913,23 @@ public class StompV12Test extends StompV11TestBase { @Test public void testAck() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV12, "sub1", "client"); + subscribe(conn, "sub1", "client"); - sendMessage(getName()); + sendJmsMessage(getName()); - ClientStompFrame frame = connV12.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); - String messageID = frame.getHeader("ack"); + String messageID = frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE); Assert.assertNotNull(messageID); - ack(connV12, messageID, null); + ack(conn, messageID); - unsubscribe(connV12, "sub1"); + unsubscribe(conn, "sub1"); - connV12.disconnect(); + conn.disconnect(); //Nack makes the message be dropped. MessageConsumer consumer = session.createConsumer(queue); @@ -1053,28 +939,28 @@ public class StompV12Test extends StompV11TestBase { @Test public void testAckNoIDHeader() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV12, "sub1", "client-individual"); + subscribe(conn, "sub1", "client-individual"); - sendMessage(getName()); + sendJmsMessage(getName()); - ClientStompFrame frame = connV12.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); - String messageID = frame.getHeader("ack"); + String messageID = frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE); Assert.assertNotNull(messageID); - ClientStompFrame ackFrame = connV12.createFrame("ACK"); + ClientStompFrame ackFrame = conn.createFrame(Stomp.Commands.ACK); - connV12.sendFrame(ackFrame); + conn.sendFrame(ackFrame); - frame = connV12.receiveFrame(); + frame = conn.receiveFrame(); - Assert.assertEquals("ERROR", frame.getCommand()); + Assert.assertEquals(Stomp.Responses.ERROR, frame.getCommand()); - waitDisconnect(connV12); - Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected()); + waitDisconnect(conn); + Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", conn.isConnected()); //message still there. MessageConsumer consumer = session.createConsumer(queue); @@ -1084,24 +970,24 @@ public class StompV12Test extends StompV11TestBase { @Test public void testAckWithWrongMessageId() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV12, "sub1", "client"); + subscribe(conn, "sub1", "client"); - sendMessage(getName()); + sendJmsMessage(getName()); - ClientStompFrame frame = connV12.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); Assert.assertNotNull(frame); - ack(connV12, "someother", null); + ack(conn, "someother"); - ClientStompFrame error = connV12.receiveFrame(); + ClientStompFrame error = conn.receiveFrame(); System.out.println("Receiver error: " + error); - waitDisconnect(connV12); - Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected()); + waitDisconnect(conn); + Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", conn.isConnected()); //message should still there MessageConsumer consumer = session.createConsumer(queue); @@ -1111,32 +997,32 @@ public class StompV12Test extends StompV11TestBase { @Test public void testErrorWithReceipt() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV12, "sub1", "client"); + subscribe(conn, "sub1", "client"); - sendMessage(getName()); + sendJmsMessage(getName()); - ClientStompFrame frame = connV12.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); - String messageID = frame.getHeader("message-id"); + String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID); - ClientStompFrame ackFrame = connV12.createFrame("ACK"); + ClientStompFrame ackFrame = conn.createFrame(Stomp.Commands.ACK); //give it a wrong sub id - ackFrame.addHeader("subscription", "sub2"); - ackFrame.addHeader("message-id", messageID); - ackFrame.addHeader("receipt", "answer-me"); + ackFrame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, "sub2"); + ackFrame.addHeader(Stomp.Headers.Message.MESSAGE_ID, messageID); + ackFrame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, "answer-me"); - ClientStompFrame error = connV12.sendFrame(ackFrame); + ClientStompFrame error = conn.sendFrame(ackFrame); System.out.println("Receiver error: " + error); - Assert.assertEquals("ERROR", error.getCommand()); + Assert.assertEquals(Stomp.Responses.ERROR, error.getCommand()); Assert.assertEquals("answer-me", error.getHeader("receipt-id")); - waitDisconnect(connV12); - Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected()); + waitDisconnect(conn); + Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", conn.isConnected()); //message should still there MessageConsumer consumer = session.createConsumer(queue); @@ -1146,32 +1032,32 @@ public class StompV12Test extends StompV11TestBase { @Test public void testErrorWithReceipt2() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV12, "sub1", "client"); + subscribe(conn, "sub1", "client"); - sendMessage(getName()); + sendJmsMessage(getName()); - ClientStompFrame frame = connV12.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); - String messageID = frame.getHeader("message-id"); + String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID); - ClientStompFrame ackFrame = connV12.createFrame("ACK"); + ClientStompFrame ackFrame = conn.createFrame(Stomp.Commands.ACK); //give it a wrong sub id - ackFrame.addHeader("subscription", "sub1"); - ackFrame.addHeader("message-id", String.valueOf(Long.valueOf(messageID) + 1)); - ackFrame.addHeader("receipt", "answer-me"); + ackFrame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, "sub1"); + ackFrame.addHeader(Stomp.Headers.Message.MESSAGE_ID, String.valueOf(Long.valueOf(messageID) + 1)); + ackFrame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, "answer-me"); - ClientStompFrame error = connV12.sendFrame(ackFrame); + ClientStompFrame error = conn.sendFrame(ackFrame); System.out.println("Receiver error: " + error); - Assert.assertEquals("ERROR", error.getCommand()); + Assert.assertEquals(Stomp.Responses.ERROR, error.getCommand()); Assert.assertEquals("answer-me", error.getHeader("receipt-id")); - waitDisconnect(connV12); - Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected()); + waitDisconnect(conn); + Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", conn.isConnected()); //message should still there MessageConsumer consumer = session.createConsumer(queue); @@ -1189,29 +1075,29 @@ public class StompV12Test extends StompV11TestBase { @Test public void testAckModeClient() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV12, "sub1", "client"); + subscribe(conn, "sub1", "client"); int num = 50; //send a bunch of messages for (int i = 0; i < num; i++) { - this.sendMessage("client-ack" + i); + this.sendJmsMessage("client-ack" + i); } ClientStompFrame frame = null; for (int i = 0; i < num; i++) { - frame = connV12.receiveFrame(); + frame = conn.receiveFrame(); Assert.assertNotNull(frame); } //ack the last - ack(connV12, frame); + ack(conn, frame); - unsubscribe(connV12, "sub1"); + unsubscribe(conn, "sub1"); - connV12.disconnect(); + conn.disconnect(); //no messages can be received. MessageConsumer consumer = session.createConsumer(queue); @@ -1221,31 +1107,31 @@ public class StompV12Test extends StompV11TestBase { @Test public void testAckModeClient2() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV12, "sub1", "client"); + subscribe(conn, "sub1", "client"); int num = 50; //send a bunch of messages for (int i = 0; i < num; i++) { - this.sendMessage("client-ack" + i); + this.sendJmsMessage("client-ack" + i); } ClientStompFrame frame = null; for (int i = 0; i < num; i++) { - frame = connV12.receiveFrame(); + frame = conn.receiveFrame(); Assert.assertNotNull(frame); //ack the 49th if (i == num - 2) { - ack(connV12, frame); + ack(conn, frame); } } - unsubscribe(connV12, "sub1"); + unsubscribe(conn, "sub1"); - connV12.disconnect(); + conn.disconnect(); //one can be received. MessageConsumer consumer = session.createConsumer(queue); @@ -1258,26 +1144,26 @@ public class StompV12Test extends StompV11TestBase { //when ack is missing the mode default to auto @Test public void testAckModeDefault() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV12, "sub1", null); + subscribe(conn, "sub1", null); int num = 50; //send a bunch of messages for (int i = 0; i < num; i++) { - this.sendMessage("auto-ack" + i); + this.sendJmsMessage("auto-ack" + i); } ClientStompFrame frame = null; for (int i = 0; i < num; i++) { - frame = connV12.receiveFrame(); + frame = conn.receiveFrame(); Assert.assertNotNull(frame); } - unsubscribe(connV12, "sub1"); + unsubscribe(conn, "sub1"); - connV12.disconnect(); + conn.disconnect(); //no messages can be received. MessageConsumer consumer = session.createConsumer(queue); @@ -1287,26 +1173,26 @@ public class StompV12Test extends StompV11TestBase { @Test public void testAckModeAuto() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV12, "sub1", "auto"); + subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO); int num = 50; //send a bunch of messages for (int i = 0; i < num; i++) { - this.sendMessage("auto-ack" + i); + this.sendJmsMessage("auto-ack" + i); } ClientStompFrame frame = null; for (int i = 0; i < num; i++) { - frame = connV12.receiveFrame(); + frame = conn.receiveFrame(); Assert.assertNotNull(frame); } - unsubscribe(connV12, "sub1"); + unsubscribe(conn, "sub1"); - connV12.disconnect(); + conn.disconnect(); //no messages can be received. MessageConsumer consumer = session.createConsumer(queue); @@ -1316,32 +1202,32 @@ public class StompV12Test extends StompV11TestBase { @Test public void testAckModeClientIndividual() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - subscribe(connV12, "sub1", "client-individual"); + subscribe(conn, "sub1", "client-individual"); int num = 50; //send a bunch of messages for (int i = 0; i < num; i++) { - this.sendMessage("client-individual-ack" + i); + this.sendJmsMessage("client-individual-ack" + i); } ClientStompFrame frame = null; for (int i = 0; i < num; i++) { - frame = connV12.receiveFrame(); + frame = conn.receiveFrame(); Assert.assertNotNull(frame); System.out.println(i + " == received: " + frame); //ack on even numbers if (i % 2 == 0) { - ack(connV12, frame); + ack(conn, frame); } } - unsubscribe(connV12, "sub1"); + unsubscribe(conn, "sub1"); - connV12.disconnect(); + conn.disconnect(); //no messages can be received. MessageConsumer consumer = session.createConsumer(queue); @@ -1360,64 +1246,55 @@ public class StompV12Test extends StompV11TestBase { @Test public void testTwoSubscribers() throws Exception { - connV12.connect(defUser, defPass, CLIENT_ID); + conn.connect(defUser, defPass, CLIENT_ID); - this.subscribeTopic(connV12, "sub1", "auto", null); + this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null); StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); newConn.connect(defUser, defPass, "myclientid2"); - this.subscribeTopic(newConn, "sub2", "auto", null); + this.subscribeTopic(newConn, "sub2", Stomp.Headers.Subscribe.AckModeValues.AUTO, null); - ClientStompFrame frame = connV12.createFrame("SEND"); - frame.addHeader("destination", getTopicPrefix() + getTopicName()); - - frame.setBody("Hello World"); - - connV12.sendFrame(frame); + send(conn, getTopicPrefix() + getTopicName(), null, "Hello World"); // receive message from socket - frame = connV12.receiveFrame(1000); + ClientStompFrame frame = conn.receiveFrame(1000); System.out.println("received frame : " + frame); Assert.assertEquals("Hello World", frame.getBody()); - Assert.assertEquals("sub1", frame.getHeader("subscription")); + Assert.assertEquals("sub1", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION)); frame = newConn.receiveFrame(1000); System.out.println("received 2 frame : " + frame); Assert.assertEquals("Hello World", frame.getBody()); - Assert.assertEquals("sub2", frame.getHeader("subscription")); + Assert.assertEquals("sub2", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION)); // remove suscription - this.unsubscribe(connV12, "sub1", true); + this.unsubscribe(conn, "sub1", true); this.unsubscribe(newConn, "sub2", true); - connV12.disconnect(); + conn.disconnect(); newConn.disconnect(); } @Test public void testSendAndReceiveOnDifferentConnections() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - ClientStompFrame sendFrame = connV12.createFrame("SEND"); - sendFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - sendFrame.setBody("Hello World"); - - connV12.sendFrame(sendFrame); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World"); StompClientConnection connV12_2 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); connV12_2.connect(defUser, defPass); - this.subscribe(connV12_2, "sub1", "auto"); + this.subscribe(connV12_2, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO); ClientStompFrame frame = connV12_2.receiveFrame(2000); - Assert.assertEquals("MESSAGE", frame.getCommand()); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); Assert.assertEquals("Hello World", frame.getBody()); - connV12.disconnect(); + conn.disconnect(); connV12_2.disconnect(); } @@ -1425,80 +1302,80 @@ public class StompV12Test extends StompV11TestBase { @Test public void testBeginSameTransactionTwice() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - beginTransaction(connV12, "tx1"); + beginTransaction(conn, "tx1"); - beginTransaction(connV12, "tx1"); + beginTransaction(conn, "tx1"); - ClientStompFrame f = connV12.receiveFrame(); - Assert.assertTrue(f.getCommand().equals("ERROR")); + ClientStompFrame f = conn.receiveFrame(); + Assert.assertTrue(f.getCommand().equals(Stomp.Responses.ERROR)); } @Test public void testBodyWithUTF8() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - this.subscribe(connV12, getName(), "auto"); + this.subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.AUTO); String text = "A" + "\u00ea" + "\u00f1" + "\u00fc" + "C"; System.out.println(text); - sendMessage(text); + sendJmsMessage(text); - ClientStompFrame frame = connV12.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); System.out.println(frame); - Assert.assertTrue(frame.getCommand().equals("MESSAGE")); - Assert.assertNotNull(frame.getHeader("destination")); + Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE)); + Assert.assertNotNull(frame.getHeader(Stomp.Headers.Subscribe.DESTINATION)); Assert.assertTrue(frame.getBody().equals(text)); - connV12.disconnect(); + conn.disconnect(); } @Test public void testClientAckNotPartOfTransaction() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - this.subscribe(connV12, getName(), "client"); + this.subscribe(conn, getName(), "client"); - sendMessage(getName()); + sendJmsMessage(getName()); - ClientStompFrame frame = connV12.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); - Assert.assertTrue(frame.getCommand().equals("MESSAGE")); - Assert.assertNotNull(frame.getHeader("destination")); + Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE)); + Assert.assertNotNull(frame.getHeader(Stomp.Headers.Subscribe.DESTINATION)); Assert.assertTrue(frame.getBody().equals(getName())); - Assert.assertNotNull(frame.getHeader("message-id")); - Assert.assertNotNull(frame.getHeader("ack")); + Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.MESSAGE_ID)); + Assert.assertNotNull(frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE)); - String messageID = frame.getHeader("ack"); + String messageID = frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE); - beginTransaction(connV12, "tx1"); + beginTransaction(conn, "tx1"); - ack(connV12, messageID, "tx1"); + ack(conn, messageID, "tx1"); - abortTransaction(connV12, "tx1"); + abortTransaction(conn, "tx1"); - frame = connV12.receiveFrame(500); + frame = conn.receiveFrame(500); Assert.assertNull(frame); - this.unsubscribe(connV12, getName()); + this.unsubscribe(conn, getName()); - connV12.disconnect(); + conn.disconnect(); } @Test public void testDisconnectAndError() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - this.subscribe(connV12, getName(), "client"); + this.subscribe(conn, getName(), "client"); - ClientStompFrame frame = connV12.createFrame("DISCONNECT"); - frame.addHeader("receipt", "1"); + ClientStompFrame frame = conn.createFrame("DISCONNECT"); + frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, "1"); - ClientStompFrame result = connV12.sendFrame(frame); + ClientStompFrame result = conn.sendFrame(frame); - if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id")))) { + if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id")))) { Assert.fail("Disconnect failed! " + result); } @@ -1507,12 +1384,9 @@ public class StompV12Test extends StompV11TestBase { Thread thr = new Thread() { @Override public void run() { - ClientStompFrame sendFrame = connV12.createFrame("SEND"); - sendFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - sendFrame.setBody("Hello World"); while (latch.getCount() != 0) { try { - connV12.sendFrame(sendFrame); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World"); Thread.sleep(500); } catch (InterruptedException e) { //retry @@ -1525,7 +1399,7 @@ public class StompV12Test extends StompV11TestBase { latch.countDown(); break; } finally { - connV12.destroy(); + conn.destroy(); } } } @@ -1545,67 +1419,67 @@ public class StompV12Test extends StompV11TestBase { @Test public void testDurableSubscriber() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - this.subscribe(connV12, "sub1", "client", getName()); + this.subscribe(conn, "sub1", "client", getName()); - this.subscribe(connV12, "sub1", "client", getName()); + this.subscribe(conn, "sub1", "client", getName()); - ClientStompFrame frame = connV12.receiveFrame(); - Assert.assertTrue(frame.getCommand().equals("ERROR")); + ClientStompFrame frame = conn.receiveFrame(); + Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.ERROR)); - waitDisconnect(connV12); - Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected()); + waitDisconnect(conn); + Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", conn.isConnected()); } @Test public void testDurableSubscriberWithReconnection() throws Exception { - connV12.connect(defUser, defPass, CLIENT_ID); + conn.connect(defUser, defPass, CLIENT_ID); - this.subscribeTopic(connV12, "sub1", "auto", getName()); + this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName()); - ClientStompFrame frame = connV12.createFrame("DISCONNECT"); - frame.addHeader("receipt", "1"); + ClientStompFrame frame = conn.createFrame("DISCONNECT"); + frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, "1"); - ClientStompFrame result = connV12.sendFrame(frame); + ClientStompFrame result = conn.sendFrame(frame); - if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id")))) { + if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id")))) { Assert.fail("Disconnect failed! " + result); } // send the message when the durable subscriber is disconnected - sendMessage(getName(), topic); + sendJmsMessage(getName(), topic); - connV12.destroy(); - connV12 = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port); - connV12.connect(defUser, defPass, CLIENT_ID); + conn.destroy(); + conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port); + conn.connect(defUser, defPass, CLIENT_ID); - this.subscribeTopic(connV12, "sub1", "auto", getName()); + this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName()); // we must have received the message - frame = connV12.receiveFrame(); + frame = conn.receiveFrame(); - Assert.assertTrue(frame.getCommand().equals("MESSAGE")); - Assert.assertNotNull(frame.getHeader("destination")); + Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE)); + Assert.assertNotNull(frame.getHeader(Stomp.Headers.Subscribe.DESTINATION)); Assert.assertEquals(getName(), frame.getBody()); - this.unsubscribe(connV12, "sub1"); + this.unsubscribe(conn, "sub1"); - connV12.disconnect(); + conn.disconnect(); } @Test public void testDurableUnSubscribe() throws Exception { - connV12.connect(defUser, defPass, CLIENT_ID); + conn.connect(defUser, defPass, CLIENT_ID); - this.subscribeTopic(connV12, null, "auto", getName()); + this.subscribeTopic(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, getName()); - connV12.disconnect(); - connV12.destroy(); - connV12 = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port); - connV12.connect(defUser, defPass, CLIENT_ID); + conn.disconnect(); + conn.destroy(); + conn = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port); + conn.connect(defUser, defPass, CLIENT_ID); - this.unsubscribe(connV12, getName(), false, true); + this.unsubscribe(conn, getName(), null, false, true); long start = System.currentTimeMillis(); SimpleString queueName = SimpleString.toSimpleString(CLIENT_ID + "." + getName()); @@ -1615,21 +1489,21 @@ public class StompV12Test extends StompV11TestBase { assertNull(server.getActiveMQServer().locateQueue(queueName)); - connV12.disconnect(); + conn.disconnect(); } @Test public void testJMSXGroupIdCanBeSet() throws Exception { MessageConsumer consumer = session.createConsumer(queue); - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - ClientStompFrame frame = connV12.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND); + frame.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName()); frame.addHeader("JMSXGroupID", "TEST"); frame.setBody("Hello World"); - connV12.sendFrame(frame); + conn.sendFrame(frame); TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); @@ -1643,64 +1517,64 @@ public class StompV12Test extends StompV11TestBase { int ctr = 10; String[] data = new String[ctr]; - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - this.subscribe(connV12, "sub1", "auto"); + this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO); for (int i = 0; i < ctr; ++i) { data[i] = getName() + i; - sendMessage(data[i]); + sendJmsMessage(data[i]); } ClientStompFrame frame = null; for (int i = 0; i < ctr; ++i) { - frame = connV12.receiveFrame(); + frame = conn.receiveFrame(); Assert.assertTrue("Message not in order", frame.getBody().equals(data[i])); } for (int i = 0; i < ctr; ++i) { data[i] = getName() + ":second:" + i; - sendMessage(data[i]); + sendJmsMessage(data[i]); } for (int i = 0; i < ctr; ++i) { - frame = connV12.receiveFrame(); + frame = conn.receiveFrame(); Assert.assertTrue("Message not in order", frame.getBody().equals(data[i])); } - connV12.disconnect(); + conn.disconnect(); } @Test public void testSubscribeWithAutoAckAndSelector() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - this.subscribe(connV12, "sub1", "auto", null, "foo = 'zzz'"); + this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null, "foo = 'zzz'"); - sendMessage("Ignored message", "foo", "1234"); - sendMessage("Real message", "foo", "zzz"); + sendJmsMessage("Ignored message", "foo", "1234"); + sendJmsMessage("Real message", "foo", "zzz"); - ClientStompFrame frame = connV12.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); Assert.assertTrue("Should have received the real message but got: " + frame, frame.getBody().equals("Real message")); - connV12.disconnect(); + conn.disconnect(); } @Test public void testRedeliveryWithClientAck() throws Exception { - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - this.subscribe(connV12, "subId", "client"); + this.subscribe(conn, "subId", "client"); - sendMessage(getName()); + sendJmsMessage(getName()); - ClientStompFrame frame = connV12.receiveFrame(); + ClientStompFrame frame = conn.receiveFrame(); - Assert.assertTrue(frame.getCommand().equals("MESSAGE")); + Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE)); - connV12.disconnect(); + conn.disconnect(); // message should be received since message was not acknowledged MessageConsumer consumer = session.createConsumer(queue); @@ -1713,7 +1587,7 @@ public class StompV12Test extends StompV11TestBase { public void testSendManyMessages() throws Exception { MessageConsumer consumer = session.createConsumer(queue); - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); int count = 1000; final CountDownLatch latch = new CountDownLatch(count); @@ -1731,30 +1605,22 @@ public class StompV12Test extends StompV11TestBase { } }); - ClientStompFrame frame = connV12.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.setBody("Hello World"); - for (int i = 1; i <= count; i++) { - connV12.sendFrame(frame); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World"); } Assert.assertTrue(latch.await(60, TimeUnit.SECONDS)); - connV12.disconnect(); + conn.disconnect(); } @Test public void testSendMessage() throws Exception { MessageConsumer consumer = session.createConsumer(queue); - connV12.connect(defUser, defPass); - - ClientStompFrame frame = connV12.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.setBody("Hello World"); + conn.connect(defUser, defPass); - connV12.sendFrame(frame); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World"); TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); @@ -1773,18 +1639,16 @@ public class StompV12Test extends StompV11TestBase { public void testSendMessageWithContentLength() throws Exception { MessageConsumer consumer = session.createConsumer(queue); - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); byte[] data = new byte[]{1, 0, 0, 4}; - ClientStompFrame frame = connV12.createFrame("SEND"); - - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.setBody(new String(data, StandardCharsets.UTF_8)); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName()) + .setBody(new String(data, StandardCharsets.UTF_8)) + .addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length)); - frame.addHeader("content-length", String.valueOf(data.length)); - - connV12.sendFrame(frame); + conn.sendFrame(frame); BytesMessage message = (BytesMessage) consumer.receive(10000); Assert.assertNotNull(message); @@ -1800,16 +1664,15 @@ public class StompV12Test extends StompV11TestBase { public void testSendMessageWithCustomHeadersAndSelector() throws Exception { MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'"); - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - ClientStompFrame frame = connV12.createFrame("SEND"); - frame.addHeader("foo", "abc"); - frame.addHeader("bar", "123"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader("foo", "abc") + .addHeader("bar", "123") + .setBody("Hello World"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.setBody("Hello World"); - - connV12.sendFrame(frame); + conn.sendFrame(frame); TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); @@ -1822,14 +1685,13 @@ public class StompV12Test extends StompV11TestBase { public void testSendMessageWithLeadingNewLine() throws Exception { MessageConsumer consumer = session.createConsumer(queue); - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - ClientStompFrame frame = connV12.createFrame("SEND"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName()) + .setBody("Hello World"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.setBody("Hello World"); - - connV12.sendWickedFrame(frame); + conn.sendWickedFrame(frame); TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); @@ -1843,24 +1705,16 @@ public class StompV12Test extends StompV11TestBase { Assert.assertNull(consumer.receive(1000)); - connV12.disconnect(); + conn.disconnect(); } @Test public void testSendMessageWithReceipt() throws Exception { MessageConsumer consumer = session.createConsumer(queue); - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - ClientStompFrame frame = connV12.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("receipt", "1234"); - frame.setBody("Hello World"); - - frame = connV12.sendFrame(frame); - - Assert.assertTrue(frame.getCommand().equals("RECEIPT")); - Assert.assertEquals("1234", frame.getHeader("receipt-id")); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true); TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); @@ -1872,28 +1726,27 @@ public class StompV12Test extends StompV11TestBase { long tmsg = message.getJMSTimestamp(); Assert.assertTrue(Math.abs(tnow - tmsg) < 1000); - connV12.disconnect(); + conn.disconnect(); } @Test public void testSendMessageWithStandardHeaders() throws Exception { MessageConsumer consumer = session.createConsumer(queue); - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); - ClientStompFrame frame = connV12.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("correlation-id", "c123"); - frame.addHeader("persistent", "true"); - frame.addHeader("priority", "3"); - frame.addHeader("type", "t345"); - frame.addHeader("JMSXGroupID", "abc"); - frame.addHeader("foo", "abc"); - frame.addHeader("bar", "123"); - - frame.setBody("Hello World"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader("correlation-id", "c123") + .addHeader("persistent", "true") + .addHeader("priority", "3") + .addHeader(Stomp.Headers.Message.TYPE, "t345") + .addHeader("JMSXGroupID", "abc") + .addHeader("foo", "abc") + .addHeader("bar", "123") + .setBody("Hello World"); - frame = connV12.sendFrame(frame); + conn.sendFrame(frame); TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); @@ -1907,33 +1760,32 @@ public class StompV12Test extends StompV11TestBase { Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID")); - connV12.disconnect(); + conn.disconnect(); } @Test public void testSendMessageWithLongHeaders() throws Exception { MessageConsumer consumer = session.createConsumer(queue); - connV12.connect(defUser, defPass); + conn.connect(defUser, defPass); StringBuffer buffer = new StringBuffer(); for (int i = 0; i < 2048; i++) { buffer.append("a"); } - ClientStompFrame frame = connV12.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("correlation-id", "c123"); - frame.addHeader("persistent", "true"); - frame.addHeader("priority", "3"); - frame.addHeader("type", "t345"); - frame.addHeader("JMSXGroupID", "abc"); - frame.addHeader("foo", "abc"); - frame.addHeader("very-very-long-stomp-message-header", buffer.toString()); - - frame.setBody("Hello World"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader("correlation-id", "c123") + .addHeader("persistent", "true") + .addHeader("priority", "3") + .addHeader(Stomp.Headers.Message.TYPE, "t345") + .addHeader("JMSXGroupID", "abc") +