activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From patzerbud <patzer...@hotmail.com>
Subject Re: Network of Brokers
Date Sun, 18 Apr 2010 14:10:42 GMT



dkfn wrote:
> 
> :) It's the mailing list software conspiring, I tell you... adding it
> directly into the mail instead:
> 

OK, my first reply runs fine (i.e. without error) but didn't actually work.
I noodled around with it a little more and offer the following:

package org.apache.activemq.example;

import java.util.Enumeration;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;

import junit.framework.TestCase;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;


public class QueueTest extends TestCase {

	private static final String TEST_QUEUE = "testQueue";
	private static final String LOCAL_MQ1 = "tcp://localhost:61616";
	private static final String LOCAL_MQ2 = "tcp://localhost:51515";


    public void testNetworkOfBrokers() throws Exception {

		Broker broker1 = createBroker("one", 61616, 51515);
		Broker broker2 = createBroker("two", 51515, 61616);
		pause(10, "sleeping to allow brokers to startup & connect to each
other...");

		System.out.println("creating consumer");
		Consumer consumer = createConsumer(LOCAL_MQ2);
		pause(5, "sleeping to allow consumer to startup & connect to MQ...");


		System.out.println("producing messages");
		Connection connection = null;

		try {
			ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(LOCAL_MQ1);
			connection = connectionFactory.createConnection();

			Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
			MessageProducer producer = session.createProducer(new
ActiveMQQueue(TEST_QUEUE));
			connection.start();

			for (int i=0; i<10; i++) {
				TextMessage message = session.createTextMessage();
				message.setText("Hello World!");
				producer.send(message);
			}

			pause(5, "sleeping to allow consumer to consume all messages...");

			QueueBrowser browser = session.createBrowser(new
ActiveMQQueue(TEST_QUEUE));
			Enumeration<?> enumeration = browser.getEnumeration();
			assertFalse(enumeration.hasMoreElements());
		}
		catch (Exception e) {
			e.printStackTrace();
		}
		finally {
			try {
				if (connection != null) {
					connection.stop();
				}
			} catch (Throwable t) {
				//t.printStackTrace();
			}

			try {
				if (broker2 != null) {
					broker2.stop();
				}
			} catch (Throwable t) {
				//t.printStackTrace();
			}

			try {
				if (broker1 != null) {
					broker1.stop();
				}
			} catch (Throwable t) {
				//t.printStackTrace();
			}

		}

		pause(2);
		System.out.println("All done!");
    }

	private void pause(int seconds) {
		pause(seconds, null);
	}

	private void pause(int seconds, String msg) {
		if (msg != null) System.out.println(msg);
		try {
			Thread.currentThread().sleep(seconds * 1000);
		} catch (InterruptedException e) {
			; // ignore
		}
	}

	private Broker createBroker(String name, int listenerPort, int
networkConnectorPort) {
		System.out.println("creating broker "+name);
		Thread brokerThread = null;
		try {
			Broker broker = new Broker(name, listenerPort, networkConnectorPort);
			brokerThread = new Thread(broker);
			brokerThread.start();
			return broker;
		} catch (Exception ignoreMe) {
			ignoreMe.printStackTrace();
		}
		return null;
	}

	private Consumer createConsumer(String url) {
		Thread thread = null;
		try {
			Consumer consumer = new Consumer(url);
			thread = new Thread(consumer);
			thread.start();
			return consumer;
		} catch (Exception ignoreMe) {
			ignoreMe.printStackTrace();
		}
		return null;
	}

	private class Consumer implements Runnable {
		private final String url; // "tcp://localhost:51515"
		Consumer(String url) {
			this.url = url;
		}

		public void run() {

			Connection connection1 = null;

			try {
				ActiveMQConnectionFactory connectionFactory1 = new
ActiveMQConnectionFactory(url);
				connection1 = connectionFactory1.createConnection();
				connection1.start();

				Session session1 = connection1.createSession(true,
Session.AUTO_ACKNOWLEDGE);
				MessageConsumer consumer1 = session1.createConsumer(new
ActiveMQQueue(TEST_QUEUE));

				//for (int i=0; i<1; i++) {
				for (;;) {
					Message message1 = consumer1.receive();
					assertNotNull(message1);
					System.out.println(message1);
				}
			}
			catch (Exception e) {
			}
			finally {
				try {
					if (connection1 != null) {
						connection1.stop();
					}
				} catch (Throwable t) {
					t.printStackTrace();
				}
			}
		}
	}

	private static class Broker implements Runnable {

		private String name;
		private int listenPort;
		private int connectorPort;
		private BrokerService brokerService = null;

		Broker(String name, int listenerPort, int networkPort) {
			this.name = name;
			listenPort = listenerPort;
			connectorPort = networkPort;
		}

		public void run() {
			try {
				brokerService = new BrokerService();
				brokerService.setBrokerName(name);
				brokerService.setUseJmx(false);
				brokerService.setPersistenceAdapter(new
				MemoryPersistenceAdapter());

				NetworkConnector network2 = new DiscoveryNetworkConnector(new
java.net.URI("static:(tcp://localhost:" + connectorPort + ")"));
				network2.setName("network-" + name);
				network2.setDynamicOnly(false);
				network2.setNetworkTTL(2);
				network2.setPrefetchSize(1);

				brokerService.addNetworkConnector(network2);

				brokerService.addConnector("tcp://0.0.0.0:" + listenPort);
				brokerService.start();

			}
			catch (Exception e) {
				e.printStackTrace();
			}
		}

		public void stop() {
			try {
				if (brokerService != null) {
					brokerService.stop();
				}
			} catch (Throwable t) {
				t.printStackTrace();
			}
		}
	}

}


I changed the order around a little bit for the producer. However, I think
the main difference was this:

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

This code will not work if you specify true for the first arg. I'm not sure,
but I think it's because this example is using the in memory persistence
adapter...

HTH,

Mike L (aka patzerbud)

-- 
View this message in context: http://old.nabble.com/Network-of-Brokers-tp28269405p28282467.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message