activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [07/11] activemq-artemis git commit: Stomp refactor + track autocreation for addresses
Date Fri, 11 Nov 2016 19:23:37 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 90d18ae..91ab5d3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -42,48 +42,70 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.integration.mqtt.imported.FuseMQTTClientProvider;
 import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider;
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 public class StompTest extends StompTestBase {
 
    private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
+   protected StompClientConnection conn;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      try {
+         boolean connected = conn != null && conn.isConnected();
+         log.debug("Connection 1.0 connected: " + connected);
+         if (connected) {
+            conn.disconnect();
+         }
+      } finally {
+         super.tearDown();
+      }
+   }
 
    @Test
    public void testConnectionTTL() throws Exception {
-      int index = 1;
       int port = 61614;
 
       server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?connectionTtl=1000").start();
-      createBootstrap(index, port);
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(index, frame);
-      frame = receiveFrame(index, 10000);
-
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
+      conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+      conn.connect("brianm", "wombats");
 
       Thread.sleep(5000);
 
-      assertTrue(receiveFrame(index, 10000).indexOf(Stomp.Responses.ERROR) != -1);
+      ClientStompFrame frame = conn.receiveFrame();
 
-      assertChannelClosed(index);
+      assertEquals(Stomp.Responses.ERROR, frame.getCommand());
+
+      assertFalse(conn.isConnected());
    }
 
    @Test
    public void testSendManyMessages() throws Exception {
-      MessageConsumer consumer = session.createConsumer(queue);
+      conn.connect(defUser, defPass);
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-      frame = receiveFrame(10000);
+      MessageConsumer consumer = session.createConsumer(queue);
 
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
       int count = 1000;
       final CountDownLatch latch = new CountDownLatch(count);
       consumer.setMessageListener(new MessageListener() {
@@ -94,11 +116,8 @@ public class StompTest extends StompTestBase {
          }
       });
 
-      frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
       for (int i = 1; i <= count; i++) {
-         // Thread.sleep(1);
-         // System.out.println(">>> " + i);
-         sendFrame(frame);
+         send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!");
       }
 
       assertTrue(latch.await(60, TimeUnit.SECONDS));
@@ -110,11 +129,7 @@ public class StompTest extends StompTestBase {
       try {
          MessageConsumer consumer = session.createConsumer(queue);
 
-         String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-         sendFrame(frame);
-         frame = receiveFrame(10000);
-
-         Assert.assertTrue(frame.startsWith("CONNECTED"));
+         conn.connect(defUser, defPass);
          int count = 1000;
          final CountDownLatch latch = new CountDownLatch(count);
          consumer.setMessageListener(new MessageListener() {
@@ -125,13 +140,14 @@ public class StompTest extends StompTestBase {
             }
          });
 
-         ((ActiveMQServerImpl) server.getActiveMQServer()).getMonitor().setMaxUsage(0).tick();
+         ((ActiveMQServerImpl) server.getActiveMQServer()).getMonitor()
+                                                          .setMaxUsage(0)
+                                                          .tick();
 
-         frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
          for (int i = 1; i <= count; i++) {
             // Thread.sleep(1);
-            // System.out.println(">>> " + i);
-            sendFrame(frame);
+            // log.info(">>> " + i);
+            send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!");
          }
 
          // It should encounter the exception on logs
@@ -140,52 +156,44 @@ public class StompTest extends StompTestBase {
          AssertionLoggerHandler.clear();
          AssertionLoggerHandler.stopCapture();
       }
-
    }
 
    @Test
    public void testConnect() throws Exception {
-
-      String connect_frame = "CONNECT\n" + "login: brianm\n" +
-         "passcode: wombats\n" +
-         "request-id: 1\n" +
-         "\n" +
-         Stomp.NULL;
-      sendFrame(connect_frame);
-
-      String f = receiveFrame(10000);
-      Assert.assertTrue(f.startsWith("CONNECTED"));
-      Assert.assertTrue(f.indexOf("response-id:1") >= 0);
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
+                                   .addHeader(Stomp.Headers.Connect.LOGIN, defUser)
+                                   .addHeader(Stomp.Headers.Connect.PASSCODE, defPass)
+                                   .addHeader(Stomp.Headers.Connect.REQUEST_ID, "1");
+      ClientStompFrame response = conn.sendFrame(frame);
+
+      Assert.assertTrue(response.getCommand()
+                                .equals(Stomp.Responses.CONNECTED));
+      Assert.assertTrue(response.getHeader(Stomp.Headers.Connected.RESPONSE_ID)
+                                .equals("1"));
    }
 
    @Test
    public void testDisconnectAndError() throws Exception {
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
+                                   .addHeader(Stomp.Headers.Connect.LOGIN, defUser)
+                                   .addHeader(Stomp.Headers.Connect.PASSCODE, defPass)
+                                   .addHeader(Stomp.Headers.Connect.REQUEST_ID, "1");
+      ClientStompFrame response = conn.sendFrame(frame);
 
-      String connectFrame = "CONNECT\n" + "login: brianm\n" +
-         "passcode: wombats\n" +
-         "request-id: 1\n" +
-         "\n" +
-         Stomp.NULL;
-      sendFrame(connectFrame);
+      Assert.assertTrue(response.getCommand()
+                                .equals(Stomp.Responses.CONNECTED));
+      Assert.assertTrue(response.getHeader(Stomp.Headers.Connected.RESPONSE_ID)
+                                .equals("1"));
 
-      String f = receiveFrame(10000);
-      Assert.assertTrue(f.startsWith("CONNECTED"));
-      Assert.assertTrue(f.indexOf("response-id:1") >= 0);
-
-      String disconnectFrame = "DISCONNECT\n\n" + Stomp.NULL;
-      sendFrame(disconnectFrame);
-
-      waitForFrameToTakeEffect();
+      conn.disconnect();
 
       // sending a message will result in an error
-      String frame = "SEND\n" + "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n\n" +
-         "Hello World" +
-         Stomp.NULL;
-
-      assertChannelClosed();
+      try {
+         send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!");
+         fail("Should have thrown an exception since connection is disconnected");
+      } catch (Exception e) {
+         // ignore
+      }
    }
 
    @Test
@@ -193,15 +201,9 @@ public class StompTest extends StompTestBase {
 
       MessageConsumer consumer = session.createConsumer(queue);
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
+      conn.connect(defUser, defPass);
 
-      frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
-
-      sendFrame(frame);
+      send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
 
       TextMessage message = (TextMessage) consumer.receive(1000);
       Assert.assertNotNull(message);
@@ -218,21 +220,16 @@ public class StompTest extends StompTestBase {
 
    @Test
    public void sendSTOMPReceiveMQTT() throws Exception {
-      String address = "myTestAddress";
-
       // Set up MQTT Subscription
       MQTTClientProvider clientProvider = new FuseMQTTClientProvider();
-      clientProvider.connect("tcp://localhost:61616");
-      clientProvider.subscribe(address, 0);
+      clientProvider.connect("tcp://" + hostname + ":" + port);
+      clientProvider.subscribe(getTopicPrefix() + getTopicName(), 0);
 
       String stompPayload = "This is a test message";
 
       // Set up STOMP connection and send STOMP Message
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = "SEND\n" + "destination:" + address + "\n\n" + stompPayload + Stomp.NULL;
-      sendFrame(frame);
+      conn.connect(defUser, defPass);
+      send(conn, getTopicPrefix() + getTopicName(), null, stompPayload);
 
       // Receive MQTT Message
       byte[] mqttPayload = clientProvider.receive(10000);
@@ -244,44 +241,30 @@ public class StompTest extends StompTestBase {
 
    @Test
    public void sendMQTTReceiveSTOMP() throws Exception {
-      String address = "myTestAddress";
       String payload = "This is a test message";
 
-      server.getActiveMQServer().createQueue(new SimpleString(address), new SimpleString(address), null, false, false);
-
       // Set up STOMP subscription
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = "SUBSCRIBE\n" + "destination:" + address + "\n" + "ack:auto\n\n" + Stomp.NULL;
-      sendFrame(frame);
-      receiveFrame(1000);
+      conn.connect(defUser, defPass);
+      subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
 
       // Send MQTT Message
       MQTTClientProvider clientProvider = new FuseMQTTClientProvider();
-      clientProvider.connect("tcp://localhost:61616");
-      clientProvider.publish(address, payload.getBytes(), 0);
+      clientProvider.connect("tcp://" + hostname + ":" + port);
+      clientProvider.publish(getQueuePrefix() + getQueueName(), payload.getBytes(), 0);
       clientProvider.disconnect();
 
       // Receive STOMP Message
-      frame = receiveFrame(1000);
-      assertTrue(frame.contains(payload));
+      ClientStompFrame frame = conn.receiveFrame();
+      assertTrue(frame.getBody()
+                      .contains(payload));
 
    }
 
    @Test
    public void testSendMessageToNonExistentQueue() throws Exception {
       String nonExistentQueue = RandomUtil.randomString();
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      frame = "SEND\n" + "destination:" + getQueuePrefix() + nonExistentQueue + "\n\n" + "Hello World" + Stomp.NULL;
-
-      sendFrame(frame);
-      receiveFrame(1000);
+      conn.connect(defUser, defPass);
+      send(conn, getQueuePrefix() + nonExistentQueue, null, "Hello World", true, AddressInfo.RoutingType.ANYCAST);
 
       MessageConsumer consumer = session.createConsumer(ActiveMQJMSClient.createQueue(nonExistentQueue));
       TextMessage message = (TextMessage) consumer.receive(1000);
@@ -297,29 +280,26 @@ public class StompTest extends StompTestBase {
       Assert.assertTrue(Math.abs(tnow - tmsg) < 1500);
 
       // closing the consumer here should trigger auto-deletion
-      assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(nonExistentQueue)));
+      assertNotNull(server.getActiveMQServer()
+                          .getPostOffice()
+                          .getBinding(new SimpleString(nonExistentQueue)));
       consumer.close();
-      assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(nonExistentQueue)));
+      assertNull(server.getActiveMQServer()
+                       .getPostOffice()
+                       .getBinding(new SimpleString(nonExistentQueue)));
    }
 
    @Test
    public void testSendMessageToNonExistentTopic() throws Exception {
       String nonExistentTopic = RandomUtil.randomString();
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
+      conn.connect(defUser, defPass);
 
       // first send a message to ensure that sending to a non-existent topic won't throw an error
-      frame = "SEND\n" + "destination:" + getTopicPrefix() + nonExistentTopic + "\n\n" + "Hello World" + Stomp.NULL;
-      sendFrame(frame);
-      receiveFrame(1000);
+      send(conn, getTopicPrefix() + nonExistentTopic, null, "Hello World", true);
 
       // create a subscription on the topic and send/receive another message
       MessageConsumer consumer = session.createConsumer(ActiveMQJMSClient.createTopic(nonExistentTopic));
-      sendFrame(frame);
-      receiveFrame(1000);
+      send(conn, getTopicPrefix() + nonExistentTopic, null, "Hello World", true);
       TextMessage message = (TextMessage) consumer.receive(1000);
       Assert.assertNotNull(message);
       Assert.assertEquals("Hello World", message.getText());
@@ -332,11 +312,13 @@ public class StompTest extends StompTestBase {
       long tmsg = message.getJMSTimestamp();
       Assert.assertTrue(Math.abs(tnow - tmsg) < 1500);
 
-      assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(nonExistentTopic)));
+      assertNotNull(server.getActiveMQServer()
+                          .getAddressInfo(new SimpleString(nonExistentTopic)));
 
-      // closing the consumer here should trigger auto-deletion of the topic
+      // closing the consumer here should trigger auto-deletion of the subscription queue and address
       consumer.close();
-      assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(nonExistentTopic)));
+      assertNull(server.getActiveMQServer()
+                       .getAddressInfo(new SimpleString(nonExistentTopic)));
    }
 
    /*
@@ -346,29 +328,22 @@ public class StompTest extends StompTestBase {
     */
    @Test
    public void testSendMessageWithLeadingNewLine() throws Exception {
+      conn.connect(defUser, defPass);
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+                                   .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+                                   .setBody("Hello World");
+      conn.sendWickedFrame(frame);
 
       MessageConsumer consumer = session.createConsumer(queue);
-
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL + "\n";
-      sendFrame(frame);
-
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      frame = "SEND\n" + "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n\n" +
-         "Hello World" +
-         Stomp.NULL +
-         "\n";
-
-      sendFrame(frame);
-
       TextMessage message = (TextMessage) consumer.receive(1000);
       Assert.assertNotNull(message);
       Assert.assertEquals("Hello World", message.getText());
 
+      send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
+      message = (TextMessage) consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals("Hello World", message.getText());
+
       // Make sure that the timestamp is valid - should
       // be very close to the current time.
       long tnow = System.currentTimeMillis();
@@ -380,25 +355,9 @@ public class StompTest extends StompTestBase {
    public void testSendMessageWithReceipt() throws Exception {
       MessageConsumer consumer = session.createConsumer(queue);
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
+      conn.connect(defUser, defPass);
 
-      frame = "SEND\n" + "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n" +
-         "receipt: 1234\n\n" +
-         "Hello World" +
-         Stomp.NULL;
-
-      sendFrame(frame);
-
-      String f = receiveFrame(10000);
-      Assert.assertTrue(f.startsWith("RECEIPT"));
-      Assert.assertTrue(f.indexOf("receipt-id:1234") >= 0);
+      send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true);
 
       TextMessage message = (TextMessage) consumer.receive(1000);
       Assert.assertNotNull(message);
@@ -413,30 +372,19 @@ public class StompTest extends StompTestBase {
 
    @Test
    public void testSendMessageWithContentLength() throws Exception {
-
       MessageConsumer consumer = session.createConsumer(queue);
-
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
+      conn.connect(defUser, defPass);
 
       byte[] data = new byte[]{1, 0, 0, 4};
-
-      frame = "SEND\n" + "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n" +
-         "content-length:" +
-         data.length +
-         "\n\n";
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      baos.write(frame.getBytes(StandardCharsets.UTF_8));
       baos.write(data);
-      baos.write('\0');
       baos.flush();
-      sendFrame(new String(baos.toByteArray()));
+
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+                                   .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+                                   .addHeader(Stomp.Headers.CONTENT_LENGTH, Integer.toString(data.length))
+                                   .setBody(new String(baos.toByteArray()));
+      conn.sendFrame(frame);
 
       BytesMessage message = (BytesMessage) consumer.receive(10000);
       Assert.assertNotNull(message);
@@ -449,30 +397,22 @@ public class StompTest extends StompTestBase {
 
    @Test
    public void testJMSXGroupIdCanBeSet() throws Exception {
-
+      final String jmsxGroupID = "JMSXGroupID";
       MessageConsumer consumer = session.createConsumer(queue);
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
+      conn.connect(defUser, defPass);
 
-      frame = "SEND\n" + "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n" +
-         "JMSXGroupID: TEST\n\n" +
-         "Hello World" +
-         Stomp.NULL;
-
-      sendFrame(frame);
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+                                   .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+                                   .addHeader("JMSXGroupID", jmsxGroupID)
+                                   .setBody("Hello World");
+      conn.sendFrame(frame);
 
       TextMessage message = (TextMessage) consumer.receive(1000);
       Assert.assertNotNull(message);
       Assert.assertEquals("Hello World", message.getText());
       // differ from StompConnect
-      Assert.assertEquals("TEST", message.getStringProperty("JMSXGroupID"));
+      Assert.assertEquals(jmsxGroupID, message.getStringProperty("JMSXGroupID"));
    }
 
    @Test
@@ -480,22 +420,14 @@ public class StompTest extends StompTestBase {
 
       MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
+      conn.connect(defUser, defPass);
 
-      frame = "SEND\n" + "foo:abc\n" +
-         "bar:123\n" +
-         "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n\n" +
-         "Hello World" +
-         Stomp.NULL;
-
-      sendFrame(frame);
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+                                   .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+                                   .addHeader("foo", "abc")
+                                   .addHeader("bar", "123")
+                                   .setBody("Hello World");
+      conn.sendFrame(frame);
 
       TextMessage message = (TextMessage) consumer.receive(1000);
       Assert.assertNotNull(message);
@@ -509,22 +441,14 @@ public class StompTest extends StompTestBase {
 
       MessageConsumer consumer = session.createConsumer(queue, "hyphenated_props:b-ar = '123'");
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      frame = "SEND\n" + "foo:abc\n" +
-         "b-ar:123\n" +
-         "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n\n" +
-         "Hello World" +
-         Stomp.NULL;
+      conn.connect(defUser, defPass);
 
-      sendFrame(frame);
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+                                   .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+                                   .addHeader("foo", "abc")
+                                   .addHeader("b-ar", "123")
+                                   .setBody("Hello World");
+      conn.sendFrame(frame);
 
       TextMessage message = (TextMessage) consumer.receive(1000);
       Assert.assertNotNull(message);
@@ -538,27 +462,19 @@ public class StompTest extends StompTestBase {
 
       MessageConsumer consumer = session.createConsumer(queue);
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      frame = "SEND\n" + "correlation-id:c123\n" +
-         "persistent:true\n" +
-         "priority:3\n" +
-         "type:t345\n" +
-         "JMSXGroupID:abc\n" +
-         "foo:abc\n" +
-         "bar:123\n" +
-         "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n\n" +
-         "Hello World" +
-         Stomp.NULL;
+      conn.connect(defUser, defPass);
 
-      sendFrame(frame);
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+                                   .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+                                   .addHeader("foo", "abc")
+                                   .addHeader("bar", "123")
+                                   .addHeader("correlation-id", "c123")
+                                   .addHeader("persistent", "true")
+                                   .addHeader("type", "t345")
+                                   .addHeader("JMSXGroupID", "abc")
+                                   .addHeader("priority", "3")
+                                   .setBody("Hello World");
+      conn.sendFrame(frame);
 
       TextMessage message = (TextMessage) consumer.receive(1000);
       Assert.assertNotNull(message);
@@ -579,33 +495,25 @@ public class StompTest extends StompTestBase {
    public void testSendMessageWithLongHeaders() throws Exception {
       MessageConsumer consumer = session.createConsumer(queue);
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
+      conn.connect(defUser, defPass);
 
       StringBuffer buffer = new StringBuffer();
       for (int i = 0; i < 1024; i++) {
          buffer.append("a");
       }
-      String longHeader = "longHeader:" + buffer.toString() + "\n";
-
-      frame = "SEND\n" + "correlation-id:c123\n" +
-         "persistent:true\n" +
-         "priority:3\n" +
-         "type:t345\n" +
-         "JMSXGroupID:abc\n" +
-         "foo:abc\n" +
-         longHeader +
-         "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n\n" +
-         "Hello World" +
-         Stomp.NULL;
-
-      sendFrame(frame);
+
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+                                   .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+                                   .addHeader("foo", "abc")
+                                   .addHeader("bar", "123")
+                                   .addHeader("correlation-id", "c123")
+                                   .addHeader("persistent", "true")
+                                   .addHeader("type", "t345")
+                                   .addHeader("JMSXGroupID", "abc")
+                                   .addHeader("priority", "3")
+                                   .addHeader("longHeader", buffer.toString())
+                                   .setBody("Hello World");
+      conn.sendFrame(frame);
 
       TextMessage message = (TextMessage) consumer.receive(1000);
       Assert.assertNotNull(message);
@@ -615,37 +523,30 @@ public class StompTest extends StompTestBase {
       Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
       Assert.assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
       Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
-      Assert.assertEquals("longHeader", 1024, message.getStringProperty("longHeader").length());
+      Assert.assertEquals("longHeader", 1024, message.getStringProperty("longHeader")
+                                                     .length());
 
       Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
    }
 
    @Test
    public void testSubscribeWithAutoAck() throws Exception {
+      conn.connect(defUser, defPass);
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
+      subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
 
-      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL;
-      sendFrame(frame);
+      sendJmsMessage(getName());
 
-      sendMessage(getName());
+      ClientStompFrame frame = conn.receiveFrame(10000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Send.DESTINATION));
+      Assert.assertEquals(getName(), frame.getBody());
 
-      frame = receiveFrame(10000);
-      System.out.println("-------- frame received: " + frame);
-      Assert.assertTrue(frame.startsWith("MESSAGE"));
-      Assert.assertTrue(frame.indexOf("destination:") > 0);
-      Assert.assertTrue(frame.indexOf(getName()) > 0);
-
-      Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
-      Matcher cl_matcher = cl.matcher(frame);
+      Pattern cl = Pattern.compile(Stomp.Headers.CONTENT_LENGTH + ":\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
+      Matcher cl_matcher = cl.matcher(frame.toString());
       Assert.assertFalse(cl_matcher.find());
 
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      conn.disconnect();
 
       // message should not be received as it was auto-acked
       MessageConsumer consumer = session.createConsumer(queue);
@@ -656,48 +557,32 @@ public class StompTest extends StompTestBase {
 
    @Test
    public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
-
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      conn.connect(defUser, defPass);
+      subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
 
       byte[] payload = new byte[]{1, 2, 3, 4, 5};
-      sendMessage(payload, queue);
 
-      frame = receiveFrame(10000);
+      sendJmsMessage(payload, queue);
 
-      System.out.println("Message: " + frame);
+      ClientStompFrame frame = conn.receiveFrame(10000);
 
-      Assert.assertTrue(frame.startsWith("MESSAGE"));
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
 
-      Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
-      Matcher cl_matcher = cl.matcher(frame);
+      Pattern cl = Pattern.compile(Stomp.Headers.CONTENT_LENGTH + ":\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
+      Matcher cl_matcher = cl.matcher(frame.toString());
       Assert.assertTrue(cl_matcher.find());
       Assert.assertEquals("5", cl_matcher.group(1));
 
-      Assert.assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());
-      Assert.assertTrue(frame.indexOf(new String(payload)) > -1);
+      Assert.assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame.toString()).find());
+      Assert.assertTrue(frame.getBody().toString().indexOf(new String(payload)) > -1);
 
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      conn.disconnect();
    }
 
    @Test
    public void testSubscribeWithMessageSentWithProperties() throws Exception {
-
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      conn.connect(defUser, defPass);
+      subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
 
       MessageProducer producer = session.createProducer(queue);
       BytesMessage message = session.createBytesMessage();
@@ -712,79 +597,55 @@ public class StompTest extends StompTestBase {
       message.writeBytes("Hello World".getBytes(StandardCharsets.UTF_8));
       producer.send(message);
 
-      frame = receiveFrame(10000);
+      ClientStompFrame frame = conn.receiveFrame(10000);
       Assert.assertNotNull(frame);
-      Assert.assertTrue(frame.startsWith("MESSAGE"));
-      Assert.assertTrue(frame.indexOf("S:") > 0);
-      Assert.assertTrue(frame.indexOf("n:") > 0);
-      Assert.assertTrue(frame.indexOf("byte:") > 0);
-      Assert.assertTrue(frame.indexOf("d:") > 0);
-      Assert.assertTrue(frame.indexOf("f:") > 0);
-      Assert.assertTrue(frame.indexOf("i:") > 0);
-      Assert.assertTrue(frame.indexOf("l:") > 0);
-      Assert.assertTrue(frame.indexOf("s:") > 0);
-      Assert.assertTrue(frame.indexOf("Hello World") > 0);
-
-      // System.out.println("out: "+frame);
-
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      Assert.assertEquals("value", frame.getHeader("S"));
+      Assert.assertEquals("false", frame.getHeader("n"));
+      Assert.assertEquals("9", frame.getHeader("byte"));
+      Assert.assertEquals("2.0", frame.getHeader("d"));
+      Assert.assertEquals("6.0", frame.getHeader("f"));
+      Assert.assertEquals("10", frame.getHeader("i"));
+      Assert.assertEquals("121", frame.getHeader("l"));
+      Assert.assertEquals("12", frame.getHeader("s"));
+      Assert.assertEquals("Hello World", frame.getBody());
+
+      conn.disconnect();
    }
 
    @Test
    public void testSubscribeWithID() throws Exception {
+      conn.connect(defUser, defPass);
+      subscribe(conn, "mysubid", Stomp.Headers.Subscribe.AckModeValues.AUTO);
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      sendJmsMessage(getName());
 
-      frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
+      ClientStompFrame frame = conn.receiveFrame(10000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Message.DESTINATION));
+      Assert.assertEquals("mysubid", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
+      Assert.assertEquals(getName(), frame.getBody());
 
-      frame = "SUBSCRIBE\n" + "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n" +
-         "ack:auto\n" +
-         "id: mysubid\n\n" +
-         Stomp.NULL;
-      sendFrame(frame);
-
-      sendMessage(getName());
-
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("MESSAGE"));
-      Assert.assertTrue(frame.indexOf("destination:") > 0);
-      Assert.assertTrue(frame.indexOf("subscription:") > 0);
-      Assert.assertTrue(frame.indexOf(getName()) > 0);
-
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      conn.disconnect();
    }
 
+   //
    @Test
    public void testBodyWithUTF8() throws Exception {
-
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      conn.connect(defUser, defPass);
+      subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
 
       String text = "A" + "\u00ea" + "\u00f1" + "\u00fc" + "C";
-      System.out.println(text);
-      sendMessage(text);
+      log.info(text);
+      sendJmsMessage(text);
 
-      frame = receiveFrame(10000);
-      System.out.println(frame);
-      Assert.assertTrue(frame.startsWith("MESSAGE"));
-      Assert.assertTrue(frame.indexOf("destination:") > 0);
-      Assert.assertTrue(frame.indexOf(text) > 0);
+      ClientStompFrame frame = conn.receiveFrame(10000);
+      log.info(frame);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Message.DESTINATION));
+      Assert.assertEquals(text, frame.getBody());
 
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      conn.disconnect();
    }
 
    @Test
@@ -792,88 +653,54 @@ public class StompTest extends StompTestBase {
       int ctr = 10;
       String[] data = new String[ctr];
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      conn.connect(defUser, defPass);
+      subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
 
       for (int i = 0; i < ctr; ++i) {
          data[i] = getName() + i;
-         sendMessage(data[i]);
+         sendJmsMessage(data[i]);
       }
 
       for (int i = 0; i < ctr; ++i) {
-         frame = receiveFrame(1000);
-         Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
+         ClientStompFrame frame = conn.receiveFrame(1000);
+         Assert.assertTrue("Message not in order", frame.getBody().equals(data[i]));
       }
 
       // sleep a while before publishing another set of messages
-      waitForFrameToTakeEffect();
+      Thread.sleep(200);
 
       for (int i = 0; i < ctr; ++i) {
-         data[i] = getName() + ":second:" + i;
-         sendMessage(data[i]);
+         data[i] = getName() + Stomp.Headers.SEPARATOR + "second:" + i;
+         sendJmsMessage(data[i]);
       }
 
       for (int i = 0; i < ctr; ++i) {
-         frame = receiveFrame(1000);
-         Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
+         ClientStompFrame frame = conn.receiveFrame(1000);
+         Assert.assertTrue("Message not in order", frame.getBody().equals(data[i]));
       }
 
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      conn.disconnect();
    }
 
    @Test
    public void testSubscribeWithAutoAckAndSelector() throws Exception {
+      conn.connect(defUser, defPass);
+      subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, "foo = 'zzz'");
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      frame = "SUBSCRIBE\n" + "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n" +
-         "selector: foo = 'zzz'\n" +
-         "ack:auto\n\n" +
-         Stomp.NULL;
-      sendFrame(frame);
+      sendJmsMessage("Ignored message", "foo", "1234");
+      sendJmsMessage("Real message", "foo", "zzz");
 
-      sendMessage("Ignored message", "foo", "1234");
-      sendMessage("Real message", "foo", "zzz");
+      ClientStompFrame frame = conn.receiveFrame(10000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      Assert.assertTrue("Should have received the real message but got: " + frame, frame.getBody().equals("Real message"));
 
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("MESSAGE"));
-      Assert.assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0);
-
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      conn.disconnect();
    }
 
    @Test
    public void testSubscribeWithAutoAckAndHyphenatedSelector() throws Exception {
-
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      frame = "SUBSCRIBE\n" + "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n" +
-         "selector: hyphenated_props:foo-bar = 'zzz'\n" +
-         "ack:auto\n\n" +
-         Stomp.NULL;
-      sendFrame(frame);
+      conn.connect(defUser, defPass);
+      subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, "hyphenated_props:foo-bar = 'zzz'");
 
       ServerLocator serverLocator = addServerLocator(ActiveMQClient.createServerLocator("vm://0"));
       ClientSessionFactory clientSessionFactory = serverLocator.createSessionFactory();
@@ -891,40 +718,26 @@ public class StompTest extends StompTestBase {
       producer.send(ignoredMessage);
       producer.send(realMessage);
 
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("MESSAGE"));
-      Assert.assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0);
 
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      ClientStompFrame frame = conn.receiveFrame(10000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      Assert.assertTrue("Should have received the real message but got: " + frame, frame.getBody().equals("Real message"));
+
+    conn.disconnect();
    }
 
    @Test
    public void testSubscribeWithClientAck() throws Exception {
+      conn.connect(defUser, defPass);
+      subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.CLIENT);
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      sendJmsMessage(getName());
+      ClientStompFrame frame = conn.receiveFrame(10000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.MESSAGE_ID));
+      ack(conn, null, frame);
 
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
-
-      sendFrame(frame);
-
-      sendMessage(getName());
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("MESSAGE"));
-      Pattern cl = Pattern.compile("message-id:\\s*(\\S+)", Pattern.CASE_INSENSITIVE);
-      Matcher cl_matcher = cl.matcher(frame);
-      Assert.assertTrue(cl_matcher.find());
-      String messageID = cl_matcher.group(1);
-
-      frame = "ACK\n" + "message-id: " + messageID + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+    conn.disconnect();
 
       // message should not be received since message was acknowledged by the client
       MessageConsumer consumer = session.createConsumer(queue);
@@ -934,23 +747,14 @@ public class StompTest extends StompTestBase {
 
    @Test
    public void testRedeliveryWithClientAck() throws Exception {
+      conn.connect(defUser, defPass);
+      subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.CLIENT);
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      sendJmsMessage(getName());
+      ClientStompFrame frame = conn.receiveFrame(10000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
 
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
-
-      sendFrame(frame);
-
-      sendMessage(getName());
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("MESSAGE"));
-
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      conn.disconnect();
 
       // message should be received since message was not acknowledged
       MessageConsumer consumer = session.createConsumer(queue);
@@ -971,149 +775,131 @@ public class StompTest extends StompTestBase {
 
    protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
+      conn.connect(defUser, defPass);
 
-      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
+//            String frame = Stomp.Commands.SUBSCRIBE + "\n" +
+//               Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE + Stomp.Headers.SEPARATOR + AddressInfo.RoutingType.ANYCAST + "\n" +
+//               Stomp.Headers.Send.DESTINATION + Stomp.Headers.SEPARATOR + getQueuePrefix() + getQueueName() + "\n" +
+//               Stomp.Headers.Message.ACK + Stomp.Headers.SEPARATOR + "client\n\n" +
+//               Stomp.NULL;
+//
+//            sendFrame(frame);
+      subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.CLIENT);
+      sendJmsMessage(getName());
 
-      sendFrame(frame);
-      sendMessage(getName());
-
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("MESSAGE"));
+//            frame = receiveFrame(10000);
+//            Assert.assertTrue(frame.startsWith(Stomp.Responses.MESSAGE));
+      ClientStompFrame frame = conn.receiveFrame(10000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
 
       log.info("Reconnecting!");
 
       if (sendDisconnect) {
-         frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-         sendFrame(frame);
-         waitForFrameToTakeEffect();
-         reconnect();
+       conn.disconnect();
+//         reconnect();
+         conn.destroy();
+         conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
       } else {
-         reconnect(100);
-         waitForFrameToTakeEffect();
+//         reconnect(100);
+//               waitForFrameToTakeEffect();
+         conn.destroy();
+         conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
       }
 
       // message should be received since message was not acknowledged
-      frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
+      conn.connect(defUser, defPass);
 
-      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + Stomp.NULL;
+//            frame = Stomp.Commands.SUBSCRIBE + "\n" +
+//               Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE + Stomp.Headers.SEPARATOR + AddressInfo.RoutingType.ANYCAST + "\n" +
+//               Stomp.Headers.Subscribe.DESTINATION + Stomp.Headers.SEPARATOR + getQueuePrefix() + getQueueName() + "\n\n" +
+//               Stomp.NULL;
+//
+//            sendFrame(frame);
+      subscribe(conn, null);
 
-      sendFrame(frame);
+//            frame = receiveFrame(10000);
+//            log.info(frame);
+//            Assert.assertTrue(frame.startsWith(Stomp.Responses.MESSAGE));
+      frame = conn.receiveFrame(10000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
 
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("MESSAGE"));
+      conn.disconnect();
+      conn.destroy();
 
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
-      waitForFrameToTakeEffect();
+      conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
 
       // now let's make sure we don't see the message again
-      reconnect();
-
-      frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
+//      reconnect();
 
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
+      conn.connect(defUser, defPass);
 
-      frame = "SUBSCRIBE\n" + "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n" +
-         "receipt: 1234\n\n" +
-         Stomp.NULL;
+//            frame = Stomp.Commands.SUBSCRIBE + "\n" +
+//               Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE + Stomp.Headers.SEPARATOR + AddressInfo.RoutingType.ANYCAST + "\n" +
+//               Stomp.Headers.Send.DESTINATION + Stomp.Headers.SEPARATOR + getQueuePrefix() + getQueueName() + "\n" +
+//               Stomp.Headers.RECEIPT_REQUESTED + Stomp.Headers.SEPARATOR + " 1234\n\n" +
+//               Stomp.NULL;
+//
+//            sendFrame(frame);
+      subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, true);
 
-      sendFrame(frame);
       // wait for SUBSCRIBE's receipt
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("RECEIPT"));
-
-      sendMessage("shouldBeNextMessage");
-
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("MESSAGE"));
-      System.out.println(frame);
-      Assert.assertTrue(frame.contains("shouldBeNextMessage"));
+//            frame = receiveFrame(10000);
+//            Assert.assertTrue(frame.startsWith(Stomp.Responses.RECEIPT));
+
+      sendJmsMessage("shouldBeNextMessage");
+
+//            frame = receiveFrame(10000);
+//            Assert.assertTrue(frame.startsWith(Stomp.Responses.MESSAGE));
+//            log.info(frame);
+//            Assert.assertTrue(frame.contains("shouldBeNextMessage"));
+      frame = conn.receiveFrame(10000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      Assert.assertEquals("shouldBeNextMessage", frame.getBody());
    }
 
+
    @Test
    public void testUnsubscribe() throws Exception {
-
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-      frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      conn.connect(defUser, defPass);
+      subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
 
       // send a message to our queue
-      sendMessage("first message");
+      sendJmsMessage("first message");
 
-      // receive message from socket
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("MESSAGE"));
+      // receive message
+      ClientStompFrame frame = conn.receiveFrame(10000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
 
       // remove suscription
-      frame = "UNSUBSCRIBE\n" + "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n" +
-         "receipt:567\n" +
-         "\n\n" +
-         Stomp.NULL;
-      sendFrame(frame);
-      waitForReceipt();
+      unsubscribe(conn, null, getQueuePrefix() + getQueueName(), true, false);
 
       // send a message to our queue
-      sendMessage("second message");
+      sendJmsMessage("second message");
 
-      frame = receiveFrame(1000);
+      frame = conn.receiveFrame(1000);
       log.info("Received frame: " + frame);
       Assert.assertNull("No message should have been received since subscription was removed", frame);
    }
 
    @Test
    public void testUnsubscribeWithID() throws Exception {
-
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-      frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      frame = "SUBSCRIBE\n" + "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n" +
-         "id: mysubid\n" +
-         "ack:auto\n\n" +
-         Stomp.NULL;
-      sendFrame(frame);
+      conn.connect(defUser, defPass);
+      subscribe(conn, "mysubid", Stomp.Headers.Subscribe.AckModeValues.AUTO);
 
       // send a message to our queue
-      sendMessage("first message");
+      sendJmsMessage("first message");
 
       // receive message from socket
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("MESSAGE"));
+      ClientStompFrame frame = conn.receiveFrame(10000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
 
       // remove suscription
-      frame = "UNSUBSCRIBE\n" + "id:mysubid\n" + "receipt: 345\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
-      waitForReceipt();
+      unsubscribe(conn, "mysubid", null, true, false);
 
       // send a message to our queue
-      sendMessage("second message");
+      sendJmsMessage("second message");
 
-      frame = receiveFrame(1000);
+      frame = conn.receiveFrame(1000);
       log.info("Received frame: " + frame);
       Assert.assertNull("No message should have been received since subscription was removed", frame);
 
@@ -1122,34 +908,15 @@ public class StompTest extends StompTestBase {
    @Test
    public void testTransactionCommit() throws Exception {
       MessageConsumer consumer = session.createConsumer(queue);
+      conn.connect(defUser, defPass);
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      String f = receiveFrame(1000);
-      Assert.assertTrue(f.startsWith("CONNECTED"));
-
-      frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = "SEND\n" + "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n" +
-         "transaction: tx1\n" +
-         "receipt: 123\n" +
-         "\n\n" +
-         "Hello World" +
-         Stomp.NULL;
-      sendFrame(frame);
-      waitForReceipt();
+      beginTransaction(conn, "tx1");
+      send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true, null, "tx1");
 
       // check the message is not committed
       assertNull(consumer.receive(100));
 
-      frame = "COMMIT\n" + "transaction: tx1\n" + "receipt:456\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
-      waitForReceipt();
+      commitTransaction(conn, "tx1", true);
 
       Message message = consumer.receive(1000);
       Assert.assertNotNull("Should have received a message", message);
@@ -1158,49 +925,20 @@ public class StompTest extends StompTestBase {
    @Test
    public void testSuccessiveTransactionsWithSameID() throws Exception {
       MessageConsumer consumer = session.createConsumer(queue);
-
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      String f = receiveFrame(1000);
-      Assert.assertTrue(f.startsWith("CONNECTED"));
+      conn.connect(defUser, defPass);
 
       // first tx
-      frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = "SEND\n" + "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n" +
-         "transaction: tx1\n" +
-         "\n\n" +
-         "Hello World" +
-         Stomp.NULL;
-      sendFrame(frame);
-
-      frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      beginTransaction(conn, "tx1");
+      send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true, null, "tx1");
+      commitTransaction(conn, "tx1");
 
       Message message = consumer.receive(1000);
       Assert.assertNotNull("Should have received a message", message);
 
       // 2nd tx with same tx ID
-      frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = "SEND\n" + "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n" +
-         "transaction: tx1\n" +
-         "\n\n" +
-         "Hello World" +
-         Stomp.NULL;
-      sendFrame(frame);
-
-      frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      beginTransaction(conn, "tx1");
+      send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true, null, "tx1");
+      commitTransaction(conn, "tx1");
 
       message = consumer.receive(1000);
       Assert.assertNotNull("Should have received a message", message);
@@ -1208,67 +946,27 @@ public class StompTest extends StompTestBase {
 
    @Test
    public void testBeginSameTransactionTwice() throws Exception {
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      String f = receiveFrame(1000);
-      Assert.assertTrue(f.startsWith("CONNECTED"));
-
-      frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      // begin the tx a 2nd time
-      frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      f = receiveFrame(1000);
-      Assert.assertTrue(f.startsWith("ERROR"));
+      conn.connect(defUser, defPass);
+      beginTransaction(conn, "tx1");
+      beginTransaction(conn, "tx1");
 
+      ClientStompFrame frame = conn.receiveFrame(1000);
+      Assert.assertEquals(Stomp.Responses.ERROR, frame.getCommand());
    }
 
    @Test
    public void testTransactionRollback() throws Exception {
       MessageConsumer consumer = session.createConsumer(queue);
+      String txId = "tx1";
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      String f = receiveFrame(1000);
-      Assert.assertTrue(f.startsWith("CONNECTED"));
-
-      frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = "SEND\n" + "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n" +
-         "transaction: tx1\n" +
-         "\n" +
-         "first message" +
-         Stomp.NULL;
-      sendFrame(frame);
-
-      // rollback first message
-      frame = "ABORT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = "SEND\n" + "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n" +
-         "transaction: tx1\n" +
-         "\n" +
-         "second message" +
-         Stomp.NULL;
-      sendFrame(frame);
-
-      frame = "COMMIT\n" + "transaction: tx1\n" + "receipt:789\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
-      waitForReceipt();
+      conn.connect(defUser, defPass);
+      beginTransaction(conn, txId);
+      send(conn, getQueuePrefix() + getQueueName(), null, "first message", true, null, txId);
+      abortTransaction(conn, txId);
+
+      beginTransaction(conn, txId);
+      send(conn, getQueuePrefix() + getQueueName(), null, "second message", true, null, txId);
+      commitTransaction(conn, txId);
 
       // only second msg should be received since first msg was rolled back
       TextMessage message = (TextMessage) consumer.receive(1000);
@@ -1280,91 +978,52 @@ public class StompTest extends StompTestBase {
    public void testSubscribeToTopic() throws Exception {
       final int baselineQueueCount = server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length;
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
+      conn.connect(defUser, defPass);
 
-      frame = "SUBSCRIBE\n" + "destination:" +
-         getTopicPrefix() +
-         getTopicName() +
-         "\n" +
-         "receipt: 12\n" +
-         "\n\n" +
-         Stomp.NULL;
-      sendFrame(frame);
-      // wait for SUBSCRIBE's receipt
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("RECEIPT"));
+      subscribeTopic(conn, null, null, null, true);
 
       assertTrue("Subscription queue should be created here", Wait.waitFor(new Wait.Condition() {
 
          @Override
          public boolean isSatisfied() throws Exception {
-            if (server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length - baselineQueueCount == 1) {
+            int length = server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length;
+            if (length - baselineQueueCount == 1) {
                return true;
             } else {
+               log.info("Queue count: " + (length - baselineQueueCount));
                return false;
             }
          }
-      }, TimeUnit.SECONDS.toMillis(1000), TimeUnit.MILLISECONDS.toMillis(100)));
-
-      sendMessage(getName(), topic);
-
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("MESSAGE"));
-      Assert.assertTrue(frame.indexOf("destination:") > 0);
-      Assert.assertTrue(frame.indexOf(getName()) > 0);
-
-      frame = "UNSUBSCRIBE\n" + "destination:" +
-         getTopicPrefix() +
-         getTopicName() +
-         "\n" +
-         "receipt: 1234\n" +
-         "\n\n" +
-         Stomp.NULL;
-      sendFrame(frame);
-      // wait for UNSUBSCRIBE's receipt
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("RECEIPT"));
-
-      sendMessage(getName(), topic);
-
-      frame = receiveFrame(1000);
+      }, TimeUnit.SECONDS.toMillis(10), TimeUnit.MILLISECONDS.toMillis(100)));
+
+      sendJmsMessage(getName(), topic);
+
+      ClientStompFrame frame = conn.receiveFrame(1000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      Assert.assertEquals(getTopicPrefix() + getTopicName(), frame.getHeader(Stomp.Headers.Send.DESTINATION));
+      Assert.assertEquals(getName(), frame.getBody());
+
+      unsubscribe(conn, null, getTopicPrefix() + getTopicName(), true, false);
+
+      sendJmsMessage(getName(), topic);
+
+      frame = conn.receiveFrame(1000);
       log.info("Received frame: " + frame);
       Assert.assertNull("No message should have been received since subscription was removed", frame);
 
       assertEquals("Subscription queue should be deleted", 0, server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length - baselineQueueCount);
 
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+    conn.disconnect();
    }
-
+      //
    @Test
    public void testSubscribeToQueue() throws Exception {
       final int baselineQueueCount = server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length;
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      frame = "SUBSCRIBE\n" + "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n" +
-         "receipt: 12\n" +
-         "\n\n" +
-         Stomp.NULL;
-      sendFrame(frame);
-      // wait for SUBSCRIBE's receipt
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("RECEIPT"));
+      conn.connect(defUser, defPass);
+      subscribe(conn, null, null, null, true);
 
       assertFalse("Queue should not be created here", Wait.waitFor(new Wait.Condition() {
-
          @Override
          public boolean isSatisfied() throws Exception {
             if (server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length - baselineQueueCount == 1) {
@@ -1375,65 +1034,39 @@ public class StompTest extends StompTestBase {
          }
       }, TimeUnit.MILLISECONDS.toMillis(1000), TimeUnit.MILLISECONDS.toMillis(100)));
 
-      sendMessage(getName(), queue);
+      sendJmsMessage(getName(), queue);
 
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("MESSAGE"));
-      Assert.assertTrue(frame.indexOf("destination:") > 0);
-      Assert.assertTrue(frame.indexOf(getName()) > 0);
+      ClientStompFrame frame = conn.receiveFrame(1000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Send.DESTINATION));
+      Assert.assertEquals(getName(), frame.getBody());
 
-      frame = "UNSUBSCRIBE\n" + "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n" +
-         "receipt: 1234\n" +
-         "\n\n" +
-         Stomp.NULL;
-      sendFrame(frame);
-      // wait for UNSUBSCRIBE's receipt
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("RECEIPT"));
+      unsubscribe(conn, null, getQueuePrefix() + getQueueName(), true, false);
 
-      sendMessage(getName(), queue);
+      sendJmsMessage(getName(), queue);
 
-      frame = receiveFrame(1000);
+      frame = conn.receiveFrame(1000);
       log.info("Received frame: " + frame);
       Assert.assertNull("No message should have been received since subscription was removed", frame);
 
       assertEquals("Subscription queue should not be deleted", baselineQueueCount, server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length);
 
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      conn.disconnect();
    }
 
    @Test
    public void testSubscribeToNonExistentQueue() throws Exception {
       String nonExistentQueue = RandomUtil.randomString();
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
+      conn.connect(defUser, defPass);
+      subscribe(conn, null, null, null, null, getQueuePrefix() + nonExistentQueue, true);
 
-      frame = "SUBSCRIBE\n" + "destination:" +
-         getQueuePrefix() +
-         nonExistentQueue +
-         "\n" +
-         "receipt: 12\n" +
-         "\n\n" +
-         Stomp.NULL;
-      sendFrame(frame);
-      // wait for SUBSCRIBE's receipt
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("RECEIPT"));
-
-      sendMessage(getName(), ActiveMQJMSClient.createQueue(nonExistentQueue));
+      sendJmsMessage(getName(), ActiveMQJMSClient.createQueue(nonExistentQueue));
 
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("MESSAGE"));
-      Assert.assertTrue(frame.indexOf("destination:") > 0);
-      Assert.assertTrue(frame.indexOf(getName()) > 0);
+      ClientStompFrame frame = conn.receiveFrame(1000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      Assert.assertEquals(getQueuePrefix() + nonExistentQueue, frame.getHeader(Stomp.Headers.Send.DESTINATION));
+      Assert.assertEquals(getName(), frame.getBody());
 
       assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(nonExistentQueue)));
 
@@ -1449,322 +1082,160 @@ public class StompTest extends StompTestBase {
          }
       }, 1000, 50));
 
-      frame = "UNSUBSCRIBE\n" + "destination:" +
-         getQueuePrefix() +
-         nonExistentQueue +
-         "\n" +
-         "receipt: 1234\n" +
-         "\n\n" +
-         Stomp.NULL;
-      sendFrame(frame);
-      // wait for UNSUBSCRIBE's receipt
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("RECEIPT"));
+      unsubscribe(conn, null, getQueuePrefix() + nonExistentQueue, true, false);
 
       assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(nonExistentQueue)));
 
-      sendMessage(getName(), ActiveMQJMSClient.createQueue(nonExistentQueue));
+      sendJmsMessage(getName(), ActiveMQJMSClient.createQueue(nonExistentQueue));
 
-      frame = receiveFrame(1000);
+      frame = conn.receiveFrame(1000);
       log.info("Received frame: " + frame);
       Assert.assertNull("No message should have been received since subscription was removed", frame);
 
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      conn.disconnect();
    }
 
    @Test
    public void testDurableSubscriberWithReconnection() throws Exception {
+      conn.connect(defUser, defPass, "myclientid");
+      subscribeTopic(conn, null, null, getName());
+
+      conn.disconnect();
 
-      String connectFame = "CONNECT\n" + "login: brianm\n" +
-         "passcode: wombats\n" +
-         "client-id: myclientid\n\n" +
-         Stomp.NULL;
-      sendFrame(connectFame);
-
-      String frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      String subscribeFrame = "SUBSCRIBE\n" + "destination:" +
-         getTopicPrefix() +
-         getTopicName() +
-         "\n" +
-         "durable-subscriber-name: " +
-         getName() +
-         "\n" +
-         "\n\n" +
-         Stomp.NULL;
-      sendFrame(subscribeFrame);
-      waitForFrameToTakeEffect();
-
-      String disconnectFrame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(disconnectFrame);
-      waitForFrameToTakeEffect();
+      Thread.sleep(500);
 
       // send the message when the durable subscriber is disconnected
-      sendMessage(getName(), topic);
-
-      reconnect(100);
-      sendFrame(connectFame);
-      frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      sendFrame(subscribeFrame);
-
-      // we must have received the message
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("MESSAGE"));
-      Assert.assertTrue(frame.indexOf("destination:") > 0);
-      Assert.assertTrue(frame.indexOf(getName()) > 0);
-
-      String unsubscribeFrame = "UNSUBSCRIBE\n" + "destination:" +
-         getTopicPrefix() +
-         getTopicName() +
-         "\n" +
-         "receipt: 1234\n" +
-         "\n\n" +
-         Stomp.NULL;
-      sendFrame(unsubscribeFrame);
-      // wait for UNSUBSCRIBE's receipt
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("RECEIPT"));
-
-      sendFrame(disconnectFrame);
+      sendJmsMessage(getName(), topic);
+
+      conn.destroy();
+      conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+      conn.connect(defUser, defPass, "myclientid");
+
+      subscribeTopic(conn, null, null, getName());
+
+      ClientStompFrame frame = conn.receiveFrame(3000);
+      assertNotNull("Should have received a message from the durable subscription", frame);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      Assert.assertEquals(getTopicPrefix() + getTopicName(), frame.getHeader(Stomp.Headers.Send.DESTINATION));
+      Assert.assertEquals(getName(), frame.getBody());
+
+      unsubscribe(conn, null, getTopicPrefix() + getTopicName(), true, true);
+
+      conn.disconnect();
    }
 
    @Test
    public void testDurableSubscriber() throws Exception {
-
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "client-id: myclientid\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      String subscribeFrame = "SUBSCRIBE\n" + "destination:" +
-         getTopicPrefix() +
-         getTopicName() +
-         "\n" +
-         "receipt: 12\n" +
-         "durable-subscriber-name: " +
-         getName() +
-         "\n" +
-         "\n\n" +
-         Stomp.NULL;
-      sendFrame(subscribeFrame);
-      // wait for SUBSCRIBE's receipt
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("RECEIPT"));
+      conn.connect(defUser, defPass, "myclientid");
+      subscribeTopic(conn, null, null, getName(), true);
+      ClientStompFrame response = subscribeTopic(conn, null, null, getName(), true);
 
       // creating a subscriber with the same durable-subscriber-name must fail
-      sendFrame(subscribeFrame);
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("ERROR"));
+      Assert.assertEquals(Stomp.Responses.ERROR, response.getCommand());
 
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      conn.disconnect();
    }
 
    @Test
    public void testDurableUnSubscribe() throws Exception {
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "client-id: myclientid\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(1000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      String subscribeFrame = "SUBSCRIBE\n" + "destination:" +
-         getTopicPrefix() +
-         getTopicName() +
-         "\n" +
-         "receipt: 12\n" +
-         "durable-subscriber-name: " +
-         getName() +
-         "\n" +
-         "\n\n" +
-         Stomp.NULL;
-      sendFrame(subscribeFrame);
-      // wait for SUBSCRIBE's receipt
-      frame = receiveFrame(1000);
-      Assert.assertTrue(frame.startsWith("RECEIPT"));
-
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
-      waitForFrameToTakeEffect();
+      conn.connect(defUser, defPass, "myclientid");
+      subscribeTopic(conn, null, null, getName(), true);
+      conn.disconnect();
+      Thread.sleep(500);
 
       assertNotNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName())));
 
-      reconnect(100);
-      frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "client-id: myclientid\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(1000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
+      conn.destroy();
+      conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
 
-      String unsubscribeFrame = "UNSUBSCRIBE\n" + "destination:" +
-         getTopicPrefix() +
-         getTopicName() +
-         "\n" +
-         "durable-subscriber-name: " +
-         getName() +
-         "\n" +
-         "\n\n" +
-         Stomp.NULL;
-      sendFrame(unsubscribeFrame);
-
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
-      waitForFrameToTakeEffect();
+      conn.connect(defUser, defPass, "myclientid");
+      unsubscribe(conn, getName(), getTopicPrefix() + getTopicName(), false, true);
+      conn.disconnect();
+      Thread.sleep(500);
 
       assertNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName())));
    }
 
    @Test
    public void testSubscribeToTopicWithNoLocal() throws Exception {
+      conn.connect(defUser, defPass);
+      subscribeTopic(conn, null, null, null, true, true);
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      frame = "SUBSCRIBE\n" + "destination:" +
-         getTopicPrefix() +
-         getTopicName() +
-         "\n" +
-         "receipt: 12\n" +
-         "no-local: true\n" +
-         "\n\n" +
-         Stomp.NULL;
-      sendFrame(frame);
-      // wait for SUBSCRIBE's receipt
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("RECEIPT"));
-
-      // send a message on the same connection => it should not be received
-      frame = "SEND\n" + "destination:" + getTopicPrefix() + getTopicName() + "\n\n" + "Hello World" + Stomp.NULL;
-      sendFrame(frame);
+      // send a message on the same connection => it should not be received as noLocal = true on subscribe
+      send(conn, getTopicPrefix() + getTopicName(), null, "Hello World");
 
-      frame = receiveFrame(2000);
+      ClientStompFrame frame = conn.receiveFrame(2000);
       log.info("Received frame: " + frame);
       Assert.assertNull("No message should have been received since subscription was removed", frame);
 
       // send message on another JMS connection => it should be received
-      sendMessage(getName(), topic);
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("MESSAGE"));
-      Assert.assertTrue(frame.indexOf("destination:") > 0);
-      Assert.assertTrue(frame.indexOf(getName()) > 0);
-
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      sendJmsMessage(getName(), topic);
+      frame = conn.receiveFrame(10000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      Assert.assertEquals(getTopicPrefix() + getTopicName(), frame.getHeader(Stomp.Headers.Send.DESTINATION));
+      Assert.assertEquals(getName(), frame.getBody());
+
+      conn.disconnect();
    }
 
    @Test
    public void testTopicExistsAfterNoUnsubscribeDisconnect() throws Exception {
+      conn.connect(defUser, defPass);
+      subscribeTopic(conn, null, null, null, true);
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
+      // disconnect, _without unsubscribing_
+      conn.disconnect();
 
-      frame = "SUBSCRIBE\n" + "destination:" +
-         getTopicPrefix() +
-         getTopicName() +
-         "\n" +
-         "receipt: 12\n" +
-         "\n\n" +
-         Stomp.NULL;
-      sendFrame(frame);
-      // wait for SUBSCRIBE's receipt
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("RECEIPT"));
+      Thread.sleep(500);
 
-      // disconnect, _without unsubscribing_
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
-      waitForFrameToTakeEffect();
+      conn.destroy();
 
       // connect again
-      reconnect();
-      frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
+      conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+      conn.connect(defUser, defPass);
 
       // send a receipted message to the topic
-      frame = "SEND\n" + "destination:" + getTopicPrefix() + getTopicName() + "\nreceipt:42\n\n\n" + "Hello World" + Stomp.NULL;
-      sendFrame(frame);
-
-      // the topic should exist and receive the message, and we should get the requested receipt
-      frame = receiveFrame(2000);
-      log.info("Received frame: " + frame);
-      Assert.assertTrue(frame.startsWith("RECEIPT"));
+      ClientStompFrame response = send(conn, getTopicPrefix() + getTopicName(), null, "Hello World", true);
+      assertEquals(Stomp.Responses.RECEIPT, response.getCommand());
 
       // ...and nothing else
-      frame = receiveFrame(2000);
+      ClientStompFrame frame = conn.receiveFrame(2000);
       log.info("Received frame: " + frame);
       Assert.assertNull(frame);
 
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      conn.disconnect();
    }
 
    @Test
    public void testClientAckNotPartOfTransaction() throws Exception {
+      conn.connect(defUser, defPass);
+      subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.CLIENT);
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-      frame = "SUBSCRIBE\n" + "destination:" +
-         getQueuePrefix() +
-         getQueueName() +
-         "\n" +
-         "ack:client\n" +
-         "\n\n" +
-         Stomp.NULL;
-      sendFrame(frame);
-
-      sendMessage(getName());
-
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("MESSAGE"));
-      Assert.assertTrue(frame.indexOf("destination:") > 0);
-      Assert.assertTrue(frame.indexOf(getName()) > 0);
-      Assert.assertTrue(frame.indexOf("message-id:") > 0);
-      Pattern cl = Pattern.compile("message-id:\\s*(\\S+)", Pattern.CASE_INSENSITIVE);
-      Matcher cl_matcher = cl.matcher(frame);
-      Assert.assertTrue(cl_matcher.find());
-      String messageID = cl_matcher.group(1);
-
-      frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      sendJmsMessage(getName());
 
-      frame = "ACK\n" + "message-id:" + messageID + "\n" + "transaction: tx1\n" + "\n" + "second message" + Stomp.NULL;
-      sendFrame(frame);
+      ClientStompFrame frame = conn.receiveFrame(10000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Send.DESTINATION));
+      String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
+      Assert.assertNotNull(messageID);
+      Assert.assertEquals(getName(), frame.getBody());
 
-      frame = "ABORT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      beginTransaction(conn, "tx1");
+      ack(conn, null, messageID, "tx1");
+      abortTransaction(conn, "tx1");
 
-      frame = receiveFrame(1000);
+      frame = conn.receiveFrame(1000);
       Assert.assertNull("No message should have been received as the message was acked even though the transaction has been aborted", frame);
 
-      frame = "UNSUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      unsubscribe(conn, null, getQueuePrefix() + getQueueName(), false, false);
 
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      conn.disconnect();
    }
 
    // HORNETQ-1007
    @Test
    public void testMultiProtocolConsumers() throws Exception {
-      final int TIME_OUT = 5000;
+      final int TIME_OUT = 2000;
 
       int count = 1000;
 
@@ -1773,21 +1244,8 @@ public class StompTest extends StompTestBase {
       MessageConsumer consumer2 = session.createConsumer(topic);
 
       // connect and subscribe STOMP consumer
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-      frame = receiveFrame(TIME_OUT);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-      frame = "SUBSCRIBE\n" + "destination:" +
-         getTopicPrefix() +
-         getTopicName() +
-         "\n" +
-         "receipt: 12\n" +
-         "\n\n" +
-         Stomp.NULL;
-      sendFrame(frame);
-      // wait for SUBSCRIBE's receipt
-      frame = receiveFrame(TIME_OUT);
-      Assert.assertTrue(frame.startsWith("RECEIPT"));
+      conn.connect(defUser, defPass);
+      subscribeTopic(conn, null, null, null, true);
 
       MessageProducer producer = session.createProducer(topic);
       TextMessage message = session.createTextMessage(getName());
@@ -1796,58 +1254,37 @@ public class StompTest extends StompTestBase {
          producer.send(message);
          Assert.assertNotNull(consumer1.receive(TIME_OUT));
          Assert.assertNotNull(consumer2.receive(TIME_OUT));
-         frame = receiveFrame(TIME_OUT);
-         Assert.assertTrue(frame.startsWith("MESSAGE"));
-         Assert.assertTrue(frame.indexOf("destination:") > 0);
-         Assert.assertTrue(frame.indexOf(getName()) > 0);
+         ClientStompFrame frame = conn.receiveFrame(TIME_OUT);
+         Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+         Assert.assertEquals(getTopicPrefix() + getTopicName(), frame.getHeader(Stomp.Headers.Send.DESTINATION));
+         Assert.assertEquals(getName(), frame.getBody());
       }
 
       consumer1.close();
       consumer2.close();
-      frame = "UNSUBSCRIBE\n" + "destination:" +
-         getTopicPrefix() +
-         getTopicName() +
-         "\n" +
-         "receipt: 1234\n" +
-         "\n\n" +
-         Stomp.NULL;
-      sendFrame(frame);
-      // wait for UNSUBSCRIBE's receipt
-      frame = receiveFrame(TIME_OUT);
-      Assert.assertTrue(frame.startsWith("RECEIPT"));
-
-      sendMessage(getName(), topic);
-
-      frame = receiveFrame(TIME_OUT);
+      unsubscribe(conn, null, getTopicPrefix() + getTopicName(), true, false);
+
+      sendJmsMessage(getName(), topic);
+
+      ClientStompFrame frame = conn.receiveFrame(TIME_OUT);
       log.info("Received frame: " + frame);
       Assert.assertNull("No message should have been received since subscription was removed", frame);
 
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      conn.disconnect();
    }
 
    @Test
    //stomp should return an ERROR when acking a non-existent message
    public void testUnexpectedAck() throws Exception {
-
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(100000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
-
       String messageID = "888888";
-      frame = "ACK\n" + "message-id:" + messageID + "\n" + "\n" + Stomp.NULL;
-      sendFrame(frame);
 
-      frame = receiveFrame(100000);
-      assertNotNull(frame);
+      conn.connect(defUser, defPass);
+      ack(conn, null, messageID, null);
 
-      System.out.println("received frame: " + frame);
-      assertTrue(frame.startsWith("ERROR"));
+      ClientStompFrame frame = conn.receiveFrame(1000);
+      assertNotNull(frame);
+      assertEquals(Stomp.Responses.ERROR, frame.getCommand());
 
-      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      conn.disconnect();
    }
-
 }


Mime
View raw message