From: Claus Ibsen
Date: Thu, 31 Dec 2009 09:42:18 +0100
Subject: Re: Processing 5 files at a time - Threads? SEDA + Concurrent Consumers?
To: users@camel.apache.org

On Thu, Dec 31, 2009 at 12:33 AM, jonathanq wrote:
>
> I took a good look at the Route Policy - at first the
> ThrottlingInflightRoutePolicy class seemed like it could work - as I really
> only want 5 exchanges to be in-flight at a time.
>
> Unfortunately it would never suspend the consumer. I dug deeper into the
> code and discovered why. The ThrottlingInflightRoutePolicy class only
> checks the number of inflight exchanges AFTER an exchange has been processed
> (the code to stop or start a consumer is all done in the onExchangeDone
> method).
>
> Since in my case the exchanges will take a while to process - it wouldn't
> know it had exceeded the maximum number until after it had finished
> processing one of them.
>
> In my opinion that is a bug - or at the very least an important thing to
> note in the documentation. I spent a fair bit of time trying to figure out
> why I could not get it to work as it appeared it was supposed to. All
> because it was not checking the inflight numbers to the threshold until
> after it had finished processing an exchange.
>
> I also tried writing my own FileThrottlingRoutePolicy that would test how
> many files were in an "inprogress" directory - and stop the consumer if it
> exceeded the max concurrent files.
>
> However I ran into read/write issues when I used the preMove of files - for
> some reason my processes later would throw exceptions about file not found
> or file lock (I can't remember which - I have been trying so many different
> things today to try and get this working).
>
> In the end I solved my problem by avoiding my problem :)
>
> The primary reason I didn't want the file locks to occur is it would be a
> manual cleanup if we ever had to kill the process while it's running.
> Otherwise the next time it started, it would ignore any of the files that
> had a lock file as well.
>

We have a ticket for that
https://issues.apache.org/activemq/browse/CAMEL-2082

> I re-wrote my route to work as follows:
>
> from("file://incoming?maxMessagesPerPoll=1&idempotent=true&moveFailed=failed&move=processed&readLock=none").threads(5).process()
>

Nice solution :)

> This way - when files are "finished" they will be placed in a "processed"
> directory, when they fail they are put in a "failed" directory. Anything
> still in the incoming directory is to be processed. Because the memory of
> what was processed and what hasn't been was all in memory - restarting the
> process will just re-start any of the files still in the incoming directory.
>
> No more Lock files means restarting it won't cause us to have to delete
> .lock files.
>
> I wish there was still an easier way to do what I wanted. Now I just have
> to rely on the threads(5) to do the limiting to 5 files at a time. Although
> if I understand your comment (and the documentation) I can't actually rely
> on threads(5) to spawn 5 threads..it will just spawn UP TO 5 threads
> depending on the system load?
>
> Jonathan
>
>
> Claus Ibsen-2 wrote:
>>
>> Hi
>>
>> See also route policy to throttle the file consumer to a pace of 5
>> concurrent files
>> http://camel.apache.org/routepolicy.html
>>
>> On Wed, Dec 30, 2009 at 11:51 AM, gmagniez wrote:
>>>
>>> jonathanq wrote:
>>>>
>>>> I am trying to write a process that will use a file endpoint (camel
>>>> 2.1.0) to read from a directory.
>>>>
>>>> I need the process to read a file from the directory and then do some
>>>> processing on the contents (namely hitting a REST service for each
>>>> record in the file). We have been asked to limit the number of threads
>>>> that are hitting the service to 5. So we decided to simply process 5
>>>> files at a time (to avoid concurrency issues reading 1 file and writing
>>>> to 1 file with 5 threads)
>>>>
>>>> I tried a few different approaches, and I wanted to see if there was a
>>>> way to do what I want.
>>>>
>>>> Approach 1:
>>>>
>>>> from("file://incoming").to("seda:filequeue")
>>>>
>>>> from("seda:filequeue").thread(5).process()
>>>>
>>>> Now - this reads in ALL of the files in the directory (places camelLock
>>>> on all) and then sends them to the seda endpoint. I saw log messages
>>>> that referred to thread 1 through 6. But from what I read on the
>>>> documentation, thread() is not necessarily going to limit it at that
>>>> number.
>>>>
>>
>> thread(5) will limit to at most 5 concurrent threads from this point
>> forward.
>>
>>>> Approach 2:
>>>>
>>>> from("file://incoming").thread(5).process()
>>>>
>>>> This only processed 5 files at a time - but created camelLocks on all
>>>> files in the directory.
>>>>
>>>> So then I tried approach 3:
>>>>
>>>> from("file://incoming").to("seda:filequeue")
>>>>
>>>> from("seda:filequeue?concurrentConsumers=5").process()
>>>>
>>>> Again this seems to work, however it puts a camelLock on all the files
>>>> (because they were all processed by the first part of the route, they
>>>> are just queued up in the second).
>>>>
>>>> While approach 3 works - what I would really like is to not have the
>>>> camelLock placed on the files that are not being processed.
>>>>
>>>> So watching the directory, there would be (at most) 5 files with
>>>> camelLock files created at a time, when they finish they are moved to
>>>> the .camel directory, and then it starts processing the next file in
>>>> the directory.
>>>>
>>
>> You can also implement your own ProcessStrategy where you can deny
>> consuming more than 5 files at any given time.
>> See the processStrategy option on the file consumer. Just return false
>> on the begin() method.
>>
>> See
>> http://camel.apache.org/file2.html
>> at the bottom of the page.
>>
>>>> Is that possible? Is there anything I should be sure to do in an error
>>>> route so that I "roll back" the camel locks to ensure that unprocessed
>>>> files are ready to process the next time the application starts?
>>>>
>>>
>>> Hi,
>>>
>>> Maybe you can try to use the parameter maxMessagesPerPoll on the file
>>> endpoint i.e.:
>>> from("file://incoming?maxMessagesPerPoll=5").thread(5).process()
>>>
>>> Check the file component documentation:
>>> http://camel.apache.org/file2.html
>>>
--
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus
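
The begin() suggestion in this thread (refuse a file when five are already in flight) can be sketched in plain Java, independent of Camel. InflightGate, tryBegin() and done() are hypothetical names used only for illustration; they are not Camel API. The assumption is that a custom process strategy would return the result of tryBegin() from its begin() method and call done() once a file has been committed or rolled back. Unlike a check that runs only when an exchange completes, this gate is consulted before any work starts.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper (not a Camel class): admits at most `max`
 * units of work at any one time.
 */
final class InflightGate {
    private final int max;
    private final AtomicInteger inflight = new AtomicInteger();

    InflightGate(int max) {
        this.max = max;
    }

    /** Call before starting work; returns false when the limit is reached. */
    boolean tryBegin() {
        while (true) {
            int current = inflight.get();
            if (current >= max) {
                return false; // limit reached, caller should skip this file for now
            }
            // Claim a slot only if no other thread changed the counter meanwhile.
            if (inflight.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    /** Call when work finishes, whether it succeeded or failed. */
    void done() {
        inflight.decrementAndGet();
    }

    int inflight() {
        return inflight.get();
    }

    public static void main(String[] args) {
        InflightGate gate = new InflightGate(5);
        int admitted = 0;
        int rejected = 0;
        // Offer seven files while none of them has finished yet.
        for (int i = 0; i < 7; i++) {
            if (gate.tryBegin()) {
                admitted++;
            } else {
                rejected++;
            }
        }
        System.out.println("admitted=" + admitted + " rejected=" + rejected);
    }
}
```

A file that is refused is simply left in the incoming directory and offered again on a later poll, so no lock file needs to be created for it.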