camel-users mailing list archives

From marcelcasado <mcas...@tendrilinc.com>
Subject Issues using camel split and aggregator together
Date Tue, 03 Dec 2013 21:04:59 GMT
I have an issue when using the Camel splitter and aggregator together: the
exchange continues down the route before all the messages from the split have
been completely processed. In my route, the split-and-aggregated messages
reach the "finally" block, whereas I expected the "finally" block to be hit
only once, after all the split messages have been completed. I have tried
different things but have not been able to hold the split messages back from
hitting the "finally" block.

What I'm trying to do is split XML elements and then batch them using
"aggregate" so that I can do batch processing. Ideally, once all the
processing is done, I want to hit the "finally" block.

Here are the Camel routes:

        fromF(readUri, inboundDataDir, pollingInterval, filterBeanName)
                .routeId(routeId)
                .doTry()
                .to("bean:" + interfaceActivityReportEnricher)
                .to("bean:" + tenantIdEnricher)
                .split(stax(splitClass, false))
                .streaming()
                .to("bean:dataBatchEnricher")
                .to("direct:" + batchingStrategy)
                .endDoTry()
                .doCatch(Exception.class)
                .to("bean:errorProcessor?method=handleError(${file:absolute.path}, ${exception}, ${property."
                        + INTERFACE_ACTIVITY_REPORT_PROPERTY_NAME + "})")
                .doFinally()
                .to("bean:completedFileNameEnricher?method=enrichWithCompletedFileName(*,"
                        + inboundDataDir + "," + outboundDataDir + ")")
                .setBody()
                .simple("${property." + INTERFACE_ACTIVITY_REPORT_PROPERTY_NAME + ".report}")
                .marshal(jaxbDataFormat)
                .toF(writeUri, outboundDataDir, interfaceActivityReportDir)
                .end();

        if ("sizeStrategy-TenantIdCorrelation".equals(batchingStrategy)) {
            from("direct:sizeStrategy-TenantIdCorrelation")
                    // Aggregate all exchanges correlated by the
                    // TENANT_ID_PROPERTY_NAME property, using
                    // ArrayListAggregationStrategy. After N messages have been
                    // aggregated, complete the aggregation and send it to
                    // the processor.
                    .aggregate(property(TENANT_ID_PROPERTY_NAME),
                            new ArrayListAggregationStrategy())
                    .aggregationRepository(repo)
                    .completionSize(8)
                    .completionTimeout(10000)
                    .forceCompletionOnStop()
                    .parallelProcessing()
                    .to("bean:" + importProcessor)
                    .end();

        }
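
The intended behaviour can be sketched in plain Java, without Camel, to make
the expectation concrete. This is only an illustration under assumptions: the
class name BatchingSketch, the method batchByTenant and the sample tenant ids
are hypothetical and are not part of the routes above. Flushing partial
batches at the end of the input stands in for the completionTimeout and
forceCompletionOnStop settings. The point is that items are batched per
tenant in groups of 8, and the "finally" step runs exactly once, after every
batch has been produced.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Camel-free illustration of the expected batching semantics.
public class BatchingSketch {

    // Groups items by tenant id and completes a batch whenever a tenant has
    // collected batchSize items (analogous to completionSize(8)).
    // Each item is a two-element array: item[0] = tenant id, item[1] = payload.
    public static List<List<String>> batchByTenant(List<String[]> items, int batchSize) {
        Map<String, List<String>> pending = new LinkedHashMap<>();
        List<List<String>> completed = new ArrayList<>();
        for (String[] item : items) {
            List<String> batch = pending.computeIfAbsent(item[0], k -> new ArrayList<>());
            batch.add(item[1]);
            if (batch.size() == batchSize) {
                completed.add(batch);
                pending.remove(item[0]);
            }
        }
        // Assumption: partial batches are flushed once the input is exhausted.
        completed.addAll(pending.values());
        return completed;
    }

    public static void main(String[] args) {
        List<String[]> items = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            items.add(new String[] {"tenantA", "a" + i});
        }
        for (int i = 0; i < 3; i++) {
            items.add(new String[] {"tenantB", "b" + i});
        }

        int finallyCount = 0;
        try {
            for (List<String> batch : batchByTenant(items, 8)) {
                System.out.println("batch of " + batch.size() + ": " + batch);
            }
        } finally {
            // Expected: reached once, after all batches have been handled.
            finallyCount++;
        }
        System.out.println("finally ran " + finallyCount + " time(s)");
    }
}
```

With the sample input, the ten tenantA items yield one full batch of 8 plus a
flushed remainder of 2, and the three tenantB items yield a flushed batch of
3.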

Thanks,

-Marcel




--
View this message in context: http://camel.465427.n5.nabble.com/Issues-using-camel-split-and-aggregator-together-tp5744266.html
Sent from the Camel - Users mailing list archive at Nabble.com.
