activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "AMARNATH, Balachandar" <BALACHANDAR.AMARN...@airbus.com>
Subject New bie question - Running producer and two instance of consumer
Date Thu, 14 Feb 2013 11:41:49 GMT
Hi,

I am new to activeMQ (for any other messaging system). I want to use this messaging system
for distributed processing of large number of files. I wish to split my application in to
two pieces namely producer and consumer, so that one instance of producer generates messages
that several instance of similar consumers can process it. The consumers are supposed to be
distributed machines. I wrote a simple producer and two consumers (as attached). The producer
generates 100 messages (simple text messages) and consumers take 'different' processing times
to process every message. Hence, I expect, the consumer that take less time to finish processing
would process more messages while the other one process less number of messages. However,
when I ran them, I saw the number of messages is evenly splitted and processed. Can someone
give me a hint here, how this can be achieved? And where I am going wrong?

Here are my classes:-

public class HelloWorldConsumer1 {
             public static void main(String[] args ){
            try {
            String url = ActiveMQConnection.DEFAULT_BROKER_URL;
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();
                // Create a Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("TEST.FOO");
                // Create a MessageConsumer from the Session to the Topic or Queue
                MessageConsumer consumer = session.createConsumer(destination);
                ArrayList<String> contents = new ArrayList<String>();
                boolean flag = true;
                while (flag){
                        Message message = consumer.receive(1000);
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    System.out.println("Received : " + text + " and processing");
                    Thread.sleep(5000);
                    if (text.equalsIgnoreCase("END")){
                        System.out.println("The recvd text is "+text);
                        flag = false;
                    }
                    contents.add(text);
                } else {
                        System.out.println("Received : "+message);
                }
                }
                System.out.println("Exiting..");
                System.out.println("The contents collected in consumer.java is "+contents.size());
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
}



public class HelloWorldProducer {
        public static void main(String[] args) throws Exception {
                    String url = ActiveMQConnection.DEFAULT_BROKER_URL;
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();
                // Create a Session
                Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("TEST.FOO");
                // Create a MessageProducer from the Session to the Topic or Queue
                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                int counter = 0;
                boolean flag = true;
                while (counter < 50){
                counter++;
                String text = "Message "+counter;// + Thread.currentThread().getName() + "
: " + this.hashCode();
                TextMessage message = session.createTextMessage(text);
                // Tell the producer to send the message
                producer.send(message);
                System.out.println("Sent message '" + message.getText() + "'");
                Thread.sleep(1000);
                }
                Thread.sleep(2000);
                producer.send(session.createTextMessage("END"));
                Thread.sleep(4000);
                producer.send(session.createTextMessage("END"));
                // Clean up
                connection.close();
        }
    }





With thanks and regards
Balachandar




The information in this e-mail is confidential. The contents may not be disclosed or used
by anyone other than the addressee. Access to this e-mail by anyone else is unauthorised.
If you are not the intended recipient, please notify Airbus immediately and delete this e-mail.
Airbus cannot accept any responsibility for the accuracy or completeness of this e-mail as
it has been sent over public networks. If you have any concerns over the content of this message
or its Accuracy or Integrity, please contact Airbus immediately.
All outgoing e-mails from Airbus are checked using regularly updated virus scanning software
but you should take whatever measures you deem to be appropriate to ensure that this message
and any attachments are virus free.


Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message