activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1471420 - in /activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker: jmx/ region/ region/policy/
Date Wed, 24 Apr 2013 14:01:16 GMT
Author: dejanb
Date: Wed Apr 24 14:01:16 2013
New Revision: 1471420

URL: http://svn.apache.org/r1471420
Log:
https://issues.apache.org/jira/browse/AMQ-4483 - improve dlq management

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=1471420&r1=1471419&r2=1471420&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Wed Apr 24 14:01:16 2013
@@ -473,4 +473,9 @@ public class DestinationView implements 
         return optionsString;
     }
 
+    @Override
+    public boolean isDLQ() {
+        return destination.isDLQ();
+    }
+
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=1471420&r1=1471419&r2=1471420&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Wed Apr 24 14:01:16 2013
@@ -365,4 +365,10 @@ public interface DestinationViewMBean {
     @MBeanInfo("returns the destination options, name value pairs as URL queryString")
     String getOptions();
 
+    /**
+     * @return true if this is dead letter queue
+     */
+    @MBeanInfo("Dead Letter Queue")
+    boolean isDLQ();
+
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java?rev=1471420&r1=1471419&r2=1471420&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java Wed Apr 24 14:01:16 2013
@@ -102,6 +102,11 @@ public class QueueView extends Destinati
         return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination, maximumMessages);
     }
 
+    public int retryMessages() throws Exception {
+        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
+        return ((Queue)destination).retryMessages(context, Integer.MAX_VALUE);
+    }
+
     /**
      * Moves a message back to its original destination
      */

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java?rev=1471420&r1=1471419&r2=1471420&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java Wed Apr 24 14:01:16 2013
@@ -129,6 +129,12 @@ public interface QueueViewMBean extends 
      */
     @MBeanInfo("Moves up to a specified number of messages based on an SQL-92 selecton on the message headers or XPATH on the body into the specified destination.")
     int moveMatchingMessagesTo(@MBeanInfo("selector") String selector, @MBeanInfo("destinationName") String destinationName, @MBeanInfo("maximumMessages") int maximumMessages) throws Exception;
+
+    /**
+     * Retries messages sent to the DLQ
+     */
+    @MBeanInfo("Retries messages sent to the DLQ")
+    public int retryMessages() throws Exception;
     
     /**
      * @return true if the message cursor has memory space available

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1471420&r1=1471419&r2=1471420&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Wed Apr 24 14:01:16 2013
@@ -779,4 +779,8 @@ public abstract class BaseDestination im
         return ack;
     }
 
+    public boolean isDLQ() {
+        return getDeadLetterStrategy().isDLQ(this.getActiveMQDestination());
+    }
+
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=1471420&r1=1471419&r2=1471420&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java Wed Apr 24 14:01:16 2013
@@ -237,4 +237,6 @@ public interface Destination extends Ser
     void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage);
 
     public void clearPendingMessages();
+
+    public boolean isDLQ();
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=1471420&r1=1471419&r2=1471420&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Wed Apr 24 14:01:16 2013
@@ -382,4 +382,8 @@ public class DestinationFilter implement
         next.clearPendingMessages();
     }
 
+    @Override
+    public boolean isDLQ() {
+        return next.isDLQ();
+    }
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1471420&r1=1471419&r2=1471420&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java Wed Apr 24 14:01:16 2013
@@ -1404,6 +1404,35 @@ public class Queue extends BaseDestinati
         return movedCounter;
     }
 
+    public int retryMessages(ConnectionContext context, int maximumMessages) throws Exception {
+        if (!isDLQ()) {
+            throw new Exception("Retry of message is only possible on Dead Letter Queues!");
+        }
+        int restoredCounter = 0;
+        Set<QueueMessageReference> set = new LinkedHashSet<QueueMessageReference>();
+        do {
+            doPageIn(true);
+            pagedInMessagesLock.readLock().lock();
+            try{
+                set.addAll(pagedInMessages.values());
+            }finally {
+                pagedInMessagesLock.readLock().unlock();
+            }
+            List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set);
+            for (QueueMessageReference ref : list) {
+                if (ref.getMessage().getOriginalDestination() != null) {
+
+                    moveMessageTo(context, ref, ref.getMessage().getOriginalDestination());
+                    set.remove(ref);
+                    if (++restoredCounter >= maximumMessages && maximumMessages > 0) {
+                        return restoredCounter;
+                    }
+                }
+            }
+        } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages);
+        return restoredCounter;
+    }
+
     /**
      * @return true if we would like to iterate again
      * @see org.apache.activemq.thread.Task#iterate()
@@ -1672,6 +1701,12 @@ public class Queue extends BaseDestinati
         destinationStatistics.getExpired().increment();
         try {
             removeMessage(context, subs, (QueueMessageReference) reference);
+            messagesLock.writeLock().lock();
+            try {
+                messages.rollback(reference.getMessageId());
+            } finally {
+                messagesLock.writeLock().unlock();
+            }
         } catch (IOException e) {
             LOG.error("Failed to remove expired Message from the store ", e);
         }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java?rev=1471420&r1=1471419&r2=1471420&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java Wed Apr 24 14:01:16 2013
@@ -60,4 +60,6 @@ public interface DeadLetterStrategy {
      */
     public void setProcessNonPersistent(boolean processNonPersistent);
 
+    public boolean isDLQ(ActiveMQDestination destination);
+
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java?rev=1471420&r1=1471419&r2=1471420&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java Wed Apr 24 14:01:16 2013
@@ -167,4 +167,19 @@ public class IndividualDeadLetterStrateg
             return new ActiveMQTopic(name);
         }
     }
+
+    @Override
+    public boolean isDLQ(ActiveMQDestination destination) {
+        String name = destination.getPhysicalName();
+        if (destination.isQueue()) {
+            if ((queuePrefix != null && name.startsWith(queuePrefix)) || (queueSuffix != null && name.endsWith(queueSuffix))) {
+                return true;
+            }
+        } else {
+            if ((topicPrefix != null && name.startsWith(topicPrefix)) || (topicSuffix != null && name.endsWith(topicSuffix))) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
\ No newline at end of file

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java?rev=1471420&r1=1471419&r2=1471420&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
Wed Apr 24 14:01:16 2013
@@ -48,4 +48,12 @@ public class SharedDeadLetterStrategy ex
         this.deadLetterQueue = deadLetterQueue;
     }
 
+    @Override
+    public boolean isDLQ(ActiveMQDestination destination) {
+        if (destination.equals(deadLetterQueue)) {
+            return true;
+        } else {
+            return false;
+        }
+    }
 }



Mime
View raw message