activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r479694 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/virtual/ test/java/org/apache/activemq/broker/virtual/ test/resources/org/apache/activemq/broker/virtual/
Date Mon, 27 Nov 2006 17:16:34 GMT
Author: jstrachan
Date: Mon Nov 27 09:16:33 2006
New Revision: 479694

URL: http://svn.apache.org/viewvc?view=rev&rev=479694
Log:
added support for AMQ-1073 to allow selectors to be used with virtual destinations

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml
  (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java?view=diff&rev=479694&r1=479693&r2=479694
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java
Mon Nov 27 09:16:33 2006
@@ -22,6 +22,7 @@
 import org.apache.activemq.broker.region.DestinationFilter;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.MessageEvaluationContext;
 
 import java.util.Collection;
 import java.util.Iterator;
@@ -29,7 +30,7 @@
 /**
  * Represents a composite {@link Destination} where send()s are replicated to
  * each Destination instance.
- * 
+ *
  * @version $Revision$
  */
 public class CompositeDestinationInterceptor extends DestinationFilter {
@@ -46,8 +47,29 @@
     }
 
     public void send(ConnectionContext context, Message message) throws Exception {
+        MessageEvaluationContext messageContext = null;
+
         for (Iterator iter = forwardDestinations.iterator(); iter.hasNext();) {
-            ActiveMQDestination destination = (ActiveMQDestination) iter.next();
+            ActiveMQDestination destination = null;
+            Object value = iter.next();
+
+            if (value instanceof FilteredDestination) {
+                FilteredDestination filteredDestination = (FilteredDestination) value;
+                if (messageContext == null) {
+                    messageContext = new MessageEvaluationContext();
+                    messageContext.setMessageReference(message);
+                }
+                messageContext.setDestination(filteredDestination.getDestination());
+                if (filteredDestination.matches(messageContext)) {
+                    destination = filteredDestination.getDestination();
+                }
+            }
+            else if (value instanceof ActiveMQDestination) {
+                destination = (ActiveMQDestination) value;
+            }
+            if (destination == null) {
+                continue;
+            }
 
             if (copyMessage) {
                 message = message.copy();

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java?view=auto&rev=479694
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java
Mon Nov 27 09:16:33 2006
@@ -0,0 +1,95 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.virtual;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.selector.SelectorParser;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
+/**
+ * Represents a destination which is filtered using some predicate such as a selector
+ * so that messages are only dispatched to the destination if they match the filter.
+ *
+ * @org.apache.xbean.XBean
+ *
+ * @version $Revision$
+ */
+public class FilteredDestination {
+    
+    private ActiveMQDestination destination;
+    private String selector;
+    private BooleanExpression filter;
+
+    public boolean matches(MessageEvaluationContext context) throws JMSException {
+        BooleanExpression booleanExpression = getFilter();
+        if (booleanExpression == null) {
+            return false;
+        }
+        return booleanExpression.matches(context);
+    }
+
+    public ActiveMQDestination getDestination() {
+        return destination;
+    }
+
+    /**
+     * The destination to send messages to if they match the filter
+     */
+    public void setDestination(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    public String getSelector() {
+        return selector;
+    }
+
+    /**
+     * Sets the JMS selector used to filter messages before forwarding them to this destination
+     */
+    public void setSelector(String selector) throws InvalidSelectorException {
+        this.selector = selector;
+        setFilter(new SelectorParser().parse(selector));
+    }
+
+    public BooleanExpression getFilter() {
+        return filter;
+    }
+
+    public void setFilter(BooleanExpression filter) {
+        this.filter = filter;
+    }
+
+
+    /**
+     * Sets the destination property to the given queue name
+     */
+    public void setQueue(String queue) {
+        setDestination(ActiveMQDestination.createDestination(queue, ActiveMQDestination.QUEUE_TYPE));
+    }
+
+    /**
+     * Sets the destination property to the given topic name
+     */
+    public void setTopic(String topic) {
+        setDestination(ActiveMQDestination.createDestination(topic, ActiveMQDestination.TOPIC_TYPE));
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java?view=diff&rev=479694&r1=479693&r2=479694
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
Mon Nov 27 09:16:33 2006
@@ -30,6 +30,8 @@
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.JMSException;
 
 import java.net.URI;
 
@@ -43,6 +45,9 @@
     
     private Connection connection;
 
+    protected int total = 10;
+
+
     public void testVirtualTopicCreation() throws Exception {
         if (connection == null) {
             connection = createConnection();
@@ -73,15 +78,27 @@
         MessageProducer producer = session.createProducer(producerDestination);
         assertNotNull(producer);
 
-        int total = 10;
         for (int i = 0; i < total; i++) {
-            producer.send(session.createTextMessage("message: " + i));
+            producer.send(createMessage(session, i));
         }
 
+        assertMessagesArrived(messageList1, messageList2);
+    }
+
+    protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2)
{
         messageList1.assertMessagesArrived(total);
         messageList2.assertMessagesArrived(total);
     }
-    
+
+    protected TextMessage createMessage(Session session, int i) throws JMSException {
+        TextMessage textMessage = session.createTextMessage("message: " + i);
+        if (i % 2 == 1) {
+            textMessage.setStringProperty("odd", "yes");
+        }
+        textMessage.setIntProperty("i", i);
+        return textMessage;
+    }
+
     protected Destination getConsumer1Dsetination() {
         return new ActiveMQQueue("FOO");
     }

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java?view=auto&rev=479694
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java
Mon Nov 27 09:16:33 2006
@@ -0,0 +1,37 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import org.apache.activemq.spring.ConsumerBean;
+
+/**
+ * @version $Revision$
+ */
+public class FilteredQueueTest extends CompositeQueueTest {
+
+    @Override
+    protected String getBrokerConfigUri() {
+        return "org/apache/activemq/broker/virtual/filtered-queue.xml";
+    }
+
+    @Override
+    protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2)
{
+        messageList1.assertMessagesArrived(total / 2);
+        messageList2.assertMessagesArrived(1);
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml?view=auto&rev=479694
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml
Mon Nov 27 09:16:33 2006
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: xbean -->
+<beans>
+
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"
/>
+
+  <broker xmlns="http://activemq.org/config/1.0">
+    <destinationInterceptors>
+      <virtualDestinationInterceptor>
+        <virtualDestinations>
+          <compositeQueue name="MY.QUEUE">
+            <forwardTo>
+              <filteredDestination selector="odd = 'yes'" queue="FOO"/>
+              <filteredDestination selector="i = 5" topic="BAR"/>
+            </forwardTo>
+          </compositeQueue>
+        </virtualDestinations>
+      </virtualDestinationInterceptor>
+    </destinationInterceptors>
+
+  </broker>
+
+</beans>
+<!-- END SNIPPET: xbean -->

Propchange: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml



Mime
View raw message