activemq-users mailing list archives

From "AMARNATH, Balachandar" <BALACHANDAR.AMARN...@airbus.com>
Subject RE: Re:New bie question - Running producer and two instance of consumer
Date Fri, 15 Feb 2013 04:01:01 GMT
Thanks for the reply.

My intention is not to transfer files at the moment. I was just checking how the consumers
pull messages from the queue. When I ran the code, the messages appeared to be shared equally
between the consumers, so the application has to wait for the slow consumer to finish even
though the fast consumer is idle. It looks like the messages are buffered in the consumers,
so none are left for the fast consumer when it finishes processing. I am not sure, though.

-Bala

-----Original Message-----
From: SuoNayi [mailto:suonayi2006@163.com] 
Sent: 14 February 2013 17:30
To: users@activemq.apache.org
Subject: Re:New bie question - Running producer and two instance of consumer

Hi, AMQ is not designed to transfer files, especially large files, although it provides
two approaches (BlobMessage or JMS streams).
For better performance, the AMQ broker dispatches messages to consumers in batches
instead of having each consumer pull one message at a time.
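
A rough way to see the effect of batched dispatch is a small stand-alone simulation. The sketch
below does not use ActiveMQ. The class PrefetchSimulation, its round-robin dispatch rule, and the
assumption that all messages are already on the queue are simplifications made up for
illustration; the real broker is more involved. It only counts how many messages each consumer
ends up with for a given prefetch limit.

```java
final class PrefetchSimulation {

    /**
     * Hypothetical model: the broker hands messages round-robin to every consumer
     * that holds fewer than `prefetch` unacknowledged messages; a consumer
     * acknowledges a message when it finishes processing it.
     * Returns the number of messages processed by each consumer.
     */
    static int[] simulate(int messages, int prefetch, long[] processingMillis) {
        if (prefetch < 1) {
            throw new IllegalArgumentException("this sketch only models prefetch >= 1");
        }
        int consumers = processingMillis.length;
        int[] unacked = new int[consumers];    // dispatched but not yet finished
        int[] processed = new int[consumers];
        long[] finishAt = new long[consumers]; // -1 means the consumer is idle
        for (int c = 0; c < consumers; c++) {
            finishAt[c] = -1L;
        }
        int remaining = messages;
        int next = 0;                          // round-robin pointer
        long now = 0L;

        while (true) {
            // Broker side: dispatch while some consumer still has room in its buffer.
            int skipped = 0;
            while (remaining > 0 && skipped < consumers) {
                if (unacked[next] < prefetch) {
                    unacked[next]++;
                    remaining--;
                    skipped = 0;
                } else {
                    skipped++;
                }
                next = (next + 1) % consumers;
            }
            // Consumer side: idle consumers start on a buffered message.
            long earliest = Long.MAX_VALUE;
            for (int c = 0; c < consumers; c++) {
                if (finishAt[c] < 0 && unacked[c] > 0) {
                    finishAt[c] = now + processingMillis[c];
                }
                if (finishAt[c] >= 0) {
                    earliest = Math.min(earliest, finishAt[c]);
                }
            }
            if (earliest == Long.MAX_VALUE) {
                return processed;              // nothing buffered, nothing in progress
            }
            // Jump to the next completion and acknowledge everything finishing then.
            now = earliest;
            for (int c = 0; c < consumers; c++) {
                if (finishAt[c] == now) {
                    processed[c]++;
                    unacked[c]--;
                    finishAt[c] = -1L;
                }
            }
        }
    }

    public static void main(String[] args) {
        long[] times = {5000L, 1000L};         // slow consumer, fast consumer
        for (int prefetch : new int[] {1000, 1}) {
            int[] counts = simulate(50, prefetch, times);
            System.out.println("prefetch=" + prefetch
                    + " -> slow=" + counts[0] + ", fast=" + counts[1]);
        }
    }
}
```

With 50 messages and processing times of 5 s and 1 s, this model gives an even 25/25 split
for a prefetch of 1000 (the documented default for queue consumers) and 9/41 for a prefetch
of 1, which matches the even split described in the original question.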

So tuning the consumers' prefetch limit will give your fast consumers more chances to receive
messages from the broker.
Take a look at:
http://activemq.apache.org/what-is-the-prefetch-limit-for.html
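
As a sketch of what that tuning can look like, the prefetch limit can be set either for the whole
connection through the broker URL (jms.prefetchPolicy.queuePrefetch) or for a single consumer
through a destination option on the queue name (consumer.prefetchSize). The helper class
PrefetchSettings and the host and port below are assumptions for illustration; the helpers only
build the strings and do not talk to a broker.

```java
final class PrefetchSettings {

    /** Connection-wide setting: applies to queue consumers created from this URL. */
    static String brokerUrlWithQueuePrefetch(String brokerUrl, int prefetch) {
        String separator = brokerUrl.contains("?") ? "&" : "?";
        return brokerUrl + separator + "jms.prefetchPolicy.queuePrefetch=" + prefetch;
    }

    /** Per-consumer setting: a destination option appended to the queue name. */
    static String queueNameWithPrefetch(String queueName, int prefetch) {
        String separator = queueName.contains("?") ? "&" : "?";
        return queueName + separator + "consumer.prefetchSize=" + prefetch;
    }

    public static void main(String[] args) {
        // Assumed broker address; adjust to your own setup.
        System.out.println(brokerUrlWithQueuePrefetch("tcp://localhost:61616", 1));
        System.out.println(queueNameWithPrefetch("TEST.FOO", 1));
    }
}
```

In the consumer from the original post, the first string would go to
new ActiveMQConnectionFactory(url) and the second to session.createQueue(name). A prefetch of 1
keeps at most one undelivered message buffered per consumer, so a slow consumer cannot hold
back messages that a fast one could take.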






At 2013-02-14 19:41:49,"AMARNATH, Balachandar" <BALACHANDAR.AMARNATH@airbus.com> wrote:
>Hi,
>
>I am new to ActiveMQ (or to any other messaging system). I want to use it for distributed
>processing of a large number of files. I wish to split my application into two pieces, a
>producer and a consumer, so that one producer instance generates messages that several
>instances of the same consumer can process. The consumers are supposed to run on separate
>machines. I wrote a simple producer and two consumers (as attached). The producer generates
>100 messages (simple text messages) and the consumers take different amounts of time to
>process each message. Hence, I expect the consumer that takes less time per message to process
>more messages and the other one to process fewer. However, when I ran them, the messages were
>split evenly between the two. Can someone give me a hint as to how this can be achieved,
>and where I am going wrong?
>
>Here are my classes:-
>
>import java.util.ArrayList;
>import javax.jms.Connection;
>import javax.jms.Destination;
>import javax.jms.Message;
>import javax.jms.MessageConsumer;
>import javax.jms.Session;
>import javax.jms.TextMessage;
>import org.apache.activemq.ActiveMQConnection;
>import org.apache.activemq.ActiveMQConnectionFactory;
>
>public class HelloWorldConsumer1 {
>    public static void main(String[] args) {
>        try {
>            String url = ActiveMQConnection.DEFAULT_BROKER_URL;
>            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
>            // Create a Connection
>            Connection connection = connectionFactory.createConnection();
>            connection.start();
>            // Create a Session
>            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>            // Create the destination (Topic or Queue)
>            Destination destination = session.createQueue("TEST.FOO");
>            // Create a MessageConsumer from the Session to the Topic or Queue
>            MessageConsumer consumer = session.createConsumer(destination);
>            ArrayList<String> contents = new ArrayList<String>();
>            boolean flag = true;
>            while (flag) {
>                // Wait up to one second; receive() returns null on timeout
>                Message message = consumer.receive(1000);
>                if (message instanceof TextMessage) {
>                    TextMessage textMessage = (TextMessage) message;
>                    String text = textMessage.getText();
>                    System.out.println("Received : " + text + " and processing");
>                    // Simulate five seconds of processing per message
>                    Thread.sleep(5000);
>                    if (text.equalsIgnoreCase("END")) {
>                        System.out.println("The recvd text is " + text);
>                        flag = false;
>                    }
>                    contents.add(text);
>                } else {
>                    // Also reached with message == null when receive() timed out
>                    System.out.println("Received : " + message);
>                }
>            }
>            System.out.println("Exiting..");
>            System.out.println("The contents collected in consumer.java is " + contents.size());
>            connection.close();
>        } catch (Exception e) {
>            System.out.println("Caught: " + e);
>            e.printStackTrace();
>        }
>    }
>}
>
>
>
>import javax.jms.Connection;
>import javax.jms.DeliveryMode;
>import javax.jms.Destination;
>import javax.jms.MessageProducer;
>import javax.jms.Session;
>import javax.jms.TextMessage;
>import org.apache.activemq.ActiveMQConnection;
>import org.apache.activemq.ActiveMQConnectionFactory;
>
>public class HelloWorldProducer {
>    public static void main(String[] args) throws Exception {
>        String url = ActiveMQConnection.DEFAULT_BROKER_URL;
>        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
>        // Create a Connection
>        Connection connection = connectionFactory.createConnection();
>        connection.start();
>        // Create a Session
>        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>        // Create the destination (Topic or Queue)
>        Destination destination = session.createQueue("TEST.FOO");
>        // Create a MessageProducer from the Session to the Topic or Queue
>        MessageProducer producer = session.createProducer(destination);
>        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
>        int counter = 0;
>        while (counter < 50) {
>            counter++;
>            String text = "Message " + counter; // + Thread.currentThread().getName() + " : " + this.hashCode();
>            TextMessage message = session.createTextMessage(text);
>            // Tell the producer to send the message
>            producer.send(message);
>            System.out.println("Sent message '" + message.getText() + "'");
>            Thread.sleep(1000);
>        }
>        // Send two END markers, presumably one for each of the two consumers
>        Thread.sleep(2000);
>        producer.send(session.createTextMessage("END"));
>        Thread.sleep(4000);
>        producer.send(session.createTextMessage("END"));
>        // Clean up
>        connection.close();
>    }
>}
>
>
>
>
>
>With thanks and regards
>Balachandar
>
>
>
>
>The information in this e-mail is confidential. The contents may not be disclosed or used
by anyone other than the addressee. Access to this e-mail by anyone else is unauthorised.
>If you are not the intended recipient, please notify Airbus immediately and delete this
e-mail.
>Airbus cannot accept any responsibility for the accuracy or completeness of this e-mail
as it has been sent over public networks. If you have any concerns over the content of this
message or its Accuracy or Integrity, please contact Airbus immediately.
>All outgoing e-mails from Airbus are checked using regularly updated virus scanning software
but you should take whatever measures you deem to be appropriate to ensure that this message
and any attachments are virus free.
>





