activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [02/51] [partial] activemq-6 git commit: ACTIVEMQ6-2 Update to HQ master
Date Tue, 11 Nov 2014 11:00:32 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer7Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer7Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer7Test.java
new file mode 100644
index 0000000..a3cb991
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer7Test.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * adapted from: org.apache.activemq.JMSConsumerTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+@RunWith(Parameterized.class)
+public class JMSConsumer7Test extends BasicOpenWireTest
+{
+   @Parameterized.Parameters(name = "deliveryMode={0} ackMode={1} destinationType={2}")
+   public static Collection<Object[]> getParams()
+   {
+      return Arrays.asList(new Object[][] {
+         {DeliveryMode.NON_PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE}
+      });
+   }
+
+   public int deliveryMode;
+   public int ackMode;
+   public byte destinationType;
+
+   public JMSConsumer7Test(int deliveryMode, int ackMode, byte destinationType)
+   {
+      this.deliveryMode = deliveryMode;
+      this.ackMode = ackMode;
+      this.destinationType = destinationType;
+   }
+
+   @Test
+   public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception
+   {
+      final AtomicInteger counter = new AtomicInteger(0);
+      final CountDownLatch sendDone = new CountDownLatch(1);
+      final CountDownLatch got2Done = new CountDownLatch(1);
+
+      // Set prefetch to 1
+      connection.getPrefetchPolicy().setAll(1);
+      // This test case does not work if optimized message dispatch is used as
+      // the main thread send block until the consumer receives the
+      // message. This test depends on thread decoupling so that the main
+      // thread can stop the consumer thread.
+      connection.setOptimizedMessageDispatch(false);
+      connection.start();
+
+      // Use all the ack modes
+      Session session = connection.createSession(false, ackMode);
+      ActiveMQDestination destination = createDestination(session,
+            destinationType);
+      MessageConsumer consumer = session.createConsumer(destination);
+      consumer.setMessageListener(new MessageListener()
+      {
+         @Override
+         public void onMessage(Message m)
+         {
+            try
+            {
+               TextMessage tm = (TextMessage) m;
+               assertEquals("" + counter.get(), tm.getText());
+               counter.incrementAndGet();
+               if (counter.get() == 2)
+               {
+                  sendDone.await();
+                  connection.close();
+                  got2Done.countDown();
+               }
+               System.out.println("acking tm: " + tm.getText());
+               tm.acknowledge();
+            }
+            catch (Throwable e)
+            {
+               System.out.println("ack failed!!");
+               e.printStackTrace();
+            }
+         }
+      });
+
+      // Send the messages
+      sendMessages(session, destination, 4);
+      sendDone.countDown();
+
+      // Wait for first 2 messages to arrive.
+      assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
+
+      // Re-start connection.
+      connection = (ActiveMQConnection) factory.createConnection();
+
+      connection.getPrefetchPolicy().setAll(1);
+      connection.start();
+
+      // Pickup the remaining messages.
+      final CountDownLatch done2 = new CountDownLatch(1);
+      session = connection.createSession(false, ackMode);
+      consumer = session.createConsumer(destination);
+      consumer.setMessageListener(new MessageListener()
+      {
+         @Override
+         public void onMessage(Message m)
+         {
+            try
+            {
+               TextMessage tm = (TextMessage) m;
+               System.out.println("2nd received: " + tm.getText());
+               // order is not guaranteed as the connection is started before
+               // the listener is set.
+               // assertEquals("" + counter.get(), tm.getText());
+               counter.incrementAndGet();
+               if (counter.get() == 4)
+               {
+                  done2.countDown();
+               }
+            }
+            catch (Throwable e)
+            {
+               System.err.println("Unexpected exception: " + e);
+            }
+         }
+      });
+
+      assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
+      Thread.sleep(200);
+
+      // assert msg 2 was redelivered as close() from onMessages() will only ack
+      // in auto_ack and dups_ok mode
+      assertEquals(5, counter.get());
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer8Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer8Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer8Test.java
new file mode 100644
index 0000000..cfcfce4
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer8Test.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * adapted from: org.apache.activemq.JMSConsumerTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+@RunWith(Parameterized.class)
+public class JMSConsumer8Test extends BasicOpenWireTest
+{
+   @Parameterized.Parameters(name = "deliveryMode={0} ackMode={1} destinationType={2}")
+   public static Collection<Object[]> getParams()
+   {
+      return Arrays.asList(new Object[][] {
+         {DeliveryMode.NON_PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE}
+      });
+   }
+
+   public int deliveryMode;
+   public int ackMode;
+   public byte destinationType;
+
+   public JMSConsumer8Test(int deliveryMode, int ackMode, byte destinationType)
+   {
+      this.deliveryMode = deliveryMode;
+      this.ackMode = ackMode;
+      this.destinationType = destinationType;
+   }
+
+   @Test
+   public void testMessageListenerAutoAckOnCloseWithPrefetch1() throws Exception
+   {
+
+      final AtomicInteger counter = new AtomicInteger(0);
+      final CountDownLatch sendDone = new CountDownLatch(1);
+      final CountDownLatch got2Done = new CountDownLatch(1);
+
+      // Set prefetch to 1
+      connection.getPrefetchPolicy().setAll(1);
+      // This test case does not work if optimized message dispatch is used as
+      // the main thread send block until the consumer receives the
+      // message. This test depends on thread decoupling so that the main
+      // thread can stop the consumer thread.
+      connection.setOptimizedMessageDispatch(false);
+      connection.start();
+
+      // Use all the ack modes
+      Session session = connection.createSession(false, ackMode);
+      ActiveMQDestination destination = createDestination(session,
+            destinationType);
+      MessageConsumer consumer = session.createConsumer(destination);
+      consumer.setMessageListener(new MessageListener()
+      {
+         @Override
+         public void onMessage(Message m)
+         {
+            try
+            {
+               TextMessage tm = (TextMessage) m;
+               assertEquals("" + counter.get(), tm.getText());
+               counter.incrementAndGet();
+               m.acknowledge();
+               if (counter.get() == 2)
+               {
+                  sendDone.await();
+                  connection.close();
+                  got2Done.countDown();
+               }
+            }
+            catch (Throwable e)
+            {
+               e.printStackTrace();
+            }
+         }
+      });
+
+      // Send the messages
+      sendMessages(session, destination, 4);
+      sendDone.countDown();
+
+      // Wait for first 2 messages to arrive.
+      assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
+
+      // Re-start connection.
+      connection = (ActiveMQConnection) factory.createConnection();
+
+      connection.getPrefetchPolicy().setAll(1);
+      connection.start();
+
+      // Pickup the remaining messages.
+      final CountDownLatch done2 = new CountDownLatch(1);
+      session = connection.createSession(false, ackMode);
+      consumer = session.createConsumer(destination);
+      consumer.setMessageListener(new MessageListener()
+      {
+         @Override
+         public void onMessage(Message m)
+         {
+            try
+            {
+               TextMessage tm = (TextMessage) m;
+               counter.incrementAndGet();
+               if (counter.get() == 4)
+               {
+                  done2.countDown();
+               }
+            }
+            catch (Throwable e)
+            {
+               System.err.println("Unexpected exception " + e);
+            }
+         }
+      });
+
+      assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
+      Thread.sleep(200);
+
+      // close from onMessage with Auto_ack will ack
+      // Make sure only 4 messages were delivered.
+      assertEquals(4, counter.get());
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer9Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer9Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer9Test.java
new file mode 100644
index 0000000..a69e0e3
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer9Test.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * adapted from: org.apache.activemq.JMSConsumerTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+@RunWith(Parameterized.class)
+public class JMSConsumer9Test extends BasicOpenWireTest
+{
+   @Parameterized.Parameters(name = "deliveryMode={0} destinationType={1}")
+   public static Collection<Object[]> getParams()
+   {
+      return Arrays.asList(new Object[][] {
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TOPIC_TYPE},
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_TOPIC_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.TOPIC_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_TOPIC_TYPE}
+      });
+   }
+
+   public int deliveryMode;
+   public byte destinationType;
+
+   public JMSConsumer9Test(int deliveryMode, byte destinationType)
+   {
+      this.deliveryMode = deliveryMode;
+      this.destinationType = destinationType;
+   }
+
+   @Test
+   public void testMessageListenerWithConsumerWithPrefetch1() throws Exception
+   {
+
+      final AtomicInteger counter = new AtomicInteger(0);
+      final CountDownLatch done = new CountDownLatch(1);
+
+      // Receive a message with the JMS API
+      connection.getPrefetchPolicy().setAll(1);
+      connection.start();
+
+      Session session = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      ActiveMQDestination destination = createDestination(session,
+            destinationType);
+      MessageConsumer consumer = session.createConsumer(destination);
+      consumer.setMessageListener(new MessageListener()
+      {
+         @Override
+         public void onMessage(Message m)
+         {
+            counter.incrementAndGet();
+            if (counter.get() == 4)
+            {
+               done.countDown();
+            }
+         }
+      });
+
+      // Send the messages
+      sendMessages(session, destination, 4);
+
+      assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+      Thread.sleep(200);
+
+      // Make sure only 4 messages were delivered.
+      assertEquals(4, counter.get());
+   }
+
+   public void testMessageListenerWithConsumer() throws Exception
+   {
+
+      final AtomicInteger counter = new AtomicInteger(0);
+      final CountDownLatch done = new CountDownLatch(1);
+
+      // Receive a message with the JMS API
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      ActiveMQDestination destination = createDestination(session,
+            destinationType);
+      MessageConsumer consumer = session.createConsumer(destination);
+      consumer.setMessageListener(new MessageListener()
+      {
+         @Override
+         public void onMessage(Message m)
+         {
+            counter.incrementAndGet();
+            if (counter.get() == 4)
+            {
+               done.countDown();
+            }
+         }
+      });
+
+      // Send the messages
+      sendMessages(session, destination, 4);
+
+      assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+      Thread.sleep(200);
+
+      // Make sure only 4 messages were delivered.
+      assertEquals(4, counter.get());
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSDurableTopicRedeliverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSDurableTopicRedeliverTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSDurableTopicRedeliverTest.java
new file mode 100644
index 0000000..dd1e931
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSDurableTopicRedeliverTest.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * adapted from: org.apache.activemq.JMSDurableTopicRedeliverTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class JMSDurableTopicRedeliverTest extends JmsTopicRedeliverTest
+{
+
+   @Override
+   @Before
+   public void setUp() throws Exception
+   {
+      durable = true;
+      super.setUp();
+   }
+
+   /**
+    * Sends and consumes the messages.
+    *
+    * @throws Exception
+    */
+   @Test
+   public void testRedeliverNewSession() throws Exception
+   {
+      String text = "TEST: " + System.currentTimeMillis();
+      Message sendMessage = session.createTextMessage(text);
+
+      if (verbose)
+      {
+         System.out.println("About to send a message: " + sendMessage
+               + " with text: " + text);
+      }
+      producer.send(producerDestination, sendMessage);
+
+      // receive but don't acknowledge
+      Message unackMessage = consumer.receive(1000);
+      assertNotNull(unackMessage);
+      String unackId = unackMessage.getJMSMessageID();
+      assertEquals(((TextMessage) unackMessage).getText(), text);
+      assertFalse(unackMessage.getJMSRedelivered());
+      assertEquals(unackMessage.getIntProperty("JMSXDeliveryCount"), 1);
+      consumeSession.close();
+      consumer.close();
+
+      // receive then acknowledge
+      consumeSession = connection.createSession(false,
+            Session.CLIENT_ACKNOWLEDGE);
+      consumer = createConsumer(getName());
+      Message ackMessage = consumer.receive(1000);
+      assertNotNull(ackMessage);
+
+      ackMessage.acknowledge();
+
+      String ackId = ackMessage.getJMSMessageID();
+      assertEquals(((TextMessage) ackMessage).getText(), text);
+      assertEquals(2, ackMessage.getIntProperty("JMSXDeliveryCount"));
+      assertEquals(unackId, ackId);
+      consumeSession.close();
+      consumer.close();
+
+      consumeSession = connection.createSession(false,
+            Session.CLIENT_ACKNOWLEDGE);
+      consumer = createConsumer(getName());
+      assertNull(consumer.receive(1000));
+   }
+
+   protected String getName()
+   {
+      return "JMSDurableTopicRedeliverTest";
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSIndividualAckTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSIndividualAckTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSIndividualAckTest.java
new file mode 100644
index 0000000..203ade9
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSIndividualAckTest.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+
+/**
+ * adapted from: org.apache.activemq.JMSIndividualAckTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class JMSIndividualAckTest extends BasicOpenWireTest
+{
+
+   /**
+    * Tests if acknowledged messages are being consumed.
+    *
+    * @throws JMSException
+    */
+   @Test
+   public void testAckedMessageAreConsumed() throws JMSException
+   {
+      connection.start();
+      Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+      Queue queue = (Queue) this.createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+      MessageProducer producer = session.createProducer(queue);
+      producer.send(session.createTextMessage("Hello"));
+
+      // Consume the message...
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message msg = consumer.receive(1000);
+      assertNotNull(msg);
+      msg.acknowledge();
+
+      // Reset the session.
+      session.close();
+      session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+
+      // Attempt to Consume the message...
+      consumer = session.createConsumer(queue);
+      msg = consumer.receive(1000);
+      assertNull(msg);
+
+      session.close();
+   }
+
+   /**
+    * Tests if acknowledged messages are being consumed.
+    *
+    * @throws JMSException
+    */
+   @Test
+   public void testLastMessageAcked() throws JMSException
+   {
+      connection.start();
+      Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+      Queue queue = (Queue) this.createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+      MessageProducer producer = session.createProducer(queue);
+      TextMessage msg1 = session.createTextMessage("msg1");
+      TextMessage msg2 = session.createTextMessage("msg2");
+      TextMessage msg3 = session.createTextMessage("msg3");
+      producer.send(msg1);
+      producer.send(msg2);
+      producer.send(msg3);
+
+      // Consume the message...
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message msg = consumer.receive(1000);
+      assertNotNull(msg);
+      msg = consumer.receive(1000);
+      assertNotNull(msg);
+      msg = consumer.receive(1000);
+      assertNotNull(msg);
+      msg.acknowledge();
+
+      // Reset the session.
+      session.close();
+      session = connection.createSession(false,
+            ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+
+      // Attempt to Consume the message...
+      consumer = session.createConsumer(queue);
+      msg = consumer.receive(1000);
+      assertNotNull(msg);
+      assertEquals(msg1, msg);
+      msg = consumer.receive(1000);
+      assertNotNull(msg);
+      assertEquals(msg2, msg);
+      msg = consumer.receive(1000);
+      assertNull(msg);
+      session.close();
+   }
+
+   /**
+    * Tests if unacknowledged messages are being re-delivered when the consumer
+    * connects again.
+    *
+    * @throws JMSException
+    */
+   @Test
+   public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException
+   {
+      connection.start();
+      Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+      Queue queue = (Queue) this.createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+      MessageProducer producer = session.createProducer(queue);
+      producer.send(session.createTextMessage("Hello"));
+
+      // Consume the message...
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message msg = consumer.receive(1000);
+      assertNotNull(msg);
+      // Don't ack the message.
+
+      // Reset the session. This should cause the unacknowledged message to be
+      // re-delivered.
+      session.close();
+      session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+
+      // Attempt to Consume the message...
+      consumer = session.createConsumer(queue);
+      msg = consumer.receive(2000);
+      assertNotNull(msg);
+      msg.acknowledge();
+
+      session.close();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSMessageTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSMessageTest.java
new file mode 100644
index 0000000..fccd136
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSMessageTest.java
@@ -0,0 +1,639 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Vector;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * adapted from: org.apache.activemq.JMSMessageTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+@RunWith(Parameterized.class)
+public class JMSMessageTest extends BasicOpenWireTest
+{
+   @Parameterized.Parameters(name = "deliveryMode={0} destinationType={1}")
+   public static Collection<Object[]> getParams()
+   {
+      return Arrays.asList(new Object[][] {
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.QUEUE_TYPE},
+      });
+   }
+
+   public int deliveryMode = DeliveryMode.NON_PERSISTENT;
+   public byte destinationType = ActiveMQDestination.QUEUE_TYPE;
+   public ActiveMQDestination destination;
+   public boolean durableConsumer;
+
+   public JMSMessageTest(int deliveryMode, byte destinationType)
+   {
+      this.deliveryMode = deliveryMode;
+      this.destinationType = destinationType;
+   }
+
+   @Test
+   public void testTextMessage() throws Exception
+   {
+
+      // Receive a message with the JMS API
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      destination = createDestination(session, destinationType);
+      MessageConsumer consumer = session.createConsumer(destination);
+      MessageProducer producer = session.createProducer(destination);
+
+      // Send the message.
+      {
+         TextMessage message = session.createTextMessage();
+         message.setText("Hi");
+         producer.send(message);
+      }
+
+      // Check the Message
+      {
+         TextMessage message = (TextMessage) consumer.receive(1000);
+         assertNotNull(message);
+         assertEquals("Hi", message.getText());
+      }
+
+      assertNull(consumer.receiveNoWait());
+   }
+
+   @Test
+   public void testBytesMessageLength() throws Exception
+   {
+
+      // Receive a message with the JMS API
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      destination = createDestination(session, destinationType);
+      MessageConsumer consumer = session.createConsumer(destination);
+      MessageProducer producer = session.createProducer(destination);
+
+      // Send the message
+      {
+         BytesMessage message = session.createBytesMessage();
+         message.writeInt(1);
+         message.writeInt(2);
+         message.writeInt(3);
+         message.writeInt(4);
+         producer.send(message);
+      }
+
+      // Check the message.
+      {
+         BytesMessage message = (BytesMessage) consumer.receive(1000);
+         assertNotNull(message);
+         assertEquals(16, message.getBodyLength());
+      }
+
+      assertNull(consumer.receiveNoWait());
+   }
+
+   @Test
+   public void testObjectMessage() throws Exception
+   {
+      // Receive a message with the JMS API
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      destination = createDestination(session, destinationType);
+      MessageConsumer consumer = session.createConsumer(destination);
+      MessageProducer producer = session.createProducer(destination);
+
+      // send the message.
+      {
+         ObjectMessage message = session.createObjectMessage();
+         message.setObject("Hi");
+         producer.send(message);
+      }
+
+      // Check the message
+      {
+         ObjectMessage message = (ObjectMessage) consumer.receive(1000);
+         assertNotNull(message);
+         assertEquals("Hi", message.getObject());
+      }
+      assertNull(consumer.receiveNoWait());
+   }
+
+   @Test
+   public void testBytesMessage() throws Exception
+   {
+
+      // Receive a message with the JMS API
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      destination = createDestination(session, destinationType);
+      MessageConsumer consumer = session.createConsumer(destination);
+      MessageProducer producer = session.createProducer(destination);
+
+      // Send the message
+      {
+         BytesMessage message = session.createBytesMessage();
+         message.writeBoolean(true);
+         producer.send(message);
+      }
+
+      // Check the message
+      {
+         BytesMessage message = (BytesMessage) consumer.receive(1000);
+         assertNotNull(message);
+         assertTrue(message.readBoolean());
+
+         try
+         {
+            message.readByte();
+            fail("Expected exception not thrown.");
+         }
+         catch (MessageEOFException e)
+         {
+         }
+
+      }
+      assertNull(consumer.receiveNoWait());
+   }
+
+   @Test
+   public void testStreamMessage() throws Exception
+   {
+
+      // Receive a message with the JMS API
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      destination = createDestination(session, destinationType);
+      MessageConsumer consumer = session.createConsumer(destination);
+      MessageProducer producer = session.createProducer(destination);
+
+      // Send the message.
+      {
+         StreamMessage message = session.createStreamMessage();
+         message.writeString("This is a test to see how it works.");
+         producer.send(message);
+      }
+
+      // Check the message.
+      {
+         StreamMessage message = (StreamMessage) consumer.receive(1000);
+         assertNotNull(message);
+
+         // Invalid conversion should throw exception and not move the stream
+         // position.
+         try
+         {
+            message.readByte();
+            fail("Should have received NumberFormatException");
+         }
+         catch (NumberFormatException e)
+         {
+         }
+
+         assertEquals("This is a test to see how it works.",
+               message.readString());
+
+         // Invalid conversion should throw exception and not move the stream
+         // position.
+         try
+         {
+            message.readByte();
+            fail("Should have received MessageEOFException");
+         }
+         catch (MessageEOFException e)
+         {
+         }
+      }
+      assertNull(consumer.receiveNoWait());
+   }
+
+   @Test
+   public void testMapMessage() throws Exception
+   {
+
+      // Receive a message with the JMS API
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      destination = createDestination(session, destinationType);
+      MessageConsumer consumer = session.createConsumer(destination);
+      MessageProducer producer = session.createProducer(destination);
+
+      // send the message.
+      {
+         MapMessage message = session.createMapMessage();
+         message.setBoolean("boolKey", true);
+         producer.send(message);
+      }
+
+      // get the message.
+      {
+         MapMessage message = (MapMessage) consumer.receive(1000);
+         assertNotNull(message);
+         assertTrue(message.getBoolean("boolKey"));
+      }
+      assertNull(consumer.receiveNoWait());
+   }
+
+   static class ForeignMessage implements TextMessage
+   {
+
+      public int deliveryMode;
+
+      private String messageId;
+      private long timestamp;
+      private String correlationId;
+      private Destination replyTo;
+      private Destination destination;
+      private boolean redelivered;
+      private String type;
+      private long expiration;
+      private int priority;
+      private String text;
+      private final HashMap<String, Object> props = new HashMap<String, Object>();
+
+      @Override
+      public String getJMSMessageID() throws JMSException
+      {
+         return messageId;
+      }
+
+      @Override
+      public void setJMSMessageID(String arg0) throws JMSException
+      {
+         messageId = arg0;
+      }
+
+      @Override
+      public long getJMSTimestamp() throws JMSException
+      {
+         return timestamp;
+      }
+
+      @Override
+      public void setJMSTimestamp(long arg0) throws JMSException
+      {
+         timestamp = arg0;
+      }
+
+      @Override
+      public byte[] getJMSCorrelationIDAsBytes() throws JMSException
+      {
+         return null;
+      }
+
+      @Override
+      public void setJMSCorrelationIDAsBytes(byte[] arg0) throws JMSException
+      {
+      }
+
+      @Override
+      public void setJMSCorrelationID(String arg0) throws JMSException
+      {
+         correlationId = arg0;
+      }
+
+      @Override
+      public String getJMSCorrelationID() throws JMSException
+      {
+         return correlationId;
+      }
+
+      @Override
+      public Destination getJMSReplyTo() throws JMSException
+      {
+         return replyTo;
+      }
+
+      @Override
+      public void setJMSReplyTo(Destination arg0) throws JMSException
+      {
+         replyTo = arg0;
+      }
+
+      @Override
+      public Destination getJMSDestination() throws JMSException
+      {
+         return destination;
+      }
+
+      @Override
+      public void setJMSDestination(Destination arg0) throws JMSException
+      {
+         destination = arg0;
+      }
+
+      @Override
+      public int getJMSDeliveryMode() throws JMSException
+      {
+         return deliveryMode;
+      }
+
+      @Override
+      public void setJMSDeliveryMode(int arg0) throws JMSException
+      {
+         deliveryMode = arg0;
+      }
+
+      @Override
+      public boolean getJMSRedelivered() throws JMSException
+      {
+         return redelivered;
+      }
+
+      @Override
+      public void setJMSRedelivered(boolean arg0) throws JMSException
+      {
+         redelivered = arg0;
+      }
+
+      @Override
+      public String getJMSType() throws JMSException
+      {
+         return type;
+      }
+
+      @Override
+      public void setJMSType(String arg0) throws JMSException
+      {
+         type = arg0;
+      }
+
+      @Override
+      public long getJMSExpiration() throws JMSException
+      {
+         return expiration;
+      }
+
+      @Override
+      public void setJMSExpiration(long arg0) throws JMSException
+      {
+         expiration = arg0;
+      }
+
+      @Override
+      public int getJMSPriority() throws JMSException
+      {
+         return priority;
+      }
+
+      @Override
+      public void setJMSPriority(int arg0) throws JMSException
+      {
+         priority = arg0;
+      }
+
+      @Override
+      public void clearProperties() throws JMSException
+      {
+      }
+
+      @Override
+      public boolean propertyExists(String arg0) throws JMSException
+      {
+         return false;
+      }
+
+      @Override
+      public boolean getBooleanProperty(String arg0) throws JMSException
+      {
+         return false;
+      }
+
+      @Override
+      public byte getByteProperty(String arg0) throws JMSException
+      {
+         return 0;
+      }
+
+      @Override
+      public short getShortProperty(String arg0) throws JMSException
+      {
+         return 0;
+      }
+
+      @Override
+      public int getIntProperty(String arg0) throws JMSException
+      {
+         return 0;
+      }
+
+      @Override
+      public long getLongProperty(String arg0) throws JMSException
+      {
+         return 0;
+      }
+
+      @Override
+      public float getFloatProperty(String arg0) throws JMSException
+      {
+         return 0;
+      }
+
+      @Override
+      public double getDoubleProperty(String arg0) throws JMSException
+      {
+         return 0;
+      }
+
+      @Override
+      public String getStringProperty(String arg0) throws JMSException
+      {
+         return (String) props.get(arg0);
+      }
+
+      @Override
+      public Object getObjectProperty(String arg0) throws JMSException
+      {
+         return props.get(arg0);
+      }
+
+      @Override
+      public Enumeration<?> getPropertyNames() throws JMSException
+      {
+         return new Vector<String>(props.keySet()).elements();
+      }
+
+      @Override
+      public void setBooleanProperty(String arg0, boolean arg1) throws JMSException
+      {
+      }
+
+      @Override
+      public void setByteProperty(String arg0, byte arg1) throws JMSException
+      {
+      }
+
+      @Override
+      public void setShortProperty(String arg0, short arg1) throws JMSException
+      {
+      }
+
+      @Override
+      public void setIntProperty(String arg0, int arg1) throws JMSException
+      {
+      }
+
+      @Override
+      public void setLongProperty(String arg0, long arg1) throws JMSException
+      {
+      }
+
+      @Override
+      public void setFloatProperty(String arg0, float arg1) throws JMSException
+      {
+      }
+
+      @Override
+      public void setDoubleProperty(String arg0, double arg1) throws JMSException
+      {
+      }
+
+      @Override
+      public void setStringProperty(String arg0, String arg1) throws JMSException
+      {
+         props.put(arg0, arg1);
+      }
+
+      @Override
+      public void setObjectProperty(String arg0, Object arg1) throws JMSException
+      {
+         props.put(arg0, arg1);
+      }
+
+      @Override
+      public void acknowledge() throws JMSException
+      {
+      }
+
+      @Override
+      public void clearBody() throws JMSException
+      {
+      }
+
+      @Override
+      public void setText(String arg0) throws JMSException
+      {
+         text = arg0;
+      }
+
+      @Override
+      public String getText() throws JMSException
+      {
+         return text;
+      }
+
+      @Override
+      public <T> T getBody(Class<T> arg0) throws JMSException
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
+
+      @Override
+      public long getJMSDeliveryTime() throws JMSException
+      {
+         // TODO Auto-generated method stub
+         return 0;
+      }
+
+      @Override
+      public boolean isBodyAssignableTo(Class arg0) throws JMSException
+      {
+         // TODO Auto-generated method stub
+         return false;
+      }
+
+      @Override
+      public void setJMSDeliveryTime(long arg0) throws JMSException
+      {
+         // TODO Auto-generated method stub
+      }
+   }
+
+   @Test
+   public void testForeignMessage() throws Exception
+   {
+
+      // Receive a message with the JMS API
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      destination = createDestination(session, destinationType);
+      MessageConsumer consumer = session.createConsumer(destination);
+      MessageProducer producer = session.createProducer(destination);
+
+      // Send the message.
+      {
+         ForeignMessage message = new ForeignMessage();
+         message.text = "Hello";
+         message.setStringProperty("test", "value");
+         long timeToLive = 10000L;
+         long start = System.currentTimeMillis();
+         producer.send(message, Session.AUTO_ACKNOWLEDGE, 7, timeToLive);
+         long end = System.currentTimeMillis();
+
+         // validate jms spec 1.1 section 3.4.11 table 3.1
+         // JMSDestination, JMSDeliveryMode, JMSExpiration, JMSPriority,
+         // JMSMessageID, and JMSTimestamp
+         // must be set by sending a message.
+
+         assertNotNull(message.getJMSDestination());
+         assertEquals(Session.AUTO_ACKNOWLEDGE, message.getJMSDeliveryMode());
+         assertTrue(start + timeToLive <= message.getJMSExpiration());
+         assertTrue(end + timeToLive >= message.getJMSExpiration());
+         assertEquals(7, message.getJMSPriority());
+         assertNotNull(message.getJMSMessageID());
+         assertTrue(start <= message.getJMSTimestamp());
+         assertTrue(end >= message.getJMSTimestamp());
+      }
+
+      // Validate message is OK.
+      {
+         TextMessage message = (TextMessage) consumer.receive(1000);
+         assertNotNull(message);
+         assertEquals("Hello", message.getText());
+         assertEquals("value", message.getStringProperty("test"));
+      }
+
+      assertNull(consumer.receiveNoWait());
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSQueueRedeliverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSQueueRedeliverTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSQueueRedeliverTest.java
new file mode 100644
index 0000000..fb975b8
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSQueueRedeliverTest.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import org.junit.Before;
+
+/**
+ * adapted from: org.apache.activemq.JMSQueueRedeliverTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class JMSQueueRedeliverTest extends JmsTopicRedeliverTest
+{
+   @Override
+   @Before
+   public void setUp() throws Exception
+   {
+      topic = false;
+      super.setUp();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSUsecase1Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSUsecase1Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSUsecase1Test.java
new file mode 100644
index 0000000..ae28baf
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSUsecase1Test.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * adapted from: org.apache.activemq.JMSUsecaseTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+@RunWith(Parameterized.class)
+public class JMSUsecase1Test extends BasicOpenWireTest
+{
+   @Parameterized.Parameters(name = "deliveryMode={0} destinationType={1}")
+   public static Collection<Object[]> getParams()
+   {
+      return Arrays.asList(new Object[][] {
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TOPIC_TYPE},
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_TOPIC_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.TOPIC_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_TOPIC_TYPE}
+      });
+   }
+
+   public int deliveryMode;
+   public byte destinationType;
+
+   public JMSUsecase1Test(int deliveryMode, byte destinationType)
+   {
+      this.deliveryMode = deliveryMode;
+      this.destinationType = destinationType;
+   }
+
+   @Test
+   public void testSendReceive() throws Exception
+   {
+      // Send a message to the broker.
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      ActiveMQDestination destination = createDestination(session, destinationType);
+      System.out.println("destionation: " + destination);
+      MessageProducer producer = session.createProducer(destination);
+      producer.setDeliveryMode(this.deliveryMode);
+      MessageConsumer consumer = session.createConsumer(destination);
+      ActiveMQMessage message = new ActiveMQMessage();
+      producer.send(message);
+
+      // Make sure only 1 message was delivered.
+      assertNotNull(consumer.receive(1000));
+      assertNull(consumer.receiveNoWait());
+   }
+
+   @Test
+   public void testSendReceiveTransacted() throws Exception
+   {
+      // Send a message to the broker.
+      connection.start();
+      Session session = connection.createSession(true,
+            Session.SESSION_TRANSACTED);
+      ActiveMQDestination destination = createDestination(session,
+            destinationType);
+      MessageProducer producer = session.createProducer(destination);
+      producer.setDeliveryMode(this.deliveryMode);
+      MessageConsumer consumer = session.createConsumer(destination);
+      producer.send(session.createTextMessage("test"));
+
+      // Message should not be delivered until commit.
+      assertNull(consumer.receiveNoWait());
+      session.commit();
+
+      // Make sure only 1 message was delivered.
+      Message message = consumer.receive(1000);
+      assertNotNull(message);
+      assertFalse(message.getJMSRedelivered());
+      assertNull(consumer.receiveNoWait());
+
+      // Message should be redelivered is rollback is used.
+      session.rollback();
+
+      // Make sure only 1 message was delivered.
+      message = consumer.receive(2000);
+      assertNotNull(message);
+      assertTrue(message.getJMSRedelivered());
+      assertNull(consumer.receiveNoWait());
+
+      // If we commit now, the message should not be redelivered.
+      session.commit();
+      assertNull(consumer.receiveNoWait());
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSUsecaseTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSUsecaseTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSUsecaseTest.java
new file mode 100644
index 0000000..9d25feb
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSUsecaseTest.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * adapted from: org.apache.activemq.JMSUsecaseTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+@RunWith(Parameterized.class)
+public class JMSUsecaseTest extends BasicOpenWireTest
+{
+   @Parameterized.Parameters(name = "deliveryMode={0} destinationType={1}")
+   public static Collection<Object[]> getParams()
+   {
+      return Arrays.asList(new Object[][] {
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.QUEUE_TYPE},
+         {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE}
+      });
+   }
+
+   public int deliveryMode;
+   public byte destinationType;
+
+   public JMSUsecaseTest(int deliveryMode, byte destinationType)
+   {
+      this.deliveryMode = deliveryMode;
+      this.destinationType = destinationType;
+   }
+
+   @Test
+   public void testQueueBrowser() throws Exception
+   {
+      // Send a message to the broker.
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      ActiveMQDestination destination = createDestination(session, destinationType);
+      MessageProducer producer = session.createProducer(destination);
+      producer.setDeliveryMode(this.deliveryMode);
+      sendMessages(session, producer, 5);
+      producer.close();
+
+      QueueBrowser browser = session.createBrowser((Queue) destination);
+      Enumeration<?> enumeration = browser.getEnumeration();
+      for (int i = 0; i < 5; i++)
+      {
+         Thread.sleep(100);
+         assertTrue(enumeration.hasMoreElements());
+         Message m = (Message) enumeration.nextElement();
+         assertNotNull(m);
+         assertEquals("" + i, ((TextMessage) m).getText());
+      }
+      assertFalse(enumeration.hasMoreElements());
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsAutoAckListenerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsAutoAckListenerTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsAutoAckListenerTest.java
new file mode 100644
index 0000000..8cd09cf
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsAutoAckListenerTest.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+
+/**
+ * adapted from: org.apache.activemq.JmsAutoAckListenerTest
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class JmsAutoAckListenerTest extends BasicOpenWireTest implements MessageListener
+{
+   private final CountDownLatch latch = new CountDownLatch(1);
+
+   @Test
+   public void testAckedMessageAreConsumed() throws Exception
+   {
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue(queueName);
+      MessageProducer producer = session.createProducer(queue);
+      producer.send(session.createTextMessage("Hello"));
+
+      // Consume the message...
+      MessageConsumer consumer = session.createConsumer(queue);
+      consumer.setMessageListener(this);
+
+      latch.await(10, TimeUnit.SECONDS);
+      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      // Attempt to Consume the message...check if message was acknowledge
+      consumer = session.createConsumer(queue);
+      Message msg = consumer.receive(1000);
+      assertNull(msg);
+
+      session.close();
+   }
+
+   public void onMessage(Message message)
+   {
+      System.out.println("Received message: " + message);
+      assertNotNull(message);
+      latch.countDown();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsAutoAckTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsAutoAckTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsAutoAckTest.java
new file mode 100644
index 0000000..0e766e1
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsAutoAckTest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+
+/**
+ * adapted from: org.apache.activemq.JmsAutoAckTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class JmsAutoAckTest extends BasicOpenWireTest
+{
+   /**
+    * Tests if acknowleged messages are being consumed.
+    *
+    * @throws javax.jms.JMSException
+    */
+   @Test
+   public void testAckedMessageAreConsumed() throws JMSException
+   {
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue(queueName);
+      MessageProducer producer = session.createProducer(queue);
+      producer.send(session.createTextMessage("Hello"));
+
+      // Consume the message...
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message msg = consumer.receive(1000);
+      assertNotNull(msg);
+
+      // Reset the session.
+      session.close();
+      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      // Attempt to Consume the message...
+      consumer = session.createConsumer(queue);
+      msg = consumer.receive(1000);
+      assertNull(msg);
+
+      session.close();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsClientAckTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsClientAckTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsClientAckTest.java
new file mode 100644
index 0000000..9e7d042
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsClientAckTest.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+
+/**
+ * adapted from: org.apache.activemq.JmsClientAckTest
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class JmsClientAckTest extends BasicOpenWireTest
+{
+   /**
+    * Tests if acknowledged messages are being consumed.
+    *
+    * @throws JMSException
+    */
+   @Test
+   public void testAckedMessageAreConsumed() throws JMSException
+   {
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.CLIENT_ACKNOWLEDGE);
+      Queue queue = session.createQueue(getQueueName());
+      MessageProducer producer = session.createProducer(queue);
+      producer.send(session.createTextMessage("Hello"));
+
+      // Consume the message...
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message msg = consumer.receive(1000);
+      assertNotNull(msg);
+      msg.acknowledge();
+
+      // Reset the session.
+      session.close();
+      session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+      // Attempt to Consume the message...
+      consumer = session.createConsumer(queue);
+      msg = consumer.receive(1000);
+      assertNull(msg);
+
+      session.close();
+   }
+
+   /**
+    * Tests if acknowledged messages are being consumed.
+    *
+    * @throws JMSException
+    */
+   @Test
+   public void testLastMessageAcked() throws JMSException
+   {
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.CLIENT_ACKNOWLEDGE);
+      Queue queue = session.createQueue(getQueueName());
+      MessageProducer producer = session.createProducer(queue);
+      producer.send(session.createTextMessage("Hello"));
+      producer.send(session.createTextMessage("Hello2"));
+      producer.send(session.createTextMessage("Hello3"));
+
+      // Consume the message...
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message msg = consumer.receive(1000);
+      assertNotNull(msg);
+      msg = consumer.receive(1000);
+      assertNotNull(msg);
+      msg = consumer.receive(1000);
+      assertNotNull(msg);
+      msg.acknowledge();
+
+      // Reset the session.
+      session.close();
+      session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+      // Attempt to Consume the message...
+      consumer = session.createConsumer(queue);
+      msg = consumer.receive(1000);
+      assertNull(msg);
+
+      session.close();
+   }
+
+   /**
+    * Tests if unacknowledged messages are being re-delivered when the consumer connects again.
+    *
+    * @throws JMSException
+    */
+   @Test
+   public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException
+   {
+      connection.start();
+      Session session = connection.createSession(false,
+            Session.CLIENT_ACKNOWLEDGE);
+      Queue queue = session.createQueue(getQueueName());
+      MessageProducer producer = session.createProducer(queue);
+      producer.send(session.createTextMessage("Hello"));
+
+      // Consume the message...
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message msg = consumer.receive(1000);
+      assertNotNull(msg);
+      // Don't ack the message.
+
+      // Reset the session. This should cause the unacknowledged message to be
+      // re-delivered.
+      session.close();
+      session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+      // Attempt to Consume the message...
+      consumer = session.createConsumer(queue);
+      msg = consumer.receive(2000);
+      assertNotNull(msg);
+      msg.acknowledge();
+
+      session.close();
+   }
+
+   protected String getQueueName()
+   {
+      return queueName;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsConnectionStartStopTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsConnectionStartStopTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsConnectionStartStopTest.java
new file mode 100644
index 0000000..b8d237f
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsConnectionStartStopTest.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import java.util.Random;
+import java.util.Vector;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * adapted from: org.apache.activemq.JmsConnectionStartStopTest
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class JmsConnectionStartStopTest extends BasicOpenWireTest
+{
+   private Connection startedConnection;
+   private Connection stoppedConnection;
+
+   @Before
+   public void setUp() throws Exception
+   {
+      super.setUp();
+      startedConnection = factory.createConnection();
+      startedConnection.start();
+      stoppedConnection = factory.createConnection();
+   }
+
+   /**
+    * @see junit.framework.TestCase#tearDown()
+    */
+   @After
+   public void tearDown() throws Exception
+   {
+      stoppedConnection.close();
+      startedConnection.close();
+      super.tearDown();
+   }
+
+   /**
+    * Tests if the consumer receives the messages that were sent before the
+    * connection was started.
+    *
+    * @throws JMSException
+    */
+   @Test
+   public void testStoppedConsumerHoldsMessagesTillStarted() throws JMSException
+   {
+      Session startedSession = startedConnection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      Session stoppedSession = stoppedConnection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+
+      // Setup the consumers.
+      Topic topic = startedSession.createTopic("test");
+      MessageConsumer startedConsumer = startedSession.createConsumer(topic);
+      MessageConsumer stoppedConsumer = stoppedSession.createConsumer(topic);
+
+      // Send the message.
+      MessageProducer producer = startedSession.createProducer(topic);
+      TextMessage message = startedSession.createTextMessage("Hello");
+      producer.send(message);
+
+      // Test the assertions.
+      Message m = startedConsumer.receive(1000);
+      assertNotNull(m);
+
+      m = stoppedConsumer.receive(1000);
+      assertNull(m);
+
+      stoppedConnection.start();
+      m = stoppedConsumer.receive(5000);
+      assertNotNull(m);
+
+      startedSession.close();
+      stoppedSession.close();
+   }
+
+   /**
+    * Tests if the consumer is able to receive messages eveb when the
+    * connecction restarts multiple times.
+    *
+    * @throws Exception
+    */
+   @Test
+   public void testMultipleConnectionStops() throws Exception
+   {
+      testStoppedConsumerHoldsMessagesTillStarted();
+      stoppedConnection.stop();
+      testStoppedConsumerHoldsMessagesTillStarted();
+      stoppedConnection.stop();
+      testStoppedConsumerHoldsMessagesTillStarted();
+   }
+
+   @Test
+   public void testConcurrentSessionCreateWithStart() throws Exception
+   {
+      ThreadPoolExecutor executor = new ThreadPoolExecutor(50,
+            Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
+            new SynchronousQueue<Runnable>());
+      final Vector<Throwable> exceptions = new Vector<Throwable>();
+      final AtomicInteger counter = new AtomicInteger(0);
+      final Random rand = new Random();
+      Runnable createSessionTask = new Runnable()
+      {
+         @Override
+         public void run()
+         {
+            try
+            {
+               TimeUnit.MILLISECONDS.sleep(rand.nextInt(10));
+               stoppedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               counter.incrementAndGet();
+            }
+            catch (Exception e)
+            {
+               exceptions.add(e);
+            }
+            catch (Throwable t)
+            {
+            }
+         }
+      };
+
+      Runnable startStopTask = new Runnable()
+      {
+         @Override
+         public void run()
+         {
+            try
+            {
+               TimeUnit.MILLISECONDS.sleep(rand.nextInt(10));
+               stoppedConnection.start();
+               stoppedConnection.stop();
+            }
+            catch (Exception e)
+            {
+               exceptions.add(e);
+            }
+            catch (Throwable t)
+            {
+            }
+         }
+      };
+
+      for (int i = 0; i < 1000; i++)
+      {
+         executor.execute(createSessionTask);
+         executor.execute(startStopTask);
+      }
+
+      executor.shutdown();
+
+      assertTrue("executor terminated",
+            executor.awaitTermination(30, TimeUnit.SECONDS));
+      assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsConsumerResetActiveListenerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsConsumerResetActiveListenerTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsConsumerResetActiveListenerTest.java
new file mode 100644
index 0000000..d3bd648
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsConsumerResetActiveListenerTest.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+
+/**
+ * adapted from: org.apache.activemq.JmsConsumerResetActiveListenerTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class JmsConsumerResetActiveListenerTest extends BasicOpenWireTest
+{
+
+   /**
+    * verify the (undefined by spec) behaviour of setting a listener while
+    * receiving a message.
+    *
+    * @throws Exception
+    */
+   @Test
+   public void testSetListenerFromListener() throws Exception
+   {
+      Session session = connection.createSession(false,
+            Session.CLIENT_ACKNOWLEDGE);
+      Destination dest = session.createQueue(queueName);
+      final MessageConsumer consumer = session.createConsumer(dest);
+
+      final CountDownLatch latch = new CountDownLatch(2);
+      final AtomicBoolean first = new AtomicBoolean(true);
+      final Vector<Object> results = new Vector<Object>();
+      consumer.setMessageListener(new MessageListener()
+      {
+
+         public void onMessage(Message message)
+         {
+            if (first.compareAndSet(true, false))
+            {
+               try
+               {
+                  consumer.setMessageListener(this);
+                  results.add(message);
+               }
+               catch (JMSException e)
+               {
+                  results.add(e);
+               }
+            }
+            else
+            {
+               results.add(message);
+            }
+            latch.countDown();
+         }
+      });
+
+      connection.start();
+
+      MessageProducer producer = session.createProducer(dest);
+      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+      producer.send(session.createTextMessage("First"));
+      producer.send(session.createTextMessage("Second"));
+
+      assertTrue("we did not timeout", latch.await(5, TimeUnit.SECONDS));
+
+      assertEquals("we have a result", 2, results.size());
+      Object result = results.get(0);
+      assertTrue(result instanceof TextMessage);
+      assertEquals("result is first", "First", ((TextMessage) result).getText());
+      result = results.get(1);
+      assertTrue(result instanceof TextMessage);
+      assertEquals("result is first", "Second",
+            ((TextMessage) result).getText());
+   }
+
+   /**
+    * and a listener on a new consumer, just in case.
+    *
+    * @throws Exception
+    */
+   @Test
+   public void testNewConsumerSetListenerFromListener() throws Exception
+   {
+      final Session session = connection.createSession(false,
+            Session.CLIENT_ACKNOWLEDGE);
+      final Destination dest = session.createQueue(queueName);
+      final MessageConsumer consumer = session.createConsumer(dest);
+
+      final CountDownLatch latch = new CountDownLatch(2);
+      final AtomicBoolean first = new AtomicBoolean(true);
+      final Vector<Object> results = new Vector<Object>();
+      consumer.setMessageListener(new MessageListener()
+      {
+
+         public void onMessage(Message message)
+         {
+            if (first.compareAndSet(true, false))
+            {
+               try
+               {
+                  MessageConsumer anotherConsumer = session
+                        .createConsumer(dest);
+                  anotherConsumer.setMessageListener(this);
+                  results.add(message);
+               }
+               catch (JMSException e)
+               {
+                  results.add(e);
+               }
+            }
+            else
+            {
+               results.add(message);
+            }
+            latch.countDown();
+         }
+      });
+
+      connection.start();
+
+      MessageProducer producer = session.createProducer(dest);
+      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+      producer.send(session.createTextMessage("First"));
+      producer.send(session.createTextMessage("Second"));
+
+      assertTrue("we did not timeout", latch.await(5, TimeUnit.SECONDS));
+
+      assertEquals("we have a result", 2, results.size());
+      Object result = results.get(0);
+      assertTrue(result instanceof TextMessage);
+      assertEquals("result is first", "First", ((TextMessage) result).getText());
+      result = results.get(1);
+      assertTrue(result instanceof TextMessage);
+      assertEquals("result is first", "Second",
+            ((TextMessage) result).getText());
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsCreateConsumerInOnMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsCreateConsumerInOnMessageTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsCreateConsumerInOnMessageTest.java
new file mode 100644
index 0000000..fec6c79
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsCreateConsumerInOnMessageTest.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.hornetq.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+
+/**
+ * adapted from: org.apache.activemq.JmsCreateConsumerInOnMessageTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class JmsCreateConsumerInOnMessageTest extends BasicOpenWireTest implements MessageListener
+{
+   private Session publisherSession;
+   private Session consumerSession;
+   private MessageConsumer consumer;
+   private MessageConsumer testConsumer;
+   private MessageProducer producer;
+   private Topic topic;
+   private Object lock = new Object();
+
+   /**
+    * Tests if a consumer can be created asynchronusly
+    *
+    * @throws Exception
+    */
+   @Test
+   public void testCreateConsumer() throws Exception
+   {
+      connection.setClientID("connection:" + "JmsCreateConsumerInOnMessageTest");
+      publisherSession = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      consumerSession = connection.createSession(false,
+            Session.AUTO_ACKNOWLEDGE);
+      topic = (Topic) super.createDestination(consumerSession,
+            ActiveMQDestination.TOPIC_TYPE);
+      consumer = consumerSession.createConsumer(topic);
+      consumer.setMessageListener(this);
+      producer = publisherSession.createProducer(topic);
+      connection.start();
+      Message msg = publisherSession.createMessage();
+      producer.send(msg);
+
+      System.out.println("message sent: " + msg);
+      if (testConsumer == null)
+      {
+         synchronized (lock)
+         {
+            lock.wait(3000);
+         }
+      }
+      assertTrue(testConsumer != null);
+   }
+
+   /**
+    * Use the asynchronous subscription mechanism
+    *
+    * @param message
+    */
+   public void onMessage(Message message)
+   {
+      System.out.println("____________onmessage " + message);
+      try
+      {
+         testConsumer = consumerSession.createConsumer(topic);
+         consumerSession.createProducer(topic);
+         synchronized (lock)
+         {
+            lock.notify();
+         }
+      }
+      catch (Exception ex)
+      {
+         ex.printStackTrace();
+         assertTrue(false);
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsDurableQueueWildcardSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsDurableQueueWildcardSendReceiveTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsDurableQueueWildcardSendReceiveTest.java
new file mode 100644
index 0000000..823e6e6
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsDurableQueueWildcardSendReceiveTest.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import javax.jms.DeliveryMode;
+
+/**
+ * adapted from: org.apache.activemq.JmsDurableQueueWildcardSendReceiveTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class JmsDurableQueueWildcardSendReceiveTest extends JmsTopicSendReceiveTest
+{
+   public void setUp() throws Exception
+   {
+      topic = false;
+      deliveryMode = DeliveryMode.PERSISTENT;
+      super.setUp();
+   }
+
+   @Override
+   protected String getConsumerSubject()
+   {
+      return "FOO.>";
+   }
+
+   @Override
+   protected String getProducerSubject()
+   {
+      return "FOO.BAR.HUMBUG";
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsDurableTopicSelectorTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsDurableTopicSelectorTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsDurableTopicSelectorTest.java
new file mode 100644
index 0000000..71dc288
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsDurableTopicSelectorTest.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.openwire.amq;
+
+import org.junit.Before;
+
+/**
+ * adapted from: org.apache.activemq.JmsDurableTopicSelectorTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class JmsDurableTopicSelectorTest extends JmsTopicSelectorTest
+{
+   @Override
+   @Before
+   public void setUp() throws Exception
+   {
+      durable = true;
+      super.setUp();
+   }
+
+}


Mime
View raw message