Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F01ED200BC9 for ; Fri, 11 Nov 2016 20:23:34 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id EEA53160B15; Fri, 11 Nov 2016 19:23:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7DF3F160B01 for ; Fri, 11 Nov 2016 20:23:33 +0100 (CET) Received: (qmail 27458 invoked by uid 500); 11 Nov 2016 19:23:32 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 27249 invoked by uid 99); 11 Nov 2016 19:23:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Nov 2016 19:23:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DB3EBEEE34; Fri, 11 Nov 2016 19:23:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbertram@apache.org To: commits@activemq.apache.org Date: Fri, 11 Nov 2016 19:23:35 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/11] activemq-artemis git commit: Stomp refactor + track autocreation for addresses archived-at: Fri, 11 Nov 2016 19:23:35 -0000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java index 7a1a529..d32823b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java @@ -18,52 +18,47 @@ package org.apache.activemq.artemis.tests.integration.stomp.util; import java.io.IOException; -/** - * pls use factory to create frames. - */ +import org.apache.activemq.artemis.core.protocol.stomp.Stomp; +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; + public class StompClientConnectionV10 extends AbstractStompClientConnection { public StompClientConnectionV10(String host, int port) throws IOException { super("1.0", host, port); } + public StompClientConnectionV10(String version, String host, int port) throws IOException { + super(version, host, port); + } + @Override public ClientStompFrame connect(String username, String passcode) throws IOException, InterruptedException { - ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND); - frame.addHeader(LOGIN_HEADER, username); - frame.addHeader(PASSCODE_HEADER, passcode); - - ClientStompFrame response = this.sendFrame(frame); - - if (response.getCommand().equals(CONNECTED_COMMAND)) { - connected = true; - } else { - System.out.println("Connection failed with: " + response); - connected = false; - } - return response; + return connect(username, passcode, null); } @Override - public void connect(String username, String passcode, String clientID) throws IOException, InterruptedException { - ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND); - frame.addHeader(LOGIN_HEADER, username); - frame.addHeader(PASSCODE_HEADER, passcode); - frame.addHeader(CLIENT_ID_HEADER, clientID); + public ClientStompFrame connect(String username, String passcode, String clientID) throws IOException, InterruptedException { + ClientStompFrame frame = factory.newFrame(Stomp.Commands.CONNECT); + frame.addHeader(Stomp.Headers.Connect.LOGIN, username); + frame.addHeader(Stomp.Headers.Connect.PASSCODE, passcode); + if (clientID != null) { + frame.addHeader(Stomp.Headers.Connect.CLIENT_ID, clientID); + } ClientStompFrame response = this.sendFrame(frame); - if (response.getCommand().equals(CONNECTED_COMMAND)) { + if (response.getCommand().equals(Stomp.Responses.CONNECTED)) { connected = true; } else { - System.out.println("Connection failed with: " + response); + IntegrationTestLogger.LOGGER.warn("Connection failed with: " + response); connected = false; } + return response; } @Override public void disconnect() throws IOException, InterruptedException { - ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND); + ClientStompFrame frame = factory.newFrame(Stomp.Commands.DISCONNECT); this.sendFrame(frame); close(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java index aa07145..cfc8f92 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java @@ -17,55 +17,40 @@ package org.apache.activemq.artemis.tests.integration.stomp.util; import java.io.IOException; +import java.util.UUID; -public class StompClientConnectionV11 extends AbstractStompClientConnection { +import org.apache.activemq.artemis.core.protocol.stomp.Stomp; + +public class StompClientConnectionV11 extends StompClientConnectionV10 { public StompClientConnectionV11(String host, int port) throws IOException { super("1.1", host, port); } - @Override - public ClientStompFrame connect(String username, String passcode) throws IOException, InterruptedException { - ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND); - frame.addHeader(ACCEPT_HEADER, "1.1"); - frame.addHeader(HOST_HEADER, "localhost"); - if (username != null) { - frame.addHeader(LOGIN_HEADER, username); - frame.addHeader(PASSCODE_HEADER, passcode); - } - - ClientStompFrame response = this.sendFrame(frame); - - if (response.getCommand().equals(CONNECTED_COMMAND)) { - String version = response.getHeader(VERSION_HEADER); - assert (version.equals("1.1")); - - this.username = username; - this.passcode = passcode; - this.connected = true; - } else { - connected = false; - } - return response; + public StompClientConnectionV11(String version, String host, int port) throws IOException { + super(version, host, port); } @Override - public void connect(String username, String passcode, String clientID) throws IOException, InterruptedException { - ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND); - frame.addHeader(ACCEPT_HEADER, "1.1"); - frame.addHeader(HOST_HEADER, "localhost"); - frame.addHeader(CLIENT_ID_HEADER, clientID); + public ClientStompFrame connect(String username, String passcode, String clientID) throws IOException, InterruptedException { + ClientStompFrame frame = factory.newFrame(Stomp.Commands.CONNECT); + frame.addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, getVersion()); + frame.addHeader(Stomp.Headers.Connect.HOST, "localhost"); + if (clientID != null) { + frame.addHeader(Stomp.Headers.Connect.CLIENT_ID, clientID); + } if (username != null) { - frame.addHeader(LOGIN_HEADER, username); - frame.addHeader(PASSCODE_HEADER, passcode); + frame.addHeader(Stomp.Headers.Connect.LOGIN, username); + frame.addHeader(Stomp.Headers.Connect.PASSCODE, passcode); } ClientStompFrame response = this.sendFrame(frame); - if (response.getCommand().equals(CONNECTED_COMMAND)) { - String version = response.getHeader(VERSION_HEADER); - assert (version.equals("1.1")); + if (Stomp.Responses.CONNECTED.equals(response.getCommand())) { + String version = response.getHeader(Stomp.Headers.Connected.VERSION); + if (!version.equals(getVersion())) + throw new IllegalStateException("incorrect version!"); this.username = username; this.passcode = passcode; @@ -73,22 +58,24 @@ public class StompClientConnectionV11 extends AbstractStompClientConnection { } else { connected = false; } + return response; } public void connect1(String username, String passcode) throws IOException, InterruptedException { - ClientStompFrame frame = factory.newFrame(STOMP_COMMAND); - frame.addHeader(ACCEPT_HEADER, "1.0,1.1"); - frame.addHeader(HOST_HEADER, "127.0.0.1"); + ClientStompFrame frame = factory.newFrame(Stomp.Commands.STOMP); + frame.addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1"); + frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1"); if (username != null) { - frame.addHeader(LOGIN_HEADER, username); - frame.addHeader(PASSCODE_HEADER, passcode); + frame.addHeader(Stomp.Headers.Connect.LOGIN, username); + frame.addHeader(Stomp.Headers.Connect.PASSCODE, passcode); } ClientStompFrame response = this.sendFrame(frame); - if (response.getCommand().equals(CONNECTED_COMMAND)) { - String version = response.getHeader(VERSION_HEADER); - assert (version.equals("1.1")); + if (Stomp.Responses.CONNECTED.equals(response.getCommand())) { + String version = response.getHeader(Stomp.Headers.Connected.VERSION); + if (!version.equals(getVersion())) + throw new IllegalStateException("incorrect version!"); this.username = username; this.passcode = passcode; @@ -103,12 +90,15 @@ public class StompClientConnectionV11 extends AbstractStompClientConnection { public void disconnect() throws IOException, InterruptedException { stopPinger(); - ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND); - frame.addHeader("receipt", "1"); + ClientStompFrame frame = factory.newFrame(Stomp.Commands.DISCONNECT); + + String uuid = UUID.randomUUID().toString(); + + frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid); ClientStompFrame result = this.sendFrame(frame); - if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id")))) { + if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!uuid.equals(result.getHeader(Stomp.Headers.Response.RECEIPT_ID)))) { throw new IOException("Disconnect failed! " + result); } @@ -122,4 +112,28 @@ public class StompClientConnectionV11 extends AbstractStompClientConnection { return factory.newFrame(command); } + @Override + public void startPinger(long interval) { + pinger = new Pinger(interval); + pinger.startPing(); + } + + @Override + public void stopPinger() { + if (pinger != null) { + pinger.stopPing(); + try { + pinger.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + pinger = null; + } + } + + @Override + public int getServerPingNumber() { + return serverPingCounter; + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java index fb77832..2d8f354 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java @@ -18,90 +18,13 @@ package org.apache.activemq.artemis.tests.integration.stomp.util; import java.io.IOException; -public class StompClientConnectionV12 extends AbstractStompClientConnection { +public class StompClientConnectionV12 extends StompClientConnectionV11 { public StompClientConnectionV12(String host, int port) throws IOException { super("1.2", host, port); } - @Override - public ClientStompFrame createFrame(String command) { - return factory.newFrame(command); - } - - @Override - public ClientStompFrame connect(String username, String passcode) throws IOException, InterruptedException { - ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND); - frame.addHeader(ACCEPT_HEADER, "1.2"); - frame.addHeader(HOST_HEADER, "localhost"); - if (username != null) { - frame.addHeader(LOGIN_HEADER, username); - frame.addHeader(PASSCODE_HEADER, passcode); - } - - ClientStompFrame response = this.sendFrame(frame); - - if (response.getCommand().equals(CONNECTED_COMMAND)) { - String version = response.getHeader(VERSION_HEADER); - if (!version.equals("1.2")) - throw new IllegalStateException("incorrect version!"); - - this.username = username; - this.passcode = passcode; - this.connected = true; - } else { - connected = false; - } - return response; - } - - @Override - public void disconnect() throws IOException, InterruptedException { - stopPinger(); - - ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND); - frame.addHeader("receipt", "1"); - - ClientStompFrame result = this.sendFrame(frame); - - if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id")))) { - throw new IOException("Disconnect failed! " + result); - } - - close(); - - connected = false; - } - - @Override - public void connect(String username, String passcode, String clientID) throws Exception { - ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND); - frame.addHeader(ACCEPT_HEADER, "1.2"); - frame.addHeader(HOST_HEADER, "localhost"); - frame.addHeader(CLIENT_ID_HEADER, clientID); - - if (username != null) { - frame.addHeader(LOGIN_HEADER, username); - frame.addHeader(PASSCODE_HEADER, passcode); - } - - ClientStompFrame response = this.sendFrame(frame); - - if (response.getCommand().equals(CONNECTED_COMMAND)) { - String version = response.getHeader(VERSION_HEADER); - if (!version.equals("1.2")) - throw new IllegalStateException("incorrect version!"); - - this.username = username; - this.passcode = passcode; - this.connected = true; - } else { - connected = false; - } - } - public ClientStompFrame createAnyFrame(String command) { return factory.newAnyFrame(command); } - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactory.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactory.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactory.java index 3ec03cf..1c78c5a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactory.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactory.java @@ -24,4 +24,6 @@ public interface StompFrameFactory { ClientStompFrame newAnyFrame(String command); + String[] handleHeaders(String header); + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV10.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV10.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV10.java index 5ed8566..8813fd6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV10.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV10.java @@ -38,18 +38,18 @@ import java.util.StringTokenizer; public class StompFrameFactoryV10 implements StompFrameFactory { @Override - public ClientStompFrame createFrame(String data) { + public ClientStompFrame createFrame(final String data) { //split the string at "\n\n" String[] dataFields = data.split("\n\n"); StringTokenizer tokenizer = new StringTokenizer(dataFields[0], "\n"); String command = tokenizer.nextToken(); - ClientStompFrame frame = new ClientStompFrameV10(command); + ClientStompFrame frame = newFrame(command); while (tokenizer.hasMoreTokens()) { String header = tokenizer.nextToken(); - String[] fields = header.split(":"); + String[] fields = handleHeaders(header); frame.addHeader(fields[0], fields[1]); } @@ -61,6 +61,11 @@ public class StompFrameFactoryV10 implements StompFrameFactory { } @Override + public String[] handleHeaders(String header) { + return header.split(":"); + } + + @Override public ClientStompFrame newFrame(String command) { return new ClientStompFrameV10(command); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV11.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV11.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV11.java index 4de0d7d..807b3f0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV11.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV11.java @@ -36,32 +36,10 @@ import java.util.StringTokenizer; * 13. RECEIPT * 14. ERROR */ -public class StompFrameFactoryV11 implements StompFrameFactory { +public class StompFrameFactoryV11 extends StompFrameFactoryV10 { @Override - public ClientStompFrame createFrame(final String data) { - //split the string at "\n\n" - String[] dataFields = data.split("\n\n"); - - StringTokenizer tokenizer = new StringTokenizer(dataFields[0], "\n"); - - String command = tokenizer.nextToken(); - ClientStompFrame frame = new ClientStompFrameV11(command); - - while (tokenizer.hasMoreTokens()) { - String header = tokenizer.nextToken(); - String[] fields = splitAndDecodeHeader(header); - frame.addHeader(fields[0], fields[1]); - } - - //body (without null byte) - if (dataFields.length == 2) { - frame.setBody(dataFields[1]); - } - return frame; - } - - private String[] splitAndDecodeHeader(String header) { + public String[] handleHeaders(String header) { // split the header into the key and value at the ":" since there shouldn't be any unescaped colons in the header // except for the one separating the key and value String[] result = header.split(":"); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV12.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV12.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV12.java index 5223b4e..dbd32e0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV12.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV12.java @@ -18,42 +18,10 @@ package org.apache.activemq.artemis.tests.integration.stomp.util; import java.util.StringTokenizer; -public class StompFrameFactoryV12 implements StompFrameFactory { +public class StompFrameFactoryV12 extends StompFrameFactoryV11 { @Override - public ClientStompFrame createFrame(String data) { - //split the string at "\n\n" - String[] dataFields = data.split("\n\n"); - - StringTokenizer tokenizer = new StringTokenizer(dataFields[0], "\n"); - - String command = tokenizer.nextToken(); - ClientStompFrame frame = new ClientStompFrameV12(command); - - while (tokenizer.hasMoreTokens()) { - String header = tokenizer.nextToken(); - String[] fields = splitAndDecodeHeader(header); - frame.addHeader(fields[0], fields[1]); - } - - //body (without null byte) - if (dataFields.length == 2) { - frame.setBody(dataFields[1]); - } - return frame; - } - - public void printByteHeader(String headers) { - StringBuffer buffer = new StringBuffer(); - - for (int i = 0; i < headers.length(); i++) { - char c = headers.charAt(i); - buffer.append((byte) c + " "); - } - System.out.println("header in byte : " + buffer.toString()); - } - - private String[] splitAndDecodeHeader(String header) { + public String[] handleHeaders(String header) { // split the header into the key and value at the ":" since there shouldn't be any unescaped colons in the header // except for the one separating the key and value String[] result = header.split(":"); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/ExtraStompTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/ExtraStompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/ExtraStompTest.java index a5d3068..5414a9f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/ExtraStompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/ExtraStompTest.java @@ -18,6 +18,8 @@ package org.apache.activemq.artemis.tests.integration.stomp.v11; import java.nio.charset.StandardCharsets; +import org.apache.activemq.artemis.core.protocol.stomp.Stomp; +import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase; import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; @@ -28,15 +30,18 @@ import org.junit.Test; /* * Some Stomp tests against server with persistence enabled are put here. */ -public class ExtraStompTest extends StompV11TestBase { +public class ExtraStompTest extends StompTestBase { private StompClientConnection connV10; private StompClientConnection connV11; + public boolean isPersistenceEnabled() { + return true; + } + @Override @Before public void setUp() throws Exception { - persistenceEnabled = true; super.setUp(); connV10 = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); connV10.connect(defUser, defPass); @@ -57,331 +62,142 @@ public class ExtraStompTest extends StompV11TestBase { @Test public void testSendAndReceive10() throws Exception { - String msg1 = "Hello World 1!"; - String msg2 = "Hello World 2!"; - - ClientStompFrame frame = connV10.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-length", String.valueOf(msg1.getBytes(StandardCharsets.UTF_8).length)); - frame.addHeader("persistent", "true"); - frame.setBody(msg1); - - connV10.sendFrame(frame); - - ClientStompFrame frame2 = connV10.createFrame("SEND"); - frame2.addHeader("destination", getQueuePrefix() + getQueueName()); - frame2.addHeader("content-length", String.valueOf(msg2.getBytes(StandardCharsets.UTF_8).length)); - frame2.addHeader("persistent", "true"); - frame2.setBody(msg2); - - connV10.sendFrame(frame2); - - ClientStompFrame subFrame = connV10.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - connV10.sendFrame(subFrame); - - frame = connV10.receiveFrame(); - - System.out.println("received " + frame); - - assertEquals("MESSAGE", frame.getCommand()); - - assertEquals("a-sub", frame.getHeader("subscription")); - - assertNotNull(frame.getHeader("message-id")); - - assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination")); - - assertEquals(msg1, frame.getBody()); - - frame = connV10.receiveFrame(); - - System.out.println("received " + frame); - - assertEquals("MESSAGE", frame.getCommand()); - - assertEquals("a-sub", frame.getHeader("subscription")); - - assertNotNull(frame.getHeader("message-id")); - - assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination")); - - assertEquals(msg2, frame.getBody()); - - //unsub - ClientStompFrame unsubFrame = connV10.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); - connV10.sendFrame(unsubFrame); - + testSendAndReceive(connV10); } @Test public void testSendAndReceive11() throws Exception { + testSendAndReceive(connV11); + } + + public void testSendAndReceive(StompClientConnection conn) throws Exception { String msg1 = "Hello World 1!"; String msg2 = "Hello World 2!"; - ClientStompFrame frame = connV11.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-length", String.valueOf(msg1.getBytes(StandardCharsets.UTF_8).length)); - frame.addHeader("persistent", "true"); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND); + frame.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName()); + frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(msg1.getBytes(StandardCharsets.UTF_8).length)); + frame.addHeader(Stomp.Headers.Send.PERSISTENT, Boolean.TRUE.toString()); frame.setBody(msg1); - connV11.sendFrame(frame); + conn.sendFrame(frame); - ClientStompFrame frame2 = connV11.createFrame("SEND"); - frame2.addHeader("destination", getQueuePrefix() + getQueueName()); - frame2.addHeader("content-length", String.valueOf(msg2.getBytes(StandardCharsets.UTF_8).length)); - frame2.addHeader("persistent", "true"); + ClientStompFrame frame2 = conn.createFrame(Stomp.Commands.SEND); + frame2.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName()); + frame2.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(msg2.getBytes(StandardCharsets.UTF_8).length)); + frame2.addHeader(Stomp.Headers.Send.PERSISTENT, Boolean.TRUE.toString()); frame2.setBody(msg2); - connV11.sendFrame(frame2); - - ClientStompFrame subFrame = connV11.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - connV11.sendFrame(subFrame); - - frame = connV11.receiveFrame(); - - System.out.println("received " + frame); - - assertEquals("MESSAGE", frame.getCommand()); + conn.sendFrame(frame2); - assertEquals("a-sub", frame.getHeader("subscription")); + subscribe(conn, "a-sub"); - assertNotNull(frame.getHeader("message-id")); - - assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination")); + frame = conn.receiveFrame(); + assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + assertEquals("a-sub", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION)); + assertNotNull(frame.getHeader(Stomp.Headers.Message.MESSAGE_ID)); + assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Subscribe.DESTINATION)); assertEquals(msg1, frame.getBody()); - frame = connV11.receiveFrame(); - - System.out.println("received " + frame); - - assertEquals("MESSAGE", frame.getCommand()); - - assertEquals("a-sub", frame.getHeader("subscription")); - - assertNotNull(frame.getHeader("message-id")); - - assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination")); + frame = conn.receiveFrame(); + assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + assertEquals("a-sub", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION)); + assertNotNull(frame.getHeader(Stomp.Headers.Message.MESSAGE_ID)); + assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Subscribe.DESTINATION)); assertEquals(msg2, frame.getBody()); - //unsub - ClientStompFrame unsubFrame = connV11.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); - connV11.sendFrame(unsubFrame); + unsubscribe(conn, "a-sub"); } @Test public void testNoGarbageAfterPersistentMessageV10() throws Exception { - ClientStompFrame subFrame = connV10.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - connV10.sendFrame(subFrame); + testNoGarbageAfterPersistentMessage(connV10); + } - ClientStompFrame frame = connV10.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-length", "11"); - frame.addHeader("persistent", "true"); + @Test + public void testNoGarbageAfterPersistentMessageV11() throws Exception { + testNoGarbageAfterPersistentMessage(connV11); + } + + public void testNoGarbageAfterPersistentMessage(StompClientConnection conn) throws Exception { + subscribe(conn, "a-sub"); + + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND); + frame.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName()); + frame.addHeader(Stomp.Headers.CONTENT_LENGTH, "11"); + frame.addHeader(Stomp.Headers.Send.PERSISTENT, Boolean.TRUE.toString()); frame.setBody("Hello World"); - connV10.sendFrame(frame); + conn.sendFrame(frame); - frame = connV10.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-length", "11"); - frame.addHeader("persistent", "true"); + frame = conn.createFrame(Stomp.Commands.SEND); + frame.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName()); + frame.addHeader(Stomp.Headers.CONTENT_LENGTH, "11"); + frame.addHeader(Stomp.Headers.Send.PERSISTENT, Boolean.TRUE.toString()); frame.setBody("Hello World"); - connV10.sendFrame(frame); - - frame = connV10.receiveFrame(10000); + conn.sendFrame(frame); - System.out.println("received: " + frame); + frame = conn.receiveFrame(10000); assertEquals("Hello World", frame.getBody()); //if activemq sends trailing garbage bytes, the second message //will not be normal - frame = connV10.receiveFrame(10000); - - System.out.println("received: " + frame); + frame = conn.receiveFrame(10000); assertEquals("Hello World", frame.getBody()); - //unsub - ClientStompFrame unsubFrame = connV10.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); - connV10.sendFrame(unsubFrame); - + unsubscribe(conn, "a-sub"); } @Test public void testNoGarbageOnPersistentRedeliveryV10() throws Exception { - - ClientStompFrame frame = connV10.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-length", "11"); - frame.addHeader("persistent", "true"); - frame.setBody("Hello World"); - - connV10.sendFrame(frame); - - frame = connV10.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-length", "11"); - frame.addHeader("persistent", "true"); - frame.setBody("Hello World"); - - connV10.sendFrame(frame); - - ClientStompFrame subFrame = connV10.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "client"); - - connV10.sendFrame(subFrame); - - // receive but don't ack - frame = connV10.receiveFrame(10000); - frame = connV10.receiveFrame(10000); - - System.out.println("received: " + frame); - - //unsub - ClientStompFrame unsubFrame = connV10.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); - connV10.sendFrame(unsubFrame); - - subFrame = connV10.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - connV10.sendFrame(subFrame); - - frame = connV10.receiveFrame(10000); - frame = connV10.receiveFrame(10000); - - //second receive will get problem if trailing bytes - assertEquals("Hello World", frame.getBody()); - - System.out.println("received again: " + frame); - - //unsub - unsubFrame = connV10.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); - connV10.sendFrame(unsubFrame); + testNoGarbageOnPersistentRedelivery(connV10); } @Test - public void testNoGarbageAfterPersistentMessageV11() throws Exception { - ClientStompFrame subFrame = connV11.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - connV11.sendFrame(subFrame); - ClientStompFrame frame = connV11.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-length", "11"); - frame.addHeader("persistent", "true"); - frame.setBody("Hello World"); - - connV11.sendFrame(frame); - - frame = connV11.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-length", "11"); - frame.addHeader("persistent", "true"); - frame.setBody("Hello World"); - - connV11.sendFrame(frame); - frame = connV11.receiveFrame(10000); - - System.out.println("received: " + frame); - - assertEquals("Hello World", frame.getBody()); - - //if activemq sends trailing garbage bytes, the second message - //will not be normal - frame = connV11.receiveFrame(10000); - - System.out.println("received: " + frame); - - assertEquals("Hello World", frame.getBody()); - - //unsub - ClientStompFrame unsubFrame = connV11.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); - connV11.sendFrame(unsubFrame); + public void testNoGarbageOnPersistentRedeliveryV11() throws Exception { + testNoGarbageOnPersistentRedelivery(connV11); } - @Test - public void testNoGarbageOnPersistentRedeliveryV11() throws Exception { - ClientStompFrame frame = connV11.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-length", "11"); - frame.addHeader("persistent", "true"); + public void testNoGarbageOnPersistentRedelivery(StompClientConnection conn) throws Exception { + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND); + frame.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName()); + frame.addHeader(Stomp.Headers.CONTENT_LENGTH, "11"); + frame.addHeader(Stomp.Headers.Send.PERSISTENT, Boolean.TRUE.toString()); frame.setBody("Hello World"); - connV11.sendFrame(frame); + conn.sendFrame(frame); - frame = connV11.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("content-length", "11"); - frame.addHeader("persistent", "true"); + frame = conn.createFrame(Stomp.Commands.SEND); + frame.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName()); + frame.addHeader(Stomp.Headers.CONTENT_LENGTH, "11"); + frame.addHeader(Stomp.Headers.Send.PERSISTENT, Boolean.TRUE.toString()); frame.setBody("Hello World"); - connV11.sendFrame(frame); - - ClientStompFrame subFrame = connV11.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "client"); + conn.sendFrame(frame); - connV11.sendFrame(subFrame); + subscribe(conn, "a-sub", Stomp.Headers.Subscribe.AckModeValues.CLIENT); // receive but don't ack - frame = connV11.receiveFrame(10000); - frame = connV11.receiveFrame(10000); - - System.out.println("received: " + frame); + frame = conn.receiveFrame(10000); + frame = conn.receiveFrame(10000); - //unsub - ClientStompFrame unsubFrame = connV11.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); - connV11.sendFrame(unsubFrame); - - subFrame = connV11.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); + unsubscribe(conn, "a-sub"); - connV11.sendFrame(subFrame); + subscribe(conn, "a-sub"); - frame = connV11.receiveFrame(10000); - frame = connV11.receiveFrame(10000); + frame = conn.receiveFrame(10000); + frame = conn.receiveFrame(10000); //second receive will get problem if trailing bytes assertEquals("Hello World", frame.getBody()); - System.out.println("received again: " + frame); - //unsub - unsubFrame = connV11.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); - connV11.sendFrame(unsubFrame); + unsubscribe(conn, "a-sub"); } }