camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Claus Ibsen <>
Subject Re: Looking for advice or help on batching/queueing for a fairly simple route
Date Sun, 15 Apr 2012 08:06:47 GMT
On Sun, Apr 8, 2012 at 4:14 AM, Aaron Daubman <> wrote:
> Greetings,
> I have the following quite-simple route:
> ---snip---
> BeanIODataFormat dataCoreFormatTest = new
> BeanIODataFormat("beanio/mappings.xml", "dataCoreFileTest");
> dataCoreFormatTest.setIgnoreInvalidRecords(true); //Ignore invalid records
> StringBuilder fileUri = new StringBuilder("file://");
> fileUri.append(props.get("ingestDir"))
>        .append("?autoCreate=false")
>        .append("&include=" + props.get("filePattern"))
>        .append("&sortBy=" + props.get("fileSort"))
>        .append("&preMove=" + props.get("progressDir"))
>        .append("&move=" + props.get("completedDir"))
>        .append("&moveFailed=" + props.get("failedDir"))
>        .append("&runLoggingLevel=" + props.get("ingestLogLevel"))
>        .append("&startingDirectoryMustExist=true");
> from(fileUri.toString())  //Read in log-files after they have been rotated,
> starting with the oldest in the directory
>    .log("Consuming file ${file:name}")
>    .unmarshal(dataCoreFormatTest) //use BeanIO to read in the CSV and
> create an array of dataCore objects, one-per-record in the log file
>    .to("mybatis:batchInsertdataCore?statementType=InsertList");  //use
> MyBatis to insert the dataCore objects into MySQL DB
> ---snip---
> My issues are that:
> 1) there could be many files when the app starts up, and I would like to
> process one at a time - I see that file2 supports maxMessagesPerPoll,
> however I don't think that will work since I need to process in strict
> oldest-file-first order and the note on maxMessagesPerPoll says that it
> occurs BEFORE sorting, so if I use it, I am not guaranteed oldest file in
> the dir each time.

Yeah it used to be like that in the past. But then some people did FTP
and they had a lot of FTP files, and the FTP library take up more
memory for each
file handle.

We could possible consider adding a new option so you can flip this
between the two behaviors.
You would then need to be sure that when you use sorting like this,
then it will take up some memory; as its in-memory sorting.

An alternative currently, is to implement a custom filter, and then do
your manual - find the oldest file in the current dir - and then only
return true for that matching file name.

> - How do I pass the contents of just one file at a time to be unmarshalled
> by BeanIO while ensuring strict ordering (oldest files processed first)
> 2) There could be many records in a single file, however I would like to
> set some threshold to be created into objects for multiple reasons.
> - memory management - I do not want to create an unbounded number of objects
> - sorting performance - I will be adding a call to ensure time-ordering of
> records after unmarshalling and before database insertion
> - for database performance, so that mybatis gets passed a list of no more
> than X (say, 5000) objects to write to the database at a time?
> I feel like I have read about built in support that would enable things
> like this, however, after reading through the EIP patterns again, the
> closest seems to be applying the 'throttling' pattern, which only performs
> time-based limitation, whereas I want to limit absolutely (irrespective of
> time). I think I should be able to use splitter and aggregator for this
> (perhaps together) but, if so, am looking for guidance as to how best to
> use one or both to achieve the desired results. Finally, is there a way to
> place the maxMessagesPerPoll that would do this without affecting sorting?
> pseudo-code would look something like:
> ---pseudo-code---
> from(dirOfFiles) //read one file at a time from the dir, ensuring the
> oldest file in the dir is always the one being read (
>  .limit(1) //can't use maxMessagesPerPoll since that will not perform
> desired sorting, how to limit to a single file?
>  .unmarshal(dataCoreFormatTest) //create ArrayList of objects from the
> file's csv content
>  .sort(body(), new MyReverseComparator()) //sort the arraylist in
> time-order using custom comparator
>  .limit(5000) //buffer messages, sending up to the first 5000 after the
> sort on to the next processor- again, would like to use maxMessagesPerPoll
> here, but not sure how
>  .to("mybatis...") //finally insert the 5000 received messages into the
> database
> ---pseudo-code---
> I'm hoping these are all simple questions for a seasoned Camel user.
> Some final questions:
> - How do you know when a producer produces a List<T> or just a String?
> - How do you know when a consumer expects a List or a String
> e.g. even for file2, if there are multiple files in the directory, is a
> List<String> passed to the next component where each element of the list is
> one file's content, or is the content of one file passed along the route at
> a time?
> Thanks again!
>     Aaron

Claus Ibsen
CamelOne 2012 Conference, May 15-16, 2012:
Twitter: davsclaus, fusenews
Author of Camel in Action:

View raw message