Return-Path: Delivered-To: apmail-camel-commits-archive@www.apache.org Received: (qmail 23058 invoked from network); 9 Feb 2009 21:18:40 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 9 Feb 2009 21:18:40 -0000 Received: (qmail 433 invoked by uid 500); 9 Feb 2009 21:18:40 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 414 invoked by uid 500); 9 Feb 2009 21:18:40 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 405 invoked by uid 99); 9 Feb 2009 21:18:40 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Feb 2009 13:18:40 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Feb 2009 21:18:37 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 80E312388963; Mon, 9 Feb 2009 21:18:16 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r742739 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/seda/ test/java/org/apache/camel/component/seda/ Date: Mon, 09 Feb 2009 21:18:16 -0000 To: commits@camel.apache.org From: romkal@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090209211816.80E312388963@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: romkal Date: Mon Feb 9 21:18:15 2009 New Revision: 742739 URL: http://svn.apache.org/viewvc?rev=742739&view=rev Log: CAMEL-1297: Seda component can configure concurrent consumers parameter Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConfigureTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerStartStopTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=742739&r1=742738&r2=742739&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java Mon Feb 9 21:18:15 2009 @@ -39,6 +39,7 @@ @Override protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception { - return new SedaEndpoint(uri, this, parameters); + int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, 1); + return new SedaEndpoint(uri, this, createQueue(uri, parameters), consumers); } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=742739&r1=742738&r2=742739&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Mon Feb 9 21:18:15 2009 @@ -17,6 +17,10 @@ package org.apache.camel.component.seda; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.camel.AsyncCallback; @@ -39,7 +43,7 @@ private SedaEndpoint endpoint; private AsyncProcessor processor; - private Thread thread; + private ExecutorService executor; public SedaConsumer(SedaEndpoint endpoint, Processor processor) { this.endpoint = endpoint; @@ -88,14 +92,23 @@ } protected void doStart() throws Exception { - thread = new Thread(this, getThreadName(endpoint.getEndpointUri())); - thread.setDaemon(true); - thread.start(); + int concurrentConsumers = endpoint.getConcurrentConsumers(); + executor = Executors.newFixedThreadPool(concurrentConsumers, new ThreadFactory() { + + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, getThreadName(endpoint.getEndpointUri())); + thread.setDaemon(true); + return thread; + } + }); + for (int i = 0; i < concurrentConsumers; i++) { + executor.execute(this); + } } protected void doStop() throws Exception { - thread.join(); - thread = null; + executor.shutdownNow(); + executor = null; } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=742739&r1=742738&r2=742739&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Mon Feb 9 21:18:15 2009 @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import org.apache.camel.Component; @@ -40,24 +41,31 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint { private BlockingQueue queue; private int size = 1000; + private int concurrentConsumers = 1; public SedaEndpoint() { } public SedaEndpoint(String endpointUri, Component component, BlockingQueue queue) { + this(endpointUri, component, queue, 1); + } + + public SedaEndpoint(String endpointUri, Component component, BlockingQueue queue, int concurrentConsumers) { super(endpointUri, component); this.queue = queue; + this.concurrentConsumers = concurrentConsumers; } - public SedaEndpoint(String uri, SedaComponent component, Map parameters) { - this(uri, component, component.createQueue(uri, parameters)); + public SedaEndpoint(String endpointUri, BlockingQueue queue) { + this(endpointUri, queue, 1); } - public SedaEndpoint(String endpointUri, BlockingQueue queue) { + public SedaEndpoint(String endpointUri, BlockingQueue queue, int concurrentConsumers) { super(endpointUri); this.queue = queue; + this.concurrentConsumers = concurrentConsumers; } - + public Producer createProducer() throws Exception { return new CollectionProducer(this, getQueue()); } @@ -72,7 +80,7 @@ } return queue; } - + public void setQueue(BlockingQueue queue) { this.queue = queue; } @@ -85,6 +93,14 @@ this.size = size; } + public void setConcurrentConsumers(int concurrentConsumers) { + this.concurrentConsumers = concurrentConsumers; + } + + public int getConcurrentConsumers() { + return concurrentConsumers; + } + public boolean isSingleton() { return true; } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConfigureTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConfigureTest.java?rev=742739&r1=742738&r2=742739&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConfigureTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConfigureTest.java Mon Feb 9 21:18:15 2009 @@ -33,4 +33,9 @@ LinkedBlockingQueue blockingQueue = assertIsInstanceOf(LinkedBlockingQueue.class, queue); assertEquals("remainingCapacity", 2000, blockingQueue.remainingCapacity()); } + + public void testConcurrentConsumersConfigured() { + SedaEndpoint endpoint = resolveMandatoryEndpoint("seda:foo?concurrentConsumers=5", SedaEndpoint.class); + assertEquals("concurrentConsumers", 5, endpoint.getConcurrentConsumers()); + } } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerStartStopTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerStartStopTest.java?rev=742739&r1=742738&r2=742739&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerStartStopTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerStartStopTest.java Mon Feb 9 21:18:15 2009 @@ -22,6 +22,7 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.PollingConsumer; +import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @@ -31,6 +32,7 @@ */ public class SedaConsumerStartStopTest extends ContextTestSupport { + private static final String SEDA_QUEUE_CONSUMERS_5 = "seda:queue?concurrentConsumers=5"; private PollingConsumer consumer; public boolean isUseRouteBuilder() { @@ -86,4 +88,29 @@ assertMockEndpointsSatisfied(); } + + public void testConcurrentConsumers() throws Exception { + context.addRoutes(new RouteBuilder(context) { + + @Override + public void configure() throws Exception { + from(SEDA_QUEUE_CONSUMERS_5).delay(500).to("mock:result"); + + } + }); + context.start(); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(10); + + for (int i = 0; i < 10; i++) { + sendBody(SEDA_QUEUE_CONSUMERS_5, i); + } + + Thread.sleep(800); + assertEquals(5, mock.getReceivedCounter()); + + Thread.sleep(700); + assertEquals(10, mock.getReceivedCounter()); + } }