camel-users mailing list archives

From Claus Ibsen <>
Subject Re: File and async/<camel:threads> woes
Date Sun, 30 Sep 2012 09:57:35 GMT

Thanks for sharing your findings.
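
For anyone finding this thread later, here is a minimal sketch of the setup described in the quoted messages below: an unbounded queue on the threads pool, a throttling route policy, and maxMessagesPerPoll=1 with useFixedDelay=false on the file endpoint. The bean id "inflightPolicy" and all numeric values are illustrative assumptions (the pool is sized at about double the suspend threshold, as Ralf reports), and the convertBodyTo step is left out because its type is not given in the thread. It is untested against 2.9.2 or 2.10.x.

```xml
<!-- Sketch only: the bean id and the numeric values are assumptions -->
<bean id="inflightPolicy"
      class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
  <!-- upper watermark for in-flight exchanges (files) on this route -->
  <property name="maxInflightExchanges" value="5" />
  <!-- percent-based lower watermark: 80% of 5 = resume at 4 or lower -->
  <property name="resumePercentOfMax" value="80" />
</bean>

<camel:camelContext>
  <!-- poll one file at a time and do not wait for the previous poll to finish -->
  <camel:endpoint id="fileBufferFrom"
      uri="file://${my.dir}/?delete=true&amp;idempotent=true&amp;preMove=inprogress&amp;maxMessagesPerPoll=1&amp;useFixedDelay=false" />

  <camel:route id="processFromFileBuffer" routePolicyRef="inflightPolicy">
    <camel:from ref="fileBufferFrom" />
    <!-- maxQueueSize="0" is the "unlimited" setting reported below -->
    <camel:threads poolSize="10" maxPoolSize="10" maxQueueSize="0"
                   threadName="file consumer">
      <camel:split streaming="true" parallelProcessing="true">
        <camel:tokenize token="\r\n" />
        <camel:to ref="mq.csv" />
      </camel:split>
    </camel:threads>
  </camel:route>
</camel:camelContext>
```

With this arrangement the queue depth is managed by the route policy and the poll size rather than by the executor's rejection handling, so the rejectedPolicy setting no longer comes into play.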

On Wed, Sep 26, 2012 at 7:47 PM, Ralf Steppacher
<> wrote:
> Well, turns out that the only reliable way of preventing the file
> consumer thread from being assigned tasks is to set the queue depth of
> the executor pool to "unlimited" (maxQueueSize="0") and use the
> throttling route policy and sufficiently low maxMessagesPerPoll at the
> file endpoint to manage the queue depth.
> Ralf
> -----Original Message-----
> From: Ralf Steppacher <>
> Reply-to:
> To:
> Subject: Re: File and async/<camel:threads> woes
> Date: Tue, 25 Sep 2012 19:27:17 +0200
> Claus,
> I finally implemented your suggestion with the route policy and was able
> to work around the problem of the caller thread getting blocked by a
> large task. Indeed the throttling affects the number of files being
> processed, not the number of exchanges created by the splitter.
> In my case, if the number of threads in the pool is about double the
> suspend threshold I can prevent the caller thread from being blocked.
> However, if the pattern of file sizes changes, this might not be true
> anymore.
> I also realized that I have to set useFixedDelay=false on the file
> endpoint to allow for continuous feeding of files for async processing.
> The default of useFixedDelay=true also leads to large files blocking
> processing of pending files (by way of preventing the file endpoint from
> polling).
> I double-checked whether this was my root problem all along, but it is
> not. Without the throttling route policy and with useFixedDelay=false
> processing still gets stuck on large files.
> All the above I have tested with Camel 2.9.2. Due to time constraints I
> have not yet given 2.10.x another shot with this.
> Ralf
> -----Original Message-----
> From: Claus Ibsen <>
> Reply-to:
> To:
> Subject: Re: File and async/<camel:threads> woes
> Date: Sun, 9 Sep 2012 10:33:45 +0200
> Hi
> You can use route policy to control the route consumer to
> suspend/resume, depending on number of in flight exchanges for the
> route.
> Then you can suspend when reaching 5, and resume when back to 4 or lower.
> There is a ThrottlingInflightRoutePolicy for that. Though its
> watermark is percent based. Also consider setting maxMessagesPerPoll=1
> on the file consumer endpoint, so it only grabs 1 file at a time.
> About the thread pools. Take a closer look at the difference between
> Abort and Reject. You would need to use the one that ensures a
> rollback, when the thread pool is full. So the preMove file gets
> rolled back (assuming that logic works).
> On Fri, Sep 7, 2012 at 3:54 PM, Ralf Steppacher
> <> wrote:
>> Hello Claus,
>> If I want 5 files to be processed in parallel line by line, every line
>> taken care of asynchronously, don't I need the
>> <threads ...><camel:split ...></camel:split></threads> construct?
>> If threads and split refer to the same thread pool, then of course I
>> would not have to worry about finding the sweet spot in terms of number
>> of threads in the two pools; to keep all threads in split-pool as busy
>> as possible and have no thread in the threads-pool waiting. I prefer to
>> keep them apart because that way I have more control over the number
>> and order (small files first) in which they are processed.
>> Using one thread pool would not solve my problem of the rejectedPolicy
>> not being honored, would it? This really kills me. I am processing files
>> ranging from 1kb to 200mb, sorted by their size. If the caller thread
>> ends up with the 200mb file, all other processing stalls until the big
>> file has been processed. I need to prevent processing of small files
>> being blocked by large files.
>> Is the file consumer supposed to preMove all files from the input
>> directory to the inprogress directory (2.10.0 behavior) or just the ones
>> it is actually going to process, i.e. number of threads (2.9.2
>> behavior)?
>> With rejectedPolicy="Abort":
>> If the 2.10.0 preMove behavior is the expected behavior, is it expected
>> that all files that could not be processed on the first poll of the file
>> endpoint (file number > number of threads) stay in the inprogress
>> directory for ever, never being processed (2.10.0 behavior) or should
>> they be picked up as soon as a thread becomes free (2.9.2 behavior with
>> respective preMove behavior) and the file endpoint polls again?
>> Thanks!
>> Ralf
>> -----Original Message-----
>> From: Claus Ibsen <>
>> Reply-to:
>> To:
>> Subject: Re: File and async/<camel:threads> woes
>> Date: Thu, 6 Sep 2012 11:30:29 +0200
>> Hi
>> Why make it so complicated with 2 thread pools? The splitter can just
>> refer to a custom thread pool profile / thread pool which you can
>> customize as you want.
>> Also the file consumer with preMove will move the file as soon as it starts routing.
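
A minimal sketch of the single-pool variant suggested above, assuming a thread pool declared in the Camel context and referenced from the splitter via executorServiceRef. The pool id "splitPool", its thread name, and its sizes are hypothetical, and the convertBodyTo step is omitted because its type is not given in the thread.

```xml
<camel:camelContext>
  <!-- Sketch only: pool id, thread name and sizes are assumptions -->
  <camel:threadPool id="splitPool" poolSize="5" maxPoolSize="5"
                    maxQueueSize="100" threadName="line splitter" />

  <camel:route id="processFromFileBuffer">
    <camel:from ref="fileBufferFrom" />
    <!-- lines are processed in parallel on splitPool; no separate threads block -->
    <camel:split streaming="true" parallelProcessing="true"
                 executorServiceRef="splitPool">
      <camel:tokenize token="\r\n" />
      <camel:to ref="mq.csv" />
    </camel:split>
  </camel:route>
</camel:camelContext>
```

Note that without a threads block the file consumer thread itself drives each split, so files are taken one after another rather than several at once.
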
>> On Thu, Aug 30, 2012 at 5:24 PM, Ralf Steppacher
>> <> wrote:
>>> Hallo all,
>>> I have several issues defining an async route in Spring XML.
>>> Can someone shed some light on what the expected behavior is and what
>>> would be a bug and/or a misunderstanding on my side?
>>> The stripped down route:
>>> <endpoint id="fileBufferFrom"
>>> uri="file://${my.dir}/?delete=true&amp;idempotent=true&amp;preMove=inprogress" />
>>> <camel:route id="processFromFileBuffer">
>>>   <camel:from ref="fileBufferFrom" />
>>>   <camel:threads poolSize="5" maxPoolSize="5" maxQueueSize="1"
>>> threadName="file consumer" rejectedPolicy="Abort">
>>>     <camel:convertBodyTo type="" />
>>>     <camel:split streaming="true" parallelProcessing="true">
>>>         <camel:tokenize token="\r\n" />
>>>         <camel:to ref="mq.csv" />
>>>     </camel:split>
>>>   </camel:threads>
>>> </camel:route>
>>> Camel 2.9.2
>>> If maxQueueSize > 0 then maxPoolSize + maxQueueSize files are moved from
>>> the in-folder to the inprogress-folder. If there are more files they are
>>> moved to the inprogress-folder as soon as the caller thread is free to
>>> move them and worker threads are available.
>>> Setting rejectedPolicy="Abort" or callerRunsWhenRejected="false" seems
>>> to get ignored and the reject policy "CallerRuns" is applied. The source
>>> is a file endpoint, so I assume the thread with the scanned directory as
>>> its name is the caller thread? I can see that thread come to life in
>>> VisualVM.
>>> If maxQueueSize = 0 then all files present in the input directory are
>>> always moved to the inprogress-folder AND the rejectedPolicy="Abort" is
>>> honored! There are always maxPoolSize files processed in parallel.
>>> Camel 2.10.0
>>> Independent of the value of maxQueueSize, if rejectedPolicy="Abort" then
>>> the policy is honored, sort of. All files present in the input directory
>>> are always moved to the inprogress-folder. Only maxPoolSize +
>>> maxQueueSize files are being processed. All others stay untouched in the
>>> "inprogress" folder!
>>> Using <camel:threadPool> and referencing that in <camel:threads> does
>>> not change the above behavior for 2.9.2 or 2.10.0.
>>> The behavior I expected was that with rejectedPolicy="Abort" always
>>> maxPoolSize + maxQueueSize files are moved from the in-folder to the
>>> inprogress-folder and processed from there. As processing of one file
>>> completes it gets deleted and a file from the in-folder is moved to the
>>> inprogress-folder on the next poll of the in-folder.
>>> Thanks!
>>> Ralf

Claus Ibsen
Red Hat, Inc.
FuseSource is now part of Red Hat
Twitter: davsclaus
Author of Camel in Action:
