activemq-dev mailing list archives

From "Naveen Rawat" <nav...@in.effectsoft.com>
Subject Queue hogging by a single consumer.
Date Thu, 17 Aug 2006 06:29:43 GMT

Hi all :) 

I am working with the binary release of the ActiveMQ 4.0 broker and trying out 
the OpenWire C++ API for asynchronous messaging.
[https://svn.apache.org/repos/asf/incubator/activemq/tags/activemq-4.0/openwire-cpp] 

 

I wonder whether I am resurrecting issues that have already been laid to rest. 

 

I am trying to have three consumers, A, B and C, listening on the same queue.
What I want to do is:

	A, B, C sending on Q1, and consuming Z's response on Q2
	Consumer Z listening on Q1, responding back on Q2. 

	like this -
	A/B/C  >>  Q1  >>  Z  >>  Q2  >>  A/B/C
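
To make the symptom concrete, here is a toy in-memory model of this flow. It is plain C++ with no ActiveMQ calls; the `Message` struct, its `sender` field, and the `serveAll` and `takeReply` helpers are all hypothetical names made up for illustration. It only shows that a shared reply queue hands the head message to whichever consumer asks first.

```cpp
#include <cassert>
#include <deque>
#include <string>

// Toy model only; this is not ActiveMQ or openwire-cpp code.
struct Message {
    std::string sender;  // hypothetical tag: which client sent the request
    std::string body;
};

// Z drains Q1 and writes one reply per request onto the shared Q2.
void serveAll(std::deque<Message>& q1, std::deque<Message>& q2) {
    while (!q1.empty()) {
        Message req = q1.front();
        q1.pop_front();
        q2.push_back({req.sender, "R E C E I V E D"});
    }
}

// A consumer on the shared queue takes the head message. Nothing ties the
// reply to the client that asked for it. Returns the sender tag of the
// reply that was taken.
std::string takeReply(std::deque<Message>& q2) {
    assert(!q2.empty());
    Message reply = q2.front();
    q2.pop_front();
    return reply.sender;
}
```

If C happens to call `takeReply` first after A, B and C have each sent one request, C receives the reply tagged for A.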
	 

Listening and responding with a single consumer works well at present. BUT 
the broker is mixing up the responses from Z when two or all three consumers 
run simultaneously. The response meant for one consumer (A) goes to one of 
the others (B or C), and the same happens for the other consumers. Because 
the prefetch size is preset to 1000, the consumer that first establishes a 
session with the broker on a queue gets all the messages (and if it is 
terminated, the next one hogs them all, and so on). 

As I am presently testing by hand, setting the prefetch size lower (say 1) 
does not help either, because I cannot send requests quickly enough (man vs. 
machine). Moreover, as the hogging consumer reads and acknowledges every 
response, even a prefetch size of 1 is never exceeded. 
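
The behaviour I am describing can be sketched with a toy dispatch model. The big assumption here is that the broker offers each message to the earliest-registered consumer that still has room in its prefetch window. That is only my reading of the symptoms, not necessarily what ActiveMQ really does, and the names `Consumer`, `dispatchOne` and `run` are made up for this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Toy dispatch model; an assumption for illustration, not ActiveMQ's algorithm.
struct Consumer {
    std::size_t inFlight = 0;  // delivered but not yet acknowledged
    std::size_t received = 0;  // total messages delivered so far
};

// Offer one message to the earliest-registered consumer with spare prefetch
// capacity. Returns the chosen index, or -1 if every window is full.
int dispatchOne(std::vector<Consumer>& consumers, std::size_t prefetch) {
    assert(prefetch > 0);
    for (std::size_t i = 0; i < consumers.size(); ++i) {
        if (consumers[i].inFlight < prefetch) {
            ++consumers[i].inFlight;
            ++consumers[i].received;
            return static_cast<int>(i);
        }
    }
    return -1;
}

// Send `count` messages to three consumers and return how many each received.
// With ackBetweenSends set, everything is acknowledged before the next
// message arrives, which mimics a slow, hand-driven producer.
std::vector<std::size_t> run(std::size_t count, std::size_t prefetch,
                             bool ackBetweenSends) {
    std::vector<Consumer> consumers(3);
    for (std::size_t n = 0; n < count; ++n) {
        dispatchOne(consumers, prefetch);
        if (ackBetweenSends) {
            for (auto& c : consumers) c.inFlight = 0;
        }
    }
    std::vector<std::size_t> totals;
    for (const auto& c : consumers) totals.push_back(c.received);
    return totals;
}
```

In this model `run(10, 1000, false)` and `run(10, 1, true)` both give every message to the first consumer, while a burst such as `run(3, 1, false)` is spread one message per consumer. So under this assumption a prefetch of 1 only helps when requests arrive faster than they are acknowledged.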

I tried, with no success, the approach of grid processing of messages (using 
MessageGroups) as suggested in
	http://activemq.org/site/how-do-i-preserve-order-of-messages.html
The relevant code is as follows:

	[........A/B/C....................................................
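	// Create a persistent producer on the request queue Q1.
	producer = session->createProducer(Q1) ;
	producer->setPersistent(true) ;
	// Every client (A, B and C) tags its request with the same group ID.
	message = session->createBytesMessage() ;
	message->setJMSXGroupID("cheese") ;
	message->writeString("Hello World") ;
	producer->send(message);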
	.................................................................] 


	[ .........Z ' s OnMessage(message)...............................
	// Read the group ID of the incoming request so the response can carry it.
	p<string> NGid;
	NGid = message->getJMSXGroupID();
	// Create a persistent producer on the response queue Q2.
	producer = session->createProducer(Q2) ;
	producer->setPersistent(true) ;
	reqMessage = session->createBytesMessage() ;
	reqMessage->setJMSXGroupID(NGid->c_str() );
	reqMessage->writeString("R E C E I V E D") ; 	//response string
	producer->send(reqMessage);
	.................................................................] 
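
As far as I understand message groups, all messages carrying the same JMSXGroupID are pinned to a single consumer. The sketch below is a toy model of that sticky routing, written in plain C++ with no ActiveMQ calls. The `GroupRouter` class and its round-robin choice of the first owner are assumptions for illustration, not the broker's implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Toy model of sticky group routing; not ActiveMQ's implementation.
class GroupRouter {
public:
    explicit GroupRouter(std::size_t consumerCount)
        : consumerCount_(consumerCount) {
        assert(consumerCount > 0);
    }

    // Returns the consumer index for a message with the given group ID.
    // The first message of a group picks the next consumer in turn;
    // every later message of that group goes to the same consumer.
    std::size_t route(const std::string& groupId) {
        auto it = owner_.find(groupId);
        if (it != owner_.end()) return it->second;
        std::size_t chosen = next_;
        next_ = (next_ + 1) % consumerCount_;
        owner_[groupId] = chosen;
        return chosen;
    }

private:
    std::size_t consumerCount_;
    std::size_t next_ = 0;
    std::map<std::string, std::size_t> owner_;
};
```

Since A, B and C all use the group ID "cheese" in the code above, every response in this model is routed to one and the same consumer. A distinct ID per client would spread the groups out, but nothing in this model makes group "A" land on consumer A.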

Is there anything more needed in the code that I am missing? 


I have come to know that certain issues pertaining to the initial size of 
the prefetch buffer are not yet resolved. Please correct me if I am wrong.
Will changing the prefetch buffer size help my cause? If not, please suggest 
another way. 

HELP ME.. 


			THANKS IN ADVANCE 

Regards,
Navin
