activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r561026 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ./ broker/ broker/ft/ broker/region/ broker/region/policy/ broker/util/ command/
Date Mon, 30 Jul 2007 16:01:39 GMT
Author: rajdavies
Date: Mon Jul 30 09:01:37 2007
New Revision: 561026

URL: http://svn.apache.org/viewvc?view=rev&rev=561026
Log:
Added support for tracking the length of time messages are processed by the broker - 
fix for https://issues.apache.org/activemq/browse/AMQ-1160,
https://issues.apache.org/activemq/browse/AMQ-1072,
https://issues.apache.org/activemq/browse/AMQ-936 
and groundwork for https://issues.apache.org/activemq/browse/AMQ-567

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionMetaData.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionMetaData.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionMetaData.java?view=diff&rev=561026&r1=561025&r2=561026
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionMetaData.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionMetaData.java
Mon Jul 30 09:01:37 2007
@@ -142,6 +142,7 @@
         jmxProperties.put("JMSXGroupID", "1");
         jmxProperties.put("JMSXGroupSeq", "1");
         jmxProperties.put("JMSXDeliveryCount","1");
+        jmxProperties.put("JMSXProducerTXID","1");
         return jmxProperties.keys();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?view=diff&rev=561026&r1=561025&r2=561026
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Mon Jul 30 09:01:37 2007
@@ -896,8 +896,11 @@
                             ActiveMQMessage message=createActiveMQMessage(md);
                             beforeMessageIsConsumed(md);
                             try{
-                                listener.onMessage(message);
-                                afterMessageIsConsumed(md,false);
+                                boolean expired=message.isExpired();
+                                if(!expired){
+                                    listener.onMessage(message);
+                                }
+                                afterMessageIsConsumed(md,expired);
                             }catch(RuntimeException e){
                                 if(session.isDupsOkAcknowledge()||session.isAutoAcknowledge()){
                                     // Redeliver the message

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?view=diff&rev=561026&r1=561025&r2=561026
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Mon
Jul 30 09:01:37 2007
@@ -186,12 +186,17 @@
      */
     BrokerInfo[] getPeerBrokerInfos();
     
-    
+    /**
+     * Notify the Broker that a dispatch is going to happen
+     * @param messageDispatch
+     */
+    public void preProcessDispatch(MessageDispatch messageDispatch);
+       
     /**
      * Notify the Broker that a dispatch has happened
      * @param messageDispatch
      */
-    public void processDispatch(MessageDispatch messageDispatch);
+    public void postProcessDispatch(MessageDispatch messageDispatch);
   
     /**
      * @return true if the broker has stopped
@@ -264,10 +269,17 @@
     Broker getRoot();
     
     /**
+     * Determine if a message has expired -allows default behaviour to be overriden - 
+     * as the timestamp set by the producer can be out of sync with the broker
+     * @param messageReference
+     * @return true if the message is expired
+     */
+    public boolean isExpired(MessageReference messageReference);
+    
+    /**
      * A Message has Expired
      * @param context
      * @param messageReference
-     * @throws Exception 
      */
     public void messageExpired(ConnectionContext context, MessageReference messageReference);
     
@@ -275,7 +287,8 @@
      * A message needs to go the a DLQ
      * @param context
      * @param messageReference
-     * @throws Exception
      */
     public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference);
+    
+    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?view=diff&rev=561026&r1=561025&r2=561026
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
Mon Jul 30 09:01:37 2007
@@ -51,189 +51,190 @@
     
     final protected Broker next;
 
-    public BrokerFilter(Broker next) {
+    public BrokerFilter(Broker next){
         this.next=next;
     }
-   
+
     public Broker getAdaptor(Class type){
-        if (type.isInstance(this)){
+        if(type.isInstance(this)){
             return this;
         }
         return next.getAdaptor(type);
     }
 
-    public Map getDestinationMap() {
+    public Map getDestinationMap(){
         return next.getDestinationMap();
     }
 
-    public Set getDestinations(ActiveMQDestination destination) {
+    public Set getDestinations(ActiveMQDestination destination){
         return next.getDestinations(destination);
     }
 
-    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws
Exception {
-        next.acknowledge(consumerExchange, ack);
+    public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws
Exception{
+        next.acknowledge(consumerExchange,ack);
     }
 
-    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception
{
-        return next.messagePull(context, pull);
+    public Response messagePull(ConnectionContext context,MessagePull pull) throws Exception{
+        return next.messagePull(context,pull);
     }
 
-    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception
{
-        next.addConnection(context, info);
+    public void addConnection(ConnectionContext context,ConnectionInfo info) throws Exception{
+        next.addConnection(context,info);
     }
 
-    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws
Exception {
-        return next.addConsumer(context, info);
+    public Subscription addConsumer(ConnectionContext context,ConsumerInfo info) throws Exception{
+        return next.addConsumer(context,info);
     }
 
-    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception
{
-        next.addProducer(context, info);
+    public void addProducer(ConnectionContext context,ProducerInfo info) throws Exception{
+        next.addProducer(context,info);
     }
 
-    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase)
throws Exception {
-        next.commitTransaction(context, xid, onePhase);
+    public void commitTransaction(ConnectionContext context,TransactionId xid,boolean onePhase)
throws Exception{
+        next.commitTransaction(context,xid,onePhase);
     }
 
-    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info)
throws Exception {
-        next.removeSubscription(context, info);
+    public void removeSubscription(ConnectionContext context,RemoveSubscriptionInfo info)
throws Exception{
+        next.removeSubscription(context,info);
     }
 
-    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception
{
+    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception{
         return next.getPreparedTransactions(context);
     }
 
-    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception
{
-        return next.prepareTransaction(context, xid);
+    public int prepareTransaction(ConnectionContext context,TransactionId xid) throws Exception{
+        return next.prepareTransaction(context,xid);
     }
 
-    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable
error) throws Exception {
-        next.removeConnection(context, info, error);
+    public void removeConnection(ConnectionContext context,ConnectionInfo info,Throwable
error) throws Exception{
+        next.removeConnection(context,info,error);
     }
 
-    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception
{
-        next.removeConsumer(context, info);
+    public void removeConsumer(ConnectionContext context,ConsumerInfo info) throws Exception{
+        next.removeConsumer(context,info);
     }
 
-    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception
{
-        next.removeProducer(context, info);
+    public void removeProducer(ConnectionContext context,ProducerInfo info) throws Exception{
+        next.removeProducer(context,info);
     }
 
-    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws
Exception {
-        next.rollbackTransaction(context, xid);
+    public void rollbackTransaction(ConnectionContext context,TransactionId xid) throws Exception{
+        next.rollbackTransaction(context,xid);
     }
 
-    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws
Exception {
-        next.send(producerExchange, messageSend);
+    public void send(ProducerBrokerExchange producerExchange,Message messageSend) throws
Exception{
+        next.send(producerExchange,messageSend);
     }
 
-    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception
{
-        next.beginTransaction(context, xid);
+    public void beginTransaction(ConnectionContext context,TransactionId xid) throws Exception{
+        next.beginTransaction(context,xid);
     }
 
-    public void forgetTransaction(ConnectionContext context, TransactionId transactionId)
throws Exception {
-        next.forgetTransaction(context, transactionId);
+    public void forgetTransaction(ConnectionContext context,TransactionId transactionId)
throws Exception{
+        next.forgetTransaction(context,transactionId);
     }
 
-    public Connection[] getClients() throws Exception {
+    public Connection[] getClients() throws Exception{
         return next.getClients();
     }
 
-    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination)
throws Exception {
-        return next.addDestination(context, destination);
+    public Destination addDestination(ConnectionContext context,ActiveMQDestination destination)
throws Exception{
+        return next.addDestination(context,destination);
     }
 
-    public void removeDestination(ConnectionContext context, ActiveMQDestination destination,
long timeout) throws Exception {
-        next.removeDestination(context, destination, timeout);
+    public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long
timeout)
+            throws Exception{
+        next.removeDestination(context,destination,timeout);
     }
 
-    public ActiveMQDestination[] getDestinations() throws Exception {
+    public ActiveMQDestination[] getDestinations() throws Exception{
         return next.getDestinations();
     }
 
-    public void start() throws Exception {
+    public void start() throws Exception{
         next.start();
     }
 
-    public void stop() throws Exception {
+    public void stop() throws Exception{
         next.stop();
     }
 
-    public void addSession(ConnectionContext context, SessionInfo info) throws Exception
{
-        next.addSession(context, info);
+    public void addSession(ConnectionContext context,SessionInfo info) throws Exception{
+        next.addSession(context,info);
     }
 
-    public void removeSession(ConnectionContext context, SessionInfo info) throws Exception
{
-        next.removeSession(context, info);
+    public void removeSession(ConnectionContext context,SessionInfo info) throws Exception{
+        next.removeSession(context,info);
     }
 
-    public BrokerId getBrokerId() {
+    public BrokerId getBrokerId(){
         return next.getBrokerId();
     }
 
-    public String getBrokerName() {
+    public String getBrokerName(){
         return next.getBrokerName();
     }
-	
-    public void gc() {
+
+    public void gc(){
         next.gc();
     }
 
-
     public void addBroker(Connection connection,BrokerInfo info){
-        next.addBroker(connection, info);
+        next.addBroker(connection,info);
     }
-    
+
     public void removeBroker(Connection connection,BrokerInfo info){
-        next.removeBroker(connection, info);
+        next.removeBroker(connection,info);
     }
 
-
     public BrokerInfo[] getPeerBrokerInfos(){
         return next.getPeerBrokerInfos();
     }
-    
-    public void processDispatch(MessageDispatch messageDispatch){
-        next.processDispatch(messageDispatch);
+
+    public void preProcessDispatch(MessageDispatch messageDispatch){
+        next.preProcessDispatch(messageDispatch);
+    }
+
+    public void postProcessDispatch(MessageDispatch messageDispatch){
+        next.postProcessDispatch(messageDispatch);
     }
-    
+
     public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification)
throws Exception{
         next.processDispatchNotification(messageDispatchNotification);
     }
-        
+
     public boolean isStopped(){
         return next.isStopped();
     }
-    
+
     public Set getDurableDestinations(){
         return next.getDurableDestinations();
     }
-    
+
     public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws
Exception{
-        next.addDestinationInfo(context, info);
-        
+        next.addDestinationInfo(context,info);
     }
 
     public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws
Exception{
-        next.removeDestinationInfo(context, info);
-        
+        next.removeDestinationInfo(context,info);
     }
 
     public boolean isFaultTolerantConfiguration(){
         return next.isFaultTolerantConfiguration();
     }
 
-    public ConnectionContext getAdminConnectionContext() {
+    public ConnectionContext getAdminConnectionContext(){
         return next.getAdminConnectionContext();
     }
 
-    public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
+    public void setAdminConnectionContext(ConnectionContext adminConnectionContext){
         next.setAdminConnectionContext(adminConnectionContext);
     }
-    
-    public Store getTempDataStore() {
+
+    public Store getTempDataStore(){
         return next.getTempDataStore();
     }
-    
+
     public URI getVmConnectorURI(){
         return next.getVmConnectorURI();
     }
@@ -241,20 +242,24 @@
     public void brokerServiceStarted(){
         next.brokerServiceStarted();
     }
-    
+
     public BrokerService getBrokerService(){
         return next.getBrokerService();
     }
 
+    public boolean isExpired(MessageReference messageReference){
+        return next.isExpired(messageReference);
+    }
+
     public void messageExpired(ConnectionContext context,MessageReference message){
-        next.messageExpired(context,message); 
+        next.messageExpired(context,message);
     }
 
     public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference){
-       next.sendToDeadLetterQueue(context,messageReference);       
+        next.sendToDeadLetterQueue(context,messageReference);
     }
 
-    public Broker getRoot() {
-       return next.getRoot();
+    public Broker getRoot(){
+        return next.getRoot();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?view=diff&rev=561026&r1=561025&r2=561026
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
Mon Jul 30 09:01:37 2007
@@ -185,13 +185,10 @@
         return null;
     }
 
-    /**
-     * Notifiy the Broker that a dispatch has happened
-     * 
-     * @param messageDispatch
-     */
-    public void processDispatch(MessageDispatch messageDispatch) {
-
+    public void preProcessDispatch(MessageDispatch messageDispatch) {
+    }
+    
+    public void postProcessDispatch(MessageDispatch messageDispatch) {
     }
 
     public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification)
throws Exception {
@@ -245,6 +242,10 @@
         return null;
     }
 
+    public boolean isExpired(MessageReference messageReference) {
+        return false;
+    }
+    
     public void messageExpired(ConnectionContext context,MessageReference message){     
  
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?view=diff&rev=561026&r1=561025&r2=561026
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
Mon Jul 30 09:01:37 2007
@@ -188,7 +188,11 @@
         throw new BrokerStoppedException(this.message);
     }
 
-    public void processDispatch(MessageDispatch messageDispatch) {
+    public void preProcessDispatch(MessageDispatch messageDispatch) {
+        throw new BrokerStoppedException(this.message);
+    }
+    
+    public void postProcessDispatch(MessageDispatch messageDispatch) {
         throw new BrokerStoppedException(this.message);
     }
 
@@ -242,6 +246,10 @@
     }
     
     public BrokerService getBrokerService(){
+        throw new BrokerStoppedException(this.message);
+    }
+    
+    public boolean isExpired(MessageReference messageReference) {
         throw new BrokerStoppedException(this.message);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?view=diff&rev=561026&r1=561025&r2=561026
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
Mon Jul 30 09:01:37 2007
@@ -200,8 +200,12 @@
        return getNext().getPeerBrokerInfos();
     }
     
-    public void processDispatch(MessageDispatch messageDispatch){
-        getNext().processDispatch(messageDispatch);
+    public void preProcessDispatch(MessageDispatch messageDispatch){
+        getNext().preProcessDispatch(messageDispatch);
+    }
+    
+    public void postProcessDispatch(MessageDispatch messageDispatch){
+        getNext().postProcessDispatch(messageDispatch);
     }
     
     public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification)
throws Exception{
@@ -259,6 +263,9 @@
         return getNext().getBrokerService();
     }
 
+    public boolean isExpired(MessageReference messageReference) {
+        return getNext().isExpired(messageReference);
+    }
    
     public void messageExpired(ConnectionContext context,MessageReference message){
         getNext().messageExpired(context,message);        

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=561026&r1=561025&r2=561026
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Mon Jul 30 09:01:37 2007
@@ -741,7 +741,7 @@
             if(message.isMessageDispatch()) {
                 MessageDispatch md=(MessageDispatch) message;
                 Runnable sub=md.getTransmitCallback();
-                broker.processDispatch(md);
+                broker.postProcessDispatch(md);
                 if(sub!=null){
                     sub.run();
                 }
@@ -749,25 +749,26 @@
         }
     }
 
-    protected void processDispatch(Command command) throws IOException {
-        try {
-            if( !disposed.get() ) {
-                 dispatch(command);
+    protected void processDispatch(Command command) throws IOException{
+        final MessageDispatch messageDispatch=(MessageDispatch)(command.isMessageDispatch()?command:null);
+        try{
+            if(!disposed.get()){
+                if(messageDispatch!=null){
+                    broker.preProcessDispatch(messageDispatch);
+                }
+                dispatch(command);
             }
-       } finally {
-
-            if(command.isMessageDispatch()){
-                MessageDispatch md=(MessageDispatch) command;
-                Runnable sub=md.getTransmitCallback();
-                broker.processDispatch(md);
+        }finally{
+            if(messageDispatch!=null){
+                Runnable sub=messageDispatch.getTransmitCallback();
+                broker.postProcessDispatch(messageDispatch);
                 if(sub!=null){
                     sub.run();
                 }
             }
-
             getStatistics().getDequeues().increment();
         }
-     }   
+    }   
 
 
 
@@ -918,7 +919,7 @@
 		        if(command.isMessageDispatch()) {
 		            MessageDispatch md=(MessageDispatch) command;
 		            Runnable sub=md.getTransmitCallback();
-		            broker.processDispatch(md);
+		            broker.postProcessDispatch(md);
 		            if(sub!=null){
 		                sub.run();
 		            }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java?view=diff&rev=561026&r1=561025&r2=561026
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
Mon Jul 30 09:01:37 2007
@@ -283,7 +283,7 @@
      * 
      * @param messageDispatch
      */
-    public void processDispatch(MessageDispatch messageDispatch){
+    public void postProcessDispatch(MessageDispatch messageDispatch){
         MessageDispatchNotification mdn=new MessageDispatchNotification();
         mdn.setConsumerId(messageDispatch.getConsumerId());
         mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
@@ -293,7 +293,7 @@
             mdn.setMessageId(msg.getMessageId());
             sendAsyncToSlave(mdn);
         }
-        super.processDispatch(messageDispatch);
+        super.postProcessDispatch(messageDispatch);
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=561026&r1=561025&r2=561026
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Mon Jul 30 09:01:37 2007
@@ -382,7 +382,7 @@
                             pending.remove();
                             // Message may have been sitting in the pending list a while
                             // waiting for the consumer to ak the message.
-                            if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
+                            if(node!=QueueMessageReference.NULL_MESSAGE&&broker.isExpired(node)){
                                 broker.messageExpired(getContext(),node);
                                 dequeueCounter++;
                                 continue;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=561026&r1=561025&r2=561026
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Mon Jul 30 09:01:37 2007
@@ -137,7 +137,7 @@
 
                     public boolean recoverMessage(Message message){
                         // Message could have expired while it was being loaded..
-                        if(message.isExpired()){
+                        if(broker.isExpired(message)){
                             broker.messageExpired(createConnectionContext(),message);
                             destinationStatistics.getMessages().decrement();
                             return true;
@@ -379,7 +379,7 @@
 	            	        try {							
 	        			        
 	        					// While waiting for space to free up... the message may have expired.
-	            	        	if(message.isExpired()) {
+	            	        	if(broker.isExpired(message)) {
 	        			            broker.messageExpired(context,message);
 	                                destinationStatistics.getMessages().decrement();
 	        			        } else {
@@ -455,7 +455,7 @@
                 	try { 
                         // It could take while before we receive the commit
                         // op, by that time the message could have expired..
-	                    if(message.isExpired()){
+	                    if(broker.isExpired(message)){
 	                        broker.messageExpired(context,message);
                             destinationStatistics.getMessages().decrement();
 	                        return;
@@ -1014,7 +1014,7 @@
                         while(messages.hasNext()&&count<toPageIn){
                             MessageReference node=messages.next();
                             messages.remove();
-                            if(!node.isExpired()){
+                            if(!broker.isExpired(node)){
                                 node=createMessageReference(node.getMessage());
                                 result.add(node);
                                 count++;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=561026&r1=561025&r2=561026
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Mon Jul 30 09:01:37 2007
@@ -385,6 +385,7 @@
     public void send(ProducerBrokerExchange producerExchange,Message message) throws Exception{
         long si=sequenceGenerator.getNextSequenceId();
         message.getMessageId().setBrokerSequenceId(si);
+        message.setBrokerInTime(System.currentTimeMillis());
         if(producerExchange.isMutable()||producerExchange.getRegion()==null){
             ActiveMQDestination destination=message.getDestination();
             // ensure the destination is registered with the RegionBroker
@@ -538,8 +539,14 @@
         return result;
     }
     
-    public void processDispatch(MessageDispatch messageDispatch){
-        
+    public void preProcessDispatch(MessageDispatch messageDispatch){ 
+        Message message = messageDispatch.getMessage();
+        if(message != null) {
+            message.setBrokerOutTime(System.currentTimeMillis());
+        }
+    }
+    
+    public void postProcessDispatch(MessageDispatch messageDispatch){   
     }
     
     public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification)
throws Exception {
@@ -623,6 +630,10 @@
 
     public BrokerService getBrokerService(){
         return brokerService;
+    }
+    
+    public boolean isExpired(MessageReference messageReference) {
+        return messageReference.isExpired();
     }
 
     public void messageExpired(ConnectionContext context,MessageReference node){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=561026&r1=561025&r2=561026
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Jul 30 09:01:37 2007
@@ -263,7 +263,7 @@
     	
     	// There is delay between the client sending it and it arriving at the
     	// destination.. it may have expired.
-    	if( message.isExpired() ) {
+    	if( broker.isExpired(message) ) {
             broker.messageExpired(context,message);
             destinationStatistics.getMessages().decrement();
             if( ( !message.isResponseRequired() || producerExchange.getProducerState().getInfo().getWindowSize() > 0 ) && !context.isInRecoveryMode() ) {
@@ -286,7 +286,7 @@
         				public void run() {
         					
         					// While waiting for space to free up... the message may have expired.
-        			        if(message.isExpired()){
+        			        if(broker.isExpired(message)){
         			            broker.messageExpired(context,message);
                                 destinationStatistics.getMessages().decrement();
         			            
@@ -357,7 +357,7 @@
                     public void afterCommit() throws Exception {
                     	// It could take while before we receive the commit
                     	// operration.. by that time the message could have expired..
-                    	if( message.isExpired() ) {
+                    	if(broker.isExpired(message) ) {
                     		broker.messageExpired(context,message);
                             message.decrementReferenceCount();
                             destinationStatistics.getMessages().decrement();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=561026&r1=561025&r2=561026
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Mon Jul 30 09:01:37 2007
@@ -129,7 +129,7 @@
             matched.reset();
             while(matched.hasNext()){
                 MessageReference node=matched.next();
-                if(node.isExpired()){
+                if(broker.isExpired(node)){
                     matched.remove();
                     dispatchedCounter.incrementAndGet();
                     node.decrementReferenceCount();
@@ -361,7 +361,7 @@
                     matched.remove();
                     // Message may have been sitting in the matched list a while
                     // waiting for the consumer to ak the message.
-                    if(message.isExpired()){
+                    if(broker.isExpired(message)){
                         message.decrementReferenceCount();
                         broker.messageExpired(getContext(),message);
                         dequeueCounter.incrementAndGet();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java?view=diff&rev=561026&r1=561025&r2=561026
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java Mon Jul 30 09:01:37 2007
@@ -28,6 +28,7 @@
     private boolean processNonPersistent=true;
     private boolean processExpired=true;
     
+    
     public boolean isSendToDeadLetterQueue(Message message){
         boolean result=false;
         if(message!=null){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java?view=diff&rev=561026&r1=561025&r2=561026
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java Mon Jul 30 09:01:37 2007
@@ -18,7 +18,6 @@
 package org.apache.activemq.broker.util;
 
 import org.apache.activemq.broker.BrokerPluginSupport;
-import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.command.Message;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java?view=diff&rev=561026&r1=561025&r2=561026
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java Mon Jul 30 09:01:37 2007
@@ -189,9 +189,9 @@
 		return super.prepareTransaction(context, xid);
 	}
 
-	public void processDispatch(MessageDispatch messageDispatch) {
+	public void postProcessDispatch(MessageDispatch messageDispatch) {
     	trace(messageDispatch);
-		super.processDispatch(messageDispatch);
+		super.postProcessDispatch(messageDispatch);
 	}
 
 	public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?view=diff&rev=561026&r1=561025&r2=561026
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Mon Jul 30 09:01:37 2007
@@ -54,6 +54,8 @@
     protected long expiration;
     protected long timestamp;
     protected long arrival;
+    protected long brokerInTime;
+    protected long brokerOutTime;
     protected String correlationId;
     protected ActiveMQDestination replyTo;
     protected boolean persistent;
@@ -83,6 +85,8 @@
     private BrokerId [] brokerPath;
     protected boolean droppable = false;
     private BrokerId [] cluster;
+    
+    
 
     abstract public Message copy();
     
@@ -123,6 +127,8 @@
         copy.arrival = arrival;
         copy.connection = connection;
         copy.regionDestination = regionDestination;
+        copy.brokerInTime=brokerInTime;
+        copy.brokerOutTime=brokerOutTime;
         //copying the broker path breaks networks - if a consumer re-uses a consumed
         //message and forwards it on
         //copy.brokerPath = brokerPath;
@@ -629,5 +635,27 @@
     
     public boolean isMessage() {
         return true;
+    }
+
+    /**
+     * @openwire:property version=3
+     */
+    public long getBrokerInTime(){
+        return this.brokerInTime;
+    }
+
+    public void setBrokerInTime(long brokerInTime){
+        this.brokerInTime=brokerInTime;
+    }
+
+    /**
+     * @openwire:property version=3
+     */
+    public long getBrokerOutTime(){
+        return this.brokerOutTime;
+    }
+
+    public void setBrokerOutTime(long brokerOutTime){
+        this.brokerOutTime=brokerOutTime;
     }
 }



Mime
View raw message