activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From <marc...@absa.co.za>
Subject RE: svn commit: r358785 - /incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/PrefetchSubscription.java
Date Mon, 26 Dec 2005 11:02:56 GMT
Hello,

I believe that this is the source of the problem that I am experiencing.

I have a process that includes both an embedded broker and a durable
subscriber for a topic. I use another process to publish messages to the
topic. I then have a third process which retrieves messages from the
topic using a durable subscriber. The problem is that any messages
published to the topic after the broker has started are not retrieved by
the third process since the call node.getMessage() in the
PrefetchSubscription::dispatch method always returns null (The third
process is stopped and started regularly). I see that the reference
counting implemented in the IndirectMessageReference will drop the
message from memory when reference count hits 0 and reload it when
incrementReferenceCount() is called. It seems that the reference count
is always being reduced to 0 before the PrefetchSubscription::dispatch
method is called hence the null message. I added a stack dump to give
some idea at what point the reference count hits 0 (see below). I am not
sure though if this behaviour is by design or if somewhere in the
library the IndirectMessageReference::decrementReferenceCount() method
is being called one time to many.

Regards,

Marcus





-----Original Message-----
From: Hiram Chirino [mailto:hiram@hiramchirino.com] 
Sent: 23 December 2005 07:37 PM
To: activemq-dev@geronimo.apache.org
Cc: activemq-commits@geronimo.apache.org
Subject: Re: svn commit: r358785 -
/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broke
r/region/PrefetchSubscription.java

Hi Adrian,

doing a getMessage() before incrementReferenceCount() is dangerous since
the message could have been swapped out and the call to
getMessage() will return null.
Yes, I know there is a null check to see if the message is null, but
that should only happen if the message was expired.  Right now I think
it's possible that we are going to have cases of where messages get
swapped out and this code is going to think that the message has been
expired.

Regards,
Hiram


On Dec 23, 2005, at 4:48 AM, aco@apache.org wrote:

> Author: aco
> Date: Fri Dec 23 01:47:47 2005
> New Revision: 358785
>
> URL: http://svn.apache.org/viewcvs?rev=358785&view=rev
> Log:
> Postpone incrementing of reference count and preload size, only after 
> we are sure that the message will be dispatched by the current 
> subscription. This is to prevent a memory leak type of scenario.
>
> Modified:
>     incubator/activemq/trunk/activemq-core/src/main/java/org/
> activemq/broker/region/PrefetchSubscription.java
>
> Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/
> activemq/broker/region/PrefetchSubscription.java
> URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/
> activemq-core/src/main/java/org/activemq/broker/region/
> PrefetchSubscription.java?rev=358785&r1=358784&r2=358785&view=diff
> ======================================================================
> ========
> --- incubator/activemq/trunk/activemq-core/src/main/java/org/
> activemq/broker/region/PrefetchSubscription.java (original)
> +++ incubator/activemq/trunk/activemq-core/src/main/java/org/
> activemq/broker/region/PrefetchSubscription.java Fri Dec 23
> 01:47:47 2005
> @@ -239,19 +239,19 @@
>
>      private void dispatch(final MessageReference node) throws 
> IOException {
>
> -        node.incrementReferenceCount();
> -
>          final Message message = node.getMessage();
>          if( message == null ) {
>              return;
> -        }
> -        incrementPreloadSize(node.getMessage().getSize());
> +        }
>
>          // Make sure we can dispatch a message.
>          if( canDispatch(node) ) {
>
>              MessageDispatch md = createMessageDispatch(node, 
> message);
>              dispatched.addLast(node);
> +
> +            node.incrementReferenceCount();
> +            incrementPreloadSize(node.getMessage().getSize());
>
>              if( info.isDispatchAsync() ) {
>                  md.setConsumer(new Runnable(){
>


___________________________________________________________

Important Notice: 
Authorised Financial Services Provider

Important restrictions, qualifications and disclaimers 
("the Disclaimer") apply to this email. To read this click on the 
following address or copy into your Internet browser: 

http://www.absa.co.za/disclaimer

The Disclaimer forms part of the content of this email in terms of 
section 11 of the Electronic Communications and Transactions 
Act, 25 of 2002. 

If you are unable to access the Disclaimer, send a blank e-mail 
to disclaimer@absa.co.za and we will send you a copy of the 
Disclaimer.

Mime
View raw message