Awesome. Thanks for looking into that... you're right. I changed the
transacted to false in the original test and it worked... (I also
spotted a bug in the original test in that it was creating both
sessions from the same connection). In case anyone is interested, the
updated test is attached...
*sigh* it's pretty obvious in retrospect that a consumer isn't going
to see a message where the transaction hasn't been committed...
Thanks again Mike,
cheers,
j.
-------
package org.example.activemq;
import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
public class NetworkTest extends TestCase {
public void testNetworkOfBrokers() throws Exception {
BrokerService brokerService1 = null;
BrokerService brokerService2 = null;
try {
{
brokerService1 = new BrokerService();
brokerService1.setBrokerName("one");
brokerService1.setUseJmx(false);
brokerService1.setPersistenceAdapter(new
MemoryPersistenceAdapter());
brokerService1.addConnector("tcp://0.0.0.0:61616");
NetworkConnector network1 =
brokerService1.addNetworkConnector("static:(tcp://localhost:51515)");
// NetworkConnector network1 =
brokerService1.addNetworkConnector("multicast://default");
network1.setName("network1");
network1.setDynamicOnly(true);
network1.setNetworkTTL(3);
network1.setPrefetchSize(1);
brokerService1.start();
}
{
brokerService2 = new BrokerService();
brokerService2.setBrokerName("two");
brokerService2.setUseJmx(false);
brokerService2.setPersistenceAdapter(new
MemoryPersistenceAdapter());
brokerService2.addConnector("tcp://0.0.0.0:51515");
NetworkConnector network2 =
brokerService2.addNetworkConnector("static:(tcp://localhost:61616)");
// NetworkConnector network2 =
brokerService2.addNetworkConnector("multicast://default");
network2.setName("network2");
network2.setDynamicOnly(true);
network2.setNetworkTTL(3);
network2.setPrefetchSize(1);
brokerService2.start();
}
ActiveMQConnectionFactory connectionFactory1 = new
ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:51515)?randomize=false");
ActiveMQConnectionFactory connectionFactory2 = new
ActiveMQConnectionFactory("failover:(tcp://localhost:51515,tcp://localhost:61616)?randomize=false");
Connection connection1 = connectionFactory1.createConnection();
connection1.start();
Connection connection2 = connectionFactory2.createConnection();
connection2.start();
try {
Session session1 = connection1.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Session session2 = connection2.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer1 = session1.createConsumer(new
ActiveMQQueue("testingqueue"));
MessageProducer producer2 = session2.createProducer(new
ActiveMQQueue("testingqueue"));
TextMessage message2 = session2.createTextMessage();
message2.setText("Hello World!");
producer2.send(message2);
Message message1 = consumer1.receive(1000);
assertNotNull(message1);
System.out.println(message1);
QueueBrowser browser = session2.createBrowser(new
ActiveMQQueue("testingqueue"));
Enumeration<?> enumeration = browser.getEnumeration();
assertFalse(enumeration.hasMoreElements());
} finally {
connection1.stop();
connection2.stop();
}
} finally {
try { if(brokerService1 != null) { brokerService1.stop();
}} catch(Throwable t) { t.printStackTrace(); }
try { if(brokerService2 != null) { brokerService2.stop();
}} catch(Throwable t) { t.printStackTrace(); }
}
}
}
On Sun, Apr 18, 2010 at 3:10 PM, patzerbud <patzerbud@hotmail.com> wrote:
>
>
>
> dkfn wrote:
>>
>> :) It's the mailing list software conspiring, I tell you... adding it
>> directly into the mail instead:
>>
>
> OK, my first reply runs fine (i.e. without error) but didn't actually work.
> I noodled around with it a little more and offer the following:
>
> package org.apache.activemq.example;
>
> import java.util.Enumeration;
>
> import javax.jms.Connection;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageProducer;
> import javax.jms.QueueBrowser;
> import javax.jms.Session;
> import javax.jms.TextMessage;
>
> import junit.framework.TestCase;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.command.ActiveMQQueue;
> import org.apache.activemq.network.DiscoveryNetworkConnector;
> import org.apache.activemq.network.NetworkConnector;
> import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
>
>
> public class QueueTest extends TestCase {
>
> private static final String TEST_QUEUE = "testQueue";
> private static final String LOCAL_MQ1 = "tcp://localhost:61616";
> private static final String LOCAL_MQ2 = "tcp://localhost:51515";
>
>
> public void testNetworkOfBrokers() throws Exception {
>
> Broker broker1 = createBroker("one", 61616, 51515);
> Broker broker2 = createBroker("two", 51515, 61616);
> pause(10, "sleeping to allow brokers to startup & connect
to each
> other...");
>
> System.out.println("creating consumer");
> Consumer consumer = createConsumer(LOCAL_MQ2);
> pause(5, "sleeping to allow consumer to startup & connect
to MQ...");
>
>
> System.out.println("producing messages");
> Connection connection = null;
>
> try {
> ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(LOCAL_MQ1);
> connection = connectionFactory.createConnection();
>
> Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
> MessageProducer producer = session.createProducer(new
> ActiveMQQueue(TEST_QUEUE));
> connection.start();
>
> for (int i=0; i<10; i++) {
> TextMessage message = session.createTextMessage();
> message.setText("Hello World!");
> producer.send(message);
> }
>
> pause(5, "sleeping to allow consumer to consume all
messages...");
>
> QueueBrowser browser = session.createBrowser(new
> ActiveMQQueue(TEST_QUEUE));
> Enumeration<?> enumeration = browser.getEnumeration();
> assertFalse(enumeration.hasMoreElements());
> }
> catch (Exception e) {
> e.printStackTrace();
> }
> finally {
> try {
> if (connection != null) {
> connection.stop();
> }
> } catch (Throwable t) {
> //t.printStackTrace();
> }
>
> try {
> if (broker2 != null) {
> broker2.stop();
> }
> } catch (Throwable t) {
> //t.printStackTrace();
> }
>
> try {
> if (broker1 != null) {
> broker1.stop();
> }
> } catch (Throwable t) {
> //t.printStackTrace();
> }
>
> }
>
> pause(2);
> System.out.println("All done!");
> }
>
> private void pause(int seconds) {
> pause(seconds, null);
> }
>
> private void pause(int seconds, String msg) {
> if (msg != null) System.out.println(msg);
> try {
> Thread.currentThread().sleep(seconds * 1000);
> } catch (InterruptedException e) {
> ; // ignore
> }
> }
>
> private Broker createBroker(String name, int listenerPort, int
> networkConnectorPort) {
> System.out.println("creating broker "+name);
> Thread brokerThread = null;
> try {
> Broker broker = new Broker(name, listenerPort, networkConnectorPort);
> brokerThread = new Thread(broker);
> brokerThread.start();
> return broker;
> } catch (Exception ignoreMe) {
> ignoreMe.printStackTrace();
> }
> return null;
> }
>
> private Consumer createConsumer(String url) {
> Thread thread = null;
> try {
> Consumer consumer = new Consumer(url);
> thread = new Thread(consumer);
> thread.start();
> return consumer;
> } catch (Exception ignoreMe) {
> ignoreMe.printStackTrace();
> }
> return null;
> }
>
> private class Consumer implements Runnable {
> private final String url; // "tcp://localhost:51515"
> Consumer(String url) {
> this.url = url;
> }
>
> public void run() {
>
> Connection connection1 = null;
>
> try {
> ActiveMQConnectionFactory connectionFactory1
= new
> ActiveMQConnectionFactory(url);
> connection1 = connectionFactory1.createConnection();
> connection1.start();
>
> Session session1 = connection1.createSession(true,
> Session.AUTO_ACKNOWLEDGE);
> MessageConsumer consumer1 = session1.createConsumer(new
> ActiveMQQueue(TEST_QUEUE));
>
> //for (int i=0; i<1; i++) {
> for (;;) {
> Message message1 = consumer1.receive();
> assertNotNull(message1);
> System.out.println(message1);
> }
> }
> catch (Exception e) {
> }
> finally {
> try {
> if (connection1 != null) {
> connection1.stop();
> }
> } catch (Throwable t) {
> t.printStackTrace();
> }
> }
> }
> }
>
> private static class Broker implements Runnable {
>
> private String name;
> private int listenPort;
> private int connectorPort;
> private BrokerService brokerService = null;
>
> Broker(String name, int listenerPort, int networkPort) {
> this.name = name;
> listenPort = listenerPort;
> connectorPort = networkPort;
> }
>
> public void run() {
> try {
> brokerService = new BrokerService();
> brokerService.setBrokerName(name);
> brokerService.setUseJmx(false);
> brokerService.setPersistenceAdapter(new
> MemoryPersistenceAdapter());
>
> NetworkConnector network2 = new DiscoveryNetworkConnector(new
> java.net.URI("static:(tcp://localhost:" + connectorPort + ")"));
> network2.setName("network-" + name);
> network2.setDynamicOnly(false);
> network2.setNetworkTTL(2);
> network2.setPrefetchSize(1);
>
> brokerService.addNetworkConnector(network2);
>
> brokerService.addConnector("tcp://0.0.0.0:"
+ listenPort);
> brokerService.start();
>
> }
> catch (Exception e) {
> e.printStackTrace();
> }
> }
>
> public void stop() {
> try {
> if (brokerService != null) {
> brokerService.stop();
> }
> } catch (Throwable t) {
> t.printStackTrace();
> }
> }
> }
>
> }
>
>
> I changed the order around a little bit for the producer. However, I think
> the main difference was this:
>
> Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>
> This code will not work if you specify true for the first arg. I'm not sure,
> but I think it's because this example is using the in memory persistence
> adapter...
>
> HTH,
>
> Mike L (aka patzerbud)
>
> --
> View this message in context: http://old.nabble.com/Network-of-Brokers-tp28269405p28282467.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>
|