camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1057202 - in /camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda: SedaConsumer.java SedaEndpoint.java
Date Mon, 10 Jan 2011 14:10:05 GMT
Author: davsclaus
Date: Mon Jan 10 14:10:05 2011
New Revision: 1057202

URL: http://svn.apache.org/viewvc?rev=1057202&view=rev
Log:
CAMEL-3523: Optimized the seda component to only use the logic that keeps track of seda endpoints
coming and going if the multiple consumers option has been enabled.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1057202&r1=1057201&r2=1057202&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Mon Jan 10 14:10:05 2011
@@ -176,7 +176,7 @@ public class SedaConsumer extends Servic
             }
            
             // use a multicast processor to process it
-            MulticastProcessor mp = endpoint.getConumserMulticastProcessor();
+            MulticastProcessor mp = endpoint.getConsumerMulticastProcessor();
 
             // and use the asynchronous routing engine to support it
             AsyncProcessorHelper.process(mp, exchange, new AsyncCallback() {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1057202&r1=1057201&r2=1057202&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Mon Jan 10 14:10:05 2011
@@ -54,7 +54,8 @@ public class SedaEndpoint extends Defaul
     private long timeout = 30000;
     private volatile Set<SedaProducer> producers = new CopyOnWriteArraySet<SedaProducer>();
     private volatile Set<SedaConsumer> consumers = new CopyOnWriteArraySet<SedaConsumer>();
-    private volatile MulticastProcessor conumserMulticastProcessor;
+    private volatile MulticastProcessor consumerMulticastProcessor;
+    private volatile boolean multicastStarted;
 
     public SedaEndpoint() {
     }
@@ -100,30 +101,43 @@ public class SedaEndpoint extends Defaul
         return queue;
     }
     
-    protected synchronized MulticastProcessor getConumserMulticastProcessor() {
-        return conumserMulticastProcessor;
+    protected synchronized MulticastProcessor getConsumerMulticastProcessor() throws Exception {
+        if (!multicastStarted && consumerMulticastProcessor != null) {
+            // only start it on-demand to avoid starting it during stopping
+            ServiceHelper.startService(consumerMulticastProcessor);
+            multicastStarted = true;
+        }
+        return consumerMulticastProcessor;
     }
     
     protected synchronized void updateMulticastProcessor() throws Exception {
-        if (conumserMulticastProcessor != null) {
-            ServiceHelper.stopService(conumserMulticastProcessor);
+        if (consumerMulticastProcessor != null) {
+            ServiceHelper.stopService(consumerMulticastProcessor);
         }
 
         int size = getConsumers().size();
         if (size == 0 && multicastExecutor != null) {
-            // stop the multicastExecutor
+            // stop the multicast executor as its not needed anymore when size is zero
             getCamelContext().getExecutorServiceStrategy().shutdown(multicastExecutor);
             multicastExecutor = null;
         }
-        if (size == 1 && multicastExecutor == null) {
-            multicastExecutor = getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, getEndpointUri() + "(multicast)");
-        }
-        List<Processor> processors = new ArrayList<Processor>(size);
-        for (SedaConsumer consumer : getConsumers()) {
-            processors.add(consumer.getProcessor());
+        if (size > 1) {
+            if (multicastExecutor == null) {
+                // create multicast executor as we need it when we have more than 1 processor
+                multicastExecutor = getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, getEndpointUri() + "(multicast)");
+            }
+            // create list of consumers to multicast to
+            List<Processor> processors = new ArrayList<Processor>(size);
+            for (SedaConsumer consumer : getConsumers()) {
+                processors.add(consumer.getProcessor());
+            }
+            // create multicast processor
+            multicastStarted = false;
+            consumerMulticastProcessor = new MulticastProcessor(getCamelContext(), processors, null, true, multicastExecutor, false, false, 0);
+        } else {
+            // not needed
+            consumerMulticastProcessor = null;
         }
-        conumserMulticastProcessor = new MulticastProcessor(getCamelContext(), processors, null, true, multicastExecutor, false, false, 0);
-        ServiceHelper.startService(conumserMulticastProcessor);
     }
 
     public void setQueue(BlockingQueue<Exchange> queue) {
@@ -210,12 +224,16 @@ public class SedaEndpoint extends Defaul
 
     void onStarted(SedaConsumer consumer) throws Exception {
         consumers.add(consumer);
-        updateMulticastProcessor();
+        if (isMultipleConsumers()) {
+            updateMulticastProcessor();
+        }
     }
 
     void onStopped(SedaConsumer consumer) throws Exception {
         consumers.remove(consumer);
-        updateMulticastProcessor();
+        if (isMultipleConsumers()) {
+            updateMulticastProcessor();
+        }
     }
 
 }



Mime
View raw message