camel-users mailing list archives

From Hedi <hedi.boufa...@amadeus.com>
Subject Aggregator output not aggregated !
Date Wed, 25 Feb 2015 16:41:51 GMT
Hello Camel users,

I am using ActiveMQ and the embedded Camel "engine" that comes with it. I
have to say that I am new to both (Camel and even ActiveMQ).

I am trying to implement a simple message aggregator but I cannot get my
aggregation strategy to work correctly. I am looking for some help on this. 

Basically, my correlationExpression and completion criteria seem to work OK.
However, no matter what I do inside my "AggregationStrategy.aggregate"
method, the output of the aggregator is unaffected.

My activemq.xml looks like this:

    <camelContext id="camel" trace="true" xmlns="http://camel.apache.org/schema/spring">
        <route id="InvalAggregation">
            <from uri="broker:queue:QIN"/>
            <aggregate completionSize="3" strategyRef="aggregatorStrategy">
                <correlationExpression>
                    <constant>true</constant>
                </correlationExpression>
                <to uri="broker:queue:QOUT"/>
            </aggregate>
        </route>
    </camelContext>

    <bean id="aggregatorStrategy" class="MyAggregationStrategy"/>

I have set the correlationExpression to <constant>true</constant> and the
completionSize to "3" so that every group of 3 messages generates a single
message on the output queue.
I do see 1 message generated on QOUT for every 3 incoming messages on
QIN, so both correlationExpression and completionSize behave as expected.


Now, the problem is with my AggregationStrategy. It looks like this:

import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.Message;

// Simply combines Exchange String body values using '+' as a delimiter.
class MyAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // First message of a group: nothing to combine with yet.
        if (oldExchange == null) {
            return newExchange;
        }

        // Append the new body to the body accumulated so far.
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);
        oldExchange.getIn().setBody(oldBody + "+" + newBody);

        return oldExchange;
    }
}

With the above code, when I send 6 messages on QIN with bodies: "A", "B",
"C", "D", "E", "F", QOUT gets 2 messages: "A" and "D". 
What I expected on QOUT was 2 aggregated messages with bodies "A+B+C" and
"D+E+F".
The above observation is based on sending/reading messages via ActiveMQ Web
console or with actual ActiveMQ producer/consumer processes.
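
To make the expected result concrete, here is a minimal plain-Java sketch of the grouping described above. It does not use Camel or ActiveMQ at all: the class name AggregationSketch and its methods are hypothetical, and it only assumes a single correlation group and a completion size of 3, as in the route above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the route; no Camel classes are involved.
final class AggregationSketch {

    // Mirrors the body handling of MyAggregationStrategy.aggregate:
    // the first body of a group is kept, later ones are appended with '+'.
    static String combine(String oldBody, String newBody) {
        if (oldBody == null) {
            return newBody;
        }
        return oldBody + "+" + newBody;
    }

    // Treats all bodies as one correlation group and emits one combined
    // body each time completionSize inputs have been collected.
    // A trailing incomplete group is not emitted.
    static List<String> aggregate(List<String> bodies, int completionSize) {
        List<String> completed = new ArrayList<>();
        String current = null;
        int count = 0;
        for (String body : bodies) {
            current = combine(current, body);
            count++;
            if (count == completionSize) {
                completed.add(current);
                current = null;
                count = 0;
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        List<String> out = aggregate(Arrays.asList("A", "B", "C", "D", "E", "F"), 3);
        System.out.println(out); // prints [A+B+C, D+E+F]
    }
}
```

This is the output I expect to read from QOUT; it says nothing about how the broker actually delivers the aggregated exchange.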

The strange thing is that if I look at the ActiveMQ log, I see that Camel
does aggregate the messages as expected. But somehow the ActiveMQ queue QOUT
gets something different:

(simplified log for readability):
....
INFO | Apache ActiveMQ 5.10.0 is starting
....
INFO | Apache Camel 2.13.1 (CamelContext: camel) started in 0.675 seconds
....
INFO | (InvalAggregation) from(broker://queue:QIN) --> aggregate[true] <<<
Pattern:InOnly, Headers:{...}, BodyType:String, Body:A
INFO | (InvalAggregation) from(broker://queue:QIN) --> aggregate[true] <<<
Pattern:InOnly, Headers:{...}, BodyType:String, Body:B
INFO | (InvalAggregation) from(broker://queue:QIN) --> aggregate[true] <<<
Pattern:InOnly, Headers:{...}, BodyType:String, Body:C
INFO | (InvalAggregation)      aggregate[true] --> broker://queue:QOUT <<<
Pattern:InOnly, Headers:{...}, BodyType:String, Body:A+B+C
INFO | (InvalAggregation) from(broker://queue:QIN) --> aggregate[true] <<<
Pattern:InOnly, Headers:{...}, BodyType:String, Body:D
INFO | (InvalAggregation) from(broker://queue:QIN) --> aggregate[true] <<<
Pattern:InOnly, Headers:{...}, BodyType:String, Body:E
INFO | (InvalAggregation) from(broker://queue:QIN) --> aggregate[true] <<<
Pattern:InOnly, Headers:{...}, BodyType:String, Body:F
INFO | (InvalAggregation)     aggregate[true] --> broker://queue:QOUT <<<
Pattern:InOnly, Headers:{...}, BodyType:String, Body:D+E+F
 
There must be something fundamental I missed! Does anyone have any idea?

Many thanks in advance for your help!
Hedi




--
View this message in context: http://camel.465427.n5.nabble.com/Aggregator-output-not-aggregated-tp5763177.html
Sent from the Camel - Users mailing list archive at Nabble.com.
