activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Daniel Laugt <daniel.la...@WallStreetSystems.com>
Subject RE: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
Date Wed, 09 Nov 2011 15:03:05 GMT
Ok I've opened a Jira issue for this:
https://issues.apache.org/jira/browse/AMQCPP-384

Thanks,
Daniel.

-----Original Message-----
From: Timothy Bish [mailto:tabish121@gmail.com] 
Sent: 09 November 2011 12:14
To: users@activemq.apache.org
Subject: RE: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull
command is lost

On Wed, 2011-11-09 at 10:00 +0100, Daniel Laugt wrote:
> Hello Timothy,
> 
> I've just looked the svn trunk and I don't see the fix...
> 
> The last commit on ConnectionStateTracker.cpp has been done the 21th April 2011. ActiveMQ-CPP
has been release the 29th April 2011.
> 
> On the issue AMQ-2877, the method processMessagePull() has been overridden in the ConnectionStateTracker
class. This is not the case in the trunk of ActiveMQ-CPP.
> 
> Daniel.

Recommend you open a new Jira issue in the ActiveMQ-CPP Jira and attach
any patches and unit tests there so that this gets addressed.

Regards



> 
> -----Original Message-----
> From: Timothy Bish [mailto:tabish121@gmail.com] 
> Sent: 08 November 2011 16:33
> To: users@activemq.apache.org
> Subject: RE: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the
MessagePull command is lost
> 
> On Tue, 2011-11-08 at 16:25 +0100, Daniel Laugt wrote:
> > For ActiveMQ-CPP, I'm using the version 3.4.0.
> > 
> > Daniel.
> 
> I believe these fixes were already made in trunk, I'd recommend you try
> out that code.
> 
> Regards
> 
> 
> > 
> > -----Original Message-----
> > From: Timothy Bish [mailto:tabish121@gmail.com] 
> > Sent: 08 November 2011 16:21
> > To: users@activemq.apache.org
> > Subject: RE: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers
if the MessagePull command is lost
> > 
> > On Tue, 2011-11-08 at 16:13 +0100, Daniel Laugt wrote:
> > > Hello,
> > > 
> > > I'm using a synchronous consumer. If I understand well the configuration prefetch
size = 0 makes the consumers as synchronous.
> > > 
> > 
> > What version of ActiveMQ-CPP are you using?
> > 
> > > Regards,
> > > Daniel.
> > > 
> > > -----Original Message-----
> > > From: Oscar Pernas [mailto:oscar@pernas.es] 
> > > Sent: 08 November 2011 15:45
> > > To: users@activemq.apache.org
> > > Subject: Re: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers
if the MessagePull command is lost
> > > 
> > > Hi Laugt,
> > > 
> > > Are you using synchronous or asynchronous consumer? When I was using
> > > synchronous consuming I used to have problems with failover reconnection.
> > > 
> > > 
> > > regards
> > > 
> > > 2011/11/8 Daniel Laugt <daniel.laugt@wallstreetsystems.com>
> > > 
> > > > It seems that email attachment is not allowed... I put the diff directly
> > > > below...
> > > >
> > > >
> > > >
> > > > Daniel Laügt.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > ---
> > > > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
> > > >           2011/10/19 10:02:31                146083
> > > >
> > > > +++
> > > > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
> > > >        2011/10/19 10:17:24                146084
> > > >
> > > > @@ -54,9 +54,11 @@
> > > >
> > > >         //        Either we need to implement something similar to
> > > > LinkedHashMap or find
> > > >
> > > >         //        some other way of tracking the eldest entry into the
map
> > > > and removing it
> > > >
> > > >         //        if the cache size is exceeded.
> > > >
> > > > -        ConcurrentStlMap< Pointer<MessageId>, Pointer<Message>,
> > > >
> > > > +        ConcurrentStlMap< Pointer<MessageId>, Pointer<Command>,
> > > >
> > > >                           MessageId::COMPARATOR > messageCache;
> > > >
> > > > +        ConcurrentStlMap< std::string, Pointer<Command> >
> > > > messagePullCache;
> > > >
> > > > +
> > > >
> > > >         bool trackTransactions;
> > > >
> > > >         bool restoreSessions;
> > > >
> > > >         bool restoreConsumers;
> > > >
> > > > @@ -122,6 +124,8 @@
> > > >
> > > >         virtual Pointer<Command> processEndTransaction( TransactionInfo*
> > > > info );
> > > >
> > > > +        virtual Pointer<Command> processMessagePull( MessagePull*
pull );
> > > >
> > > > +
> > > >
> > > >         bool isRestoreConsumers() const {
> > > >
> > > >             return this->restoreConsumers;
> > > >
> > > >         }
> > > >
> > > >
> > > >
> > > > ---
> > > > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
> > > >      2011/10/19 10:02:31                146083
> > > >
> > > > +++
> > > > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
> > > >   2011/10/19 10:17:24                146084
> > > >
> > > > @@ -108,11 +108,21 @@
> > > >
> > > > void ConnectionStateTracker::trackBack( const Pointer<Command>&
command ) {
> > > >
> > > >     try{
> > > >
> > > > -        if( trackMessages && command != NULL && command->isMessage()
) {
> > > >
> > > > -            Pointer<Message> message =
> > > >
> > > > +        if( command != NULL ) {
> > > >
> > > > +            if( trackMessages && command->isMessage() ) {
> > > >
> > > > +              Pointer<Message> message =
> > > >
> > > >                 command.dynamicCast<Message>();
> > > >
> > > > -            if( message->getTransactionId() == NULL ) {
> > > >
> > > > +              if( message->getTransactionId() == NULL ) {
> > > >
> > > >                 currentCacheSize = currentCacheSize + message->getSize();
> > > >
> > > > +              }
> > > >
> > > > +            }
> > > >
> > > > +            else {
> > > >
> > > > +              Pointer<MessagePull> messagePull =
> > > >
> > > > +                command.dynamicCast<MessagePull>();
> > > >
> > > > +              if( messagePull != NULL ) {
> > > >
> > > > +                // just needs to be a rough estimate of size, ~4
> > > > identifiers
> > > >
> > > > +                currentCacheSize += 400;
> > > >
> > > > +              }
> > > >
> > > >             }
> > > >
> > > >         }
> > > >
> > > >     }
> > > >
> > > > @@ -148,12 +158,19 @@
> > > >
> > > >         }
> > > >
> > > >         // Now we flush messages
> > > >
> > > > -        std::vector< Pointer<Message> > messages = messageCache.values();
> > > >
> > > > -        std::vector< Pointer<Message> >::const_iterator messageIter
=
> > > > messages.begin();
> > > >
> > > > +        std::vector< Pointer<Command> > messages = messageCache.values();
> > > >
> > > > +        std::vector< Pointer<Command> >::const_iterator messageIter
=
> > > > messages.begin();
> > > >
> > > >         for( ; messageIter != messages.end(); ++messageIter ) {
> > > >
> > > >             transport->oneway( *messageIter );
> > > >
> > > >         }
> > > >
> > > > +
> > > >
> > > > +        std::vector< Pointer<Command> > messagePulls =
> > > > messagePullCache.values();
> > > >
> > > > +        std::vector< Pointer<Command> >::const_iterator messagePullIter
=
> > > > messagePulls.begin();
> > > >
> > > > +
> > > >
> > > > +        for( ; messagePullIter != messagePulls.end(); ++messagePullIter
)
> > > > {
> > > >
> > > > +          transport->oneway( *messagePullIter );
> > > >
> > > > +        }
> > > >
> > > >     }
> > > >
> > > >     AMQ_CATCH_RETHROW( IOException )
> > > >
> > > >     AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
> > > >
> > > > @@ -790,6 +807,19 @@
> > > >
> > > > }
> > > >
> > > >
> > > >  ////////////////////////////////////////////////////////////////////////////////
> > > >
> > > > +Pointer<Command> ConnectionStateTracker::processMessagePull( MessagePull*
> > > > pull ) {
> > > >
> > > > +  if( pull != NULL
> > > >
> > > > +    && pull->getDestination() != NULL
> > > >
> > > > +    && pull->getConsumerId() != NULL) {
> > > >
> > > > +      std::string id = pull->getDestination()->toString() + "::"
+
> > > > pull->getConsumerId()->toString();
> > > >
> > > > +      messagePullCache.put( id,
> > > >
> > > > +        Pointer<Command>( pull->cloneDataStructure() ) );
> > > >
> > > > +  }
> > > >
> > > > +
> > > >
> > > > +  return Pointer<Command>();
> > > >
> > > > +}
> > > >
> > > > +
> > > >
> > > >
> > > > +////////////////////////////////////////////////////////////////////////////////
> > > >
> > > > void ConnectionStateTracker::connectionInterruptProcessingComplete(
> > > >
> > > >     transport::Transport* transport, const Pointer<ConnectionId>&
> > > > connectionId ) {
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > From: Daniel Laugt [mailto:daniel.laugt@WallStreetSystems.com]
> > > > Sent: 08 November 2011 13:16
> > > > To: users@activemq.apache.org
> > > > Subject: ActiveMQCPP - Failover and prefetch=0 can result in hung
> > > > consumers if the MessagePull command is lost
> > > >
> > > >
> > > >
> > > > Hello,
> > > >
> > > >
> > > >
> > > > I'm using ActiveMQ 5.5.0 with ActiveMQ-CPP as client. With configuration
> > > > prefetch size = 0, polling consumer fails to reconnect during the failover.
> > > >
> > > >
> > > >
> > > > This issue has been fixed by the item AMQ-2877:
> > > >
> > > > https://issues.apache.org/jira/browse/AMQ-2877
> > > >
> > > >
> > > >
> > > > AMQ-2877 fixes the problem in the java client side but not in the c++
> > > > client side. Is it possible to merge this fix to ActiveMQ-CPP?
> > > >
> > > >
> > > >
> > > > Attached on this email a diff of what I've merged from AMQ-2877 to resolve
> > > > the problem on my ActiveMQ-CPP. This diff can be used probably as a
> > > > suggestion...
> > > >
> > > >
> > > >
> > > > Regards,
> > > >
> > > > Daniel Laügt.
> > > >
> > > >
> > > >
> > > >
> > > 
> > > 
> > 
> > 
> > 
> 

-- 
Tim Bish
------------
FuseSource
Email: tim.bish@fusesource.com
Web: http://fusesource.com
Twitter: tabish121
Blog: http://timbish.blogspot.com/




Mime
View raw message