camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [01/10] camel git commit: CAMEL-4596: pollEnrich supports dynamic uris.
Date Mon, 13 Jul 2015 12:37:52 GMT
Repository: camel
Updated Branches:
  refs/heads/master 71b43bc83 -> fe5960aef


CAMEL-4596: pollEnrich supports dynamic uris.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9fd4d549
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9fd4d549
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9fd4d549

Branch: refs/heads/master
Commit: 9fd4d549056c12754c0fa76e253e00580e8ceb7a
Parents: 71b43bc
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Sun Jul 12 20:54:52 2015 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Mon Jul 13 08:14:47 2015 +0200

----------------------------------------------------------------------
 .../org/apache/camel/impl/ConsumerCache.java    |  1 -
 .../camel/model/PollEnrichDefinition.java       | 62 ++++++++++---
 .../apache/camel/model/ProcessorDefinition.java | 31 +++++++
 .../apache/camel/processor/PollEnricher.java    | 93 +++++++++++++++++---
 .../apache/camel/processor/SendProcessor.java   |  1 -
 .../enricher/PollEnrichExpressionTest.java      | 47 ++++++++++
 6 files changed, 211 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
index 6f60f46..d957efe 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
@@ -58,7 +58,6 @@ public class ConsumerCache extends ServiceSupport {
         this(source, camelContext, cache, camelContext.getPollingConsumerServicePool());
     }
 
-
     public ConsumerCache(Object source, CamelContext camelContext, Map<String, PollingConsumer>
cache, ServicePool<Endpoint, PollingConsumer> pool) {
         this.camelContext = camelContext;
         this.consumers = cache;

http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index 7b6f9d1..a5d7cb1 100644
--- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -19,12 +19,16 @@ package org.apache.camel.model;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElementRef;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Endpoint;
+import org.apache.camel.Expression;
+import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
+import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.PollEnricher;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
@@ -41,6 +45,8 @@ import org.apache.camel.util.ObjectHelper;
 @XmlRootElement(name = "pollEnrich")
 @XmlAccessorType(XmlAccessType.FIELD)
 public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinition>
implements EndpointRequiredDefinition {
+    @XmlElementRef
+    private ExpressionDefinition expression;
     @XmlAttribute(name = "uri")
     private String resourceUri;
     // TODO: For Camel 3.0 we should remove this ref attribute as you can do that in the
uri, by prefixing with ref:
@@ -94,24 +100,33 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
-        if (ObjectHelper.isEmpty(resourceUri) && ObjectHelper.isEmpty(resourceRef))
{
-            throw new IllegalArgumentException("Either uri or ref must be provided for resource
endpoint");
+        if (ObjectHelper.isEmpty(resourceUri) && ObjectHelper.isEmpty(resourceRef)
&& expression == null) {
+            throw new IllegalArgumentException("Either resourceUri, resourceRef or expression
must be configured");
         }
 
         // lookup endpoint
-        Endpoint endpoint;
+        PollingConsumer consumer = null;
         if (resourceUri != null) {
-            endpoint = routeContext.resolveEndpoint(resourceUri);
-        } else {
-            endpoint = routeContext.resolveEndpoint(null, resourceRef);
+            Endpoint endpoint = routeContext.resolveEndpoint(resourceUri);
+            consumer = endpoint.createPollingConsumer();
+        } else if (resourceRef != null) {
+            Endpoint endpoint = routeContext.resolveEndpoint(null, resourceRef);
+            consumer = endpoint.createPollingConsumer();
         }
 
+        // if no timeout then we should block, and there use a negative timeout
+        long time = timeout != null ? timeout : -1;
+
+        // create the expression if any was configured
+        Expression exp = createResourceExpression(routeContext);
+
         PollEnricher enricher;
-        if (timeout != null) {
-            enricher = new PollEnricher(null, endpoint.createPollingConsumer(), timeout);
+        if (exp != null) {
+            enricher = new PollEnricher(null, exp, time);
+        } else if (consumer != null) {
+            enricher = new PollEnricher(null, consumer, time);
         } else {
-            // if no timeout then we should block, and there use a negative timeout
-            enricher = new PollEnricher(null, endpoint.createPollingConsumer(), -1);
+            throw new IllegalArgumentException("Either resourceUri, resourceRef or expression
must be configured");
         }
 
         AggregationStrategy strategy = createAggregationStrategy(routeContext);
@@ -152,6 +167,20 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
         return strategy;
     }
 
+    /**
+     * Creates the {@link org.apache.camel.Expression} from the expression node to use to
compute the endpoint to poll from.
+     *
+     * @param routeContext  the route context
+     * @return the created expression, or <tt>null</tt> if no expression configured
+     */
+    protected Expression createResourceExpression(RouteContext routeContext) {
+        if (expression != null) {
+            return expression.createExpression(routeContext);
+        } else {
+            return null;
+        }
+    }
+
     public String getResourceUri() {
         return resourceUri;
     }
@@ -257,4 +286,17 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
     public void setAggregateOnException(Boolean aggregateOnException) {
         this.aggregateOnException = aggregateOnException;
     }
+
+    public ExpressionDefinition getExpression() {
+        return expression;
+    }
+
+    /**
+     * Sets an expression to use for dynamic computing the endpoint to poll from.
+     * <p/>
+     * If this option is set, then <tt>resourceUri</tt> or <tt>resourceRef</tt>
is not in use.
+     */
+    public void setExpression(ExpressionDefinition expression) {
+        this.expression = expression;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index c84ea47..b525c0f 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -3453,6 +3453,37 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
     }
 
     /**
+     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher
EIP</a>
+     * enriches an exchange with additional data obtained from a <code>resourceUri</code>
+     * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
+     * <p/>
+     * The difference between this and {@link #enrich(String)} is that this uses a consumer
+     * to obtain the additional data, where as enrich uses a producer.
+     * <p/>
+     * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}.
+     * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then
we use <tt>receiveNoWait</tt>
+     * otherwise we use <tt>receive(timeout)</tt>.
+     *
+     * @param expression             to use an expression to dynamically compute the endpoint
to poll from
+     * @param timeout                timeout in millis to wait at most for data to be available.
+     * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input
data and additional data.
+     * @param aggregateOnException   whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange,
org.apache.camel.Exchange)} if
+     *                               an exception was thrown.
+     * @return the builder
+     * @see org.apache.camel.processor.PollEnricher
+     */
+    @SuppressWarnings("unchecked")
+    public Type pollEnrich(Expression expression, long timeout, String aggregationStrategyRef,
boolean aggregateOnException) {
+        PollEnrichDefinition pollEnrich = new PollEnrichDefinition();
+        pollEnrich.setExpression(new ExpressionDefinition(expression));
+        pollEnrich.setTimeout(timeout);
+        pollEnrich.setAggregationStrategyRef(aggregationStrategyRef);
+        pollEnrich.setAggregateOnException(aggregateOnException);
+        addOutput(pollEnrich);
+        return (Type) this;
+    }
+
+    /**
      * Adds a onComplection {@link org.apache.camel.spi.Synchronization} hook that invoke
this route as
      * a callback when the {@link org.apache.camel.Exchange} has finished being processed.
      * The hook invoke callbacks for either onComplete or onFailure.

http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
index 9cbca74..ab313fb 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -18,11 +18,15 @@ package org.apache.camel.processor;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointAware;
 import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
 import org.apache.camel.PollingConsumer;
+import org.apache.camel.impl.ConsumerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.ServiceSupport;
@@ -46,12 +50,15 @@ import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
  *
  * @see Enricher
  */
-public class PollEnricher extends ServiceSupport implements AsyncProcessor, EndpointAware,
IdAware {
+public class PollEnricher extends ServiceSupport implements AsyncProcessor, EndpointAware,
IdAware, CamelContextAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(PollEnricher.class);
+    private CamelContext camelContext;
+    private ConsumerCache consumerCache;
     private String id;
     private AggregationStrategy aggregationStrategy;
-    private PollingConsumer consumer;
+    private final PollingConsumer consumer;
+    private final Expression expression;
     private long timeout;
     private boolean aggregateOnException;
 
@@ -77,9 +84,32 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor,
Endp
     public PollEnricher(AggregationStrategy aggregationStrategy, PollingConsumer consumer,
long timeout) {
         this.aggregationStrategy = aggregationStrategy;
         this.consumer = consumer;
+        this.expression = null;
         this.timeout = timeout;
     }
 
+    /**
+     * Creates a new {@link PollEnricher}.
+     *
+     * @param aggregationStrategy  aggregation strategy to aggregate input data and additional
data.
+     * @param expression expression to use to compute the endpoint to poll from.
+     * @param timeout timeout in millis
+     */
+    public PollEnricher(AggregationStrategy aggregationStrategy, Expression expression, long
timeout) {
+        this.aggregationStrategy = aggregationStrategy;
+        this.expression = expression;
+        this.consumer = null;
+        this.timeout = timeout;
+    }
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
     public String getId() {
         return id;
     }
@@ -89,7 +119,7 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor,
Endp
     }
 
     public Endpoint getEndpoint() {
-        return consumer.getEndpoint();
+        return consumer != null ? consumer.getEndpoint() : null;
     }
 
     public AggregationStrategy getAggregationStrategy() {
@@ -162,17 +192,35 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor,
Endp
             return true;
         }
 
+        // which consumer to use
+        PollingConsumer target = consumer;
+        Endpoint endpoint = null;
+
+        // use dynamic endpoint so calculate the endpoint to use
+        if (expression != null) {
+            try {
+                Object recipient = expression.evaluate(exchange, Object.class);
+                endpoint = resolveEndpoint(exchange, recipient);
+                // acquire the consumer from the cache
+                target = consumerCache.acquirePollingConsumer(endpoint);
+            } catch (Throwable e) {
+                exchange.setException(e);
+                callback.done(true);
+                return true;
+            }
+        }
+
         Exchange resourceExchange;
         try {
             if (timeout < 0) {
-                LOG.debug("Consumer receive: {}", consumer);
+                LOG.debug("Consumer receive: {}", target);
                 resourceExchange = consumer.receive();
             } else if (timeout == 0) {
-                LOG.debug("Consumer receiveNoWait: {}", consumer);
-                resourceExchange = consumer.receiveNoWait();
+                LOG.debug("Consumer receiveNoWait: {}", target);
+                resourceExchange = target.receiveNoWait();
             } else {
-                LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer);
-                resourceExchange = consumer.receive(timeout);
+                LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, target);
+                resourceExchange = target.receive(timeout);
             }
 
             if (resourceExchange == null) {
@@ -184,6 +232,11 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor,
Endp
             exchange.setException(new CamelExchangeException("Error during poll", exchange,
e));
             callback.done(true);
             return true;
+        } finally {
+            // return the consumer back to the cache
+            if (expression != null) {
+                consumerCache.releasePollingConsumer(endpoint, target);
+            }
         }
 
         try {
@@ -209,9 +262,9 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor,
Endp
 
             // set header with the uri of the endpoint enriched so we can use that for tracing
etc
             if (exchange.hasOut()) {
-                exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
+                exchange.getOut().setHeader(Exchange.TO_ENDPOINT, target.getEndpoint().getEndpointUri());
             } else {
-                exchange.getIn().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
+                exchange.getIn().setHeader(Exchange.TO_ENDPOINT, target.getEndpoint().getEndpointUri());
             }
         } catch (Throwable e) {
             exchange.setException(new CamelExchangeException("Error occurred during aggregation",
exchange, e));
@@ -223,6 +276,14 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor,
Endp
         return true;
     }
 
+    protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
+        // trim strings as end users might have added spaces between separators
+        if (recipient instanceof String) {
+            recipient = ((String)recipient).trim();
+        }
+        return ExchangeHelper.resolveEndpoint(exchange, recipient);
+    }
+
     /**
      * Strategy to pre check polling.
      * <p/>
@@ -251,11 +312,19 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor,
Endp
     }
 
     protected void doStart() throws Exception {
-        ServiceHelper.startServices(aggregationStrategy, consumer);
+        if (expression != null && consumerCache == null) {
+            // create consumer cache if we use dynamic expressions for computing the endpoints
to poll
+            consumerCache = new ConsumerCache(this, getCamelContext());
+        }
+        ServiceHelper.startServices(consumerCache, consumer, aggregationStrategy);
     }
 
     protected void doStop() throws Exception {
-        ServiceHelper.stopServices(consumer, aggregationStrategy);
+        ServiceHelper.stopServices(consumerCache, consumer, aggregationStrategy);
+    }
+
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownServices(consumerCache, consumer, aggregationStrategy);
     }
 
     private static class CopyAggregationStrategy implements AggregationStrategy {

http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
index 884f674..3ab90d6 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -118,7 +118,6 @@ public class SendProcessor extends ServiceSupport implements AsyncProcessor,
Tra
             return true;
         }
 
-
         // we should preserve existing MEP so remember old MEP
         // if you want to permanently to change the MEP then use .setExchangePattern in the
DSL
         final ExchangePattern existingPattern = exchange.getPattern();

http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
new file mode 100644
index 0000000..4e983a7
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.enricher;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class PollEnrichExpressionTest extends ContextTestSupport {
+
+    public void testPollEnricExpression() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
+
+        template.sendBody("seda:foo", "Hello World");
+        template.sendBody("seda:bar", "Bye World");
+
+        template.sendBodyAndHeader("direct:start", null, "source", "seda:foo");
+        template.sendBodyAndHeader("direct:start", null, "source", "seda:bar");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .pollEnrich(header("source"), 1000, null, false)
+                    .to("mock:result");
+            }
+        };
+    }
+}


Mime
View raw message