activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r995381 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java
Date Thu, 09 Sep 2010 11:22:28 GMT
Author: gtully
Date: Thu Sep  9 11:22:27 2010
New Revision: 995381

URL: http://svn.apache.org/viewvc?rev=995381&view=rev
Log:
commit test case with policy maxPageSize workaround in place that demonstrates it works as
expected - https://issues.apache.org/activemq/browse/AMQ-2217

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java
  (with props)

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java?rev=995381&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java
Thu Sep  9 11:22:27 2010
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsConnectionStartStopTest;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+
+/**
+ * Test case intended to demonstrate delivery interruption to queue consumers when
+ * a JMS selector leaves some messages on the queue (due to use of a JMS Selector)
+ * 
+ * testNonDiscriminatingConsumer() demonstrates proper functionality for consumers that don't
use
+ * a selector to qualify their input.
+ * 
+ * testDiscriminatingConsumer() demonstrates the failure condition in which delivery to the
consumer
+ * eventually halts.
+ * 
+ * The expected behavior is for the delivery to the client to be maintained regardless of
the depth
+ * of the queue, particularly when the messages in the queue do not meet the selector criteria
of the
+ * client.
+ *
+ * https://issues.apache.org/activemq/browse/AMQ-2217
+ * 
+ */
+public class DiscriminatingConsumerLoadTest extends TestSupport {
+
+	private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
+	.getLog(DiscriminatingConsumerLoadTest.class);
+
+	private Connection producerConnection;
+	private Connection consumerConnection;
+	private int counterSent = 0;
+	private int counterReceived = 0;
+	
+	public static final String JMSTYPE_EATME		= "DiscriminatingLoadClient.EatMe";
+	public static final String JMSTYPE_IGNOREME 	= "DiscriminatingLoadClient.IgnoreMe";
+
+	private int testSize = 5000; // setting this to a small number will pass all tests
+
+    BrokerService broker;
+
+	protected void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+
+        // workaround is to ensure sufficient dispatch buffer for the destination
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setMaxPageSize(testSize);
+        policyMap.setDefaultEntry(defaultPolicy);
+        broker.setDestinationPolicy(policyMap);
+        broker.start();
+
+		super.setUp();
+		this.producerConnection = this.createConnection();
+		this.consumerConnection = this.createConnection();
+	}
+
+	/**
+	 * @see junit.framework.TestCase#tearDown()
+	 */
+	protected void tearDown() throws Exception {
+		if (producerConnection != null) {
+			producerConnection.close();
+			producerConnection = null;
+		}
+		if (consumerConnection != null) {
+			consumerConnection.close();
+			consumerConnection = null;
+		}
+		super.tearDown();
+        broker.stop();
+	}
+
+	/**
+	 * Test to check if a single consumer with no JMS selector will receive all intended messages
+	 * 
+	 * @throws java.lang.Exception
+	 */
+	public void testNonDiscriminatingConsumer() throws Exception {
+		
+		consumerConnection = createConnection();
+		consumerConnection.start();
+		LOG.info("consumerConnection = " +consumerConnection);
+
+		try {Thread.sleep(1000); } catch (Exception e) {}
+
+		// here we pass in null for the JMS selector
+		Consumer consumer = new Consumer(consumerConnection, null);
+		Thread consumerThread = new Thread(consumer);
+
+		consumerThread.start();
+
+		producerConnection = createConnection();
+		producerConnection.start();
+		LOG.info("producerConnection = " +producerConnection);
+
+		try {Thread.sleep(3000); } catch (Exception e) {}
+
+		Producer producer = new Producer(producerConnection);
+		Thread producerThread = new Thread(producer);
+		producerThread.start();
+
+		// now that everything is running, let's wait for the consumer thread to finish ...
+		consumerThread.join();
+		producer.stop = true;
+
+		if (consumer.getCount() == testSize )
+			LOG.info("test complete .... all messsages consumed!!");
+		else
+			LOG.info("test failed .... Sent " + (testSize / 1) + 
+					" messages intended to be consumed ( " + testSize + " total), but only consumed " +
consumer.getCount());
+
+
+		assertTrue("Sent " + testSize + " messages intended to be consumed, but only consumed "
+ consumer.getCount(),
+				(consumer.getCount() == testSize ));
+		assertFalse("Delivery of messages to consumer was halted during this test", consumer.deliveryHalted());
+
+
+	}
+	
+	/**
+	 * Test to check if a single consumer with a JMS selector will receive all intended messages
+	 * 
+	 * @throws java.lang.Exception
+	 */
+	public void testDiscriminatingConsumer() throws Exception {
+
+		consumerConnection = createConnection();
+		consumerConnection.start();
+		LOG.info("consumerConnection = " +consumerConnection);
+
+		try {Thread.sleep(1000); } catch (Exception e) {}
+
+		// here we pass the JMS selector we intend to consume
+		Consumer consumer = new Consumer(consumerConnection, JMSTYPE_EATME);
+		Thread consumerThread = new Thread(consumer);
+
+		consumerThread.start();
+
+		producerConnection = createConnection();
+		producerConnection.start();
+		LOG.info("producerConnection = " +producerConnection);
+
+		try {Thread.sleep(3000); } catch (Exception e) {}
+
+		Producer producer = new Producer(producerConnection);
+		Thread producerThread = new Thread(producer);
+		producerThread.start();
+
+		// now that everything is running, let's wait for the consumer thread to finish ...
+		consumerThread.join();
+		producer.stop = true;
+
+		if (consumer.getCount() == (testSize / 2))
+        {
+			LOG.info("test complete .... all messsages consumed!!");
+        }
+        else
+		{
+			LOG.info("test failed .... Sent " + testSize  + " original messages, only half of which
(" + (testSize / 2) + 
+					") were intended to be consumed: consumer paused at: " + consumer.getCount());
+			//System.out.println("test failed .... Sent " + testSize  + " original messages, only
half of which (" + (testSize / 2) +
+			//		") were intended to be consumed: consumer paused at: " + consumer.getCount());
+			
+		}
+			
+		assertTrue("Sent " + testSize  + " original messages, only half of which (" + (testSize
/ 2) + 
+					") were intended to be consumed: consumer paused at: " + consumer.getCount(),
+				(consumer.getCount() == (testSize / 2)));
+		assertTrue("Delivery of messages to consumer was halted during this test as it only wants
half", consumer.deliveryHalted());
+	}
+	
+	/**
+	 * Helper class that will publish 2 * testSize messages.  The messages will be distributed
evenly
+	 * between the following two JMS types:
+	 * 
+	 * @see JMSTYPE_INTENDED_FOR_CONSUMPTION
+	 * @see JMSTYPE_NOT_INTENDED_FOR_CONSUMPTION
+	 * 
+	 * @author jlyons
+	 *
+	 */
+	private class Producer extends Thread 
+	{
+		private int counterSent = 0;
+		private Connection connection = null;
+		public boolean stop = false;
+
+		public Producer(Connection connection)
+		{
+			this.connection = connection;
+		}
+
+		public void run() {
+			try {
+				final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+				final Queue queue = session.createQueue("test");
+
+				// wait for 10 seconds to allow consumer.receive to be run
+				// first
+				Thread.sleep(10000);
+				MessageProducer producer = session.createProducer(queue);
+
+				while (!stop && (counterSent < testSize))
+				{
+					// first send a message intended to be consumed ....
+					TextMessage message = session.createTextMessage("*** Ill ....... Ini ***");  // alma
mater ...
+					message.setJMSType(JMSTYPE_EATME);
+					//LOG.info("sending .... JMSType = " + message.getJMSType());
+					producer.send(message,DeliveryMode.NON_PERSISTENT,0,1800000);
+					
+					counterSent++;
+
+					// now send a message intended to be consumed by some other consumer in the the future
+					// ... we expect these messages to accrue in the queue 
+					message = session.createTextMessage("*** Ill ....... Ini ***");  // alma mater ...
+					message.setJMSType(JMSTYPE_IGNOREME);
+					//LOG.info("sending .... JMSType = " + message.getJMSType());
+					producer.send(message,DeliveryMode.NON_PERSISTENT,0,1800000);
+
+					counterSent++;
+				}
+
+				session.close();
+
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+			LOG.info("producer thread complete ... " + counterSent + " messages sent to the queue");
+		}
+		
+		public int getCount()
+		{
+			return this.counterSent;
+		}
+		
+	}
+	
+	/**
+	 * Helper class that will consume messages from the queue based on the supplied JMS selector.
+	 * Thread will stop after the first receive(..) timeout, or once all expected messages have
+	 * been received (see testSize).  If the thread stops due to a timeout, it is experiencing
the
+	 * delivery pause that is symptomatic of a bug in the broker.
+	 * 
+	 * @author jlyons
+	 *
+	 */
+	private class Consumer extends Thread 
+	{
+		protected int counterReceived = 0;
+		private Connection connection = null;
+		private String jmsSelector = null;
+		private boolean deliveryHalted = false;
+		
+		public Consumer(Connection connection, String jmsSelector)
+		{
+			this.connection = connection;
+			this.jmsSelector = jmsSelector;
+		}
+
+		public void run() {
+			boolean testComplete = false;
+			try {
+				Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+				final Queue queue = session.createQueue("test");
+				MessageConsumer consumer = null;
+				if (null != this.jmsSelector)
+				{
+					consumer = session.createConsumer(queue, "JMSType='" + this.jmsSelector + "'");
+				}
+				else
+				{
+					consumer = session.createConsumer(queue);					
+				}
+
+				while (!deliveryHalted && (counterReceived < testSize))
+				{
+					TextMessage result = (TextMessage) consumer.receive(30000);
+					if (result != null) {
+						counterReceived++;
+						//System.out.println("consuming .... JMSType = " + result.getJMSType() + " received
= " + counterReceived);
+						LOG.info("consuming .... JMSType = " + result.getJMSType() + " received = " + counterReceived);
+					} else
+					{
+						LOG.info("consuming .... timeout while waiting for a message ... broker must have stopped
delivery ...  received = " + counterReceived);
+						deliveryHalted = true;
+					}
+				}
+				session.close();
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+
+		}
+		
+		public int getCount()
+		{
+			return this.counterReceived;
+		}
+		
+		public boolean deliveryHalted()
+		{
+			return this.deliveryHalted;
+		}
+	}
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message