camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject svn commit: r1165283 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/direct/ camel-core/src/main/java/org/apache/camel/component/seda/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/impl...
Date Mon, 05 Sep 2011 13:54:45 GMT
Author: cschneider
Date: Mon Sep  5 13:54:44 2011
New Revision: 1165283

URL: http://svn.apache.org/viewvc?rev=1165283&view=rev
Log:
CAMEL-4417 Move AsnycProcessorTypeConverter.convert method to processor while keeping the
old one for compatibility

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessorConverterHelper.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
    camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
Mon Sep  5 13:54:44 2011
@@ -21,7 +21,7 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,7 +57,7 @@ public class DirectProducer extends Defa
             callback.done(true);
             return true;
         } else {
-            AsyncProcessor processor = AsyncProcessorTypeConverter.convert(endpoint.getConsumer().getProcessor());
+            AsyncProcessor processor = AsyncProcessorConverterHelper.convert(endpoint.getConsumer().getProcessor());
             return AsyncProcessorHelper.process(processor, exchange, callback);
         }
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
Mon Sep  5 13:54:44 2011
@@ -31,7 +31,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.SuspendableService;
 import org.apache.camel.impl.LoggingExceptionHandler;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
 import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.ShutdownAware;
@@ -61,7 +61,7 @@ public class SedaConsumer extends Servic
 
     public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
         this.endpoint = endpoint;
-        this.processor = AsyncProcessorTypeConverter.convert(processor);
+        this.processor = AsyncProcessorConverterHelper.convert(processor);
     }
 
     @Override

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java Mon Sep
 5 13:54:44 2011
@@ -20,7 +20,7 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.ServiceHelper;
@@ -64,7 +64,7 @@ public class DefaultConsumer extends Ser
      */
     public synchronized AsyncProcessor getAsyncProcessor() {
         if (asyncProcessor == null) {            
-            asyncProcessor = AsyncProcessorTypeConverter.convert(processor);
+            asyncProcessor = AsyncProcessorConverterHelper.convert(processor);
         }
         return asyncProcessor;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Mon Sep
 5 13:54:44 2011
@@ -30,7 +30,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.ProducerCallback;
 import org.apache.camel.ServicePoolAware;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
 import org.apache.camel.processor.UnitOfWorkProducer;
 import org.apache.camel.spi.ServicePool;
 import org.apache.camel.support.ServiceSupport;
@@ -281,7 +281,7 @@ public class ProducerCache extends Servi
 
         try {
             // invoke the callback
-            AsyncProcessor asyncProcessor = AsyncProcessorTypeConverter.convert(producer);
+            AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(producer);
             sync = producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange,
pattern, new AsyncCallback() {
                 @Override
                 public void done(boolean doneSync) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
Mon Sep  5 13:54:44 2011
@@ -16,13 +16,12 @@
  */
 package org.apache.camel.impl.converter;
 
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Processor;
 import org.apache.camel.TypeConverter;
-import org.apache.camel.processor.DelegateProcessor;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
 
 /**
  * A simple converter that can convert any {@link Processor} to an {@link AsyncProcessor}.
@@ -33,49 +32,11 @@ import org.apache.camel.processor.Delega
  */
 public class AsyncProcessorTypeConverter implements TypeConverter {
 
-    private static final class ProcessorToAsyncProcessorBridge extends DelegateProcessor
implements AsyncProcessor {
-
-        private ProcessorToAsyncProcessorBridge(Processor processor) {
-            super(processor);
-        }
-
-        public boolean process(Exchange exchange, AsyncCallback callback) {
-            if (processor == null) {
-                // no processor then we are done
-                callback.done(true);
-                return true;
-            }
-            try {
-                processor.process(exchange);
-            } catch (Throwable e) {
-                // must catch throwable so we catch all
-                exchange.setException(e);
-            } finally {
-                // we are bridging a sync processor as async so callback with true
-                callback.done(true);
-            }
-            return true;
-        }
-
-        @Override
-        public String toString() {
-            if (processor != null) {
-                return processor.toString();
-            } else {
-                return "Processor is null";
-            }
-        }
-    }
-
     public <T> T convertTo(Class<T> type, Object value) {
         if (value != null) {
             if (type.equals(AsyncProcessor.class)) {
-                if (value instanceof AsyncProcessor) {
-                    return type.cast(value);
-                } else if (value instanceof Processor) {
-                    // Provide an async bridge to the regular processor.
-                    final Processor processor = (Processor)value;
-                    return type.cast(new ProcessorToAsyncProcessorBridge(processor));
+                if (value instanceof Processor) {
+                    return type.cast(AsyncProcessorConverterHelper.convert((Processor)value));
                 }
             }
         }
@@ -93,11 +54,14 @@ public class AsyncProcessorTypeConverter
     public <T> T mandatoryConvertTo(Class<T> type, Exchange exchange, Object
value) throws NoTypeConversionAvailableException {
         return convertTo(type, exchange, value);
     }
-
+    
+    /**
+     * @deprecated use AnycProcessorConverter.convert instead
+     * @param value
+     * @return
+     */
+    @Deprecated
     public static AsyncProcessor convert(Processor value) {
-        if (value instanceof AsyncProcessor) {
-            return (AsyncProcessor)value;
-        }
-        return new ProcessorToAsyncProcessorBridge(value);
+        return AsyncProcessorConverterHelper.convert(value);
     }
 }

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessorConverterHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessorConverterHelper.java?rev=1165283&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessorConverterHelper.java
(added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessorConverterHelper.java
Mon Sep  5 13:54:44 2011
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+/**
+ * A simple converter that can convert any {@link Processor} to an {@link AsyncProcessor}.
+ * Processing will still occur synchronously but it will provide the required
+ * notifications that the caller expects.
+ *
+ * @version 
+ */
+public final class AsyncProcessorConverterHelper {
+    
+    private AsyncProcessorConverterHelper() {
+        // Helper class
+    }
+
+    private static final class ProcessorToAsyncProcessorBridge extends DelegateProcessor
implements AsyncProcessor {
+
+        private ProcessorToAsyncProcessorBridge(Processor processor) {
+            super(processor);
+        }
+
+        public boolean process(Exchange exchange, AsyncCallback callback) {
+            if (processor == null) {
+                // no processor then we are done
+                callback.done(true);
+                return true;
+            }
+            try {
+                processor.process(exchange);
+            } catch (Throwable e) {
+                // must catch throwable so we catch all
+                exchange.setException(e);
+            } finally {
+                // we are bridging a sync processor as async so callback with true
+                callback.done(true);
+            }
+            return true;
+        }
+
+        @Override
+        public String toString() {
+            if (processor != null) {
+                return processor.toString();
+            } else {
+                return "Processor is null";
+            }
+        }
+    }
+
+    public static AsyncProcessor convert(Processor value) {
+        if (value instanceof AsyncProcessor) {
+            return (AsyncProcessor)value;
+        }
+        return new ProcessorToAsyncProcessorBridge(value);
+    }
+}

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java Mon
Sep  5 13:54:44 2011
@@ -25,7 +25,6 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ServiceHelper;
@@ -46,7 +45,7 @@ public class ChoiceProcessor extends Ser
 
     public ChoiceProcessor(List<FilterProcessor> filters, Processor otherwise) {
         this.filters = filters;
-        this.otherwise = AsyncProcessorTypeConverter.convert(otherwise);
+        this.otherwise = AsyncProcessorConverterHelper.convert(otherwise);
     }
 
     public void process(Exchange exchange) throws Exception {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java Mon
Sep  5 13:54:44 2011
@@ -29,7 +29,6 @@ import org.apache.camel.Channel;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Service;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.processor.interceptor.StreamCaching;
 import org.apache.camel.processor.interceptor.TraceFormatter;
@@ -301,7 +300,7 @@ public class DefaultChannel extends Serv
             exchange.getUnitOfWork().pushRouteContext(routeContext);
         }
 
-        AsyncProcessor async = AsyncProcessorTypeConverter.convert(processor);
+        AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
         boolean sync = async.process(exchange, new AsyncCallback() {
             public void done(boolean doneSync) {
                 // pop the route context we just used

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
Mon Sep  5 13:54:44 2011
@@ -24,7 +24,6 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ServiceHelper;
@@ -53,7 +52,7 @@ public class DelegateAsyncProcessor exte
     }
 
     public DelegateAsyncProcessor(Processor processor) {
-        this(AsyncProcessorTypeConverter.convert(processor));
+        this(AsyncProcessorConverterHelper.convert(processor));
     }
 
     @Override
@@ -70,7 +69,7 @@ public class DelegateAsyncProcessor exte
     }
 
     public void setProcessor(Processor processor) {
-        this.processor = AsyncProcessorTypeConverter.convert(processor);
+        this.processor = AsyncProcessorConverterHelper.convert(processor);
     }
 
     protected void doStart() throws Exception {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java Mon Sep
 5 13:54:44 2011
@@ -22,7 +22,6 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultExchange;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
@@ -109,7 +108,7 @@ public class Enricher extends ServiceSup
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
         final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
 
-        AsyncProcessor ap = AsyncProcessorTypeConverter.convert(producer);
+        AsyncProcessor ap = AsyncProcessorConverterHelper.convert(producer);
         boolean sync = AsyncProcessorHelper.process(ap, resourceExchange, new AsyncCallback()
{
             public void done(boolean doneSync) {
                 // we only have to handle async completion of the routing slip

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java
Mon Sep  5 13:54:44 2011
@@ -20,7 +20,6 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.ServiceHelper;
 
@@ -53,7 +52,7 @@ public class InterceptorToAsyncProcessor
      * @param target the target
      */
     public InterceptorToAsyncProcessorBridge(Processor interceptor, AsyncProcessor target)
{
-        this.interceptor = AsyncProcessorTypeConverter.convert(interceptor);
+        this.interceptor = AsyncProcessorConverterHelper.convert(interceptor);
         this.target = target;
     }
 
@@ -89,7 +88,7 @@ public class InterceptorToAsyncProcessor
     }
 
     public void setTarget(Processor target) {
-        this.target = AsyncProcessorTypeConverter.convert(target);
+        this.target = AsyncProcessorConverterHelper.convert(target);
     }
 
     @Override

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Mon Sep  5 13:54:44 2011
@@ -45,7 +45,6 @@ import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.builder.ErrorHandlerBuilder;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy;
 import org.apache.camel.spi.RouteContext;
@@ -569,7 +568,7 @@ public class MulticastProcessor extends 
             }
 
             // let the prepared process it, remember to begin the exchange pair
-            AsyncProcessor async = AsyncProcessorTypeConverter.convert(processor);
+            AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
             pair.begin();
             sync = AsyncProcessorHelper.process(async, exchange, new AsyncCallback() {
                 public void done(boolean doneSync) {
@@ -701,7 +700,7 @@ public class MulticastProcessor extends 
 
             // let the prepared process it, remember to begin the exchange pair
             // we invoke it synchronously as parallel async routing is too hard
-            AsyncProcessor async = AsyncProcessorTypeConverter.convert(processor);
+            AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
             pair.begin();
             AsyncProcessorHelper.process(async, exchange);
         } finally {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Mon Sep
 5 13:54:44 2011
@@ -25,7 +25,6 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.slf4j.Logger;
@@ -75,7 +74,7 @@ public class Pipeline extends MulticastP
             // get the next processor
             Processor processor = processors.next();
 
-            AsyncProcessor async = AsyncProcessorTypeConverter.convert(processor);
+            AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
             boolean sync = process(exchange, nextExchange, callback, processors, async);
 
             // continue as long its being processed synchronously
@@ -123,7 +122,7 @@ public class Pipeline extends MulticastP
                 // continue processing the pipeline asynchronously
                 Exchange nextExchange = exchange;
                 while (continueRouting(processors, nextExchange)) {
-                    AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
+                    AsyncProcessor processor = AsyncProcessorConverterHelper.convert(processors.next());
 
                     // check for error if so we should break out
                     if (!continueProcessing(nextExchange, "so breaking out of pipeline",
LOG)) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
Mon Sep  5 13:54:44 2011
@@ -29,7 +29,6 @@ import org.apache.camel.LoggingLevel;
 import org.apache.camel.Message;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.model.OnExceptionDefinition;
 import org.apache.camel.spi.SubUnitOfWorkCallback;
 import org.apache.camel.util.AsyncProcessorHelper;
@@ -185,7 +184,7 @@ public abstract class RedeliveryErrorHan
         this.redeliveryProcessor = redeliveryProcessor;
         this.deadLetter = deadLetter;
         this.output = output;
-        this.outputAsync = AsyncProcessorTypeConverter.convert(output);
+        this.outputAsync = AsyncProcessorConverterHelper.convert(output);
         this.redeliveryPolicy = redeliveryPolicy;
         this.logger = logger;
         this.deadLetterUri = deadLetterUri;
@@ -695,7 +694,7 @@ public abstract class RedeliveryErrorHan
             exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
 
             // the failure processor could also be asynchronous
-            AsyncProcessor afp = AsyncProcessorTypeConverter.convert(processor);
+            AsyncProcessor afp = AsyncProcessorConverterHelper.convert(processor);
             sync = AsyncProcessorHelper.process(afp, exchange, new AsyncCallback() {
                 public void done(boolean sync) {
                     log.trace("Failure processor done: {} processing Exchange: {}", processor,
exchange);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java Mon
Sep  5 13:54:44 2011
@@ -26,7 +26,6 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
@@ -48,7 +47,7 @@ public class TryProcessor extends Servic
     private List<AsyncProcessor> processors;
 
     public TryProcessor(Processor tryProcessor, List<CatchProcessor> catchClauses,
Processor finallyProcessor) {
-        this.tryProcessor = AsyncProcessorTypeConverter.convert(tryProcessor);
+        this.tryProcessor = AsyncProcessorConverterHelper.convert(tryProcessor);
         this.catchProcessor = new DoCatchProcessor(catchClauses);
         this.finallyProcessor = new DoFinallyProcessor(finallyProcessor);
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
Mon Sep  5 13:54:44 2011
@@ -25,7 +25,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
@@ -53,7 +53,7 @@ public class IdempotentConsumer extends 
         this.idempotentRepository = idempotentRepository;
         this.eager = eager;
         this.skipDuplicate = skipDuplicate;
-        this.processor = AsyncProcessorTypeConverter.convert(processor);
+        this.processor = AsyncProcessorConverterHelper.convert(processor);
     }
 
     @Override

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
Mon Sep  5 13:54:44 2011
@@ -23,7 +23,7 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
 import org.apache.camel.processor.Traceable;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -208,7 +208,7 @@ public class FailOverLoadBalancer extend
         }
         log.debug("Processing failover at attempt {} for {}", attempts, exchange);
 
-        AsyncProcessor albp = AsyncProcessorTypeConverter.convert(processor);
+        AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor);
         return AsyncProcessorHelper.process(albp, exchange, new FailOverAsyncCallback(exchange,
attempts, index, callback, processors));
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
Mon Sep  5 13:54:44 2011
@@ -22,7 +22,7 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
 import org.apache.camel.util.AsyncProcessorHelper;
 
 /**
@@ -40,7 +40,7 @@ public abstract class QueueLoadBalancer 
             if (processor == null) {
                 throw new IllegalStateException("No processors could be chosen to process
" + exchange);
             } else {
-                AsyncProcessor albp = AsyncProcessorTypeConverter.convert(processor);
+                AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor);
                 boolean sync = AsyncProcessorHelper.process(albp, exchange, new AsyncCallback()
{
                     public void done(boolean doneSync) {
                         // only handle the async case

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
Mon Sep  5 13:54:44 2011
@@ -23,8 +23,8 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.JndiRegistry;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
 import org.apache.camel.spi.Policy;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.AsyncProcessorHelper;
@@ -121,7 +121,7 @@ public class AsyncEndpointPolicyTest ext
                     invoked++;
                     // let the original processor continue routing
                     exchange.getIn().setHeader(name, "was wrapped");
-                    AsyncProcessor ap = AsyncProcessorTypeConverter.convert(processor);
+                    AsyncProcessor ap = AsyncProcessorConverterHelper.convert(processor);
                     boolean sync = ap.process(exchange, new AsyncCallback() {
                         public void done(boolean doneSync) {
                             // we only have to handle async completion of this policy

Modified: camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
(original)
+++ camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
Mon Sep  5 13:54:44 2011
@@ -28,7 +28,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.impl.DefaultExchangeHolder;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +45,7 @@ public class HazelcastSedaConsumer exten
     public HazelcastSedaConsumer(final Endpoint endpoint, final Processor processor) {
         super(endpoint, processor);
         this.endpoint = (HazelcastSedaEndpoint) endpoint;
-        this.processor = AsyncProcessorTypeConverter.convert(processor);
+        this.processor = AsyncProcessorConverterHelper.convert(processor);
     }
 
     @Override

Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
(original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
Mon Sep  5 13:54:44 2011
@@ -23,7 +23,7 @@ import org.apache.camel.ShutdownRunningT
 import org.apache.camel.SuspendableService;
 import org.apache.camel.component.routebox.RouteboxConsumer;
 import org.apache.camel.component.routebox.RouteboxServiceSupport;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
 import org.apache.camel.spi.ShutdownAware;
 
 public class RouteboxDirectConsumer extends RouteboxServiceSupport implements RouteboxConsumer,
ShutdownAware, SuspendableService {
@@ -78,7 +78,7 @@ public class RouteboxDirectConsumer exte
      */
     public synchronized AsyncProcessor getAsyncProcessor() {
         if (asyncProcessor == null) {            
-            asyncProcessor = AsyncProcessorTypeConverter.convert(processor);
+            asyncProcessor = AsyncProcessorConverterHelper.convert(processor);
         }
         return asyncProcessor;
     }

Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
(original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
Mon Sep  5 13:54:44 2011
@@ -26,7 +26,7 @@ import org.apache.camel.Producer;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.component.routebox.RouteboxServiceSupport;
 import org.apache.camel.component.routebox.strategy.RouteboxDispatcher;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,7 +70,7 @@ public class RouteboxDirectProducer exte
                 RouteboxDispatcher dispatcher = new RouteboxDispatcher(producer);
                 exchange = dispatcher.dispatchAsync(getRouteboxEndpoint(), exchange);   
  
                 if (getRouteboxEndpoint().getConfig().isSendToConsumer()) {
-                    AsyncProcessor processor = AsyncProcessorTypeConverter.convert(((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer().getProcessor());
+                    AsyncProcessor processor = AsyncProcessorConverterHelper.convert(((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer().getProcessor());
                     flag = AsyncProcessorHelper.process(processor, exchange, new AsyncCallback()
{
                         public void done(boolean doneSync) {
                             // we only have to handle async completion of this policy

Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java?rev=1165283&r1=1165282&r2=1165283&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
(original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
Mon Sep  5 13:54:44 2011
@@ -29,7 +29,7 @@ import org.apache.camel.ShutdownRunningT
 import org.apache.camel.component.routebox.RouteboxConsumer;
 import org.apache.camel.component.routebox.RouteboxServiceSupport;
 import org.apache.camel.component.routebox.strategy.RouteboxDispatcher;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.processor.AsyncProcessorConverterHelper;
 import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.slf4j.Logger;
@@ -42,7 +42,7 @@ public class RouteboxSedaConsumer extend
 
     public RouteboxSedaConsumer(RouteboxSedaEndpoint endpoint, Processor processor) {
         super(endpoint);
-        this.setProcessor(AsyncProcessorTypeConverter.convert(processor));
+        this.setProcessor(AsyncProcessorConverterHelper.convert(processor));
         this.producer = endpoint.getConfig().getInnerProducerTemplate();
     }
 



Mime
View raw message