camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Brad Johnson <brad.john...@mediadriver.com>
Subject Re: Best Strategy to process a large number of rows in File
Date Sat, 16 Apr 2016 14:53:58 GMT
At this point in your code:

  <from uri="seda:processAndStoreInQueue" /
        <unmarshal  ref="myBeanio" />

Are you getting the entire file in memory?  That's going to be big so maybe
not what you want.  It may be livable however.

ActiveMQ can handle a lot of throughput but remember that if you are using
persistent queues you necessarily lose several orders of magnitude in speed
to persist and restore the data and you are both marshaling and
unmarshaling the data.  KahaDB and LevelDB are both journaling databases
and are very fast but any time you go from nanosecond RAM speeds to
milliscecond hard disk speeds you are going to slow down. And that's times
two - down to the disk and back up into memory. Memory consumption is also
going to go up.  It still shouldn't take 6 hours to process.

Something like this could be used to put individual lines from a file onto
your queue and you can even specify it in blocks if you like.  But I don't
know the form of your data.

<route>
  <from uri="file:inbox"/>
  <split streaming="true">
    <tokenize token="\n" />
    <to uri="activemq:queue:store"/>
  </split>
</route>


In that case it starts reading individual lines from the file, splits them
on the lines, and drops each line onto the queue.  Simultaneously you can
start reading from that queue using the multithreaded consumer that you
have.  In this way you won't be reading the entire file into memory in one
big chunk but will stream it in a line at a time or you can add the "group"
to the tokenize line to specify how many lines should be read in at a
time.

I'd just take that and try it with your seda queue structure first to see
if you can get it to work right for you.  When you read off the queue you
can then unmarshal it via Beanio and do transactions.  At that point you
could specify min/max threads for reading off the queue.

This should also let the JVM reclaim memory as it goes along since the
strings read in and off the queue, transformed and then processed are all
ready for garbage collection.  If the entire file is held in memory and
being passed from queue to queue that won't get released.







On Sat, Apr 16, 2016 at 8:04 AM, Michele <michele.mazzilli@finconsgroup.com>
wrote:

> Hi,
>
> as Ranx suggested, I tried with chain routes and attached screenshot of
> memory usage  seda-memory-usage.png
> <http://camel.465427.n5.nabble.com/file/n5781206/seda-memory-usage.png>  .
> it is evident that there is a net improvement, on average 600 MB memory
> usage.
>
> The chain of routes configured is:
>
> <route id="FileRetriever_Route">
>         <from
>
> uri="{{uri.inbound}}?scheduler=quartz2&amp;scheduler.cron={{poll.consumer.scheduler}}&amp;scheduler.triggerId=FileRetriever&amp;scheduler.triggerGroup=IF_CBIKIT{{uri.inbound.options}}"
> />
>         <to uri="seda:processAndStoreInQueue" />
> </route>
>
> <route id="Splitter_Route">
>         <from uri="seda:processAndStoreInQueue" />
>
>         <unmarshal  ref="myBeanio" />
>
>         <split streaming="true" executorServiceRef="threadPoolExecutor" >
>                 <simple>${body}</simple>
>                 <choice>
>                         <when>
>                                 <simple></simple>
>                                 <setHeader
>
> headerName="CamelSplitIndex"><simple>${in.header.CamelSplitIndex}</simple></setHeader>
>                                 <to uri="seda:process.queue" />
>                         </when>
>                         <otherwise>
>                                 <log message="Message discarded
> ${in.header.CamelSplitIndex} - ${body}"
> />
>                         </otherwise>
>                 </choice>
>         </split>
> </route>
>
> <route id="ProcessAndStoreInSedaQueue">
>         <from uri="seda:process.queue?concurrentConsumers=10" />
>         <process ref="BodyEnricherProcessor" />
>         <to
>
> uri="dozer:transform?mappingFile=file:{{crt2.apps.home}}{{dozer.mapping.path}}&amp;targetModel=com.fincons.ingenico.crt2.cbikit.inbound.model.SerialNumber"
> />
>         <marshal ref="Gson" />
>         <log message="Store in SEDA Queue: ${in.header.CamelSplitIndex} -
> ${body}"
> />
>         <to uri="seda:readyToSendRS.queue" />
> </route>
>
> <route id="ProcessMessageData_Route" errorHandlerRef="DLQErrorHandler" >
>         <from uri="seda:readyToSendRS.queue?concurrentConsumers=10" />
>         <throttle timePeriodMillis="1000" asyncDelayed="true">
>                 <constant>3</constant>
>
>                 <enrich uri="direct:crm-login" strategyRef="OAuthStrategy"
> />
>
>                 <recipientList>
>                         <header>CompleteActionPath</header>
>                 </recipientList>
>
>                 <unmarshal ref="Gson" />
>
>                 <choice>
>                         <when>
>                                 <simple>${in.header.Result} ==
> 'ERROR'</simple>
>                                 <log message="CRM Response: ${body}"
> loggingLevel="ERROR"/>
>                         </when>
>                 </choice>
>         </throttle>
> </route>
>
> Logs:
> Store in SEDA Queue: 36913 -
>
> {"data_caricamento_cbkit_c":"2016-04-05T08:17:51+02:00","data_import_cbikit_c":"2016-04-16T13:41:01+02:00","nome_file_cbikit_c":"20160405074559..csv","serialnumber_c":"12154WL60756738"}
> and Throttling Policy in action:
>  INFO  | d #46 - Throttle | CRMLogin_Route  ...
>
> In addition, from my JUnit Test Camel is fastest (less than 1 minute) in
> the
> processing  Read-Unmarshal-Split-Process-MockResult.
>
> So, when I repalced seda:readyToSendRS.queue with
> activemq:queue:readyToSendRS, the memory usage triples.
>
> I can not use SEDA Component because in according to documentation:
> this component does not implement any kind of persistence or recovery, if
> the VM terminates while messages are yet to be processed. If you need
> persistence, reliability or distributed SEDA, try using either JMS or
> ActiveMQ.
>
> So, Can ActiveMQ manage a large number of messages in few seconds or
> minutes? Or Do I need  to defined a load balancer on Broker?
>
> Thanks a lot for your support.
>
> Best Regards
>
> Michele
>
>
>
> --
> View this message in context:
> http://camel.465427.n5.nabble.com/Best-Strategy-to-process-a-large-number-of-rows-in-File-tp5779856p5781206.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message