Return-Path: Delivered-To: apmail-camel-commits-archive@www.apache.org Received: (qmail 43187 invoked from network); 10 Jan 2011 14:10:31 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 10 Jan 2011 14:10:31 -0000 Received: (qmail 54061 invoked by uid 500); 10 Jan 2011 14:10:31 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 53789 invoked by uid 500); 10 Jan 2011 14:10:30 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 53779 invoked by uid 99); 10 Jan 2011 14:10:29 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Jan 2011 14:10:29 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Jan 2011 14:10:27 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id F3FE923888BD; Mon, 10 Jan 2011 14:10:05 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1057202 - in /camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda: SedaConsumer.java SedaEndpoint.java Date: Mon, 10 Jan 2011 14:10:05 -0000 To: commits@camel.apache.org From: davsclaus@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110110141005.F3FE923888BD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: davsclaus Date: Mon Jan 10 14:10:05 2011 New Revision: 1057202 URL: http://svn.apache.org/viewvc?rev=1057202&view=rev Log: CAMEL-3523: Optimized the seda component to only use the logic that keeps track of seda endpoints coming and going if the multiple consumers option has been enabled. 
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1057202&r1=1057201&r2=1057202&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Mon Jan 10 14:10:05 2011 @@ -176,7 +176,7 @@ public class SedaConsumer extends Servic } // use a multicast processor to process it - MulticastProcessor mp = endpoint.getConumserMulticastProcessor(); + MulticastProcessor mp = endpoint.getConsumerMulticastProcessor(); // and use the asynchronous routing engine to support it AsyncProcessorHelper.process(mp, exchange, new AsyncCallback() { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1057202&r1=1057201&r2=1057202&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Mon Jan 10 14:10:05 2011 @@ -54,7 +54,8 @@ public class SedaEndpoint extends Defaul private long timeout = 30000; private volatile Set producers = new CopyOnWriteArraySet(); private volatile Set consumers = new CopyOnWriteArraySet(); - private volatile MulticastProcessor conumserMulticastProcessor; + private volatile MulticastProcessor 
consumerMulticastProcessor; + private volatile boolean multicastStarted; public SedaEndpoint() { } @@ -100,30 +101,43 @@ public class SedaEndpoint extends Defaul return queue; } - protected synchronized MulticastProcessor getConumserMulticastProcessor() { - return conumserMulticastProcessor; + protected synchronized MulticastProcessor getConsumerMulticastProcessor() throws Exception { + if (!multicastStarted && consumerMulticastProcessor != null) { + // only start it on-demand to avoid starting it during stopping + ServiceHelper.startService(consumerMulticastProcessor); + multicastStarted = true; + } + return consumerMulticastProcessor; } protected synchronized void updateMulticastProcessor() throws Exception { - if (conumserMulticastProcessor != null) { - ServiceHelper.stopService(conumserMulticastProcessor); + if (consumerMulticastProcessor != null) { + ServiceHelper.stopService(consumerMulticastProcessor); } int size = getConsumers().size(); if (size == 0 && multicastExecutor != null) { - // stop the multicastExecutor + // stop the multicast executor as its not needed anymore when size is zero getCamelContext().getExecutorServiceStrategy().shutdown(multicastExecutor); multicastExecutor = null; } - if (size == 1 && multicastExecutor == null) { - multicastExecutor = getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, getEndpointUri() + "(multicast)"); - } - List processors = new ArrayList(size); - for (SedaConsumer consumer : getConsumers()) { - processors.add(consumer.getProcessor()); + if (size > 1) { + if (multicastExecutor == null) { + // create multicast executor as we need it when we have more than 1 processor + multicastExecutor = getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, getEndpointUri() + "(multicast)"); + } + // create list of consumers to multicast to + List processors = new ArrayList(size); + for (SedaConsumer consumer : getConsumers()) { + processors.add(consumer.getProcessor()); + } + // create 
multicast processor + multicastStarted = false; + consumerMulticastProcessor = new MulticastProcessor(getCamelContext(), processors, null, true, multicastExecutor, false, false, 0); + } else { + // not needed + consumerMulticastProcessor = null; } - conumserMulticastProcessor = new MulticastProcessor(getCamelContext(), processors, null, true, multicastExecutor, false, false, 0); - ServiceHelper.startService(conumserMulticastProcessor); } public void setQueue(BlockingQueue queue) { @@ -210,12 +224,16 @@ public class SedaEndpoint extends Defaul void onStarted(SedaConsumer consumer) throws Exception { consumers.add(consumer); - updateMulticastProcessor(); + if (isMultipleConsumers()) { + updateMulticastProcessor(); + } } void onStopped(SedaConsumer consumer) throws Exception { consumers.remove(consumer); - updateMulticastProcessor(); + if (isMultipleConsumers()) { + updateMulticastProcessor(); + } } }