qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gordon Sim <g...@redhat.com>
Subject Re: RES: Behaviour when there're elements on the queue before instantiating the consumers
Date Wed, 16 Dec 2009 10:20:10 GMT
On 12/15/2009 08:10 PM, Luiz Cordeiro wrote:
> Hi,
>
> 	This did the job:
>
>        mngr = new SubscriptionManager( session );
>        FlowControl fc(5, FlowControl::UNLIMITED, false);
>        mngr->subscribe(incoming, "teste_flow", SubscriptionSettings(fc));
>
> 	At least the first part: Consume 5 items. But how do I give the process more credit
in order to read five more? What we want to do is have N consumers and allow each to get a
fare share of items. So, once each sibling consumer has got their share, I would like to give
them all a new share.

There are two options. One option (which I would recommend) is to use a 
credit 'window' whereby more messages are sent as they are acknowledged 
by the client[1] (i.e. in this option the credit defines a 'prefetch' 
window). To do this you would set the last argument to your flow control 
object to be true. E.g.:

     FlowControl fc(5, FlowControl::UNLIMITED, true);

or

     FlowControl fc = FlowControl::messageWindow(5);//does the same thing

The other option is to directly issue credit, e.g. using 
Subscription::grantCredit().


[1] Even with this option you can control the acknowledgement process if 
you wish. In AMQP 0-10 there are two 'levels' of message acknowledgement 
(at the model- and session- levels). Flow is controlled by the session 
level completion (i.e. the session_completed control). By default these 
controls are sent by the library but the application can decide when 
that should happen or even take control for sending completion 
themselves (see SubscriptionSettings::completionMode for details).

>
> 	The closest thing to this I could get was:
>
>        // O esperado eh que apenas 5 elementos sejam consumidos.
>        mngr = new SubscriptionManager( session );
>        FlowControl fc(5, FlowControl::UNLIMITED, false);
>
>        while (1) {
>           mngr->subscribe(incoming, "teste_flow", SubscriptionSettings(fc));
>
>           while ( incoming.get(m, 1000000L) ) {
>              // printf("[%d] [%s]\n", pid, m.getData().c_str());
>              ++count;
>           }
>
>           printf("[%d] = [%d]\n", pid, count);
>           mngr->cancel("teste_flow");
>        }
>
> 	Running with 5 consumers, I get something like this:
>
> [12272] = [200]
> [12273] = [200]
> [12274] = [195]
> [12275] = [190]
> [12276] = [190]
>
> 	Is this the right way to do it? To keep creating and closing the subscription?
>
> Thanks,
> Acácio.
>
> -----Mensagem original-----
> De: Gordon Sim [mailto:gsim@redhat.com]
> Enviada em: terça-feira, 15 de dezembro de 2009 14:39
> Para: users@qpid.apache.org
> Assunto: Re: Behaviour when there're elements on the queue before instantiating the consumers
>
> On 12/15/2009 04:08 PM, Acácio Centeno wrote:
>> Hello Mr. Sim,
>>
>> 	Could you give me an example on using flow control? I tried this,
>> expecting to limit the amount of messages to be consumed to five, but almost
>> all the messages on the queue were consumed (I'm also fuzzy about why not
>> all of them were consumed):
>>
>> #include<iostream>
>>
>> #include<qpid/client/Connection.h>
>> #include<qpid/client/Session.h>
>> #include<qpid/client/Message.h>
>> #include<qpid/client/SubscriptionManager.h>
>>
>> using namespace std;
>> using namespace qpid::client;
>> using namespace qpid::framing;
>>
>> int main(int argc, char *argv[]) {
>>      Connection           connection;
>>      ConnectionSettings   settings;
>>      Session              session;
>>      SubscriptionManager  *mngr;
>>      LocalQueue           incoming;
>>      Message              m;
>>
>>      pid_t                pid = getpid();
>>
>>      try {
>>         settings.host = "localhost";
>>         settings.port = 5672;
>>         settings.virtualhost = "bridge";
>>         settings.mechanism = "ANONYMOUS";
>>         settings.tcpNoDelay = false;
>>
>>         connection.open(settings);
>>         session = connection.newSession();
>>
>>         mngr = new SubscriptionManager( session );
>
> One easy way is to change:
>
>>         mngr->subscribe(incoming, "teste_flow");
>
> to:
>            mngr->subscribe(incoming, "teste_flow",
> SubscriptionSettings(FlowControl::messageWindow(5)));
>
>>         // O esperado eh que apenas 5 elementos sejam consumidos.
>
> You don't need the following line if you do the above; if you do want to
> set the default instead of the specific approach above, this line needs
> to happen *before* you call SubscriptionManager::subscribe().
>
>>         mngr->setFlowControl( "teste_flow", 5, 0, false );
>>
>>         while ( incoming.get(m, 1000000L) ) {
>>            printf("[%d] [%s]\n", pid, m.getData().c_str());
>>         }
>>
>>         printf("[%d] finalizado.\n", pid);
>>      } catch(const std::exception&   error) {
>>         fprintf(stderr, "Erro: %s\n", error.what());
>>         return 1;
>>      }
>>
>>      return 0;
>> }
>>
>> Thanks,
>
> Hope this helps!
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:users-subscribe@qpid.apache.org
>
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:users-subscribe@qpid.apache.org
>


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org


Mime
View raw message