camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ryan Gardner <ryeb...@gmail.com>
Subject Concurrent processing of messages - How do you configure a seda consumer (or any other transport)?
Date Mon, 16 Mar 2009 21:22:48 GMT
I am currently using Camel 1.6.0

I have a process in my application where a small number of somewhat  
time-sensitive messages are sent out to various channels (how isn't  
really relevant - could be email, jabber, smoke signals, whatever)  
Rather than process them in a queue, I would like to send them out  
concurrently. The piece that creates the messages has a loop, and in  
the java code it calls I am injecting an endpoint, and then in a loop  
I am sending messages to that endpoint:
	
	@EndpointInject(name="fooEndpoint")
	protected ProducerTemplate fooTemplate

	....

	for(some criteria) {
		...
		 fooTemplate.sendBodyAndHeaders(fooBody, fooHeaders);
	}

This part works fine. The fooEndpoint is created in a Spring DSL:

	<camel:endpoint id="fooEndpoint" uri="seda:fooOutbound" />

A java dsl routebuilder builds the route to process these messages  
like this:

    from(endpoint("foo"))
              .choice()
                 .when 
( header("fooProperty").isEqualTo(SomeEnum.A_VALUE) )
                     .choice()
                         .when 
( header("barProperty").isEqualTo(AnotherEnum.B_VALUE))
                             .to("someEndpoint")
                         .when 
( header("barProperty").isEqualTo(AnotherEnum.C_VALUE))
                             .beanRef("someBean")
                         .when 
( header("phoneMediaType").isEqualTo(PhoneMediaType.TEXT))
                             .beanRef("someOtherBean")
                         .otherwise()
                             .to("log:foo:level=ERROR")

                 .when 
(header("blargProperty").isEqualTo(SomeEnum.D_VALUE))
                     .to("log:outbound-messages?level=INFO")
                 .otherwise()
                     .to("log:outbound-messages?level=ERROR");


I naively assumed that because I was throwing out new messages in the  
loop (sending out a bunch of new messages in the  
fooTempate.sendBodyAndHeaders) it would create a new thread to handle  
them because I was using the seda endpoint. I realized when I saw that  
they were going in a queue that was not the case.

I tried to remedy this by setting the "concurrentConsumers" property  
on the consuming endpoint like this:

from("seda:fooOutbound?concurrentConsumers=8")
	.choice()
	...

but that gave me this exception:

Caused by: org.apache.camel.ResolveEndpointFailedException: Failed to  
resolve endpoint: seda:incidentOutboundContactPending? 
concurrentConsumers=8 due to: There are 1 parameters that couldn't be  
set on the endpoint. Check the uri if the parameters are spelt  
correctly and that they are properties of the endpoint. Unknown  
parameters=[{concurrentConsumers=8}]
	at  
org 
.apache 
.camel.impl.DefaultComponent.createEndpoint(DefaultComponent.java:95)
	at  
org 
.apache 
.camel.impl.DefaultCamelContext.getEndpoint(DefaultCamelContext.java: 
331)
	... 70 more

When I it up using a thread pool, it didn't seem to be handling it  
concurrently either - but I didn't look at it all that closely. It  
sounded to me like a concurrent consumer setup was more in line with  
what I wanted.

    from(endpoint("foo")).thread(8)
              .choice()

Is there something I'm missing? Any help is appreciated.

Ryan

Mime
View raw message