activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [03/51] [partial] activemq-6 git commit: ACTIVEMQ6-2 Update to HQ master
Date Tue, 11 Nov 2014 11:00:33 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityNotificationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityNotificationTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityNotificationTest.java
index 9e3a60f..f0f9180 100644
--- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityNotificationTest.java
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityNotificationTest.java
@@ -16,8 +16,8 @@ import org.junit.After;
 
 import org.junit.Test;
 
-import static org.hornetq.api.core.management.NotificationType.SECURITY_AUTHENTICATION_VIOLATION;
-import static org.hornetq.api.core.management.NotificationType.SECURITY_PERMISSION_VIOLATION;
+import static org.hornetq.api.core.management.CoreNotificationType.SECURITY_AUTHENTICATION_VIOLATION;
+import static org.hornetq.api.core.management.CoreNotificationType.SECURITY_PERMISSION_VIOLATION;
 
 import java.util.HashSet;
 import java.util.Set;
@@ -148,11 +148,11 @@ public class SecurityNotificationTest extends UnitTestCase
    {
       super.setUp();
 
-      Configuration conf = createBasicConfig();
-      conf.setSecurityEnabled(true);
-      // the notifications are independent of JMX
-      conf.setJMXManagementEnabled(false);
-      conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      Configuration conf = createBasicConfig()
+         .setSecurityEnabled(true)
+         // the notifications are independent of JMX
+         .setJMXManagementEnabled(false)
+         .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
       server = HornetQServers.newHornetQServer(conf, false);
       server.start();
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/BasicOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/BasicOpenWireTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/BasicOpenWireTest.java
new file mode 100644
index 0000000..ae98d2e
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/BasicOpenWireTest.java
@@ -0,0 +1,249 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.hornetq.api.core.SimpleString;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+/**
+ *
+ * @author @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class BasicOpenWireTest extends OpenWireTestBase
+{
+   @Rule public TestName name = new TestName();
+
+   protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true";
+   protected ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
+   protected ActiveMQConnection connection;
+   protected String topicName = "amqTestTopic1";
+   protected String queueName = "amqTestQueue1";
+   protected String topicName2 = "amqTestTopic2";
+   protected String queueName2 = "amqTestQueue2";
+   protected String durableQueueName = "durableQueueName";
+
+   protected String messageTextPrefix = "";
+   protected boolean topic = true;
+
+   protected Map<String, SimpleString> testQueues = new HashMap<String, SimpleString>();
+
+   @Override
+   @Before
+   public void setUp() throws Exception
+   {
+      super.setUp();
+      SimpleString coreQueue = new SimpleString("jms.queue." + queueName);
+      this.server.createQueue(coreQueue, coreQueue, null, false, false);
+      testQueues.put(queueName, coreQueue);
+
+      SimpleString coreQueue2 = new SimpleString("jms.queue." + queueName2);
+      this.server.createQueue(coreQueue2, coreQueue2, null, false, false);
+      testQueues.put(queueName2, coreQueue2);
+
+      SimpleString durableQueue = new SimpleString("jms.queue." + durableQueueName);
+      this.server.createQueue(durableQueue, durableQueue, null, true, false);
+      testQueues.put(durableQueueName, durableQueue);
+
+      if (!enableSecurity)
+      {
+         connection = (ActiveMQConnection) factory.createConnection();
+      }
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception
+   {
+      System.out.println("tear down! " + connection);
+      try
+      {
+         if (connection != null)
+         {
+            System.out.println("closing connection");
+            connection.close();
+            System.out.println("connection closed.");
+         }
+
+         Iterator<SimpleString> iterQueues = testQueues.values().iterator();
+         while (iterQueues.hasNext())
+         {
+            SimpleString coreQ = iterQueues.next();
+            this.server.destroyQueue(coreQ);
+            System.out.println("Destroyed queue: " + coreQ);
+         }
+         testQueues.clear();
+      }
+      catch (Throwable e)
+      {
+         System.out.println("Exception !! " + e);
+         e.printStackTrace();
+      }
+      finally
+      {
+         super.tearDown();
+         System.out.println("Super done.");
+      }
+   }
+
+   public ActiveMQDestination createDestination(Session session, byte type, String name) throws Exception
+   {
+      if (name == null)
+      {
+         return createDestination(session, type);
+      }
+
+      switch (type)
+      {
+         case ActiveMQDestination.QUEUE_TYPE:
+            makeSureCoreQueueExist(name);
+            return (ActiveMQDestination) session.createQueue(name);
+         case ActiveMQDestination.TOPIC_TYPE:
+            return (ActiveMQDestination) session.createTopic(name);
+         case ActiveMQDestination.TEMP_QUEUE_TYPE:
+            return (ActiveMQDestination) session.createTemporaryQueue();
+         case ActiveMQDestination.TEMP_TOPIC_TYPE:
+            return (ActiveMQDestination) session.createTemporaryTopic();
+         default:
+            throw new IllegalArgumentException("type: " + type);
+      }
+   }
+
+   public void makeSureCoreQueueExist(String qname) throws Exception
+   {
+      SimpleString coreQ = testQueues.get(qname);
+      if (coreQ == null)
+      {
+         coreQ = new SimpleString("jms.queue." + qname);
+         this.server.createQueue(coreQ, coreQ, null, false, false);
+         testQueues.put(qname, coreQ);
+      }
+   }
+
+   public ActiveMQDestination createDestination(Session session, byte type) throws JMSException
+   {
+      switch (type)
+      {
+         case ActiveMQDestination.QUEUE_TYPE:
+            return (ActiveMQDestination) session.createQueue(queueName);
+         case ActiveMQDestination.TOPIC_TYPE:
+            return (ActiveMQDestination) session.createTopic(topicName);
+         case ActiveMQDestination.TEMP_QUEUE_TYPE:
+            return (ActiveMQDestination) session.createTemporaryQueue();
+         case ActiveMQDestination.TEMP_TOPIC_TYPE:
+            return (ActiveMQDestination) session.createTemporaryTopic();
+         default:
+            throw new IllegalArgumentException("type: " + type);
+      }
+   }
+
+   protected ActiveMQDestination createDestination2(Session session, byte type) throws JMSException
+   {
+      switch (type)
+      {
+         case ActiveMQDestination.QUEUE_TYPE:
+            return (ActiveMQDestination) session.createQueue(queueName2);
+         case ActiveMQDestination.TOPIC_TYPE:
+            return (ActiveMQDestination) session.createTopic(topicName2);
+         case ActiveMQDestination.TEMP_QUEUE_TYPE:
+            return (ActiveMQDestination) session.createTemporaryQueue();
+         case ActiveMQDestination.TEMP_TOPIC_TYPE:
+            return (ActiveMQDestination) session.createTemporaryTopic();
+         default:
+            throw new IllegalArgumentException("type: " + type);
+      }
+   }
+
+   protected void sendMessages(Session session, Destination destination, int count) throws JMSException
+   {
+      MessageProducer producer = session.createProducer(destination);
+      sendMessages(session, producer, count);
+      producer.close();
+   }
+
+   protected void sendMessages(Session session, MessageProducer producer, int count) throws JMSException
+   {
+      for (int i = 0; i < count; i++)
+      {
+         producer.send(session.createTextMessage(messageTextPrefix + i));
+      }
+   }
+
+   protected void sendMessages(Connection connection, Destination destination, int count) throws JMSException
+   {
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      sendMessages(session, destination, count);
+      session.close();
+   }
+
+   /**
+    * @param messsage
+    * @param firstSet
+    * @param secondSet
+    */
+   protected void assertTextMessagesEqual(String messsage, Message[] firstSet,
+         Message[] secondSet) throws JMSException
+   {
+      assertEquals("Message count does not match: " + messsage,
+            firstSet.length, secondSet.length);
+      for (int i = 0; i < secondSet.length; i++)
+      {
+         TextMessage m1 = (TextMessage) firstSet[i];
+         TextMessage m2 = (TextMessage) secondSet[i];
+         assertFalse("Message " + (i + 1) + " did not match : " + messsage
+               + ": expected {" + m1 + "}, but was {" + m2 + "}", m1 == null
+               ^ m2 == null);
+         assertEquals("Message " + (i + 1) + " did not match: " + messsage
+               + ": expected {" + m1 + "}, but was {" + m2 + "}", m1.getText(),
+               m2.getText());
+      }
+   }
+
+   protected Connection createConnection() throws JMSException
+   {
+      return factory.createConnection();
+   }
+
+   protected void safeClose(Session s)
+   {
+      try
+      {
+         s.close();
+      }
+      catch (Throwable e)
+      {
+      }
+   }
+
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/BasicSecurityTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/BasicSecurityTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/BasicSecurityTest.java
new file mode 100644
index 0000000..e7b09d2
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/BasicSecurityTest.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSSecurityException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+*
+* @author @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+*
+*/
+public class BasicSecurityTest extends BasicOpenWireTest
+{
+   @Before
+   public void setUp() throws Exception
+   {
+      this.enableSecurity = true;
+      super.setUp();
+   }
+
+   @Test
+   public void testConnectionWithCredentials() throws Exception
+   {
+      Connection newConn = null;
+
+      //correct
+      try
+      {
+         newConn = factory.createConnection("openwireSender", "SeNdEr");
+         newConn.start();
+         newConn.close();
+
+         newConn = factory.createConnection("openwireReceiver", "ReCeIvEr");
+         newConn.start();
+         newConn.close();
+
+         newConn = null;
+      }
+      finally
+      {
+         if (newConn != null)
+         {
+            newConn.close();
+         }
+      }
+
+      //wrong password
+      try
+      {
+         newConn = factory.createConnection("openwireSender", "WrongPasswD");
+         newConn.start();
+      }
+      catch (JMSSecurityException e)
+      {
+         //expected
+      }
+      finally
+      {
+         if (newConn != null)
+         {
+            newConn.close();
+         }
+      }
+
+      //wrong user
+      try
+      {
+         newConn = factory.createConnection("wronguser", "SeNdEr");
+         newConn.start();
+      }
+      catch (JMSSecurityException e)
+      {
+         //expected
+      }
+      finally
+      {
+         if (newConn != null)
+         {
+            newConn.close();
+         }
+      }
+
+      //both wrong
+      try
+      {
+         newConn = factory.createConnection("wronguser", "wrongpass");
+         newConn.start();
+      }
+      catch (JMSSecurityException e)
+      {
+         //expected
+      }
+      finally
+      {
+         if (newConn != null)
+         {
+            newConn.close();
+         }
+      }
+
+      //default user
+      try
+      {
+         newConn = factory.createConnection();
+         newConn.start();
+      }
+      catch (JMSSecurityException e)
+      {
+         //expected
+      }
+      finally
+      {
+         if (newConn != null)
+         {
+            newConn.close();
+         }
+      }
+   }
+
+   @Test
+   public void testSendnReceiveAuthorization() throws Exception
+   {
+      Connection sendingConn = null;
+      Connection receivingConn = null;
+
+      //Sender
+      try
+      {
+         Destination dest = new ActiveMQQueue(queueName);
+
+         receivingConn = factory.createConnection("openwireReceiver", "ReCeIvEr");
+         receivingConn.start();
+
+         sendingConn = factory.createConnection("openwireSender", "SeNdEr");
+         sendingConn.start();
+
+         Session sendingSession = sendingConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session receivingSession = receivingConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         TextMessage message = sendingSession.createTextMessage("Hello World");
+
+         MessageProducer producer = null;
+
+         producer = receivingSession.createProducer(dest);
+
+         try
+         {
+            producer.send(message);
+         }
+         catch (JMSSecurityException e)
+         {
+            //expected
+            producer.close();
+         }
+
+         producer = sendingSession.createProducer(dest);
+         producer.send(message);
+
+         MessageConsumer consumer = null;
+         try
+         {
+            consumer = sendingSession.createConsumer(dest);
+         }
+         catch (JMSSecurityException e)
+         {
+            //expected
+         }
+
+         consumer = receivingSession.createConsumer(dest);
+         TextMessage received = (TextMessage) consumer.receive();
+
+         assertNotNull(received);
+         assertEquals("Hello World", received.getText());
+      }
+      finally
+      {
+         if (sendingConn != null)
+         {
+            sendingConn.close();
+         }
+
+         if (receivingConn != null)
+         {
+            receivingConn.close();
+         }
+      }
+   }
+
+   @Test
+   public void testCreateTempDestinationAuthorization() throws Exception
+   {
+      Connection conn1 = null;
+      Connection conn2 = null;
+
+      //Sender
+      try
+      {
+         conn1 = factory.createConnection("openwireGuest", "GuEsT");
+         conn1.start();
+
+         conn2 = factory.createConnection("openwireDestinationManager", "DeStInAtIoN");
+         conn2.start();
+
+         Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         try
+         {
+            session1.createTemporaryQueue();
+            fail("user shouldn't be able to create temp queue");
+         }
+         catch (JMSSecurityException e)
+         {
+            //expected
+         }
+
+         Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         TemporaryQueue q = session2.createTemporaryQueue();
+         assertNotNull(q);
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/OpenWireTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/OpenWireTestBase.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/OpenWireTestBase.java
new file mode 100644
index 0000000..668095f
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/OpenWireTestBase.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jms.ConnectionFactory;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.jms.management.JMSServerControl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
+import org.hornetq.core.security.Role;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
+import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+import org.hornetq.tests.integration.management.ManagementControlHelper;
+import org.hornetq.tests.unit.util.InVMNamingContext;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.junit.After;
+import org.junit.Before;
+
+public class OpenWireTestBase extends ServiceTestBase
+{
+   public static final String OWHOST = "localhost";
+   public static final int OWPORT = 61616;
+
+   protected HornetQServer server;
+
+   protected JMSServerManagerImpl jmsServer;
+   protected boolean realStore = false;
+   protected boolean enableSecurity = false;
+
+   protected ConnectionFactory coreCf;
+   protected InVMNamingContext namingContext;
+
+   protected MBeanServer mbeanServer;
+
+   @Override
+   @Before
+   public void setUp() throws Exception
+   {
+      super.setUp();
+      server = this.createServer(realStore, true);
+      HashMap<String, Object> params = new HashMap<String, Object>();
+      params.put(TransportConstants.PORT_PROP_NAME, "61616");
+      params.put(TransportConstants.PROTOCOLS_PROP_NAME, "OPENWIRE");
+      TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
+
+      Configuration serverConfig = server.getConfiguration();
+
+      Map<String, AddressSettings> addressSettings = serverConfig.getAddressesSettings();
+      String match = "jms.queue.#";
+      AddressSettings dlaSettings = new AddressSettings();
+      SimpleString dla = new SimpleString("jms.queue.ActiveMQ.DLQ");
+      dlaSettings.setDeadLetterAddress(dla);
+      addressSettings.put(match, dlaSettings);
+
+      serverConfig.getAcceptorConfigurations().add(transportConfiguration);
+      serverConfig.setSecurityEnabled(enableSecurity);
+
+      extraServerConfig(serverConfig);
+
+      if (enableSecurity)
+      {
+         server.getSecurityManager().addRole("openwireSender", "sender");
+         server.getSecurityManager().addUser("openwireSender", "SeNdEr");
+         //sender cannot receive
+         Role senderRole = new Role("sender", true, false, false, false, true, true, false);
+
+         server.getSecurityManager().addRole("openwireReceiver", "receiver");
+         server.getSecurityManager().addUser("openwireReceiver", "ReCeIvEr");
+         //receiver cannot send
+         Role receiverRole = new Role("receiver", false, true, false, false, true, true, false);
+
+         server.getSecurityManager().addRole("openwireGuest", "guest");
+         server.getSecurityManager().addUser("openwireGuest", "GuEsT");
+
+         //guest cannot do anything
+         Role guestRole = new Role("guest", false, false, false, false, false, false, false);
+
+         server.getSecurityManager().addRole("openwireDestinationManager", "manager");
+         server.getSecurityManager().addUser("openwireDestinationManager", "DeStInAtIoN");
+
+         //guest cannot do anything
+         Role destRole = new Role("manager", false, false, false, false, true, true, false);
+
+         Map<String, Set<Role>> settings = server.getConfiguration().getSecurityRoles();
+         if (settings == null)
+         {
+            settings = new HashMap<String, Set<Role>>();
+            server.getConfiguration().setSecurityRoles(settings);
+         }
+         Set<Role> anySet = settings.get("#");
+         if (anySet == null)
+         {
+            anySet = new HashSet<Role>();
+            settings.put("#", anySet);
+         }
+         anySet.add(senderRole);
+         anySet.add(receiverRole);
+         anySet.add(guestRole);
+         anySet.add(destRole);
+      }
+      jmsServer = new JMSServerManagerImpl(server);
+      namingContext = new InVMNamingContext();
+      jmsServer.setContext(namingContext);
+      jmsServer.start();
+
+      registerConnectionFactory();
+
+      mbeanServer = MBeanServerFactory.createMBeanServer();
+      System.out.println("debug: server started");
+   }
+
+   //override this to add extra server configs
+   protected void extraServerConfig(Configuration serverConfig)
+   {
+   }
+
+   protected void registerConnectionFactory() throws Exception
+   {
+      List<TransportConfiguration> connectorConfigs = new ArrayList<TransportConfiguration>();
+      connectorConfigs.add(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+      createCF(connectorConfigs, "/cf");
+
+      coreCf = (ConnectionFactory) namingContext.lookup("/cf");
+   }
+
+   protected void createCF(final List<TransportConfiguration> connectorConfigs, final String... jndiBindings) throws Exception
+   {
+      final int retryInterval = 1000;
+      final double retryIntervalMultiplier = 1.0;
+      final int reconnectAttempts = -1;
+      final int callTimeout = 30000;
+      final boolean ha = false;
+      List<String> connectorNames = registerConnectors(server, connectorConfigs);
+
+      String cfName = name.getMethodName();
+      if (cfName == null)
+      {
+         cfName = "cfOpenWire";
+      }
+      ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl()
+         .setName(cfName)
+         .setConnectorNames(connectorNames)
+         .setRetryInterval(retryInterval)
+         .setRetryIntervalMultiplier(retryIntervalMultiplier)
+         .setCallTimeout(callTimeout)
+         .setReconnectAttempts(reconnectAttempts);
+      jmsServer.createConnectionFactory(false, configuration, jndiBindings);
+   }
+
+   protected JMSServerControl getJMSServerControl() throws Exception
+   {
+      return ManagementControlHelper.createJMSServerControl(mbeanServer);
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception
+   {
+      MBeanServerFactory.releaseMBeanServer(mbeanServer);
+      mbeanServer = null;
+      server.stop();
+      super.tearDown();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/OpenWireUtilTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/OpenWireUtilTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/OpenWireUtilTest.java
new file mode 100644
index 0000000..b17b8df
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/OpenWireUtilTest.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire;
+
+import static org.junit.Assert.assertEquals;
+
+import org.hornetq.core.protocol.openwire.OpenWireUtil;
+import org.junit.Test;
+
+public class OpenWireUtilTest
+{
+   @Test
+   public void testWildcardConversion() throws Exception
+   {
+      String amqTarget = "TEST.ONE.>";
+      String coreTarget = OpenWireUtil.convertWildcard(amqTarget);
+      assertEquals("TEST.ONE.#", coreTarget);
+
+      amqTarget = "TEST.*.ONE";
+      coreTarget = OpenWireUtil.convertWildcard(amqTarget);
+      assertEquals("TEST.*.ONE", coreTarget);
+
+      amqTarget = "a.*.>.>";
+      coreTarget = OpenWireUtil.convertWildcard(amqTarget);
+      assertEquals("a.*.#", coreTarget);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/SimpleOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/SimpleOpenWireTest.java
new file mode 100644
index 0000000..faf6fe1
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/SimpleOpenWireTest.java
@@ -0,0 +1,280 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ *
+ * @author @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class SimpleOpenWireTest extends BasicOpenWireTest
+{
+   @Override
+   @Before
+   public void setUp() throws Exception
+   {
+      this.realStore = true;
+      super.setUp();
+   }
+
+   @Test
+   public void testSimpleQueue() throws Exception
+   {
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      System.out.println("creating queue: " + queueName);
+      Destination dest = new ActiveMQQueue(queueName);
+
+      System.out.println("creating producer...");
+      MessageProducer producer = session.createProducer(dest);
+
+      final int num = 1;
+      final String msgBase = "MfromAMQ-";
+      for (int i = 0; i < num; i++)
+      {
+         TextMessage msg = session.createTextMessage("MfromAMQ-" + i);
+         producer.send(msg);
+         System.out.println("sent: ");
+      }
+
+      //receive
+      MessageConsumer consumer = session.createConsumer(dest);
+
+      System.out.println("receiving messages...");
+      for (int i = 0; i < num; i++)
+      {
+         TextMessage msg = (TextMessage) consumer.receive(5000);
+         System.out.println("received: " + msg);
+         String content = msg.getText();
+         System.out.println("content: " + content);
+         assertEquals(msgBase + i, content);
+      }
+
+      assertNull(consumer.receive(1000));
+
+      session.close();
+   }
+
+   @Test
+   public void testSimpleTopic() throws Exception
+   {
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      System.out.println("creating queue: " + topicName);
+      Destination dest = new ActiveMQTopic(topicName);
+
+      MessageConsumer consumer1 = session.createConsumer(dest);
+      MessageConsumer consumer2 = session.createConsumer(dest);
+
+      MessageProducer producer = session.createProducer(dest);
+
+      final int num = 1;
+      final String msgBase = "MfromAMQ-";
+      for (int i = 0; i < num; i++)
+      {
+         TextMessage msg = session.createTextMessage("MfromAMQ-" + i);
+         producer.send(msg);
+         System.out.println("Sent a message");
+      }
+
+      //receive
+      System.out.println("receiving messages...");
+      for (int i = 0; i < num; i++)
+      {
+         TextMessage msg = (TextMessage) consumer1.receive(5000);
+         System.out.println("received: " + msg);
+         String content = msg.getText();
+         assertEquals(msgBase + i, content);
+      }
+
+      assertNull(consumer1.receive(500));
+
+      System.out.println("receiving messages...");
+      for (int i = 0; i < num; i++)
+      {
+         TextMessage msg = (TextMessage) consumer2.receive(5000);
+         System.out.println("received: " + msg);
+         String content = msg.getText();
+         assertEquals(msgBase + i, content);
+      }
+
+      assertNull(consumer2.receive(500));
+      session.close();
+   }
+
+   @Test
+   public void testSimpleTempTopic() throws Exception
+   {
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      System.out.println("creating temp topic");
+      TemporaryTopic tempTopic = session.createTemporaryTopic();
+
+      System.out.println("create consumer 1");
+      MessageConsumer consumer1 = session.createConsumer(tempTopic);
+      System.out.println("create consumer 2");
+      MessageConsumer consumer2 = session.createConsumer(tempTopic);
+
+      System.out.println("create producer");
+      MessageProducer producer = session.createProducer(tempTopic);
+
+      System.out.println("sending messages");
+      final int num = 1;
+      final String msgBase = "MfromAMQ-";
+      for (int i = 0; i < num; i++)
+      {
+         TextMessage msg = session.createTextMessage("MfromAMQ-" + i);
+         producer.send(msg);
+         System.out.println("Sent a message");
+      }
+
+      //receive
+      System.out.println("receiving messages...");
+      for (int i = 0; i < num; i++)
+      {
+         TextMessage msg = (TextMessage) consumer1.receive(5000);
+         System.out.println("received: " + msg);
+         String content = msg.getText();
+         assertEquals(msgBase + i, content);
+      }
+
+      assertNull(consumer1.receive(500));
+
+      System.out.println("receiving messages...");
+      for (int i = 0; i < num; i++)
+      {
+         TextMessage msg = (TextMessage) consumer2.receive(5000);
+         System.out.println("received: " + msg);
+         String content = msg.getText();
+         assertEquals(msgBase + i, content);
+      }
+
+      assertNull(consumer2.receive(500));
+      session.close();
+   }
+
+   @Test
+   public void testSimpleTempQueue() throws Exception
+   {
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      System.out.println("creating temp queue");
+      TemporaryQueue tempQueue = session.createTemporaryQueue();
+
+      System.out.println("create consumer 1");
+      MessageConsumer consumer1 = session.createConsumer(tempQueue);
+
+      System.out.println("create producer");
+      MessageProducer producer = session.createProducer(tempQueue);
+
+      System.out.println("sending messages");
+      final int num = 1;
+      final String msgBase = "MfromAMQ-";
+      for (int i = 0; i < num; i++)
+      {
+         TextMessage msg = session.createTextMessage("MfromAMQ-" + i);
+         producer.send(msg);
+         System.out.println("Sent a message");
+      }
+
+      //receive
+      System.out.println("receiving messages...");
+      for (int i = 0; i < num; i++)
+      {
+         TextMessage msg = (TextMessage) consumer1.receive(5000);
+         System.out.println("received: " + msg);
+         String content = msg.getText();
+         assertEquals(msgBase + i, content);
+      }
+
+      assertNull(consumer1.receive(500));
+      session.close();
+   }
+
+   /**
+    * This is the example shipped with the distribution
+    * @throws Exception
+    */
+   @Test
+   public void testOpenWireExample() throws Exception
+   {
+      Connection exConn = null;
+
+      try
+      {
+         String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true";
+         ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(urlString);
+
+         // Step 2. Perfom a lookup on the queue
+         Queue queue = new ActiveMQQueue(durableQueueName);
+
+         // Step 4.Create a JMS Connection
+         exConn = exFact.createConnection();
+
+         // Step 10. Start the Connection
+         exConn.start();
+
+         // Step 5. Create a JMS Session
+         Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         // Step 6. Create a JMS Message Producer
+         MessageProducer producer = session.createProducer(queue);
+
+         // Step 7. Create a Text Message
+         TextMessage message = session.createTextMessage("This is a text message");
+
+         //System.out.println("Sent message: " + message.getText());
+
+         // Step 8. Send the Message
+         producer.send(message);
+
+         // Step 9. Create a JMS Message Consumer
+         MessageConsumer messageConsumer = session.createConsumer(queue);
+
+         // Step 11. Receive the message
+         TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
+
+         System.out.println("Received message: " + messageReceived);
+
+         assertEquals("This is a text message", messageReceived.getText());
+      }
+      finally
+      {
+         if (exConn != null)
+         {
+            exConn.close();
+         }
+      }
+
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer10Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer10Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer10Test.java
new file mode 100644
index 0000000..d6a9942
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer10Test.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * adapted from: org.apache.activemq.JMSConsumerTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+@RunWith(Parameterized.class)
+public class JMSConsumer10Test extends BasicOpenWireTest
+{
+   @Parameterized.Parameters(name = "deliveryMode={0} ackMode={1} destinationType={2}")
+   public static Collection<Object[]> getParams()
+   {
+      return Arrays.asList(new Object[][] {
+         {DeliveryMode.NON_PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, Session.DUPS_OK_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, Session.DUPS_OK_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE}
+      });
+   }
+
+   public int deliveryMode;
+   public int ackMode;
+   public byte destinationType;
+
+   public JMSConsumer10Test(int deliveryMode, int ackMode, byte destinationType)
+   {
+      this.deliveryMode = deliveryMode;
+      this.ackMode = ackMode;
+      this.destinationType = destinationType;
+   }
+
+   @Test
+   public void testUnackedWithPrefetch1StayInQueue() throws Exception
+   {
+
+      // Set prefetch to 1
+      connection.getPrefetchPolicy().setAll(1);
+      connection.start();
+
+      // Use all the ack modes
+      Session session = connection.createSession(false, ackMode);
+      ActiveMQDestination destination = createDestination(session, destinationType);
+      MessageConsumer consumer = session.createConsumer(destination);
+
+      // Send the messages
+      sendMessages(session, destination, 4);
+
+      // Only pick up the first 2 messages.
+      Message message = null;
+      for (int i = 0; i < 2; i++)
+      {
+         message = consumer.receive(1000);
+         assertNotNull(message);
+      }
+      message.acknowledge();
+
+      connection.close();
+      connection = (ActiveMQConnection) factory.createConnection();
+      connection.getPrefetchPolicy().setAll(1);
+      connection.start();
+
+      // Use all the ack modes
+      session = connection.createSession(false, ackMode);
+      consumer = session.createConsumer(destination);
+
+      // Pickup the rest of the messages.
+      for (int i = 0; i < 2; i++)
+      {
+         message = consumer.receive(1000);
+         assertNotNull(message);
+      }
+      message.acknowledge();
+      assertNull(consumer.receiveNoWait());
+
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer11Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer11Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer11Test.java
new file mode 100644
index 0000000..5376f88
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer11Test.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * adapted from: org.apache.activemq.JMSConsumerTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+@RunWith(Parameterized.class)
+public class JMSConsumer11Test extends BasicOpenWireTest
+{
+   @Parameterized.Parameters(name = "deliveryMode={0}")
+   public static Collection<Object[]> getParams()
+   {
+      return Arrays.asList(new Object[][] {
+         {DeliveryMode.NON_PERSISTENT},
+         {DeliveryMode.PERSISTENT}
+      });
+   }
+
+   public int deliveryMode;
+
+   public JMSConsumer11Test(int deliveryMode)
+   {
+      this.deliveryMode = deliveryMode;
+   }
+
+   @Test
+   public void testPrefetch1MessageNotDispatched() throws Exception
+   {
+      // Set prefetch to 1
+      connection.getPrefetchPolicy().setAll(1);
+      connection.start();
+
+      Session session = connection.createSession(true, 0);
+      ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+      MessageConsumer consumer = session.createConsumer(destination);
+
+      // Send 2 messages to the destination.
+      sendMessages(session, destination, 2);
+      session.commit();
+
+      // The prefetch should fill up with 1 message.
+      // Since prefetch is still full, the 2nd message should get dispatched
+      // to another consumer.. lets create the 2nd consumer test that it does
+      // make sure it does.
+      ActiveMQConnection connection2 = (ActiveMQConnection) factory
+            .createConnection();
+      connection2.start();
+      Session session2 = connection2.createSession(true, 0);
+      MessageConsumer consumer2 = session2.createConsumer(destination);
+
+      System.out.println("consumer receiving ...");
+      // Pick up the first message.
+      Message message1 = consumer.receive(1000);
+      System.out.println("received1: " + message1);
+      assertNotNull(message1);
+
+      System.out.println("consumer 2 receiving...");
+      // Pick up the 2nd messages.
+      Message message2 = consumer2.receive(5000);
+      System.out.println("received2: " + message2);
+      assertNotNull(message2);
+
+      System.out.println("commitning sessions !! " + session.getClass().getName());
+      session.commit();
+      System.out.println("commited session, now 2");
+      session2.commit();
+
+      System.out.println("all commited");
+      Message m = consumer.receiveNoWait();
+      System.out.println("recieved 3: " + m);
+      assertNull(m);
+
+      try
+      {
+         connection2.close();
+      }
+      catch (Throwable e)
+      {
+         System.err.println("exception e: " + e);
+         e.printStackTrace();
+      }
+
+      System.out.println("Test finished!!");
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer12Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer12Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer12Test.java
new file mode 100644
index 0000000..633cd01
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer12Test.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * adapted from: org.apache.activemq.JMSConsumerTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+@RunWith(Parameterized.class)
+public class JMSConsumer12Test extends BasicOpenWireTest
+{
+   @Parameterized.Parameters(name = "deliveryMode={0} destinationType={1}")
+   public static Collection<Object[]> getParams()
+   {
+      return Arrays.asList(new Object[][] {
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TOPIC_TYPE}
+      });
+   }
+
+   public int deliveryMode;
+   public byte destinationType;
+
+   public JMSConsumer12Test(int deliveryMode, byte destinationType)
+   {
+      this.deliveryMode = deliveryMode;
+      this.destinationType = destinationType;
+   }
+
+   @Test
+   public void testDontStart() throws Exception
+   {
+
+      Session session = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      ActiveMQDestination destination = createDestination(session,
+            destinationType);
+      MessageConsumer consumer = session.createConsumer(destination);
+
+      // Send the messages
+      sendMessages(session, destination, 1);
+
+      // Make sure no messages were delivered.
+      assertNull(consumer.receive(1000));
+   }
+
+   @Test
+   public void testStartAfterSend() throws Exception
+   {
+      Session session = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      ActiveMQDestination destination = createDestination(session,
+            destinationType);
+      MessageConsumer consumer = session.createConsumer(destination);
+
+      // Send the messages
+      sendMessages(session, destination, 1);
+
+      // Start the conncection after the message was sent.
+      connection.start();
+
+      // Make sure only 1 message was delivered.
+      assertNotNull(consumer.receive(1000));
+      assertNull(consumer.receiveNoWait());
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer13Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer13Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer13Test.java
new file mode 100644
index 0000000..82d8326
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer13Test.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * adapted from: org.apache.activemq.JMSConsumerTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+@RunWith(Parameterized.class)
+public class JMSConsumer13Test extends BasicOpenWireTest
+{
+   @Parameterized.Parameters(name = "deliveryMode={0} destinationType={1}")
+   public static Collection<Object[]> getParams()
+   {
+      return Arrays.asList(new Object[][] {
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TOPIC_TYPE},
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_TOPIC_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.TOPIC_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_TOPIC_TYPE},
+      });
+   }
+
+   public int deliveryMode;
+   public byte destinationType;
+
+   public JMSConsumer13Test(int deliveryMode, byte destinationType)
+   {
+      this.deliveryMode = deliveryMode;
+      this.destinationType = destinationType;
+   }
+
+   @Test
+   public void testReceiveMessageWithConsumer() throws Exception
+   {
+
+      // Receive a message with the JMS API
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      ActiveMQDestination destination = createDestination(session,
+            destinationType);
+      MessageConsumer consumer = session.createConsumer(destination);
+
+      // Send the messages
+      sendMessages(session, destination, 1);
+
+      // Make sure only 1 message was delivered.
+      Message m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals("0", ((TextMessage) m).getText());
+      assertNull(consumer.receiveNoWait());
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer1Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer1Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer1Test.java
new file mode 100644
index 0000000..622a55b
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer1Test.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * adapted from: org.apache.activemq.JMSConsumerTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+@RunWith(Parameterized.class)
+public class JMSConsumer1Test extends BasicOpenWireTest
+{
+   @Parameterized.Parameters(name = "deliveryMode={0} destinationType={1}")
+   public static Collection<Object[]> getParams()
+   {
+      return Arrays.asList(new Object[][] {
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TOPIC_TYPE},
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_TOPIC_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.TOPIC_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_TOPIC_TYPE}
+      });
+   }
+
+   public ActiveMQDestination destination;
+   public int deliveryMode;
+   public int prefetch;
+   public int ackMode;
+   public byte destinationType;
+   public boolean durableConsumer;
+
+   public JMSConsumer1Test(int deliveryMode, byte destinationType)
+   {
+      this.deliveryMode = deliveryMode;
+      this.destinationType = destinationType;
+   }
+
+   @Test
+   public void testMessageListenerWithConsumerCanBeStopped() throws Exception
+   {
+      final AtomicInteger counter = new AtomicInteger(0);
+      final CountDownLatch done1 = new CountDownLatch(1);
+      final CountDownLatch done2 = new CountDownLatch(1);
+
+      // Receive a message with the JMS API
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      destination = createDestination(session, destinationType);
+      ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session
+            .createConsumer(destination);
+      consumer.setMessageListener(new MessageListener()
+      {
+         @Override
+         public void onMessage(Message m)
+         {
+            counter.incrementAndGet();
+            if (counter.get() == 1)
+            {
+               done1.countDown();
+            }
+            if (counter.get() == 2)
+            {
+               done2.countDown();
+            }
+         }
+      });
+
+      // Send a first message to make sure that the consumer dispatcher is
+      // running
+      sendMessages(session, destination, 1);
+      assertTrue(done1.await(1, TimeUnit.SECONDS));
+      assertEquals(1, counter.get());
+
+      // Stop the consumer.
+      consumer.stop();
+
+      // Send a message, but should not get delivered.
+      sendMessages(session, destination, 1);
+      assertFalse(done2.await(1, TimeUnit.SECONDS));
+      assertEquals(1, counter.get());
+
+      // Start the consumer, and the message should now get delivered.
+      consumer.start();
+      assertTrue(done2.await(1, TimeUnit.SECONDS));
+      assertEquals(2, counter.get());
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer2Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer2Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer2Test.java
new file mode 100644
index 0000000..cee9545
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer2Test.java
@@ -0,0 +1,240 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+
+/**
+ * adapted from: org.apache.activemq.JMSConsumerTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class JMSConsumer2Test extends BasicOpenWireTest
+{
+   @Test
+   public void testMessageListenerWithConsumerCanBeStoppedConcurently() throws Exception
+   {
+
+      final AtomicInteger counter = new AtomicInteger(0);
+      final CountDownLatch closeDone = new CountDownLatch(1);
+
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.CLIENT_ACKNOWLEDGE);
+      ActiveMQDestination destination = createDestination(session,
+            ActiveMQDestination.QUEUE_TYPE);
+
+      // preload the queue
+      sendMessages(session, destination, 2000);
+
+      final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session
+            .createConsumer(destination);
+
+      final Map<Thread, Throwable> exceptions = Collections
+            .synchronizedMap(new HashMap<Thread, Throwable>());
+      Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler()
+      {
+         @Override
+         public void uncaughtException(Thread t, Throwable e)
+         {
+            exceptions.put(t, e);
+         }
+      });
+
+      final class AckAndClose implements Runnable
+      {
+         private final Message message;
+
+         public AckAndClose(Message m)
+         {
+            this.message = m;
+         }
+
+         @Override
+         public void run()
+         {
+            try
+            {
+               int count = counter.incrementAndGet();
+               if (count == 590)
+               {
+                  // close in a separate thread is ok by jms
+                  consumer.close();
+                  closeDone.countDown();
+               }
+               if (count % 200 == 0)
+               {
+                  // ensure there are some outstanding messages
+                  // ack every 200
+                  message.acknowledge();
+               }
+            }
+            catch (Exception e)
+            {
+               exceptions.put(Thread.currentThread(), e);
+            }
+         }
+      }
+
+      final ExecutorService executor = Executors.newCachedThreadPool();
+      consumer.setMessageListener(new MessageListener()
+      {
+         @Override
+         public void onMessage(Message m)
+         {
+            // ack and close eventually in separate thread
+            executor.execute(new AckAndClose(m));
+         }
+      });
+
+      assertTrue(closeDone.await(20, TimeUnit.SECONDS));
+      // await possible exceptions
+      Thread.sleep(1000);
+      assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+   }
+
+   @Test
+   public void testDupsOkConsumer() throws Exception
+   {
+
+      // Receive a message with the JMS API
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.DUPS_OK_ACKNOWLEDGE);
+      ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+      MessageConsumer consumer = session.createConsumer(destination);
+
+      // Send the messages
+      sendMessages(session, destination, 4);
+
+      // Make sure only 4 message are delivered.
+      for (int i = 0; i < 4; i++)
+      {
+         Message m = consumer.receive(1000);
+         assertNotNull(m);
+      }
+      assertNull(consumer.receive(1000));
+
+      // Close out the consumer.. no other messages should be left on the queue.
+      consumer.close();
+
+      consumer = session.createConsumer(destination);
+      assertNull(consumer.receive(1000));
+   }
+
+   @Test
+   public void testRedispatchOfUncommittedTx() throws Exception
+   {
+      connection.start();
+      Session session = connection.createSession(true,
+            Session.SESSION_TRANSACTED);
+      ActiveMQDestination destination = createDestination(session,
+            ActiveMQDestination.QUEUE_TYPE);
+
+      sendMessages(connection, destination, 2);
+
+      MessageConsumer consumer = session.createConsumer(destination);
+      Message m = consumer.receive(1000);
+      System.out.println("m1 received: " + m);
+      assertNotNull(m);
+      m = consumer.receive(5000);
+      System.out.println("m2 received: " + m);
+      assertNotNull(m);
+
+      // install another consumer while message dispatch is unacked/uncommitted
+      Session redispatchSession = connection.createSession(true,
+            Session.SESSION_TRANSACTED);
+      MessageConsumer redispatchConsumer = redispatchSession
+            .createConsumer(destination);
+      System.out.println("redispatch consumer: " + redispatchConsumer);
+
+      // no commit so will auto rollback and get re-dispatched to
+      // redisptachConsumer
+      System.out.println("closing session: " + session);
+      session.close();
+
+      Message msg = redispatchConsumer.receive(3000);
+      assertNotNull(msg);
+
+      assertTrue("redelivered flag set", msg.getJMSRedelivered());
+      assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
+
+      msg = redispatchConsumer.receive(1000);
+      assertNotNull(msg);
+      assertTrue(msg.getJMSRedelivered());
+      assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
+      redispatchSession.commit();
+
+      assertNull(redispatchConsumer.receive(500));
+      System.out.println("closing dispatch session: " + redispatchSession);
+      redispatchSession.close();
+   }
+
+   @Test
+   public void testRedispatchOfRolledbackTx() throws Exception
+   {
+
+      connection.start();
+      Session session = connection.createSession(true,
+            Session.SESSION_TRANSACTED);
+      ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+      sendMessages(connection, destination, 2);
+
+      MessageConsumer consumer = session.createConsumer(destination);
+      assertNotNull(consumer.receive(1000));
+      assertNotNull(consumer.receive(1000));
+
+      // install another consumer while message dispatch is unacked/uncommitted
+      Session redispatchSession = connection.createSession(true,
+            Session.SESSION_TRANSACTED);
+      MessageConsumer redispatchConsumer = redispatchSession
+            .createConsumer(destination);
+
+      session.rollback();
+      session.close();
+
+      Message msg = redispatchConsumer.receive(1000);
+      assertNotNull(msg);
+      assertTrue(msg.getJMSRedelivered());
+      assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
+      msg = redispatchConsumer.receive(1000);
+      assertNotNull(msg);
+      assertTrue(msg.getJMSRedelivered());
+      assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
+      redispatchSession.commit();
+
+      assertNull(redispatchConsumer.receive(500));
+      redispatchSession.close();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer3Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer3Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer3Test.java
new file mode 100644
index 0000000..a444fff
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer3Test.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * adapted from: org.apache.activemq.JMSConsumerTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+@RunWith(Parameterized.class)
+public class JMSConsumer3Test extends BasicOpenWireTest
+{
+   @Parameterized.Parameters(name = "deliveryMode={0} ackMode={1} destinationType={2}")
+   public static Collection<Object[]> getParams()
+   {
+      return Arrays.asList(new Object[][] {
+         {DeliveryMode.NON_PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.TOPIC_TYPE},
+         {DeliveryMode.NON_PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.TEMP_QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.TEMP_TOPIC_TYPE},
+         {DeliveryMode.NON_PERSISTENT, Session.DUPS_OK_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, Session.DUPS_OK_ACKNOWLEDGE, ActiveMQDestination.TOPIC_TYPE},
+         {DeliveryMode.NON_PERSISTENT, Session.DUPS_OK_ACKNOWLEDGE, ActiveMQDestination.TEMP_QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, Session.DUPS_OK_ACKNOWLEDGE, ActiveMQDestination.TEMP_TOPIC_TYPE},
+         {DeliveryMode.NON_PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.TOPIC_TYPE},
+         {DeliveryMode.NON_PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.TEMP_QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.TEMP_TOPIC_TYPE},
+         {DeliveryMode.PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.TOPIC_TYPE},
+         {DeliveryMode.PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.TEMP_QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.TEMP_TOPIC_TYPE},
+         {DeliveryMode.PERSISTENT, Session.DUPS_OK_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, Session.DUPS_OK_ACKNOWLEDGE, ActiveMQDestination.TOPIC_TYPE},
+         {DeliveryMode.PERSISTENT, Session.DUPS_OK_ACKNOWLEDGE, ActiveMQDestination.TEMP_QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, Session.DUPS_OK_ACKNOWLEDGE, ActiveMQDestination.TEMP_TOPIC_TYPE},
+         {DeliveryMode.PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.TOPIC_TYPE},
+         {DeliveryMode.PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.TEMP_QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.TEMP_TOPIC_TYPE}
+      });
+   }
+
+   public ActiveMQDestination destination;
+   public int deliveryMode;
+   public int prefetch;
+   public int ackMode;
+   public byte destinationType;
+   public boolean durableConsumer;
+
+   public JMSConsumer3Test(int deliveryMode, int ackMode, byte destinationType)
+   {
+      this.deliveryMode = deliveryMode;
+      this.ackMode = ackMode;
+      this.destinationType = destinationType;
+   }
+
+   @Test
+   public void testMutiReceiveWithPrefetch1() throws Exception
+   {
+      // Set prefetch to 1
+      ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+      connection.start();
+
+      // Use all the ack modes
+      Session session = connection.createSession(false, ackMode);
+      destination = createDestination(session, destinationType);
+      MessageConsumer consumer = session.createConsumer(destination);
+
+      // Send the messages
+      sendMessages(session, destination, 4);
+
+      System.out.println("messages are sent.");
+      // Make sure 4 messages were delivered.
+      Message message = null;
+      for (int i = 0; i < 4; i++)
+      {
+         message = consumer.receive(5000);
+         System.out.println("message received: " + message + " ack mode: " + ackMode);
+         assertNotNull(message);
+      }
+      assertNull(consumer.receiveNoWait());
+      message.acknowledge();
+   }
+
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer4Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer4Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer4Test.java
new file mode 100644
index 0000000..d4fe157
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer4Test.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * adapted from: org.apache.activemq.JMSConsumerTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+@RunWith(Parameterized.class)
+public class JMSConsumer4Test extends BasicOpenWireTest
+{
+   @Parameterized.Parameters(name = "deliveryMode={0} destinationType={1}")
+   public static Collection<Object[]> getParams()
+   {
+      return Arrays.asList(new Object[][] {
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TOPIC_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.TOPIC_TYPE}
+      });
+   }
+
+   public int deliveryMode;
+   public byte destinationType;
+
+   public JMSConsumer4Test(int deliveryMode, byte destinationType)
+   {
+      this.deliveryMode = deliveryMode;
+      this.destinationType = destinationType;
+   }
+
+   @Test
+   public void testDurableConsumerSelectorChange() throws Exception
+   {
+      // Receive a message with the JMS API
+      connection.setClientID("test");
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      ActiveMQDestination destination = createDestination(session,
+            destinationType);
+      MessageProducer producer = session.createProducer(destination);
+      producer.setDeliveryMode(deliveryMode);
+      MessageConsumer consumer = session.createDurableSubscriber(
+            (Topic) destination, "test", "color='red'", false);
+
+      // Send the messages
+      TextMessage message = session.createTextMessage("1st");
+      message.setStringProperty("color", "red");
+      producer.send(message);
+
+      Message m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals("1st", ((TextMessage) m).getText());
+
+      // Change the subscription.
+      consumer.close();
+      consumer = session.createDurableSubscriber((Topic) destination, "test",
+            "color='blue'", false);
+
+      message = session.createTextMessage("2nd");
+      message.setStringProperty("color", "red");
+      producer.send(message);
+      message = session.createTextMessage("3rd");
+      message.setStringProperty("color", "blue");
+      producer.send(message);
+
+      // Selector should skip the 2nd message.
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals("3rd", ((TextMessage) m).getText());
+
+      assertNull(consumer.receiveNoWait());
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer5Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer5Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer5Test.java
new file mode 100644
index 0000000..70d7c41
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer5Test.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * adapted from: org.apache.activemq.JMSConsumerTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+@RunWith(Parameterized.class)
+public class JMSConsumer5Test extends BasicOpenWireTest
+{
+   @Parameterized.Parameters(name = "deliveryMode={0} destinationType={1}")
+   public static Collection<Object[]> getParams()
+   {
+      return Arrays.asList(new Object[][] {
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TOPIC_TYPE},
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_TOPIC_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.TOPIC_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_TOPIC_TYPE}
+      });
+   }
+
+   public int deliveryMode;
+   public byte destinationType;
+
+   public JMSConsumer5Test(int deliveryMode, byte destinationType)
+   {
+      this.deliveryMode = deliveryMode;
+      this.destinationType = destinationType;
+   }
+
+   @Test
+   public void testSendReceiveBytesMessage() throws Exception
+   {
+      // Receive a message with the JMS API
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      ActiveMQDestination destination = createDestination(session,
+            destinationType);
+      MessageConsumer consumer = session.createConsumer(destination);
+      MessageProducer producer = session.createProducer(destination);
+
+      BytesMessage message = session.createBytesMessage();
+      message.writeBoolean(true);
+      message.writeBoolean(false);
+      producer.send(message);
+
+      // Make sure only 1 message was delivered.
+      BytesMessage m = (BytesMessage) consumer.receive(1000);
+      assertNotNull(m);
+      assertTrue(m.readBoolean());
+      assertFalse(m.readBoolean());
+
+      assertNull(consumer.receiveNoWait());
+   }
+
+   @Test
+   public void testSetMessageListenerAfterStart() throws Exception
+   {
+      final AtomicInteger counter = new AtomicInteger(0);
+      final CountDownLatch done = new CountDownLatch(1);
+
+      // Receive a message with the JMS API
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      ActiveMQDestination destination = createDestination(session,
+            destinationType);
+      MessageConsumer consumer = session.createConsumer(destination);
+
+      // Send the messages
+      sendMessages(session, destination, 4);
+
+      // See if the message get sent to the listener
+      consumer.setMessageListener(new MessageListener()
+      {
+         @Override
+         public void onMessage(Message m)
+         {
+            counter.incrementAndGet();
+            if (counter.get() == 4)
+            {
+               System.out.println("ok finished all 4, done sleep");
+               done.countDown();
+            }
+         }
+      });
+
+      assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+      System.out.println("ok await ok");
+      Thread.sleep(200);
+
+      // Make sure only 4 messages were delivered.
+      assertEquals(4, counter.get());
+      System.out.println("test done ok " + counter.get());
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer6Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer6Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer6Test.java
new file mode 100644
index 0000000..d17dd06
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer6Test.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * adapted from: org.apache.activemq.JMSConsumerTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+@RunWith(Parameterized.class)
+public class JMSConsumer6Test extends BasicOpenWireTest
+{
+   @Parameterized.Parameters(name = "destinationType={0}")
+   public static Collection<Object[]> getParams()
+   {
+      return Arrays.asList(new Object[][] {
+         {ActiveMQDestination.QUEUE_TYPE},
+         {ActiveMQDestination.TOPIC_TYPE}
+      });
+   }
+
+   public byte destinationType;
+
+   public JMSConsumer6Test(byte destinationType)
+   {
+      this.destinationType = destinationType;
+   }
+
+   @Test
+   public void testPassMessageListenerIntoCreateConsumer() throws Exception
+   {
+
+      final AtomicInteger counter = new AtomicInteger(0);
+      final CountDownLatch done = new CountDownLatch(1);
+
+      // Receive a message with the JMS API
+      connection.start();
+      ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      ActiveMQDestination destination = createDestination(session, destinationType);
+      MessageConsumer consumer = session.createConsumer(destination,
+         new MessageListener()
+         {
+            @Override
+            public void onMessage(Message m)
+            {
+               counter.incrementAndGet();
+               if (counter.get() == 4)
+               {
+                  done.countDown();
+               }
+            }
+         });
+      assertNotNull(consumer);
+
+      // Send the messages
+      sendMessages(session, destination, 4);
+
+      assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+      Thread.sleep(200);
+
+      // Make sure only 4 messages were delivered.
+      assertEquals(4, counter.get());
+   }
+
+   @Test
+   public void testAckOfExpired() throws Exception
+   {
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      ActiveMQDestination destination = createDestination(session,
+            destinationType);
+
+      MessageConsumer consumer = session.createConsumer(destination);
+      connection.setStatsEnabled(true);
+
+      Session sendSession = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producer = sendSession.createProducer(destination);
+      producer.setTimeToLive(1000);
+      final int count = 4;
+      for (int i = 0; i < count; i++)
+      {
+         TextMessage message = sendSession.createTextMessage("" + i);
+         producer.send(message);
+      }
+
+      // let first bunch in queue expire
+      Thread.sleep(2000);
+
+      producer.setTimeToLive(0);
+      for (int i = 0; i < count; i++)
+      {
+         TextMessage message = sendSession.createTextMessage("no expiry" + i);
+         producer.send(message);
+      }
+
+      ActiveMQMessageConsumer amqConsumer = (ActiveMQMessageConsumer) consumer;
+
+      for (int i = 0; i < count; i++)
+      {
+         TextMessage msg = (TextMessage) amqConsumer.receive();
+         assertNotNull(msg);
+         assertTrue("message has \"no expiry\" text: " + msg.getText(), msg
+               .getText().contains("no expiry"));
+
+         // force an ack when there are expired messages
+         amqConsumer.acknowledge();
+      }
+      assertEquals("consumer has expiredMessages", count, amqConsumer
+            .getConsumerStats().getExpiredMessageCount().getCount());
+   }
+
+}


Mime
View raw message