activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1352081 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java
Date Wed, 20 Jun 2012 13:08:11 GMT
Author: tabish
Date: Wed Jun 20 13:08:10 2012
New Revision: 1352081

URL: http://svn.apache.org/viewvc?rev=1352081&view=rev
Log:
Don't use fixed ports for broker instances, let the connector find an open port. 

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java?rev=1352081&r1=1352080&r2=1352081&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java Wed
Jun 20 13:08:10 2012
@@ -17,14 +17,14 @@
 
 package org.apache.activemq.bugs;
 
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
-import org.apache.activemq.network.NetworkConnector;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -34,952 +34,701 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertTrue;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+public class AMQ3274Test {
+    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3274Test.class);
 
-/**
- *
- */
-public class AMQ3274Test
-{
-	protected static int		Next_broker_num = 0;
-	protected EmbeddedTcpBroker	broker1;
-	protected EmbeddedTcpBroker	broker2;
-
-	protected int			nextEchoId = 0;
-	protected boolean		testError = false;
-
-	protected int			echoResponseFill = 0;   // Number of "filler" response messages per request
-
-	protected static Log		LOG;
-
-	static
-	{
-		LOG = LogFactory.getLog(AMQ3274Test.class);
-	}
-
-	public AMQ3274Test ()
-	throws Exception
-	{
-		broker1 = new EmbeddedTcpBroker();
-		broker2 = new EmbeddedTcpBroker();
-
-		broker1.coreConnectTo(broker2, true);
-		broker2.coreConnectTo(broker1, true);
-	}
-
-	public void logMessage (String msg)
-	{
-		System.out.println(msg);
-		System.out.flush();
-	}
-
-
-	/**
-	 *
-	 */
-
-	public void testMessages (Session sess, MessageProducer req_prod, Destination resp_dest,
int num_msg)
-	throws Exception
-	{
-		MessageConsumer resp_cons;
-		TextMessage		msg;
-		MessageClient	cons_client;
-		int				cur;
-		int				tot_expected;
-
-		resp_cons = sess.createConsumer(resp_dest);
-
-		cons_client = new MessageClient(resp_cons, num_msg);
-		cons_client.start();
-
-		cur = 0;
-		while ( ( cur < num_msg ) && ( ! testError ) )
-		{
-			msg = sess.createTextMessage("MSG AAAA " + cur);
-			msg.setIntProperty("SEQ", 100 + cur);
-			msg.setStringProperty("TEST", "TOPO");
-			msg.setJMSReplyTo(resp_dest);
-
-			if ( cur == ( num_msg - 1 ) )
-				msg.setBooleanProperty("end-of-response", true);
-
-			req_prod.send(msg);
-
-			cur++;
-		}
-
-			//
-			// Give the consumer some time to receive the response.
-			//
-		cons_client.waitShutdown(5000);
-
-			//
-			// Now shutdown the consumer if it's still running.
-			//
-		if ( cons_client.shutdown() )
-			LOG.debug("Consumer client shutdown complete");
-		else
-			LOG.debug("Consumer client shutdown incomplete!!!");
-
-
-			//
-			// Check that the correct number of messages was received.
-			//
-		tot_expected = num_msg * ( echoResponseFill + 1 );
-
-		if ( cons_client.getNumMsgReceived() == tot_expected )
-		{
-			LOG.info("Have " + tot_expected + " messages, as-expected");
-		}
-		else
-		{
-			testError = true;
-			LOG.info("Have " + cons_client.getNumMsgReceived() + " messages; expected " + tot_expected);
-		}
-
-		resp_cons.close();
-	}
-
-
-	/**
-	 * Test one destination between the given "producer broker" and "consumer broker" specified.
-	 */
-	public void testOneDest (Connection conn, Session sess, Destination cons_dest, String prod_broker_url,
-	                         String cons_broker_url, int num_msg)
-	throws Exception
-	{
-		int			echo_id;
-
-		EchoService		echo_svc;
-		String			echo_queue_name;
-		Destination		prod_dest;
-		MessageProducer		msg_prod;
-
-		synchronized ( this )
-		{
-			echo_id = this.nextEchoId;
-			this.nextEchoId++;
-		}
-
-		echo_queue_name = "echo.queue." + echo_id;
-
-			//
-			// Remove any previously-created echo queue with the same name.
-			//
-		LOG.trace("destroying the echo queue in case an old one exists");
-		removeQueue(conn, echo_queue_name);
-
-
-			//
-			// Now start the echo service with that queue.
-			//
-		echo_svc = new EchoService(echo_queue_name, prod_broker_url);
-		echo_svc.start();
-
-
-			//
-			// Create the Producer to the echo request Queue
-			//
-		LOG.trace("Creating echo queue and producer");
-		prod_dest = sess.createQueue(echo_queue_name);
-		msg_prod = sess.createProducer(prod_dest);
-
-
-			//
-			// Pass messages around.
-			//
-		testMessages(sess, msg_prod, cons_dest, num_msg);
-
-
-		//
-		//
-		//
-
-		echo_svc.shutdown();
-		msg_prod.close();
-	}
-
-
-	/**
-	 * TEST TEMPORARY TOPICS
-	 */
-	public void testTempTopic (String prod_broker_url, String cons_broker_url)
-	throws Exception
-	{
-		Connection		conn;
-		Session			sess;
-		Destination		cons_dest;
-		int			echo_id;
-		int			num_msg;
-
-		num_msg = 5;
-
-		LOG.info("TESTING TEMP TOPICS " + prod_broker_url + " -> " + cons_broker_url + " ("
+ num_msg +
-		         " messages)");
-
-
-			//
-			// Connect to the bus.
-			//
-
-		conn = createConnection(cons_broker_url);
-		conn.start();
-		sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-
-			//
-			// Create the destination on which messages are being tested.
-			//
-
-		LOG.trace("Creating destination");
-		cons_dest = sess.createTemporaryTopic();
-
-		testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
-
-
-			//
-			// Cleanup
-			//
-
-		sess.close();
-		conn.close();
-	}
-
-
-	/**
-	 * TEST TOPICS
-	 */
-	public void testTopic (String prod_broker_url, String cons_broker_url)
-	throws Exception
-	{
-		int				num_msg;
-
-		Connection		conn;
-		Session			sess;
-		String			topic_name;
-
-		Destination		cons_dest;
-
-		num_msg = 5;
-
-		LOG.info("TESTING TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg
+
-		         " messages)");
-
-
-			//
-			// Connect to the bus.
-			//
-
-		conn = createConnection(cons_broker_url);
-		conn.start();
-		sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-
-			//
-			// Create the destination on which messages are being tested.
-			//
-
-		topic_name = "topotest2.perm.topic";
-		LOG.trace("Removing existing Topic");
-		removeTopic(conn, topic_name);
-		LOG.trace("Creating Topic, " + topic_name);
-		cons_dest = sess.createTopic(topic_name);
-
-		testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
-
-
-			//
-			// Cleanup
-			//
-
-		removeTopic(conn, topic_name);
-		sess.close();
-		conn.close();
-	}
-
-
-	/**
-	 * TEST TEMPORARY QUEUES
-	 */
-	public void testTempQueue (String prod_broker_url, String cons_broker_url)
-	throws Exception
-	{
-		int		echo_id;
-		int		num_msg;
-
-		Connection	conn;
-		Session		sess;
-
-		Destination	cons_dest;
-
-		num_msg = 5;
-
-		LOG.info("TESTING TEMP QUEUES " + prod_broker_url + " -> " + cons_broker_url + " ("
+ num_msg +
-		         " messages)");
-
-
-			//
-			// Connect to the bus.
-			//
-
-		conn = createConnection(cons_broker_url);
-		conn.start();
-		sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-
-			//
-			// Create the destination on which messages are being tested.
-			//
-
-		LOG.trace("Creating destination");
-		cons_dest = sess.createTemporaryQueue();
-
-		testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
-
-
-			//
-			// Cleanup
-			//
-
-		sess.close();
-		conn.close();
-	}
-
-
-	/**
-	 * TEST QUEUES
-	 */
-	public void testQueue (String prod_broker_url, String cons_broker_url)
-	throws Exception
-	{
-		int				num_msg;
-
-		Connection		conn;
-		Session			sess;
-		String			queue_name;
-
-		Destination		cons_dest;
-
-		num_msg = 5;
-
-		LOG.info("TESTING QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg
+
-		         " messages)");
-
-
-			//
-			// Connect to the bus.
-			//
-
-		conn = createConnection(cons_broker_url);
-		conn.start();
-		sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-
-			//
-			// Create the destination on which messages are being tested.
-			//
-
-		queue_name = "topotest2.perm.queue";
-		LOG.trace("Removing existing Queue");
-		removeQueue(conn, queue_name);
-		LOG.trace("Creating Queue, " + queue_name);
-		cons_dest = sess.createQueue(queue_name);
-
-		testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
-
-
-			//
-			// Cleanup
-			//
-
-		removeQueue(conn, queue_name);
-		sess.close();
-		conn.close();
-	}
-
-	@Test
-	public void run ()
-	throws Exception
-	{
-		Thread	start1;
-		Thread	start2;
-
-		testError = false;
-
-			// Use threads to avoid startup deadlock since the first broker started waits until
-			//	it knows the name of the remote broker before finishing its startup, which means
-			//	the remote must already be running.
-
-		start1 = new Thread() {
-			public void run()
-			{
-				try {
-					broker1.start();
-				} catch (Exception ex) {
-					LOG.error(null, ex);
-				}
-			}
-		};
-
-		start2 = new Thread() {
-			public void run()
-			{
-				try {
-					broker2.start();
-				} catch (Exception ex) {
-					LOG.error(null, ex);
-				}
-			}
-		};
-
-		start1.start();
-		start2.start();
-
-		start1.join();
-		start2.join();
-
-		if ( ! testError )
-			this.testTempTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl());
-
-		if ( ! testError )
-			this.testTempQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl());
-
-		if ( ! testError )
-			this.testTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl());
-
-		if ( ! testError )
-			this.testQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl());
-
-		Thread.sleep(100);
-
-		shutdown();
-
-		assertTrue(! testError);
-	}
-
-	public void shutdown ()
-	throws Exception
-	{
-		broker1.stop();
-		broker2.stop();
-	}
-
-	/**
-	 * @param args the command line arguments
-	 */
-	public static void main(String[] args)
-	{
-		AMQ3274Test    main_obj;
-
-		try
-		{
-			main_obj = new AMQ3274Test();
-			main_obj.run();
-		}
-		catch (Exception ex)
-		{
-			ex.printStackTrace();
-			
-			LOG.error(null, ex);
-
-			System.exit(0);
-		}
-	}
-
-	protected Connection	createConnection (String url)
-	throws Exception
-	{
-		return	org.apache.activemq.ActiveMQConnection.makeConnection(url);
-	}
-
-	protected static void	removeQueue (Connection conn, String dest_name)
-	throws java.lang.Exception
-	{
-		org.apache.activemq.command.ActiveMQDestination		dest;
-
-		if ( conn instanceof org.apache.activemq.ActiveMQConnection )
-		{
-			dest = org.apache.activemq.command.ActiveMQDestination.
-			createDestination(dest_name, (byte) org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE);
-			((org.apache.activemq.ActiveMQConnection)conn).destroyDestination(dest);
-		}
-	}
-
-	protected static void	removeTopic (Connection conn, String dest_name)
-	throws java.lang.Exception
-	{
-		org.apache.activemq.command.ActiveMQDestination		dest;
-
-		if ( conn instanceof org.apache.activemq.ActiveMQConnection )
-		{
-			dest = org.apache.activemq.command.ActiveMQDestination.
-			createDestination(dest_name, (byte) org.apache.activemq.command.ActiveMQDestination.TOPIC_TYPE);
-			((org.apache.activemq.ActiveMQConnection)conn).destroyDestination(dest);
-		}
-	}
-
-	public static String fmtMsgInfo (Message msg)
-	throws Exception
-	{
-		StringBuilder		msg_desc;
-		String			prop;
-		Enumeration		prop_enum;
-
-		msg_desc = new StringBuilder();
-		msg_desc = new StringBuilder();
-
-		if ( msg instanceof TextMessage )
-		{
-			msg_desc.append(((TextMessage) msg).getText());
-		}
-		else
-		{
-			msg_desc.append("[");
-			msg_desc.append(msg.getClass().getName());
-			msg_desc.append("]");
-		}
-
-		prop_enum = msg.getPropertyNames();
-		while ( prop_enum.hasMoreElements() )
-		{
-			prop = (String) prop_enum.nextElement();
-			msg_desc.append("; ");
-			msg_desc.append(prop);
-			msg_desc.append("=");
-			msg_desc.append(msg.getStringProperty(prop));
-		}
-
-		return	msg_desc.toString();
-	}
-
-//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-/////////////////////////////////////////////////  INTERNAL CLASSES  /////////////////////////////////////////////////
-//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-	protected class EmbeddedTcpBroker
-	{
-		protected BrokerService		brokerSvc;
-		protected int			brokerNum;
-		protected String		brokerName;
-		protected String		brokerId;
-		protected int			port;
-		protected String		tcpUrl;
-
-		public EmbeddedTcpBroker ()
-		throws Exception
-		{
-			brokerSvc = new BrokerService();
-
-			synchronized ( this.getClass() )
-			{
-				brokerNum = Next_broker_num;
-				Next_broker_num++;
-			}
-
-			brokerName = "broker" + brokerNum;
-			brokerId = "b" + brokerNum;
-
-			brokerSvc.setBrokerName(brokerName);
-			brokerSvc.setBrokerId(brokerId);
-
-			brokerSvc.setPersistent(false);
-			brokerSvc.setUseJmx(false); // TBD
-
-			port = 60000 + ( brokerNum * 10 );
-
-				// Configure the transport connector (TCP)
-			tcpUrl = "tcp://127.0.0.1:" + Integer.toString(port);
-			brokerSvc.addConnector(tcpUrl);
-		}
-
-		public Connection	createConnection ()
-		throws URISyntaxException, JMSException
-		{
-			Connection	result;
-
-			result = org.apache.activemq.ActiveMQConnection.makeConnection(this.tcpUrl);
-
-			return	result;
-		}
-
-		public String	getConnectionUrl ()
-		{
-			return	this.tcpUrl;
-		}
-
-
-		/**
-		 * Create network connections to the given broker using the network-connector
-		 * configuration of CORE brokers (e.g. core1.bus.dev1.coresys.tmcs)
-		 *
-		 * @param other
-		 * @param duplex_f
-		 */
-		public void coreConnectTo (EmbeddedTcpBroker other, boolean duplex_f)
-		throws Exception
-		{
-			this.makeConnectionTo(other, duplex_f, true);
-			this.makeConnectionTo(other, duplex_f, false);
-		}
-
-		public void start ()
-		throws Exception
-		{
-			brokerSvc.start();
-			//brokerSvc.waitUntilStarted();
-		}
-
-		public void stop ()
-		throws Exception
-		{
-			brokerSvc.stop();
-		}
-
-
-		/**
-		 * Make one connection to the other embedded broker, of the specified type (queue or topic)
-		 * using the standard CORE broker networking.
-		 * 
-		 * @param other
-		 * @param duplex_f
-		 * @param queue_f
-		 * @throws Exception
-		 */
-		protected void	makeConnectionTo (EmbeddedTcpBroker other, boolean duplex_f, boolean queue_f)
-		throws Exception
-		{
-			NetworkConnector	nw_conn;
-			String				prefix;
-			ActiveMQDestination excl_dest;
-			ArrayList			excludes;
-
-			nw_conn = new DiscoveryNetworkConnector(new URI("static:(" + other.tcpUrl + ")"));
-			nw_conn.setDuplex(duplex_f);
-
-			if ( queue_f )
-				nw_conn.setConduitSubscriptions(false);
-			else
-				nw_conn.setConduitSubscriptions(true);
-
-			nw_conn.setNetworkTTL(5);
-			nw_conn.setSuppressDuplicateQueueSubscriptions(true);
-			nw_conn.setDecreaseNetworkConsumerPriority(true);
-			nw_conn.setBridgeTempDestinations(true);
-
-			if ( queue_f )
-			{
-				prefix = "queue";
-				excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE);
-			}
-			else
-			{
-				prefix = "topic";
-				excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE);
-			}
-
-			excludes = new ArrayList();
-			excludes.add(excl_dest);
-			nw_conn.setExcludedDestinations(excludes);
-
-			if ( duplex_f )
-				nw_conn.setName(this.brokerId + "<-" + prefix + "->" + other.brokerId);
-			else
-				nw_conn.setName(this.brokerId + "-" + prefix + "->" + other.brokerId);
-
-			brokerSvc.addNetworkConnector(nw_conn);
-		}
-	}
-
-	protected class MessageClient extends java.lang.Thread
-	{
-		protected MessageConsumer	msgCons;
-		protected boolean		shutdownInd;
-		protected int			expectedCount;
-		protected int			lastSeq = 0;
-		protected int			msgCount = 0;
-		protected boolean		haveFirstSeq;
-		protected CountDownLatch	shutdownLatch;
-
-		public MessageClient (MessageConsumer cons, int num_to_expect)
-		{
-			msgCons = cons;
-			expectedCount = ( num_to_expect * ( echoResponseFill + 1 ) );
-			shutdownLatch = new CountDownLatch(1);
-		}
-
-		public void run ()
-		{
-			CountDownLatch	latch;
-
-			try
-			{
-				synchronized ( this )
-				{
-					latch = shutdownLatch;
-				}
-
-				shutdownInd = false;
-				processMessages();
-
-				latch.countDown();
-			}
-			catch ( Exception exc )
-			{
-				LOG.error("message client error", exc);
-			}
-		}
-
-		public void waitShutdown (long timeout)
-		{
-			CountDownLatch	latch;
-
-			try
-			{
-				synchronized ( this )
-				{
-					latch = shutdownLatch;
-				}
-
-				if ( latch != null )
-					latch.await(timeout, TimeUnit.MILLISECONDS);
-				else
-					LOG.info("echo client shutdown: client does not appear to be active");
-			}
-			catch ( InterruptedException int_exc )
-			{
-				LOG.warn("wait for message client shutdown interrupted", int_exc);
-			}
-		}
-
-		public boolean shutdown ()
-		{
-			boolean down_ind;
-
-			if ( ! shutdownInd )
-			{
-				shutdownInd = true;
-			}
-
-			waitShutdown(200);
-
-			synchronized ( this )
-			{
-				if ( ( shutdownLatch == null ) || ( shutdownLatch.getCount() == 0 ) )
-					down_ind = true;
-				else
-					down_ind = false;
-			}
-
-			return	down_ind;
-		}
-
-		public int	getNumMsgReceived ()
-		{
-			return	msgCount;
-		}
-
-		protected void processMessages ()
-		throws Exception
-		{
-			Message in_msg;
-
-			haveFirstSeq = false;
-
-				//
-				// Stop at shutdown time or after any test error is detected.
-				//
-
-			while ( ( ! shutdownInd ) && ( ! testError ) )
-			{
-				in_msg = msgCons.receive(100);
-
-				if ( in_msg != null )
-				{
-					msgCount++;
-					checkMessage(in_msg);
-				}
-			}
-		}
-
-		protected void	checkMessage (Message in_msg)
-		throws Exception
-		{
-			int				seq;
-
-			LOG.debug("received message " + fmtMsgInfo(in_msg));
-
-				//
-				// Only check messages with a sequence number.
-				//
-
-			if ( in_msg.propertyExists("SEQ") )
-			{
-				seq = in_msg.getIntProperty("SEQ");
-
-				if ( ( haveFirstSeq ) && ( seq != ( lastSeq + 1 ) ) )
-				{
-					LOG.error("***ERROR*** incorrect sequence number; expected " +
-					          Integer.toString(lastSeq + 1) + " but have " +
-					          Integer.toString(seq));
-
-					testError = true;
-				}
-
-				lastSeq = seq;
-
-				if ( msgCount > expectedCount )
-				{
-					LOG.warn("*** have more messages than expected; have "	+ msgCount +
-					         "; expect " + expectedCount);
-
-					testError = true;
-				}
-			}
-
-			if ( in_msg.propertyExists("end-of-response") )
-			{
-				LOG.trace("received end-of-response message");
-				shutdownInd = true;
-			}
-		}
-	}
-
-	/**
-	 *
-	 */
-	protected class EchoService extends java.lang.Thread
-	{
-		protected String		destName;
-		protected Connection		jmsConn;
-		protected Session		sess;
-		protected MessageConsumer	msg_cons;
-		protected boolean		Shutdown_ind;
-
-		protected Destination		req_dest;
-		protected Destination		resp_dest;
-		protected MessageProducer	msg_prod;
-
-		protected CountDownLatch	waitShutdown;
-
-		public EchoService (String dest, Connection broker_conn)
-		throws Exception
-		{
-			destName = dest;
-			jmsConn = broker_conn;
-
-			Shutdown_ind = false;
-
-			sess = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-			req_dest = sess.createQueue(destName);
-			msg_cons = sess.createConsumer(req_dest);
-
-			jmsConn.start();
-
-			waitShutdown = new CountDownLatch(1);
-		}
-
-		public EchoService (String dest, String broker_url)
-		throws Exception
-		{
-			this(dest, ActiveMQConnection.makeConnection(broker_url));
-		}
-
-		public void run ()
-		{
-			Message	req;
-
-			try
-			{
-				LOG.info("STARTING ECHO SERVICE");
-
-				while ( ! Shutdown_ind )
-				{
-					req = msg_cons.receive(100);
-					if ( req != null )
-					{
-						if ( LOG.isDebugEnabled() )
-							LOG.debug("ECHO request message " + req.toString());
-						
-						resp_dest = req.getJMSReplyTo();
-						if ( resp_dest != null )
-						{
-							msg_prod = sess.createProducer(resp_dest);
-							msg_prod.send(req);
-							msg_prod.close();
-							msg_prod = null;
-						}
-						else
-						{
-							LOG.warn("invalid request: no reply-to destination given");
-						}
-					}
-				}
-			}
-			catch (Exception ex)
-			{
-				LOG.error(null, ex);
-			}
-			finally
-			{
-				LOG.info("shutting down test echo service");
-
-				try
-				{
-					jmsConn.stop();
-				}
-				catch ( javax.jms.JMSException jms_exc )
-				{
-					LOG.warn("error on shutting down JMS connection", jms_exc);
-				}
-
-				synchronized ( this )
-				{
-					waitShutdown.countDown();
-				}
-			}
-		}
-
-
-		/**
-		 * Shut down the service, waiting up to 3 seconds for the service to terminate.
-		 */
-		public void shutdown ()
-		{
-			CountDownLatch	wait_l;
-
-			synchronized ( this )
-			{
-				wait_l = waitShutdown;
-			}
-
-			Shutdown_ind = true;
-			
-			try
-			{
-				if ( wait_l != null )
-				{
-					if ( wait_l.await(3000, TimeUnit.MILLISECONDS) )
-						LOG.info("echo service shutdown complete");
-					else
-						LOG.warn("timeout waiting for echo service shutdown");
-				}
-				else
-				{
-					LOG.info("echo service shutdown: service does not appear to be active");
-				}
-			}
-			catch ( InterruptedException int_exc )
-			{
-				LOG.warn("interrupted while waiting for echo service shutdown");
-			}
-		}
-	}
+    protected static int Next_broker_num = 0;
+    protected EmbeddedTcpBroker broker1;
+    protected EmbeddedTcpBroker broker2;
+
+    protected int nextEchoId = 0;
+    protected boolean testError = false;
+
+    protected int echoResponseFill = 0; // Number of "filler" response messages per request
+
+    public AMQ3274Test() throws Exception {
+        broker1 = new EmbeddedTcpBroker();
+        broker2 = new EmbeddedTcpBroker();
+
+        broker1.coreConnectTo(broker2, true);
+        broker2.coreConnectTo(broker1, true);
+    }
+
+    public void logMessage(String msg) {
+        System.out.println(msg);
+        System.out.flush();
+    }
+
+    public void testMessages(Session sess, MessageProducer req_prod, Destination resp_dest,
int num_msg) throws Exception {
+        MessageConsumer resp_cons;
+        TextMessage msg;
+        MessageClient cons_client;
+        int cur;
+        int tot_expected;
+
+        resp_cons = sess.createConsumer(resp_dest);
+
+        cons_client = new MessageClient(resp_cons, num_msg);
+        cons_client.start();
+
+        cur = 0;
+        while ((cur < num_msg) && (!testError)) {
+            msg = sess.createTextMessage("MSG AAAA " + cur);
+            msg.setIntProperty("SEQ", 100 + cur);
+            msg.setStringProperty("TEST", "TOPO");
+            msg.setJMSReplyTo(resp_dest);
+
+            if (cur == (num_msg - 1))
+                msg.setBooleanProperty("end-of-response", true);
+
+            req_prod.send(msg);
+
+            cur++;
+        }
+
+        cons_client.waitShutdown(5000);
+
+        if (cons_client.shutdown()) {
+            LOG.debug("Consumer client shutdown complete");
+        } else {
+            LOG.debug("Consumer client shutdown incomplete!!!");
+        }
+
+        tot_expected = num_msg * (echoResponseFill + 1);
+
+        if (cons_client.getNumMsgReceived() == tot_expected) {
+            LOG.info("Have " + tot_expected + " messages, as-expected");
+        } else {
+            testError = true;
+            LOG.info("Have " + cons_client.getNumMsgReceived() + " messages; expected " +
tot_expected);
+        }
+
+        resp_cons.close();
+    }
+
+    /**
+     * Test one destination between the given "producer broker" and
+     * "consumer broker" specified.
+     */
+    public void testOneDest(Connection conn, Session sess, Destination cons_dest, String
prod_broker_url, String cons_broker_url, int num_msg) throws Exception {
+        int echo_id;
+
+        EchoService echo_svc;
+        String echo_queue_name;
+        Destination prod_dest;
+        MessageProducer msg_prod;
+
+        synchronized (this) {
+            echo_id = this.nextEchoId;
+            this.nextEchoId++;
+        }
+
+        echo_queue_name = "echo.queue." + echo_id;
+
+        LOG.trace("destroying the echo queue in case an old one exists");
+        removeQueue(conn, echo_queue_name);
+
+        echo_svc = new EchoService(echo_queue_name, prod_broker_url);
+        echo_svc.start();
+
+        LOG.trace("Creating echo queue and producer");
+        prod_dest = sess.createQueue(echo_queue_name);
+        msg_prod = sess.createProducer(prod_dest);
+
+        testMessages(sess, msg_prod, cons_dest, num_msg);
+
+        echo_svc.shutdown();
+        msg_prod.close();
+    }
+
+    /**
+     * TEST TEMPORARY TOPICS
+     */
+    public void testTempTopic(String prod_broker_url, String cons_broker_url) throws Exception
{
+        Connection conn;
+        Session sess;
+        Destination cons_dest;
+        int num_msg;
+
+        num_msg = 5;
+
+        LOG.info("TESTING TEMP TOPICS " + prod_broker_url + " -> " + cons_broker_url +
" (" + num_msg + " messages)");
+
+        conn = createConnection(cons_broker_url);
+        conn.start();
+        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        LOG.trace("Creating destination");
+        cons_dest = sess.createTemporaryTopic();
+
+        testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+        sess.close();
+        conn.close();
+    }
+
+    /**
+     * TEST TOPICS
+     */
+    public void testTopic(String prod_broker_url, String cons_broker_url) throws Exception
{
+        int num_msg;
+
+        Connection conn;
+        Session sess;
+        String topic_name;
+
+        Destination cons_dest;
+
+        num_msg = 5;
+
+        LOG.info("TESTING TOPICS " + prod_broker_url + " -> " + cons_broker_url + " ("
+ num_msg + " messages)");
+
+        conn = createConnection(cons_broker_url);
+        conn.start();
+        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        topic_name = "topotest2.perm.topic";
+        LOG.trace("Removing existing Topic");
+        removeTopic(conn, topic_name);
+        LOG.trace("Creating Topic, " + topic_name);
+        cons_dest = sess.createTopic(topic_name);
+
+        testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+        removeTopic(conn, topic_name);
+        sess.close();
+        conn.close();
+    }
+
+    /**
+     * TEST TEMPORARY QUEUES
+     */
+    public void testTempQueue(String prod_broker_url, String cons_broker_url) throws Exception
{
+        int num_msg;
+
+        Connection conn;
+        Session sess;
+
+        Destination cons_dest;
+
+        num_msg = 5;
+
+        LOG.info("TESTING TEMP QUEUES " + prod_broker_url + " -> " + cons_broker_url +
" (" + num_msg + " messages)");
+
+        conn = createConnection(cons_broker_url);
+        conn.start();
+        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        LOG.trace("Creating destination");
+        cons_dest = sess.createTemporaryQueue();
+
+        testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+        sess.close();
+        conn.close();
+    }
+
+    /**
+     * TEST QUEUES
+     */
+    public void testQueue(String prod_broker_url, String cons_broker_url) throws Exception
{
+        int num_msg;
+
+        Connection conn;
+        Session sess;
+        String queue_name;
+
+        Destination cons_dest;
+
+        num_msg = 5;
+
+        LOG.info("TESTING QUEUES " + prod_broker_url + " -> " + cons_broker_url + " ("
+ num_msg + " messages)");
+
+        conn = createConnection(cons_broker_url);
+        conn.start();
+        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        queue_name = "topotest2.perm.queue";
+        LOG.trace("Removing existing Queue");
+        removeQueue(conn, queue_name);
+        LOG.trace("Creating Queue, " + queue_name);
+        cons_dest = sess.createQueue(queue_name);
+
+        testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+        removeQueue(conn, queue_name);
+        sess.close();
+        conn.close();
+    }
+
+    @Test
+    public void run() throws Exception {
+        Thread start1;
+        Thread start2;
+
+        testError = false;
+
+        // Use threads to avoid startup deadlock since the first broker started waits until
+        // it knows the name of the remote broker before finishing its startup, which means
+        // the remote must already be running.
+
+        start1 = new Thread() {
+            public void run() {
+                try {
+                    broker1.start();
+                } catch (Exception ex) {
+                    LOG.error(null, ex);
+                }
+            }
+        };
+
+        start2 = new Thread() {
+            public void run() {
+                try {
+                    broker2.start();
+                } catch (Exception ex) {
+                    LOG.error(null, ex);
+                }
+            }
+        };
+
+        start1.start();
+        start2.start();
+
+        start1.join();
+        start2.join();
+
+        if (!testError) {
+            this.testTempTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl());
+        }
+        if (!testError) {
+            this.testTempQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl());
+        }
+        if (!testError) {
+            this.testTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl());
+        }
+        if (!testError) {
+            this.testQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl());
+        }
+        Thread.sleep(100);
+
+        shutdown();
+
+        assertTrue(!testError);
+    }
+
+    public void shutdown() throws Exception {
+        broker1.stop();
+        broker2.stop();
+    }
+
+    /**
+     * @param args
+     *            the command line arguments
+     */
+    public static void main(String[] args) {
+        AMQ3274Test main_obj;
+
+        try {
+            main_obj = new AMQ3274Test();
+            main_obj.run();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            LOG.error(null, ex);
+            System.exit(0);
+        }
+    }
+
+    protected Connection createConnection(String url) throws Exception {
+        return org.apache.activemq.ActiveMQConnection.makeConnection(url);
+    }
+
+    protected static void removeQueue(Connection conn, String dest_name) throws java.lang.Exception
{
+        org.apache.activemq.command.ActiveMQDestination dest;
+
+        if (conn instanceof org.apache.activemq.ActiveMQConnection) {
+            dest = org.apache.activemq.command.ActiveMQDestination.createDestination(dest_name,
+                    (byte) org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE);
+            ((org.apache.activemq.ActiveMQConnection) conn).destroyDestination(dest);
+        }
+    }
+
+    protected static void removeTopic(Connection conn, String dest_name) throws java.lang.Exception
{
+        org.apache.activemq.command.ActiveMQDestination dest;
+
+        if (conn instanceof org.apache.activemq.ActiveMQConnection) {
+            dest = org.apache.activemq.command.ActiveMQDestination.createDestination(dest_name,
+                    (byte) org.apache.activemq.command.ActiveMQDestination.TOPIC_TYPE);
+            ((org.apache.activemq.ActiveMQConnection) conn).destroyDestination(dest);
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static String fmtMsgInfo(Message msg) throws Exception {
+        StringBuilder msg_desc;
+        String prop;
+        Enumeration prop_enum;
+
+        msg_desc = new StringBuilder();
+        msg_desc = new StringBuilder();
+
+        if (msg instanceof TextMessage) {
+            msg_desc.append(((TextMessage) msg).getText());
+        } else {
+            msg_desc.append("[");
+            msg_desc.append(msg.getClass().getName());
+            msg_desc.append("]");
+        }
+
+        prop_enum = msg.getPropertyNames();
+        while (prop_enum.hasMoreElements()) {
+            prop = (String) prop_enum.nextElement();
+            msg_desc.append("; ");
+            msg_desc.append(prop);
+            msg_desc.append("=");
+            msg_desc.append(msg.getStringProperty(prop));
+        }
+
+        return msg_desc.toString();
+    }
+
+    // ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////// INTERNAL CLASSES
+    // /////////////////////////////////////////////////
+    // ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+    protected class EmbeddedTcpBroker {
+        protected BrokerService brokerSvc;
+        protected int brokerNum;
+        protected String brokerName;
+        protected String brokerId;
+        protected int port;
+        protected String tcpUrl;
+
+        public EmbeddedTcpBroker() throws Exception {
+            brokerSvc = new BrokerService();
+
+            synchronized (this.getClass()) {
+                brokerNum = Next_broker_num;
+                Next_broker_num++;
+            }
+
+            brokerName = "broker" + brokerNum;
+            brokerId = "b" + brokerNum;
+
+            brokerSvc.setBrokerName(brokerName);
+            brokerSvc.setBrokerId(brokerId);
+            brokerSvc.setPersistent(false);
+            brokerSvc.setUseJmx(false);
+            tcpUrl = brokerSvc.addConnector("tcp://localhost:0").getPublishableConnectString();
+        }
+
+        public Connection createConnection() throws URISyntaxException, JMSException {
+            Connection result;
+
+            result = org.apache.activemq.ActiveMQConnection.makeConnection(this.tcpUrl);
+
+            return result;
+        }
+
+        public String getConnectionUrl() {
+            return this.tcpUrl;
+        }
+
+        /**
+         * Create network connections to the given broker using the
+         * network-connector configuration of CORE brokers (e.g.
+         * core1.bus.dev1.coresys.tmcs)
+         *
+         * @param other
+         * @param duplex_f
+         */
+        public void coreConnectTo(EmbeddedTcpBroker other, boolean duplex_f) throws Exception
{
+            this.makeConnectionTo(other, duplex_f, true);
+            this.makeConnectionTo(other, duplex_f, false);
+        }
+
+        public void start() throws Exception {
+            brokerSvc.start();
+        }
+
+        public void stop() throws Exception {
+            brokerSvc.stop();
+        }
+
+        /**
+         * Make one connection to the other embedded broker, of the specified
+         * type (queue or topic) using the standard CORE broker networking.
+         *
+         * @param other
+         * @param duplex_f
+         * @param queue_f
+         * @throws Exception
+         */
+        protected void makeConnectionTo(EmbeddedTcpBroker other, boolean duplex_f, boolean
queue_f) throws Exception {
+            NetworkConnector nw_conn;
+            String prefix;
+            ActiveMQDestination excl_dest;
+            ArrayList<ActiveMQDestination> excludes;
+
+            nw_conn = new DiscoveryNetworkConnector(new URI("static:(" + other.tcpUrl + ")"));
+            nw_conn.setDuplex(duplex_f);
+
+            if (queue_f)
+                nw_conn.setConduitSubscriptions(false);
+            else
+                nw_conn.setConduitSubscriptions(true);
+
+            nw_conn.setNetworkTTL(5);
+            nw_conn.setSuppressDuplicateQueueSubscriptions(true);
+            nw_conn.setDecreaseNetworkConsumerPriority(true);
+            nw_conn.setBridgeTempDestinations(true);
+
+            if (queue_f) {
+                prefix = "queue";
+                excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE);
+            } else {
+                prefix = "topic";
+                excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE);
+            }
+
+            excludes = new ArrayList<ActiveMQDestination>();
+            excludes.add(excl_dest);
+            nw_conn.setExcludedDestinations(excludes);
+
+            if (duplex_f)
+                nw_conn.setName(this.brokerId + "<-" + prefix + "->" + other.brokerId);
+            else
+                nw_conn.setName(this.brokerId + "-" + prefix + "->" + other.brokerId);
+
+            brokerSvc.addNetworkConnector(nw_conn);
+        }
+    }
+
+    protected class MessageClient extends java.lang.Thread {
+        protected MessageConsumer msgCons;
+        protected boolean shutdownInd;
+        protected int expectedCount;
+        protected int lastSeq = 0;
+        protected int msgCount = 0;
+        protected boolean haveFirstSeq;
+        protected CountDownLatch shutdownLatch;
+
+        public MessageClient(MessageConsumer cons, int num_to_expect) {
+            msgCons = cons;
+            expectedCount = (num_to_expect * (echoResponseFill + 1));
+            shutdownLatch = new CountDownLatch(1);
+        }
+
+        public void run() {
+            CountDownLatch latch;
+
+            try {
+                synchronized (this) {
+                    latch = shutdownLatch;
+                }
+
+                shutdownInd = false;
+                processMessages();
+
+                latch.countDown();
+            } catch (Exception exc) {
+                LOG.error("message client error", exc);
+            }
+        }
+
+        public void waitShutdown(long timeout) {
+            CountDownLatch latch;
+
+            try {
+                synchronized (this) {
+                    latch = shutdownLatch;
+                }
+
+                if (latch != null)
+                    latch.await(timeout, TimeUnit.MILLISECONDS);
+                else
+                    LOG.info("echo client shutdown: client does not appear to be active");
+            } catch (InterruptedException int_exc) {
+                LOG.warn("wait for message client shutdown interrupted", int_exc);
+            }
+        }
+
+        public boolean shutdown() {
+            boolean down_ind;
+
+            if (!shutdownInd) {
+                shutdownInd = true;
+            }
+
+            waitShutdown(200);
+
+            synchronized (this) {
+                if ((shutdownLatch == null) || (shutdownLatch.getCount() == 0))
+                    down_ind = true;
+                else
+                    down_ind = false;
+            }
+
+            return down_ind;
+        }
+
+        public int getNumMsgReceived() {
+            return msgCount;
+        }
+
+        protected void processMessages() throws Exception {
+            Message in_msg;
+
+            haveFirstSeq = false;
+            while ((!shutdownInd) && (!testError)) {
+                in_msg = msgCons.receive(100);
+
+                if (in_msg != null) {
+                    msgCount++;
+                    checkMessage(in_msg);
+                }
+            }
+        }
+
+        protected void checkMessage(Message in_msg) throws Exception {
+            int seq;
+
+            LOG.debug("received message " + fmtMsgInfo(in_msg));
+
+            if (in_msg.propertyExists("SEQ")) {
+                seq = in_msg.getIntProperty("SEQ");
+
+                if ((haveFirstSeq) && (seq != (lastSeq + 1))) {
+                    LOG.error("***ERROR*** incorrect sequence number; expected " + Integer.toString(lastSeq
+ 1) + " but have " + Integer.toString(seq));
+
+                    testError = true;
+                }
+
+                lastSeq = seq;
+
+                if (msgCount > expectedCount) {
+                    LOG.warn("*** have more messages than expected; have " + msgCount + ";
expect " + expectedCount);
+
+                    testError = true;
+                }
+            }
+
+            if (in_msg.propertyExists("end-of-response")) {
+                LOG.trace("received end-of-response message");
+                shutdownInd = true;
+            }
+        }
+    }
+
+    protected class EchoService extends java.lang.Thread {
+        protected String destName;
+        protected Connection jmsConn;
+        protected Session sess;
+        protected MessageConsumer msg_cons;
+        protected boolean Shutdown_ind;
+
+        protected Destination req_dest;
+        protected Destination resp_dest;
+        protected MessageProducer msg_prod;
+
+        protected CountDownLatch waitShutdown;
+
+        public EchoService(String dest, Connection broker_conn) throws Exception {
+            destName = dest;
+            jmsConn = broker_conn;
+
+            Shutdown_ind = false;
+
+            sess = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            req_dest = sess.createQueue(destName);
+            msg_cons = sess.createConsumer(req_dest);
+
+            jmsConn.start();
+
+            waitShutdown = new CountDownLatch(1);
+        }
+
+        public EchoService(String dest, String broker_url) throws Exception {
+            this(dest, ActiveMQConnection.makeConnection(broker_url));
+        }
+
+        public void run() {
+            Message req;
+
+            try {
+                LOG.info("STARTING ECHO SERVICE");
+
+                while (!Shutdown_ind) {
+                    req = msg_cons.receive(100);
+                    if (req != null) {
+                        if (LOG.isDebugEnabled())
+                            LOG.debug("ECHO request message " + req.toString());
+
+                        resp_dest = req.getJMSReplyTo();
+                        if (resp_dest != null) {
+                            msg_prod = sess.createProducer(resp_dest);
+                            msg_prod.send(req);
+                            msg_prod.close();
+                            msg_prod = null;
+                        } else {
+                            LOG.warn("invalid request: no reply-to destination given");
+                        }
+                    }
+                }
+            } catch (Exception ex) {
+                LOG.error(null, ex);
+            } finally {
+                LOG.info("shutting down test echo service");
+
+                try {
+                    jmsConn.stop();
+                } catch (javax.jms.JMSException jms_exc) {
+                    LOG.warn("error on shutting down JMS connection", jms_exc);
+                }
+
+                synchronized (this) {
+                    waitShutdown.countDown();
+                }
+            }
+        }
+
+        /**
+         * Shut down the service, waiting up to 3 seconds for the service to
+         * terminate.
+         */
+        public void shutdown() {
+            CountDownLatch wait_l;
+
+            synchronized (this) {
+                wait_l = waitShutdown;
+            }
+
+            Shutdown_ind = true;
+
+            try {
+                if (wait_l != null) {
+                    if (wait_l.await(3000, TimeUnit.MILLISECONDS)) {
+                        LOG.info("echo service shutdown complete");
+                    } else {
+                        LOG.warn("timeout waiting for echo service shutdown");
+                    }
+                } else {
+                    LOG.info("echo service shutdown: service does not appear to be active");
+                }
+            } catch (InterruptedException int_exc) {
+                LOG.warn("interrupted while waiting for echo service shutdown");
+            }
+        }
+    }
 }



Mime
View raw message