activemq-users mailing list archives

From Rob Davies <rajdav...@gmail.com>
Subject Re: svn commit: r558054 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/policy/ test/java/org/apa...
Date Fri, 24 Aug 2007 06:30:53 GMT
Hi Ferry,

It's best to write to the user or dev list in general (copied). This
certainly looks like a bug. Please try the next snapshot built
after today.

cheers,

Rob
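
A minimal sketch of the logging issue described in the quoted report below, where the warning "Failed to pass expired message to dead letter queue" gives no hint of the underlying exception. It uses java.util.logging from the JDK rather than the commons-logging API that ActiveMQ uses, and the class name, the DeadLetterSender interface, and both method names are hypothetical stand-ins, not ActiveMQ code. It only illustrates the difference between logging the message alone and passing the caught exception to the logger as well.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class ExpiredMessageLogging {

    private static final Logger LOG =
            Logger.getLogger(ExpiredMessageLogging.class.getName());

    /** Hypothetical stand-in for the broker call that forwards a message to the DLQ. */
    public interface DeadLetterSender {
        void send(String messageId) throws Exception;
    }

    /** Logs only the message text, so the cause of the failure is lost. */
    public static void expireWithoutCause(DeadLetterSender sender, String messageId) {
        try {
            sender.send(messageId);
        } catch (Exception e) {
            LOG.warning("Failed to pass expired message to dead letter queue");
        }
    }

    /** Passes the exception to the logger so the cause and stack trace are recorded. */
    public static void expireWithCause(DeadLetterSender sender, String messageId) {
        try {
            sender.send(messageId);
        } catch (Exception e) {
            LOG.log(Level.WARNING,
                    "Failed to pass expired message to dead letter queue", e);
        }
    }

    public static void main(String[] args) {
        // Simulated failure; the real cause in the report is unknown.
        DeadLetterSender failing = id -> {
            throw new IllegalStateException("simulated failure for " + id);
        };
        expireWithoutCause(failing, "ID:5");
        expireWithCause(failing, "ID:5");
    }
}
```

With the second form, the log record carries the exception, which is usually enough to tell a configuration problem from a broker bug.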

On Aug 24, 2007, at 4:17 AM, vri_97@yahoo.com wrote:

> Hi,
> First, I'm sorry if this is not a proper way to ask you a question.
> I will not do it again if that is so.
>
> I got this error after a message expired:
> 2007-08-24 11:12:39,260 DEBUG [org.apache.activemq.broker.region.RegionBroker] Message expired ActiveMQTextMessage {commandId = 15, ...
> 2007-08-24 11:12:39,270 WARN [org.apache.activemq.broker.region.RegionBroker] Failed to pass expired message to dead letter queue
> 2007-08-24 11:12:39,270 DEBUG [org.apache.activemq.broker.region.AbstractRegion] Adding destination: topic://ActiveMQ.Advisory.Expired.Queue.ActiveMQ.DLQ.Queue.queue.HelloQ
>
> After that, a second message expired:
> 2007-08-24 11:12:39,270 DEBUG [org.apache.activemq.broker.region.RegionBroker] Message expired ActiveMQTextMessage {commandId = 5, ....
> 2007-08-24 11:12:39,270 WARN [org.apache.activemq.broker.region.RegionBroker] Failed to pass expired message to dead letter queue
>
> I checked the code: it reaches logger.warn, which doesn't report what
> the exception is. I don't know what is wrong. Is it a bug, or is
> there something I should check in my config file?
> I am using ActiveMQ5-SNAPSHOT-57.
>
> Thanks..
> Regards,
> Ferry
>
>
> rajdavies-2 wrote:
>>
>> Author: rajdavies
>> Date: Fri Jul 20 10:08:10 2007
>> New Revision: 558054
>>
>> URL: http://svn.apache.org/viewvc?view=rev&rev=558054
>> Log:
>> Fix for:
>>  http://issues.apache.org/activemq/browse/AMQ-1207
>> http://issues.apache.org/activemq/browse/AMQ-880
>> http://issues.apache.org/activemq/browse/AMQ-450
>> http://issues.apache.org/activemq/browse/AMQ-879
>>
>> Added:
>>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java   (with props)
>> Modified:
>>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
>>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
>>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
>>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
>>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
>>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
>>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
>>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
>>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
>>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
>>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
>>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
>>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
>>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
>>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
>>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
>>     activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> advisory/AdvisoryBroker.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/ 
>> java/org/apache/activemq/advisory/AdvisoryBroker.java? 
>> view=diff&rev=558054&r1=558053&r2=558054
>> ===================================================================== 
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> advisory/AdvisoryBroker.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> advisory/AdvisoryBroker.java
>> Fri Jul 20 10:08:10 2007
>> @@ -17,6 +17,7 @@
>>   */
>>  package org.apache.activemq.advisory;
>>
>> +import java.io.IOException;
>>  import java.util.Iterator;
>>
>>  import org.apache.activemq.broker.Broker;
>> @@ -24,6 +25,7 @@
>>  import org.apache.activemq.broker.ConnectionContext;
>>  import org.apache.activemq.broker.ProducerBrokerExchange;
>>  import org.apache.activemq.broker.region.Destination;
>> +import org.apache.activemq.broker.region.MessageReference;
>>  import org.apache.activemq.broker.region.Subscription;
>>  import org.apache.activemq.command.ActiveMQDestination;
>>  import org.apache.activemq.command.ActiveMQMessage;
>> @@ -38,6 +40,8 @@
>>  import org.apache.activemq.command.ProducerInfo;
>>  import org.apache.activemq.util.IdGenerator;
>>  import org.apache.activemq.util.LongSequenceGenerator;
>> +import org.apache.commons.logging.Log;
>> +import org.apache.commons.logging.LogFactory;
>>
>>  import java.util.concurrent.ConcurrentHashMap;
>>
>> @@ -49,7 +53,7 @@
>>   */
>>  public class AdvisoryBroker extends BrokerFilter {
>>
>> -    //private static final Log log =
>> LogFactory.getLog(AdvisoryBroker.class);
>> +    private static final Log log =
>> LogFactory.getLog(AdvisoryBroker.class);
>>
>>      protected final ConcurrentHashMap connections = new
>> ConcurrentHashMap();
>>      protected final ConcurrentHashMap consumers = new
>> ConcurrentHashMap();
>> @@ -226,6 +230,16 @@
>>              ActiveMQTopic topic =
>> AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
>>              producers.remove(info.getProducerId());
>>              fireProducerAdvisory(context, topic,
>> info.createRemoveCommand());
>> +        }
>> +    }
>> +
>> +    public void messageExpired(ConnectionContext context,MessageReference messageReference){
>> +        next.messageExpired(context,messageReference);
>> +        try{
>> +            ActiveMQTopic topic=AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
>> +            fireAdvisory(context,topic,messageReference.getMessage());
>> +        }catch(Exception e){
>> +            log.warn("Failed to fire message expired advisory");
>>          }
>>      }
>>
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> advisory/AdvisorySupport.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/ 
>> java/org/apache/activemq/advisory/AdvisorySupport.java? 
>> view=diff&rev=558054&r1=558053&r2=558054
>> ===================================================================== 
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> advisory/AdvisorySupport.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> advisory/AdvisorySupport.java
>> Fri Jul 20 10:08:10 2007
>> @@ -64,6 +64,13 @@
>>              return new
>> ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX 
>> +destination.getPhysicalName());
>>      }
>>
>> +    public static ActiveMQTopic
>> getExpiredMessageTopic(ActiveMQDestination destination) {
>> +        if (destination.isQueue()) {
>> +            return getExpiredQueueMessageAdvisoryTopic(destination);
>> +        }
>> +        return getExpiredTopicMessageAdvisoryTopic(destination);
>> +    }
>> +
>>      public static ActiveMQTopic
>> getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination  
>> destination) {
>>          String name =
>> EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX+destination.getPhysicalName();
>>          return new ActiveMQTopic(name);
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/Broker.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/ 
>> java/org/apache/activemq/broker/Broker.java? 
>> view=diff&rev=558054&r1=558053&r2=558054
>> ===================================================================== 
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/Broker.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/Broker.java
>> Fri Jul 20 10:08:10 2007
>> @@ -20,19 +20,15 @@
>>  import java.net.URI;
>>  import java.util.Set;
>>  import org.apache.activemq.Service;
>> -import org.apache.activemq.broker.region.Destination;
>> +import org.apache.activemq.broker.region.MessageReference;
>>  import org.apache.activemq.broker.region.Region;
>> -import
>> org.apache.activemq.broker.region.policy.PendingDurableSubscriberMess 
>> ageStoragePolicy;
>>  import org.apache.activemq.command.ActiveMQDestination;
>>  import org.apache.activemq.command.BrokerId;
>>  import org.apache.activemq.command.BrokerInfo;
>>  import org.apache.activemq.command.ConnectionInfo;
>>  import org.apache.activemq.command.DestinationInfo;
>>  import org.apache.activemq.command.MessageDispatch;
>> -import org.apache.activemq.command.MessageDispatchNotification;
>> -import org.apache.activemq.command.MessagePull;
>>  import org.apache.activemq.command.ProducerInfo;
>> -import org.apache.activemq.command.Response;
>>  import org.apache.activemq.command.SessionInfo;
>>  import org.apache.activemq.command.TransactionId;
>>  import org.apache.activemq.kaha.Store;
>> @@ -135,6 +131,8 @@
>>
>>      /**
>>       * Gets a list of all the prepared xa transactions.
>> +     * @param context transaction ids
>> +     * @return
>>       * @throws Exception TODO
>>       */
>>      public TransactionId[] getPreparedTransactions(ConnectionContext
>> context) throws Exception;
>> @@ -151,7 +149,7 @@
>>       * Prepares a transaction. Only valid for xa transactions.
>>       * @param context
>>       * @param xid
>> -     * @return
>> +     * @return id
>>       * @throws Exception TODO
>>       */
>>      public int prepareTransaction(ConnectionContext context,
>> TransactionId xid) throws Exception;
>> @@ -176,6 +174,9 @@
>>
>>      /**
>>       * Forgets a transaction.
>> +     * @param context
>> +     * @param transactionId
>> +     * @throws Exception
>>       */
>>      public void forgetTransaction(ConnectionContext context,
>> TransactionId transactionId) throws Exception;
>>
>> @@ -246,7 +247,35 @@
>>       */
>>      public URI getVmConnectorURI();
>>
>> +    /**
>> +     * called when the brokerService starts
>> +     */
>>      public void brokerServiceStarted();
>>
>> +    /**
>> +     * @return the BrokerService
>> +     */
>>      BrokerService getBrokerService();
>> +
>> +    /**
>> +     * Ensure we get the Broker at the top of the Stack
>> +     * @return the broker at the top of the Stack
>> +     */
>> +    Broker getRoot();
>> +
>> +    /**
>> +     * A Message has Expired
>> +     * @param context
>> +     * @param messageReference
>> +     * @throws Exception
>> +     */
>> +    public void messageExpired(ConnectionContext context,
>> MessageReference messageReference);
>> +
>> +    /**
>> +     * A message needs to go the a DLQ
>> +     * @param context
>> +     * @param messageReference
>> +     * @throws Exception
>> +     */
>> +    public void sendToDeadLetterQueue(ConnectionContext
>> context,MessageReference messageReference);
>>  }
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/BrokerFilter.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/ 
>> java/org/apache/activemq/broker/BrokerFilter.java? 
>> view=diff&rev=558054&r1=558053&r2=558054
>> ===================================================================== 
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/BrokerFilter.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/BrokerFilter.java
>> Fri Jul 20 10:08:10 2007
>> @@ -17,9 +17,12 @@
>>   */
>>  package org.apache.activemq.broker;
>>
>> +import java.net.URI;
>> +import java.util.Map;
>> +import java.util.Set;
>>  import org.apache.activemq.broker.region.Destination;
>> +import org.apache.activemq.broker.region.MessageReference;
>>  import org.apache.activemq.broker.region.Subscription;
>> -import
>> org.apache.activemq.broker.region.policy.PendingDurableSubscriberMess 
>> ageStoragePolicy;
>>  import org.apache.activemq.command.ActiveMQDestination;
>>  import org.apache.activemq.command.BrokerId;
>>  import org.apache.activemq.command.BrokerInfo;
>> @@ -38,10 +41,6 @@
>>  import org.apache.activemq.command.TransactionId;
>>  import org.apache.activemq.kaha.Store;
>>
>> -import java.net.URI;
>> -import java.util.Map;
>> -import java.util.Set;
>> -
>>  /**
>>   * Allows you to intercept broker operation so that features such as
>> security can be
>>   * implemented as a pluggable filter.
>> @@ -245,5 +244,17 @@
>>
>>      public BrokerService getBrokerService(){
>>          return next.getBrokerService();
>> +    }
>> +
>> +    public void messageExpired(ConnectionContext  
>> context,MessageReference
>> message){
>> +        next.messageExpired(context,message);
>> +    }
>> +
>> +    public void sendToDeadLetterQueue(ConnectionContext
>> context,MessageReference messageReference){
>> +       next.sendToDeadLetterQueue(context,messageReference);
>> +    }
>> +
>> +    public Broker getRoot() {
>> +       return next.getRoot();
>>      }
>>  }
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/EmptyBroker.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/ 
>> java/org/apache/activemq/broker/EmptyBroker.java? 
>> view=diff&rev=558054&r1=558053&r2=558054
>> ===================================================================== 
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/EmptyBroker.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/EmptyBroker.java
>> Fri Jul 20 10:08:10 2007
>> @@ -17,9 +17,13 @@
>>   */
>>  package org.apache.activemq.broker;
>>
>> +import java.net.URI;
>> +import java.util.Collections;
>> +import java.util.Map;
>> +import java.util.Set;
>>  import org.apache.activemq.broker.region.Destination;
>> +import org.apache.activemq.broker.region.MessageReference;
>>  import org.apache.activemq.broker.region.Subscription;
>> -import
>> org.apache.activemq.broker.region.policy.PendingDurableSubscriberMess 
>> ageStoragePolicy;
>>  import org.apache.activemq.command.ActiveMQDestination;
>>  import org.apache.activemq.command.BrokerId;
>>  import org.apache.activemq.command.BrokerInfo;
>> @@ -38,11 +42,6 @@
>>  import org.apache.activemq.command.TransactionId;
>>  import org.apache.activemq.kaha.Store;
>>
>> -import java.net.URI;
>> -import java.util.Collections;
>> -import java.util.Map;
>> -import java.util.Set;
>> -
>>  /**
>>   * Dumb implementation - used to be overriden by listeners
>>   *
>> @@ -245,4 +244,14 @@
>>      public BrokerService getBrokerService(){
>>          return null;
>>      }
>> +
>> +    public void messageExpired(ConnectionContext  
>> context,MessageReference
>> message){
>> +    }
>> +
>> +    public void sendToDeadLetterQueue(ConnectionContext
>> context,MessageReference messageReference){
>> +    }
>> +
>> +    public Broker getRoot(){
>> +        return null;
>> +     }
>>  }
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/ErrorBroker.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/ 
>> java/org/apache/activemq/broker/ErrorBroker.java? 
>> view=diff&rev=558054&r1=558053&r2=558054
>> ===================================================================== 
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/ErrorBroker.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/ErrorBroker.java
>> Fri Jul 20 10:08:10 2007
>> @@ -21,10 +21,9 @@
>>  import java.util.Collections;
>>  import java.util.Map;
>>  import java.util.Set;
>> -
>>  import org.apache.activemq.broker.region.Destination;
>> +import org.apache.activemq.broker.region.MessageReference;
>>  import org.apache.activemq.broker.region.Subscription;
>> -import
>> org.apache.activemq.broker.region.policy.PendingDurableSubscriberMess 
>> ageStoragePolicy;
>>  import org.apache.activemq.command.ActiveMQDestination;
>>  import org.apache.activemq.command.BrokerId;
>>  import org.apache.activemq.command.BrokerInfo;
>> @@ -245,4 +244,16 @@
>>      public BrokerService getBrokerService(){
>>          throw new BrokerStoppedException(this.message);
>>      }
>> +
>> +    public void messageExpired(ConnectionContext  
>> context,MessageReference
>> message){
>> +       throw new BrokerStoppedException(this.message);
>> +    }
>> +
>> +    public void sendToDeadLetterQueue(ConnectionContext
>> context,MessageReference messageReference){
>> +       throw new BrokerStoppedException(this.message);
>> +    }
>> +
>> +    public Broker getRoot(){
>> +        throw new BrokerStoppedException(this.message);
>> +     }
>>  }
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/MutableBrokerFilter.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/ 
>> java/org/apache/activemq/broker/MutableBrokerFilter.java? 
>> view=diff&rev=558054&r1=558053&r2=558054
>> ===================================================================== 
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/MutableBrokerFilter.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/MutableBrokerFilter.java
>> Fri Jul 20 10:08:10 2007
>> @@ -17,9 +17,12 @@
>>   */
>>  package org.apache.activemq.broker;
>>
>> +import java.net.URI;
>> +import java.util.Map;
>> +import java.util.Set;
>>  import org.apache.activemq.broker.region.Destination;
>> +import org.apache.activemq.broker.region.MessageReference;
>>  import org.apache.activemq.broker.region.Subscription;
>> -import
>> org.apache.activemq.broker.region.policy.PendingDurableSubscriberMess 
>> ageStoragePolicy;
>>  import org.apache.activemq.command.ActiveMQDestination;
>>  import org.apache.activemq.command.BrokerId;
>>  import org.apache.activemq.command.BrokerInfo;
>> @@ -38,10 +41,6 @@
>>  import org.apache.activemq.command.TransactionId;
>>  import org.apache.activemq.kaha.Store;
>>
>> -import java.net.URI;
>> -import java.util.Map;
>> -import java.util.Set;
>> -
>>  /**
>>   * Like a BrokerFilter but it allows you to switch the getNext 
>> ().broker.
>> This has more
>>   * overhead than a BrokerFilter since access to the getNext 
>> ().broker has
>> to synchronized
>> @@ -258,6 +257,19 @@
>>
>>      public BrokerService getBrokerService(){
>>          return getNext().getBrokerService();
>> +    }
>> +
>> +
>> +    public void messageExpired(ConnectionContext  
>> context,MessageReference
>> message){
>> +        getNext().messageExpired(context,message);
>> +    }
>> +
>> +    public void sendToDeadLetterQueue(ConnectionContext
>> context,MessageReference messageReference) {
>> +       getNext().sendToDeadLetterQueue(context,messageReference);
>> +    }
>> +
>> +    public Broker getRoot(){
>> +        return getNext().getRoot();
>>      }
>>
>>  }
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/AbstractRegion.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/ 
>> java/org/apache/activemq/broker/region/AbstractRegion.java? 
>> view=diff&rev=558054&r1=558053&r2=558054
>> ===================================================================== 
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/AbstractRegion.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/AbstractRegion.java
>> Fri Jul 20 10:08:10 2007
>> @@ -332,14 +332,15 @@
>>                      // Try to auto create the destination... re- 
>> invoke
>> broker from the
>>                      // top so that the proper security checks are
>> performed.
>>                      try {
>> +
>> +
>> context.getBroker().addDestination(context,destination);
>>                          dest = addDestination(context, destination);
>> -
>> //context.getBroker().addDestination(context,destination);
>>                      }
>>                      catch (DestinationAlreadyExistsException e) {
>>                          // if the destination already exists then  
>> lets
>> ignore this error
>>                      }
>>                      // We should now have the dest created.
>> -                    //dest=(Destination) destinations.get 
>> (destination);
>> +                    dest=(Destination) destinations.get 
>> (destination);
>>                  }
>>                  if(dest==null){
>>                      throw new JMSException("The destination
>> "+destination+" does not exist.");
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/DestinationFactoryImpl.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/ 
>> java/org/apache/activemq/broker/region/DestinationFactoryImpl.java? 
>> view=diff&rev=558054&r1=558053&r2=558054
>> ===================================================================== 
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/DestinationFactoryImpl.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/DestinationFactoryImpl.java
>> Fri Jul 20 10:08:10 2007
>> @@ -42,118 +42,121 @@
>>   * @author fateev@amazon.com
>>   * @version $Revision$
>>   */
>> -public class DestinationFactoryImpl extends DestinationFactory {
>> +public class DestinationFactoryImpl extends DestinationFactory{
>>
>>      protected final UsageManager memoryManager;
>>      protected final TaskRunnerFactory taskRunnerFactory;
>>      protected final PersistenceAdapter persistenceAdapter;
>>      protected RegionBroker broker;
>>
>> -    public DestinationFactoryImpl(UsageManager memoryManager,
>> TaskRunnerFactory taskRunnerFactory,
>> -            PersistenceAdapter persistenceAdapter) {
>> -        this.memoryManager = memoryManager;
>> -        this.taskRunnerFactory = taskRunnerFactory;
>> -        if (persistenceAdapter == null) {
>> +    public DestinationFactoryImpl(UsageManager
>> memoryManager,TaskRunnerFactory taskRunnerFactory,
>> +            PersistenceAdapter persistenceAdapter){
>> +        this.memoryManager=memoryManager;
>> +        this.taskRunnerFactory=taskRunnerFactory;
>> +        if(persistenceAdapter==null){
>>              throw new IllegalArgumentException("null
>> persistenceAdapter");
>>          }
>> -        this.persistenceAdapter = persistenceAdapter;
>> +        this.persistenceAdapter=persistenceAdapter;
>>      }
>>
>> -    public void setRegionBroker(RegionBroker broker) {
>> -        if (broker == null) {
>> +    public void setRegionBroker(RegionBroker broker){
>> +        if(broker==null){
>>              throw new IllegalArgumentException("null broker");
>>          }
>> -        this.broker = broker;
>> +        this.broker=broker;
>>      }
>>
>> -    public Set getDestinations() {
>> +    public Set getDestinations(){
>>          return persistenceAdapter.getDestinations();
>>      }
>>
>>      /**
>>       * @return instance of {@link Queue} or {@link Topic}
>>       */
>> -    public Destination createDestination(ConnectionContext context,
>> ActiveMQDestination destination, DestinationStatistics
>> destinationStatistics) throws Exception {
>> -        if (destination.isQueue()) {
>> -            if (destination.isTemporary()) {
>> -                final ActiveMQTempDestination tempDest =
>> (ActiveMQTempDestination) destination;
>> -                return new Queue(destination, memoryManager, null,
>> destinationStatistics, taskRunnerFactory,broker.getTempDataStore()) {
>> -
>> -                    public void addSubscription(ConnectionContext
>> context,Subscription sub) throws Exception {
>> +    public Destination createDestination(ConnectionContext
>> context,ActiveMQDestination destination,
>> +            DestinationStatistics destinationStatistics) throws
>> Exception{
>> +        if(destination.isQueue()){
>> +            if(destination.isTemporary()){
>> +                final ActiveMQTempDestination
>> tempDest=(ActiveMQTempDestination)destination;
>> +                return new
>> Queue(broker.getRoot(),destination,memoryManager,null,
>> +
>> destinationStatistics,taskRunnerFactory,broker.getTempDataStore()){
>> +
>> +                    public void addSubscription(ConnectionContext
>> context,Subscription sub) throws Exception{
>>                          // Only consumers on the same connection can
>> consume from
>>                          // the temporary destination
>> -                        if( !tempDest.getConnectionId().equals(
>> sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
>> +
>> if(!tempDest.getConnectionId().equals(sub.getConsumerInfo 
>> ().getConsumerId().getConnectionId())){
>>                              throw new JMSException("Cannot  
>> subscribe to
>> remote temporary destination: "+tempDest);
>>                          }
>> -                        super.addSubscription(context, sub);
>> +                        super.addSubscription(context,sub);
>>                      };
>>                  };
>> -            } else {
>> -                MessageStore store =
>> persistenceAdapter.createQueueMessageStore((ActiveMQQueue)  
>> destination);
>> -                Queue queue = new Queue(destination, memoryManager,
>> store, destinationStatistics,
>> taskRunnerFactory,broker.getTempDataStore());
>> -                configureQueue(queue, destination);
>> +            }else{
>> +                MessageStore
>> store=persistenceAdapter.createQueueMessageStore((ActiveMQQueue) 
>> destination);
>> +                Queue queue=new
>> Queue(broker.getRoot(),destination,memoryManager,store,
>> +
>> destinationStatistics,taskRunnerFactory,broker.getTempDataStore());
>> +                configureQueue(queue,destination);
>>                  queue.initialize();
>>                  return queue;
>>              }
>> -        } else if (destination.isTemporary()){
>> -            final ActiveMQTempDestination tempDest =
>> (ActiveMQTempDestination) destination;
>> -            return new Topic(destination, null, memoryManager,
>> destinationStatistics, taskRunnerFactory) {
>> -                public void addSubscription(ConnectionContext
>> context,Subscription sub) throws Exception {
>> +        }else if(destination.isTemporary()){
>> +            final ActiveMQTempDestination
>> tempDest=(ActiveMQTempDestination)destination;
>> +            return new
>> Topic(broker.getRoot(),destination,null,memoryManager,
>> +                    destinationStatistics,taskRunnerFactory){
>> +
>> +                public void addSubscription(ConnectionContext
>> context,Subscription sub) throws Exception{
>>                      // Only consumers on the same connection can  
>> consume
>> from
>>                      // the temporary destination
>> -                    if( !tempDest.getConnectionId().equals(
>> sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
>> +
>> if(!tempDest.getConnectionId().equals(sub.getConsumerInfo 
>> ().getConsumerId().getConnectionId())){
>>                          throw new JMSException("Cannot subscribe to
>> remote temporary destination: "+tempDest);
>>                      }
>> -                    super.addSubscription(context, sub);
>> +                    super.addSubscription(context,sub);
>>                  };
>>              };
>> -        } else {
>> -            TopicMessageStore store = null;
>> -            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
>> -                store =
>> persistenceAdapter.createTopicMessageStore((ActiveMQTopic)  
>> destination);
>> +        }else{
>> +            TopicMessageStore store=null;
>> +            if(!AdvisorySupport.isAdvisoryTopic(destination)){
>> +
>> store=persistenceAdapter.createTopicMessageStore((ActiveMQTopic) 
>> destination);
>>              }
>> -
>> -            Topic topic = new Topic(destination, store,  
>> memoryManager,
>> destinationStatistics, taskRunnerFactory);
>> -            configureTopic(topic, destination);
>> -
>> +            Topic topic=new
>> Topic(broker.getRoot(),destination,store,memoryManager,
>> +                    destinationStatistics,taskRunnerFactory);
>> +            configureTopic(topic,destination);
>>              return topic;
>>          }
>>      }
>>
>> -    protected void configureQueue(Queue queue, ActiveMQDestination
>> destination) {
>> -        if (broker == null) {
>> +    protected void configureQueue(Queue queue,ActiveMQDestination
>> destination){
>> +        if(broker==null){
>>              throw new IllegalStateException("broker property is not
>> set");
>>          }
>> -        if (broker.getDestinationPolicy() != null) {
>> -            PolicyEntry entry =
>> broker.getDestinationPolicy().getEntryFor(destination);
>> -            if (entry != null) {
>> +        if(broker.getDestinationPolicy()!=null){
>> +            PolicyEntry
>> entry=broker.getDestinationPolicy().getEntryFor(destination);
>> +            if(entry!=null){
>>                  entry.configure(queue,broker.getTempDataStore());
>>              }
>>          }
>>      }
>>
>> -    protected void configureTopic(Topic topic, ActiveMQDestination
>> destination) {
>> -        if (broker == null) {
>> +    protected void configureTopic(Topic topic,ActiveMQDestination
>> destination){
>> +        if(broker==null){
>>              throw new IllegalStateException("broker property is not
>> set");
>>          }
>> -        if (broker.getDestinationPolicy() != null) {
>> -            PolicyEntry entry =
>> broker.getDestinationPolicy().getEntryFor(destination);
>> -            if (entry != null) {
>> +        if(broker.getDestinationPolicy()!=null){
>> +            PolicyEntry
>> entry=broker.getDestinationPolicy().getEntryFor(destination);
>> +            if(entry!=null){
>>                  entry.configure(topic);
>>              }
>>          }
>>      }
>>
>> -    public long getLastMessageBrokerSequenceId() throws  
>> IOException {
>> +    public long getLastMessageBrokerSequenceId() throws IOException{
>>          return persistenceAdapter.getLastMessageBrokerSequenceId();
>>      }
>>
>> -    public PersistenceAdapter getPersistenceAdapter() {
>> +    public PersistenceAdapter getPersistenceAdapter(){
>>          return persistenceAdapter;
>>      }
>>
>> -    public SubscriptionInfo[] getAllDurableSubscriptions 
>> (ActiveMQTopic
>> topic) throws IOException {
>> +    public SubscriptionInfo[] getAllDurableSubscriptions 
>> (ActiveMQTopic
>> topic) throws IOException{
>>          return
>> persistenceAdapter.createTopicMessageStore 
>> (topic).getAllSubscriptions();
>>      }
>> -
>>  }
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/PrefetchSubscription.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/ 
>> java/org/apache/activemq/broker/region/PrefetchSubscription.java? 
>> view=diff&rev=558054&r1=558053&r2=558054
>> ===================================================================== 
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/PrefetchSubscription.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/PrefetchSubscription.java
>> Fri Jul 20 10:08:10 2007
>> @@ -276,17 +276,7 @@
>>       * @throws Exception
>>       */
>>      protected void sendToDLQ(final ConnectionContext context,final
>> MessageReference node) throws IOException,Exception{
>> -        // Send the message to the DLQ
>> -        Message message=node.getMessage();
>> -        if(message!=null){
>> -            // The original destination and transaction id do not  
>> get
>> filled when the message is first
>> -            // sent,
>> -            // it is only populated if the message is routed to  
>> another
>> destination like the DLQ
>> -            DeadLetterStrategy
>> deadLetterStrategy=node.getRegionDestination 
>> ().getDeadLetterStrategy();
>> -            ActiveMQDestination  
>> deadLetterDestination=deadLetterStrategy
>> -                    .getDeadLetterQueueFor(message.getDestination 
>> ());
>> -            BrokerSupport.resend 
>> (context,message,deadLetterDestination);
>> -        }
>> +        broker.sendToDeadLetterQueue(context,node);
>>      }
>>
>>      /**
>> @@ -393,7 +383,9 @@
>>                              // Message may have been sitting in the
>> pending list a while
>>                              // waiting for the consumer to ak the
>> message.
>>
>> if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
>> -                                continue; // just drop it.
>> +                                broker.messageExpired(getContext 
>> (),node);
>> +                                dequeueCounter++;
>> +                                continue;
>>                              }
>>                              dispatch(node);
>>                              count++;
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/Queue.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/ 
>> java/org/apache/activemq/broker/region/Queue.java? 
>> view=diff&rev=558054&r1=558053&r2=558054
>> ===================================================================== 
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/Queue.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/Queue.java
>> Fri Jul 20 10:08:10 2007
>> @@ -27,6 +27,7 @@
>>  import javax.jms.InvalidSelectorException;
>>  import javax.jms.JMSException;
>>
>> +import org.apache.activemq.broker.Broker;
>>  import org.apache.activemq.broker.ConnectionContext;
>>  import org.apache.activemq.broker.ProducerBrokerExchange;
>>  import  
>> org.apache.activemq.broker.region.cursors.PendingMessageCursor;
>> @@ -72,7 +73,6 @@
>>  public class Queue implements Destination, Task {
>>
>>      private final Log log;
>> -
>>      private final ActiveMQDestination destination;
>>      private final List consumers = new CopyOnWriteArrayList();
>>      private final Valve dispatchValve = new Valve(true);
>> @@ -96,9 +96,11 @@
>>      private final Object doDispatchMutex = new Object();
>>      private TaskRunner taskRunner;
>>      private boolean started = false;
>> +    final Broker broker;
>>
>> -    public Queue(ActiveMQDestination destination, final UsageManager
>> memoryManager, MessageStore store, DestinationStatistics parentStats,
>> +    public Queue(Broker broker,ActiveMQDestination destination,  
>> final
>> UsageManager memoryManager, MessageStore store, DestinationStatistics
>> parentStats,
>>              TaskRunnerFactory taskFactory, Store tmpStore) throws
>> Exception {
>> +        this.broker=broker;
>>          this.destination = destination;
>>          this.usageManager = new
>> UsageManager(memoryManager,destination.toString());
>>          this.usageManager.setUsagePortion(1.0f);
>> @@ -136,7 +138,8 @@
>>                      public void recoverMessage(Message message){
>>                          // Message could have expired while it  
>> was being
>> loaded..
>>                          if(message.isExpired()){
>> -                            // TODO remove from store
>> +
>> broker.messageExpired(createConnectionContext(),message);
>> +
>> destinationStatistics.getMessages().decrement();
>>                              return;
>>                          }
>>                          message.setRegionDestination(Queue.this);
>> @@ -342,9 +345,8 @@
>>          // There is delay between the client sending it and it  
>> arriving
>> at the
>>          // destination.. it may have expired.
>>          if(message.isExpired()){
>> -            if (log.isDebugEnabled()) {
>> -                log.debug("Expired message: " + message);
>> -            }
>> +            broker.messageExpired(context,message);
>> +            destinationStatistics.getMessages().decrement();
>>              if( ( !message.isResponseRequired() ||
>> producerExchange.getProducerState().getInfo().getWindowSize() >  
>> 0 ) &&
>> !context.isInRecoveryMode() ) {
>>          		ProducerAck ack = new
>> ProducerAck(producerExchange.getProducerState().getInfo 
>> ().getProducerId(),
>> message.getSize());
>>  				context.getConnection().dispatchAsync(ack);	    	             
>> 	        		
>> @@ -365,9 +367,8 @@
>>          					
>>          					// While waiting for space to free up... the message  
>> may
>> have expired.
>>          			        if(message.isExpired()){
>> -        			            if (log.isDebugEnabled()) {
>> -        			                log.debug("Expired message: " + message);
>> -        			            }
>> +        			            broker.messageExpired(context,message);
>> +
>> destinationStatistics.getMessages().decrement();
>>          			
>>          			            if( !message.isResponseRequired() &&
>> !context.isInRecoveryMode() ) {
>>          			        		ProducerAck ack = new
>> ProducerAck(producerExchange.getProducerState().getInfo 
>> ().getProducerId(),
>> message.getSize());
>> @@ -440,10 +441,8 @@
>>                          // It could take while before we receive the
>> commit
>>                          // op, by that time the message could have
>> expired..
>>  	                    if(message.isExpired()){
>> -	                        // TODO: remove message from store.
>> -	                        if (log.isDebugEnabled()) {
>> -	                            log.debug("Expired message: " +  
>> message);
>> -	                        }
>> +	                        broker.messageExpired(context,message);
>> +
>> destinationStatistics.getMessages().decrement();
>>  	                        return;
>>  	                    }
>>  	                    sendMessage(context,message);
>> @@ -1011,9 +1010,8 @@
>>                                  result.add(node);
>>                                  count++;
>>                              }else{
>> -                                if (log.isDebugEnabled()) {
>> -                                    log.debug("Expired message: " +
>> node);
>> -                                }
>> +
>> broker.messageExpired(createConnectionContext(),node);
>> +
>> destinationStatistics.getMessages().decrement();
>>                              }
>>                          }
>>                      }finally{
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/RegionBroker.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/ 
>> java/org/apache/activemq/broker/region/RegionBroker.java? 
>> view=diff&rev=558054&r1=558053&r2=558054
>> ===================================================================== 
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/RegionBroker.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/RegionBroker.java
>> Fri Jul 20 10:08:10 2007
>> @@ -37,6 +37,7 @@
>>  import org.apache.activemq.broker.DestinationAlreadyExistsException;
>>  import org.apache.activemq.broker.ProducerBrokerExchange;
>>  import org.apache.activemq.broker.TransactionBroker;
>> +import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
>>  import
>> org.apache.activemq.broker.region.policy.PendingDurableSubscriberMess 
>> ageStoragePolicy;
>>  import org.apache.activemq.broker.region.policy.PolicyMap;
>>  import
>> org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMe 
>> ssageStoragePolicy;
>> @@ -62,6 +63,7 @@
>>  import org.apache.activemq.store.PersistenceAdapter;
>>  import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
>>  import org.apache.activemq.thread.TaskRunnerFactory;
>> +import org.apache.activemq.util.BrokerSupport;
>>  import org.apache.activemq.util.IdGenerator;
>>  import org.apache.activemq.util.LongSequenceGenerator;
>>  import org.apache.activemq.util.ServiceStopper;
>> @@ -625,6 +627,52 @@
>>      public BrokerService getBrokerService(){
>>          return brokerService;
>>      }
>> -
>> -
>> +
>> +    public void messageExpired(ConnectionContext  
>> context,MessageReference
>> node){
>> +        if(log.isDebugEnabled()){
>> +            log.debug("Message expired "+node);
>> +        }
>> +        getRoot().sendToDeadLetterQueue(context,node);
>> +    }
>> +
>> +    public void sendToDeadLetterQueue(ConnectionContext
>> context,MessageReference node){
>> +        try{
>> +            if(node!=null){
>> +                Message message=node.getMessage();
>> +                if(message!=null){
>> +                    DeadLetterStrategy
>> deadLetterStrategy=node.getRegionDestination 
>> ().getDeadLetterStrategy();
>> +                    if(deadLetterStrategy!=null){
>> +
>> if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
>> +                            long expiration=message.getExpiration();
>> +                            message.setExpiration(0);
>> +                            message.setProperty 
>> ("originalExpiration",new
>> Long(expiration));
>> +                            if(!message.isPersistent()){
>> +                                message.setPersistent(true);
>> +
>> message.setProperty("originalDeliveryMode","NON_PERSISTENT");
>> +                            }
>> +                            // The original destination and  
>> transaction
>> id do not get filled when the message is first
>> +                            // sent,
>> +                            // it is only populated if the  
>> message is
>> routed to another destination like the DLQ
>> +                            ActiveMQDestination
>> deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor 
>> (message
>> +                                    .getDestination());
>> +
>> BrokerSupport.resend(context,message,deadLetterDestination);
>> +                        }
>> +                    }
>> +                }else{
>> +                    log.warn("Null message for node: "+node);
>> +                }
>> +            }
>> +        }catch(Exception e){
>> +            log.warn("Failed to pass expired message to dead letter
>> queue");
>> +        }
>> +    }
>> +
>> +    public Broker getRoot(){
>> +        try{
>> +            return getBrokerService().getBroker();
>> +        }catch(Exception e){
>> +            log.fatal("Trying to get Root Broker "+e);
>> +            throw new RuntimeException("The broker from the  
>> BrokerService
>> should not throw an exception");
>> +        }
>> +    }
>>  }
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/TempQueueRegion.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/ 
>> java/org/apache/activemq/broker/region/TempQueueRegion.java? 
>> view=diff&rev=558054&r1=558053&r2=558054
>> ===================================================================== 
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/TempQueueRegion.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/TempQueueRegion.java
>> Fri Jul 20 10:08:10 2007
>> @@ -41,7 +41,7 @@
>>
>>      protected Destination createDestination(ConnectionContext  
>> context,
>> ActiveMQDestination destination) throws Exception {
>>          final ActiveMQTempDestination tempDest =
>> (ActiveMQTempDestination) destination;
>> -        return new Queue(destination, memoryManager, null,
>> destinationStatistics, taskRunnerFactory, null) {
>> +        return new Queue(broker.getRoot(),destination,  
>> memoryManager,
>> null, destinationStatistics, taskRunnerFactory, null) {
>>
>>              public void addSubscription(ConnectionContext
>> context,Subscription sub) throws Exception {
>>
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/Topic.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/ 
>> java/org/apache/activemq/broker/region/Topic.java? 
>> view=diff&rev=558054&r1=558053&r2=558054
>> ===================================================================== 
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/Topic.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/Topic.java
>> Fri Jul 20 10:08:10 2007
>> @@ -24,6 +24,7 @@
>>  import java.util.concurrent.CopyOnWriteArrayList;
>>  import java.util.concurrent.CopyOnWriteArraySet;
>>  import org.apache.activemq.advisory.AdvisorySupport;
>> +import org.apache.activemq.broker.Broker;
>>  import org.apache.activemq.broker.ConnectionContext;
>>  import org.apache.activemq.broker.ProducerBrokerExchange;
>>  import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
>> @@ -72,10 +73,11 @@
>>      private boolean sendAdvisoryIfNoConsumers;
>>      private DeadLetterStrategy deadLetterStrategy = new
>> SharedDeadLetterStrategy();
>>      private final ConcurrentHashMap durableSubcribers = new
>> ConcurrentHashMap();
>> +    final Broker broker;
>>
>> -    public Topic(ActiveMQDestination destination, TopicMessageStore
>> store, UsageManager memoryManager, DestinationStatistics parentStats,
>> +    public Topic(Broker broker,ActiveMQDestination destination,
>> TopicMessageStore store, UsageManager memoryManager,  
>> DestinationStatistics
>> parentStats,
>>              TaskRunnerFactory taskFactory) {
>> -
>> +        this.broker=broker;
>>          this.destination = destination;
>>          this.store = store; //this could be NULL! (If an advsiory)
>>          this.usageManager = new
>> UsageManager(memoryManager,destination.toString());
>> @@ -261,9 +263,8 @@
>>      	// There is delay between the client sending it and it  
>> arriving at
>> the
>>      	// destination.. it may have expired.
>>      	if( message.isExpired() ) {
>> -            if (log.isDebugEnabled()) {
>> -                log.debug("Expired message: " + message);
>> -            }
>> +            broker.messageExpired(context,message);
>> +            destinationStatistics.getMessages().decrement();
>>              if( ( !message.isResponseRequired() ||
>> producerExchange.getProducerState().getInfo().getWindowSize() >  
>> 0 ) &&
>> !context.isInRecoveryMode() ) {
>>          		ProducerAck ack = new
>> ProducerAck(producerExchange.getProducerState().getInfo 
>> ().getProducerId(),
>> message.getSize());
>>  				context.getConnection().dispatchAsync(ack);	    	             
>> 	        		
>> @@ -285,9 +286,8 @@
>>          					
>>          					// While waiting for space to free up... the message  
>> may
>> have expired.
>>          			        if(message.isExpired()){
>> -        			            if (log.isDebugEnabled()) {
>> -        			                log.debug("Expired message: " + message);
>> -        			            }
>> +        			            broker.messageExpired(context,message);
>> +
>> destinationStatistics.getMessages().decrement();
>>          			
>>          			            if( !message.isResponseRequired() &&
>> !context.isInRecoveryMode() ) {
>>          			        		ProducerAck ack = new
>> ProducerAck(producerExchange.getProducerState().getInfo 
>> ().getProducerId(),
>> message.getSize());
>> @@ -357,7 +357,9 @@
>>                      	// It could take while before we receive the  
>> commit
>>                      	// operration.. by that time the message  
>> could have
>> expired..
>>                      	if( message.isExpired() ) {
>> -                    		// TODO: remove message from store.
>> +                    		broker.messageExpired(context,message);
>> +                            message.decrementReferenceCount();
>> +
>> destinationStatistics.getMessages().decrement();
>>                      		return;
>>                      	}
>>                          dispatch(context, message);
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/TopicSubscription.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/ 
>> java/org/apache/activemq/broker/region/TopicSubscription.java? 
>> view=diff&rev=558054&r1=558053&r2=558054
>> ===================================================================== 
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/TopicSubscription.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/TopicSubscription.java
>> Fri Jul 20 10:08:10 2007
>> @@ -103,12 +103,7 @@
>>                              int messagesToEvict=oldMessages.length;
>>                              for(int i=0;i<messagesToEvict;i++){
>>                                  MessageReference
>> oldMessage=oldMessages[i];
>> -                                oldMessage.decrementReferenceCount 
>> ();
>> -                                matched.remove(oldMessage);
>> -                                discarded++;
>> -                                if(log.isDebugEnabled()){
>> -                                    log.debug("Discarding message
>> "+oldMessages[i]);
>> -                                }
>> +                                discard(oldMessage);
>>                              }
>>                              // lets avoid an infinite loop if we are
>> given a bad eviction strategy
>>                              // for a bad strategy lets just not  
>> evict
>> @@ -138,6 +133,7 @@
>>                      matched.remove();
>>                      dispatchedCounter.incrementAndGet();
>>                      node.decrementReferenceCount();
>> +                    broker.messageExpired(getContext(),node);
>>                      break;
>>                  }
>>              }
>> @@ -367,6 +363,8 @@
>>                      // waiting for the consumer to ak the message.
>>                      if(message.isExpired()){
>>                          message.decrementReferenceCount();
>> +                        broker.messageExpired(getContext(),message);
>> +                        dequeueCounter.incrementAndGet();
>>                          continue; // just drop it.
>>                      }
>>                      dispatch(message);
>> @@ -409,6 +407,17 @@
>>
>> node.getRegionDestination().getDestinationStatistics 
>> ().getDispatched().increment();
>>              node.decrementReferenceCount();
>>          }
>> +    }
>> +
>> +    private void discard(MessageReference message) {
>> +        message.decrementReferenceCount();
>> +        matched.remove(message);
>> +        discarded++;
>> +        dequeueCounter.incrementAndGet();
>> +        if(log.isDebugEnabled()){
>> +            log.debug("Discarding message "+message);
>> +        }
>> +        broker.getRoot().sendToDeadLetterQueue(getContext 
>> (),message);
>>      }
>>
>>      public String toString(){
>>
>> Added:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/policy/AbstractDeadLetterStrategy.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/ 
>> java/org/apache/activemq/broker/region/policy/ 
>> AbstractDeadLetterStrategy.java?view=auto&rev=558054
>> ===================================================================== 
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/policy/AbstractDeadLetterStrategy.java
>> (added)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/policy/AbstractDeadLetterStrategy.java
>> Fri Jul 20 10:08:10 2007
>> @@ -0,0 +1,74 @@
>> +/**
>> + *
>> + * Licensed to the Apache Software Foundation (ASF) under one or  
>> more
>> + * contributor license agreements.  See the NOTICE file  
>> distributed with
>> + * this work for additional information regarding copyright  
>> ownership.
>> + * The ASF licenses this file to You under the Apache License,  
>> Version
>> 2.0
>> + * (the "License"); you may not use this file except in  
>> compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing,  
>> software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>> + * See the License for the specific language governing  
>> permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.activemq.broker.region.policy;
>> +
>> +import org.apache.activemq.command.Message;
>> +
>> +/**
>> + * A strategy for choosing which destination is used for dead letter
>> queue messages.
>> + *
>> + * @version $Revision: 426366 $
>> + */
>> +public abstract  class AbstractDeadLetterStrategy implements
>> DeadLetterStrategy {
>> +    private boolean processNonPersistent=true;
>> +    private boolean processExpired=true;
>> +
>> +    public boolean isSendToDeadLetterQueue(Message message){
>> +        boolean result=false;
>> +        if(message!=null){
>> +            result=true;
>> +
>> if(message.isPersistent()==false&&processNonPersistent==false){
>> +                result=false;
>> +            }
>> +            if(message.isExpired()&&processExpired==false){
>> +                result=false;
>> +            }
>> +        }
>> +        return result;
>> +    }
>> +
>> +    /**
>> +     * @return the processExpired
>> +     */
>> +    public boolean isProcessExpired(){
>> +        return this.processExpired;
>> +    }
>> +
>> +    /**
>> +     * @param processExpired the processExpired to set
>> +     */
>> +    public void setProcessExpired(boolean processExpired){
>> +        this.processExpired=processExpired;
>> +    }
>> +
>> +    /**
>> +     * @return the processNonPersistent
>> +     */
>> +    public boolean isProcessNonPersistent(){
>> +        return this.processNonPersistent;
>> +    }
>> +
>> +    /**
>> +     * @param processNonPersistent the processNonPersistent to set
>> +     */
>> +    public void setProcessNonPersistent(boolean  
>> processNonPersistent){
>> +        this.processNonPersistent=processNonPersistent;
>> +    }
>> +
>> +
>> +}
>>
>> Propchange:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/policy/AbstractDeadLetterStrategy.java
>> --------------------------------------------------------------------- 
>> ---------
>>     svn:eol-style = native
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/policy/DeadLetterStrategy.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/ 
>> java/org/apache/activemq/broker/region/policy/ 
>> DeadLetterStrategy.java?view=diff&rev=558054&r1=558053&r2=558054
>> ===================================================================== 
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/policy/DeadLetterStrategy.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/policy/DeadLetterStrategy.java
>> Fri Jul 20 10:08:10 2007
>> @@ -18,6 +18,7 @@
>>  package org.apache.activemq.broker.region.policy;
>>
>>  import org.apache.activemq.command.ActiveMQDestination;
>> +import org.apache.activemq.command.Message;
>>
>>  /**
>>   * A strategy for choosing which destination is used for dead letter
>> queue messages.
>> @@ -25,6 +26,14 @@
>>   * @version $Revision$
>>   */
>>  public interface DeadLetterStrategy {
>> +
>> +    /**
>> +     * Allow pluggable strategy for deciding if message should be  
>> sent to
>> a dead letter queue
>> +     * for example, you might not want to ignore expired or
>> non-persistent messages
>> +     * @param message
>> +     * @return true if message should be sent to a dead letter queue
>> +     */
>> +    public boolean isSendToDeadLetterQueue(Message message);
>>
>>      /**
>>       * Returns the dead letter queue for the given destination.
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/policy/IndividualDeadLetterStrategy.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/ 
>> java/org/apache/activemq/broker/region/policy/ 
>> IndividualDeadLetterStrategy.java? 
>> view=diff&rev=558054&r1=558053&r2=558054
>> ===================================================================== 
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/policy/IndividualDeadLetterStrategy.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/policy/IndividualDeadLetterStrategy.java
>> Fri Jul 20 10:08:10 2007
>> @@ -29,7 +29,7 @@
>>   *
>>   * @version $Revision$
>>   */
>> -public class IndividualDeadLetterStrategy implements  
>> DeadLetterStrategy {
>> +public class IndividualDeadLetterStrategy extends
>> AbstractDeadLetterStrategy {
>>
>>      private String topicPrefix = "ActiveMQ.DLQ.Topic.";
>>      private String queuePrefix = "ActiveMQ.DLQ.Queue.";
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/policy/SharedDeadLetterStrategy.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/ 
>> java/org/apache/activemq/broker/region/policy/ 
>> SharedDeadLetterStrategy.java? 
>> view=diff&rev=558054&r1=558053&r2=558054
>> ===================================================================== 
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/policy/SharedDeadLetterStrategy.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/policy/SharedDeadLetterStrategy.java
>> Fri Jul 20 10:08:10 2007
>> @@ -29,7 +29,7 @@
>>   *
>>   * @version $Revision$
>>   */
>> -public class SharedDeadLetterStrategy implements  
>> DeadLetterStrategy {
>> +public class SharedDeadLetterStrategy extends  
>> AbstractDeadLetterStrategy
>> {
>>
>>      private ActiveMQDestination deadLetterQueue = new
>> ActiveMQQueue("ActiveMQ.DLQ");
>>
>>
>> Modified:
>> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ 
>> broker/StubBroker.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/ 
>> java/org/apache/activemq/broker/StubBroker.java? 
>> view=diff&rev=558054&r1=558053&r2=558054
>> ===================================================================== 
>> =========
>> ---
>> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ 
>> broker/StubBroker.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ 
>> broker/StubBroker.java
>> Fri Jul 20 10:08:10 2007
>> @@ -19,6 +19,7 @@
>>  package org.apache.activemq.broker;
>>
>>  import org.apache.activemq.broker.region.Destination;
>> +import org.apache.activemq.broker.region.MessageReference;
>>  import org.apache.activemq.broker.region.Subscription;
>>  import
>> org.apache.activemq.broker.region.policy.PendingDurableSubscriberMess 
>> ageStoragePolicy;
>>  import org.apache.activemq.command.ActiveMQDestination;
>> @@ -243,5 +244,15 @@
>>
>>      public BrokerService getBrokerService(){
>>          return null;
>> +    }
>> +
>> +    public void messageExpired(ConnectionContext  
>> context,MessageReference
>> messageReference){
>> +    }
>> +
>> +    public void sendToDeadLetterQueue(ConnectionContext
>> context,MessageReference messageReference) {
>> +    }
>> +
>> +    public Broker getRoot(){
>> +        return this;
>>      }
>>  }
>>
>>
>>
>>
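For anyone reading the quoted commit, the decision made by the new AbstractDeadLetterStrategy.isSendToDeadLetterQueue() can be sketched on its own. This is only an illustration: the class name DeadLetterDecisionSketch is made up, and the Message argument is replaced by two plain booleans (persistent, expired) so the snippet runs without ActiveMQ on the classpath. The two flags mirror processNonPersistent and processExpired from the diff, which both default to true there.

```java
public class DeadLetterDecisionSketch {

    /**
     * Mirrors the checks in the quoted isSendToDeadLetterQueue(), assuming a
     * non-null message whose state is passed in as plain booleans.
     */
    static boolean isSendToDeadLetterQueue(boolean persistent, boolean expired,
                                           boolean processNonPersistent,
                                           boolean processExpired) {
        boolean result = true;
        // Non-persistent messages are skipped only when the strategy opts out of them.
        if (!persistent && !processNonPersistent) {
            result = false;
        }
        // Expired messages are skipped only when the strategy opts out of them.
        if (expired && !processExpired) {
            result = false;
        }
        return result;
    }

    public static void main(String[] args) {
        // Defaults from the diff: both flags true, so an expired non-persistent message goes to the DLQ.
        System.out.println(isSendToDeadLetterQueue(false, true, true, true));
        // With processExpired switched off, the same message is not sent.
        System.out.println(isSendToDeadLetterQueue(false, true, true, false));
    }
}
```

With the defaults, every non-null message qualifies, so an expired message is expected to be resent to the dead letter destination unless one of the flags is turned off.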
> Quoted from:
> http://www.nabble.com/svn-commit%3A-r558054---in--activemq-trunk- 
> activemq-core-src%3A-main-java-org-apache-activemq-advisory--main- 
> java-org-apache-activemq-broker--main-java-org-apache-activemq- 
> broker-region--main-java-org-apache-activemq-broker-region-policy-- 
> test-java-org-apa...-tf4118408s2354.html#a11712223
>
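
On the point that the warning does not say what the exception was: the catch block in the quoted sendToDeadLetterQueue() logs only a fixed string. A minimal sketch of logging the cause as well follows. It uses java.util.logging so that it runs on its own (the broker code uses a different logging API), and the names DlqLoggingSketch and Resender are hypothetical stand-ins, not ActiveMQ classes.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class DlqLoggingSketch {

    private static final Logger LOG = Logger.getLogger(DlqLoggingSketch.class.getName());

    /** Hypothetical stand-in for the resend step, which may throw. */
    interface Resender {
        void resend() throws Exception;
    }

    /** Returns true if the resend succeeded; on failure logs the warning together with its cause. */
    static boolean sendToDeadLetterQueue(Resender resender) {
        try {
            resender.resend();
            return true;
        } catch (Exception e) {
            // Passing the exception as the last argument records its type, message and stack trace.
            LOG.log(Level.WARNING, "Failed to pass expired message to dead letter queue", e);
            return false;
        }
    }

    public static void main(String[] args) {
        boolean ok = sendToDeadLetterQueue(() -> {
            throw new IllegalStateException("simulated resend failure");
        });
        System.out.println("resend succeeded: " + ok);
    }
}
```

With the cause in the log, a report like the one above would show which exception was thrown during the resend rather than just the fixed warning text.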

