activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [16/50] [abbrv] activemq-artemis git commit: ARTEMIS-788 Stomp refactor + track autocreation for addresses
Date Fri, 09 Dec 2016 19:49:00 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
index 26f2a2f..bcac436 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
@@ -26,34 +26,17 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 import java.io.IOException;
-import java.net.Socket;
-import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.string.StringDecoder;
-import io.netty.handler.codec.string.StringEncoder;
+import java.util.UUID;
+
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManagerFactory;
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
 import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
 import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
 import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -63,6 +46,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.security.Role;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
 import org.apache.activemq.artemis.jms.server.JMSServerManager;
@@ -73,13 +57,16 @@ import org.apache.activemq.artemis.jms.server.config.impl.TopicConfigurationImpl
 import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
 import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
 import org.apache.activemq.artemis.tests.unit.util.InVMNamingContext;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.junit.After;
 import org.junit.Before;
 
 public abstract class StompTestBase extends ActiveMQTestBase {
 
+   protected String hostname = "127.0.0.1";
+
    protected final int port = 61613;
 
    private ConnectionFactory connectionFactory;
@@ -98,98 +85,56 @@ public abstract class StompTestBase extends ActiveMQTestBase {
 
    protected String defPass = "wombats";
 
-   protected boolean autoCreateServer = true;
-
-   private List<Bootstrap> bootstraps = new ArrayList<>();
-
-   //   private Channel channel;
-
-   private List<BlockingQueue<String>> priorityQueues = new ArrayList<>();
-
-   private List<EventLoopGroup> groups = new ArrayList<>();
-
-   private List<Channel> channels = new ArrayList<>();
-
    // Implementation methods
    // -------------------------------------------------------------------------
-   @Override
-   @Before
-   public void setUp() throws Exception {
-      super.setUp();
-      if (autoCreateServer) {
-         server = createServer();
-         addServer(server.getActiveMQServer());
-         server.start();
-         connectionFactory = createConnectionFactory();
-         createBootstrap();
-
-         if (isSecurityEnabled()) {
-            connection = connectionFactory.createConnection("brianm", "wombats");
-         } else {
-            connection = connectionFactory.createConnection();
-         }
-         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         queue = session.createQueue(getQueueName());
-         topic = session.createTopic(getTopicName());
-         connection.start();
-      }
+   public boolean isCompressLargeMessages() {
+      return false;
    }
 
-   private void createBootstrap() {
-      createBootstrap(0, port);
+   public boolean isSecurityEnabled() {
+      return false;
    }
 
-   protected void createBootstrap(int port) {
-      createBootstrap(0, port);
+   public boolean isPersistenceEnabled() {
+      return false;
    }
 
-   protected void createBootstrap(final int index, int port) {
-      priorityQueues.add(index, new ArrayBlockingQueue<String>(1000));
-      groups.add(index, new NioEventLoopGroup());
-      bootstraps.add(index, new Bootstrap());
-      bootstraps.get(index).group(groups.get(index)).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {
-         @Override
-         public void initChannel(SocketChannel ch) throws Exception {
-            addChannelHandlers(index, ch);
-         }
-      });
-
-      // Start the client.
-      try {
-         channels.add(index, bootstraps.get(index).connect("localhost", port).sync().channel());
-         handshake();
-      } catch (InterruptedException e) {
-         e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-      }
-
+   public boolean isEnableStompMessageId() {
+      return false;
    }
 
-   protected void handshake() throws InterruptedException {
+   public Integer getStompMinLargeMessageSize() {
+      return null;
    }
 
-   protected void addChannelHandlers(int index, SocketChannel ch) throws URISyntaxException {
-      ch.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8));
-      ch.pipeline().addLast("encoder", new StringEncoder(StandardCharsets.UTF_8));
-      ch.pipeline().addLast(new StompClientHandler(index));
+   public List<String> getIncomingInterceptors() {
+      return null;
    }
 
-   protected void setUpAfterServer() throws Exception {
-      setUpAfterServer(false);
+   public List<String> getOutgoingInterceptors() {
+      return null;
    }
 
-   protected void setUpAfterServer(boolean jmsCompressLarge) throws Exception {
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      server = createServer();
+      server.start();
       connectionFactory = createConnectionFactory();
-      ActiveMQConnectionFactory activeMQConnectionFactory = (ActiveMQConnectionFactory) connectionFactory;
 
-      activeMQConnectionFactory.setCompressLargeMessage(jmsCompressLarge);
-      createBootstrap();
+      ((ActiveMQConnectionFactory)connectionFactory).setCompressLargeMessage(isCompressLargeMessages());
 
-      connection = connectionFactory.createConnection();
-      connection.start();
+      if (isSecurityEnabled()) {
+         connection = connectionFactory.createConnection("brianm", "wombats");
+      } else {
+         connection = connectionFactory.createConnection();
+      }
       session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       queue = session.createQueue(getQueueName());
       topic = session.createTopic(getTopicName());
-
+      connection.start();
    }
 
    /**
@@ -198,14 +143,30 @@ public abstract class StompTestBase extends ActiveMQTestBase {
     */
    protected JMSServerManager createServer() throws Exception {
       Map<String, Object> params = new HashMap<>();
-      params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME);
+      params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + ","  + MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME);
       params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
       params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
+      if (isEnableStompMessageId()) {
+         params.put(TransportConstants.STOMP_ENABLE_MESSAGE_ID, true);
+      }
+      if (getStompMinLargeMessageSize() != null) {
+         params.put(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE, 2048);
+      }
       TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
-      TransportConfiguration allTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName());
 
-      Configuration config = createBasicConfig().setSecurityEnabled(isSecurityEnabled()).setPersistenceEnabled(true).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
-      config.addAcceptorConfiguration(allTransport);
+      Configuration config = createBasicConfig().setSecurityEnabled(isSecurityEnabled())
+                                                .setPersistenceEnabled(isPersistenceEnabled())
+                                                .addAcceptorConfiguration(stompTransport)
+                                                .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()))
+                                                .setConnectionTtlCheckInterval(500);
+
+      if (getIncomingInterceptors() != null) {
+         config.setIncomingInterceptorClassNames(getIncomingInterceptors());
+      }
+
+      if (getOutgoingInterceptors() != null) {
+         config.setOutgoingInterceptorClassNames(getOutgoingInterceptors());
+      }
 
       ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));
 
@@ -222,195 +183,350 @@ public abstract class StompTestBase extends ActiveMQTestBase {
       }
 
       JMSConfiguration jmsConfig = new JMSConfigurationImpl();
-      jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName(getQueueName()).setDurable(false).setBindings(getQueueName()));
+      jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName(getQueueName()).setBindings(getQueueName()));
       jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl().setName(getTopicName()).setBindings(getTopicName()));
       server = new JMSServerManagerImpl(activeMQServer, jmsConfig);
       server.setRegistry(new JndiBindingRegistry(new InVMNamingContext()));
       return server;
    }
 
-   @Override
-   @After
-   public void tearDown() throws Exception {
-      if (autoCreateServer) {
-         connection.close();
-
-         for (EventLoopGroup group : groups) {
-            if (group != null) {
-               for (Channel channel : channels) {
-                  channel.close();
-               }
-               group.shutdownGracefully(0, 5000, TimeUnit.MILLISECONDS);
-            }
-         }
-      }
-      super.tearDown();
+   protected ConnectionFactory createConnectionFactory() {
+      return new ActiveMQJMSConnectionFactory(false, new TransportConfiguration(InVMConnectorFactory.class.getName()));
    }
 
-   protected void cleanUp() throws Exception {
-      connection.close();
-      if (groups.get(0) != null) {
-         groups.get(0).shutdown();
-      }
+   protected String getQueueName() {
+      return "testQueue";
    }
 
-   protected void reconnect() throws Exception {
-      reconnect(0);
+   protected String getQueuePrefix() {
+      return "";
    }
 
-   protected void reconnect(long sleep) throws Exception {
-      groups.get(0).shutdown();
+   protected String getTopicName() {
+      return "testtopic";
+   }
 
-      if (sleep > 0) {
-         Thread.sleep(sleep);
-      }
+   protected String getTopicPrefix() {
+      return "";
+   }
 
-      createBootstrap();
+   public void sendJmsMessage(String msg) throws Exception {
+      sendJmsMessage(msg, queue);
    }
 
-   protected ConnectionFactory createConnectionFactory() {
-      return new ActiveMQJMSConnectionFactory(false, new TransportConfiguration(InVMConnectorFactory.class.getName()));
+   public void sendJmsMessage(String msg, Destination destination) throws Exception {
+      MessageProducer producer = session.createProducer(destination);
+      TextMessage message = session.createTextMessage(msg);
+      producer.send(message);
    }
 
-   protected Socket createSocket() throws IOException {
-      return new Socket("localhost", port);
+   public void sendJmsMessage(byte[] data, Destination destination) throws Exception {
+      sendJmsMessage(data, "foo", "xyz", destination);
    }
 
-   protected String getQueueName() {
-      return "test";
+   public void sendJmsMessage(String msg, String propertyName, String propertyValue) throws Exception {
+      sendJmsMessage(msg.getBytes(StandardCharsets.UTF_8), propertyName, propertyValue, queue);
    }
 
-   protected String getQueuePrefix() {
-      return "";
+   public void sendJmsMessage(byte[] data,
+                              String propertyName,
+                              String propertyValue,
+                              Destination destination) throws Exception {
+      MessageProducer producer = session.createProducer(destination);
+      BytesMessage message = session.createBytesMessage();
+      message.setStringProperty(propertyName, propertyValue);
+      message.writeBytes(data);
+      producer.send(message);
    }
 
-   protected String getTopicName() {
-      return "testtopic";
+   public void abortTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException {
+      ClientStompFrame abortFrame = conn.createFrame(Stomp.Commands.ABORT)
+                                        .addHeader(Stomp.Headers.TRANSACTION, txID);
+
+      conn.sendFrame(abortFrame);
    }
 
-   protected String getTopicPrefix() {
-      return "";
+   public void beginTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException {
+      ClientStompFrame beginFrame = conn.createFrame(Stomp.Commands.BEGIN)
+                                        .addHeader(Stomp.Headers.TRANSACTION, txID);
+
+      conn.sendFrame(beginFrame);
    }
 
-   protected void assertChannelClosed() throws InterruptedException {
-      assertChannelClosed(0);
+   public void commitTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException {
+      commitTransaction(conn, txID, false);
    }
 
-   protected void assertChannelClosed(int index) throws InterruptedException {
-      boolean closed = channels.get(index).closeFuture().await(5000);
-      assertTrue("channel not closed", closed);
+   public void commitTransaction(StompClientConnection conn,
+                                 String txID,
+                                 boolean receipt) throws IOException, InterruptedException {
+      ClientStompFrame beginFrame = conn.createFrame(Stomp.Commands.COMMIT)
+                                        .addHeader(Stomp.Headers.TRANSACTION, txID);
+      String uuid = UUID.randomUUID().toString();
+      if (receipt) {
+         beginFrame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
+      }
+      ClientStompFrame resp = conn.sendFrame(beginFrame);
+      if (receipt) {
+         assertEquals(uuid, resp.getHeader(Stomp.Headers.Response.RECEIPT_ID));
+      }
    }
 
-   public void sendFrame(String data) throws Exception {
-      IntegrationTestLogger.LOGGER.info("Sending: " + data);
-      sendFrame(0, data);
+   public void ack(StompClientConnection conn,
+                   String subscriptionId,
+                   ClientStompFrame messageIdFrame) throws IOException, InterruptedException {
+      String messageID = messageIdFrame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
+
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.ACK)
+                                      .addHeader(Stomp.Headers.Message.MESSAGE_ID, messageID);
+
+      if (subscriptionId != null) {
+         frame.addHeader(Stomp.Headers.Ack.SUBSCRIPTION, subscriptionId);
+      }
+
+      ClientStompFrame response = conn.sendFrame(frame);
+      if (response != null) {
+         throw new IOException("failed to ack " + response);
+      }
    }
 
-   public void sendFrame(int index, String data) throws Exception {
-      channels.get(index).writeAndFlush(data);
+   public void ack(StompClientConnection conn,
+                   String subscriptionId,
+                   String mid,
+                   String txID) throws IOException, InterruptedException {
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.ACK)
+                                      .addHeader(Stomp.Headers.Ack.SUBSCRIPTION, subscriptionId)
+                                      .addHeader(Stomp.Headers.Message.MESSAGE_ID, mid);
+      if (txID != null) {
+         frame.addHeader(Stomp.Headers.TRANSACTION, txID);
+      }
+
+      conn.sendFrame(frame);
    }
 
-   public void sendFrame(byte[] data) throws Exception {
-      sendFrame(0, data);
+   public void nack(StompClientConnection conn, String subscriptionId, String messageId) throws IOException, InterruptedException {
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.NACK)
+                                      .addHeader(Stomp.Headers.Ack.SUBSCRIPTION, subscriptionId)
+                                      .addHeader(Stomp.Headers.Message.MESSAGE_ID, messageId);
+
+      conn.sendFrame(frame);
    }
 
-   public void sendFrame(int index, byte[] data) throws Exception {
-      ByteBuf buffer = Unpooled.buffer(data.length);
-      buffer.writeBytes(data);
-      channels.get(index).writeAndFlush(buffer);
+   public ClientStompFrame subscribe(StompClientConnection conn,
+                                     String subscriptionId) throws IOException, InterruptedException {
+      return subscribe(conn, subscriptionId, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, null);
    }
 
-   public String receiveFrame(long timeOut) throws Exception {
-      return receiveFrame(0, timeOut);
+   public ClientStompFrame subscribe(StompClientConnection conn,
+                                     String subscriptionId,
+                                     String ack) throws IOException, InterruptedException {
+      return subscribe(conn, subscriptionId, ack, null, null);
    }
 
-   public String receiveFrame(int index, long timeOut) throws Exception {
-      String msg = priorityQueues.get(index).poll(timeOut, TimeUnit.MILLISECONDS);
-      return msg;
+   public ClientStompFrame subscribe(StompClientConnection conn,
+                                     String subscriptionId,
+                                     String ack,
+                                     String durableId) throws IOException, InterruptedException {
+      return subscribe(conn, subscriptionId, ack, durableId, null);
    }
 
-   public void sendMessage(String msg) throws Exception {
-      sendMessage(msg, queue);
+   public ClientStompFrame subscribe(StompClientConnection conn,
+                                     String subscriptionId,
+                                     String ack,
+                                     String durableId,
+                                     boolean receipt) throws IOException, InterruptedException {
+      return subscribe(conn, subscriptionId, ack, durableId, null, receipt);
    }
 
-   public void sendMessage(String msg, Destination destination) throws Exception {
-      MessageProducer producer = session.createProducer(destination);
-      TextMessage message = session.createTextMessage(msg);
-      producer.send(message);
+   public ClientStompFrame subscribe(StompClientConnection conn,
+                                     String subscriptionId,
+                                     String ack,
+                                     String durableId,
+                                     String selector) throws IOException, InterruptedException {
+      return subscribe(conn, subscriptionId, ack, durableId, selector, false);
    }
 
-   public void sendMessage(byte[] data, Destination destination) throws Exception {
-      sendMessage(data, "foo", "xyz", destination);
+   public ClientStompFrame subscribe(StompClientConnection conn,
+                                     String subscriptionId,
+                                     String ack,
+                                     String durableId,
+                                     String selector,
+                                     boolean receipt) throws IOException, InterruptedException {
+      return subscribe(conn, subscriptionId, ack, durableId, selector, getQueuePrefix() + getQueueName(), receipt);
    }
 
-   public void sendMessage(String msg, String propertyName, String propertyValue) throws Exception {
-      sendMessage(msg.getBytes(StandardCharsets.UTF_8), propertyName, propertyValue, queue);
+   public ClientStompFrame subscribe(StompClientConnection conn,
+                                     String subscriptionId,
+                                     String ack,
+                                     String durableId,
+                                     String selector,
+                                     String destination,
+                                     boolean receipt) throws IOException, InterruptedException {
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
+                                   .addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, AddressInfo.RoutingType.ANYCAST.toString())
+                                   .addHeader(Stomp.Headers.Subscribe.DESTINATION, destination);
+      if (subscriptionId != null) {
+         frame.addHeader(Stomp.Headers.Subscribe.ID, subscriptionId);
+      }
+      if (ack != null) {
+         frame.addHeader(Stomp.Headers.Subscribe.ACK_MODE, ack);
+      }
+      if (durableId != null) {
+         frame.addHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, durableId);
+      }
+      if (selector != null) {
+         frame.addHeader(Stomp.Headers.Subscribe.SELECTOR, selector);
+      }
+      String uuid = UUID.randomUUID().toString();
+      if (receipt) {
+         frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
+      }
+
+      frame = conn.sendFrame(frame);
+
+      if (receipt) {
+         assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
+      }
+
+      return frame;
    }
 
-   public void sendMessage(byte[] data,
-                           String propertyName,
-                           String propertyValue,
-                           Destination destination) throws Exception {
-      MessageProducer producer = session.createProducer(destination);
-      BytesMessage message = session.createBytesMessage();
-      message.setStringProperty(propertyName, propertyValue);
-      message.writeBytes(data);
-      producer.send(message);
+   public ClientStompFrame subscribeTopic(StompClientConnection conn,
+                                          String subscriptionId,
+                                          String ack,
+                                          String durableId) throws IOException, InterruptedException {
+      return subscribeTopic(conn, subscriptionId, ack, durableId, false);
    }
 
-   protected void waitForReceipt() throws Exception {
-      String frame = receiveFrame(50000);
-      assertNotNull(frame);
-      assertTrue(frame.indexOf("RECEIPT") > -1);
+   public ClientStompFrame subscribeTopic(StompClientConnection conn,
+                                          String subscriptionId,
+                                          String ack,
+                                          String durableId,
+                                          boolean receipt) throws IOException, InterruptedException {
+      return subscribeTopic(conn, subscriptionId, ack, durableId, receipt, false);
    }
 
-   protected void waitForFrameToTakeEffect() throws InterruptedException {
-      // bit of a dirty hack :)
-      // another option would be to force some kind of receipt to be returned
-      // from the frame
-      Thread.sleep(500);
+   public ClientStompFrame subscribeTopic(StompClientConnection conn,
+                                          String subscriptionId,
+                                          String ack,
+                                          String durableId,
+                                          boolean receipt,
+                                          boolean noLocal) throws IOException, InterruptedException {
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
+                                   .addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, AddressInfo.RoutingType.MULTICAST.toString())
+                                   .addHeader(Stomp.Headers.Subscribe.DESTINATION, getTopicPrefix() + getTopicName());
+      if (subscriptionId != null) {
+         frame.addHeader(Stomp.Headers.Subscribe.ID, subscriptionId);
+      }
+      if (ack != null) {
+         frame.addHeader(Stomp.Headers.Subscribe.ACK_MODE, ack);
+      }
+      if (durableId != null) {
+         frame.addHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, durableId);
+      }
+      String uuid = UUID.randomUUID().toString();
+      if (receipt) {
+         frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
+      }
+      if (noLocal) {
+         frame.addHeader(Stomp.Headers.Subscribe.NO_LOCAL, "true");
+      }
+
+      frame = conn.sendFrame(frame);
+
+      if (receipt) {
+         assertNotNull("Requested receipt, but response is null", frame);
+         assertTrue(frame.getHeader(Stomp.Headers.Response.RECEIPT_ID).equals(uuid));
+      }
+
+      return frame;
    }
 
-   public boolean isSecurityEnabled() {
-      return false;
+   public ClientStompFrame unsubscribe(StompClientConnection conn, String subscriptionId) throws IOException, InterruptedException {
+      return unsubscribe(conn, subscriptionId, null, false, false);
    }
 
-   class StompClientHandler extends SimpleChannelInboundHandler<String> {
+   public ClientStompFrame unsubscribe(StompClientConnection conn,
+                                       String subscriptionId,
+                                       boolean receipt) throws IOException, InterruptedException {
+      return unsubscribe(conn, subscriptionId, null, receipt, false);
+   }
 
-      int index = 0;
+   public ClientStompFrame unsubscribe(StompClientConnection conn,
+                                       String subscriptionId,
+                                       String destination,
+                                       boolean receipt,
+                                       boolean durable) throws IOException, InterruptedException {
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE);
+      if (durable && subscriptionId != null) {
+         frame.addHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME, subscriptionId);
+      } else if (!durable && subscriptionId != null) {
+         frame.addHeader(Stomp.Headers.Unsubscribe.ID, subscriptionId);
+      }
 
-      StompClientHandler(int index) {
-         this.index = index;
+      if (destination != null) {
+         frame.addHeader(Stomp.Headers.Unsubscribe.DESTINATION, destination);
       }
 
-      StringBuffer currentMessage = new StringBuffer("");
+      String uuid = UUID.randomUUID().toString();
 
-      @Override
-      protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
-         currentMessage.append(msg);
-         String fullMessage = currentMessage.toString();
-         if (fullMessage.contains("\0\n")) {
-            int messageEnd = fullMessage.indexOf("\0\n");
-            String actualMessage = fullMessage.substring(0, messageEnd);
-            fullMessage = fullMessage.substring(messageEnd + 2);
-            currentMessage = new StringBuffer("");
-            BlockingQueue queue = priorityQueues.get(index);
-            if (queue == null) {
-               queue = new ArrayBlockingQueue(1000);
-               priorityQueues.add(index, queue);
-            }
-            queue.add(actualMessage);
-            if (fullMessage.length() > 0) {
-               channelRead(ctx, fullMessage);
-            }
-         }
+      if (receipt) {
+         frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
       }
 
-      @Override
-      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-         cause.printStackTrace();
-         ctx.close();
+      frame = conn.sendFrame(frame);
+
+      if (receipt) {
+         assertEquals(Stomp.Responses.RECEIPT, frame.getCommand());
+         assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
       }
+
+      return frame;
+   }
+
+   public ClientStompFrame send(StompClientConnection conn, String destination, String contentType, String body) throws IOException, InterruptedException {
+      return send(conn, destination, contentType, body, false);
+   }
+
+   public ClientStompFrame send(StompClientConnection conn, String destination, String contentType, String body, boolean receipt) throws IOException, InterruptedException {
+      return send(conn, destination, contentType, body, receipt, null);
+   }
+
+   public ClientStompFrame send(StompClientConnection conn, String destination, String contentType, String body, boolean receipt, AddressInfo.RoutingType destinationType) throws IOException, InterruptedException {
+      return send(conn, destination, contentType, body, receipt, destinationType, null);
    }
 
+   public ClientStompFrame send(StompClientConnection conn, String destination, String contentType, String body, boolean receipt, AddressInfo.RoutingType destinationType, String txId) throws IOException, InterruptedException {
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+                                   .addHeader(Stomp.Headers.Send.DESTINATION, destination)
+                                   .setBody(body);
+
+      if (contentType != null) {
+         frame.addHeader(Stomp.Headers.CONTENT_TYPE, contentType);
+      }
+
+      if (destinationType != null) {
+         frame.addHeader(Stomp.Headers.Send.DESTINATION_TYPE, destinationType.toString());
+      }
+
+      if (txId != null) {
+         frame.addHeader(Stomp.Headers.TRANSACTION, txId);
+      }
+
+      String uuid = UUID.randomUUID().toString();
+
+      if (receipt) {
+         frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
+      }
+      frame = conn.sendFrame(frame);
+
+      if (receipt) {
+         assertEquals(Stomp.Responses.RECEIPT, frame.getCommand());
+         assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
+      }
+
+      IntegrationTestLogger.LOGGER.info("Received: " + frame);
+
+      return frame;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithInterceptors.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithInterceptors.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithInterceptors.java
new file mode 100644
index 0000000..9bb9bf2
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithInterceptors.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.stomp;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.stomp.StompFrame;
+import org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StompTestWithInterceptors extends StompTestBase {
+
+   @Override
+   public List<String> getIncomingInterceptors() {
+      // Binary names are taken from the class literals so a rename or move cannot leave a stale string behind.
+      List<String> stompIncomingInterceptor = new ArrayList<>();
+      stompIncomingInterceptor.add(MyIncomingStompFrameInterceptor.class.getName());
+      stompIncomingInterceptor.add(MyCoreInterceptor.class.getName());
+      return stompIncomingInterceptor;
+   }
+
+   @Override
+   public List<String> getOutgoingInterceptors() {
+      // Binary name is taken from the class literal so a rename or move cannot leave a stale string behind.
+      List<String> stompOutgoingInterceptor = new ArrayList<>();
+      stompOutgoingInterceptor.add(MyOutgoingStompFrameInterceptor.class.getName());
+      return stompOutgoingInterceptor;
+   }
+
+   @Test
+   public void stompFrameInterceptor() throws Exception {
+      MyIncomingStompFrameInterceptor.incomingInterceptedFrames.clear();
+      MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.clear();
+
+      Thread.sleep(200);
+
+      // NOTE(review): presumably the pause above lets core packets from fixture start-up drain before this reset -- confirm
+      MyCoreInterceptor.incomingInterceptedFrames.clear();
+
+      StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+      conn.connect(defUser, defPass);
+
+      ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+      subFrame.addHeader("subscription-type", "ANYCAST");
+      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      subFrame.addHeader("ack", "auto");
+      conn.sendFrame(subFrame);
+
+      assertEquals(0, MyCoreInterceptor.incomingInterceptedFrames.size());
+      sendJmsMessage(getName());
+
+      // Something was supposed to be called on sendMessages
+      assertTrue("core interceptor is not working", MyCoreInterceptor.incomingInterceptedFrames.size() > 0);
+
+      conn.receiveFrame(10000);
+
+      ClientStompFrame frame = conn.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.setBody("Hello World");
+      conn.sendFrame(frame);
+
+      conn.disconnect();
+
+      List<String> incomingCommands = new ArrayList<>(4);
+      incomingCommands.add("CONNECT");
+      incomingCommands.add("SUBSCRIBE");
+      incomingCommands.add("SEND");
+      incomingCommands.add("DISCONNECT");
+
+      List<String> outgoingCommands = new ArrayList<>(3);
+      outgoingCommands.add("CONNECTED");
+      outgoingCommands.add("MESSAGE");
+      outgoingCommands.add("MESSAGE");
+
+      long timeout = System.currentTimeMillis() + 1000;
+
+      // Frames are intercepted asynchronously: keep polling until BOTH lists are complete or the deadline passes
+      while ((MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size() < 4 ||
+         MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size() < 3) &&
+         timeout > System.currentTimeMillis()) {
+         Thread.sleep(10);
+      }
+
+      Assert.assertEquals(4, MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size());
+      Assert.assertEquals(3, MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size());
+
+      for (int i = 0; i < MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size(); i++) {
+         Assert.assertEquals(incomingCommands.get(i), MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getCommand());
+         Assert.assertEquals("incomingInterceptedVal", MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getHeader("incomingInterceptedProp"));
+      }
+
+      for (int i = 0; i < MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size(); i++) {
+         Assert.assertEquals(outgoingCommands.get(i), MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(i).getCommand());
+      }
+
+      Assert.assertEquals("incomingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("incomingInterceptedProp"));
+      Assert.assertEquals("outgoingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("outgoingInterceptedProp"));
+   }
+
+   public static class MyCoreInterceptor implements Interceptor {
+      // Appended to from intercept() while the test thread reads it, so the list has to be thread-safe.
+      static List<Packet> incomingInterceptedFrames = new java.util.concurrent.CopyOnWriteArrayList<>();
+
+      @Override
+      public boolean intercept(Packet packet, RemotingConnection connection) {
+         IntegrationTestLogger.LOGGER.info("Core intercepted: " + packet);
+         incomingInterceptedFrames.add(packet);
+         return true;
+      }
+   }
+
+   public static class MyIncomingStompFrameInterceptor implements StompFrameInterceptor {
+      // Appended to from intercept() while the test thread reads it, so the list has to be thread-safe.
+      static List<StompFrame> incomingInterceptedFrames = new java.util.concurrent.CopyOnWriteArrayList<>();
+
+      @Override
+      public boolean intercept(StompFrame stompFrame, RemotingConnection connection) {
+         incomingInterceptedFrames.add(stompFrame);
+         stompFrame.addHeader("incomingInterceptedProp", "incomingInterceptedVal");
+         return true;
+      }
+   }
+
+   public static class MyOutgoingStompFrameInterceptor implements StompFrameInterceptor {
+      // Appended to from intercept() while the test thread reads it, so the list has to be thread-safe.
+      static List<StompFrame> outgoingInterceptedFrames = new java.util.concurrent.CopyOnWriteArrayList<>();
+
+      @Override
+      public boolean intercept(StompFrame stompFrame, RemotingConnection connection) {
+         outgoingInterceptedFrames.add(stompFrame);
+         stompFrame.addHeader("outgoingInterceptedProp", "outgoingInterceptedVal");
+         return true;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithLargeMessages.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithLargeMessages.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithLargeMessages.java
new file mode 100644
index 0000000..18410be
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithLargeMessages.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.stomp;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class StompTestWithLargeMessages extends StompTestBase {
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp(); // nothing extra is prepared here; this defers entirely to the inherited setup
+   }
+
+   @Override
+   public boolean isCompressLargeMessages() {
+      return true; // NOTE(review): presumably makes the core sender compress large messages (see the ...Compressed... tests) -- confirm
+   }
+
+   @Override
+   public boolean isPersistenceEnabled() {
+      return true; // tests here send with persistent=true; presumably this switches broker persistence on -- confirm
+   }
+
+   @Override
+   public Integer getStompMinLargeMessageSize() {
+      return 2048; // NOTE(review): presumably bytes, i.e. a 2 KiB large-message threshold for STOMP -- confirm
+   }
+
+   // STOMP 1.0 sender -> large message -> STOMP 1.0 receiver
+   @Test
+   public void testSendReceiveLargePersistentMessages() throws Exception {
+      StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+      conn.connect(defUser, defPass);
+
+      int count = 10;
+      int msgSize = 1024 * 1024;
+      char[] contents = new char[msgSize];
+      for (int i = 0; i < msgSize; i++) {
+         contents[i] = 'A';
+      }
+      String body = new String(contents);
+
+      for (int i = 0; i < count; i++) {
+         ClientStompFrame frame = conn.createFrame("SEND");
+         frame.addHeader("destination", getQueuePrefix() + getQueueName());
+         frame.addHeader("persistent", "true");
+         frame.setBody(body);
+         conn.sendFrame(frame);
+      }
+
+      ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+      subFrame.addHeader("subscription-type", "ANYCAST");
+      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      subFrame.addHeader("ack", "auto");
+      conn.sendFrame(subFrame);
+
+      for (int i = 0; i < count; i++) {
+         ClientStompFrame frame = conn.receiveFrame(60000);
+         Assert.assertNotNull(frame);
+         System.out.println("part of frame: " + frame.getBody().substring(0, 200));
+         Assert.assertEquals("MESSAGE", frame.getCommand());
+         Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+         int index = frame.getBody().indexOf("AAAA");
+         assertEquals(msgSize, (frame.getBody().length() - index));
+      }
+
+      ClientStompFrame unsubFrame = conn.createFrame("UNSUBSCRIBE");
+      unsubFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      unsubFrame.addHeader("receipt", "567");
+      ClientStompFrame response = conn.sendFrame(unsubFrame);
+      assertNotNull(response);
+      assertEquals("RECEIPT", response.getCommand());
+
+      conn.disconnect();
+   }
+
+   // core sender (sendJmsMessage) -> large message -> STOMP 1.0 receiver
+   @Test
+   public void testReceiveLargePersistentMessagesFromCore() throws Exception {
+      StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+      conn.connect(defUser, defPass);
+
+      int msgSize = 3 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+      char[] contents = new char[msgSize];
+      for (int i = 0; i < msgSize; i++) {
+         contents[i] = 'B';
+      }
+      String msg = new String(contents);
+
+      int count = 10;
+      for (int i = 0; i < count; i++) {
+         this.sendJmsMessage(msg);
+      }
+
+      ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+      subFrame.addHeader("subscription-type", "ANYCAST");
+      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      subFrame.addHeader("ack", "auto");
+      conn.sendFrame(subFrame);
+
+      for (int i = 0; i < count; i++) {
+         ClientStompFrame frame = conn.receiveFrame(60000);
+         Assert.assertNotNull(frame);
+         System.out.println("part of frame: " + frame.getBody().substring(0, 200));
+         Assert.assertEquals("MESSAGE", frame.getCommand());
+         Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+         int index = frame.getBody().indexOf("BBB");
+         assertEquals(msgSize, (frame.getBody().length() - index));
+      }
+
+      ClientStompFrame unsubFrame = conn.createFrame("UNSUBSCRIBE");
+      unsubFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      unsubFrame.addHeader("receipt", "567");
+      ClientStompFrame response = conn.sendFrame(unsubFrame);
+      assertNotNull(response);
+      assertEquals("RECEIPT", response.getCommand());
+
+      conn.disconnect();
+   }
+
+   // STOMP 1.2 sender -> large message -> STOMP 1.2 receiver
+   @Test
+   public void testSendReceiveLargePersistentMessagesV12() throws Exception {
+      StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port);
+      connV12.connect(defUser, defPass);
+
+      int count = 10;
+      int szBody = 1024 * 1024;
+      char[] contents = new char[szBody];
+      for (int i = 0; i < szBody; i++) {
+         contents[i] = 'A';
+      }
+      String body = new String(contents);
+
+      ClientStompFrame frame = connV12.createFrame("SEND");
+      frame.addHeader("destination-type", "ANYCAST");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("persistent", "true");
+      frame.setBody(body);
+
+      for (int i = 0; i < count; i++) {
+         connV12.sendFrame(frame);
+      }
+
+      ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE");
+      subFrame.addHeader("id", "a-sub");
+      subFrame.addHeader("subscription-type", "ANYCAST");
+      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      subFrame.addHeader("ack", "auto");
+
+      connV12.sendFrame(subFrame);
+
+      for (int i = 0; i < count; i++) {
+         ClientStompFrame receiveFrame = connV12.receiveFrame(30000);
+
+         Assert.assertNotNull(receiveFrame);
+         System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20));
+         Assert.assertEquals("MESSAGE", receiveFrame.getCommand());
+         Assert.assertEquals(getQueuePrefix() + getQueueName(), receiveFrame.getHeader("destination"));
+         assertEquals(szBody, receiveFrame.getBody().length());
+      }
+
+      // remove subscription
+      ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE");
+      unsubFrame.addHeader("id", "a-sub");
+      connV12.sendFrame(unsubFrame);
+
+      connV12.disconnect();
+   }
+
+   // core sender (sendJmsMessage) -> large message -> STOMP 1.2 receiver
+   @Test
+   public void testReceiveLargePersistentMessagesFromCoreV12() throws Exception {
+      int msgSize = 3 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+      char[] contents = new char[msgSize];
+      for (int i = 0; i < msgSize; i++) {
+         contents[i] = 'B';
+      }
+      String msg = new String(contents);
+
+      int count = 10;
+      for (int i = 0; i < count; i++) {
+         this.sendJmsMessage(msg);
+      }
+
+      StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port);
+      connV12.connect(defUser, defPass);
+
+      ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE");
+      subFrame.addHeader("id", "a-sub");
+      subFrame.addHeader("subscription-type", "ANYCAST");
+      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      subFrame.addHeader("ack", "auto");
+      connV12.sendFrame(subFrame);
+
+      for (int i = 0; i < count; i++) {
+         ClientStompFrame receiveFrame = connV12.receiveFrame(30000);
+
+         Assert.assertNotNull(receiveFrame);
+         System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20));
+         Assert.assertEquals("MESSAGE", receiveFrame.getCommand());
+         Assert.assertEquals(getQueuePrefix() + getQueueName(), receiveFrame.getHeader("destination"));
+         assertEquals(msgSize, receiveFrame.getBody().length());
+      }
+
+      // remove subscription
+      ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE");
+      unsubFrame.addHeader("id", "a-sub");
+      connV12.sendFrame(unsubFrame);
+
+      connV12.disconnect();
+   }
+
+   // core sender -> large message (compressed to regular size) -> STOMP 1.0 receiver
+   @Test
+   public void testReceiveLargeCompressedToRegularPersistentMessagesFromCore() throws Exception {
+      StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+      conn.connect(defUser, defPass);
+
+      LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true);
+      LargeMessageTestBase.adjustLargeCompression(true, input, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+      char[] contents = input.toArray();
+      String msg = new String(contents);
+
+      String leadingPart = msg.substring(0, 100);
+
+      int count = 10;
+      for (int i = 0; i < count; i++) {
+         this.sendJmsMessage(msg);
+      }
+
+      ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+      subFrame.addHeader("subscription-type", "ANYCAST");
+      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      subFrame.addHeader("ack", "auto");
+      conn.sendFrame(subFrame);
+
+      for (int i = 0; i < count; i++) {
+         ClientStompFrame receiveFrame = conn.receiveFrame(30000);
+         Assert.assertNotNull(receiveFrame);
+         System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 250));
+         Assert.assertEquals("MESSAGE", receiveFrame.getCommand());
+         Assert.assertEquals(getQueuePrefix() + getQueueName(), receiveFrame.getHeader("destination"));
+         int index = receiveFrame.getBody().indexOf(leadingPart);
+         assertEquals(msg.length(), (receiveFrame.getBody().length() - index));
+      }
+
+      // remove subscription
+      ClientStompFrame unsubFrame = conn.createFrame("UNSUBSCRIBE");
+      unsubFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      unsubFrame.addHeader("receipt", "567");
+      ClientStompFrame response = conn.sendFrame(unsubFrame);
+      assertNotNull(response);
+      assertEquals("RECEIPT", response.getCommand());
+
+      conn.disconnect();
+   }
+
+   // core sender -> large message (compressed to regular size) -> STOMP 1.2 receiver
+   @Test
+   public void testReceiveLargeCompressedToRegularPersistentMessagesFromCoreV12() throws Exception {
+      LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true);
+      LargeMessageTestBase.adjustLargeCompression(true, input, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+      char[] contents = input.toArray();
+      String msg = new String(contents);
+
+      int count = 10;
+      for (int i = 0; i < count; i++) {
+         this.sendJmsMessage(msg);
+      }
+
+      StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port);
+      connV12.connect(defUser, defPass);
+
+      ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE");
+      subFrame.addHeader("id", "a-sub");
+      subFrame.addHeader("subscription-type", "ANYCAST");
+      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      subFrame.addHeader("ack", "auto");
+
+      connV12.sendFrame(subFrame);
+
+      for (int i = 0; i < count; i++) {
+         ClientStompFrame receiveFrame = connV12.receiveFrame(30000);
+
+         Assert.assertNotNull(receiveFrame);
+         System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20));
+         Assert.assertEquals("MESSAGE", receiveFrame.getCommand());
+         Assert.assertEquals(getQueuePrefix() + getQueueName(), receiveFrame.getHeader("destination"));
+         assertEquals(contents.length, receiveFrame.getBody().length());
+      }
+
+      // remove subscription
+      ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE");
+      unsubFrame.addHeader("id", "a-sub");
+      connV12.sendFrame(unsubFrame);
+
+      connV12.disconnect();
+   }
+
+   // core sender -> large message (still large after compression) -> STOMP 1.2 receiver
+   @Test
+   public void testReceiveLargeCompressedToLargePersistentMessagesFromCoreV12() throws Exception {
+      LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true);
+      input.setSize(10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+      LargeMessageTestBase.adjustLargeCompression(false, input, 10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+      char[] contents = input.toArray();
+      String msg = new String(contents);
+
+      int count = 10;
+      for (int i = 0; i < count; i++) {
+         this.sendJmsMessage(msg);
+      }
+
+      IntegrationTestLogger.LOGGER.info("Message count for " + getQueueName() + ": " + server.getActiveMQServer().locateQueue(SimpleString.toSimpleString(getQueueName())).getMessageCount());
+
+      StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", hostname, port);
+      connV12.connect(defUser, defPass);
+
+      ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE");
+      subFrame.addHeader("id", "a-sub");
+      subFrame.addHeader("subscription-type", "ANYCAST");
+      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      subFrame.addHeader("ack", "auto");
+
+      connV12.sendFrame(subFrame);
+
+      for (int i = 0; i < count; i++) {
+         ClientStompFrame receiveFrame = connV12.receiveFrame(30000);
+
+         Assert.assertNotNull(receiveFrame);
+         System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20));
+         Assert.assertEquals("MESSAGE", receiveFrame.getCommand());
+         Assert.assertEquals(getQueuePrefix() + getQueueName(), receiveFrame.getHeader("destination"));
+         assertEquals(contents.length, receiveFrame.getBody().length());
+      }
+
+      // remove subscription
+      ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE");
+      unsubFrame.addHeader("id", "a-sub");
+      connV12.sendFrame(unsubFrame);
+
+      connV12.disconnect();
+   }
+
+   // core sender -> large message (still large after compression) -> STOMP 1.0 receiver
+   @Test
+   public void testReceiveLargeCompressedToLargePersistentMessagesFromCore() throws Exception {
+      LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true);
+      input.setSize(10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+      LargeMessageTestBase.adjustLargeCompression(false, input, 10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+      char[] contents = input.toArray();
+      String msg = new String(contents);
+
+      String leadingPart = msg.substring(0, 100);
+
+      int count = 10;
+      for (int i = 0; i < count; i++) {
+         this.sendJmsMessage(msg);
+      }
+
+      StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+      conn.connect(defUser, defPass);
+
+      ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+      subFrame.addHeader("subscription-type", "ANYCAST");
+      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      subFrame.addHeader("ack", "auto");
+      conn.sendFrame(subFrame);
+
+      for (int i = 0; i < count; i++) {
+         ClientStompFrame frame = conn.receiveFrame(60000);
+         Assert.assertNotNull(frame);
+         System.out.println("part of frame: " + frame.getBody().substring(0, 250));
+         Assert.assertEquals("MESSAGE", frame.getCommand());
+         Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+         int index = frame.getBody().indexOf(leadingPart);
+         assertEquals(msg.length(), (frame.getBody().length() - index));
+      }
+
+      ClientStompFrame unsubFrame = conn.createFrame("UNSUBSCRIBE");
+      unsubFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      unsubFrame.addHeader("receipt", "567");
+      conn.sendFrame(unsubFrame);
+
+      conn.disconnect();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithMessageID.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithMessageID.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithMessageID.java
new file mode 100644
index 0000000..69c214b
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithMessageID.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.stomp;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.QueueBrowser;
+import javax.jms.TextMessage;
+import java.util.Enumeration;
+
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StompTestWithMessageID extends StompTestBase {
+
+   @Override
+   public boolean isEnableStompMessageId() {
+      return true; // NOTE(review): presumably turns on the "amqMessageId" property that testEnableMessageID checks -- confirm
+   }
+
+   @Test
+   public void testEnableMessageID() throws Exception {
+      StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+      conn.connect(defUser, defPass);
+
+      ClientStompFrame frame = conn.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.setBody("Hello World 1");
+      conn.sendFrame(frame);
+
+      frame = conn.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.setBody("Hello World 2");
+      conn.sendFrame(frame);
+
+      QueueBrowser browser = session.createBrowser(queue);
+
+      Enumeration<?> enu = browser.getEnumeration();
+
+      while (enu.hasMoreElements()) {
+         Message msg = (Message) enu.nextElement();
+         String msgId = msg.getStringProperty("amqMessageId");
+         assertNotNull(msgId);
+         assertTrue(msgId.startsWith("STOMP"));
+      }
+
+      browser.close();
+
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      TextMessage message = (TextMessage) consumer.receive(1000);
+      Assert.assertNotNull(message);
+
+      message = (TextMessage) consumer.receive(1000);
+      Assert.assertNotNull(message);
+
+      message = (TextMessage) consumer.receive(2000);
+      Assert.assertNull(message);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithSecurity.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithSecurity.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithSecurity.java
index e9d5550..a6ce6c9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithSecurity.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithSecurity.java
@@ -19,27 +19,34 @@ package org.apache.activemq.artemis.tests.integration.stomp;
 import javax.jms.MessageConsumer;
 import javax.jms.TextMessage;
 
-import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class StompTestWithSecurity extends StompTestBase {
 
+   @Override
+   public boolean isSecurityEnabled() {
+      return true;
+   }
+
    @Test
    public void testJMSXUserID() throws Exception {
       server.getActiveMQServer().getConfiguration().setPopulateValidatedUser(true);
 
       MessageConsumer consumer = session.createConsumer(queue);
 
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(10000);
-      Assert.assertTrue(frame.startsWith("CONNECTED"));
+      StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+      conn.connect(defUser, defPass);
 
-      frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+      ClientStompFrame frame = conn.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.setBody("Hello World");
+      conn.sendFrame(frame);
 
-      sendFrame(frame);
+      conn.disconnect();
 
       TextMessage message = (TextMessage) consumer.receive(1000);
       Assert.assertNotNull(message);
@@ -54,9 +61,4 @@ public class StompTestWithSecurity extends StompTestBase {
       long tmsg = message.getJMSTimestamp();
       Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
    }
-
-   @Override
-   public boolean isSecurityEnabled() {
-      return true;
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java
index 06771bb..c48fd8d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java
@@ -24,9 +24,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
-public abstract class AbstractClientStompFrame implements ClientStompFrame {
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
 
-   protected static final String HEADER_RECEIPT = "receipt";
+public abstract class AbstractClientStompFrame implements ClientStompFrame {
 
    protected static final Set<String> validCommands = new HashSet<>();
    protected String command;
@@ -36,19 +36,19 @@ public abstract class AbstractClientStompFrame implements ClientStompFrame {
    protected String EOL = "\n";
 
    static {
-      validCommands.add("CONNECT");
-      validCommands.add("CONNECTED");
-      validCommands.add("SEND");
-      validCommands.add("RECEIPT");
-      validCommands.add("SUBSCRIBE");
-      validCommands.add("UNSUBSCRIBE");
-      validCommands.add("MESSAGE");
-      validCommands.add("BEGIN");
-      validCommands.add("COMMIT");
-      validCommands.add("ABORT");
-      validCommands.add("ACK");
-      validCommands.add("DISCONNECT");
-      validCommands.add("ERROR");
+      validCommands.add(Stomp.Commands.CONNECT);
+      validCommands.add(Stomp.Responses.CONNECTED);
+      validCommands.add(Stomp.Commands.SEND);
+      validCommands.add(Stomp.Responses.RECEIPT);
+      validCommands.add(Stomp.Commands.SUBSCRIBE);
+      validCommands.add(Stomp.Commands.UNSUBSCRIBE);
+      validCommands.add(Stomp.Responses.MESSAGE);
+      validCommands.add(Stomp.Commands.BEGIN);
+      validCommands.add(Stomp.Commands.COMMIT);
+      validCommands.add(Stomp.Commands.ABORT);
+      validCommands.add(Stomp.Commands.ACK);
+      validCommands.add(Stomp.Commands.DISCONNECT);
+      validCommands.add(Stomp.Responses.ERROR);
    }
 
    public AbstractClientStompFrame(String command) {
@@ -80,37 +80,15 @@ public abstract class AbstractClientStompFrame implements ClientStompFrame {
 
    @Override
    public ByteBuffer toByteBuffer() {
-      if (isPing()) {
-         ByteBuffer buffer = ByteBuffer.allocateDirect(1);
-         buffer.put((byte) 0x0A);
-         buffer.rewind();
-         return buffer;
-      }
-      StringBuffer sb = new StringBuffer();
-      sb.append(command + EOL);
-      int n = headers.size();
-      for (int i = 0; i < n; i++) {
-         sb.append(headers.get(i).key + ":" + headers.get(i).val + EOL);
-      }
-      sb.append(EOL);
-      if (body != null) {
-         sb.append(body);
-      }
-      sb.append((char) 0);
-
-      String data = sb.toString();
-
-      byte[] byteValue = data.getBytes(StandardCharsets.UTF_8);
-
-      ByteBuffer buffer = ByteBuffer.allocateDirect(byteValue.length);
-      buffer.put(byteValue);
-
-      buffer.rewind();
-      return buffer;
+      return toByteBufferInternal(null);
    }
 
    @Override
    public ByteBuffer toByteBufferWithExtra(String str) {
+      return toByteBufferInternal(str);
+   }
+
+   public ByteBuffer toByteBufferInternal(String str) {
       StringBuffer sb = new StringBuffer();
       sb.append(command + EOL);
       int n = headers.size();
@@ -122,7 +100,9 @@ public abstract class AbstractClientStompFrame implements ClientStompFrame {
          sb.append(body);
       }
       sb.append((char) 0);
-      sb.append(str);
+      if (str != null) {
+         sb.append(str);
+      }
 
       String data = sb.toString();
 
@@ -137,26 +117,29 @@ public abstract class AbstractClientStompFrame implements ClientStompFrame {
 
    @Override
    public boolean needsReply() {
-      if ("CONNECT".equals(command) || headerKeys.contains(HEADER_RECEIPT)) {
+      if (Stomp.Commands.CONNECT.equals(command) || headerKeys.contains(Stomp.Headers.RECEIPT_REQUESTED)) {
          return true;
       }
       return false;
    }
 
    @Override
-   public void setCommand(String command) {
+   public ClientStompFrame setCommand(String command) {
       this.command = command;
+      return this;
    }
 
    @Override
-   public void addHeader(String key, String val) {
+   public ClientStompFrame addHeader(String key, String val) {
       headers.add(new Header(key, val));
       headerKeys.add(key);
+      return this;
    }
 
    @Override
-   public void setBody(String body) {
+   public ClientStompFrame setBody(String body) {
       this.body = body;
+      return this;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
index ce94ec3..d8a487e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
@@ -27,29 +27,12 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 
 public abstract class AbstractStompClientConnection implements StompClientConnection {
 
-   public static final String STOMP_COMMAND = "STOMP";
-
-   public static final String ACCEPT_HEADER = "accept-version";
-   public static final String HOST_HEADER = "host";
-   public static final String VERSION_HEADER = "version";
-   public static final String RECEIPT_HEADER = "receipt";
-
-   protected static final String CONNECT_COMMAND = "CONNECT";
-   protected static final String CONNECTED_COMMAND = "CONNECTED";
-   protected static final String DISCONNECT_COMMAND = "DISCONNECT";
-
-   protected static final String LOGIN_HEADER = "login";
-   protected static final String PASSCODE_HEADER = "passcode";
-
-   //ext
-   protected static final String CLIENT_ID_HEADER = "client-id";
-
    protected Pinger pinger;
-
    protected String version;
    protected String host;
    protected int port;
@@ -58,13 +41,10 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
    protected StompFrameFactory factory;
    protected final SocketChannel socketChannel;
    protected ByteBuffer readBuffer;
-
    protected List<Byte> receiveList;
-
    protected BlockingQueue<ClientStompFrame> frameQueue = new LinkedBlockingQueue<>();
-
    protected boolean connected = false;
-   private int serverPingCounter;
+   protected int serverPingCounter;
 
    public AbstractStompClientConnection(String version, String host, int port) throws IOException {
       this.version = version;
@@ -90,11 +70,15 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
       new ReaderThread().start();
    }
 
-   @Override
-   public ClientStompFrame sendFrame(ClientStompFrame frame) throws IOException, InterruptedException {
+   private ClientStompFrame sendFrameInternal(ClientStompFrame frame, boolean wicked) throws IOException, InterruptedException {
       ClientStompFrame response = null;
-      IntegrationTestLogger.LOGGER.info("Sending frame:\n" + frame);
-      ByteBuffer buffer = frame.toByteBuffer();
+      IntegrationTestLogger.LOGGER.info("Sending " + (wicked ? "*wicked* " : "") + "frame:\n" + frame);
+      ByteBuffer buffer;
+      if (wicked) {
+         buffer = frame.toByteBufferWithExtra("\n");
+      } else {
+         buffer = frame.toByteBuffer();
+      }
       while (buffer.remaining() > 0) {
          socketChannel.write(buffer);
       }
@@ -105,7 +89,7 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
 
          //filter out server ping
          while (response != null) {
-            if (response.getCommand().equals("STOMP")) {
+            if (response.getCommand().equals(Stomp.Commands.STOMP)) {
                response = receiveFrame();
             } else {
                break;
@@ -113,32 +97,19 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
          }
       }
 
+      IntegrationTestLogger.LOGGER.info("Received:\n" + response);
+
       return response;
    }
 
    @Override
-   public ClientStompFrame sendWickedFrame(ClientStompFrame frame) throws IOException, InterruptedException {
-      ClientStompFrame response = null;
-      ByteBuffer buffer = frame.toByteBufferWithExtra("\n");
-
-      while (buffer.remaining() > 0) {
-         socketChannel.write(buffer);
-      }
-
-      //now response
-      if (frame.needsReply()) {
-         response = receiveFrame();
+   public ClientStompFrame sendFrame(ClientStompFrame frame) throws IOException, InterruptedException {
+      return sendFrameInternal(frame, false);
+   }
 
-         //filter out server ping
-         while (response != null) {
-            if (response.getCommand().equals("STOMP")) {
-               response = receiveFrame();
-            } else {
-               break;
-            }
-         }
-      }
-      return response;
+   @Override
+   public ClientStompFrame sendWickedFrame(ClientStompFrame frame) throws IOException, InterruptedException {
+      return sendFrameInternal(frame, true);
    }
 
    @Override
@@ -186,17 +157,12 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
       readBuffer.rewind();
    }
 
-   @Override
-   public int getServerPingNumber() {
-      return serverPingCounter;
-   }
-
    protected void incrementServerPing() {
       serverPingCounter++;
    }
 
    private boolean validateFrame(ClientStompFrame f) {
-      String h = f.getHeader("content-length");
+      String h = f.getHeader(Stomp.Headers.CONTENT_LENGTH);
       if (h != null) {
          int len = Integer.valueOf(h);
          if (f.getBody().getBytes(StandardCharsets.UTF_8).length < len) {
@@ -271,34 +237,15 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
       return this.frameQueue.size();
    }
 
-   @Override
-   public void startPinger(long interval) {
-      pinger = new Pinger(interval);
-      pinger.startPing();
-   }
-
-   @Override
-   public void stopPinger() {
-      if (pinger != null) {
-         pinger.stopPing();
-         try {
-            pinger.join();
-         } catch (InterruptedException e) {
-            e.printStackTrace();
-         }
-         pinger = null;
-      }
-   }
-
-   private class Pinger extends Thread {
+   protected class Pinger extends Thread {
 
       long pingInterval;
       ClientStompFrame pingFrame;
       volatile boolean stop = false;
 
-      private Pinger(long interval) {
+      Pinger(long interval) {
          this.pingInterval = interval;
-         pingFrame = createFrame("STOMP");
+         pingFrame = createFrame(Stomp.Commands.STOMP);
          pingFrame.setBody("\n");
          pingFrame.setForceOneway();
          pingFrame.setPing(true);
@@ -329,5 +276,4 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
          }
       }
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java
index 53bced4..93801f9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java
@@ -27,11 +27,11 @@ public interface ClientStompFrame {
 
    boolean needsReply();
 
-   void setCommand(String command);
+   ClientStompFrame setCommand(String command);
 
-   void addHeader(String string, String string2);
+   ClientStompFrame addHeader(String string, String string2);
 
-   void setBody(String string);
+   ClientStompFrame setBody(String string);
 
    String getCommand();
 
@@ -43,8 +43,8 @@ public interface ClientStompFrame {
 
    boolean isPing();
 
-   void setForceOneway();
+   ClientStompFrame setForceOneway();
 
-   void setPing(boolean b);
+   ClientStompFrame setPing(boolean b);
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV10.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV10.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV10.java
index 5273236..92629ab 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV10.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV10.java
@@ -30,18 +30,18 @@ public class ClientStompFrameV10 extends AbstractClientStompFrame {
    }
 
    @Override
-   public boolean isPing() {
-      return false;
+   public ClientStompFrame setForceOneway() {
+      throw new IllegalStateException("Doesn't apply with V1.0!");
    }
 
    @Override
-   public void setForceOneway() {
+   public ClientStompFrame setPing(boolean b) {
       throw new IllegalStateException("Doesn't apply with V1.0!");
    }
 
    @Override
-   public void setPing(boolean b) {
-      throw new IllegalStateException("Doesn't apply with V1.0!");
+   public boolean isPing() {
+      return false;
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV11.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV11.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV11.java
index 22d7146..4d8d1e0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV11.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV11.java
@@ -16,14 +16,13 @@
  */
 package org.apache.activemq.artemis.tests.integration.stomp.util;
 
-/**
- * pls use factory to create frames.
- */
-public class ClientStompFrameV11 extends AbstractClientStompFrame {
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+
+public class ClientStompFrameV11 extends ClientStompFrameV10 {
 
    static {
-      validCommands.add("NACK");
-      validCommands.add("STOMP");
+      validCommands.add(Stomp.Commands.NACK);
+      validCommands.add(Stomp.Commands.STOMP);
    }
 
    boolean forceOneway = false;
@@ -38,8 +37,9 @@ public class ClientStompFrameV11 extends AbstractClientStompFrame {
    }
 
    @Override
-   public void setForceOneway() {
+   public ClientStompFrame setForceOneway() {
       forceOneway = true;
+      return this;
    }
 
    @Override
@@ -47,15 +47,17 @@ public class ClientStompFrameV11 extends AbstractClientStompFrame {
       if (forceOneway)
          return false;
 
-      if ("CONNECT".equals(command) || "STOMP".equals(command) || headerKeys.contains(HEADER_RECEIPT)) {
+      if (Stomp.Commands.STOMP.equals(command)) {
          return true;
       }
-      return false;
+
+      return super.needsReply();
    }
 
    @Override
-   public void setPing(boolean b) {
+   public ClientStompFrame setPing(boolean b) {
       isPing = b;
+      return this;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV12.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV12.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV12.java
index 5ca530e..eaffd1c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV12.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV12.java
@@ -16,17 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.integration.stomp.util;
 
-/**
- */
-public class ClientStompFrameV12 extends AbstractClientStompFrame {
-
-   static {
-      validCommands.add("NACK");
-      validCommands.add("STOMP");
-   }
-
-   boolean forceOneway = false;
-   boolean isPing = false;
+public class ClientStompFrameV12 extends ClientStompFrameV11 {
 
    public ClientStompFrameV12(String command) {
       this(command, true, true);
@@ -45,32 +35,6 @@ public class ClientStompFrameV12 extends AbstractClientStompFrame {
    }
 
    @Override
-   public void setForceOneway() {
-      forceOneway = true;
-   }
-
-   @Override
-   public boolean needsReply() {
-      if (forceOneway)
-         return false;
-
-      if ("CONNECT".equals(command) || "STOMP".equals(command) || headerKeys.contains(HEADER_RECEIPT)) {
-         return true;
-      }
-      return false;
-   }
-
-   @Override
-   public void setPing(boolean b) {
-      isPing = b;
-   }
-
-   @Override
-   public boolean isPing() {
-      return isPing;
-   }
-
-   @Override
    public String toString() {
       return "[1.2]" + super.toString();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java
index 12f52d0..7be09a5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java
@@ -18,9 +18,6 @@ package org.apache.activemq.artemis.tests.integration.stomp.util;
 
 import java.io.IOException;
 
-/**
- * pls use factory to create frames.
- */
 public interface StompClientConnection {
 
    ClientStompFrame sendFrame(ClientStompFrame frame) throws IOException, InterruptedException;
@@ -35,7 +32,7 @@ public interface StompClientConnection {
 
    ClientStompFrame connect(String defUser, String defPass) throws Exception;
 
-   void connect(String defUser, String defPass, String clientId) throws Exception;
+   ClientStompFrame connect(String defUser, String defPass, String clientId) throws Exception;
 
    boolean isConnected();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java
index 7a1a529..d32823b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java
@@ -18,52 +18,47 @@ package org.apache.activemq.artemis.tests.integration.stomp.util;
 
 import java.io.IOException;
 
-/**
- * pls use factory to create frames.
- */
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+
 public class StompClientConnectionV10 extends AbstractStompClientConnection {
 
    public StompClientConnectionV10(String host, int port) throws IOException {
       super("1.0", host, port);
    }
 
+   public StompClientConnectionV10(String version, String host, int port) throws IOException {
+      super(version, host, port);
+   }
+
    @Override
    public ClientStompFrame connect(String username, String passcode) throws IOException, InterruptedException {
-      ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
-      frame.addHeader(LOGIN_HEADER, username);
-      frame.addHeader(PASSCODE_HEADER, passcode);
-
-      ClientStompFrame response = this.sendFrame(frame);
-
-      if (response.getCommand().equals(CONNECTED_COMMAND)) {
-         connected = true;
-      } else {
-         System.out.println("Connection failed with: " + response);
-         connected = false;
-      }
-      return response;
+      return connect(username, passcode, null);
    }
 
    @Override
-   public void connect(String username, String passcode, String clientID) throws IOException, InterruptedException {
-      ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
-      frame.addHeader(LOGIN_HEADER, username);
-      frame.addHeader(PASSCODE_HEADER, passcode);
-      frame.addHeader(CLIENT_ID_HEADER, clientID);
+   public ClientStompFrame connect(String username, String passcode, String clientID) throws IOException, InterruptedException {
+      ClientStompFrame frame = factory.newFrame(Stomp.Commands.CONNECT);
+      frame.addHeader(Stomp.Headers.Connect.LOGIN, username);
+      frame.addHeader(Stomp.Headers.Connect.PASSCODE, passcode);
+      if (clientID != null) {
+         frame.addHeader(Stomp.Headers.Connect.CLIENT_ID, clientID);
+      }
 
       ClientStompFrame response = this.sendFrame(frame);
 
-      if (response.getCommand().equals(CONNECTED_COMMAND)) {
+      if (response.getCommand().equals(Stomp.Responses.CONNECTED)) {
          connected = true;
       } else {
-         System.out.println("Connection failed with: " + response);
+         IntegrationTestLogger.LOGGER.warn("Connection failed with: " + response);
          connected = false;
       }
+      return response;
    }
 
    @Override
    public void disconnect() throws IOException, InterruptedException {
-      ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
+      ClientStompFrame frame = factory.newFrame(Stomp.Commands.DISCONNECT);
       this.sendFrame(frame);
 
       close();


Mime
View raw message