activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject activemq git commit: CAMEL-8522: Enrich AMQ message with original destination (where Camel received the message) which for example allows to keep that information when sending messages to other queues or DLQs etc.
Date Sat, 20 Feb 2016 09:29:36 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 49974279a -> f490ab549


CAMEL-8522: Enrich AMQ message with original destination (where Camel received the message)
which for example allows to keep that information when sending messages to other queues or
DLQs etc.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f490ab54
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f490ab54
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f490ab54

Branch: refs/heads/master
Commit: f490ab549df7f130386b319922f936d263691a86
Parents: 4997427
Author: Claus Ibsen <claus.ibsen@gmail.com>
Authored: Sat Feb 20 10:22:11 2016 +0100
Committer: Claus Ibsen <claus.ibsen@gmail.com>
Committed: Sat Feb 20 10:23:54 2016 +0100

----------------------------------------------------------------------
 .../camel/component/ActiveMQComponent.java      |  5 ++
 .../OriginalDestinationPropagateStrategy.java   | 56 +++++++++++++++
 .../ActiveMQOriginalDestinationTest.java        | 76 ++++++++++++++++++++
 pom.xml                                         |  2 +-
 4 files changed, 138 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f490ab54/activemq-camel/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java
index 39d4420..0ed82bf 100644
--- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java
@@ -206,6 +206,11 @@ public class ActiveMQComponent extends JmsComponent implements EndpointCompleter
             endpointLoader = new CamelEndpointLoader(getCamelContext(), source);
             endpointLoader.afterPropertiesSet();
         }
+
+        // use OriginalDestinationPropagateStrategy by default if no custom strategy has been set
+        if (getMessageCreatedStrategy() == null) {
+            setMessageCreatedStrategy(new OriginalDestinationPropagateStrategy());
+        }
     }
 
     protected void createDestinationSource() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/f490ab54/activemq-camel/src/main/java/org/apache/activemq/camel/component/OriginalDestinationPropagateStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/OriginalDestinationPropagateStrategy.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/OriginalDestinationPropagateStrategy.java
new file mode 100644
index 0000000..84de369
--- /dev/null
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/OriginalDestinationPropagateStrategy.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel.component;
+
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.jms.JmsMessage;
+import org.apache.camel.component.jms.MessageCreatedStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A strategy to enrich JMS message with their original destination if the Camel
+ * route originates from a JMS destination.
+ */
+public class OriginalDestinationPropagateStrategy implements MessageCreatedStrategy {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(OriginalDestinationPropagateStrategy.class);
+
+    @Override
+    public void onMessageCreated(Message message, Session session, Exchange exchange, Throwable cause) {
+        if (exchange.getIn() instanceof JmsMessage) {
+            JmsMessage msg = exchange.getIn(JmsMessage.class);
+            Message jms = msg.getJmsMessage();
+            if (message instanceof ActiveMQMessage) {
+                ActiveMQMessage amq = (ActiveMQMessage) jms;
+                if (amq.getOriginalDestination() == null) {
+                    ActiveMQDestination from = amq.getDestination();
+                    if (from != null) {
+                        LOG.trace("Setting OriginalDestination: {} on {}", from, message);
+                        ((ActiveMQMessage) message).setOriginalDestination(from);
+                    }
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/f490ab54/activemq-camel/src/test/java/org/apache/activemq/camel/component/ActiveMQOriginalDestinationTest.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/test/java/org/apache/activemq/camel/component/ActiveMQOriginalDestinationTest.java b/activemq-camel/src/test/java/org/apache/activemq/camel/component/ActiveMQOriginalDestinationTest.java
new file mode 100644
index 0000000..5e2bea1
--- /dev/null
+++ b/activemq-camel/src/test/java/org/apache/activemq/camel/component/ActiveMQOriginalDestinationTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel.component;
+
+import javax.jms.Message;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jms.JmsMessage;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
+
+public class ActiveMQOriginalDestinationTest extends CamelTestSupport {
+
+    @Test
+    public void testActiveMQOriginalDestination() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+
+        template.sendBody("activemq:queue:foo", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        // consume from bar
+        Exchange out = consumer.receive("activemq:queue:bar", 5000);
+        assertNotNull(out);
+
+        // and we should have foo as the original destination
+        JmsMessage msg = out.getIn(JmsMessage.class);
+        Message jms = msg.getJmsMessage();
+        ActiveMQMessage amq = assertIsInstanceOf(ActiveMQMessage.class, jms);
+        ActiveMQDestination original = amq.getOriginalDestination();
+        assertNotNull(original);
+        assertEquals("foo", original.getPhysicalName());
+        assertEquals("Queue", original.getDestinationTypeAsString());
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("activemq:queue:foo")
+                        .to("activemq:queue:bar")
+                        .to("mock:result");
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/f490ab54/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b24bb51..bc315aa 100755
--- a/pom.xml
+++ b/pom.xml
@@ -48,7 +48,7 @@
     <aries-transaction-version>1.1.1</aries-transaction-version>
     <axion-version>1.0-M3-dev</axion-version>
     <camel-version>2.16.2</camel-version>
-    <camel-version-range>[2.15,3)</camel-version-range>
+    <camel-version-range>[2.16,3)</camel-version-range>
     <cglib-version>2.2</cglib-version>
     <commons-beanutils-version>1.8.3</commons-beanutils-version>
     <commons-collections-version>3.2.2</commons-collections-version>


Mime
View raw message