activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r436899 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/broker/jmx/
Date Fri, 25 Aug 2006 18:41:47 GMT
Author: jstrachan
Date: Fri Aug 25 11:41:45 2006
New Revision: 436899

URL: http://svn.apache.org/viewvc?rev=436899&view=rev
Log:
fix for AMQ-896 and AMQ-837. Also tidied up the Queue / QueueView / QueueViewMBean code a
little to make it easier to work with queues via Java / JMX allowing messages to be copied,
moved and removed via a selector or MessageReferenceFilter

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReferenceFilter.java
  (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java?rev=436899&r1=436898&r2=436899&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
Fri Aug 25 11:41:45 2006
@@ -20,6 +20,7 @@
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.OpenDataException;
 
+import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -39,16 +40,56 @@
         return OpenTypeSupport.convert(rc);
     }
 
-    public boolean removeMessage(String messageId){
-        return ((Queue) destination).removeMessage(messageId);
-    }
-
     public void purge(){
         ((Queue) destination).purge();
     }
 
+    public boolean removeMessage(String messageId) throws Exception{
+        return ((Queue) destination).removeMessage(messageId);
+    }
+
+    public int removeMatchingMessages(String selector) throws Exception {
+        return ((Queue) destination).removeMatchingMessages(selector);
+    }
+    
+    public int removeMatchingMessages(String selector, int maximumMessages) throws Exception
{
+        return ((Queue) destination).removeMatchingMessages(selector, maximumMessages);
+    }
+    
     public boolean copyMessageTo(String messageId, String destinationName) throws Exception
{
-        return ((Queue) destination).copyMessageTo(BrokerView.getConnectionContext(broker.getContextBroker()),
messageId, ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE));
+        ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker());
+        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName,
ActiveMQDestination.QUEUE_TYPE);
+        return ((Queue) destination).copyMessageTo(context, messageId, toDestination);
+    }
+
+    public int copyMatchingMessagesTo(String selector, String destinationName) throws Exception
{
+        ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker());
+        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName,
ActiveMQDestination.QUEUE_TYPE);
+        return ((Queue) destination).copyMatchingMessagesTo(context, selector, toDestination);
+    }
+    
+    public int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages)
throws Exception {
+        ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker());
+        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName,
ActiveMQDestination.QUEUE_TYPE);
+        return ((Queue) destination).copyMatchingMessagesTo(context, selector, toDestination,
maximumMessages);
+    }
+    
+    public boolean moveMessageTo(String messageId, String destinationName) throws Exception
{
+        ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker());
+        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName,
ActiveMQDestination.QUEUE_TYPE);
+        return ((Queue) destination).moveMessageTo(context, messageId, toDestination);
+    }
+    
+    public int moveMatchingMessagesTo(String selector, String destinationName) throws Exception
{
+        ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker());
+        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName,
ActiveMQDestination.QUEUE_TYPE);
+        return ((Queue) destination).moveMatchingMessagesTo(context, selector, toDestination);
     }
     
+    public int moveMatchingMessagesTo(String selector, String destinationName, int maximumMessages)
throws Exception {
+        ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker());
+        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName,
ActiveMQDestination.QUEUE_TYPE);
+        return ((Queue) destination).moveMatchingMessagesTo(context, selector, toDestination,
maximumMessages);
+    }
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java?rev=436899&r1=436898&r2=436899&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
Fri Aug 25 11:41:45 2006
@@ -24,37 +24,92 @@
 public interface QueueViewMBean extends DestinationViewMBean {
     
 	/**
-	 * Retrieve a message from the destination's queue.
-	 * 
-	 * @param messageId the message id of the message to retreive
-	 * @return A CompositeData object which is a JMX version of the messages
-	 * @throws OpenDataException
-	 */
+     * Retrieve a message from the destination's queue.
+     * 
+     * @param messageId
+     *            the message id of the message to retrieve
+     * @return A CompositeData object which is a JMX version of the messages
+     * @throws OpenDataException
+     */
     public CompositeData getMessage(String messageId) throws OpenDataException;
     
     /**
-     * Removes a message from the queue.  If the message has allready been dispatched 
-     * to another consumer, the message cannot be delted and this method will return 
-     * false.
+     * Removes a message from the queue. If the message has already been
+     * dispatched to another consumer, the message cannot be deleted and this
+     * method will return false.
      * 
-     * @param messageId 
-     * @return true if the message was found and could be succesfully deleted.
+     * @param messageId
+     * @return true if the message was found and could be successfully deleted.
+     * @throws Exception 
      */
-    public boolean removeMessage(String messageId);
-    
+    public boolean removeMessage(String messageId) throws Exception;
+
     /**
-     * Emptys out all the messages in the queue.
+     * Removes the messages matching the given selector
+     * 
+     * @return the number of messages removed
+     */
+    public int removeMatchingMessages(String selector) throws Exception;
+
+    /**
+     * Removes the messages matching the given selector up to the maximum number of matched
messages
+     * 
+     * @return the number of messages removed
+     */
+    public int removeMatchingMessages(String selector, int maximumMessages) throws Exception;
+
+
+    /**
+     * Removes all of the messages in the queue.
      */
     public void purge();
     
     /**
-     * Copys a given message to another destination.
+     * Copies a given message to another destination.
      * 
      * @param messageId
      * @param destinationName
-     * @return true if the message was found and was successfuly copied to the other destination.
+     * @return true if the message was found and was successfully copied to the
+     *         other destination.
      * @throws Exception
      */
     public boolean copyMessageTo(String messageId, String destinationName) throws Exception;
+
+    /**
+     * Copies the messages matching the given selector
+     * 
+     * @return the number of messages copied
+     */
+    public int copyMatchingMessagesTo(String selector, String destinationName) throws Exception;
+
+    /**
+     * Copies the messages matching the given selector up to the maximum number of matched
messages
+     * 
+     * @return the number of messages copied
+     */
+    public int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages)
throws Exception;
+
+    /**
+     * Moves the message to another destination.
+     * 
+     * @param messageId
+     * @param destinationName
+     * @return true if the message was found and was successfully moved to the
+     *         other destination.
+     * @throws Exception
+     */
+    public boolean moveMessageTo(String messageId, String destinationName) throws Exception;
+
+    /**
+     * Moves the messages matching the given selector
+     * 
+     * @return the number of messages moved
+     */
+    public int moveMatchingMessagesTo(String selector, String destinationName) throws Exception;
+    
+    /**
+     * Moves the messages matching the given selector up to the maximum number of matched
messages
+     */
+    public int moveMatchingMessagesTo(String selector, String destinationName, int maximumMessages)
throws Exception;
 
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReferenceFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReferenceFilter.java?rev=436899&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReferenceFilter.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReferenceFilter.java
Fri Aug 25 11:41:45 2006
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import org.apache.activemq.broker.ConnectionContext;
+
+import javax.jms.JMSException;
+
+/**
+ * Represents a filter on message references
+ * 
+ * @version $Revision$
+ */
+public interface MessageReferenceFilter {
+
+    public boolean evaluate(ConnectionContext context, MessageReference messageReference)
throws JMSException;
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReferenceFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReferenceFilter.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=436899&r1=436898&r2=436899&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Aug 25 11:41:45 2006
@@ -33,8 +33,10 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -44,6 +46,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -77,20 +82,21 @@
     protected int highestSubscriptionPriority;
     private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
     private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
-    
-    public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore
store,
-            DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception
{
+
+    public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore
store, DestinationStatistics parentStats,
+            TaskRunnerFactory taskFactory) throws Exception {
         this.destination = destination;
         this.usageManager = new UsageManager(memoryManager);
         this.usageManager.setLimit(Long.MAX_VALUE);
         this.store = store;
 
-        // Let the store know what usage manager we are using so that he can flush messages
to disk
+        // Let the store know what usage manager we are using so that he can
+        // flush messages to disk
         // when usage gets high.
-        if( store!=null ) {
+        if (store != null) {
             store.setUsageManager(usageManager);
         }
-        
+
         destinationStatistics.setParent(parentStats);
         this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
 
@@ -110,8 +116,8 @@
                 public void recoverMessageReference(String messageReference) throws Exception
{
                     throw new RuntimeException("Should not be called.");
                 }
-                
-                public void finished(){
+
+                public void finished() {
                 }
             });
         }
@@ -164,13 +170,15 @@
                         if (sub.matches(node, msgContext)) {
                             sub.add(node);
                         }
-                    } catch (IOException e) {
+                    }
+                    catch (IOException e) {
                         log.warn("Could not load message: " + e, e);
                     }
                 }
             }
 
-        } finally {
+        }
+        finally {
             msgContext.clear();
             dispatchValve.turnOn();
         }
@@ -225,8 +233,9 @@
                             }
                         }
                     }
-                    
-                    // now lets dispatch from the copy of the collection to avoid deadlocks
+
+                    // now lets dispatch from the copy of the collection to
+                    // avoid deadlocks
                     for (Iterator iter = messagesToDispatch.iterator(); iter.hasNext();)
{
                         IndirectMessageReference node = (IndirectMessageReference) iter.next();
                         node.incrementRedeliveryCounter();
@@ -239,7 +248,8 @@
                     msgContext.clear();
                 }
             }
-        } finally {
+        }
+        finally {
             dispatchValve.turnOn();
         }
 
@@ -250,9 +260,10 @@
         if (context.isProducerFlowControl()) {
             if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
                 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit
reached");
-            } else {
-            usageManager.waitForSpace();
-            }    
+            }
+            else {
+                usageManager.waitForSpace();
+            }
         }
 
         message.setRegionDestination(this);
@@ -269,10 +280,12 @@
                         dispatch(context, node, message);
                     }
                 });
-            } else {
+            }
+            else {
                 dispatch(context, node, message);
             }
-        } finally {
+        }
+        finally {
             node.decrementReferenceCount();
         }
     }
@@ -315,9 +328,10 @@
 
     public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack,
MessageReference node) throws IOException {
         if (store != null && node.isPersistent()) {
-            // the original ack may be a ranged ack, but we are trying to delete a specific

+            // the original ack may be a ranged ack, but we are trying to delete
+            // a specific
             // message store here so we need to convert to a non ranged ack.
-            if( ack.getMessageCount() > 0 ) {
+            if (ack.getMessageCount() > 0) {
                 // Dup the ack
                 MessageAck a = new MessageAck();
                 ack.copy(a);
@@ -344,9 +358,8 @@
         synchronized (messages) {
             size = messages.size();
         }
-        return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions="
+ consumers.size()
-                + ", memory=" + usageManager.getPercentUsage() + "%, size=" + size + ", in
flight groups="
-                + messageGroupOwners;
+        return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions="
+ consumers.size() + ", memory=" + usageManager.getPercentUsage()
+                + "%, size=" + size + ", in flight groups=" + messageGroupOwners;
     }
 
     public void start() throws Exception {
@@ -364,7 +377,7 @@
     public String getDestination() {
         return destination.getPhysicalName();
     }
-    
+
     public UsageManager getUsageManager() {
         return usageManager;
     }
@@ -443,8 +456,7 @@
     public void setMemoryLimit(long limit) {
         getUsageManager().setLimit(limit);
     }
-    
-    
+
     // Implementation methods
     // -------------------------------------------------------------------------
     private MessageReference createMessageReference(Message message) {
@@ -472,7 +484,8 @@
             msgContext.setMessageReference(node);
 
             dispatchPolicy.dispatch(context, node, msgContext, consumers);
-        } finally {
+        }
+        finally {
             msgContext.clear();
             dispatchValve.decrement();
         }
@@ -508,10 +521,12 @@
                         if (m != null) {
                             l.add(m);
                         }
-                    } finally {
+                    }
+                    finally {
                         r.decrementReferenceCount();
                     }
-                } catch (IOException e) {
+                }
+                catch (IOException e) {
                 }
             }
         }
@@ -519,33 +534,6 @@
         return (Message[]) l.toArray(new Message[l.size()]);
     }
 
-    public boolean removeMessage(String messageId) {
-        synchronized (messages) {
-            ConnectionContext c = new ConnectionContext();
-            for (Iterator iter = messages.iterator(); iter.hasNext();) {
-                try {
-                    IndirectMessageReference r = (IndirectMessageReference) iter.next();
-                    if (messageId.equals(r.getMessageId().toString())) {
-                        
-                        // We should only delete messages that can be locked.
-                        if( r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER) )  {
-                            MessageAck ack = new MessageAck();
-                            ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
-                            ack.setDestination(destination);
-                            ack.setMessageID(r.getMessageId());
-                            acknowledge(c, null, ack, r);
-                            r.drop();
-                            dropEvent();
-                            return true;
-                        }
-                    }
-                } catch (IOException e) {
-                }
-            }
-        }
-        return false;
-    }
-
     public Message getMessage(String messageId) {
         synchronized (messages) {
             for (Iterator iter = messages.iterator(); iter.hasNext();) {
@@ -558,12 +546,14 @@
                             if (m != null) {
                                 return m;
                             }
-                        } finally {
+                        }
+                        finally {
                             r.decrementReferenceCount();
                         }
                         break;
                     }
-                } catch (IOException e) {
+                }
+                catch (IOException e) {
                 }
             }
         }
@@ -572,13 +562,13 @@
 
     public void purge() {
         synchronized (messages) {
-            ConnectionContext c = new ConnectionContext();
+            ConnectionContext c = createConnectionContext();
             for (Iterator iter = messages.iterator(); iter.hasNext();) {
                 try {
                     IndirectMessageReference r = (IndirectMessageReference) iter.next();
-                    
+
                     // We should only delete messages that can be locked.
-                    if( r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER) )  {
+                    if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {
                         MessageAck ack = new MessageAck();
                         ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
                         ack.setDestination(destination);
@@ -587,7 +577,8 @@
                         r.drop();
                         dropEvent(true);
                     }
-                } catch (IOException e) {
+                }
+                catch (IOException e) {
                 }
             }
 
@@ -596,27 +587,207 @@
             gc();
         }
     }
+    
+
+    /**
+     * Removes the message matching the given messageId
+     */
+    public boolean removeMessage(String messageId) throws Exception {
+        return removeMatchingMessages(createMessageIdFilter(messageId), 1) > 0;
+    }
+
+    /**
+     * Removes the messages matching the given selector
+     * 
+     * @return the number of messages removed
+     */
+    public int removeMatchingMessages(String selector) throws Exception {
+        return removeMatchingMessages(selector, -1);
+    }
+    
+    /**
+     * Removes the messages matching the given selector up to the maximum number of matched
messages
+     * 
+     * @return the number of messages removed
+     */
+    public int removeMatchingMessages(String selector, int maximumMessages) throws Exception
{
+        return removeMatchingMessages(createSelectorFilter(selector), maximumMessages);
+    }
+
+    /**
+     * Removes the messages matching the given filter up to the maximum number of matched
messages
+     * 
+     * @return the number of messages removed
+     */
+    public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages)
throws Exception {
+        int counter = 0;
+        synchronized (messages) {
+            ConnectionContext c = createConnectionContext();
+            for (Iterator iter = messages.iterator(); iter.hasNext();) {
+                IndirectMessageReference r = (IndirectMessageReference) iter.next();
+                if (filter.evaluate(c, r)) {
+                    // We should only delete messages that can be locked.
+                    if (lockMessage(r)) {
+                        removeMessage(c, r);
+                        if (++counter >= maximumMessages && maximumMessages >
0) {
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+        return counter;
+    }
 
+    /**
+     * Copies the message matching the given messageId
+     */
     public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination
dest) throws Exception {
+        return copyMatchingMessages(context, createMessageIdFilter(messageId), dest, 1) >
0;
+    }
+    
+    /**
+     * Copies the messages matching the given selector
+     * 
+     * @return the number of messages copied
+     */
+    public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination
dest) throws Exception {
+        return copyMatchingMessagesTo(context, selector, dest, -1);
+    }
+    
+    /**
+     * Copies the messages matching the given selector up to the maximum number of matched
messages
+     * 
+     * @return the number of messages copied
+     */
+    public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination
dest, int maximumMessages) throws Exception {
+        return copyMatchingMessages(context, createSelectorFilter(selector), dest, maximumMessages);
+    }
+
+    /**
+     * Copies the messages matching the given filter up to the maximum number of matched
messages
+     * 
+     * @return the number of messages copied
+     */
+    public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter,
ActiveMQDestination dest, int maximumMessages) throws Exception {
+        int counter = 0;
         synchronized (messages) {
             for (Iterator iter = messages.iterator(); iter.hasNext();) {
-                try {
-                    MessageReference r = (MessageReference) iter.next();
-                    if (messageId.equals(r.getMessageId().toString())) {
+                MessageReference r = (MessageReference) iter.next();
+                if (filter.evaluate(context, r)) {
+                    r.incrementReferenceCount();
+                    try {
+                        Message m = r.getMessage();
+                        BrokerSupport.resend(context, m, dest);
+                        if (++counter >= maximumMessages && maximumMessages >
0) {
+                            break;
+                        }
+                    }
+                    finally {
+                        r.decrementReferenceCount();
+                    }
+                }
+            }
+        }
+        return counter;
+    }
+
+    /**
+     * Moves the message matching the given messageId
+     */
+    public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination
dest) throws Exception {
+        return moveMatchingMessagesTo(context, createMessageIdFilter(messageId), dest, 1)
> 0;
+    }
+    
+    /**
+     * Moves the messages matching the given selector
+     * 
+     * @return the number of messages moved
+     */
+    public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination
dest) throws Exception {
+        return moveMatchingMessagesTo(context, selector, dest, -1);
+    }
+    
+    /**
+     * Moves the messages matching the given selector up to the maximum number of matched
messages
+     */
+    public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination
dest, int maximumMessages) throws Exception {
+        return moveMatchingMessagesTo(context, createSelectorFilter(selector), dest, maximumMessages);
+    }
+
+    /**
+     * Moves the messages matching the given filter up to the maximum number of matched messages
+     */
+    public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
ActiveMQDestination dest, int maximumMessages) throws Exception {
+        int counter = 0;
+        synchronized (messages) {
+            for (Iterator iter = messages.iterator(); iter.hasNext();) {
+                IndirectMessageReference r = (IndirectMessageReference) iter.next();
+                if (filter.evaluate(context, r)) {
+                    // We should only move messages that can be locked.
+                    if (lockMessage(r)) {
                         r.incrementReferenceCount();
                         try {
                             Message m = r.getMessage();
-                            BrokerSupport.resend(context, m, dest);                     
      
-                        } finally {
+                            BrokerSupport.resend(context, m, dest);
+                            removeMessage(context, r);
+                            if (++counter >= maximumMessages && maximumMessages
> 0) {
+                                break;
+                            }
+                        }
+                        finally {
                             r.decrementReferenceCount();
-                        }                        
-                        return true;
+                        }
                     }
-                } catch (IOException e) {
                 }
             }
         }
-        return false;
+        return counter;
+    }
+
+    protected MessageReferenceFilter createMessageIdFilter(final String messageId) {
+        return new MessageReferenceFilter() {
+            public boolean evaluate(ConnectionContext context, MessageReference r) {
+                return messageId.equals(r.getMessageId().toString());
+            }
+        };
+    }
+    
+    protected MessageReferenceFilter createSelectorFilter(String selector) throws InvalidSelectorException
{
+        final BooleanExpression selectorExpression = new SelectorParser().parse(selector);
+
+        return new MessageReferenceFilter() {
+            public boolean evaluate(ConnectionContext context, MessageReference r) throws
JMSException {
+                MessageEvaluationContext messageEvaluationContext = context.getMessageEvaluationContext();
+                
+                messageEvaluationContext.setMessageReference(r);
+                if (messageEvaluationContext.getDestination() == null) {
+                    messageEvaluationContext.setDestination(getActiveMQDestination());
+                }
+                
+                return selectorExpression.matches(messageEvaluationContext);
+            }
+        };
+    }
+
+    protected void removeMessage(ConnectionContext c, IndirectMessageReference r) throws
IOException {
+        MessageAck ack = new MessageAck();
+        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+        ack.setDestination(destination);
+        ack.setMessageID(r.getMessageId());
+        acknowledge(c, null, ack, r);
+        r.drop();
+        dropEvent();
+    }
+
+    protected boolean lockMessage(IndirectMessageReference r) {
+        return r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER);
+    }
+
+    protected ConnectionContext createConnectionContext() {
+        ConnectionContext answer = new ConnectionContext();
+        answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
+        return answer;
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=436899&r1=436898&r2=436899&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
Fri Aug 25 11:41:45 2006
@@ -33,7 +33,6 @@
 import javax.jms.JMSException;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 public class QueueSubscription extends PrefetchSubscription implements LockOwner {
     

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=436899&r1=436898&r2=436899&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java	Fri Aug 25 11:41:45 2006
@@ -74,6 +74,58 @@
         assertQueueBrowseWorks();
         assertCreateAndDestroyDurableSubscriptions();
     }
+    
+    public void testMoveMessagesBySelector() throws Exception {
+        connection = connectionFactory.createConnection();
+        useConnection(connection);
+        
+        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+        
+        QueueViewMBean queue = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+        
+        String newDestination = "test.new.destination." + getClass() + "." + getName();
+        queue.moveMatchingMessagesTo("counter > 2", newDestination );
+        
+        queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
+        
+        queue = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+        
+        assertTrue("Should have at least one message in the queue: " + queueViewMBeanName, queue.getQueueSize() > 0);
+        
+        // now lets remove them by selector
+        queue.removeMatchingMessages("counter > 2");
+        
+        assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
+    }
+    
+    public void testCopyMessagesBySelector() throws Exception {
+        connection = connectionFactory.createConnection();
+        useConnection(connection);
+        
+        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+
+        QueueViewMBean queue = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
+        String newDestination = "test.new.destination." + getClass() + "." + getName();
+        long queueSize = queue.getQueueSize();
+        queue.copyMatchingMessagesTo("counter > 2", newDestination);
+        
+        assertEquals("Should have same number of messages in the queue: " + queueViewMBeanName, queueSize, queueSize);
+        
+        queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
+
+        queue = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+        
+        log.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)");
+        
+        assertTrue("Should have at least one message in the queue: " + queueViewMBeanName, queue.getQueueSize() > 0);
+        
+        // now lets remove them by selector
+        queue.removeMatchingMessages("counter > 2");
+        
+        assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
+    }
+
 
     protected void assertQueueBrowseWorks() throws Exception {
         Integer mbeancnt = mbeanServer.getMBeanCount();
@@ -205,6 +257,7 @@
         MessageProducer producer = session.createProducer(destination);
         for (int i = 0; i < messageCount; i++) {
             Message message = session.createTextMessage("Message: " + i);
+            message.setIntProperty("counter", i);
             producer.send(message);
         }
         Thread.sleep(1000);



Mime
View raw message