activemq-users mailing list archives

From Timothy Bish <tabish...@gmail.com>
Subject RE: Individual acknowledge in pyactivemq
Date Tue, 14 Sep 2010 16:07:39 GMT
On Tue, 2010-09-14 at 17:00 +0100, Neil Pritchard wrote:
> On Tue, 2010-09-14 at 14:17 +0100, Neil Pritchard wrote:
> > Hi All,
> > 
> > I'm using pyactivemq as both a producer and consumer of messages which are being
> > brokered by ActiveMQ 5.3.2.  In the past I used Python stompy to produce messages and Java
> > ActiveMQ to consume them.  I need to set a prefetch policy of 1 message at a time and, more
> > importantly, use individual acknowledge (or client acknowledge), but I can't find any example
> > of how to acknowledge the message in pyactivemq.  Also, the ActiveMQ log (set to debug) indicates
> > that the messages are being consumed regardless.  Can anyone tell me how to set the acknowledgement
> > mode properly and how to acknowledge a message?  When I try to acknowledge, Python can't find
> > the Acknowledge() method.
> > 
> > Many thanks,
> > 
> > Neil
> > 
> > Here's my consumer:....
> > 
> > #!/usr/bin/env python
> > 
> > import cPickle as pickle
> > import Queue
> > import pyactivemq
> > from pyactivemq import ActiveMQConnectionFactory
> > from pyactivemq import AcknowledgeMode
> > 
> > class MessageListener(pyactivemq.MessageListener):
> >     def __init__(self, name, queue):
> >         pyactivemq.MessageListener.__init__(self)
> >         self.name = name
> >         self.queue = queue
> > 
> >     def onMessage(self, message):
> >         self.queue.put('%s got: %s' % (self.name, message.text))
> > 
> > f = ActiveMQConnectionFactory('failover:(tcp://localhost:61616)?wireFormat=openwire')
> > conn = f.createConnection()
> > consumer_session = conn.createSession(AcknowledgeMode.INDIVIDUAL_ACKNOWLEDGE)
> > myqueue = consumer_session.createQueue('NOTIFICATIONS.QUEUE')
> > queue = Queue.Queue(0)
> > 
> > session = conn.createSession(AcknowledgeMode.INDIVIDUAL_ACKNOWLEDGE)
> > consumer = session.createConsumer(myqueue)
> > listener = MessageListener('consumer', queue)
> > consumer.messageListener = listener
> > 
> > conn.start()
> > while queue:
> >     message = queue.get(block=True)
> >     message = message[14:]
> >     notificationDict = pickle.loads(message)
> >     print notificationDict
> >     acknowledge()
> > 
> > conn.close()
> > 
> 
> It looks as if the acknowledge method is exposed on each Message object,
> so you should be able to ack them as they are received.  Whether or not
> it works is another question.
> 
> Regards
> 
> 
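
To make the suggestion above concrete, here is a rough sketch of acknowledging each message only after it has been processed. It assumes, as the earlier reply suggests but does not confirm, that pyactivemq messages expose an acknowledge() method. FakeMessage and handle_and_ack are hypothetical names used so the snippet runs without a broker; they are not part of pyactivemq.

```python
# Sketch only: FakeMessage is a hypothetical stand-in for the message object
# that pyactivemq would pass to onMessage(). It is assumed (not confirmed)
# that the real object has a `text` attribute and an acknowledge() method.
class FakeMessage(object):
    def __init__(self, text):
        self.text = text
        self.acked = False

    def acknowledge(self):
        self.acked = True


def handle_and_ack(message, handler):
    """Run handler on the message text, then acknowledge the message.

    If handler raises, acknowledge() is never called, so the message
    stays unacknowledged.
    """
    result = handler(message.text)
    message.acknowledge()
    return result


msg = FakeMessage('hello')
print(handle_and_ack(msg, lambda text: text.upper()))
print(msg.acked)
```

In the consumer quoted above, the equivalent call would go inside onMessage(), or the listener would put the message object itself on the queue (rather than a formatted string) so the main loop can acknowledge it after unpickling.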

The correct URI setting would be something like:

cms.PrefetchPolicy.topicPrefetch=1
cms.PrefetchPolicy.queuePrefetch=1
etc...

The CMS prefetch policy doesn't currently support an "all" configuration
option; you could open a new Jira issue to have it added in v3.2.4.
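
As a rough illustration of the settings above, the options can be appended to the broker URI as query parameters before it is passed to ActiveMQConnectionFactory. The helper name with_prefetch is hypothetical. Putting the options on the outer failover URI, rather than on the nested tcp:// URI, is an assumption worth checking against the ActiveMQ-CPP configuration docs.

```python
def with_prefetch(base_uri, queue_prefetch=1, topic_prefetch=1):
    """Append the CMS prefetch options to a broker URI (hypothetical helper)."""
    options = [
        'cms.PrefetchPolicy.queuePrefetch=%d' % queue_prefetch,
        'cms.PrefetchPolicy.topicPrefetch=%d' % topic_prefetch,
    ]
    # Use '&' if the URI already carries a query string, '?' otherwise.
    separator = '&' if '?' in base_uri else '?'
    return base_uri + separator + '&'.join(options)


uri = with_prefetch('failover:(tcp://localhost:61616)?wireFormat=openwire')
print(uri)
```

The resulting string would then be handed to ActiveMQConnectionFactory in place of the original URI.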

Regards

-- 
Tim Bish

Open Source Integration: http://fusesource.com

Follow me on Twitter: http://twitter.com/tabish121
My Blog: http://timbish.blogspot.com/

