activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject [2/2] git commit: fix for https://issues.apache.org/jira/browse/AMQ-4714
Date Fri, 06 Sep 2013 12:56:21 GMT
fix for https://issues.apache.org/jira/browse/AMQ-4714


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8d31e44e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8d31e44e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8d31e44e

Branch: refs/heads/trunk
Commit: 8d31e44e8d0615632d428f0a6778ce36ba5e02ee
Parents: 0a5b143
Author: rajdavies <rajdavies@gmail.com>
Authored: Fri Sep 6 13:45:37 2013 +0100
Committer: rajdavies <rajdavies@gmail.com>
Committed: Fri Sep 6 13:46:44 2013 +0100

----------------------------------------------------------------------
 .../camel/component/broker/BrokerEndpoint.java  | 19 ++---
 .../broker/BrokerComponentXMLConfigTest.java    | 63 ++++++++++++++---
 .../activemq/camel/component/broker/camel.xml   | 73 +++++++++++++-------
 3 files changed, 114 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8d31e44e/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java
index e0d896e..e327669 100644
--- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java
@@ -22,8 +22,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.inteceptor.MessageInterceptor;
 import org.apache.activemq.broker.inteceptor.MessageInterceptorRegistry;
-import org.apache.activemq.broker.view.MessageBrokerView;
-import org.apache.activemq.broker.view.MessageBrokerViewRegistry;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.camel.Consumer;
@@ -46,7 +44,6 @@ public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumers
 
     @UriParam
     private final BrokerConfiguration configuration;
-    private MessageBrokerView messageBrokerView;
     private MessageInterceptorRegistry messageInterceptorRegistry;
 
 
@@ -92,8 +89,7 @@ public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumers
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        messageBrokerView = MessageBrokerViewRegistry.getInstance().lookup(configuration.getBrokerName());
-        messageInterceptorRegistry = new MessageInterceptorRegistry(messageBrokerView.getBrokerService());
+        messageInterceptorRegistry =  MessageInterceptorRegistry.getInstance().get(configuration.getBrokerName());
         for (MessageInterceptor messageInterceptor : messageInterceptorList) {
             addMessageInterceptor(messageInterceptor);
         }
@@ -119,11 +115,18 @@ public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumers
     }
 
     protected void inject(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception {
+        ProducerBrokerExchange pbe = producerBrokerExchange;
         if (message != null) {
-            if (message.getDestination() == null) {
-                message.setDestination(destination);
+            message.setDestination(destination);
+            if (producerBrokerExchange != null && producerBrokerExchange.getRegionDestination() != null){
+                if (!producerBrokerExchange.getRegionDestination().getActiveMQDestination().equals(destination)){
+                     //The message broker will create a new ProducerBrokerExchange with the
+                     //correct region broker set
+                     pbe = null;
+                }
             }
-            messageInterceptorRegistry.injectMessage(producerBrokerExchange, message);
+
+            messageInterceptorRegistry.injectMessage(pbe, message);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8d31e44e/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java b/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java
index 0696170..e3b7227 100644
--- a/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java
+++ b/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java
@@ -47,16 +47,18 @@ public class BrokerComponentXMLConfigTest {
     private static final Logger LOG = LoggerFactory.getLogger(BrokerComponentXMLConfigTest.class);
     protected static final String TOPIC_NAME = "test.broker.component.topic";
     protected static final String QUEUE_NAME = "test.broker.component.queue";
+    protected static final String ROUTE_QUEUE_NAME = "test.broker.component.route";
+    protected static final String DIVERTED_QUEUE_NAME = "test.broker.component.ProcessLater";
+    protected static final int DIVERT_COUNT = 100;
+
     protected BrokerService brokerService;
     protected ActiveMQConnectionFactory factory;
     protected Connection producerConnection;
     protected Connection consumerConnection;
     protected Session consumerSession;
     protected Session producerSession;
-    protected MessageConsumer consumer;
-    protected MessageProducer producer;
-    protected Topic topic;
-    protected int messageCount = 5000;
+
+    protected int messageCount = 1000;
     protected int timeOutInSeconds = 10;
 
     @Before
@@ -69,10 +71,9 @@ public class BrokerComponentXMLConfigTest {
         producerConnection = factory.createConnection();
         producerConnection.start();
         consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        topic = consumerSession.createTopic(TOPIC_NAME);
+
         producerSession = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        consumer = consumerSession.createConsumer(topic);
-        producer = producerSession.createProducer(topic);
+
     }
 
     protected BrokerService createBroker(String resource) throws Exception {
@@ -110,9 +111,10 @@ public class BrokerComponentXMLConfigTest {
     public void testReRouteAll() throws Exception {
         final ActiveMQQueue queue = new ActiveMQQueue(QUEUE_NAME);
 
+        Topic topic = consumerSession.createTopic(TOPIC_NAME);
 
         final CountDownLatch latch = new CountDownLatch(messageCount);
-        consumer = consumerSession.createConsumer(queue);
+        MessageConsumer  consumer = consumerSession.createConsumer(queue);
         consumer.setMessageListener(new MessageListener() {
             @Override
             public void onMessage(javax.jms.Message message) {
@@ -124,6 +126,8 @@ public class BrokerComponentXMLConfigTest {
                 }
             }
         });
+        MessageProducer producer = producerSession.createProducer(topic);
+
         for (int i  = 0; i < messageCount; i++){
             javax.jms.Message message = producerSession.createTextMessage("test: " + i);
             producer.send(message);
@@ -134,7 +138,50 @@ public class BrokerComponentXMLConfigTest {
 
     }
 
+    @Test
+    public void testRouteWithDestinationLimit() throws Exception {
+        final ActiveMQQueue routeQueue = new ActiveMQQueue(ROUTE_QUEUE_NAME);
+
 
+        final CountDownLatch routeLatch = new CountDownLatch(DIVERT_COUNT);
+        MessageConsumer  messageConsumer = consumerSession.createConsumer(routeQueue);
+        messageConsumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(javax.jms.Message message) {
+                try {
+                    routeLatch.countDown();
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        final CountDownLatch divertLatch = new CountDownLatch(messageCount-DIVERT_COUNT);
+        MessageConsumer  divertConsumer = consumerSession.createConsumer(new ActiveMQQueue(DIVERTED_QUEUE_NAME));
+        divertConsumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(javax.jms.Message message) {
+                try {
+                    divertLatch.countDown();
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+
+        MessageProducer producer = producerSession.createProducer(routeQueue);
+
+        for (int i  = 0; i < messageCount; i++){
+            javax.jms.Message message = producerSession.createTextMessage("test: " + i);
+            producer.send(message);
+        }
+
+        routeLatch.await(timeOutInSeconds, TimeUnit.SECONDS);
+        divertLatch.await(timeOutInSeconds,TimeUnit.SECONDS);
+        assertEquals(0,routeLatch.getCount());
+        assertEquals(0,divertLatch.getCount());
+    }
 
 
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8d31e44e/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml
----------------------------------------------------------------------
diff --git a/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml b/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml
index b23a281..750c134 100644
--- a/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml
+++ b/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml
@@ -1,36 +1,59 @@
- <!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-   
-    http://www.apache.org/licenses/LICENSE-2.0
-   
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
 -->
 
 <beans
-   xmlns="http://www.springframework.org/schema/beans"  
-   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-   xsi:schemaLocation="
+        xmlns="http://www.springframework.org/schema/beans"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="
      http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
      http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
-  
-    <camelContext id="camel"  trace="false" xmlns="http://camel.apache.org/schema/spring">
+
+    <camelContext id="camel" trace="false" xmlns="http://camel.apache.org/schema/spring">
 
         <!-- You can use Spring XML syntax to define the routes here using the <route> element -->
-        <route id ="brokerComponentTest">
+        <route id="brokerComponentTest">
             <from uri="broker:topic:test.broker.>"/>
-             <setHeader headerName="JMSPriority">
-                 <constant>9</constant>
-             </setHeader>
+            <setHeader headerName="JMSPriority">
+                <constant>9</constant>
+            </setHeader>
             <to uri="broker:queue:test.broker.component.queue"/>
         </route>
+
+    <route id="brokerComponentDLQAboveLimitTest">
+        <from uri="broker:queue:test.broker.component.route"/>
+        <choice>
+            <when>
+                <spel>#{@destinationView.enqueueCount >= 100}</spel>
+                <to uri="broker:queue:test.broker.component.ProcessLater"/>
+            </when>
+            <otherwise>
+                <to uri="broker:queue:test.broker.component.route"/>
+            </otherwise>
+        </choice>
+        </route>
+
+
     </camelContext>
-</beans>
\ No newline at end of file
+    <bean id="brokerView" class="org.apache.activemq.broker.view.MessageBrokerView">
+        <constructor-arg value="testBroker"/>
+    </bean>
+    <bean id="destinationView" factory-bean="brokerView" factory-method="getDestinationView">
+        <constructor-arg value="test.broker.component.route"/>
+
+    </bean>
+</beans>
+


Mime
View raw message