Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E5BC11752B for ; Tue, 11 Nov 2014 11:00:31 +0000 (UTC) Received: (qmail 63939 invoked by uid 500); 11 Nov 2014 11:00:31 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 63866 invoked by uid 500); 11 Nov 2014 11:00:31 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 63840 invoked by uid 99); 11 Nov 2014 11:00:31 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Nov 2014 11:00:31 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 560AB9ABC83; Tue, 11 Nov 2014 11:00:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: andytaylor@apache.org To: commits@activemq.apache.org Date: Tue, 11 Nov 2014 11:00:32 -0000 Message-Id: In-Reply-To: <8993358cfa3a48d182aa897a718a8a96@git.apache.org> References: <8993358cfa3a48d182aa897a718a8a96@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/51] [partial] activemq-6 git commit: ACTIVEMQ6-2 Update to HQ master http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer7Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer7Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer7Test.java new file mode 100644 
index 0000000..a3cb991 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer7Test.java @@ -0,0 +1,162 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire.amq; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.command.ActiveMQDestination; +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * adapted from: org.apache.activemq.JMSConsumerTest + * + * @author Howard Gao + * + */ +@RunWith(Parameterized.class) +public class JMSConsumer7Test extends BasicOpenWireTest +{ + @Parameterized.Parameters(name = "deliveryMode={0} ackMode={1} destinationType={2}") + public static Collection getParams() + { + return Arrays.asList(new Object[][] { + {DeliveryMode.NON_PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, 
Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE} + }); + } + + public int deliveryMode; + public int ackMode; + public byte destinationType; + + public JMSConsumer7Test(int deliveryMode, int ackMode, byte destinationType) + { + this.deliveryMode = deliveryMode; + this.ackMode = ackMode; + this.destinationType = destinationType; + } + + @Test + public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception + { + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch sendDone = new CountDownLatch(1); + final CountDownLatch got2Done = new CountDownLatch(1); + + // Set prefetch to 1 + connection.getPrefetchPolicy().setAll(1); + // This test case does not work if optimized message dispatch is used as + // the main thread send block until the consumer receives the + // message. This test depends on thread decoupling so that the main + // thread can stop the consumer thread. + connection.setOptimizedMessageDispatch(false); + connection.start(); + + // Use all the ack modes + Session session = connection.createSession(false, ackMode); + ActiveMQDestination destination = createDestination(session, + destinationType); + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() + { + @Override + public void onMessage(Message m) + { + try + { + TextMessage tm = (TextMessage) m; + assertEquals("" + counter.get(), tm.getText()); + counter.incrementAndGet(); + if (counter.get() == 2) + { + sendDone.await(); + connection.close(); + got2Done.countDown(); + } + System.out.println("acking tm: " + tm.getText()); + tm.acknowledge(); + } + catch (Throwable e) + { + System.out.println("ack failed!!"); + e.printStackTrace(); + } + } + }); + + // Send the messages + sendMessages(session, destination, 4); + sendDone.countDown(); + + // Wait for first 2 messages to arrive. + assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS)); + + // Re-start connection. 
+ connection = (ActiveMQConnection) factory.createConnection(); + + connection.getPrefetchPolicy().setAll(1); + connection.start(); + + // Pickup the remaining messages. + final CountDownLatch done2 = new CountDownLatch(1); + session = connection.createSession(false, ackMode); + consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() + { + @Override + public void onMessage(Message m) + { + try + { + TextMessage tm = (TextMessage) m; + System.out.println("2nd received: " + tm.getText()); + // order is not guaranteed as the connection is started before + // the listener is set. + // assertEquals("" + counter.get(), tm.getText()); + counter.incrementAndGet(); + if (counter.get() == 4) + { + done2.countDown(); + } + } + catch (Throwable e) + { + System.err.println("Unexpected exception: " + e); + } + } + }); + + assertTrue(done2.await(1000, TimeUnit.MILLISECONDS)); + Thread.sleep(200); + + // assert msg 2 was redelivered as close() from onMessages() will only ack + // in auto_ack and dups_ok mode + assertEquals(5, counter.get()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer8Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer8Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer8Test.java new file mode 100644 index 0000000..cfcfce4 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer8Test.java @@ -0,0 +1,159 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire.amq; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.command.ActiveMQDestination; +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * adapted from: org.apache.activemq.JMSConsumerTest + * + * @author Howard Gao + * + */ +@RunWith(Parameterized.class) +public class JMSConsumer8Test extends BasicOpenWireTest +{ + @Parameterized.Parameters(name = "deliveryMode={0} ackMode={1} destinationType={2}") + public static Collection getParams() + { + return Arrays.asList(new Object[][] { + {DeliveryMode.NON_PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.NON_PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE} + }); + } + + public int deliveryMode; + public int ackMode; + public byte destinationType; + + public JMSConsumer8Test(int deliveryMode, int ackMode, byte 
destinationType) + { + this.deliveryMode = deliveryMode; + this.ackMode = ackMode; + this.destinationType = destinationType; + } + + @Test + public void testMessageListenerAutoAckOnCloseWithPrefetch1() throws Exception + { + + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch sendDone = new CountDownLatch(1); + final CountDownLatch got2Done = new CountDownLatch(1); + + // Set prefetch to 1 + connection.getPrefetchPolicy().setAll(1); + // This test case does not work if optimized message dispatch is used as + // the main thread send block until the consumer receives the + // message. This test depends on thread decoupling so that the main + // thread can stop the consumer thread. + connection.setOptimizedMessageDispatch(false); + connection.start(); + + // Use all the ack modes + Session session = connection.createSession(false, ackMode); + ActiveMQDestination destination = createDestination(session, + destinationType); + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() + { + @Override + public void onMessage(Message m) + { + try + { + TextMessage tm = (TextMessage) m; + assertEquals("" + counter.get(), tm.getText()); + counter.incrementAndGet(); + m.acknowledge(); + if (counter.get() == 2) + { + sendDone.await(); + connection.close(); + got2Done.countDown(); + } + } + catch (Throwable e) + { + e.printStackTrace(); + } + } + }); + + // Send the messages + sendMessages(session, destination, 4); + sendDone.countDown(); + + // Wait for first 2 messages to arrive. + assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS)); + + // Re-start connection. + connection = (ActiveMQConnection) factory.createConnection(); + + connection.getPrefetchPolicy().setAll(1); + connection.start(); + + // Pickup the remaining messages. 
+ final CountDownLatch done2 = new CountDownLatch(1); + session = connection.createSession(false, ackMode); + consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() + { + @Override + public void onMessage(Message m) + { + try + { + TextMessage tm = (TextMessage) m; + counter.incrementAndGet(); + if (counter.get() == 4) + { + done2.countDown(); + } + } + catch (Throwable e) + { + System.err.println("Unexpected exception " + e); + } + } + }); + + assertTrue(done2.await(1000, TimeUnit.MILLISECONDS)); + Thread.sleep(200); + + // close from onMessage with Auto_ack will ack + // Make sure only 4 messages were delivered. + assertEquals(4, counter.get()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer9Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer9Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer9Test.java new file mode 100644 index 0000000..a69e0e3 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer9Test.java @@ -0,0 +1,141 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.tests.integration.openwire.amq; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.activemq.command.ActiveMQDestination; +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * adapted from: org.apache.activemq.JMSConsumerTest + * + * @author Howard Gao + * + */ +@RunWith(Parameterized.class) +public class JMSConsumer9Test extends BasicOpenWireTest +{ + @Parameterized.Parameters(name = "deliveryMode={0} destinationType={1}") + public static Collection getParams() + { + return Arrays.asList(new Object[][] { + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TOPIC_TYPE}, + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE}, + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_TOPIC_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.TOPIC_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_TOPIC_TYPE} + }); + } + + public int deliveryMode; + public byte destinationType; + + public JMSConsumer9Test(int deliveryMode, byte destinationType) + { + this.deliveryMode = deliveryMode; + this.destinationType = destinationType; + } + + @Test + public void testMessageListenerWithConsumerWithPrefetch1() throws Exception + { + + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch done = new CountDownLatch(1); + + // Receive a message with the JMS API + 
connection.getPrefetchPolicy().setAll(1); + connection.start(); + + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = createDestination(session, + destinationType); + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() + { + @Override + public void onMessage(Message m) + { + counter.incrementAndGet(); + if (counter.get() == 4) + { + done.countDown(); + } + } + }); + + // Send the messages + sendMessages(session, destination, 4); + + assertTrue(done.await(1000, TimeUnit.MILLISECONDS)); + Thread.sleep(200); + + // Make sure only 4 messages were delivered. + assertEquals(4, counter.get()); + } + + public void testMessageListenerWithConsumer() throws Exception + { + + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch done = new CountDownLatch(1); + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = createDestination(session, + destinationType); + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() + { + @Override + public void onMessage(Message m) + { + counter.incrementAndGet(); + if (counter.get() == 4) + { + done.countDown(); + } + } + }); + + // Send the messages + sendMessages(session, destination, 4); + + assertTrue(done.await(1000, TimeUnit.MILLISECONDS)); + Thread.sleep(200); + + // Make sure only 4 messages were delivered. 
+ assertEquals(4, counter.get()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSDurableTopicRedeliverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSDurableTopicRedeliverTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSDurableTopicRedeliverTest.java new file mode 100644 index 0000000..dd1e931 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSDurableTopicRedeliverTest.java @@ -0,0 +1,95 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire.amq; + +import javax.jms.Message; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.junit.Before; +import org.junit.Test; + +/** + * adapted from: org.apache.activemq.JMSDurableTopicRedeliverTest + * + * @author Howard Gao + * + */ +public class JMSDurableTopicRedeliverTest extends JmsTopicRedeliverTest +{ + + @Override + @Before + public void setUp() throws Exception + { + durable = true; + super.setUp(); + } + + /** + * Sends and consumes the messages. 
+ * + * @throws Exception + */ + @Test + public void testRedeliverNewSession() throws Exception + { + String text = "TEST: " + System.currentTimeMillis(); + Message sendMessage = session.createTextMessage(text); + + if (verbose) + { + System.out.println("About to send a message: " + sendMessage + + " with text: " + text); + } + producer.send(producerDestination, sendMessage); + + // receive but don't acknowledge + Message unackMessage = consumer.receive(1000); + assertNotNull(unackMessage); + String unackId = unackMessage.getJMSMessageID(); + assertEquals(((TextMessage) unackMessage).getText(), text); + assertFalse(unackMessage.getJMSRedelivered()); + assertEquals(unackMessage.getIntProperty("JMSXDeliveryCount"), 1); + consumeSession.close(); + consumer.close(); + + // receive then acknowledge + consumeSession = connection.createSession(false, + Session.CLIENT_ACKNOWLEDGE); + consumer = createConsumer(getName()); + Message ackMessage = consumer.receive(1000); + assertNotNull(ackMessage); + + ackMessage.acknowledge(); + + String ackId = ackMessage.getJMSMessageID(); + assertEquals(((TextMessage) ackMessage).getText(), text); + assertEquals(2, ackMessage.getIntProperty("JMSXDeliveryCount")); + assertEquals(unackId, ackId); + consumeSession.close(); + consumer.close(); + + consumeSession = connection.createSession(false, + Session.CLIENT_ACKNOWLEDGE); + consumer = createConsumer(getName()); + assertNull(consumer.receive(1000)); + } + + protected String getName() + { + return "JMSDurableTopicRedeliverTest"; + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSIndividualAckTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSIndividualAckTest.java 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSIndividualAckTest.java new file mode 100644 index 0000000..203ade9 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSIndividualAckTest.java @@ -0,0 +1,151 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire.amq; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.command.ActiveMQDestination; +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; + +/** + * adapted from: org.apache.activemq.JMSIndividualAckTest + * + * @author Howard Gao + * + */ +public class JMSIndividualAckTest extends BasicOpenWireTest +{ + + /** + * Tests if acknowledged messages are being consumed. 
+ * + * @throws JMSException + */ + @Test + public void testAckedMessageAreConsumed() throws JMSException + { + connection.start(); + Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); + Queue queue = (Queue) this.createDestination(session, ActiveMQDestination.QUEUE_TYPE); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + + // Consume the message... + MessageConsumer consumer = session.createConsumer(queue); + Message msg = consumer.receive(1000); + assertNotNull(msg); + msg.acknowledge(); + + // Reset the session. + session.close(); + session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); + + // Attempt to Consume the message... + consumer = session.createConsumer(queue); + msg = consumer.receive(1000); + assertNull(msg); + + session.close(); + } + + /** + * Tests if acknowledged messages are being consumed. + * + * @throws JMSException + */ + @Test + public void testLastMessageAcked() throws JMSException + { + connection.start(); + Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); + Queue queue = (Queue) this.createDestination(session, ActiveMQDestination.QUEUE_TYPE); + MessageProducer producer = session.createProducer(queue); + TextMessage msg1 = session.createTextMessage("msg1"); + TextMessage msg2 = session.createTextMessage("msg2"); + TextMessage msg3 = session.createTextMessage("msg3"); + producer.send(msg1); + producer.send(msg2); + producer.send(msg3); + + // Consume the message... + MessageConsumer consumer = session.createConsumer(queue); + Message msg = consumer.receive(1000); + assertNotNull(msg); + msg = consumer.receive(1000); + assertNotNull(msg); + msg = consumer.receive(1000); + assertNotNull(msg); + msg.acknowledge(); + + // Reset the session. 
+ session.close(); + session = connection.createSession(false, + ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); + + // Attempt to Consume the message... + consumer = session.createConsumer(queue); + msg = consumer.receive(1000); + assertNotNull(msg); + assertEquals(msg1, msg); + msg = consumer.receive(1000); + assertNotNull(msg); + assertEquals(msg2, msg); + msg = consumer.receive(1000); + assertNull(msg); + session.close(); + } + + /** + * Tests if unacknowledged messages are being re-delivered when the consumer + * connects again. + * + * @throws JMSException + */ + @Test + public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException + { + connection.start(); + Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); + Queue queue = (Queue) this.createDestination(session, ActiveMQDestination.QUEUE_TYPE); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + + // Consume the message... + MessageConsumer consumer = session.createConsumer(queue); + Message msg = consumer.receive(1000); + assertNotNull(msg); + // Don't ack the message. + + // Reset the session. This should cause the unacknowledged message to be + // re-delivered. + session.close(); + session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); + + // Attempt to Consume the message... 
+ consumer = session.createConsumer(queue); + msg = consumer.receive(2000); + assertNotNull(msg); + msg.acknowledge(); + + session.close(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSMessageTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSMessageTest.java new file mode 100644 index 0000000..fccd136 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSMessageTest.java @@ -0,0 +1,639 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.tests.integration.openwire.amq; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Vector; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageEOFException; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + +import org.apache.activemq.command.ActiveMQDestination; +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * adapted from: org.apache.activemq.JMSMessageTest + * + * @author Howard Gao + * + */ +@RunWith(Parameterized.class) +public class JMSMessageTest extends BasicOpenWireTest +{ + @Parameterized.Parameters(name = "deliveryMode={0} destinationType={1}") + public static Collection getParams() + { + return Arrays.asList(new Object[][] { + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.QUEUE_TYPE}, + }); + } + + public int deliveryMode = DeliveryMode.NON_PERSISTENT; + public byte destinationType = ActiveMQDestination.QUEUE_TYPE; + public ActiveMQDestination destination; + public boolean durableConsumer; + + public JMSMessageTest(int deliveryMode, byte destinationType) + { + this.deliveryMode = deliveryMode; + this.destinationType = destinationType; + } + + @Test + public void testTextMessage() throws Exception + { + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer 
producer = session.createProducer(destination); + + // Send the message. + { + TextMessage message = session.createTextMessage(); + message.setText("Hi"); + producer.send(message); + } + + // Check the Message + { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("Hi", message.getText()); + } + + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testBytesMessageLength() throws Exception + { + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + // Send the message + { + BytesMessage message = session.createBytesMessage(); + message.writeInt(1); + message.writeInt(2); + message.writeInt(3); + message.writeInt(4); + producer.send(message); + } + + // Check the message. + { + BytesMessage message = (BytesMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals(16, message.getBodyLength()); + } + + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testObjectMessage() throws Exception + { + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + // send the message. 
+ { + ObjectMessage message = session.createObjectMessage(); + message.setObject("Hi"); + producer.send(message); + } + + // Check the message + { + ObjectMessage message = (ObjectMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("Hi", message.getObject()); + } + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testBytesMessage() throws Exception + { + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + // Send the message + { + BytesMessage message = session.createBytesMessage(); + message.writeBoolean(true); + producer.send(message); + } + + // Check the message + { + BytesMessage message = (BytesMessage) consumer.receive(1000); + assertNotNull(message); + assertTrue(message.readBoolean()); + + try + { + message.readByte(); + fail("Expected exception not thrown."); + } + catch (MessageEOFException e) + { + } + + } + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testStreamMessage() throws Exception + { + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + // Send the message. + { + StreamMessage message = session.createStreamMessage(); + message.writeString("This is a test to see how it works."); + producer.send(message); + } + + // Check the message. + { + StreamMessage message = (StreamMessage) consumer.receive(1000); + assertNotNull(message); + + // Invalid conversion should throw exception and not move the stream + // position. 
+ try + { + message.readByte(); + fail("Should have received NumberFormatException"); + } + catch (NumberFormatException e) + { + } + + assertEquals("This is a test to see how it works.", + message.readString()); + + // Invalid conversion should throw exception and not move the stream + // position. + try + { + message.readByte(); + fail("Should have received MessageEOFException"); + } + catch (MessageEOFException e) + { + } + } + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testMapMessage() throws Exception + { + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + // send the message. + { + MapMessage message = session.createMapMessage(); + message.setBoolean("boolKey", true); + producer.send(message); + } + + // get the message. 
+ { + MapMessage message = (MapMessage) consumer.receive(1000); + assertNotNull(message); + assertTrue(message.getBoolean("boolKey")); + } + assertNull(consumer.receiveNoWait()); + } + + static class ForeignMessage implements TextMessage + { + + public int deliveryMode; + + private String messageId; + private long timestamp; + private String correlationId; + private Destination replyTo; + private Destination destination; + private boolean redelivered; + private String type; + private long expiration; + private int priority; + private String text; + private final HashMap props = new HashMap(); + + @Override + public String getJMSMessageID() throws JMSException + { + return messageId; + } + + @Override + public void setJMSMessageID(String arg0) throws JMSException + { + messageId = arg0; + } + + @Override + public long getJMSTimestamp() throws JMSException + { + return timestamp; + } + + @Override + public void setJMSTimestamp(long arg0) throws JMSException + { + timestamp = arg0; + } + + @Override + public byte[] getJMSCorrelationIDAsBytes() throws JMSException + { + return null; + } + + @Override + public void setJMSCorrelationIDAsBytes(byte[] arg0) throws JMSException + { + } + + @Override + public void setJMSCorrelationID(String arg0) throws JMSException + { + correlationId = arg0; + } + + @Override + public String getJMSCorrelationID() throws JMSException + { + return correlationId; + } + + @Override + public Destination getJMSReplyTo() throws JMSException + { + return replyTo; + } + + @Override + public void setJMSReplyTo(Destination arg0) throws JMSException + { + replyTo = arg0; + } + + @Override + public Destination getJMSDestination() throws JMSException + { + return destination; + } + + @Override + public void setJMSDestination(Destination arg0) throws JMSException + { + destination = arg0; + } + + @Override + public int getJMSDeliveryMode() throws JMSException + { + return deliveryMode; + } + + @Override + public void setJMSDeliveryMode(int arg0) throws 
JMSException + { + deliveryMode = arg0; + } + + @Override + public boolean getJMSRedelivered() throws JMSException + { + return redelivered; + } + + @Override + public void setJMSRedelivered(boolean arg0) throws JMSException + { + redelivered = arg0; + } + + @Override + public String getJMSType() throws JMSException + { + return type; + } + + @Override + public void setJMSType(String arg0) throws JMSException + { + type = arg0; + } + + @Override + public long getJMSExpiration() throws JMSException + { + return expiration; + } + + @Override + public void setJMSExpiration(long arg0) throws JMSException + { + expiration = arg0; + } + + @Override + public int getJMSPriority() throws JMSException + { + return priority; + } + + @Override + public void setJMSPriority(int arg0) throws JMSException + { + priority = arg0; + } + + @Override + public void clearProperties() throws JMSException + { + } + + @Override + public boolean propertyExists(String arg0) throws JMSException + { + return false; + } + + @Override + public boolean getBooleanProperty(String arg0) throws JMSException + { + return false; + } + + @Override + public byte getByteProperty(String arg0) throws JMSException + { + return 0; + } + + @Override + public short getShortProperty(String arg0) throws JMSException + { + return 0; + } + + @Override + public int getIntProperty(String arg0) throws JMSException + { + return 0; + } + + @Override + public long getLongProperty(String arg0) throws JMSException + { + return 0; + } + + @Override + public float getFloatProperty(String arg0) throws JMSException + { + return 0; + } + + @Override + public double getDoubleProperty(String arg0) throws JMSException + { + return 0; + } + + @Override + public String getStringProperty(String arg0) throws JMSException + { + return (String) props.get(arg0); + } + + @Override + public Object getObjectProperty(String arg0) throws JMSException + { + return props.get(arg0); + } + + @Override + public Enumeration getPropertyNames() throws 
JMSException + { + return new Vector(props.keySet()).elements(); + } + + @Override + public void setBooleanProperty(String arg0, boolean arg1) throws JMSException + { + } + + @Override + public void setByteProperty(String arg0, byte arg1) throws JMSException + { + } + + @Override + public void setShortProperty(String arg0, short arg1) throws JMSException + { + } + + @Override + public void setIntProperty(String arg0, int arg1) throws JMSException + { + } + + @Override + public void setLongProperty(String arg0, long arg1) throws JMSException + { + } + + @Override + public void setFloatProperty(String arg0, float arg1) throws JMSException + { + } + + @Override + public void setDoubleProperty(String arg0, double arg1) throws JMSException + { + } + + @Override + public void setStringProperty(String arg0, String arg1) throws JMSException + { + props.put(arg0, arg1); + } + + @Override + public void setObjectProperty(String arg0, Object arg1) throws JMSException + { + props.put(arg0, arg1); + } + + @Override + public void acknowledge() throws JMSException + { + } + + @Override + public void clearBody() throws JMSException + { + } + + @Override + public void setText(String arg0) throws JMSException + { + text = arg0; + } + + @Override + public String getText() throws JMSException + { + return text; + } + + @Override + public T getBody(Class arg0) throws JMSException + { + // TODO Auto-generated method stub + return null; + } + + @Override + public long getJMSDeliveryTime() throws JMSException + { + // TODO Auto-generated method stub + return 0; + } + + @Override + public boolean isBodyAssignableTo(Class arg0) throws JMSException + { + // TODO Auto-generated method stub + return false; + } + + @Override + public void setJMSDeliveryTime(long arg0) throws JMSException + { + // TODO Auto-generated method stub + } + } + + @Test + public void testForeignMessage() throws Exception + { + + // Receive a message with the JMS API + connection.start(); + Session session = 
connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + // Send the message. + { + ForeignMessage message = new ForeignMessage(); + message.text = "Hello"; + message.setStringProperty("test", "value"); + long timeToLive = 10000L; + long start = System.currentTimeMillis(); + producer.send(message, Session.AUTO_ACKNOWLEDGE, 7, timeToLive); + long end = System.currentTimeMillis(); + + // validate jms spec 1.1 section 3.4.11 table 3.1 + // JMSDestination, JMSDeliveryMode, JMSExpiration, JMSPriority, + // JMSMessageID, and JMSTimestamp + // must be set by sending a message. + + assertNotNull(message.getJMSDestination()); + assertEquals(Session.AUTO_ACKNOWLEDGE, message.getJMSDeliveryMode()); + assertTrue(start + timeToLive <= message.getJMSExpiration()); + assertTrue(end + timeToLive >= message.getJMSExpiration()); + assertEquals(7, message.getJMSPriority()); + assertNotNull(message.getJMSMessageID()); + assertTrue(start <= message.getJMSTimestamp()); + assertTrue(end >= message.getJMSTimestamp()); + } + + // Validate message is OK. 
+ { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("Hello", message.getText()); + assertEquals("value", message.getStringProperty("test")); + } + + assertNull(consumer.receiveNoWait()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSQueueRedeliverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSQueueRedeliverTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSQueueRedeliverTest.java new file mode 100644 index 0000000..fb975b8 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSQueueRedeliverTest.java @@ -0,0 +1,33 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.tests.integration.openwire.amq; + +import org.junit.Before; + +/** + * adapted from: org.apache.activemq.JMSQueueRedeliverTest + * + * @author Howard Gao + * + */ +public class JMSQueueRedeliverTest extends JmsTopicRedeliverTest +{ + @Override + @Before + public void setUp() throws Exception + { + topic = false; + super.setUp(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSUsecase1Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSUsecase1Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSUsecase1Test.java new file mode 100644 index 0000000..ae28baf --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSUsecase1Test.java @@ -0,0 +1,122 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.tests.integration.openwire.amq; + +import java.util.Arrays; +import java.util.Collection; + +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * adapted from: org.apache.activemq.JMSUsecaseTest + * + * @author Howard Gao + * + */ +@RunWith(Parameterized.class) +public class JMSUsecase1Test extends BasicOpenWireTest +{ + @Parameterized.Parameters(name = "deliveryMode={0} destinationType={1}") + public static Collection getParams() + { + return Arrays.asList(new Object[][] { + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE}, + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TOPIC_TYPE}, + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_TOPIC_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.TOPIC_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_TOPIC_TYPE} + }); + } + + public int deliveryMode; + public byte destinationType; + + public JMSUsecase1Test(int deliveryMode, byte destinationType) + { + this.deliveryMode = deliveryMode; + this.destinationType = destinationType; + } + + @Test + public void testSendReceive() throws Exception + { + // Send a message to the broker. 
+ connection.start(); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = createDestination(session, destinationType); + System.out.println("destionation: " + destination); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(this.deliveryMode); + MessageConsumer consumer = session.createConsumer(destination); + ActiveMQMessage message = new ActiveMQMessage(); + producer.send(message); + + // Make sure only 1 message was delivered. + assertNotNull(consumer.receive(1000)); + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testSendReceiveTransacted() throws Exception + { + // Send a message to the broker. + connection.start(); + Session session = connection.createSession(true, + Session.SESSION_TRANSACTED); + ActiveMQDestination destination = createDestination(session, + destinationType); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(this.deliveryMode); + MessageConsumer consumer = session.createConsumer(destination); + producer.send(session.createTextMessage("test")); + + // Message should not be delivered until commit. + assertNull(consumer.receiveNoWait()); + session.commit(); + + // Make sure only 1 message was delivered. + Message message = consumer.receive(1000); + assertNotNull(message); + assertFalse(message.getJMSRedelivered()); + assertNull(consumer.receiveNoWait()); + + // Message should be redelivered is rollback is used. + session.rollback(); + + // Make sure only 1 message was delivered. + message = consumer.receive(2000); + assertNotNull(message); + assertTrue(message.getJMSRedelivered()); + assertNull(consumer.receiveNoWait()); + + // If we commit now, the message should not be redelivered. 
+ session.commit(); + assertNull(consumer.receiveNoWait()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSUsecaseTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSUsecaseTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSUsecaseTest.java new file mode 100644 index 0000000..9d25feb --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSUsecaseTest.java @@ -0,0 +1,88 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.tests.integration.openwire.amq; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Enumeration; + +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.command.ActiveMQDestination; +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * adapted from: org.apache.activemq.JMSUsecaseTest + * + * @author Howard Gao + * + */ +@RunWith(Parameterized.class) +public class JMSUsecaseTest extends BasicOpenWireTest +{ + @Parameterized.Parameters(name = "deliveryMode={0} destinationType={1}") + public static Collection getParams() + { + return Arrays.asList(new Object[][] { + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE} + }); + } + + public int deliveryMode; + public byte destinationType; + + public JMSUsecaseTest(int deliveryMode, byte destinationType) + { + this.deliveryMode = deliveryMode; + this.destinationType = destinationType; + } + + @Test + public void testQueueBrowser() throws Exception + { + // Send a message to the broker. 
+ connection.start(); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = createDestination(session, destinationType); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(this.deliveryMode); + sendMessages(session, producer, 5); + producer.close(); + + QueueBrowser browser = session.createBrowser((Queue) destination); + Enumeration enumeration = browser.getEnumeration(); + for (int i = 0; i < 5; i++) + { + Thread.sleep(100); + assertTrue(enumeration.hasMoreElements()); + Message m = (Message) enumeration.nextElement(); + assertNotNull(m); + assertEquals("" + i, ((TextMessage) m).getText()); + } + assertFalse(enumeration.hasMoreElements()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsAutoAckListenerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsAutoAckListenerTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsAutoAckListenerTest.java new file mode 100644 index 0000000..8cd09cf --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsAutoAckListenerTest.java @@ -0,0 +1,67 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire.amq; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; + +/** + * adapted from: org.apache.activemq.JmsAutoAckListenerTest + * @author Howard Gao + * + */ +public class JmsAutoAckListenerTest extends BasicOpenWireTest implements MessageListener +{ + private final CountDownLatch latch = new CountDownLatch(1); + + @Test + public void testAckedMessageAreConsumed() throws Exception + { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + + // Consume the message... 
+ MessageConsumer consumer = session.createConsumer(queue); + consumer.setMessageListener(this); + + latch.await(10, TimeUnit.SECONDS); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + // Attempt to Consume the message...check if message was acknowledge + consumer = session.createConsumer(queue); + Message msg = consumer.receive(1000); + assertNull(msg); + + session.close(); + } + + public void onMessage(Message message) + { + System.out.println("Received message: " + message); + assertNotNull(message); + latch.countDown(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsAutoAckTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsAutoAckTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsAutoAckTest.java new file mode 100644 index 0000000..0e766e1 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsAutoAckTest.java @@ -0,0 +1,65 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.tests.integration.openwire.amq; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; + +/** + * adapted from: org.apache.activemq.JmsAutoAckTest + * + * @author Howard Gao + * + */ +public class JmsAutoAckTest extends BasicOpenWireTest +{ + /** + * Tests if acknowleged messages are being consumed. + * + * @throws javax.jms.JMSException + */ + @Test + public void testAckedMessageAreConsumed() throws JMSException + { + connection.start(); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + + // Consume the message... + MessageConsumer consumer = session.createConsumer(queue); + Message msg = consumer.receive(1000); + assertNotNull(msg); + + // Reset the session. + session.close(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Attempt to Consume the message... 
+ consumer = session.createConsumer(queue); + msg = consumer.receive(1000); + assertNull(msg); + + session.close(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsClientAckTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsClientAckTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsClientAckTest.java new file mode 100644 index 0000000..9e7d042 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsClientAckTest.java @@ -0,0 +1,144 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire.amq; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; + +/** + * adapted from: org.apache.activemq.JmsClientAckTest + * @author Howard Gao + * + */ +public class JmsClientAckTest extends BasicOpenWireTest +{ + /** + * Tests if acknowledged messages are being consumed. 
+ * + * @throws JMSException + */ + @Test + public void testAckedMessageAreConsumed() throws JMSException + { + connection.start(); + Session session = connection.createSession(false, + Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + + // Consume the message... + MessageConsumer consumer = session.createConsumer(queue); + Message msg = consumer.receive(1000); + assertNotNull(msg); + msg.acknowledge(); + + // Reset the session. + session.close(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + // Attempt to Consume the message... + consumer = session.createConsumer(queue); + msg = consumer.receive(1000); + assertNull(msg); + + session.close(); + } + + /** + * Tests if acknowledged messages are being consumed. + * + * @throws JMSException + */ + @Test + public void testLastMessageAcked() throws JMSException + { + connection.start(); + Session session = connection.createSession(false, + Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + producer.send(session.createTextMessage("Hello2")); + producer.send(session.createTextMessage("Hello3")); + + // Consume the message... + MessageConsumer consumer = session.createConsumer(queue); + Message msg = consumer.receive(1000); + assertNotNull(msg); + msg = consumer.receive(1000); + assertNotNull(msg); + msg = consumer.receive(1000); + assertNotNull(msg); + msg.acknowledge(); + + // Reset the session. + session.close(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + // Attempt to Consume the message... 
+ consumer = session.createConsumer(queue); + msg = consumer.receive(1000); + assertNull(msg); + + session.close(); + } + + /** + * Tests if unacknowledged messages are being re-delivered when the consumer connects again. + * + * @throws JMSException + */ + @Test + public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException + { + connection.start(); + Session session = connection.createSession(false, + Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + + // Consume the message... + MessageConsumer consumer = session.createConsumer(queue); + Message msg = consumer.receive(1000); + assertNotNull(msg); + // Don't ack the message. + + // Reset the session. This should cause the unacknowledged message to be + // re-delivered. + session.close(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + // Attempt to Consume the message... + consumer = session.createConsumer(queue); + msg = consumer.receive(2000); + assertNotNull(msg); + msg.acknowledge(); + + session.close(); + } + + protected String getQueueName() + { + return queueName; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsConnectionStartStopTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsConnectionStartStopTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsConnectionStartStopTest.java new file mode 100644 index 0000000..b8d237f --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsConnectionStartStopTest.java @@ -0,0 +1,185 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire.amq; + +import java.util.Random; +import java.util.Vector; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * adapted from: org.apache.activemq.JmsConnectionStartStopTest + * @author Howard Gao + * + */ +public class JmsConnectionStartStopTest extends BasicOpenWireTest +{ + private Connection startedConnection; + private Connection stoppedConnection; + + @Before + public void setUp() throws Exception + { + super.setUp(); + startedConnection = factory.createConnection(); + startedConnection.start(); + stoppedConnection = factory.createConnection(); + } + + /** + * @see junit.framework.TestCase#tearDown() + */ + @After + public void tearDown() throws Exception + { + stoppedConnection.close(); + startedConnection.close(); + super.tearDown(); + } + + /** + * Tests if the consumer receives the messages that were sent before the 
+ * connection was started. + * + * @throws JMSException + */ + @Test + public void testStoppedConsumerHoldsMessagesTillStarted() throws JMSException + { + Session startedSession = startedConnection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + Session stoppedSession = stoppedConnection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + + // Setup the consumers. + Topic topic = startedSession.createTopic("test"); + MessageConsumer startedConsumer = startedSession.createConsumer(topic); + MessageConsumer stoppedConsumer = stoppedSession.createConsumer(topic); + + // Send the message. + MessageProducer producer = startedSession.createProducer(topic); + TextMessage message = startedSession.createTextMessage("Hello"); + producer.send(message); + + // Test the assertions. + Message m = startedConsumer.receive(1000); + assertNotNull(m); + + m = stoppedConsumer.receive(1000); + assertNull(m); + + stoppedConnection.start(); + m = stoppedConsumer.receive(5000); + assertNotNull(m); + + startedSession.close(); + stoppedSession.close(); + } + + /** + * Tests if the consumer is able to receive messages eveb when the + * connecction restarts multiple times. 
+ * + * @throws Exception + */ + @Test + public void testMultipleConnectionStops() throws Exception + { + testStoppedConsumerHoldsMessagesTillStarted(); + stoppedConnection.stop(); + testStoppedConsumerHoldsMessagesTillStarted(); + stoppedConnection.stop(); + testStoppedConsumerHoldsMessagesTillStarted(); + } + + @Test + public void testConcurrentSessionCreateWithStart() throws Exception + { + ThreadPoolExecutor executor = new ThreadPoolExecutor(50, + Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, + new SynchronousQueue()); + final Vector exceptions = new Vector(); + final AtomicInteger counter = new AtomicInteger(0); + final Random rand = new Random(); + Runnable createSessionTask = new Runnable() + { + @Override + public void run() + { + try + { + TimeUnit.MILLISECONDS.sleep(rand.nextInt(10)); + stoppedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + counter.incrementAndGet(); + } + catch (Exception e) + { + exceptions.add(e); + } + catch (Throwable t) + { + } + } + }; + + Runnable startStopTask = new Runnable() + { + @Override + public void run() + { + try + { + TimeUnit.MILLISECONDS.sleep(rand.nextInt(10)); + stoppedConnection.start(); + stoppedConnection.stop(); + } + catch (Exception e) + { + exceptions.add(e); + } + catch (Throwable t) + { + } + } + }; + + for (int i = 0; i < 1000; i++) + { + executor.execute(createSessionTask); + executor.execute(startStopTask); + } + + executor.shutdown(); + + assertTrue("executor terminated", + executor.awaitTermination(30, TimeUnit.SECONDS)); + assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsConsumerResetActiveListenerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsConsumerResetActiveListenerTest.java 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsConsumerResetActiveListenerTest.java new file mode 100644 index 0000000..d3bd648 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsConsumerResetActiveListenerTest.java @@ -0,0 +1,165 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire.amq; + +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; + +/** + * adapted from: org.apache.activemq.JmsConsumerResetActiveListenerTest + * + * @author Howard Gao + * + */ +public class JmsConsumerResetActiveListenerTest extends BasicOpenWireTest +{ + + /** + * verify the (undefined by spec) behaviour of setting a listener while + * receiving a message. 
+ * + * @throws Exception + */ + @Test + public void testSetListenerFromListener() throws Exception + { + Session session = connection.createSession(false, + Session.CLIENT_ACKNOWLEDGE); + Destination dest = session.createQueue(queueName); + final MessageConsumer consumer = session.createConsumer(dest); + + final CountDownLatch latch = new CountDownLatch(2); + final AtomicBoolean first = new AtomicBoolean(true); + final Vector results = new Vector(); + consumer.setMessageListener(new MessageListener() + { + + public void onMessage(Message message) + { + if (first.compareAndSet(true, false)) + { + try + { + consumer.setMessageListener(this); + results.add(message); + } + catch (JMSException e) + { + results.add(e); + } + } + else + { + results.add(message); + } + latch.countDown(); + } + }); + + connection.start(); + + MessageProducer producer = session.createProducer(dest); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + producer.send(session.createTextMessage("First")); + producer.send(session.createTextMessage("Second")); + + assertTrue("we did not timeout", latch.await(5, TimeUnit.SECONDS)); + + assertEquals("we have a result", 2, results.size()); + Object result = results.get(0); + assertTrue(result instanceof TextMessage); + assertEquals("result is first", "First", ((TextMessage) result).getText()); + result = results.get(1); + assertTrue(result instanceof TextMessage); + assertEquals("result is first", "Second", + ((TextMessage) result).getText()); + } + + /** + * and a listener on a new consumer, just in case. 
+ * + * @throws Exception + */ + @Test + public void testNewConsumerSetListenerFromListener() throws Exception + { + final Session session = connection.createSession(false, + Session.CLIENT_ACKNOWLEDGE); + final Destination dest = session.createQueue(queueName); + final MessageConsumer consumer = session.createConsumer(dest); + + final CountDownLatch latch = new CountDownLatch(2); + final AtomicBoolean first = new AtomicBoolean(true); + final Vector results = new Vector(); + consumer.setMessageListener(new MessageListener() + { + + public void onMessage(Message message) + { + if (first.compareAndSet(true, false)) + { + try + { + MessageConsumer anotherConsumer = session + .createConsumer(dest); + anotherConsumer.setMessageListener(this); + results.add(message); + } + catch (JMSException e) + { + results.add(e); + } + } + else + { + results.add(message); + } + latch.countDown(); + } + }); + + connection.start(); + + MessageProducer producer = session.createProducer(dest); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + producer.send(session.createTextMessage("First")); + producer.send(session.createTextMessage("Second")); + + assertTrue("we did not timeout", latch.await(5, TimeUnit.SECONDS)); + + assertEquals("we have a result", 2, results.size()); + Object result = results.get(0); + assertTrue(result instanceof TextMessage); + assertEquals("result is first", "First", ((TextMessage) result).getText()); + result = results.get(1); + assertTrue(result instanceof TextMessage); + assertEquals("result is first", "Second", + ((TextMessage) result).getText()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsCreateConsumerInOnMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsCreateConsumerInOnMessageTest.java 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsCreateConsumerInOnMessageTest.java new file mode 100644 index 0000000..fec6c79 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsCreateConsumerInOnMessageTest.java @@ -0,0 +1,99 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire.amq; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; + +import org.apache.activemq.command.ActiveMQDestination; +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; + +/** + * adapted from: org.apache.activemq.JmsCreateConsumerInOnMessageTest + * + * @author Howard Gao + * + */ +public class JmsCreateConsumerInOnMessageTest extends BasicOpenWireTest implements MessageListener +{ + private Session publisherSession; + private Session consumerSession; + private MessageConsumer consumer; + private MessageConsumer testConsumer; + private MessageProducer producer; + private Topic topic; + private Object lock = new Object(); + + /** + * Tests if a consumer can be created asynchronously + * + * @throws Exception + */ + @Test + public void testCreateConsumer() throws Exception + { + connection.setClientID("connection:" + 
"JmsCreateConsumerInOnMessageTest"); + publisherSession = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + consumerSession = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + topic = (Topic) super.createDestination(consumerSession, + ActiveMQDestination.TOPIC_TYPE); + consumer = consumerSession.createConsumer(topic); + consumer.setMessageListener(this); + producer = publisherSession.createProducer(topic); + connection.start(); + Message msg = publisherSession.createMessage(); + producer.send(msg); + + System.out.println("message sent: " + msg); + if (testConsumer == null) + { + synchronized (lock) + { + lock.wait(3000); + } + } + assertTrue(testConsumer != null); + } + + /** + * Use the asynchronous subscription mechanism + * + * @param message + */ + public void onMessage(Message message) + { + System.out.println("____________onmessage " + message); + try + { + testConsumer = consumerSession.createConsumer(topic); + consumerSession.createProducer(topic); + synchronized (lock) + { + lock.notify(); + } + } + catch (Exception ex) + { + ex.printStackTrace(); + assertTrue(false); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsDurableQueueWildcardSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsDurableQueueWildcardSendReceiveTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsDurableQueueWildcardSendReceiveTest.java new file mode 100644 index 0000000..823e6e6 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsDurableQueueWildcardSendReceiveTest.java @@ -0,0 +1,43 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire.amq; + +import javax.jms.DeliveryMode; + +/** + * adapted from: org.apache.activemq.JmsDurableQueueWildcardSendReceiveTest + * + * @author Howard Gao + * + */ +public class JmsDurableQueueWildcardSendReceiveTest extends JmsTopicSendReceiveTest +{ + public void setUp() throws Exception + { + topic = false; + deliveryMode = DeliveryMode.PERSISTENT; + super.setUp(); + } + + @Override + protected String getConsumerSubject() + { + return "FOO.>"; + } + + @Override + protected String getProducerSubject() + { + return "FOO.BAR.HUMBUG"; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsDurableTopicSelectorTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsDurableTopicSelectorTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsDurableTopicSelectorTest.java new file mode 100644 index 0000000..71dc288 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JmsDurableTopicSelectorTest.java @@ -0,0 +1,33 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire.amq; + +import org.junit.Before; + +/** + * adapted from: org.apache.activemq.JmsDurableTopicSelectorTest + * + * @author Howard Gao + * + */ +public class JmsDurableTopicSelectorTest extends JmsTopicSelectorTest +{ + @Override + @Before + public void setUp() throws Exception + { + durable = true; + super.setUp(); + } + +}