camel-users mailing list archives

From jonathanq <>
Subject Re: Processing 5 files at a time - Threads? SEDA + Concurrent Consumers?
Date Wed, 30 Dec 2009 23:33:29 GMT

I took a good look at the RoutePolicy option. At first the
ThrottlingInflightRoutePolicy class seemed like it could work, since I really
only want 5 exchanges to be in flight at a time.

Unfortunately it would never suspend the consumer.  I dug deeper into the
code and discovered why: the ThrottlingInflightRoutePolicy class only
checks the number of inflight exchanges AFTER an exchange has been processed
(the code to stop or start a consumer is all done in the onExchangeDone
method).

Since in my case each exchange takes a while to process, the policy wouldn't
know it had exceeded the maximum until after it had finished processing one
of them.

In my opinion that is a bug, or at the very least something important to
note in the documentation.  I spent a fair bit of time trying to figure out
why I could not get it to work the way it appeared it was supposed to, all
because it does not compare the inflight count to the threshold until after
it has finished processing an exchange.
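For what it's worth, the behaviour I was hoping for would look roughly like the sketch below: do the same check at exchange *begin* that ThrottlingInflightRoutePolicy does at exchange done.  The class and method names (RoutePolicySupport, InflightRepository, stopConsumer) are my recollection of the Camel 2.x API, so treat this as a sketch rather than working code:

```java
// Sketch only: suspend the consumer as soon as the inflight count reaches
// the cap, instead of waiting for an exchange to complete.  API names here
// are assumptions from the Camel 2.x route policy support classes.
public class EagerThrottlingRoutePolicy extends RoutePolicySupport {

    private final int maxInflight = 5;

    @Override
    public void onExchangeBegin(Route route, Exchange exchange) {
        int inflight = exchange.getContext().getInflightRepository().size();
        if (inflight >= maxInflight) {
            try {
                // stop consuming new files until exchanges complete
                stopConsumer(route.getConsumer());
            } catch (Exception e) {
                handleException(e);
            }
        }
    }
}
```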

I also tried writing my own FileThrottlingRoutePolicy that would test how
many files were in an "inprogress" directory, and stop the consumer if it
exceeded the maximum number of concurrent files.

However, I ran into read/write issues when I used the preMove option: for
some reason my later processing would throw exceptions about the file not
being found, or a file lock (I can't remember which; I have been trying so
many different things today to get this working).

In the end I solved my problem by avoiding my problem :)

The primary reason I didn't want the lock files is that they would require
manual cleanup if we ever had to kill the process while it was running.
Otherwise, the next time it started it would ignore any files that still had
a lock file.

I re-wrote my route to work as follows:


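The route itself seems to have been lost from the archive; judging from the description below, it presumably looked something along these lines (the directory names and exact option spellings are my guesses, not the original):

```java
// Sketch of the rewritten route: no seda hand-off, so only the files
// actually being worked on are consumed at any moment.  The "processed"
// and "failed" directory names and MyRestProcessor are assumptions.
from("file://incoming?move=processed&moveFailed=failed")
    .threads(5)
    .process(new MyRestProcessor());
```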
This way, when files are finished they are placed in a "processed"
directory, and when they fail they are placed in a "failed" directory.
Anything still in the incoming directory has yet to be processed.  Because
the record of what had and hadn't been processed was previously held only in
memory, restarting the process now simply picks up any files still in the
incoming directory.

No more lock files means restarting it won't force us to delete .lock files
by hand.

I wish there were still an easier way to do what I wanted.  Now I just have
to rely on threads(5) to do the limiting to 5 files at a time.  Although,
if I understand your comment (and the documentation), I can't actually rely
on threads(5) spawning exactly 5 threads; it will just spawn UP TO 5 threads
depending on the system load?
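That is my understanding too: a pool of size 5 guarantees *at most* 5 concurrent tasks, not that 5 are always busy.  A plain-Java illustration of that guarantee (standalone, not Camel-specific):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrencyDemo {

    // Submit 'tasks' jobs to a pool of 'poolSize' threads and report the
    // highest number of jobs that were ever running at the same moment.
    static int maxConcurrency(int poolSize, int tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                int now = running.incrementAndGet();
                maxSeen.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(50); // simulate slow per-file work
                } catch (InterruptedException ignored) {
                }
                running.decrementAndGet();
            });
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        return maxSeen.get();
    }

    public static void main(String[] args) throws Exception {
        // never exceeds the pool size, even with 20 queued tasks
        System.out.println("max concurrent = " + maxConcurrency(5, 20));
    }
}
```

The pool queues the remaining tasks, so the observed concurrency can be anywhere from 1 up to the pool size, but never above it.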


Claus Ibsen-2 wrote:
> Hi
> See also route policy to throttle the file consumer to a pace of 5
> concurrent files
> On Wed, Dec 30, 2009 at 11:51 AM, gmagniez <>
> wrote:
>> jonathanq wrote:
>>> I am trying to write a process that will use a file endpoint (camel
>>> 2.1.0)
>>> to read from a directory.
>>> I need the process to read a file from the directory and then do some
>>> processing on the contents (namely hitting a REST service for each
>>> record
>>> in the file).  We have been asked to limit the number of threads that
>>> are
>>> hitting the service to 5.  So we decided to simply process 5 files at a
>>> time (to avoid concurrency issues reading 1 file and writing to 1 file
>>> with 5 threads)
>>> I tried a few different approaches, and I wanted to see if there was a
>>> way
>>> to do what I want.
>>> Approach 1:
>>> from("file://incoming").to("seda:filequeue")
>>> from("seda:filequeue").threads(5).process()
>>> Now - this reads in ALL of the files in the directory (places camelLock
>>> on
>>> all) and then sends them to the seda endpoint.  I saw log messages that
>>> referred to thread 1 through 6.  But from what I read in the
>>> documentation, threads() is not necessarily going to limit it at that
>>> number.
> thread(5) will limit to at most 5 concurrent threads from this point
> forward.
>>> Approach 2:
>>> from("file://incoming").threads(5).process()
>>> This only processed 5 files at a time - but created camelLocks on all
>>> files in the directory.
>>> So then I tried approach 3:
>>> from("file://incoming").to("seda:filequeue")
>>> from("seda:filequeue?concurrentConsumers=5").process()
>>> Again this seems to work, however it puts a camelLock on all the files
>>> (because they were all processed by the first part of the route, they
>>> are
>>> just queued up in the second).
>>> While approach 3 works - what I would really like is to not have the
>>> camelLock placed on the files that are not being processed.
>>> So watching the directory, there would be (at most) 5 files with
>>> camelLock
>>> files created at a time, when they finish they are moved to the .camel
>>> directory, and then it starts processing the next file in the directory.
> You can also implement your own ProcessStrategy where you can deny
> consuming more than 5 files at any given time.
> See the processStrategy option on the file consumer. Just return false
> on the begin() method.
> See the bottom of the page.
>>> Is that possible?  Is there anything I should be sure to do in an error
>>> route so that I "roll back" the camel locks to ensure that unprocessed
>>> files are ready to process the next time the application starts?
>> Hi,
>> Maybe you can try to use the parameter maxMessagesPerPoll on the file
>> endpoint i.e.:
>> from("file://incoming?maxMessagesPerPoll=5").threads(5).process()
>> Check the file component documentation :
>> --
>> View this message in context:
>> Sent from the Camel - Users mailing list archive at
> -- 
> Claus Ibsen
> Apache Camel Committer
> Author of Camel in Action:
> Open Source Integration:
> Blog:
> Twitter:
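Claus's processStrategy suggestion above would look roughly like the sketch below.  The type and method signatures (GenericFileProcessStrategy and friends) are from memory of the Camel 2.x file component API, so treat it as a sketch rather than a verified implementation:

```java
// Sketch: deny pickup of a 6th file by returning false from begin().
// The Camel types and signatures here are assumptions from the 2.x
// file component API; the counter tracks files currently in flight.
public class MaxFiveProcessStrategy<T> extends GenericFileProcessStrategySupport<T> {

    private final AtomicInteger inProgress = new AtomicInteger();

    @Override
    public boolean begin(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint,
                         Exchange exchange, GenericFile<T> file) throws Exception {
        if (inProgress.get() >= 5) {
            return false; // not consumed this poll; retried on the next one
        }
        inProgress.incrementAndGet();
        return super.begin(operations, endpoint, exchange, file);
    }

    @Override
    public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint,
                       Exchange exchange, GenericFile<T> file) throws Exception {
        inProgress.decrementAndGet();
        super.commit(operations, endpoint, exchange, file);
    }
}
```

It would presumably be wired in via the file consumer's processStrategy option (a registry lookup on the endpoint URI).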
