camel-users mailing list archives

From Claus Ibsen <claus.ib...@gmail.com>
Subject Re: Processing 5 files at a time - Threads? SEDA + Concurrent Consumers?
Date Thu, 31 Dec 2009 08:42:18 GMT
On Thu, Dec 31, 2009 at 12:33 AM, jonathanq <jquail@abebooks.com> wrote:
>
> I took a good look at the Route Policy - at first the
> ThrottlingInflightRoutePolicy class seemed like it could work - as I really
> only want 5 exchanges to be in-flight at a time.
>
> Unfortunately it would never suspend the consumer.  I dug deeper into the
> code and discovered why.  The ThrottlingInflightRoutePolicy class only
> checks the number of inflight exchanges AFTER an exchange has been processed
> (the code to stop or start a consumer is all done in the onExchangeDone
> method).
>
> Since in my case each exchange takes a while to process, the policy
> wouldn't know the maximum had been exceeded until one of them had
> finished.
>
> In my opinion that is a bug - or at the very least an important thing to
> note in the documentation.  I spent a fair bit of time trying to figure out
> why I could not get it to work as it appeared it was supposed to, all
> because it did not compare the inflight count against the threshold until
> after an exchange had finished processing.
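Jonathan's diagnosis can be illustrated with a toy model. The sketch below is not Camel's ThrottlingInflightRoutePolicy. The class and method names are hypothetical, and it assumes, as he describes, that a poll starts several long-running exchanges before any of them completes. A limit that is only evaluated on completion (as in onExchangeDone) never gets a chance to act during that burst, whereas a check made before each exchange starts caps it.

```java
/**
 * Hypothetical toy model, not Camel code: a poll hands out `files`
 * long-running exchanges back to back, and none of them completes
 * during the burst. The limit is either enforced only on completion
 * (check-after) or before each exchange starts (check-before).
 */
class InflightThrottleModel {

    /** Returns the peak number of exchanges in flight during one burst. */
    static int peakInflight(int files, int maxInflight, boolean checkBeforeStart) {
        int inflight = 0;
        int peak = 0;
        for (int i = 0; i < files; i++) {
            // Check-before: refuse to start another exchange once the limit
            // is reached; the remaining files stay untouched for a later poll.
            if (checkBeforeStart && inflight >= maxInflight) {
                break;
            }
            // Check-after has no hook here: its check would run on completion,
            // and nothing completes during this burst.
            inflight++;
            peak = Math.max(peak, inflight);
        }
        return peak;
    }
}
```

With 20 files and a limit of 5, `peakInflight(20, 5, false)` returns 20 while `peakInflight(20, 5, true)` returns 5.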
>
> I also tried writing my own FileThrottlingRoutePolicy that would test how
> many files were in an "inprogress" directory - and stop the consumer if it
> exceeded the max concurrent files.
>
> However, I ran into read/write issues when I used preMove on the files - for
> some reason my processes would later throw exceptions about file not found
> or file lock (I can't remember which - I have been trying so many different
> things today to get this working).
>
> In the end I solved my problem by avoiding my problem :)
>
> The primary reason I didn't want the file locks is that they would need
> manual cleanup if we ever had to kill the process while it was running.
> Otherwise, the next time it started, it would also ignore any files that
> had a lock file.
>

We have a ticket for that
https://issues.apache.org/activemq/browse/CAMEL-2082



> I re-wrote my route to work as follows:
>
> from("file://incoming?maxMessagesPerPoll=1&idempotent=true&moveFailed=failed&move=processed&readLock=none").threads(5).process()
>

Nice solution :)

> This way - when files are "finished" they will be placed in a "processed"
> directory, when they fail they are put in a "failed" directory.  Anything
> still in the incoming directory is yet to be processed.  Because the record
> of what has and hasn't been processed is kept only in memory, restarting the
> process will simply pick up any files still in the incoming directory.
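The move/moveFailed behaviour Jonathan relies on can be approximated in plain java.nio to make the restart argument concrete. This is not Camel's implementation: MoveOnCompletion, handle() and the explicit processed/failed directory parameters are hypothetical. Because a file leaves the incoming directory only after it has been handled, whatever is still there after a crash simply has not finished.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.function.Predicate;

/**
 * Hypothetical helper, not Camel API: "move on success, moveFailed on
 * failure". A file leaves the incoming directory only once it is handled.
 */
class MoveOnCompletion {

    static Path handle(Path file, Path processedDir, Path failedDir, Predicate<Path> process)
            throws IOException {
        boolean ok;
        try {
            ok = process.test(file);
        } catch (RuntimeException e) {
            ok = false; // treat an exception like a failed file
        }
        Path targetDir = ok ? processedDir : failedDir;
        Files.createDirectories(targetDir);
        // Only now does the file disappear from the incoming directory.
        return Files.move(file, targetDir.resolve(file.getFileName()),
                StandardCopyOption.REPLACE_EXISTING);
    }
}
```

If the JVM is killed inside `process.test(...)`, the file is still in the incoming directory and the next run picks it up again; the in-memory idempotent record, by contrast, is gone after a restart.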
>
> No more Lock files means restarting it won't cause us to have to delete
> .lock files.
>
> I still wish there were an easier way to do what I wanted.  Now I just have
> to rely on threads(5) to limit processing to 5 files at a time.  Although,
> if I understand your comment (and the documentation), I can't actually rely
> on threads(5) to spawn 5 threads... it will just spawn UP TO 5 threads
> depending on the system load?
>
> Jonathan
>
>
>
>
> Claus Ibsen-2 wrote:
>>
>> Hi
>>
>> See also the route policy, which can throttle the file consumer to 5
>> concurrent files:
>> http://camel.apache.org/routepolicy.html
>>
>>
>>
>> On Wed, Dec 30, 2009 at 11:51 AM, gmagniez <gabriel.magniez@steria.com>
>> wrote:
>>>
>>>
>>> jonathanq wrote:
>>>>
>>>> I am trying to write a process that will use a file endpoint (Camel
>>>> 2.1.0) to read from a directory.
>>>>
>>>> I need the process to read a file from the directory and then do some
>>>> processing on the contents (namely hitting a REST service for each record
>>>> in the file).  We have been asked to limit the number of threads hitting
>>>> the service to 5, so we decided to simply process 5 files at a time (to
>>>> avoid the concurrency issues of 5 threads reading from and writing to the
>>>> same file).
>>>>
>>>> I tried a few different approaches, and I wanted to see if there was a
>>>> way
>>>> to do what I want.
>>>>
>>>> Approach 1:
>>>>
>>>> from("file://incoming").to("seda:filequeue")
>>>>
>>>> from("seda:filequeue").thread(5).process()
>>>>
>>>> Now - this reads in ALL of the files in the directory (placing a camelLock
>>>> on each) and then sends them to the seda endpoint.  I saw log messages that
>>>> referred to threads 1 through 6.  But from what I read in the
>>>> documentation, thread() is not necessarily going to limit it to that
>>>> number.
>>>>
>>
>> thread(5) will limit to at most 5 concurrent threads from this point
>> forward.
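This also answers Jonathan's later question: a bound of 5 is a ceiling, not a promise that 5 threads are always busy (with only 3 files waiting, at most 3 run). The sketch below shows that behaviour with a plain fixed-size java.util.concurrent pool. Treating threads(5) as equivalent to such a pool is an assumption, and BoundedPoolDemo is a hypothetical name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo: measures how many tasks a fixed-size pool runs at once. */
class BoundedPoolDemo {

    static int maxConcurrent(int tasks, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // record the highest concurrency seen
                try {
                    Thread.sleep(20); // stand-in for one slow file (many REST calls)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown(); // accept no new tasks; already queued ones still run
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }
}
```

`maxConcurrent(20, 5)` reports a peak between 1 and 5. The 20 ms sleep usually makes it exactly 5, but the pool only guarantees the upper bound; extra tasks wait in the queue rather than getting extra threads.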
>>
>>
>>>> Approach 2:
>>>>
>>>> from("file://incoming").thread(5).process()
>>>>
>>>> This only processed 5 files at a time - but created camelLocks on all
>>>> files in the directory.
>>>>
>>>> So then I tried approach 3:
>>>>
>>>> from("file://incoming").to("seda:filequeue")
>>>>
>>>> from("seda:filequeue?concurrentConsumers=5").process()
>>>>
>>>> Again this seems to work, however it puts a camelLock on all the files
>>>> (because they were all processed by the first part of the route, they
>>>> are
>>>> just queued up in the second).
>>>>
>>>>
>>>> While approach 3 works - what I would really like is to not have the
>>>> camelLock placed on the files that are not being processed.
>>>>
>>>> So watching the directory, there would be (at most) 5 files with
>>>> camelLock
>>>> files created at a time, when they finish they are moved to the .camel
>>>> directory, and then it starts processing the next file in the directory.
>>>>
>>
>> You can also implement your own ProcessStrategy that refuses to consume
>> more than 5 files at any given time.
>> See the processStrategy option on the file consumer. Just return false
>> from the begin() method.
>>
>> See
>> http://camel.apache.org/file2.html
>> at the bottom of the page.
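The idea of refusing work in begin() can be sketched as a counting gate. This is a hypothetical stand-alone class that only mirrors the begin/commit/rollback shape; Camel's real process strategy interface has its own signature (see the file2 page), and using a Semaphore to keep the count is an assumption of this sketch.

```java
import java.util.concurrent.Semaphore;

/**
 * Hypothetical gate, not Camel API: begin() returns false once maxFiles
 * are in progress, so the file is skipped (left unlocked in the
 * directory) until a later poll.
 */
class MaxInProgressGate {
    private final Semaphore permits;

    MaxInProgressGate(int maxFiles) {
        this.permits = new Semaphore(maxFiles);
    }

    /** Called before a file is consumed; false means "don't pick this file up now". */
    boolean begin(String fileName) {
        return permits.tryAcquire();
    }

    /** Called after a file was processed successfully (only if begin() returned true). */
    void commit(String fileName) {
        permits.release();
    }

    /** Called after processing failed (only if begin() returned true). */
    void rollback(String fileName) {
        permits.release();
    }
}
```

Five calls to begin() succeed, the sixth returns false, and after one commit() another file can begin. commit() and rollback() must only be called for files whose begin() returned true; otherwise the permit count drifts upward.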
>>
>>
>>>> Is that possible?  Is there anything I should be sure to do in an error
>>>> route so that I "roll back" the camel locks to ensure that unprocessed
>>>> files are ready to process the next time the application starts?
>>>>
>>>
>>> Hi,
>>>
>>> Maybe you can try to use the parameter maxMessagesPerPoll on the file
>>> endpoint i.e.:
>>> from("file://incoming?maxMessagesPerPoll=5").thread(5).process()
>>>
>>> Check the file component documentation :
>>> http://camel.apache.org/file2.html
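The maxMessagesPerPoll suggestion caps how many files a single poll hands over. Below is a toy model of that cap with a hypothetical PollBatcher class. It assumes only the files in the batch are claimed during that poll, and the alphabetical sort is just there to make the example deterministic; it says nothing about Camel's ordering.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical toy model, not Camel code: each poll takes at most
 * maxMessagesPerPoll files; everything else stays untouched for a later poll.
 */
class PollBatcher {

    static List<String> nextBatch(List<String> filesInDirectory, int maxMessagesPerPoll) {
        List<String> sorted = new ArrayList<>(filesInDirectory);
        Collections.sort(sorted); // deterministic order for the example only
        int n = Math.min(maxMessagesPerPoll, sorted.size());
        return new ArrayList<>(sorted.subList(0, n));
    }
}
```

A per-poll cap bounds one poll; by itself it does not bound how many files from earlier polls are still in progress.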
>>>
>>> --
>>> View this message in context:
>>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26965930.html
>>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>>
>>>
>>
>>
>>
>> --
>> Claus Ibsen
>> Apache Camel Committer
>>
>> Author of Camel in Action: http://www.manning.com/ibsen/
>> Open Source Integration: http://fusesource.com
>> Blog: http://davsclaus.blogspot.com/
>> Twitter: http://twitter.com/davsclaus
>>
>>
>
>
>



