activemq-users mailing list archives

From SuoNayi <suonayi2...@163.com>
Subject Re:RE: Re:New bie question - Running producer and two instance of consumer
Date Fri, 15 Feb 2013 06:01:14 GMT
Yes, you're right. You can assign a larger prefetch to the fast consumers so that they buffer
more messages to process than the slow consumers do.
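
As a minimal sketch of how a consumer-specific prefetch could be requested: the option names `jms.prefetchPolicy.queuePrefetch` (on the connection URL) and `consumer.prefetchSize` (on the destination name) are the ones described on the prefetch limit page linked in the quoted message below. The class `PrefetchOptions`, its two helper methods, and the broker address `tcp://localhost:61616` are only illustrative assumptions, not part of ActiveMQ.

```java
public class PrefetchOptions {

    /** Appends the connection-level queue prefetch option to a broker URL. */
    public static String brokerUrlWithQueuePrefetch(String brokerUrl, int prefetch) {
        String separator = brokerUrl.contains("?") ? "&" : "?";
        return brokerUrl + separator + "jms.prefetchPolicy.queuePrefetch=" + prefetch;
    }

    /** Appends the per-consumer prefetch destination option to a queue name. */
    public static String queueNameWithPrefetch(String queueName, int prefetch) {
        String separator = queueName.contains("?") ? "&" : "?";
        return queueName + separator + "consumer.prefetchSize=" + prefetch;
    }

    public static void main(String[] args) {
        // Assumed broker address; pass the result to new ActiveMQConnectionFactory(url).
        System.out.println(brokerUrlWithQueuePrefetch("tcp://localhost:61616", 1));
        // Use the result as the queue name, e.g. new ActiveMQQueue(name).
        System.out.println(queueNameWithPrefetch("TEST.FOO", 1));
    }
}
```

A slow consumer would use a small value such as 1, while a fast consumer could keep a larger one. Setting the option on the destination name affects only consumers created on that destination, whereas the URL option applies to every queue consumer on the connection.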

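To illustrate the buffering effect discussed in the quoted messages below, here is a toy simulation. It is not ActiveMQ's actual dispatch algorithm: it assumes all messages are already on the queue, that the broker hands them out round-robin, and that a consumer never holds more than `prefetch` unfinished messages. The class name, method name, and the processing costs of 1 and 5 ticks are made up for the example.

```java
import java.util.Arrays;

public class PrefetchSimulation {

    /**
     * Toy model: the broker dispatches round-robin to any consumer holding
     * fewer than `prefetch` unfinished messages. costs[i] is the number of
     * ticks (at least 1) consumer i needs per message.
     * Returns how many messages each consumer ended up processing.
     */
    public static int[] simulate(int totalMessages, int prefetch, int[] costs) {
        int n = costs.length;
        int[] pending = new int[n];   // dispatched to the consumer, not yet finished
        int[] busyLeft = new int[n];  // ticks left on the message being processed
        int[] processed = new int[n];
        int undelivered = totalMessages;
        int next = 0;                 // round-robin cursor
        int done = 0;

        while (done < totalMessages) {
            // Broker: keep dispatching while some consumer has buffer room.
            boolean dispatched = true;
            while (undelivered > 0 && dispatched) {
                dispatched = false;
                for (int k = 0; k < n; k++) {
                    int c = (next + k) % n;
                    if (pending[c] < prefetch) {
                        pending[c]++;
                        undelivered--;
                        next = (c + 1) % n;
                        dispatched = true;
                        break;
                    }
                }
            }
            // Consumers: each one does one tick of work.
            for (int c = 0; c < n; c++) {
                if (busyLeft[c] == 0 && pending[c] > 0) {
                    busyLeft[c] = costs[c];   // start the next buffered message
                }
                if (busyLeft[c] > 0 && --busyLeft[c] == 0) {
                    pending[c]--;             // finished, frees one buffer slot
                    processed[c]++;
                    done++;
                }
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        int[] costs = {1, 5}; // consumer 0 is fast, consumer 1 is slow
        System.out.println("prefetch=1000: " + Arrays.toString(simulate(50, 1000, costs)));
        System.out.println("prefetch=1:    " + Arrays.toString(simulate(50, 1, costs)));
    }
}
```

Under these assumptions, a prefetch of 1000 gives each consumer 25 of the 50 messages regardless of speed, while a prefetch of 1 lets the fast consumer process 41 and the slow one 9.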

At 2013-02-15 12:01:01,"AMARNATH, Balachandar" <BALACHANDAR.AMARNATH@airbus.com> wrote:
>Thanks for the reply,
>
>My intention is not to transfer files at the moment. I was just checking how the consumers
>pull messages from the queue. When I ran the code, it looked like the messages were shared
>equally between the consumers, so the application has to wait for the slow consumer to
>finish even though the fast consumer is idle. It looks like the messages are buffered in the
>consumers, and hence no messages are available to the fast consumer once it has finished
>processing its own share. I am not sure, though!
>
>-Bala
>
>-----Original Message-----
>From: SuoNayi [mailto:suonayi2006@163.com] 
>Sent: 14 February 2013 17:30
>To: users@activemq.apache.org
>Subject: Re:New bie question - Running producer and two instance of consumer
>
>Hi, AMQ is not designed to transfer files, especially large files, although it provides
>two approaches (BlobMessage or JMS streams).
>To improve performance, the AMQ broker dispatches messages to consumers in batches
>instead of having consumers pull one message at a time.
>
>So tuning the prefetch of the consumers will give your fast consumers more chances to pull
>messages from the broker.
>Take a look at:
>http://activemq.apache.org/what-is-the-prefetch-limit-for.html
>
>At 2013-02-14 19:41:49,"AMARNATH, Balachandar" <BALACHANDAR.AMARNATH@airbus.com> wrote:
>>Hi,
>>
>>I am new to ActiveMQ (or to any other messaging system). I want to use this messaging
>>system for distributed processing of a large number of files. I wish to split my application
>>into two pieces, namely a producer and a consumer, so that one instance of the producer
>>generates messages that several instances of similar consumers can process. The consumers
>>are supposed to run on distributed machines. I wrote a simple producer and two consumers
>>(as attached). The producer generates 100 messages (simple text messages) and the consumers
>>take 'different' processing times to process every message. Hence, I expect that the consumer
>>that takes less time per message would process more messages, while the other one would
>>process fewer. However, when I ran them, I saw that the messages were split evenly between
>>the two consumers. Can someone give me a hint on how this can be achieved, and where I am
>>going wrong?
>>
>>Here are my classes:-
>>
>>import java.util.ArrayList;
>>
>>import javax.jms.Connection;
>>import javax.jms.Destination;
>>import javax.jms.Message;
>>import javax.jms.MessageConsumer;
>>import javax.jms.Session;
>>import javax.jms.TextMessage;
>>
>>import org.apache.activemq.ActiveMQConnection;
>>import org.apache.activemq.ActiveMQConnectionFactory;
>>
>>public class HelloWorldConsumer1 {
>>    public static void main(String[] args) {
>>        try {
>>            String url = ActiveMQConnection.DEFAULT_BROKER_URL;
>>            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
>>            // Create a Connection
>>            Connection connection = connectionFactory.createConnection();
>>            connection.start();
>>            // Create a Session
>>            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>>            // Create the destination (Topic or Queue)
>>            Destination destination = session.createQueue("TEST.FOO");
>>            // Create a MessageConsumer from the Session to the Topic or Queue
>>            MessageConsumer consumer = session.createConsumer(destination);
>>            ArrayList<String> contents = new ArrayList<String>();
>>            boolean flag = true;
>>            while (flag) {
>>                // Wait up to one second; receive() returns null on timeout
>>                Message message = consumer.receive(1000);
>>                if (message instanceof TextMessage) {
>>                    TextMessage textMessage = (TextMessage) message;
>>                    String text = textMessage.getText();
>>                    System.out.println("Received : " + text + " and processing");
>>                    // Simulate this consumer's per-message processing time
>>                    Thread.sleep(5000);
>>                    if (text.equalsIgnoreCase("END")) {
>>                        System.out.println("The recvd text is " + text);
>>                        flag = false;
>>                    }
>>                    contents.add(text);
>>                } else {
>>                    System.out.println("Received : " + message);
>>                }
>>            }
>>            System.out.println("Exiting..");
>>            System.out.println("The contents collected in consumer.java is " + contents.size());
>>            connection.close();
>>        } catch (Exception e) {
>>            System.out.println("Caught: " + e);
>>            e.printStackTrace();
>>        }
>>    }
>>}
>>
>>
>>
>>import javax.jms.Connection;
>>import javax.jms.DeliveryMode;
>>import javax.jms.Destination;
>>import javax.jms.MessageProducer;
>>import javax.jms.Session;
>>import javax.jms.TextMessage;
>>
>>import org.apache.activemq.ActiveMQConnection;
>>import org.apache.activemq.ActiveMQConnectionFactory;
>>
>>public class HelloWorldProducer {
>>    public static void main(String[] args) throws Exception {
>>        String url = ActiveMQConnection.DEFAULT_BROKER_URL;
>>        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
>>        // Create a Connection
>>        Connection connection = connectionFactory.createConnection();
>>        connection.start();
>>        // Create a Session
>>        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>>        // Create the destination (Topic or Queue)
>>        Destination destination = session.createQueue("TEST.FOO");
>>        // Create a MessageProducer from the Session to the Topic or Queue
>>        MessageProducer producer = session.createProducer(destination);
>>        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
>>        int counter = 0;
>>        while (counter < 50) {
>>            counter++;
>>            String text = "Message " + counter;
>>            TextMessage message = session.createTextMessage(text);
>>            // Tell the producer to send the message
>>            producer.send(message);
>>            System.out.println("Sent message '" + message.getText() + "'");
>>            Thread.sleep(1000);
>>        }
>>        // Send one END marker per consumer so that both consumers exit
>>        Thread.sleep(2000);
>>        producer.send(session.createTextMessage("END"));
>>        Thread.sleep(4000);
>>        producer.send(session.createTextMessage("END"));
>>        // Clean up
>>        connection.close();
>>    }
>>}
>>
>>
>>
>>
>>
>>With thanks and regards
>>Balachandar
>>
>>
>>
>>
>>The information in this e-mail is confidential. The contents may not be disclosed
or used by anyone other than the addressee. Access to this e-mail by anyone else is unauthorised.
>>If you are not the intended recipient, please notify Airbus immediately and delete
this e-mail.
>>Airbus cannot accept any responsibility for the accuracy or completeness of this e-mail
as it has been sent over public networks. If you have any concerns over the content of this
message or its Accuracy or Integrity, please contact Airbus immediately.
>>All outgoing e-mails from Airbus are checked using regularly updated virus scanning
software but you should take whatever measures you deem to be appropriate to ensure that this
message and any attachments are virus free.
>>
