camel-users mailing list archives

From jonathanq <jqu...@abebooks.com>
Subject Re: Processing 5 files at a time - Threads? SEDA + Concurrent Consumers?
Date Thu, 31 Dec 2009 19:59:40 GMT

Sorry - I meant that this won't cause me much trouble, since I could just
change the type in my code to File instead of GenericFile.  I just thought
it was strange.

I actually changed my route to do a .convertBodyTo(String.class) instead.
I was only reading the file, and that was the only reason I needed a
GenericFile object (to get the name of the file to read).  Not sure why I
didn't think of doing that earlier.

Jonathan 

jonathanq wrote:
> 
> Claus,
> 
> I figured out why my custom Route Policy wasn't working for me.  
> 
> It seems that there is an issue when I use preMove on the File endpoint -
> when the Exchange reaches my first Processor, the body is no longer a
> "GenericFile".
> 
> My processor does this on the first line:
> 
> GenericFile file = exchange.getIn().getBody(GenericFile.class);
> 
> This works fine with my route - but as soon as I use preMove to put the
> in-progress files into a "processing" directory, the type of the body is
> no longer GenericFile but java.io.File.  So of course, file is now null.
> 
> This isn't a problem for my code; I can change the type and everything
> will work.  But it seems very strange that the type changes when "preMove"
> is set on the endpoint.
> 
> Is that expected behavior?
> 
> Jonathan
> 
> 
> Claus Ibsen-2 wrote:
>> 
>> On Thu, Dec 31, 2009 at 5:39 PM, jonathanq <jquail@abebooks.com> wrote:
>>>
>>> Excellent - that would definitely help my solution, as I could use lock
>>> files, and if we had to kill the process, it would just delete those on
>>> the next startup and re-process the files.
>>>
>>> So when is 2.2.0 coming out? :-)
>>>
>> I implemented this feature today so yeah it will be in 2.2
>> 
>> Early 2010. I hope we get it out at the start of February. The last
>> major goal is to have an improved thread pool configuration.
>> 
>> 
>>> The solution I have works - and will probably be what we use for this
>>> application. But that feature will help with future applications, as we
>>> do end up writing a lot of file-based Camel processes.
>>>
>> 
>> Yeah, file handling is actually much harder than it first seems. Our goal
>> is to make the file / ftp components in Camel flexible and to cover many
>> of the use cases out there. So any feedback is valuable.
>> 
>> I will give it some thought to see if we can change the dynamic inflight
>> throttler to measure metrics a bit earlier.
>> 
>> Okay, I guess it's time to celebrate the new year.
>> 
>> 
>> 
>>> Thanks for all the help!
>>>
>>> Jonathan
>>>
>>>
>>> Claus Ibsen-2 wrote:
>>>>
>>>> On Thu, Dec 31, 2009 at 12:33 AM, jonathanq <jquail@abebooks.com> wrote:
>>>>>
>>>>> I took a good look at the Route Policy - at first the
>>>>> ThrottlingInflightRoutePolicy class seemed like it could work, as I
>>>>> really only want 5 exchanges to be in-flight at a time.
>>>>>
>>>>> Unfortunately it would never suspend the consumer.  I dug deeper into
>>>>> the code and discovered why.  The ThrottlingInflightRoutePolicy class
>>>>> only checks the number of inflight exchanges AFTER an exchange has been
>>>>> processed (the code to stop or start a consumer is all done in the
>>>>> onExchangeDone method).
>>>>>
>>>>> Since in my case the exchanges will take a while to process - it
>>>>> wouldn't know it had exceeded the maximum number until after it had
>>>>> finished processing one of them.
>>>>>
>>>>> In my opinion that is a bug - or at the very least an important thing
>>>>> to note in the documentation.  I spent a fair bit of time trying to
>>>>> figure out why I could not get it to work the way it appeared it was
>>>>> supposed to, all because it does not compare the inflight count to the
>>>>> threshold until after it has finished processing an exchange.
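To make the timing problem concrete, here is a plain-Java model (not Camel code; `InflightThrottleModel` and its methods are hypothetical) of an in-flight counter that is evaluated only when an exchange completes, as described above, next to one that is also evaluated when an exchange begins. With long-running exchanges and nothing completed yet, the completion-only variant never suspends. Resuming is simplified to "below the maximum"; the real policy's resume behavior is not modelled.

```java
// Plain-Java model of an in-flight throttle; NOT Camel's RoutePolicy API.
class InflightThrottleModel {
    private final int maxInflight;
    private final boolean checkOnBegin; // false = only evaluate in onExchangeDone
    private int inflight;
    private boolean suspended;

    InflightThrottleModel(int maxInflight, boolean checkOnBegin) {
        this.maxInflight = maxInflight;
        this.checkOnBegin = checkOnBegin;
    }

    /** Returns true if the simulated consumer accepted a new exchange. */
    boolean tryBegin() {
        if (suspended) {
            return false; // consumer is suspended, nothing new is picked up
        }
        inflight++;
        if (checkOnBegin) {
            evaluate(); // notice the threshold as soon as it is reached
        }
        return true;
    }

    /** Called when an exchange finishes; the only check in the completion-only variant. */
    void onExchangeDone() {
        inflight--;
        evaluate();
    }

    private void evaluate() {
        // Simplified: suspend at the threshold, resume once below it again.
        suspended = inflight >= maxInflight;
    }

    int inflight() { return inflight; }

    boolean suspended() { return suspended; }
}
```

For example, eight exchanges started against a limit of 5 are all accepted by the completion-only variant, while the begin-checking variant accepts five and then reports itself suspended.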
>>>>>
>>>>> I also tried writing my own FileThrottlingRoutePolicy that would count
>>>>> how many files were in an "inprogress" directory, and stop the consumer
>>>>> if it exceeded the max concurrent files.
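A minimal sketch of the check such a policy would make, assuming the in-progress files are plain files sitting directly inside one directory. `InProgressDirectoryCheck` is a hypothetical helper that uses only `java.nio.file`; wiring it into a Camel route policy is not shown.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Hypothetical helper: decide whether to stop consuming based on how many
// files currently sit in an in-progress directory.
class InProgressDirectoryCheck {
    static long countInProgress(Path inProgressDir) throws IOException {
        if (!Files.isDirectory(inProgressDir)) {
            return 0; // nothing has been moved there yet
        }
        try (Stream<Path> entries = Files.list(inProgressDir)) {
            return entries.filter(Files::isRegularFile).count();
        }
    }

    static boolean shouldSuspend(Path inProgressDir, int maxConcurrentFiles) throws IOException {
        return countInProgress(inProgressDir) >= maxConcurrentFiles;
    }
}
```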
>>>>>
>>>>> However, I ran into read/write issues when I used preMove on the files -
>>>>> for some reason my processes would later throw exceptions about file
>>>>> not found or file lock (I can't remember which - I have been trying so
>>>>> many different things today to try and get this working).
>>>>>
>>>>> In the end I solved my problem by avoiding my problem :)
>>>>>
>>>>> The primary reason I didn't want the lock files is that they would mean
>>>>> manual cleanup if we ever had to kill the process while it's running.
>>>>> Otherwise, the next time it started, it would ignore any of the files
>>>>> that had a lock file as well.
>>>>>
>>>>
>>>> We have a ticket for that
>>>> https://issues.apache.org/activemq/browse/CAMEL-2082
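As an illustration of the behavior discussed in this thread (deleting leftover lock files on the next startup), a startup cleanup could remove stale lock files before the consumer starts. This is not how Camel implements the feature; `StaleLockCleaner` is a hypothetical name, and the `.camelLock` suffix is an assumption based on the lock files mentioned in this thread.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical startup cleanup: delete leftover lock files so files locked
// by a killed process are picked up again on the next start.
class StaleLockCleaner {
    static int deleteStaleLocks(Path dir) throws IOException {
        // Collect first, then delete, so we never modify the directory
        // while iterating over its listing.
        List<Path> locks = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*.camelLock")) {
            for (Path lock : stream) {
                locks.add(lock);
            }
        }
        int deleted = 0;
        for (Path lock : locks) {
            if (Files.deleteIfExists(lock)) {
                deleted++;
            }
        }
        return deleted;
    }
}
```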
>>>>
>>>>
>>>>
>>>>> I re-wrote my route to work as follows:
>>>>>
>>>>> from("file://incoming?maxMessagesPerPoll=1&idempotent=true&moveFailed=failed&move=processed&readLock=none").threads(5).process()
>>>>>
>>>>
>>>> Nice solution :)
>>>>
>>>>> This way, when files are "finished" they are placed in a "processed"
>>>>> directory, and when they fail they are put in a "failed" directory.
>>>>> Anything still in the incoming directory is yet to be processed.
>>>>> Because the record of what has and hasn't been processed is kept only
>>>>> in memory, restarting the process will just re-start any of the files
>>>>> still in the incoming directory.
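A plain-Java model of the outcome handling described above: after processing, a file is moved into a "processed" or "failed" subdirectory, so whatever remains in the incoming directory is still unprocessed. This is not Camel's implementation; it assumes the relative directory names are resolved against the directory the file was read from, and `OutcomeMover` is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical model of move=processed / moveFailed=failed for one file.
class OutcomeMover {
    /** Moves the file next to where it was read from and returns its new path. */
    static Path finish(Path file, boolean succeeded) throws IOException {
        Path targetDir = file.getParent().resolve(succeeded ? "processed" : "failed");
        Files.createDirectories(targetDir);
        return Files.move(file, targetDir.resolve(file.getFileName()),
                StandardCopyOption.REPLACE_EXISTING);
    }
}
```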
>>>>>
>>>>> No more lock files means restarting it won't require us to delete
>>>>> .lock files.
>>>>>
>>>>> I wish there were still an easier way to do what I wanted.  Now I just
>>>>> have to rely on threads(5) to do the limiting to 5 files at a time.
>>>>> Although, if I understand your comment (and the documentation), I can't
>>>>> actually rely on threads(5) to spawn 5 threads... it will just spawn UP
>>>>> TO 5 threads depending on the system load?
>>>>>
>>>>> Jonathan
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Claus Ibsen-2 wrote:
>>>>>>
>>>>>> Hi
>>>>>>
>>>>>> See also the route policy, which can throttle the file consumer to a
>>>>>> pace of 5 concurrent files:
>>>>>> http://camel.apache.org/routepolicy.html
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Dec 30, 2009 at 11:51 AM, gmagniez <gabriel.magniez@steria.com>
>>>>>> wrote:
>>>>>>>
>>>>>>>
>>>>>>> jonathanq wrote:
>>>>>>>>
>>>>>>>> I am trying to write a process that will use a file endpoint (Camel
>>>>>>>> 2.1.0) to read from a directory.
>>>>>>>>
>>>>>>>> I need the process to read a file from the directory and then do some
>>>>>>>> processing on the contents (namely hitting a REST service for each
>>>>>>>> record in the file).  We have been asked to limit the number of
>>>>>>>> threads that are hitting the service to 5.  So we decided to simply
>>>>>>>> process 5 files at a time (to avoid the concurrency issues of reading
>>>>>>>> 1 file and writing to 1 file with 5 threads).
>>>>>>>>
>>>>>>>> I tried a few different approaches, and I wanted to see if there was
>>>>>>>> a way to do what I want.
>>>>>>>>
>>>>>>>> Approach 1:
>>>>>>>>
>>>>>>>> from("file://incoming").to("seda:filequeue")
>>>>>>>>
>>>>>>>> from("seda:filequeue").thread(5).process()
>>>>>>>>
>>>>>>>> Now - this reads in ALL of the files in the directory (placing a
>>>>>>>> camelLock on all of them) and then sends them to the seda endpoint.
>>>>>>>> I saw log messages that referred to threads 1 through 6.  But from
>>>>>>>> what I read in the documentation, thread() is not necessarily going
>>>>>>>> to limit it to that number.
>>>>>>>>
>>>>>>
>>>>>> thread(5) will limit to at most 5 concurrent threads from this point
>>>>>> forward.
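The "at most 5 concurrent threads" guarantee can be illustrated with a plain `java.util.concurrent` fixed pool of 5 workers. This sketch is not Camel's `thread()`/`threads()` DSL; it only shows that queued work never runs more than pool-size tasks at once. `BoundedPoolDemo` is a hypothetical name, and the sleep stands in for the REST call.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java illustration of bounded concurrency with a fixed thread pool.
class BoundedPoolDemo {
    /** Runs short jobs on a pool of poolSize threads and returns the peak concurrency observed. */
    static int peakConcurrency(int poolSize, int tasks) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // record the highest count seen
                try {
                    Thread.sleep(10); // stand-in for calling the REST service
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown(); // accept no new work; queued tasks still run
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
            pool.shutdownNow();
        }
        return peak.get();
    }
}
```

Calling `BoundedPoolDemo.peakConcurrency(5, 40)` returns a value between 1 and 5; the exact peak depends on scheduling, but it can never exceed the pool size.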
>>>>>>
>>>>>>
>>>>>>>> Approach 2:
>>>>>>>>
>>>>>>>> from("file://incoming").thread(5).process()
>>>>>>>>
>>>>>>>> This only processed 5 files at a time - but created camelLocks on all
>>>>>>>> files in the directory.
>>>>>>>>
>>>>>>>> So then I tried approach 3:
>>>>>>>>
>>>>>>>> from("file://incoming").to("seda:filequeue")
>>>>>>>>
>>>>>>>> from("seda:filequeue?concurrentConsumers=5").process()
>>>>>>>>
>>>>>>>> Again this seems to work, however it puts a camelLock on all the
>>>>>>>> files (because they were all processed by the first part of the
>>>>>>>> route, they are just queued up in the second).
>>>>>>>>
>>>>>>>>
>>>>>>>> While approach 3 works - what I would really like is to not have the
>>>>>>>> camelLock placed on the files that are not being processed.
>>>>>>>>
>>>>>>>> So watching the directory, there would be (at most) 5 files with
>>>>>>>> camelLock files created at a time, when they finish they are moved to
>>>>>>>> the .camel directory, and then it starts processing the next file in
>>>>>>>> the directory.
>>>>>>>>
>>>>>>
>>>>>> You can also implement your own ProcessStrategy where you can deny
>>>>>> consuming more than 5 files at any given time.
>>>>>> See the processStrategy option on the file consumer. Just return
>>>>>> false from the begin() method.
>>>>>>
>>>>>> See
>>>>>> http://camel.apache.org/file2.html
>>>>>> at the bottom of the page.
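A minimal model of that suggestion, assuming begin/commit/rollback hooks are called once per file. `MaxInProgressStrategy` is hypothetical and deliberately does not implement Camel's process strategy interface, whose exact signatures should be taken from the file component documentation. The compare-and-set loop keeps the check-then-increment atomic when several threads call `begin()`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: deny a file in begin() when max files are already in
// progress. NOT Camel's GenericFileProcessStrategy interface.
class MaxInProgressStrategy {
    private final int max;
    private final AtomicInteger inProgress = new AtomicInteger();

    MaxInProgressStrategy(int max) {
        this.max = max;
    }

    /** Returns false to deny consuming the file right now. */
    boolean begin(String fileName) {
        while (true) {
            int current = inProgress.get();
            if (current >= max) {
                return false; // already at the limit
            }
            if (inProgress.compareAndSet(current, current + 1)) {
                return true; // slot reserved atomically
            }
        }
    }

    /** Frees the slot after successful processing. */
    void commit(String fileName) {
        inProgress.decrementAndGet();
    }

    /** Frees the slot after failed processing. */
    void rollback(String fileName) {
        inProgress.decrementAndGet();
    }

    int inProgress() {
        return inProgress.get();
    }
}
```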
>>>>>>
>>>>>>
>>>>>>>> Is that possible?  Is there anything I should be sure to do in an
>>>>>>>> error route so that I "roll back" the camel locks to ensure that
>>>>>>>> unprocessed files are ready to process the next time the application
>>>>>>>> starts?
>>>>>>>>
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Maybe you can try to use the parameter maxMessagesPerPoll on the file
>>>>>>> endpoint, i.e.:
>>>>>>> from("file://incoming?maxMessagesPerPoll=5").thread(5).process()
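The effect of `maxMessagesPerPoll` on a single poll can be modelled in plain Java: at most N files are picked up per poll and the rest wait for a later one. Sorting by name is only there to make the example deterministic; the order Camel actually uses is not assumed, and `LimitedPoll` is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical model of one directory poll capped at maxMessagesPerPoll files.
class LimitedPoll {
    static List<Path> poll(Path dir, int maxMessagesPerPoll) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries
                    .filter(Files::isRegularFile)
                    .sorted()                  // deterministic order for the example
                    .limit(maxMessagesPerPoll) // remaining files wait for the next poll
                    .collect(Collectors.toList());
        }
    }
}
```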
>>>>>>>
>>>>>>> Check the file component documentation:
>>>>>>> http://camel.apache.org/file2.html
>>>>>>>
>>>>>>> --
>>>>>>> View this message in context:
>>>>>>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26965930.html
>>>>>>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Claus Ibsen
>>>>>> Apache Camel Committer
>>>>>>
>>>>>> Author of Camel in Action: http://www.manning.com/ibsen/
>>>>>> Open Source Integration: http://fusesource.com
>>>>>> Blog: http://davsclaus.blogspot.com/
>>>>>> Twitter: http://twitter.com/davsclaus
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26973577.html
>>>>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>> --
>>> View this message in context:
>>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26980045.html
>>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>>
>>>
>> 
>> 
>> 
>> 
>> 
> 
> 

-- 
View this message in context: http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26981818.html
Sent from the Camel - Users mailing list archive at Nabble.com.

