activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r954540 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/virtual/ test/java/org/apache/activemq/broker/virtual/ test/resources/org/apache/activemq/broker/virtual/
Date Mon, 14 Jun 2010 16:25:57 GMT
Author: gtully
Date: Mon Jun 14 16:25:57 2010
New Revision: 954540

URL: http://svn.apache.org/viewvc?rev=954540&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2779 - add selectorAware option to a
virtual topic such that only message matching some exisitng subscription selector are propagated
to a subscription queue, this allows dynamic selector usage withough the build up of unmatched
messages on a destination

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
  (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/global-virtual-topics.xml

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java?rev=954540&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
Mon Jun 14 16:25:57 2010
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.virtual;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
+
+public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor {
+
+    public SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix)
{
+        super(next, prefix, postfix);
+    }
+
+    /**
+     * Respect the selectors of the subscriptions to ensure only matched messages are dispatched
to
+     * the virtual queues, hence there is no build up of unmatched messages on these destinations
+     */
+    @Override
+    protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination
destination) throws Exception {
+        Broker broker = context.getConnectionContext().getBroker();
+        Set<Destination> destinations = broker.getDestinations(destination);
+
+        for (Destination dest : destinations) {
+            if (matchesSomeConsumer(message, dest)) {
+                dest.send(context, message.copy());
+            }
+        }
+    }
+    
+    private boolean matchesSomeConsumer(Message message, Destination dest) throws IOException
{
+        boolean matches = false;
+        MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
+        msgContext.setDestination(dest.getActiveMQDestination());
+        msgContext.setMessageReference(message);
+        List<Subscription> subs = dest.getConsumers();
+        for (Subscription sub: subs) {
+            if (sub.matches(message, msgContext)) {
+                matches = true;
+                break;
+            }
+        }
+        return matches;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java?rev=954540&r1=954539&r2=954540&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
Mon Jun 14 16:25:57 2010
@@ -18,7 +18,6 @@ package org.apache.activemq.broker.regio
 
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 
 /**
@@ -36,6 +35,7 @@ public class VirtualTopic implements Vir
     private String prefix = "Consumer.*.";
     private String postfix = "";
     private String name = ">";
+    private boolean selectorAware = false;
 
 
     public ActiveMQDestination getVirtualDestination() {
@@ -43,7 +43,8 @@ public class VirtualTopic implements Vir
     }
 
     public Destination intercept(Destination destination) {
-        return new VirtualTopicInterceptor(destination, getPrefix(), getPostfix());
+        return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(),
getPostfix()) : 
+            new VirtualTopicInterceptor(destination, getPrefix(), getPostfix());
     }
     
 
@@ -83,5 +84,18 @@ public class VirtualTopic implements Vir
     public void setName(String name) {
         this.name = name;
     }
-
+    
+    /**
+     * Indicates whether the selectors of consumers are used to determine dispatch
+     * to a virtual destination, when true only messages matching an existing 
+     * consumer will be dispatched.
+     * @param selectorAware when true take consumer selectors into consideration
+     */
+    public void setSelectorAware(boolean selectorAware) {
+        this.selectorAware = selectorAware;
+    }
+    
+    public boolean isSelectorAware() {
+        return selectorAware;
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java?rev=954540&r1=954539&r2=954540&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
Mon Jun 14 16:25:57 2010
@@ -44,7 +44,8 @@ public class CompositeQueueTest extends 
     private static final Log LOG = LogFactory.getLog(CompositeQueueTest.class);
     
     protected int total = 10;
-    private Connection connection;
+    protected Connection connection;
+    public String messageSelector1, messageSelector2 = null;
 
 
     public void testVirtualTopicCreation() throws Exception {
@@ -67,8 +68,8 @@ public class CompositeQueueTest extends 
         LOG.info("Sending to: " + producerDestination);
         LOG.info("Consuming from: " + destination1 + " and " + destination2);
         
-        MessageConsumer c1 = session.createConsumer(destination1);
-        MessageConsumer c2 = session.createConsumer(destination2);
+        MessageConsumer c1 = session.createConsumer(destination1, messageSelector1);
+        MessageConsumer c2 = session.createConsumer(destination2, messageSelector2);
 
         c1.setMessageListener(messageList1);
         c2.setMessageListener(messageList2);
@@ -93,6 +94,8 @@ public class CompositeQueueTest extends 
         TextMessage textMessage = session.createTextMessage("message: " + i);
         if (i % 2 != 0) {
             textMessage.setStringProperty("odd", "yes");
+        } else {
+            textMessage.setStringProperty("odd", "no");
         }
         textMessage.setIntProperty("i", i);
         return textMessage;

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java?rev=954540&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java
Mon Jun 14 16:25:57 2010
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualTopic;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.spring.ConsumerBean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class VirtualTopicSelectorTest extends CompositeTopicTest {
+
+    private static final Log LOG = LogFactory.getLog(VirtualTopicSelectorTest.class);
+            
+    protected Destination getConsumer1Dsetination() {
+        return new ActiveMQQueue("Consumer.1.VirtualTopic.TEST");
+    }
+
+    protected Destination getConsumer2Dsetination() {
+        return new ActiveMQQueue("Consumer.2.VirtualTopic.TEST");
+    }
+    
+    protected Destination getProducerDestination() {
+        return new ActiveMQTopic("VirtualTopic.TEST");
+    }
+    
+    @Override
+    protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2)
{
+        messageList1.assertMessagesArrived(total/2);
+        messageList2.assertMessagesArrived(total/2);
+ 
+        messageList1.flushMessages();
+        messageList2.flushMessages();
+        
+        LOG.info("validate no other messages on queues");
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                
+            Destination destination1 = getConsumer1Dsetination();
+            Destination destination2 = getConsumer2Dsetination();
+            MessageConsumer c1 = session.createConsumer(destination1, null);
+            MessageConsumer c2 = session.createConsumer(destination2, null);
+            c1.setMessageListener(messageList1);
+            c2.setMessageListener(messageList2);
+            
+            
+            LOG.info("send one simple message that should go to both consumers");
+            MessageProducer producer = session.createProducer(getProducerDestination());
+            assertNotNull(producer);
+            
+            producer.send(session.createTextMessage("Last Message"));
+            
+            messageList1.assertMessagesArrived(1);
+            messageList2.assertMessagesArrived(1);
+        
+        } catch (JMSException e) {
+            e.printStackTrace();
+            fail("unexpeced ex while waiting for last messages: " + e);
+        }
+    }
+    
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        // use message selectors on consumers that need to propagate up to the virtual
+        // topic dispatch so that un matched messages do not linger on subscription queues
+        messageSelector1 = "odd = 'yes'";
+        messageSelector2 = "odd = 'no'";
+        
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+
+        VirtualTopic virtualTopic = new VirtualTopic();
+        // the new config that enables selectors on the intercepter
+        virtualTopic.setSelectorAware(true);
+        VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
+        interceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic});
+        broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
+        return broker;
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/global-virtual-topics.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/global-virtual-topics.xml?rev=954540&r1=954539&r2=954540&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/global-virtual-topics.xml
(original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/global-virtual-topics.xml
Mon Jun 14 16:25:57 2010
@@ -31,7 +31,7 @@
     <destinationInterceptors>
       <virtualDestinationInterceptor>
         <virtualDestinations>
-          <virtualTopic name=">" prefix="VirtualTopicConsumers.*." />
+          <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
         </virtualDestinations>
       </virtualDestinationInterceptor>
     </destinationInterceptors>



Mime
View raw message