camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Okello Nelson <cn.oke...@gmail.com>
Subject Re: Exchanges Aggregation for batch loading to database
Date Mon, 27 May 2013 14:18:21 GMT
Hi Guys,

I've been trying to sort this issue. I've created a custom aggregation
strategy instead of relying on the inbuilt "groupExchanges()". My
aggregation strategy is shown below:

@Service( value = "keAggregationStrategy" )
public class KEAggregationStrategy implements IKEAggregationStrategy {

public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Object newBody = newExchange.getIn().getBody();
ArrayList<Object> list = null;
 if(oldExchange == null) {
list = new ArrayList<Object>();
list.add(newBody);
newExchange.getIn().setBody(list);
return newExchange;
 } else {
list = oldExchange.getIn().getBody(ArrayList.class);
list.add(newBody);
return oldExchange;
}
 }

}

Now, when I run the app, the following exception is thrown by ActiveMQ:
FYI: My environment is ActiveMQ 5.8, Camel 2.11.0. I'm not using embedded
ActiveMQ.

javax.jms.JMSException: Failed to build body from content.
Serializable class not available to broker. Reason:
java.lang.ClassNotFoundException:
com.package.models.bc.ke.MyEntityClass




On Mon, May 27, 2013 at 3:08 PM, Okello Nelson <cn.okello@gmail.com> wrote:

> Hi Guys,
>
> I have the following Java DSL:
>
> from(queuePrefix + fileType + "_ValidQ?concurrentConsumers=100")
> .routeId(fileType + "_ValidQ-To-DB")
>  .onException(Exception.class).redeliveryPolicyRef("redeliverypolicy")
> .to(queuePrefix + fileType + "_DBLoading_ErrorQ").end()
>  .aggregate(header("CRBOriginalFileName"))
> .completionSize(5000)
> .parallelProcessing()
>  .groupExchanges()
> .to("bean:keBouncedChequeLoader");
>
> The destination is a bean. The bean is as shown below:
>
> public void process(Exchange exchange) throws Exception {
> List<Exchange> exchanges = exchange.getProperty(Exchange.GROUPED_EXCHANGE,
> List.class);
>  Session session = SessionFactoryUtils.getSession(sessionFactory, true);
> Transaction tx = session.beginTransaction();
>  for(int i = 0; i < exchanges.size(); i++) {
> KEBouncedCheque bc = (KEBouncedCheque) exchanges.get(i).getIn().getBody();
>  session.save(exchanges.get(i));
> FileUtils.writeStringToFile(new File("C:/tmp/bc.txt"),
> bc.getClientNumber() + "\n", true);
>  if( i % 20 == 0) {
> session.flush();
> session.clear();
>  }
> }
>  tx.commit();
>  session.close();
>  }
>
> The message doesn't seem to reach this bean. In ActiveMQ, I'm seeing the
> error
>
> "Unknown message type [org.apache.activemq.command.ActiveMQMessage]"...
>
> I'm not sure what I'm doing wrong. Any assistance will be appreciated very
> much.
>
> Kind Regards,
> Okello Nelson.
>



-- 
Kind Regards,
Okello Nelson
+254 722 137 826
cn.okello@gmail.com

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message