camel-users mailing list archives

From Craig Brumfield <craig.brumfi...@apihotels.com>
Subject Problem with Aggregator HawtDB persistence
Date Tue, 07 Jun 2011 20:59:51 GMT
I am using Camel 2.6.0 with ActiveMQ 5.4.2, and am having a problem with the Aggregator when
it uses HawtDB persistence (HawtDB 1.5 and HawtBuf 1.3).

What I am trying to do is consume files from an FTP source and then aggregate these files
into a single exchange.  The FTP site contains 3 different files with the same base file name,
e.g., FILENAME.abc, FILENAME.xyz, and FILENAME.done.  I am using a custom file strategy object
to verify that FILENAME.done is there before consuming either of the other two files (because
the doneFile support did not exist when I first wrote this code), and this all works fine.
The base file name is used as the aggregator correlation key, e.g., A123456 in the output below.
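
For illustration, the done-file check and the correlation key described above could be sketched
roughly as follows. This is not the poster's custom file strategy: the class and method names
are hypothetical, and a local directory stands in for the FTP listing.

```java
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of Camel or of the original code.
final class DoneFileCheck {

    /** Strips the last extension, e.g. "A123456.abc" -> "A123456". */
    static String baseName(String fileName) {
        int dot = fileName.lastIndexOf('.');
        return dot > 0 ? fileName.substring(0, dot) : fileName;
    }

    /** True when the ".done" marker for this file's base name exists in dir. */
    static boolean isReady(Path dir, String fileName) {
        return Files.exists(dir.resolve(baseName(fileName) + ".done"));
    }
}
```

Here baseName() yields the value that would be placed in the "basename" header, so
A123456.abc and A123456.xyz share the correlation key A123456.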

If I run my test without the aggregator persistence (i.e., without the HawtDB repository),
everything works fine.  FILENAME.abc is consumed, then FILENAME.xyz is consumed, and
both are then sent as a grouped exchange to the process() method of my aggregator class.
In this method, I write the files to local temp files (using an input stream and File objects)
and then read these files using a third party program that parses them and presents
the results to me in record format.
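
As a rough sketch of the temp-file step described above (the helper name and its parameters
are assumptions, and java.nio is used here in place of the File objects mentioned), each
file's input stream could be copied to a local temp file like this:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper, not the original process() implementation.
final class TempFileWriter {

    /** Copies the stream into a new temp file, closes the stream, and returns the path. */
    static Path writeToTemp(InputStream in, String prefix, String suffix) throws IOException {
        Path tmp = Files.createTempFile(prefix, suffix);
        try (InputStream src = in) {
            // createTempFile already created the file, so the copy must overwrite it.
            Files.copy(src, tmp, StandardCopyOption.REPLACE_EXISTING);
        }
        return tmp;
    }
}
```

The returned path is what would then be handed to the third party parser.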

However, if I make a single change to my activemq.xml file, adding the reference to my aggregator
persistence repository, it continues to consume both files, but never aggregates them or sends
them to my process() method.

  <aggregate groupExchanges="true" completionSize="2" aggregationRepositoryRef="aggregatorRepository">
    <correlationExpression>
      <header>basename</header>
    </correlationExpression>
    <process ref="myFileParser"/>
  </aggregate>

  <bean id="aggregatorRepository" class="org.apache.camel.component.hawtdb.HawtDBAggregationRepository">
    <property name="repositoryName" value="myAggregator"/>
    <property name="persistentFileName" value="data/myAggregator.dat"/>
  </bean>
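
To make the intended behaviour of this configuration concrete, here is a minimal in-memory
sketch of "collect by correlation key until completionSize is reached".  It is only an
illustration under my own assumptions: it is not Camel's aggregator code, it does no
persistence, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of grouping by key; not Camel's implementation.
final class GroupingSketch {
    private final int completionSize;
    private final Map<String, List<String>> pending = new HashMap<>();

    GroupingSketch(int completionSize) {
        this.completionSize = completionSize;
    }

    /** Adds an item under its key; returns the completed group, or null while incomplete. */
    List<String> add(String key, String item) {
        List<String> group = pending.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(item);
        if (group.size() < completionSize) {
            return null;
        }
        pending.remove(key); // the key starts fresh after completion
        return group;
    }
}
```

With completionSize 2 and key A123456, the first add() returns null and the second returns
both file names, which is the point at which I expect process() to be called.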

My reason for wanting to use this persistence framework is to provide better error handling
in the event that my server crashes before completely processing the aggregated result, since
there seems to be no transactional support with the aggregator.  The commit() method on my
custom file strategy class gets called as each file is consumed, not after the aggregated
results are processed.

Is there any reason this should not work, or am I misunderstanding what this aggregator
persistence should provide?

When it consumes the first file, the log output from HawtDB is as follows:

2011-06-07 14:38:25,834 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing
work +++ done  +++ Scan
2011-06-07 14:38:25,834 | TRACE | org.apache.camel.component.hawtdb.HawtDBAggregationRepository
| Scanned and found no exchange to recover.
2011-06-07 14:38:25,845 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing
work +++ start +++ Getting key [A123456]
2011-06-07 14:38:25,845 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Repository
index with name myAggregator -> null
2011-06-07 14:38:25,846 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | TX is read
only: true for executed work: Getting key [A123456]
2011-06-07 14:38:25,848 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing
work +++ done  +++ Getting key [A123456]
2011-06-07 14:38:25,848 | DEBUG | org.apache.camel.component.hawtdb.HawtDBAggregationRepository
| Getting key  [A123456] -> null
2011-06-07 14:38:25,849 | DEBUG | org.apache.camel.component.hawtdb.HawtDBAggregationRepository
| Adding key   [A123456] -> Exchange[null]
2011-06-07 14:38:25,852 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing
work +++ start +++ Adding key [A123456]
2011-06-07 14:38:25,853 | DEBUG | org.apache.camel.component.hawtdb.HawtDBFile | Created new
repository index with name myAggregator at location 1
2011-06-07 14:38:25,853 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Repository
index with name myAggregator -> { page: 1, deferredEncoding: true }
2011-06-07 14:38:25,854 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | TX is read
only: false for executed work: Adding key [A123456]
2011-06-07 14:38:25,858 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing
work +++ done  +++ Adding key [A123456]

When it consumes the second file, the output is as follows:

2011-06-07 14:38:25,886 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing
work +++ start +++ Getting key [A123456]
2011-06-07 14:38:25,887 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Repository
index with name myAggregator at location 1
2011-06-07 14:38:25,887 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Repository
index with name myAggregator -> { page: 1, deferredEncoding: true }
2011-06-07 14:38:25,888 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | TX is read
only: true for executed work: Getting key [A123456]
2011-06-07 14:38:25,890 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing
work +++ done  +++ Getting key [A123456]
2011-06-07 14:38:25,905 | DEBUG | org.apache.camel.component.hawtdb.HawtDBAggregationRepository
| Getting key  [A123456] -> Exchange[Message: [Body is null]]

At this point, it seems to be stuck.  The commit() method of my file strategy never gets called
for the second file, nor is the grouped Exchange sent to my process() method.

Any suggestions?

Thanks,
Craig



