activemq-users mailing list archives

From Timothy Bish <tabish...@gmail.com>
Subject RE: Individual acknowledge in pyactivemq
Date Wed, 15 Sep 2010 10:55:04 GMT
On Wed, 2010-09-15 at 09:53 +0100, Neil Pritchard wrote:
> 
> -----Original Message-----
> From: Timothy Bish [mailto:tabish121@gmail.com] 
> Sent: 14 September 2010 17:08
> To: users@activemq.apache.org
> Subject: RE: Individual acknowledge in pyactivemq
> 
> On Tue, 2010-09-14 at 17:00 +0100, Neil Pritchard wrote:
> > On Tue, 2010-09-14 at 14:17 +0100, Neil Pritchard wrote:
> > > Hi All,
> > > 
> > > I'm using pyactivemq as both a producer and consumer of messages which are
> > > being brokered by ActiveMQ 5.3.2.  In the past I used python stompy to produce
> > > messages and java ActiveMQ to consume them.  I need to set a prefetch policy of
> > > 1 message at a time and, more importantly, use individual acknowledge (or client
> > > acknowledge), but I can't find any example of how to acknowledge the message in
> > > pyactivemq.  The ActiveMQ log (set to debug) also indicates that the messages
> > > are being consumed regardless.  Can anyone tell me how to set the acknowledgement
> > > mode properly and how to acknowledge a message?  When I try to acknowledge,
> > > Python can't find the Acknowledge() method.
> > > 
> > > Many thanks,
> > > 
> > > Neil
> > > 
> > > Here's my consumer:....
> > > 
> > > #!/usr/bin/env python
> > > 
> > > import cPickle as pickle
> > > import Queue
> > > import pyactivemq
> > > from pyactivemq import ActiveMQConnectionFactory
> > > from pyactivemq import AcknowledgeMode
> > > 
> > > class MessageListener(pyactivemq.MessageListener):
> > >     def __init__(self, name, queue):
> > >         pyactivemq.MessageListener.__init__(self)
> > >         self.name = name
> > >         self.queue = queue
> > > 
> > >     def onMessage(self, message):
> > >         self.queue.put('%s got: %s' % (self.name, message.text))
> > > 
> > > f = ActiveMQConnectionFactory('failover:(tcp://localhost:61616)?wireFormat=openwire')
> > > conn = f.createConnection()
> > > consumer_session = conn.createSession(AcknowledgeMode.INDIVIDUAL_ACKNOWLEDGE)
> > > myqueue = consumer_session.createQueue('NOTIFICATIONS.QUEUE')
> > > queue = Queue.Queue(0)
> > > 
> > > session = conn.createSession(AcknowledgeMode.INDIVIDUAL_ACKNOWLEDGE)
> > > consumer = session.createConsumer(myqueue)
> > > listener = MessageListener('consumer', queue)
> > > consumer.messageListener = listener
> > > 
> > > conn.start()
> > > while queue:
> > >     message = queue.get(block=True)
> > >     message = message[14:]
> > >     notificationDict = pickle.loads(message)
> > >     print notificationDict
> > >     acknowledge()
> > > 
> > > conn.close()
> > > 
> > 
> > It looks as if the acknowledge method is exposed on each Message object,
> > so you should be able to ack them as they are received.  Whether or not
> > it works is another question.
> > 
> > Regards
> > 
> > 
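A minimal sketch of what acknowledging on the Message object could look like, written against a stand-in message so that it runs without a broker. It assumes, as noted above, that pyactivemq exposes acknowledge() on each received message; AckingHandler and FakeMessage are hypothetical names used only for illustration, and in a real consumer the body of on_message would live in MessageListener.onMessage.

```python
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2, as in the script above


class AckingHandler(object):
    """Per-message logic that a MessageListener.onMessage could delegate to."""

    def __init__(self, name, out_queue):
        self.name = name
        self.out_queue = out_queue

    def on_message(self, message):
        # Hand the payload to the main thread first ...
        self.out_queue.put('%s got: %s' % (self.name, message.text))
        # ... then acknowledge on the message itself (assumed to be exposed
        # by pyactivemq; there is no module-level acknowledge()).
        message.acknowledge()


class FakeMessage(object):
    """Stand-in for a received text message; not part of pyactivemq."""

    def __init__(self, text):
        self.text = text
        self.acked = False

    def acknowledge(self):
        self.acked = True


demo_queue = queue.Queue()
demo_message = FakeMessage('hello')
AckingHandler('consumer', demo_queue).on_message(demo_message)
```

The acknowledge call is made while the Message object is still in hand. The script above only queues a formatted string, so by the time the main loop runs there is no message left to acknowledge.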
> 
> The correct URI setting would be something like, 
> 
> cms.PrefetchPolicy.topicPrefetch=1
> cms.PrefetchPolicy.queuePrefetch=1
> etc...
> 
> The cms Prefetch policy doesn't currently support an "all" configuration
> option; you could open a new Jira issue to add one to v3.2.4.
> 
> Regards
> 


Yes, the options are applied on the URI that is supplied to the
ConnectionFactory.
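
As a rough illustration, the options can be appended to the broker URI as an ordinary query string before the URI is handed to the factory. The helper name with_options is hypothetical, and the plain tcp:// form is an assumption; how the options combine with a failover:(...) URI should be checked against the CMS documentation.

```python
def with_options(base_uri, options):
    """Append key=value options to a broker URI as a query string."""
    if not options:
        return base_uri
    # Extend an existing query string with '&', otherwise start one with '?'.
    separator = '&' if '?' in base_uri else '?'
    query = '&'.join('%s=%s' % (key, value)
                     for key, value in sorted(options.items()))
    return base_uri + separator + query


broker_uri = with_options(
    'tcp://localhost:61616',
    {
        'cms.PrefetchPolicy.queuePrefetch': 1,
        'cms.PrefetchPolicy.topicPrefetch': 1,
    },
)
# broker_uri is what would be passed to ActiveMQConnectionFactory(...)
```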

Regards

-- 
Tim Bish

Open Source Integration: http://fusesource.com

Follow me on Twitter: http://twitter.com/tabish121
My Blog: http://timbish.blogspot.com/

