qpid-users mailing list archives

From Luiz Cordeiro <Luiz.Corde...@corp.terra.com.br>
Subject RE: Behaviour when there're elements on the queue before instantiating the consumers
Date Tue, 15 Dec 2009 20:10:12 GMT
Hi,

	This did the job:

      mngr = new SubscriptionManager( session );
      FlowControl fc(5, FlowControl::UNLIMITED, false);
      mngr->subscribe(incoming, "teste_flow", SubscriptionSettings(fc));

	At least the first part works: it consumes 5 items. But how do I give the process more
credit so that it can read five more? What we want is to have N consumers and let each one get
a fair share of the items. Once every sibling consumer has taken its share, I would like to give
them all a new share.

	The closest thing to this I could get was:

      // Expected: only 5 elements are consumed.
      mngr = new SubscriptionManager( session );
      FlowControl fc(5, FlowControl::UNLIMITED, false);

      while (1) {
         mngr->subscribe(incoming, "teste_flow", SubscriptionSettings(fc));

         while ( incoming.get(m, 1000000L) ) {
            // printf("[%d] [%s]\n", pid, m.getData().c_str());
            ++count;
         }

         printf("[%d] = [%d]\n", pid, count);
         mngr->cancel("teste_flow");
      }

	Running with 5 consumers, I get something like this:

[12272] = [200]
[12273] = [200]
[12274] = [195]
[12275] = [190]
[12276] = [190]

	Is this the right way to do it, repeatedly creating and cancelling the subscription?
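
	For reference, the behaviour asked about above (each of N consumers takes a fixed share, and once all shares are used up everyone is given a new one) can be sketched without a broker. This is only a simulation of the idea and does not use the Qpid API: SimConsumer, deliverWhileCredit and drainInShares are hypothetical names, and the queue contents are placeholders.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical stand-in for one subscriber; not a Qpid type.
struct SimConsumer {
    std::uint32_t credit = 0;   // messages this consumer may still be sent
    std::size_t received = 0;   // total messages delivered so far
};

// Hand out queued messages round-robin, skipping consumers with no credit.
// Stops when the queue is empty or nobody has credit left.
// Returns the number of messages delivered in this call.
std::size_t deliverWhileCredit(std::deque<int>& queue,
                               std::vector<SimConsumer>& consumers) {
    std::size_t delivered = 0;
    bool progress = true;
    while (progress && !queue.empty()) {
        progress = false;
        for (SimConsumer& c : consumers) {
            if (queue.empty()) break;
            if (c.credit == 0) continue;
            queue.pop_front();
            --c.credit;
            ++c.received;
            ++delivered;
            progress = true;
        }
    }
    return delivered;
}

// Give every consumer a fresh share, deliver, and repeat until the queue
// is drained. Returns the number of rounds (share grants) that were needed.
std::size_t drainInShares(std::deque<int>& queue,
                          std::vector<SimConsumer>& consumers,
                          std::uint32_t share) {
    assert(share > 0 && !consumers.empty());
    std::size_t rounds = 0;
    while (!queue.empty()) {
        for (SimConsumer& c : consumers) c.credit = share;
        deliverWhileCredit(queue, consumers);
        ++rounds;
    }
    return rounds;
}
```

	With an assumed queue of 1000 messages, 5 consumers and a share of 5, drainInShares needs 40 rounds and every consumer ends up with 200 messages. In a real client the "fresh share" step would be a credit grant on the existing subscription rather than a cancel and re-subscribe; whether and how the client library exposes that is exactly the open question here.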

Thanks,
Acácio.

-----Original Message-----
From: Gordon Sim [mailto:gsim@redhat.com]
Sent: Tuesday, 15 December 2009 14:39
To: users@qpid.apache.org
Subject: Re: Behaviour when there're elements on the queue before instantiating the consumers

On 12/15/2009 04:08 PM, Acácio Centeno wrote:
> Hello Mr. Sim,
>
> 	Could you give me an example on using flow control? I tried this,
> expecting to limit the amount of messages to be consumed to five, but almost
> all the messages on the queue were consumed (I'm also fuzzy about why not
> all of them were consumed):
>
> #include <cstdio>     // printf, fprintf
> #include <iostream>
> #include <unistd.h>   // getpid
>
> #include <qpid/client/Connection.h>
> #include <qpid/client/Session.h>
> #include <qpid/client/Message.h>
> #include <qpid/client/SubscriptionManager.h>
>
> using namespace std;
> using namespace qpid::client;
> using namespace qpid::framing;
>
> int main(int argc, char *argv[]) {
>     Connection           connection;
>     ConnectionSettings   settings;
>     Session              session;
>     SubscriptionManager  *mngr;
>     LocalQueue           incoming;
>     Message              m;
>
>     pid_t                pid = getpid();
>
>     try {
>        settings.host = "localhost";
>        settings.port = 5672;
>        settings.virtualhost = "bridge";
>        settings.mechanism = "ANONYMOUS";
>        settings.tcpNoDelay = false;
>
>        connection.open(settings);
>        session = connection.newSession();
>
>        mngr = new SubscriptionManager( session );

One easy way is to change:

>        mngr->subscribe(incoming, "teste_flow");

to:
          mngr->subscribe(incoming, "teste_flow",
                          SubscriptionSettings(FlowControl::messageWindow(5)));

>        // Expected: only 5 elements are consumed.

You don't need the following line if you do the above. If you would
rather set the default than use the per-subscription approach above,
this line needs to happen *before* you call
SubscriptionManager::subscribe().

>        mngr->setFlowControl( "teste_flow", 5, 0, false );
>
>        while ( incoming.get(m, 1000000L) ) {
>           printf("[%d] [%s]\n", pid, m.getData().c_str());
>        }
>
>        printf("[%d] finished.\n", pid);
>     } catch(const std::exception&  error) {
>        fprintf(stderr, "Error: %s\n", error.what());
>        return 1;
>     }
>
>     return 0;
> }
>
> Thanks,

Hope this helps!

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org

