camel-users mailing list archives

From Claus Ibsen <>
Subject Re: Looking for advice or help on batching/queueing for a fairly simple route
Date Sat, 21 Apr 2012 15:11:22 GMT

I have logged a JIRA to allow the file/ftp components to not apply the
limit eagerly. Then you can sort all the files beforehand, and then limit.
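To make the difference concrete, here is a minimal sketch in plain Java (no Camel dependency) of the two behaviors: limiting eagerly while the directory is listed, versus sorting everything first and limiting afterwards. The class and method names (PollLimitSketch, PolledFile, eagerLimit, sortThenLimit) are made up for illustration and are not Camel API. Later Camel releases document an eagerMaxMessagesPerPoll option on the file component for this purpose; whether your version has it is something to check in the component documentation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustration only: these names are hypothetical, not part of Camel.
final class PollLimitSketch {

    // Stand-in for one entry of a directory listing.
    static final class PolledFile {
        final String name;
        final long lastModified;

        PolledFile(String name, long lastModified) {
            this.name = name;
            this.lastModified = lastModified;
        }
    }

    private static final Comparator<PolledFile> OLDEST_FIRST =
            Comparator.comparingLong(f -> f.lastModified);

    // Eager limit: keep only the first 'max' entries in listing order,
    // then sort that subset. Assumes max >= 0.
    static List<String> eagerLimit(List<PolledFile> listing, int max) {
        int n = Math.min(max, listing.size());
        List<PolledFile> taken = new ArrayList<>(listing.subList(0, n));
        taken.sort(OLDEST_FIRST);
        return names(taken);
    }

    // Non-eager limit: sort the whole listing in memory, then keep the
    // first 'max' entries. Assumes max >= 0.
    static List<String> sortThenLimit(List<PolledFile> listing, int max) {
        List<PolledFile> all = new ArrayList<>(listing);
        all.sort(OLDEST_FIRST);
        return names(all.subList(0, Math.min(max, all.size())));
    }

    private static List<String> names(List<PolledFile> files) {
        List<String> out = new ArrayList<>();
        for (PolledFile f : files) {
            out.add(f.name);
        }
        return out;
    }
}
```

With a listing that happens to arrive as c.log (newest), a.log (oldest), b.log and a limit of 1, eagerLimit yields c.log while sortThenLimit yields a.log. The price of the second variant is that the full listing is held and sorted in memory.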

On Sun, Apr 15, 2012 at 10:06 AM, Claus Ibsen <> wrote:
> On Sun, Apr 8, 2012 at 4:14 AM, Aaron Daubman <> wrote:
>> Greetings,
>> I have the following quite-simple route:
>> ---snip---
>> BeanIODataFormat dataCoreFormatTest = new
>> BeanIODataFormat("beanio/mappings.xml", "dataCoreFileTest");
>> dataCoreFormatTest.setIgnoreInvalidRecords(true); //Ignore invalid records
>> StringBuilder fileUri = new StringBuilder("file://");
>> fileUri.append(props.get("ingestDir"))
>>        .append("?autoCreate=false")
>>        .append("&include=" + props.get("filePattern"))
>>        .append("&sortBy=" + props.get("fileSort"))
>>        .append("&preMove=" + props.get("progressDir"))
>>        .append("&move=" + props.get("completedDir"))
>>        .append("&moveFailed=" + props.get("failedDir"))
>>        .append("&runLoggingLevel=" + props.get("ingestLogLevel"))
>>        .append("&startingDirectoryMustExist=true");
>> //Read in log-files after they have been rotated, starting with the
>> //oldest in the directory
>> from(fileUri.toString())
>>    .log("Consuming file ${file:name}")
>>    //use BeanIO to read in the CSV and create an array of dataCore
>>    //objects, one-per-record in the log file
>>    .unmarshal(dataCoreFormatTest)
>>    //use MyBatis to insert the dataCore objects into MySQL DB
>>    .to("mybatis:batchInsertdataCore?statementType=InsertList");
>> ---snip---
>> My issues are that:
>> 1) there could be many files when the app starts up, and I would like to
>> process one at a time - I see that file2 supports maxMessagesPerPoll,
>> however I don't think that will work since I need to process in strict
>> oldest-file-first order and the note on maxMessagesPerPoll says that it
>> occurs BEFORE sorting, so if I use it, I am not guaranteed oldest file in
>> the dir each time.
> Yeah, it used to be like that in the past. But then some people did FTP
> integration with a lot of FTP files, and the FTP library takes up more
> memory for each file handle.
> We could possibly consider adding a new option so you can flip
> between the two behaviors.
> You would then need to be aware that sorting like this takes up some
> memory, as it is in-memory sorting.
> An alternative currently is to implement a custom filter that does the
> manual work: find the oldest file in the current dir, and then only
> return true for that matching file name.
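A rough sketch of that filter logic, in plain Java so it runs without Camel on the classpath. OldestFileSelector, oldestIn and accept are hypothetical names; in a real route the same check would sit inside a GenericFileFilter bean referenced from the endpoint's filter option, and that wiring is left out here. The sketch assumes last-modified time defines "oldest" and that ties are broken by listing order.

```java
import java.io.File;

// Illustration only: hypothetical helper, not a Camel class.
final class OldestFileSelector {

    // Returns the regular file with the smallest last-modified time in
    // 'dir', or null if the directory is empty or cannot be listed.
    static File oldestIn(File dir) {
        File[] children = dir.listFiles();
        if (children == null) {
            return null;
        }
        File oldest = null;
        for (File child : children) {
            if (!child.isFile()) {
                continue; // skip sub-directories
            }
            if (oldest == null || child.lastModified() < oldest.lastModified()) {
                oldest = child;
            }
        }
        return oldest;
    }

    // Accepts 'candidate' only if it is currently the oldest file in its
    // own directory, so at most one file passes per poll.
    static boolean accept(File candidate) {
        File parent = candidate.getAbsoluteFile().getParentFile();
        if (parent == null) {
            return false;
        }
        File oldest = oldestIn(parent);
        return oldest != null && oldest.getName().equals(candidate.getName());
    }
}
```

Note that this re-lists the directory for every candidate, which is fine for a handful of files but costly for large directories; caching the result per poll would be the obvious refinement.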
>> - How do I pass the contents of just one file at a time to be unmarshalled
>> by BeanIO while ensuring strict ordering (oldest files processed first)?
>> 2) There could be many records in a single file, however I would like to
>> set some threshold on how many are turned into objects at a time, for
>> multiple reasons:
>> - memory management - I do not want to create an unbounded number of objects
>> - sorting performance - I will be adding a call to ensure time-ordering of
>> records after unmarshalling and before database insertion
>> - database performance - so that mybatis gets passed a list of no more
>> than X (say, 5000) objects to write to the database at a time.
>> I feel like I have read about built in support that would enable things
>> like this, however, after reading through the EIP patterns again, the
>> closest seems to be applying the 'throttling' pattern, which only performs
>> time-based limitation, whereas I want to limit absolutely (irrespective of
>> time). I think I should be able to use splitter and aggregator for this
>> (perhaps together) but, if so, am looking for guidance as to how best to
>> use one or both to achieve the desired results. Finally, is there a way to
>> place the maxMessagesPerPoll that would do this without affecting sorting?
>> pseudo-code would look something like:
>> ---pseudo-code---
>> //read one file at a time from the dir, ensuring the oldest file in the
>> //dir is always the one being read
>> from(dirOfFiles)
>>  //can't use maxMessagesPerPoll since that will not perform the desired
>>  //sorting; how to limit to a single file?
>>  .limit(1)
>>  //create ArrayList of objects from the file's csv content
>>  .unmarshal(dataCoreFormatTest)
>>  //sort the arraylist in time-order using custom comparator
>>  .sort(body(), new MyReverseComparator())
>>  //buffer messages, sending up to the first 5000 after the sort on to the
>>  //next processor; again, would like to use maxMessagesPerPoll here, but
>>  //not sure how
>>  .limit(5000)
>>  //finally insert the 5000 received messages into the database
>>  .to("mybatis...")
>> ---pseudo-code---
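The .limit(5000) step in the pseudo-code above can be approximated by cutting the sorted list into fixed-size batches. Below is a minimal, dependency-free sketch; BatchSketch and partition are made-up names, not Camel API. In a route, a Splitter (for example .split(body())) placed after such a step would presumably hand each sub-list to the mybatis endpoint as its own exchange, but that wiring is an assumption and is not shown.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: hypothetical helper, not a Camel class.
final class BatchSketch {

    // Cuts 'items' into consecutive batches of at most 'batchSize'
    // elements, preserving order. The last batch may be smaller.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }
}
```

This caps how many objects each insert receives, which covers the database concern. It does not by itself bound memory, because the whole file has already been unmarshalled into objects before the list is partitioned.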
>> I'm hoping these are all simple questions for a seasoned Camel user.
>> Some final questions:
>> - How do you know when a producer produces a List<T> or just a String?
>> - How do you know when a consumer expects a List or a String?
>> e.g. even for file2, if there are multiple files in the directory, is a
>> List<String> passed to the next component where each element of the list is
>> one file's content, or is the content of one file passed along the route at
>> a time?
>> Thanks again!
>>     Aaron
> --
> Claus Ibsen
> -----------------
> CamelOne 2012 Conference, May 15-16, 2012:
> FuseSource
> Twitter: davsclaus, fusenews
> Author of Camel in Action:

Claus Ibsen
