Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 72709 invoked from network); 26 Jan 2009 13:11:36 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 26 Jan 2009 13:11:36 -0000 Received: (qmail 17362 invoked by uid 500); 26 Jan 2009 13:11:36 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 17334 invoked by uid 500); 26 Jan 2009 13:11:35 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 17325 invoked by uid 99); 26 Jan 2009 13:11:35 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 26 Jan 2009 05:11:35 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 26 Jan 2009 13:11:28 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 78EC923888E6; Mon, 26 Jan 2009 13:11:08 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r737685 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/group/MessageGroupTest.java Date: Mon, 26 Jan 2009 13:11:08 -0000 To: commits@activemq.apache.org From: dejanb@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090126131108.78EC923888E6@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: dejanb Date: Mon Jan 26 13:11:07 2009 New Revision: 737685 URL: http://svn.apache.org/viewvc?rev=737685&view=rev Log: adding some message group tests Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/group/MessageGroupTest.java Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/group/MessageGroupTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/group/MessageGroupTest.java?rev=737685&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/group/MessageGroupTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/group/MessageGroupTest.java Mon Jan 26 13:11:07 2009 @@ -0,0 +1,165 @@ +package org.apache.activemq.broker.region.group; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.CombinationTestSupport; +import org.apache.activemq.JmsTestSupport; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class MessageGroupTest extends JmsTestSupport { + + private static final Log LOG = LogFactory.getLog(CombinationTestSupport.class); + + public void testGroupedMessagesDeliveredToOnlyOneConsumer() throws Exception { + + ActiveMQDestination destination = new ActiveMQQueue("TEST"); + + // Setup a first connection + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer1 = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + // Send the messages. + for (int i = 0; i < 4; i++) { + TextMessage message = session.createTextMessage("message " + i); + message.setStringProperty("JMSXGroupID", "TEST-GROUP"); + message.setIntProperty("JMSXGroupSeq", i + 1); + LOG.info("sending message: " + message); + producer.send(message); + } + + // All the messages should have been sent down connection 1.. just get + // the first 3 + for (int i = 0; i < 3; i++) { + TextMessage m1 = (TextMessage)consumer1.receive(500); + assertNotNull("m1 is null for index: " + i, m1); + assertEquals(m1.getIntProperty("JMSXGroupSeq"), i + 1); + } + + // Setup a second connection + Connection connection1 = factory.createConnection(userName, password); + connection1.start(); + Session session2 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer2 = session2.createConsumer(destination); + + // Close the first consumer. + consumer1.close(); + + // The last messages should now go the the second consumer. + for (int i = 0; i < 1; i++) { + TextMessage m1 = (TextMessage)consumer2.receive(500); + assertNotNull("m1 is null for index: " + i, m1); + assertEquals(m1.getIntProperty("JMSXGroupSeq"), 4 + i); + } + + //assert that there are no other messages left for the consumer 2 + Message m = consumer2.receive(100); + assertNull("consumer 2 has some messages left", m); + } + + public void testAddingConsumer() throws Exception { + ActiveMQDestination destination = new ActiveMQQueue("TEST"); + + // Setup a first connection + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(destination); + //MessageConsumer consumer = session.createConsumer(destination); + + TextMessage message = session.createTextMessage("message"); + message.setStringProperty("JMSXGroupID", "TEST-GROUP"); + + LOG.info("sending message: " + message); + producer.send(message); + + MessageConsumer consumer = session.createConsumer(destination); + + TextMessage msg = (TextMessage)consumer.receive(); + assertNotNull(msg); + boolean first = msg.getBooleanProperty("JMSXGroupFirstForConsumer"); + assertTrue(first); + } + + public void testClosingMessageGroup() throws Exception { + + ActiveMQDestination destination = new ActiveMQQueue("TEST"); + + // Setup a first connection + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer1 = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + // Send the messages. + for (int i = 0; i < 4; i++) { + TextMessage message = session.createTextMessage("message " + i); + message.setStringProperty("JMSXGroupID", "TEST-GROUP"); + LOG.info("sending message: " + message); + producer.send(message); + } + + + + // All the messages should have been sent down consumer1.. just get + // the first 3 + for (int i = 0; i < 3; i++) { + TextMessage m1 = (TextMessage)consumer1.receive(500); + assertNotNull("m1 is null for index: " + i, m1); + } + + // Setup a second consumer + Connection connection1 = factory.createConnection(userName, password); + connection1.start(); + Session session2 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer2 = session2.createConsumer(destination); + + //assert that there are no messages for the consumer 2 + Message m = consumer2.receive(100); + assertNull("consumer 2 has some messages", m); + + // Close the group + TextMessage message = session.createTextMessage("message " + 5); + message.setStringProperty("JMSXGroupID", "TEST-GROUP"); + message.setIntProperty("JMSXGroupSeq", -1); + LOG.info("sending message: " + message); + producer.send(message); + + //Send some more messages + for (int i = 0; i < 4; i++) { + message = session.createTextMessage("message " + i); + message.setStringProperty("JMSXGroupID", "TEST-GROUP"); + LOG.info("sending message: " + message); + producer.send(message); + } + + // Receive the fourth message + TextMessage m1 = (TextMessage)consumer1.receive(500); + assertNotNull("m1 is null for index: " + 4, m1); + + // Receive the closing message + m1 = (TextMessage)consumer1.receive(500); + assertNotNull("m1 is null for index: " + 5, m1); + + //assert that there are no messages for the consumer 1 + m = consumer1.receive(100); + assertNull("consumer 1 has some messages left", m); + + // The messages should now go to the second consumer. + for (int i = 0; i < 4; i++) { + m1 = (TextMessage)consumer2.receive(500); + assertNotNull("m1 is null for index: " + i, m1); + } + + } + +}