camel-dev mailing list archives

From "Hiram Chirino" <hi...@hiramchirino.com>
Subject Re: Asynchronous Exchange Processing
Date Thu, 23 Aug 2007 14:10:44 GMT
On 8/23/07, James Strachan <james.strachan@gmail.com> wrote:
> On 8/22/07, Hiram Chirino <hiram@hiramchirino.com> wrote:
> > Hi,
> >
> > Most of our components currently depend on synchronous processing of
> > the Exchange or bad things can happen.  For example the following does
> > not work:
> >
> > from("file:/tmp/foo").to("seda:test");
> > from("seda:test").process( myProcessor );
> >
> > Why? Because the file component deletes the file as soon as the
> > exchange returns from being sent to seda:test.  What would have been
> > nice is that file deletion did not occur until after the exchange is
> > processed by myProcessor.  But that's occurring in an asynchronous
> > thread.
> >
> > Here's an idea that might help solve this problem.
> > Have the seda component call something like
> >    exchange.getExchangeFuture().done()
> > when the message is processed in its async thread.
> >
> > and in the file component, have it call
> >    exchange.getExchangeFuture().get();
> >    // then the code that deletes the file
> > or
> >    exchange.getExchangeFuture().setCallback( new Callback() {
> >      public void done( Exchange exch ) {
> >         // then the code that deletes the file
> >      }
> > })
>
> I was pondering about this with relation to this thread the other day...
> http://www.nabble.com/Consuming-FTP-file-and-deleting-after-processing-tf4300515s22882.html
>
> I definitely think we need a standard way to register
> post-commit/rollback hooks. i.e. on completion of processing (either
> on a commit/completed or rollback/failed) allow a
> processor/consumer/producer to register some logic such as to delete a
> file, flush some cache etc. Note this is mostly required for
> non-transactional things. e.g. in JPA and JMS we can just use
> transactions for this.

Actually transaction things are easy since they require all processing
in the transaction to be done synchronously.  The hard bit is
processing the exchanges async.
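
To make the file/seda problem above concrete, here is a rough sketch of
deferring the delete until the async consumer is done.  It is only an
illustration under assumptions: it uses the JDK's CompletableFuture
rather than anything in Camel, and DeferredDeleteSketch, consume() and
demo() are made-up names, not existing API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch, not Camel API: the file is deleted only after the
// asynchronous consumer has finished with it.
class DeferredDeleteSketch {

    /** Hands the file to a worker thread and deletes it once processing is done. */
    static CompletableFuture<Boolean> consume(Path file, ExecutorService worker) {
        return CompletableFuture
            // Stands in for myProcessor running on the seda: thread;
            // it only checks that the file is still there.
            .supplyAsync(() -> Files.exists(file), worker)
            // Stands in for the file component's completion hook.
            .thenApply(seen -> {
                try {
                    Files.deleteIfExists(file);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                return seen;
            });
    }

    /** Returns true if the processor saw the file and it was deleted afterwards. */
    static boolean demo() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Path file = Files.createTempFile("foo", ".txt");
            boolean seen = consume(file, worker).join();
            return seen && !Files.exists(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is just the ordering: the delete is chained
behind the processing step instead of running when the send returns.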

>
> I'm kinda wondering; should we just try make things like files, FTP
> and the like transactional; that is to say, we implement transaction
> hooks so that we can do a file 'delete/rename' which is registered as
> a transaction commit status listener? Just registering some kind of
> onCommit/onRollback callbacks would do the trick though as you
> suggest.
>

I don't like the idea of making this look like transaction semantics
when it's not.  Traditional transaction semantics force you to do
processing synchronously.  And the point of this is exactly the
opposite.

> There's a second issue which is asynchronous processing; such as a
> producer invoking an asynchronous processor then wanting some kind of
> callback that the processing has completed. I wanted to make the easy
> things really easy with Camel; so was a bit reluctant to add
> asynchronous processing explicitly from the start for fear of making
> the API very complex; most components after all tend to be synchronous
> (which makes transactions very easy to do too btw).
>

I agree with this, and this is my greatest fear.  We need to make
sure that the synchronous components stay as simple as they are today,
but allow async-aware components to have their exchanges processed
asynchronously.

> I was thinking we could add some optional API for AsyncProcessor which
> is-a Processor but adds an asynchronous invocation API style; rather
> like the Channel does in the ServiceMix 4 API...
>
> // sync API
> interface Processor {
>         void    process(Exchange exchange);
> }
>
> interface AsyncProcessor extends Processor {
>   // async methods
>   Future<Exchange>      processAsync(Exchange exchange);
>   Future<Exchange>      processAsync(Exchange exchange, AsyncHandler handler);
> }
>
> Then rather than adding a kinda done() method to the Exchange and
> calling it throughout every single producer/consumer/Processor
> implementation; we could just use the Future object to know when a
> particular asynchronous operation has completed. i.e. keep the async
> API to the side, for those rare cases folks really wanna use it -
> otherwise we can all stick to the simple sync API that works easily
> with transactions.
>

This might be a good option.  I think that we don't need the
"Future<Exchange>      processAsync(Exchange exchange)" call since to
make an exchange async you just need to route it through a seda:
component.

so perhaps we just add:
Future<Exchange>      process(Exchange exchange, AsyncHandler handler)

If the path of the exchange is sync, then it's a blocking call and by
the time it returns the Future will be done.  But if it reaches an async
component like seda: then it will return without the Future being
completed.
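
A rough sketch of those two cases, assuming a Java 8+ JDK.  Every type
here (Exchange, AsyncHandler, AsyncProcessor, SyncStep, QueuedStep) is a
hypothetical stand-in written for this example, not the real Camel
classes, and the latch exists only to make the demo deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-ins, not Camel classes.
class Exchange {
    volatile String body;
    Exchange(String body) { this.body = body; }
}

interface AsyncHandler {
    void done(Exchange exchange);
}

interface AsyncProcessor {
    Future<Exchange> process(Exchange exchange, AsyncHandler handler);
}

/** Sync path: the work runs on the caller's thread, so the call blocks. */
class SyncStep implements AsyncProcessor {
    public Future<Exchange> process(Exchange exchange, AsyncHandler handler) {
        exchange.body = exchange.body.toUpperCase();
        handler.done(exchange);
        return CompletableFuture.completedFuture(exchange);
    }
}

/** Async path, in the spirit of seda: the work is handed to another thread. */
class QueuedStep implements AsyncProcessor {
    private final ExecutorService worker;
    private final CountDownLatch gate; // holds the worker back for the demo

    QueuedStep(ExecutorService worker, CountDownLatch gate) {
        this.worker = worker;
        this.gate = gate;
    }

    public Future<Exchange> process(Exchange exchange, AsyncHandler handler) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            exchange.body = exchange.body.toUpperCase();
            handler.done(exchange);
            return exchange;
        }, worker);
    }
}

class FutureSemanticsDemo {
    /** Returns {sync done on return, async done on return, async done after get()}. */
    static boolean[] run() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch gate = new CountDownLatch(1);
        try {
            Future<Exchange> sync =
                new SyncStep().process(new Exchange("a"), x -> { });
            boolean syncDone = sync.isDone();

            Future<Exchange> async =
                new QueuedStep(worker, gate).process(new Exchange("b"), x -> { });
            boolean asyncDoneOnReturn = async.isDone();

            gate.countDown(); // let the worker proceed
            async.get();      // block until the async path has finished
            return new boolean[] { syncDone, asyncDoneOnReturn, async.isDone() };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] r = FutureSemanticsDemo.run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

A caller that needs the result can always call get() on the returned
Future; on the sync path that returns immediately, on the async path it
waits for the worker thread.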

> Thoughts?

Sounds like a good approach.  Perhaps I'll prototype it.

> --
> James
> -------
> http://macstrac.blogspot.com/
>


-- 
Regards,
Hiram

Blog: http://hiramchirino.com
