camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jens Breitenstein <mailingl...@j-b-s.de>
Subject Re: Best Strategy to process a large number of rows in File
Date Tue, 29 Mar 2016 17:51:19 GMT
To me it looks like the entire CSV file is converted to activemq 
messages which finally causes the OOM.
The same problem exists with Camel SEDA queues as the messages need to 
be kept in memory until handled.
I guess you need to set a queue size limit which blocks your csv sending 
process if no space is left.

see: http://activemq.apache.org/producer-flow-control.html

Jens


Am 29/03/16 um 10:12 schrieb Michele:
> Hi,
>
> thanks a lot for your reply.
>
> I changed my route introducing  Throttler Pattern like this:
>
> <route id="FileReader_Route">
>      <from uri="file:incoming?....." />
>       <split streaming="true" parallelProcessing="true">
>             <tokenize token="\n" />
> 	   <unmarshal ref="IncomingCSVFileDataFormat" />
>             .....
>             <to uri="activemq:queue:incomingTickets" />
>       </split>
> </route>
>
> <route id="ProcessTicket_Route">
>       <from uri="activemq:queue:incomingTickets" />
>       <throttle timePeriodMillis="10000" asyncDelayed="true">
>      	  <constant>5</constant>
> 	   <process ref="DataProcessor" />
>             <marshal ref="Gson" />
>             <to
> uri="jetty:http://host/rs/v1.0/ticket?jettyHttpBindingRef=CustomJettyHttpBinding"
> />
>       </throttle>
> </route>
>
> But now, I have an error java.lang.OutOfMemoryError: Java heap space on
> ActiveMQ... I defined a PooledConnectionFactory with concurrentConsumers
> equals 5 and maxConnections equals 3
>
> .....
> Message History
> ---------------------------------------------------------------------------------------------------------------------------------------
> RouteId              ProcessorId          Processor
> Elapsed (ms)
> [FileRetriever_Rout] [FileRetriever_Rout]
> [file://C:/inbox?include=%5EEdenred_%5B0-9%5D%7B8%7D.csv&s] [     46034]
> [FileRetriever_Rout] [unmarshal3        ]
> [unmarshal[ref:IncomingFileDataFormat]
> ] [         0]
> [FileRetriever_Rout] [setHeader17       ] [setHeader[CamelSplitIndex]
> ] [         0]
> [FileRetriever_Rout] [setBody1          ] [setBody[simple{${body[0]}}]
> ] [        10]
> [FileRetriever_Rout] [choice2           ]
> [when[simple{${body[INGENICO_OPERATION_ID]} regex '[0-9]+'}]choice[]
> ] [      1634]
> [FileRetriever_Rout] [log15             ] [log
> ] [         0]
> [FileRetriever_Rout] [to6               ]
> [activemq:queue:IF_INGESTATE_Inbound?destination.consumer.dispatchAsync=true&de]
> [      1634]
>
> Exchange
> ---------------------------------------------------------------------------------------------------------------------------------------
> Exchange[
> 	Id                  ID-FGBAL201530-50191-1459236743101-1-259
> 	ExchangePattern     InOnly
> 	Headers             {breadcrumbId=IMP-IF-Ingestate-20160329-093400,
> CamelFileAbsolute=true,
> CamelFileAbsolutePath=C:\CRT-2.0\IF-Ingestate\inbox\Edenred_15092015.csv,
> CamelFileContentType=application/vnd.ms-excel,
> CamelFileLastModified=1458774180380, CamelFileLength=6961789,
> CamelFileName=Edenred_15092015.csv,
> CamelFileNameConsumed=Edenred_15092015.csv,
> CamelFileNameOnly=Edenred_15092015.csv,
> CamelFileParent=C:\CRT-2.0\IF-Ingestate\inbox,
> CamelFilePath=C:\CRT-2.0\IF-Ingestate\inbox\Edenred_15092015.csv,
> CamelFileRelativePath=Edenred_15092015.csv, CamelRedelivered=false,
> CamelRedeliveryCounter=0, CamelSplitIndex=59,
> CrmRSActionPath=/tk_rt_ticket/ingestate/maintenance,
> ImportDateTime=20160329-093400,
> MsgCorrelationId=Inbound_INGESTATE_20160329-093400}
> 	BodyType            java.util.HashMap
> 	Body                {TICKET_ID=00108560, INGENICO_OPERATION_ID=1721710,
> TERMINAL_NUMBEROFCONTACTS=1, TICKET_STATUS=1,
> TERMINAL_LAST_CONTACT_DATE=2012-11-03 01:57:26.360,
> TERMINAL_TECHNOLOGY=TELIUM, TERMINAL_SN=0000107370643471,
> TERMINAL_INGESTATE_ID=ICT-TICKET-RESTAURANT:91215343,
> TICKET_REGISTRATION_DATE=2012-11-02 10:08:00.000, TERMINAL_PN=M40,
> TERMINAL_FIRST_CONTACT_DATE=2012-11-03 01:56:08.343, TERMINAL_ID=91215343,
> RECORDER=EDENRED}
> ]
>
> Stacktrace
> ---------------------------------------------------------------------------------------------------------------------------------------
> org.apache.camel.CamelExecutionException: Exception occurred during
> execution on the exchange: Exchange[Message: {TICKET_ID=00108560,
> INGENICO_OPERATION_ID=1721710, TERMINAL_NUMBEROFCONTACTS=1, TICKET_STATUS=1,
> TERMINAL_LAST_CONTACT_DATE=2012-11-03 01:57:26.360,
> TERMINAL_TECHNOLOGY=TELIUM, TERMINAL_SN=0000107370643471,
> TERMINAL_INGESTATE_ID=ICT-TICKET-RESTAURANT:91215343,
> TICKET_REGISTRATION_DATE=2012-11-02 10:08:00.000, TERMINAL_PN=M40,
> TERMINAL_FIRST_CONTACT_DATE=2012-11-03 01:56:08.343, TERMINAL_ID=91215343,
> RECORDER=EDENRED}]
> 	at
> org.apache.camel.util.ObjectHelper.wrapCamelExecutionException(ObjectHelper.java:1635)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.impl.DefaultExchange.setException(DefaultExchange.java:308)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:158)[209:org.apache.camel.camel-jms:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:111)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:652)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:580)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:227)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.Splitter.process(Splitter.java:104)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:435)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:211)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:175)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:174)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:101)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
> 	at
> org.apache.camel.pollconsumer.quartz2.QuartzScheduledPollConsumerJob.execute(QuartzScheduledPollConsumerJob.java:59)[279:org.apache.camel.camel-quartz2:2.15.1.redhat-620133]
> 	at
> org.quartz.core.JobRunShell.run(JobRunShell.java:202)[278:org.quartz-scheduler.quartz:2.2.1]
> 	at
> org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)[278:org.quartz-scheduler.quartz:2.2.1]
> Caused by: java.lang.OutOfMemoryError: Java heap space
>
>
> Any idea?
>
> How configure active mq component in order to manage the load in JBoss Fuse
> 6.2 context? Or Is there something wrong in my system routing?
>
> Thanks in advance
>
> Best Regards
> Michele
>
>
>
>
>
>
>
>
> --
> View this message in context: http://camel.465427.n5.nabble.com/Best-Strategy-to-process-a-large-number-of-rows-in-File-tp5779856p5779935.html
> Sent from the Camel - Users mailing list archive at Nabble.com.


Mime
View raw message