activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pubudu gunawardena <pubud...@gmail.com>
Subject Network of Brokers with one standalone and two embedded brokers on producer and consumer
Date Fri, 19 Jun 2015 08:22:39 GMT
Hi All,

I have the following setup. There is a producer which sends messages
to an embedded broker. There is a consumer that consumes messages from
another embedded broker. I have created a network of brokers with the
two embedded brokers connected to the standalone broker. But the
messages don't get passed from the producer to the consumer. I have
created an example which shows this behavior. Can someone point out to
me what I am doing wrong or if this is not possible? Following is the
code to reproduce what I have mentioned.


import java.net.URI;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;

public class Test {

    public static void main(String[] args) {
        try {
            startBroker1();
            startBroker2();
            runProducer();
            runConsumer();

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    private static void runConsumer() {
        new Thread(new Consumer()).start();

    }

    private static final class Consumer implements Runnable, MessageListener {
        @Override
        public void run() {
            try {
                ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://localhost:61615");
                Connection connection = factory.createConnection();
                connection.start();
                Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
                Queue destination = session.createQueue("topic");
                MessageConsumer consumer = session.createConsumer(destination);
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void onMessage(Message message) {
            try {
                TextMessage text = (TextMessage) message;
                System.out.println("Message is : " + text.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    private static void runProducer() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://localhost:61617");
                    // ActiveMQConnection.DEFAULT_BROKER_URL =
                    // failover://tcp://localhost:61616
                    Connection connection;
                    connection = factory.createConnection();
                    connection.start();
                    Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
                    Queue destination = session.createQueue("topic");
                    MessageProducer producer =
session.createProducer(destination);
                    TextMessage message = session.createTextMessage();
                    message.setText("This is the message");
                    producer.send(message);
                    System.out.println("Sent: " + message.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }).start();

    }

    private static void startBroker1() throws Exception {
        new Thread(new Runnable() {
            @Override
            public void run() {
                startBroker("broker1", "tcp://localhost:61617");
            }
        }).start();

    }

    private static void startBroker2() throws Exception {
        new Thread(new Runnable() {
            @Override
            public void run() {
                startBroker("broker2", "tcp://localhost:61615");
            }
        }).start();
    }

    private static void startBroker(String name, String uri) {
        try {
            BrokerService broker = new BrokerService();
            broker.setBrokerName(name);
            broker.addConnector(uri);
            NetworkConnector connector = new
DiscoveryNetworkConnector(new URI("static://" +
"tcp://localhost:61616"));
            connector.setDuplex(true);
            broker.addNetworkConnector(connector);
            broker.start();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}


-- 
Thanks,
Pubudu

Mime
View raw message