qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Matjaž Ostroveršnik <Matjaz.Ostrovers...@halcom.si>
Subject multiple consumers on one or more queues in parallel
Date Sun, 18 Nov 2012 14:45:37 GMT
Hi all,

Thanks all for your fast answers on my previous question. 

I am trying to make a client / consumer who will attach to one or more queues and read the
messages from the queues and process them.
Since processing of a single message can take a while I'd like to have multiple consumers
running in parallel and consuming the messages as soon they arrive.

I've noticed that even if I run several clients (see below) in parallel, only one is getting
all the messages from the queue. Expirementation with acknowledgements and capacity did not
help.
Question:

How can I read from several queues at the same time and in parallel with several clients?


Thanks in advance

Matjaž

The relevant fraction of code is listed below:

        int delay = 60;
          if (m_session.nextReceiver(rec, delay * qmsg::Duration::SECOND)) //FIXME make it
configurable

          {
            Log::info(__S("Started receiving messages from the queue: '") + rec.getName()
+ "'");
            do
            {
              delay = 1;
              one_batch_of_files = read_from_queue_and_save_to_disk(rec, file_dscr);
              files_written += one_batch_of_files;
              Log::info(
                  __S(one_batch_of_files) + " files taken from the queue: '" + rec.getName()
                      + "' and written to disk. Still to process:" + __S(m_session.getReceivable()));
            }
            while (m_session.nextReceiver(rec, delay * qmsg::Duration::SECOND) &&
//FIXME make it configurable
                process::exit_on_signal());
          }
          else
          {
            Log::info(__S("No receiver with messages. Waiting...")+__S(delay));
          }
        Log::info("Files left to process:" +__S(m_session.getReceivable()));


My queue definition is as follows:
max_msg_cnt=1000000
que_size_pages=600000000
jnl_files=64
fs=850
param="--max-queue-size=$que_size_pages --file-size=$fs --file-count=$jnl_files --max-queue-count=$max_msg_cnt
--limit-policy=flow-to-disk --durable --group-header=\"filename\" --shared-groups "
qpid-config add queue Alpha   $param
qpid-config add queue Beta  $param
qpid-config add queue Gamma $param
qpid-config add queue Delta $param

Mime
View raw message