camel-dev mailing list archives

From "Bryan Keller (JIRA)" <j...@apache.org>
Subject [jira] Commented: (CAMEL-3337) Aggregator using an unbounded queue, can use up all heap
Date Sat, 13 Nov 2010 21:45:24 GMT

    [ https://issues.apache.org/activemq/browse/CAMEL-3337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=63335#action_63335 ]

Bryan Keller commented on CAMEL-3337:
-------------------------------------

An additional problem with the job queue is that it is not persistent: if the server goes
down, everything in the queue is lost.

In any event, I was able to work around this issue by creating my own executor service that
is synchronous (i.e. the execute() method merely calls Runnable.run()), and has no job queue.
I configured the aggregator to use it via the "executorServiceRef" attribute.
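
A synchronous executor of the kind described above might look roughly like the sketch below.
This is an illustration only, not the code actually used in the workaround: the class name
SynchronousExecutorService is hypothetical, and the shutdown handling is a simplifying
assumption. It is written without lambdas so that it would also compile on a JDK 1.6-era
toolchain.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical name. Runs every task on the calling thread, so there is
// no job queue that could grow without bound.
class SynchronousExecutorService extends AbstractExecutorService {

    private volatile boolean shutdown;

    // The caller blocks until the task has finished; this is what
    // propagates back-pressure to whoever submits work.
    @Override
    public void execute(Runnable command) {
        if (shutdown) {
            throw new RejectedExecutionException("executor has been shut down");
        }
        command.run();
    }

    @Override
    public void shutdown() {
        shutdown = true;
    }

    // Nothing is ever queued, so there are no pending tasks to return.
    @Override
    public List<Runnable> shutdownNow() {
        shutdown = true;
        return Collections.emptyList();
    }

    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    // Simplification: tasks still running on caller threads are not tracked.
    @Override
    public boolean isTerminated() {
        return shutdown;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        return shutdown;
    }
}
```

To use something like this with the route, the class would be registered as a Spring bean and
the bean id given to the aggregator's "executorServiceRef" attribute. The trade-off is that
aggregation output is then processed on the thread that delivers messages to the aggregator.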

> Aggregator using an unbounded queue, can use up all heap
> --------------------------------------------------------
>
>                 Key: CAMEL-3337
>                 URL: https://issues.apache.org/activemq/browse/CAMEL-3337
>             Project: Apache Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.5.0
>         Environment: JDK 1.6.0_22 (Windows 7 64-bit and OS X 10.6.5)
>            Reporter: Bryan Keller
>
> My app is having memory issues due to the use of an aggregator. My app is fairly straightforward.
It reads from a CSV using streaming, tokenizes it by line, passes the result to a processor,
aggregates the result, then puts this on a JMS queue. Here is the route definition (for Spring):
> [code]
> <route>
>   <from uri="file:in" />
>   <split streaming="true">
>     <tokenize token="\n" regex="false" />
>     <unmarshal><csv /></unmarshal>
>     <bean ref="myBean" method="translate" />
>     <aggregate strategyRef="myAggregationStrategy">
>       <correlationExpression><constant>true</constant></correlationExpression>
>       <completionTimeout><simple>1000</simple></completionTimeout>
>       <completionSize><simple>100</simple></completionSize>
>       <to uri="activemq:queue:myQueue"/>
>     </aggregate>
>   </split>
> </route>
> [/code]
> The problem happens when the consumer of "myQueue" is not as fast as the file reading
and parsing. With a slow consumer, ActiveMQ will eventually throttle the producer so the message
queue doesn't use up all memory and/or disk space.
> As messages are passed to the aggregator, it internally submits jobs to an executor, which
then puts the aggregated message on the JMS queue. This executor uses an unbounded job queue.
If the message producer has been throttled, these jobs cannot send messages quickly enough,
and the executor's job queue will continue to back up until all memory is used.
> As a workaround, I am thinking I could implement my own executor service which is synchronous
or at least blocks when the queue reaches a certain size. I haven't yet figured out
how to configure this, however.
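
The "blocks when the queue reaches a certain size" variant mentioned above could be sketched
with the standard java.util.concurrent classes, as below. This is one possible approach under
stated assumptions, not something taken from the issue: the names BoundedExecutors and
newBlockingExecutor, and the thread and capacity parameters, are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: builds a fixed-size pool whose job queue is bounded
// and whose submitters block, instead of failing, when the queue is full.
final class BoundedExecutors {

    private BoundedExecutors() {
    }

    static ThreadPoolExecutor newBlockingExecutor(int threads, int queueCapacity) {
        RejectedExecutionHandler blockWhenFull = new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
                if (executor.isShutdown()) {
                    throw new RejectedExecutionException("executor has been shut down");
                }
                try {
                    // Wait for free space rather than dropping the task.
                    // Caveat: a shutdown racing with this put() could leave
                    // the task queued but never run.
                    executor.getQueue().put(task);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RejectedExecutionException("interrupted while waiting for queue space", e);
                }
            }
        };
        return new ThreadPoolExecutor(
                threads, threads,                      // fixed pool size
                0L, TimeUnit.MILLISECONDS,             // no idle timeout needed
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                blockWhenFull);
    }
}
```

With this arrangement at most queueCapacity jobs are ever pending, so a slow JMS consumer
stalls the submitting thread rather than filling the heap. The built-in
ThreadPoolExecutor.CallerRunsPolicy is a simpler alternative that throttles the submitter by
running the overflow task on the submitting thread.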

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

