activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Neil Pritchard <Neil.Pritch...@securetrading.com>
Subject RE: Individual acknowledge in pyactivemq
Date Wed, 15 Sep 2010 08:53:43 GMT


-----Original Message-----
From: Timothy Bish [mailto:tabish121@gmail.com] 
Sent: 14 September 2010 17:08
To: users@activemq.apache.org
Subject: RE: Individual acknowledge in pyactivemq

On Tue, 2010-09-14 at 17:00 +0100, Neil Pritchard wrote:
> On Tue, 2010-09-14 at 14:17 +0100, Neil Pritchard wrote:
> > Hi All,
> > 
> > I'm using pyactivemq as both a producer and consumer of messages which are being
brokered by ActiveMQ 5.3.2.  In the past I used python stompy to produce messages and java
ActiveMQ to consume them.  I need to set a prefetch policy of 1 messgae at a time and more
importantly use individual acknowledge (or client acknowledge) but can't find any example
of how to acknowledge the message in pyactivemq, also the activeMQ log (set to debug) indicates
that the messages are being consumed regardless.  Can anyone tell me how to set the acknowledgement
mode properly and how to acknowledge a message.  When I try to acknowledge, Python can't find
the Acknowledge() method.
> > 
> > Many thanks,
> > 
> > Neil
> > 
> > Here's my consumer:....
> > 
> > #!/usr/bin/env python
> > 
> > import cpickle as pickle
> > import Queue
> > import pyactivemq
> > from pyactivemq import ActiveMQConnectionFactory
> > from pyactivemq import AcknowledgeMode
> > 
> > class MessageListener(pyactivemq.MessageListener):
> >     def __init__(self, name, queue):
> >         pyactivemq.MessageListener.__init__(self)
> >         self.name = name
> >         self.queue = queue
> > 
> >     def onMessage(self, message):
> >         self.queue.put('%s got: %s' % (self.name, message.text))
> > 
> > f = ActiveMQConnectionFactory('failover:(tcp://localhost:61616)?wireFormat=openwire')
> > conn = f.createConnection()
> > consumer_session = conn.createSession(AcknowledgeMode.INDIVIDUAL_ACKNOWLEDGE)
> > myqueue = consumer_session.createQueue('NOTIFICATIONS.QUEUE')
> > queue = Queue.Queue(0)
> > 
> > session = conn.createSession(AcknowledgeMode.INDIVIDUAL_ACKNOWLEDGE)
> > consumer = session.createConsumer(myqueue)
> > listener = MessageListener('consumer', queue)
> > consumer.messageListener = listener
> > 
> > conn.start()
> > while queue:
> >     message = queue.get(block=True)
> >     message = message[14:]
> >     notificationDict = pickle.loads(message)
> >     print notificationDict
> >     acknowledge()
> > 
> > conn.close()
> > 
> 
> It looks as if the acknowledge method is exposed on each Message object,
> so you should be able to ack them as they are received.  Whether or not
> it works is another question.
> 
> Regards
> 
> 

The correct URI setting would be something like, 

cms.PrefetchPolicy.topicPrefetch=1
cms.PrefetchPolicy.queuePrefetch=1
etc...

the cms Prefetch policy doesn't current support an "all" configuration
option, you could open a new Jira issue to add on to v3.2.4.

Regards

-- 
Tim Bish

Open Source Integration: http://fusesource.com

Follow me on Twitter: http://twitter.com/tabish121
My Blog: http://timbish.blogspot.com/



Hi Tim,

I assume the config goes here....

f = ActiveMQConnectionFactory('failover:(tcp://localhost:61616)?cms.PrefetchPolicy.queuePrefetch=1')

Cheers,

Neil


Mime
View raw message