camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aka...@apache.org
Subject svn commit: r1051321 [1/2] - in /camel/trunk/components: ./ camel-routebox/ camel-routebox/src/ camel-routebox/src/main/ camel-routebox/src/main/java/ camel-routebox/src/main/java/org/ camel-routebox/src/main/java/org/apache/ camel-routebox/src/main/ja...
Date Tue, 21 Dec 2010 00:01:57 GMT
Author: akarpe
Date: Tue Dec 21 00:01:55 2010
New Revision: 1051321

URL: http://svn.apache.org/viewvc?rev=1051321&view=rev
Log:
CAMEL-3285 Create a new blackbox component(routebox) which can encapsulate routes in an inner context and can direct payloads to inner routes based on a strategy or a dispatch map

Added:
    camel/trunk/components/camel-routebox/
    camel/trunk/components/camel-routebox/pom.xml   (with props)
    camel/trunk/components/camel-routebox/src/
    camel/trunk/components/camel-routebox/src/main/
    camel/trunk/components/camel-routebox/src/main/java/
    camel/trunk/components/camel-routebox/src/main/java/org/
    camel/trunk/components/camel-routebox/src/main/java/org/apache/
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java   (with props)
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java   (with props)
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConsumer.java   (with props)
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxEndpoint.java   (with props)
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxProducer.java   (with props)
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java   (with props)
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java   (with props)
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java   (with props)
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java   (with props)
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java   (with props)
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java   (with props)
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaProducer.java   (with props)
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java   (with props)
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java   (with props)
    camel/trunk/components/camel-routebox/src/main/resources/
    camel/trunk/components/camel-routebox/src/main/resources/META-INF/
    camel/trunk/components/camel-routebox/src/main/resources/META-INF/LICENSE.txt   (with props)
    camel/trunk/components/camel-routebox/src/main/resources/META-INF/NOTICE.txt   (with props)
    camel/trunk/components/camel-routebox/src/main/resources/META-INF/services/
    camel/trunk/components/camel-routebox/src/main/resources/META-INF/services/org/
    camel/trunk/components/camel-routebox/src/main/resources/META-INF/services/org/apache/
    camel/trunk/components/camel-routebox/src/main/resources/META-INF/services/org/apache/camel/
    camel/trunk/components/camel-routebox/src/main/resources/META-INF/services/org/apache/camel/component/
    camel/trunk/components/camel-routebox/src/main/resources/META-INF/services/org/apache/camel/component/routebox
    camel/trunk/components/camel-routebox/src/test/
    camel/trunk/components/camel-routebox/src/test/java/
    camel/trunk/components/camel-routebox/src/test/java/org/
    camel/trunk/components/camel-routebox/src/test/java/org/apache/
    camel/trunk/components/camel-routebox/src/test/java/org/apache/camel/
    camel/trunk/components/camel-routebox/src/test/java/org/apache/camel/component/
    camel/trunk/components/camel-routebox/src/test/java/org/apache/camel/component/routebox/
    camel/trunk/components/camel-routebox/src/test/java/org/apache/camel/component/routebox/RouteboxDefaultContextAndRouteBuilderTest.java   (with props)
    camel/trunk/components/camel-routebox/src/test/java/org/apache/camel/component/routebox/RouteboxDirectProducerOnlyTest.java   (with props)
    camel/trunk/components/camel-routebox/src/test/java/org/apache/camel/component/routebox/RouteboxDirectTest.java   (with props)
    camel/trunk/components/camel-routebox/src/test/java/org/apache/camel/component/routebox/RouteboxDispatchMapTest.java   (with props)
    camel/trunk/components/camel-routebox/src/test/java/org/apache/camel/component/routebox/RouteboxSedaTest.java   (with props)
    camel/trunk/components/camel-routebox/src/test/java/org/apache/camel/component/routebox/demo/
    camel/trunk/components/camel-routebox/src/test/java/org/apache/camel/component/routebox/demo/Book.java   (with props)
    camel/trunk/components/camel-routebox/src/test/java/org/apache/camel/component/routebox/demo/BookCatalog.java   (with props)
    camel/trunk/components/camel-routebox/src/test/java/org/apache/camel/component/routebox/demo/RouteboxDemoTestSupport.java   (with props)
    camel/trunk/components/camel-routebox/src/test/java/org/apache/camel/component/routebox/demo/SimpleRouteBuilder.java   (with props)
    camel/trunk/components/camel-routebox/src/test/resources/
    camel/trunk/components/camel-routebox/src/test/resources/log4j.properties   (with props)
Modified:
    camel/trunk/components/pom.xml

Added: camel/trunk/components/camel-routebox/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/pom.xml?rev=1051321&view=auto
==============================================================================
--- camel/trunk/components/camel-routebox/pom.xml (added)
+++ camel/trunk/components/camel-routebox/pom.xml Tue Dec 21 00:01:55 2010
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<artifactId>components</artifactId>
+		<groupId>org.apache.camel</groupId>
+		<version>2.6-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>camel-routebox</artifactId>
+	<packaging>bundle</packaging>
+	<name>Camel :: Routebox </name>
+	<description>Camel Routebox component to encapsulate routes</description>
+
+	<properties>
+		<camel.osgi.export.pkg>
+		    org.apache.camel.component.routebox.*,
+		    org.apache.camel.component.routebox.seda.*
+		</camel.osgi.export.pkg>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.camel</groupId>
+			<artifactId>camel-core</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.camel</groupId>
+			<artifactId>camel-test</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+		<!-- logging -->
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+</project>

Propchange: camel/trunk/components/camel-routebox/pom.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java?rev=1051321&view=auto
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java (added)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java Tue Dec 21 00:01:55 2010
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.routebox;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.routebox.direct.RouteboxDirectEndpoint;
+import org.apache.camel.component.routebox.seda.RouteboxSedaEndpoint;
+import org.apache.camel.impl.DefaultComponent;
+
+public class RouteboxComponent extends DefaultComponent {
+    RouteboxConfiguration config;
+    private final Map<String, BlockingQueue<Exchange>> queues = new HashMap<String, BlockingQueue<Exchange>>();
+    
+    public RouteboxComponent() {
+        config = new RouteboxConfiguration();
+    }
+
+    public RouteboxComponent(CamelContext context) {
+        super(context);
+        config = new RouteboxConfiguration();
+    }
+    
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters)
+        throws Exception {
+        RouteboxEndpoint blackboxRouteEndpoint = null;
+        
+        config.parseURI(new URI(uri), parameters, this);
+        if (config.getInnerProtocol().equalsIgnoreCase("direct")) {
+            blackboxRouteEndpoint = new RouteboxDirectEndpoint(uri, this, config);
+            setProperties(blackboxRouteEndpoint.getConfig(), parameters);
+        } else {
+            String baseUri = getQueueKey(uri);
+            blackboxRouteEndpoint = new RouteboxSedaEndpoint(uri, this, config, createQueue(baseUri, parameters));
+            setProperties(blackboxRouteEndpoint.getConfig(), parameters);
+        }
+        
+        return blackboxRouteEndpoint;
+    }
+
+    public synchronized BlockingQueue<Exchange> createQueue(String uri, Map<String, Object> parameters) {
+        if (queues.containsKey(uri)) {
+            return queues.get(uri);
+        }
+
+        // create queue
+        BlockingQueue<Exchange> queue;
+        Integer size = config.getQueueSize();
+        if (size != null && size > 0) {
+            queue = new LinkedBlockingQueue<Exchange>(size);
+        } else {
+            queue = new LinkedBlockingQueue<Exchange>();
+        }
+
+        queues.put(uri, queue);
+        return queue;
+    }
+    
+    protected String getQueueKey(String uri) {
+        if (uri.contains("?")) {
+            // strip parameters
+            uri = uri.substring(0, uri.indexOf('?'));
+        }
+        return uri;
+    }
+    
+    @Override
+    protected void doStop() throws Exception {
+        queues.clear();
+        super.doStop();
+    }
+}

Propchange: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java?rev=1051321&view=auto
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java (added)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java Tue Dec 21 00:01:55 2010
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.routebox;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.routebox.strategy.RouteboxDispatchStrategy;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.DefaultProducerTemplate;
+import org.apache.camel.spi.Registry;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class RouteboxConfiguration {
+    private static final transient Log LOG = LogFactory.getLog(RouteboxConfiguration.class);
+    private URI uri;
+    private String authority;
+    private String endpointName;
+    private URI consumerUri;
+    private URI producerUri;
+    private RouteboxDispatchStrategy dispatchStrategy;
+    private Map<String, String> dispatchMap;
+    private CamelContext innerContext;
+    private List<RouteBuilder> routeBuilders = new ArrayList<RouteBuilder>();
+    private Registry innerRegistry;
+    private boolean forkContext = true;
+    private boolean local = true;
+    private long connectionTimeout = 20000;
+    private long pollInterval = 1000;
+    private String innerProtocol;
+    private int threads = 20;
+    private int queueSize;
+    private ProducerTemplate innerProducerTemplate;
+    private boolean sendToConsumer = true;
+
+    public RouteboxConfiguration() {
+    }
+
+    public RouteboxConfiguration(URI uri) throws Exception {
+        this();
+        this.uri = uri;
+    }
+
+    public void parseURI(URI uri, Map<String, Object> parameters, RouteboxComponent component) throws Exception {
+        String protocol = uri.getScheme();
+        
+        if (!protocol.equalsIgnoreCase("routebox")) {
+            throw new IllegalArgumentException("Unrecognized protocol: " + protocol + " for uri: " + uri);
+        }
+        
+        setUri(uri);
+        setAuthority(uri.getAuthority());
+        LOG.info("Authority: " + uri.getAuthority());
+        
+        setEndpointName(getAuthority());
+        
+        if (parameters.containsKey("threads")) {
+            setThreads(Integer.valueOf((String) parameters.get("threads")));
+        }
+        
+        if (parameters.containsKey("forkContext")) {
+            if (!(Boolean.valueOf((String) parameters.get("forkContext")))) {
+                setForkContext(false);
+            }
+        }
+        
+        if (parameters.containsKey("innerProtocol")) {
+            setInnerProtocol((String) parameters.get("innerProtocol"));
+            if ((!innerProtocol.equalsIgnoreCase("seda")) && (!innerProtocol.equalsIgnoreCase("direct"))) {
+                throw new IllegalArgumentException("Unrecognized inner protocol: " + innerProtocol + " for uri: " + uri);
+            }
+        } else {
+            setInnerProtocol("direct");
+        }
+        
+        if (parameters.containsKey("sendToConsumer")) {
+            if (!Boolean.valueOf((String) parameters.get("sendToConsumer"))) {
+                setSendToConsumer(false);
+            }
+        }
+        
+        if (parameters.containsKey("connectionTimeout")) {
+            setConnectionTimeout(Long.parseLong((String) parameters.get("connectionTimeout")));
+        }
+        
+        if (parameters.containsKey("pollInterval")) {
+            setConnectionTimeout(Long.parseLong((String) parameters.get("pollInterval")));
+        }
+        
+        if (parameters.containsKey("routeBuilders")) {
+            routeBuilders = (List<RouteBuilder>) component.resolveAndRemoveReferenceParameter(parameters, "routeBuilders", List.class);
+        }
+        
+        if (parameters.containsKey("innerRegistry")) {
+            innerRegistry = (Registry) component.resolveAndRemoveReferenceParameter(parameters, "innerRegistry", Registry.class);
+        }
+        
+        if (isForkContext()) {
+            if (innerRegistry != null) {
+                innerContext = (CamelContext) component.resolveAndRemoveReferenceParameter(parameters, "innerContext", CamelContext.class, new DefaultCamelContext(innerRegistry));
+            } else {
+                innerContext = (CamelContext) component.resolveAndRemoveReferenceParameter(parameters, "innerContext", CamelContext.class, new DefaultCamelContext());
+            }
+
+        } else {
+            innerContext = component.getCamelContext();
+        }
+        
+        //configureInnerContext();
+        innerProducerTemplate = new DefaultProducerTemplate(innerContext);
+        innerProducerTemplate.start();
+        setQueueSize(component.getAndRemoveParameter(parameters, "size", Integer.class, 0));
+        consumerUri = component.resolveAndRemoveReferenceParameter(parameters, "consumerUri", URI.class, new URI("routebox:" + getEndpointName()));
+        producerUri = component.resolveAndRemoveReferenceParameter(parameters, "producerUri", URI.class, new URI("routebox:" + getEndpointName()));        
+        
+        dispatchStrategy = component.resolveAndRemoveReferenceParameter(parameters, "dispatchStrategy", RouteboxDispatchStrategy.class, null);
+        dispatchMap = (HashMap<String, String>) component.resolveAndRemoveReferenceParameter(parameters, "dispatchMap", HashMap.class, new HashMap<String, String>());
+        if ((dispatchStrategy == null) && (dispatchMap == null)) { 
+            LOG.warn("No Routebox Dispatch Map or Strategy has been set. Routebox may not have more than one inner route");
+        }        
+    }
+
+    public URI getUri() {
+        return uri;
+    }
+
+    public void setUri(URI uri) {
+        this.uri = uri;
+    }
+
+    public String getAuthority() {
+        return authority;
+    }
+
+    public void setAuthority(String authority) {
+        this.authority = authority;
+    }
+
+    public CamelContext getInnerContext() {
+        return innerContext;
+    }
+
+    public void setInnerContext(CamelContext innerContext) {
+        this.innerContext = innerContext;
+    }
+
+    public void setRouteBuilders(List<RouteBuilder> routeBuilders) {
+        this.routeBuilders = routeBuilders;
+    }
+
+    public List<RouteBuilder> getRouteBuilders() {
+        return routeBuilders;
+    }
+
+    public void setForkContext(boolean forkContext) {
+        this.forkContext = forkContext;
+    }
+    
+    public boolean isForkContext() {
+        return forkContext;
+    }
+
+    public void setThreads(int threads) {
+        this.threads = threads;
+    }
+
+    public int getThreads() {
+        return threads;
+    }
+
+    public void setEndpointName(String endpointName) {
+        this.endpointName = endpointName;
+    }
+
+    public String getEndpointName() {
+        return endpointName;
+    }
+
+    public void setLocal(boolean local) {
+        this.local = local;
+    }
+
+    public boolean isLocal() {
+        return local;
+    }
+
+    public void setProducerUri(URI producerUri) {
+        this.producerUri = producerUri;
+    }
+
+    public URI getProducerUri() {
+        return producerUri;
+    }
+
+    public void setConsumerUri(URI consumerUri) {
+        this.consumerUri = consumerUri;
+    }
+
+    public URI getConsumerUri() {
+        return consumerUri;
+    }
+
+    public void setDispatchStrategy(RouteboxDispatchStrategy dispatchStrategy) {
+        this.dispatchStrategy = dispatchStrategy;
+    }
+
+    public RouteboxDispatchStrategy getDispatchStrategy() {
+        return dispatchStrategy;
+    }
+
+    public void setConnectionTimeout(long connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
+    }
+
+    public long getConnectionTimeout() {
+        return connectionTimeout;
+    }
+
+    public long getPollInterval() {
+        return pollInterval;
+    }
+
+    public void setPollInterval(long pollInterval) {
+        this.pollInterval = pollInterval;
+    }
+
+    public void setQueueSize(int queueSize) {
+        this.queueSize = queueSize;
+    }
+
+    public int getQueueSize() {
+        return queueSize;
+    }
+
+    public void setInnerProducerTemplate(ProducerTemplate innerProducerTemplate) {
+        this.innerProducerTemplate = innerProducerTemplate;
+    }
+
+    public ProducerTemplate getInnerProducerTemplate() {
+        return innerProducerTemplate;
+    }
+
+    public void setInnerProtocol(String innerProtocol) {
+        this.innerProtocol = innerProtocol;
+    }
+
+    public String getInnerProtocol() {
+        return innerProtocol;
+    }
+
+    public void setInnerRegistry(Registry innerRegistry) {
+        this.innerRegistry = innerRegistry;
+    }
+
+    public Registry getInnerRegistry() {
+        return innerRegistry;
+    }
+
+    public void setSendToConsumer(boolean sendToConsumer) {
+        this.sendToConsumer = sendToConsumer;
+    }
+
+    public boolean isSendToConsumer() {
+        return sendToConsumer;
+    }
+
+    public void setDispatchMap(Map<String, String> dispatchMap) {
+        this.dispatchMap = dispatchMap;
+    }
+
+    public Map<String, String> getDispatchMap() {
+        return dispatchMap;
+    }
+
+}

Propchange: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConsumer.java?rev=1051321&view=auto
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConsumer.java (added)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConsumer.java Tue Dec 21 00:01:55 2010
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.routebox;
+
+import org.apache.camel.Consumer;
+
+public interface RouteboxConsumer extends Consumer {
+
+}

Propchange: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxEndpoint.java?rev=1051321&view=auto
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxEndpoint.java (added)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxEndpoint.java Tue Dec 21 00:01:55 2010
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.routebox;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Component;
+import org.apache.camel.impl.DefaultEndpoint;
+
+public abstract class RouteboxEndpoint extends DefaultEndpoint {
+    RouteboxConfiguration config;
+
+    public RouteboxEndpoint() {
+        super();
+    }
+    
+    public RouteboxEndpoint(String endpointUri) {
+        super(endpointUri);
+    }
+    
+    public RouteboxEndpoint(String endpointUri, CamelContext camelContext) {
+        super(endpointUri, camelContext);
+    }
+
+    public RouteboxEndpoint(String endpointUri, Component component) {
+        super(endpointUri, component);
+    }
+
+    public RouteboxEndpoint(String endpointUri, Component component, RouteboxConfiguration config) {
+        super(endpointUri, component);
+        this.config = config;
+    }  
+
+    public RouteboxConfiguration getConfig() {
+        return config;
+    }
+
+    public void setConfig(RouteboxConfiguration config) {
+        this.config = config;
+    }
+    
+}

Propchange: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxProducer.java?rev=1051321&view=auto
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxProducer.java (added)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxProducer.java Tue Dec 21 00:01:55 2010
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.routebox;
+
+public interface RouteboxProducer {
+
+}

Propchange: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxProducer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java?rev=1051321&view=auto
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java (added)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java Tue Dec 21 00:01:55 2010
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.routebox;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public abstract class RouteboxServiceSupport extends ServiceSupport {
+    private static final transient Log LOG = LogFactory.getLog(RouteboxServiceSupport.class);
+    private RouteboxEndpoint endpoint;
+    private ExecutorService executor;
+    private int pendingExchanges;
+    private boolean startedInnerContext;
+    
+    public RouteboxServiceSupport(RouteboxEndpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+    
+    protected void doStopInnerContext() throws Exception {
+        CamelContext context = endpoint.getConfig().getInnerContext();
+        context.stop();
+        setStartedInnerContext(false);
+    }
+
+    protected void doStartInnerContext() throws Exception {
+        // Add Route Builders and definitions to the inner camel context and start the context
+        CamelContext context = endpoint.getConfig().getInnerContext();
+        List<RouteBuilder> routeBuildersList = endpoint.getConfig().getRouteBuilders();
+        if (!(routeBuildersList.isEmpty())) {
+            for (RouteBuilder routeBuilder : routeBuildersList) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Adding routebuilder " + routeBuilder + " to " + context.getName());
+                }
+                context.addRoutes(routeBuilder);
+            }
+        }       
+        
+        context.start();
+        setStartedInnerContext(true);
+    }
+
+    public void setPendingExchanges(int pendingExchanges) {
+        this.pendingExchanges = pendingExchanges;
+    }
+
+    public int getPendingExchanges() {
+        return pendingExchanges;
+    }
+
+    public RouteboxEndpoint getRouteboxEndpoint() {
+        return endpoint;
+    }
+
+    public void setRouteboxEndpoint(RouteboxEndpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    public ExecutorService getExecutor() {
+        return executor;
+    }
+
+    public void setExecutor(ExecutorService executor) {
+        this.executor = executor;
+    }
+
+
+    public void setStartedInnerContext(boolean startedInnerContext) {
+        this.startedInnerContext = startedInnerContext;
+    }
+
+
+    public boolean isStartedInnerContext() {
+        return startedInnerContext;
+    }
+
+}

Propchange: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java?rev=1051321&view=auto
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java (added)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java Tue Dec 21 00:01:55 2010
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.routebox.direct;
+
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.SuspendableService;
+import org.apache.camel.component.routebox.RouteboxConsumer;
+import org.apache.camel.component.routebox.RouteboxServiceSupport;
+import org.apache.camel.impl.LoggingExceptionHandler;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.spi.ShutdownAware;
+
+public class RouteboxDirectConsumer extends RouteboxServiceSupport implements RouteboxConsumer, ShutdownAware, SuspendableService {
+    protected ProducerTemplate producer;
+    private final Processor processor;
+    private volatile AsyncProcessor asyncProcessor;
+    private ExceptionHandler exceptionHandler;
+
+    public RouteboxDirectConsumer(RouteboxDirectEndpoint endpoint, Processor processor) {
+        super(endpoint);
+        this.processor = processor;
+        producer = endpoint.getConfig().getInnerProducerTemplate();
+    }
+    
+    protected void doStart() throws Exception {
+        // add consumer to endpoint
+        boolean existing = this == ((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer();
+        if (!existing && ((RouteboxDirectEndpoint)getRouteboxEndpoint()).hasConsumer(this)) {
+            throw new IllegalArgumentException("Cannot add a 2nd consumer to the same endpoint. Endpoint " + getRouteboxEndpoint() + " only allows one consumer.");
+        }
+        if (!existing) {
+            ((RouteboxDirectEndpoint)getRouteboxEndpoint()).addConsumer(this);
+        }
+        
+        // now start the inner context
+        if (!isStartedInnerContext()) {
+            doStartInnerContext(); 
+        }
+        
+    }
+
+    protected void doStop() throws Exception {
+        ((RouteboxDirectEndpoint)getRouteboxEndpoint()).removeConsumer(this);
+        
+        // now stop the inner context
+        if (isStartedInnerContext()) {
+            doStopInnerContext();
+        }
+
+    }
+
+    protected void doSuspend() throws Exception {
+        ((RouteboxDirectEndpoint)getRouteboxEndpoint()).removeConsumer(this);
+    }
+
+    protected void doResume() throws Exception {
+        // resume by using the start logic
+        doStart();
+    }
+    
+    public Exchange processRequest(Exchange exchange) {
+        return exchange;
+        
+    }
+    
+    /**
+     * Provides an {@link org.apache.camel.AsyncProcessor} interface to the configured
+     * processor on the consumer. If the processor does not implement the interface,
+     * it will be adapted so that it does.
+     */
+    public synchronized AsyncProcessor getAsyncProcessor() {
+        if (asyncProcessor == null) {            
+            asyncProcessor = AsyncProcessorTypeConverter.convert(processor);
+        }
+        return asyncProcessor;
+    }
+
+    public ExceptionHandler getExceptionHandler() {
+        if (exceptionHandler == null) {
+            exceptionHandler = new LoggingExceptionHandler(getClass());
+        }
+        return exceptionHandler;
+    }
+
+    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
+        this.exceptionHandler = exceptionHandler;
+    }
+
+    /**
+     * Handles the given exception using the {@link #getExceptionHandler()}
+     * 
+     * @param t the exception to handle
+     */
+    protected void handleException(Throwable t) {
+        Throwable newt = (t == null) ? new IllegalArgumentException("Handling [null] exception") : t;
+        getExceptionHandler().handleException(newt);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.camel.spi.ShutdownAware#deferShutdown(org.apache.camel.ShutdownRunningTask)
+     */
+    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+        // deny stopping on shutdown as we want direct consumers to run in case some other queues
+        // depend on this consumer to run, so it can complete its exchanges
+        return true;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.camel.spi.ShutdownAware#getPendingExchangesSize()
+     */
+    public int getPendingExchangesSize() {
+        // return 0 as we do not have an internal memory queue with a variable size
+        // of inflight messages. 
+        return 0;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.camel.spi.ShutdownAware#prepareShutdown()
+     */
+    public void prepareShutdown() {
+        
+    }
+    
+    public Endpoint getEndpoint() {
+        return (Endpoint) getRouteboxEndpoint();
+    }
+
+    public Processor getProcessor() {
+        return processor;
+    }
+
+}

Propchange: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java?rev=1051321&view=auto
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java (added)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java Tue Dec 21 00:01:55 2010
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.routebox.direct;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.routebox.RouteboxConfiguration;
+import org.apache.camel.component.routebox.RouteboxEndpoint;
+
+public class RouteboxDirectEndpoint extends RouteboxEndpoint {
+    private volatile Map<String, RouteboxDirectConsumer> consumers = new HashMap<String, RouteboxDirectConsumer>();
+
+    public RouteboxDirectEndpoint(String endpointUri) {
+        super(endpointUri);
+    }
+
+    public RouteboxDirectEndpoint(String endpointUri, Component component) {
+        super(endpointUri, component);
+    }
+
+    public RouteboxDirectEndpoint(String uri, Component component, RouteboxConfiguration config) {
+        super(uri, component, config);
+    }
+
+    public Producer createProducer() throws Exception {
+        return new RouteboxDirectProducer(this);
+    }
+
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new RouteboxDirectConsumer(this, processor);
+    }
+
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public void addConsumer(RouteboxDirectConsumer consumer) {
+        String key = consumer.getEndpoint().getEndpointKey();
+        consumers.put(key, consumer);
+    }
+
+    public void removeConsumer(RouteboxDirectConsumer consumer) {
+        String key = consumer.getEndpoint().getEndpointKey();
+        consumers.remove(key);
+    }
+
+    public boolean hasConsumer(RouteboxDirectConsumer consumer) {
+        String key = consumer.getEndpoint().getEndpointKey();
+        return consumers.containsKey(key);
+    }
+
+    public RouteboxDirectConsumer getConsumer() {
+        String key = getEndpointKey();
+        return consumers.get(key);
+    }
+
+}

Propchange: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java?rev=1051321&view=auto
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java (added)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java Tue Dec 21 00:01:55 2010
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.routebox.direct;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Producer;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.routebox.RouteboxServiceSupport;
+import org.apache.camel.component.routebox.strategy.RouteboxDispatcher;
+import org.apache.camel.impl.LoggingExceptionHandler;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class RouteboxDirectProducer extends RouteboxServiceSupport implements Producer, AsyncProcessor {
+    private static final transient Log LOG = LogFactory.getLog(RouteboxDirectProducer.class);
+    protected ProducerTemplate producer;
+    private ExceptionHandler exceptionHandler;
+    
+    public RouteboxDirectProducer(RouteboxDirectEndpoint endpoint) {
+        super(endpoint);
+        producer = endpoint.getConfig().getInnerProducerTemplate();
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        Exchange result = null;
+        
+        if ((((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer() == null) && (getRouteboxEndpoint().getConfig().isSendToConsumer())) {
+            throw new CamelExchangeException("No consumers available on endpoint: " + getRouteboxEndpoint(), exchange);
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("**** Dispatching to Inner Route ****");
+            }
+            RouteboxDispatcher dispatcher = new RouteboxDispatcher(producer);
+            result = dispatcher.dispatchSync(getRouteboxEndpoint(), exchange);
+        }
+        if (getRouteboxEndpoint().getConfig().isSendToConsumer()) {
+            ((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer().getProcessor().process(result);
+        }
+    }
+
+    public boolean process(Exchange exchange, final AsyncCallback callback) {
+        boolean flag = true;
+        
+        if ((((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer() == null) 
+            && (((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConfig().isSendToConsumer())) {
+            exchange.setException(new CamelExchangeException("No consumers available on endpoint: " + getRouteboxEndpoint(), exchange));
+            callback.done(true);
+            flag = true;
+        } else {
+            try {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("**** Dispatching to Inner Route ****");
+                }
+                
+                RouteboxDispatcher dispatcher = new RouteboxDispatcher(producer);
+                exchange = dispatcher.dispatchAsync(getRouteboxEndpoint(), exchange);      
+                if (getRouteboxEndpoint().getConfig().isSendToConsumer()) {
+                    AsyncProcessor processor = AsyncProcessorTypeConverter.convert(((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer().getProcessor());
+                    flag = AsyncProcessorHelper.process(processor, exchange, new AsyncCallback() {
+                        public void done(boolean doneSync) {
+                            // we only have to handle async completion of this policy
+                            if (doneSync) {
+                                return;
+                            }
+                            callback.done(false);
+                        }
+                    });
+                } 
+                callback.done(true);
+            } catch (Exception e) {
+                getExceptionHandler().handleException("Error processing exchange", exchange, e);
+            }
+        }
+        return flag;
+    }
+    
+    protected void doStart() throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Starting producer: " + this);
+        }
+        
+        if (!((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConfig().isSendToConsumer()) {
+            // start an inner context
+            if (!isStartedInnerContext()) {
+                doStartInnerContext(); 
+            }
+        }
+        
+    }
+
+    protected void doStop() throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Stopping producer: " + this);
+        }
+        
+        if (!((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConfig().isSendToConsumer()) {        
+            // stop the inner context
+            if (isStartedInnerContext()) {
+                doStopInnerContext();
+            }
+        }
+
+    }
+    
+    @Override
+    public String toString() {
+        return "Producer[" + getRouteboxEndpoint()
+                .getEndpointUri() + "]";
+    }
+
+    public Endpoint getEndpoint() {
+        return getRouteboxEndpoint();
+    }
+
+    public Exchange createExchange() {
+        return getRouteboxEndpoint()
+                .createExchange();
+    }
+
+    public Exchange createExchange(ExchangePattern pattern) {
+        return getRouteboxEndpoint()
+                .createExchange(pattern);
+    }
+
+    public Exchange createExchange(Exchange exchange) {
+        return getRouteboxEndpoint()
+                .createExchange(exchange);
+    }
+
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public ExceptionHandler getExceptionHandler() {
+        if (exceptionHandler == null) {
+            exceptionHandler = new LoggingExceptionHandler(getClass());
+        }
+        return exceptionHandler;
+    }
+
+    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
+        this.exceptionHandler = exceptionHandler;
+    }
+}

Propchange: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java?rev=1051321&view=auto
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java (added)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java Tue Dec 21 00:01:55 2010
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.routebox.seda;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.component.routebox.RouteboxConsumer;
+import org.apache.camel.component.routebox.RouteboxServiceSupport;
+import org.apache.camel.component.routebox.strategy.RouteboxDispatcher;
+import org.apache.camel.impl.LoggingExceptionHandler;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class RouteboxSedaConsumer extends RouteboxServiceSupport implements RouteboxConsumer, Runnable, ShutdownAware {
+    private static final transient Log LOG = LogFactory.getLog(RouteboxSedaConsumer.class);
+    protected AsyncProcessor processor;
+    protected ProducerTemplate producer;
+    private int pendingExchanges;
+    private ExceptionHandler exceptionHandler;
+    
+    public RouteboxSedaConsumer(RouteboxSedaEndpoint endpoint, Processor processor) {
+        super(endpoint);
+        this.setProcessor(AsyncProcessorTypeConverter.convert(processor));
+        producer = endpoint.getConfig().getInnerProducerTemplate();
+        producer.setMaximumCacheSize(endpoint.getConfig().getThreads());
+        if (exceptionHandler == null) {
+            exceptionHandler = new LoggingExceptionHandler(getClass());
+        }
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.camel.impl.ServiceSupport#doStart()
+     */
+    @Override
+    protected void doStart() throws Exception {
+        ((RouteboxSedaEndpoint)getRouteboxEndpoint()).onStarted(this);
+        doStartInnerContext(); 
+        
+        // Create a URI link from the primary context to routes in the new inner context
+        int poolSize = getRouteboxEndpoint().getConfig().getThreads();
+        setExecutor(((RouteboxSedaEndpoint)getRouteboxEndpoint()).getCamelContext().getExecutorServiceStrategy()
+                        .newFixedThreadPool(this, ((RouteboxSedaEndpoint)getRouteboxEndpoint()).getEndpointUri(), poolSize));
+        for (int i = 0; i < poolSize; i++) {
+            getExecutor().execute((Runnable) this);
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.camel.impl.ServiceSupport#doStop()
+     */
+    @Override
+    protected void doStop() throws Exception {
+        ((RouteboxSedaEndpoint)getRouteboxEndpoint()).onStopped(this);
+        // Shutdown the executor
+        ((RouteboxSedaEndpoint)getRouteboxEndpoint()).getCamelContext().getExecutorServiceStrategy().shutdown(getExecutor());
+        setExecutor(null);
+        
+        doStopInnerContext(); 
+    }
+    
+    /* (non-Javadoc)
+     * @see java.lang.Runnable#run()
+     */
+    public void run() {       
+        BlockingQueue<Exchange> queue = ((RouteboxSedaEndpoint)getRouteboxEndpoint()).getQueue();
+        while (queue != null && isRunAllowed()) {
+            try {
+                final Exchange exchange = queue.poll(getRouteboxEndpoint().getConfig().getPollInterval(), TimeUnit.MILLISECONDS);
+                dispatchToInnerRoute(queue, exchange);
+            } catch (InterruptedException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
+                }
+                continue;
+            }
+        }
+    }
+    
+    private void dispatchToInnerRoute(BlockingQueue<Exchange> queue, final Exchange exchange) throws InterruptedException {
+        Exchange result = null;
+
+        if (exchange != null) {
+            if (isRunAllowed()) {
+                try {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("**** Dispatching to Inner Route ****");
+                    }
+                    RouteboxDispatcher dispatcher = new RouteboxDispatcher(producer);
+                    result = dispatcher.dispatchAsync(getRouteboxEndpoint(), exchange); 
+                    AsyncProcessorHelper.process(processor, result, new AsyncCallback() {
+                        public void done(boolean doneSync) {
+                            // noop
+                        }
+                    });
+                } catch (Exception e) {
+                    getExceptionHandler().handleException("Error processing exchange", exchange, e);
+                }
+            } else {
+                if (LOG.isWarnEnabled()) {
+                    LOG.warn("This consumer is stopped during polling an exchange, so putting it back on the seda queue: " + exchange);
+                }                
+                queue.put(exchange);
+            }
+        }
+    }
+    
+    
+    /* (non-Javadoc)
+     * @see org.apache.camel.Consumer#getEndpoint()
+     */
+    public Endpoint getEndpoint() {
+        return (Endpoint) getRouteboxEndpoint();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.camel.spi.ShutdownAware#deferShutdown(org.apache.camel.ShutdownRunningTask)
+     */
+    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+        return false;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.camel.spi.ShutdownAware#getPendingExchangesSize()
+     */
+    public int getPendingExchangesSize() {
+        return getPendingExchanges();
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.camel.spi.ShutdownAware#prepareShutdown()
+     */
+    public void prepareShutdown() {
+        
+    }
+    
+    public void setProcessor(AsyncProcessor processor) {
+        this.processor = processor;
+    }
+
+    public AsyncProcessor getProcessor() {
+        return processor;
+    }
+
+    public void setPendingExchanges(int pendingExchanges) {
+        this.pendingExchanges = pendingExchanges;
+    }
+
+    public int getPendingExchanges() {
+        return pendingExchanges;
+    }
+
+    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
+        this.exceptionHandler = exceptionHandler;
+    }
+
+    public ExceptionHandler getExceptionHandler() {
+        return exceptionHandler;
+    }
+    
+}

Propchange: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java?rev=1051321&view=auto
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java (added)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java Tue Dec 21 00:01:55 2010
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.routebox.seda;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.MultipleConsumersSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.WaitForTaskToComplete;
+import org.apache.camel.component.routebox.RouteboxComponent;
+import org.apache.camel.component.routebox.RouteboxConfiguration;
+import org.apache.camel.component.routebox.RouteboxConsumer;
+import org.apache.camel.component.routebox.RouteboxEndpoint;
+import org.apache.camel.component.routebox.RouteboxProducer;
+import org.apache.camel.spi.BrowsableEndpoint;
+
+public class RouteboxSedaEndpoint extends RouteboxEndpoint implements BrowsableEndpoint, MultipleConsumersSupport {
+    private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected;
+    private volatile BlockingQueue<Exchange> queue;
+    private volatile Set<RouteboxProducer> producers = new CopyOnWriteArraySet<RouteboxProducer>();
+    private volatile Set<RouteboxConsumer> consumers = new CopyOnWriteArraySet<RouteboxConsumer>();
+
+    public RouteboxSedaEndpoint(String endpointUri, RouteboxComponent component, RouteboxConfiguration config) throws Exception {
+        super(endpointUri, component, config);
+    }
+
+    public RouteboxSedaEndpoint(String endpointUri, RouteboxComponent component, RouteboxConfiguration config, BlockingQueue<Exchange> queue) throws Exception {
+        this(endpointUri, component, config);
+        this.queue = queue;
+    }
+    
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new RouteboxSedaConsumer(this, processor);        
+    }
+
+    public Producer createProducer() throws Exception {
+        return new RouteboxSedaProducer(this, queue, getWaitForTaskToComplete(), getConfig().getConnectionTimeout());
+    }
+
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public void onStarted(RouteboxProducer producer) {
+        producers.add(producer);
+    }
+
+    public void onStopped(RouteboxProducer producer) {
+        producers.remove(producer);
+    }
+
+    public void onStarted(RouteboxConsumer consumer) {
+        consumers.add(consumer);
+    }
+
+    public void onStopped(RouteboxConsumer consumer) {
+        consumers.remove(consumer);
+    }
+
+    public Set<RouteboxConsumer> getConsumers() {
+        return new HashSet<RouteboxConsumer>(consumers);
+    }
+
+    public Set<RouteboxProducer> getProducers() {
+        return new HashSet<RouteboxProducer>(producers);
+    }
+
+    public void setQueue(BlockingQueue<Exchange> queue) {
+        this.queue = queue;
+    }
+
+    public WaitForTaskToComplete getWaitForTaskToComplete() {
+        return waitForTaskToComplete;
+    }
+
+    public void setWaitForTaskToComplete(WaitForTaskToComplete waitForTaskToComplete) {
+        this.waitForTaskToComplete = waitForTaskToComplete;
+    }
+
+    public BlockingQueue<Exchange> getQueue() {
+        if (queue == null) {
+            if (getConfig().getQueueSize() > 0) {
+                queue = new LinkedBlockingQueue<Exchange>(getConfig().getQueueSize());
+            } else {
+                queue = new LinkedBlockingQueue<Exchange>();
+            }
+        }
+        return queue;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.camel.MultipleConsumersSupport#isMultipleConsumersSupported()
+     */
+    public boolean isMultipleConsumersSupported() {
+        return true;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.camel.spi.BrowsableEndpoint#getExchanges()
+     */
+    public List<Exchange> getExchanges() {
+        return new ArrayList<Exchange>(getQueue());
+    }
+
+}

Propchange: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaProducer.java?rev=1051321&view=auto
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaProducer.java (added)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaProducer.java Tue Dec 21 00:01:55 2010
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.routebox.seda;
+
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.WaitForTaskToComplete;
+import org.apache.camel.component.routebox.RouteboxProducer;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.impl.SynchronizationAdapter;
+import org.apache.camel.util.ExchangeHelper;
+
+/**
+ * @version $Revision: 955924 $
+ */
+public class RouteboxSedaProducer extends DefaultAsyncProducer implements RouteboxProducer {
+    protected final Collection<Exchange> queue;
+    private final RouteboxSedaEndpoint endpoint;
+    private final WaitForTaskToComplete waitForTaskToComplete;
+    private final long timeout;
+
+    public RouteboxSedaProducer(RouteboxSedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete, long timeout) {
+        super(endpoint);
+        this.endpoint = endpoint;
+        this.queue = queue;
+        this.waitForTaskToComplete = waitForTaskToComplete;
+        this.timeout = timeout;
+    }
+
+    @Override
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+        // use a new copy of the exchange to route async and handover the on completion to the new copy
+        // so its the new copy that performs the on completion callback when its done
+        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, true);
+        // set a new from endpoint to be the seda queue
+        copy.setFromEndpoint(endpoint);
+
+        WaitForTaskToComplete wait = waitForTaskToComplete;
+        if (exchange.getProperty(Exchange.ASYNC_WAIT) != null) {
+            wait = exchange.getProperty(Exchange.ASYNC_WAIT, WaitForTaskToComplete.class);
+        }
+
+        if (wait == WaitForTaskToComplete.Always
+            || (wait == WaitForTaskToComplete.IfReplyExpected && ExchangeHelper.isOutCapable(exchange))) {
+
+            // latch that waits until we are complete
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            // we should wait for the reply so install a on completion so we know when its complete
+            copy.addOnCompletion(new SynchronizationAdapter() {
+                @Override
+                public void onDone(Exchange response) {
+                    // check for timeout, which then already would have invoked the latch
+                    if (latch.getCount() == 0) {
+                        if (log.isTraceEnabled()) {
+                            log.trace(this + ". Timeout occurred so response will be ignored: " + (response.hasOut() ? response.getOut() : response.getIn()));
+                        }
+                        return;
+                    } else {
+                        if (log.isTraceEnabled()) {
+                            log.trace(this + " with response: " + (response.hasOut() ? response.getOut() : response.getIn()));
+                        }
+                        try {
+                            ExchangeHelper.copyResults(exchange, response);
+                        } finally {
+                            // always ensure latch is triggered
+                            latch.countDown();
+                        }
+                    }
+                }
+
+                @Override
+                public boolean allowHandover() {
+                    return false;
+                }
+
+                @Override
+                public String toString() {
+                    return "onDone at [" + endpoint.getEndpointUri() + "]";
+                }
+            });
+
+            queue.add(copy);
+
+            if (timeout > 0) {
+                if (log.isTraceEnabled()) {
+                    log.trace("Waiting for task to complete using timeout (ms): " + timeout + " at [" + endpoint.getEndpointUri() + "]");
+                }
+                // lets see if we can get the task done before the timeout
+                boolean done = false;
+                try {
+                    done = latch.await(timeout, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+                if (!done) {
+                    exchange.setException(new ExchangeTimedOutException(exchange, timeout));
+                    // count down to indicate timeout
+                    latch.countDown();
+                }
+            } else {
+                if (log.isTraceEnabled()) {
+                    log.trace("Waiting for task to complete (blocking) at [" + endpoint.getEndpointUri() + "]");
+                }
+                // no timeout then wait until its done
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+        } else {
+            // no wait, eg its a InOnly then just add to queue and return
+            queue.add(copy);
+        }
+
+        // we use OnCompletion on the Exchange to callback and wait for the Exchange to be done
+        // so we should just signal the callback we are done synchronously
+        callback.done(true);
+        return true;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        endpoint.onStarted(this);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        endpoint.onStopped(this);
+        super.doStop();
+    }
+}

Propchange: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaProducer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java?rev=1051321&view=auto
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java (added)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java Tue Dec 21 00:01:55 2010
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.routebox.strategy;
+
+import java.net.URI;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+
+/**
+ * A strategy for identifying the route consumer in the routebox where the exchange should to be dispatched
+ */
+public interface RouteboxDispatchStrategy {
+    /**
+     * Receives an incoming exchange and consumer list and identifies the inner route consumer for dispatching the exchange
+     *
+     * @param innerRouteConsumers the list of possible real-time inner route consumers available 
+     *        to where the exchange can be dispatched in the routebox
+     * @param exchange the incoming exchange
+     * @return a selected consumer to whom the exchange can be directed
+     */
+    URI selectDestinationUri(List<URI> destinations, Exchange exchange) throws Exception;
+} 

Propchange: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java?rev=1051321&view=auto
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java (added)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java Tue Dec 21 00:01:55 2010
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.routebox.strategy;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelException;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.routebox.RouteboxEndpoint;
+import org.apache.camel.impl.SynchronizationAdapter;
+import org.apache.camel.model.FromDefinition;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+public class RouteboxDispatcher {
+    private static final transient Log LOG = LogFactory.getLog(RouteboxDispatcher.class);
+    private ProducerTemplate producer;
+    
+    public RouteboxDispatcher(ProducerTemplate producer) {
+        super();
+        this.producer = producer;
+    }
+
+    public Exchange dispatchSync(RouteboxEndpoint endpoint, Exchange exchange) throws Exception {
+        URI dispatchUri = null;
+        Exchange reply = null;
+        
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Dispatching exchange" + exchange + "to endpoint " + endpoint.getEndpointUri());
+        }
+        
+        dispatchUri = selectDispatchUri(endpoint, exchange);
+        
+        if (exchange.getPattern() == ExchangePattern.InOnly) {
+            reply = producer.send(dispatchUri.toASCIIString(), exchange);
+        } else {
+            reply = (Exchange) issueRequest(endpoint, ExchangePattern.InOut, exchange.getIn().getBody(), exchange.getIn().getHeaders());        
+        }
+
+        return reply;
+    }
+    
+    public Exchange dispatchAsync(RouteboxEndpoint endpoint, Exchange exchange) throws Exception {
+        URI dispatchUri = null;
+        Exchange reply = null;
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Dispatching exchange" + exchange + "to endpoint " + endpoint.getEndpointUri());
+        }
+        
+        dispatchUri = selectDispatchUri(endpoint, exchange);
+        
+        if (exchange.getPattern() == ExchangePattern.InOnly) {
+            producer.asyncSend(dispatchUri.toASCIIString(), exchange);
+            reply = exchange;
+        } else {
+            Future<Exchange> future = producer.asyncCallback(dispatchUri.toASCIIString(), exchange, new SynchronizationAdapter());
+            reply = future.get(endpoint.getConfig().getConnectionTimeout(), TimeUnit.MILLISECONDS);
+        }
+        
+        return reply;
+    }
+    
+    protected URI selectDispatchUri(RouteboxEndpoint endpoint, Exchange exchange) throws Exception {
+        URI dispatchUri = null;
+        
+        List<URI> consumerUris = getInnerContextConsumerList(endpoint.getConfig().getInnerContext());
+        if (consumerUris.isEmpty()) {
+            throw new CamelException("No routes found for dispatch in Routebox");
+        } else if (consumerUris.size() == 1) {
+            dispatchUri = consumerUris.get(0);
+        } else {
+            if (!endpoint.getConfig().getDispatchMap().isEmpty()) {
+                //apply URI string found in dispatch Map
+                if (endpoint.getConfig().getDispatchMap().containsKey(exchange.getIn().getHeader("ROUTE_DISPATCH_KEY"))) {             
+                    dispatchUri = new URI(endpoint.getConfig().getDispatchMap().get(exchange.getIn().getHeader("ROUTE_DISPATCH_KEY")));
+                } else {
+                    throw new CamelException("No matching entry found in Dispatch Map for ROUTE_DISPATCH_KEY: " + exchange.getIn().getHeader("ROUTE_DISPATCH_KEY"));
+                }
+            } else {
+                //apply dispatch strategy
+                dispatchUri = endpoint.getConfig().getDispatchStrategy().selectDestinationUri(consumerUris, exchange);
+                if (dispatchUri == null) {
+                    throw new CamelException("No matching inner routes found for Operation");
+                }
+            }
+        }
+        
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Dispatch URI set to: " + dispatchUri.toASCIIString());
+        }
+        
+        return dispatchUri;
+    }
+    
+    protected List<URI> getInnerContextConsumerList(CamelContext context) throws URISyntaxException {
+        List<URI> consumerList = new ArrayList<URI>();
+        List<RouteDefinition> routeDefinitions = context.getRouteDefinitions();
+        for (RouteDefinition routeDefinition : routeDefinitions) {
+            List<FromDefinition> inputs = routeDefinition.getInputs();
+            for (FromDefinition input : inputs) {
+                consumerList.add(new URI(input.getUri()));
+            }
+        }
+        return consumerList;
+    }
+    
+    public Exchange issueRequest(Endpoint endpoint, ExchangePattern pattern, final Object body, final Map<String, Object> headers) throws CamelExecutionException {
+        Exchange exchange = producer.send(endpoint, pattern, new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                Message in = exchange.getIn();
+                for (Map.Entry<String, Object> header : headers.entrySet()) {
+                    in.setHeader(header.getKey(), header.getValue());
+                }
+                in.setBody(body);
+            }
+        });
+        
+        return exchange;
+    }
+    
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message