camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject [2/3] git commit: CAMEL-7846 Added a DelegateEndpoint interface into Camel API
Date Wed, 24 Sep 2014 05:42:28 GMT
CAMEL-7846 Added a DelegateEndpoint interface into Camel API


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/198f2707
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/198f2707
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/198f2707

Branch: refs/heads/master
Commit: 198f2707fe0068f5ee0473942a430b7b0958530c
Parents: db27ed3
Author: Willem Jiang <willem.jiang@gmail.com>
Authored: Wed Sep 24 13:24:20 2014 +0800
Committer: Willem Jiang <willem.jiang@gmail.com>
Committed: Wed Sep 24 13:24:20 2014 +0800

----------------------------------------------------------------------
 .../java/org/apache/camel/DelegateEndpoint.java |  28 +++++
 .../apache/camel/component/quartz/CamelJob.java |  12 +-
 .../camel/component/quartz2/CamelJob.java       |  10 +-
 .../quartz/DelegateEndpointQuartzTest.java      | 116 +++++++++++++++++++
 4 files changed, 161 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/198f2707/camel-core/src/main/java/org/apache/camel/DelegateEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/DelegateEndpoint.java b/camel-core/src/main/java/org/apache/camel/DelegateEndpoint.java
new file mode 100644
index 0000000..aa5ec93
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/DelegateEndpoint.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+public interface DelegateEndpoint extends Endpoint {
+
+    /**
+     * Gets the delegated {@link Endpoint}.
+     *
+     * @return the Endpoint we delegate to
+     */
+    Endpoint getEndpoint();
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/198f2707/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java b/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java
index 7497172..b2c0414 100644
--- a/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java
+++ b/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java
@@ -19,6 +19,8 @@ package org.apache.camel.component.quartz;
 import java.io.Serializable;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.DelegateEndpoint;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Route;
 import org.quartz.Job;
 import org.quartz.JobExecutionContext;
@@ -70,13 +72,17 @@ public class CamelJob implements Job, Serializable {
             // check all active routes for the quartz endpoint this task matches
             // as we prefer to use the existing endpoint from the routes
             for (Route route : camelContext.getRoutes()) {
-                if (route.getEndpoint() instanceof QuartzEndpoint) {
-                    QuartzEndpoint quartzEndpoint = (QuartzEndpoint) route.getEndpoint();
+                Endpoint endpoint = route.getEndpoint();
+                if (endpoint instanceof DelegateEndpoint) {
+                    endpoint = ((DelegateEndpoint)endpoint).getEndpoint();   
+                }
+                if (endpoint instanceof QuartzEndpoint) {
+                    QuartzEndpoint quartzEndpoint = (QuartzEndpoint) endpoint;
                     String triggerName = quartzEndpoint.getTrigger().getName();
                     String triggerGroup = quartzEndpoint.getTrigger().getGroup();
                     LOG.trace("Checking route trigger {}.{}", triggerName, triggerGroup);
                     if (triggerName.equals(targetTriggerName) && triggerGroup.equals(targetTriggerGroup)) {
-                        return (QuartzEndpoint) route.getEndpoint();
+                        return (QuartzEndpoint) endpoint;
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/198f2707/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
index 1d5d88d..407b93b 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.quartz2;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelExchangeException;
+import org.apache.camel.DelegateEndpoint;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Route;
 import org.quartz.Job;
@@ -96,8 +98,12 @@ public class CamelJob implements Job {
         // check all active routes for the quartz endpoint this task matches
         // as we prefer to use the existing endpoint from the routes
         for (Route route : camelContext.getRoutes()) {
-            if (route.getEndpoint() instanceof QuartzEndpoint) {
-                QuartzEndpoint quartzEndpoint = (QuartzEndpoint) route.getEndpoint();
+            Endpoint endpoint = route.getEndpoint();
+            if (endpoint instanceof DelegateEndpoint) {
+                endpoint = ((DelegateEndpoint)endpoint).getEndpoint();   
+            }
+            if (endpoint instanceof QuartzEndpoint) {
+                QuartzEndpoint quartzEndpoint = (QuartzEndpoint) endpoint;
                 TriggerKey checkTriggerKey = quartzEndpoint.getTriggerKey();
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Checking route endpoint={} with checkTriggerKey={}", quartzEndpoint, checkTriggerKey);

http://git-wip-us.apache.org/repos/asf/camel/blob/198f2707/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/DelegateEndpointQuartzTest.java
----------------------------------------------------------------------
diff --git a/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/DelegateEndpointQuartzTest.java b/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/DelegateEndpointQuartzTest.java
new file mode 100644
index 0000000..ac3c00a
--- /dev/null
+++ b/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/DelegateEndpointQuartzTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.quartz;
+
+import java.util.Map;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.DelegateEndpoint;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.quartz.QuartzConstants;
+import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.URISupport;
+import org.junit.Test;
+import org.quartz.JobDetail;
+
+public class DelegateEndpointQuartzTest extends CamelTestSupport {
+    
+    @Test
+    public void testQuartzCronRoute() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMinimumMessageCount(3);
+        
+        assertMockEndpointsSatisfied();
+
+        JobDetail job = mock.getReceivedExchanges().get(0).getIn().getHeader("jobDetail", JobDetail.class);
+        assertNotNull(job);
+
+        assertEquals("cron", job.getJobDataMap().get(QuartzConstants.QUARTZ_TRIGGER_TYPE));
+        assertEquals("0/2 * * * * ?", job.getJobDataMap().get(QuartzConstants.QUARTZ_TRIGGER_CRON_EXPRESSION));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("my:quartz://myGroup/myTimerName?cron=0/2+*+*+*+*+?").to("mock:result");
+            }
+        };
+    }
+    
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry =  new JndiRegistry(createJndiContext());
+        registry.bind("my", new MyComponent());
+        return registry;
+    }
+    
+    class MyComponent extends DefaultComponent {
+
+        @Override
+        protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters)
+            throws Exception {
+            
+            String childUri = remaining;
+            // we need to apply the params here
+            if (parameters != null && parameters.size() > 0) {
+                childUri = childUri + "?" + URISupport.createQueryString(parameters);
+            }
+            // need to clean the parameters to avoid default component verify parameter complain
+            parameters.clear();
+            Endpoint childEndpoint = context.getEndpoint(childUri);
+            return new MyEndpoint(uri, childEndpoint);
+        }
+        
+    }
+    
+    class MyEndpoint extends DefaultEndpoint implements DelegateEndpoint {
+        private final Endpoint childEndpoint;
+        
+        MyEndpoint(String uri, Endpoint childEndpoint) {
+            this.childEndpoint = childEndpoint;
+        }
+
+        @Override
+        public Producer createProducer() throws Exception {
+            return childEndpoint.createProducer();
+        }
+
+        @Override
+        public Consumer createConsumer(Processor processor) throws Exception {
+            return childEndpoint.createConsumer(processor);
+        }
+
+        @Override
+        public boolean isSingleton() {
+            return false;
+        }
+
+        @Override
+        public Endpoint getEndpoint() {
+            return childEndpoint;
+        }
+        
+    }
+
+}


Mime
View raw message