activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jamie McCrindle <jamiemccrin...@gmail.com>
Subject Re: Network of Brokers
Date Mon, 19 Apr 2010 08:56:33 GMT
Awesome. Thanks for looking into that... you're right. I changed the
transacted to false in the original test and it worked... (I also
spotted a bug in the original test in that it was creating both
sessions from the same connection). In case anyone is interested, the
updated test is attached...

*sigh* it's pretty obvious in retrospect that a consumer isn't going
to see a message where the transaction hasn't been committed...

Thanks again Mike,

cheers,
j.

-------

package org.example.activemq;

import java.util.Enumeration;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;

import junit.framework.TestCase;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;

public class NetworkTest extends TestCase {

    public void testNetworkOfBrokers() throws Exception {
        BrokerService brokerService1 = null;
        BrokerService brokerService2 = null;

        try {

        {
            brokerService1 = new BrokerService();
            brokerService1.setBrokerName("one");
            brokerService1.setUseJmx(false);
            brokerService1.setPersistenceAdapter(new
MemoryPersistenceAdapter());
            brokerService1.addConnector("tcp://0.0.0.0:61616");
            NetworkConnector network1 =
brokerService1.addNetworkConnector("static:(tcp://localhost:51515)");
//            NetworkConnector network1 =
brokerService1.addNetworkConnector("multicast://default");
            network1.setName("network1");
            network1.setDynamicOnly(true);
            network1.setNetworkTTL(3);
            network1.setPrefetchSize(1);
            brokerService1.start();
        }

        {
            brokerService2 = new BrokerService();
            brokerService2.setBrokerName("two");
            brokerService2.setUseJmx(false);
            brokerService2.setPersistenceAdapter(new
MemoryPersistenceAdapter());
            brokerService2.addConnector("tcp://0.0.0.0:51515");
            NetworkConnector network2 =
brokerService2.addNetworkConnector("static:(tcp://localhost:61616)");
//            NetworkConnector network2 =
brokerService2.addNetworkConnector("multicast://default");
            network2.setName("network2");
            network2.setDynamicOnly(true);
            network2.setNetworkTTL(3);
            network2.setPrefetchSize(1);
            brokerService2.start();
        }

        ActiveMQConnectionFactory connectionFactory1 = new
ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:51515)?randomize=false");
        ActiveMQConnectionFactory connectionFactory2 = new
ActiveMQConnectionFactory("failover:(tcp://localhost:51515,tcp://localhost:61616)?randomize=false");

        Connection connection1 = connectionFactory1.createConnection();
        connection1.start();

        Connection connection2 = connectionFactory2.createConnection();
        connection2.start();

        try {
            Session session1 = connection1.createSession(false,
Session.AUTO_ACKNOWLEDGE);
            Session session2 = connection2.createSession(false,
Session.AUTO_ACKNOWLEDGE);

            MessageConsumer consumer1 = session1.createConsumer(new
ActiveMQQueue("testingqueue"));
            MessageProducer producer2 = session2.createProducer(new
ActiveMQQueue("testingqueue"));

            TextMessage message2 = session2.createTextMessage();
            message2.setText("Hello World!");

            producer2.send(message2);
            Message message1 = consumer1.receive(1000);
            assertNotNull(message1);
            System.out.println(message1);

            QueueBrowser browser = session2.createBrowser(new
ActiveMQQueue("testingqueue"));
            Enumeration<?> enumeration = browser.getEnumeration();

            assertFalse(enumeration.hasMoreElements());
        } finally {
            connection1.stop();
            connection2.stop();
        }

        } finally {
            try { if(brokerService1 != null) { brokerService1.stop();
}} catch(Throwable t) { t.printStackTrace(); }
            try { if(brokerService2 != null) { brokerService2.stop();
}} catch(Throwable t) { t.printStackTrace(); }
        }

    }

}




On Sun, Apr 18, 2010 at 3:10 PM, patzerbud <patzerbud@hotmail.com> wrote:
>
>
>
> dkfn wrote:
>>
>> :) It's the mailing list software conspiring, I tell you... adding it
>> directly into the mail instead:
>>
>
> OK, my first reply runs fine (i.e. without error) but didn't actually work.
> I noodled around with it a little more and offer the following:
>
> package org.apache.activemq.example;
>
> import java.util.Enumeration;
>
> import javax.jms.Connection;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageProducer;
> import javax.jms.QueueBrowser;
> import javax.jms.Session;
> import javax.jms.TextMessage;
>
> import junit.framework.TestCase;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.command.ActiveMQQueue;
> import org.apache.activemq.network.DiscoveryNetworkConnector;
> import org.apache.activemq.network.NetworkConnector;
> import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
>
>
> public class QueueTest extends TestCase {
>
>        private static final String TEST_QUEUE = "testQueue";
>        private static final String LOCAL_MQ1 = "tcp://localhost:61616";
>        private static final String LOCAL_MQ2 = "tcp://localhost:51515";
>
>
>    public void testNetworkOfBrokers() throws Exception {
>
>                Broker broker1 = createBroker("one", 61616, 51515);
>                Broker broker2 = createBroker("two", 51515, 61616);
>                pause(10, "sleeping to allow brokers to startup & connect
to each
> other...");
>
>                System.out.println("creating consumer");
>                Consumer consumer = createConsumer(LOCAL_MQ2);
>                pause(5, "sleeping to allow consumer to startup & connect
to MQ...");
>
>
>                System.out.println("producing messages");
>                Connection connection = null;
>
>                try {
>                        ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(LOCAL_MQ1);
>                        connection = connectionFactory.createConnection();
>
>                        Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>                        MessageProducer producer = session.createProducer(new
> ActiveMQQueue(TEST_QUEUE));
>                        connection.start();
>
>                        for (int i=0; i<10; i++) {
>                                TextMessage message = session.createTextMessage();
>                                message.setText("Hello World!");
>                                producer.send(message);
>                        }
>
>                        pause(5, "sleeping to allow consumer to consume all
messages...");
>
>                        QueueBrowser browser = session.createBrowser(new
> ActiveMQQueue(TEST_QUEUE));
>                        Enumeration<?> enumeration = browser.getEnumeration();
>                        assertFalse(enumeration.hasMoreElements());
>                }
>                catch (Exception e) {
>                        e.printStackTrace();
>                }
>                finally {
>                        try {
>                                if (connection != null) {
>                                        connection.stop();
>                                }
>                        } catch (Throwable t) {
>                                //t.printStackTrace();
>                        }
>
>                        try {
>                                if (broker2 != null) {
>                                        broker2.stop();
>                                }
>                        } catch (Throwable t) {
>                                //t.printStackTrace();
>                        }
>
>                        try {
>                                if (broker1 != null) {
>                                        broker1.stop();
>                                }
>                        } catch (Throwable t) {
>                                //t.printStackTrace();
>                        }
>
>                }
>
>                pause(2);
>                System.out.println("All done!");
>    }
>
>        private void pause(int seconds) {
>                pause(seconds, null);
>        }
>
>        private void pause(int seconds, String msg) {
>                if (msg != null) System.out.println(msg);
>                try {
>                        Thread.currentThread().sleep(seconds * 1000);
>                } catch (InterruptedException e) {
>                        ; // ignore
>                }
>        }
>
>        private Broker createBroker(String name, int listenerPort, int
> networkConnectorPort) {
>                System.out.println("creating broker "+name);
>                Thread brokerThread = null;
>                try {
>                        Broker broker = new Broker(name, listenerPort, networkConnectorPort);
>                        brokerThread = new Thread(broker);
>                        brokerThread.start();
>                        return broker;
>                } catch (Exception ignoreMe) {
>                        ignoreMe.printStackTrace();
>                }
>                return null;
>        }
>
>        private Consumer createConsumer(String url) {
>                Thread thread = null;
>                try {
>                        Consumer consumer = new Consumer(url);
>                        thread = new Thread(consumer);
>                        thread.start();
>                        return consumer;
>                } catch (Exception ignoreMe) {
>                        ignoreMe.printStackTrace();
>                }
>                return null;
>        }
>
>        private class Consumer implements Runnable {
>                private final String url; // "tcp://localhost:51515"
>                Consumer(String url) {
>                        this.url = url;
>                }
>
>                public void run() {
>
>                        Connection connection1 = null;
>
>                        try {
>                                ActiveMQConnectionFactory connectionFactory1
= new
> ActiveMQConnectionFactory(url);
>                                connection1 = connectionFactory1.createConnection();
>                                connection1.start();
>
>                                Session session1 = connection1.createSession(true,
> Session.AUTO_ACKNOWLEDGE);
>                                MessageConsumer consumer1 = session1.createConsumer(new
> ActiveMQQueue(TEST_QUEUE));
>
>                                //for (int i=0; i<1; i++) {
>                                for (;;) {
>                                        Message message1 = consumer1.receive();
>                                        assertNotNull(message1);
>                                        System.out.println(message1);
>                                }
>                        }
>                        catch (Exception e) {
>                        }
>                        finally {
>                                try {
>                                        if (connection1 != null) {
>                                                connection1.stop();
>                                        }
>                                } catch (Throwable t) {
>                                        t.printStackTrace();
>                                }
>                        }
>                }
>        }
>
>        private static class Broker implements Runnable {
>
>                private String name;
>                private int listenPort;
>                private int connectorPort;
>                private BrokerService brokerService = null;
>
>                Broker(String name, int listenerPort, int networkPort) {
>                        this.name = name;
>                        listenPort = listenerPort;
>                        connectorPort = networkPort;
>                }
>
>                public void run() {
>                        try {
>                                brokerService = new BrokerService();
>                                brokerService.setBrokerName(name);
>                                brokerService.setUseJmx(false);
>                                brokerService.setPersistenceAdapter(new
>                                MemoryPersistenceAdapter());
>
>                                NetworkConnector network2 = new DiscoveryNetworkConnector(new
> java.net.URI("static:(tcp://localhost:" + connectorPort + ")"));
>                                network2.setName("network-" + name);
>                                network2.setDynamicOnly(false);
>                                network2.setNetworkTTL(2);
>                                network2.setPrefetchSize(1);
>
>                                brokerService.addNetworkConnector(network2);
>
>                                brokerService.addConnector("tcp://0.0.0.0:"
+ listenPort);
>                                brokerService.start();
>
>                        }
>                        catch (Exception e) {
>                                e.printStackTrace();
>                        }
>                }
>
>                public void stop() {
>                        try {
>                                if (brokerService != null) {
>                                        brokerService.stop();
>                                }
>                        } catch (Throwable t) {
>                                t.printStackTrace();
>                        }
>                }
>        }
>
> }
>
>
> I changed the order around a little bit for the producer. However, I think
> the main difference was this:
>
> Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>
> This code will not work if you specify true for the first arg. I'm not sure,
> but I think it's because this example is using the in memory persistence
> adapter...
>
> HTH,
>
> Mike L (aka patzerbud)
>
> --
> View this message in context: http://old.nabble.com/Network-of-Brokers-tp28269405p28282467.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>

Mime
View raw message