camel-users mailing list archives

From Artur Jablonski <ajablon...@ravenpack.com>
Subject Re: Curious routing case
Date Tue, 11 Apr 2017 08:45:49 GMT
Hello,

I don't think this route definition fits my use case, though I learnt
a thing or two from the interesting patterns you linked. Thanks!

Ok, so let me try to clarify the use case.


1. The stream is infinite; it's not a batch job. Messages keep arriving
from SQS all the time.

2. The more important point concerns parallel processing.

Let A1 denote message 1 from group A, B2 message 2 from group B, etc.

Let's say this is the order in which the messages happen to arrive
in the route from SQS:

A1, A2, B1, C1, B2, A3, C2, B3

What I am trying to achieve is to group the messages that have to be
processed sequentially (the order within a group doesn't matter, as long as
no two messages from the same group are processed at the same time).
So I am trying to get these streams:

A1, A2, A3

B1, B2, B3

C1, C2


So A1, B1 and C1 can be processed in parallel because they are from
different groups, but the messages within a group need to be processed one
by one.

In my example there are 3 groups, but there can be many and I don't know
what they are in advance. The processing logic is similar across groups
and is a function of the group, so I can get a processor for group A
from a method call getProcessor(A), for group B from getProcessor(B), etc.
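
To make the constraint concrete outside of Camel, here is a minimal
plain-Java sketch (JDK only). It is an illustration under assumptions, not a
Camel API: GroupSerializer, submit and the tails map are hypothetical names,
and the getProcessor function stands in for the method call mentioned above.
Each group keeps a chain of futures, so a message starts only after the
previous message of the same group has finished, while different groups
share one thread pool and can run in parallel.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper (not part of Camel): serial within a group,
// parallel across groups.
final class GroupSerializer<G, M> {
    // Last scheduled task per group; absent when the group has nothing queued.
    private final ConcurrentHashMap<G, CompletableFuture<Void>> tails =
            new ConcurrentHashMap<>();
    private final Executor executor;
    private final Function<G, Consumer<M>> getProcessor;

    GroupSerializer(Executor executor, Function<G, Consumer<M>> getProcessor) {
        this.executor = executor;
        this.getProcessor = getProcessor;
    }

    CompletableFuture<Void> submit(G group, M message) {
        Consumer<M> processor = getProcessor.apply(group);
        // compute() is atomic per key, so two submitters for the same group
        // cannot both chain onto the same tail.
        CompletableFuture<Void> next = tails.compute(group, (g, tail) -> {
            CompletableFuture<Void> previous =
                    (tail == null) ? CompletableFuture.completedFuture(null) : tail;
            return previous
                    // Ignore a failure of the previous message so the chain continues.
                    .handle((result, error) -> null)
                    .thenRunAsync(() -> processor.accept(message), executor);
        });
        // Drop the group's entry once its last queued message has finished.
        next.whenComplete((result, error) -> tails.remove(group, next));
        return next;
    }
}
```

With a multi-threaded pool and the sequence A1, A2, B1, C1, B2, A3, C2, B3,
this would let A1, B1 and C1 run concurrently while A2 waits for A1. A
group's entry disappears once its last queued message completes, so groups
that are not known in advance do not accumulate.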

I am stuck on how to do that in Camel: since I don't know the
groups in advance, I would need to create processing routes dynamically.

Say the system starts and A1 arrives. There can't be any processor for
group A yet, since it's the first message from the group, so I need to
somehow dynamically add processing capability for group A to the route.
Then, if messages from group A stop arriving for some time,
that processor could be removed.
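
The create-on-first-message and remove-when-idle lifecycle can also be
sketched in plain Java. This is only an illustration under assumptions:
IdleEvictingRegistry, acquire and evictIdle are hypothetical names, not Camel
APIs, and the caller passes the current time explicitly to keep the behaviour
easy to follow.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical registry (not part of Camel): creates a processor the first
// time a group is seen and forgets it after a period without messages.
final class IdleEvictingRegistry<G, P> {
    private static final class Slot<P> {
        final P processor;
        volatile long lastUsedMillis;

        Slot(P processor, long nowMillis) {
            this.processor = processor;
            this.lastUsedMillis = nowMillis;
        }
    }

    private final ConcurrentHashMap<G, Slot<P>> slots = new ConcurrentHashMap<>();
    private final Function<G, P> factory;

    IdleEvictingRegistry(Function<G, P> factory) {
        this.factory = factory;
    }

    // Returns the group's processor, creating it on the first message.
    P acquire(G group, long nowMillis) {
        Slot<P> slot = slots.compute(group, (g, old) -> {
            if (old == null) {
                return new Slot<>(factory.apply(g), nowMillis);
            }
            old.lastUsedMillis = nowMillis;
            return old;
        });
        return slot.processor;
    }

    // Removes processors unused for longer than maxIdleMillis; returns the count.
    int evictIdle(long nowMillis, long maxIdleMillis) {
        int evicted = 0;
        for (Map.Entry<G, Slot<P>> entry : slots.entrySet()) {
            boolean idle = nowMillis - entry.getValue().lastUsedMillis > maxIdleMillis;
            if (idle && slots.remove(entry.getKey(), entry.getValue())) {
                evicted++;
            }
        }
        return evicted;
    }

    int size() {
        return slots.size();
    }
}
```

A periodic task could call evictIdle. In this sketch a processor handed out
just before eviction stays usable by its caller while the next message
creates a fresh one, so if the one-at-a-time guarantee lives inside the
processor itself, a real implementation would have to close that gap.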

How to add the parallel part between the groups is also blurry to
me. One way I was thinking of doing this was to multicast to all the
dynamically created processing routes for the groups and put a filter in
front of each, so that only messages from that particular group can go
through. From the multicast page:

    from("direct:a")
        .multicast().parallelProcessing()
        .to("direct:x", "direct:y", "direct:z");

But here the x, y, z endpoints are hardcoded. I suppose I could write a
custom multicast that looks up the routes in the CamelContext... not sure.

Thanks
Artur





On Mon, Apr 3, 2017 at 1:36 PM, Artur Jablonski <ajablonski@ravenpack.com>
wrote:

> Hi Zoran,
>
> Thank you for such a detailed response. This looks very promising. I will
> need to get my head around the aggregator pattern.
> For this week I will be busy with other tasks, but I will get back to it
> as soon as I can to see if I can get Camel to work for the use case.
>
> Cheerio
> Artur
>
> On Mon, Apr 3, 2017 at 11:09 AM, Zoran Regvart <zoran@regvart.com> wrote:
>
>> Hi Artur,
>> I was thinking that the order of the messages would be important as
>> you need to process them sequentially.
>>
>> So I think you could use the dynamic message routing[1] with
>> aggregator[2], something like:
>>
>>     from("aws-sqs:...")
>>         .process("#preProcess")
>>         .toD("direct:${header.nextRoute}");
>>
>>     from("direct:parallel")...;
>>     from("direct:sequential").aggregate(simple("${header.group}")).completion..;
>>
>> So from your SQS queue you would use a processor to pre-process the
>> message; its responsibility would be to set the (custom) `nextRoute`
>> and (custom) `group` headers. `nextRoute` would be `parallel` or
>> `sequential`, and if `sequential` the messages would be aggregated
>> using the `group` header.
>>
>> You would want to define your own custom aggregation strategy or use
>> the completion* options that are available to you. There might also be a
>> need to use seda[3] to fine-tune any parallel processing. You might
>> throw in there a data format unmarshaller[4] instead of the
>> `preProcess` processor and use something like `${body.xyz} == foo` in
>> the `toD` expression.
>>
>> And I would guess that you need to examine transactions or persistence
>> at some point also in case your aggregation step runs for a long time
>> or if your use case is sensitive to message loss if interrupted --
>> which would undoubtedly lead you back to using queues to separate
>> those two ways of processing,
>>
>> HTH,
>>
>> zoran
>>
>> [1] https://camel.apache.org/message-endpoint.html
>> [2] https://camel.apache.org/aggregator2.html
>> [3] https://camel.apache.org/seda.html
>> [4] https://camel.apache.org/data-format.html
>>
>> On Sat, Apr 1, 2017 at 1:09 PM, Artur Jablonski
>> <ajablonski@ravenpack.com> wrote:
>> > Hey Zoran.
>> >
>> > I read again the patterns you mentioned. In my use case the order of
>> > processing within a group doesn't matter as long as two messages from the
>> > same group are never processed in parallel. So I guess the resequencer is
>> > out of the picture, unless I didn't get the intention.
>> >
>> > So what we are left with is the content based router. Sure. The message
>> > comes, I can see what group it belongs to... And what next? Perhaps it's
>> > the very first message from that group, so I would need to trigger creating
>> > a route/processor for that group somehow. Perhaps messages from this group
>> > were processed before, in which case the processor for the group should
>> > already exist...
>> >
>> >
>> >
>> >
>> > On 31 Mar 2017 7:58 p.m., "Zoran Regvart" <zoran@regvart.com> wrote:
>> >
>> >> Hi Artur,
>> >> have a look at Camel EIP page[1], what you describe sounds to me like
>> >> Resequencer and Content based router patterns,
>> >>
>> >> zoran
>> >>
>> >> [1] https://camel.apache.org/eip.html
>> >>
>> >> On Fri, Mar 31, 2017 at 5:08 PM, Artur Jablonski
>> >> <ajablonski@ravenpack.com> wrote:
>> >> > Hello.
>> >> >
>> >> > I wonder if someone could push me in the right direction in trying to
>> >> > express quite a curious case in a Camel route.
>> >> >
>> >> > Imagine there's a stream of messages, some of which can be processed in
>> >> > parallel and some of which have to be processed serially. You can group
>> >> > the messages that require serial processing together by looking at the
>> >> > message body. You don't know upfront how many groups can occur in the
>> >> > stream.
>> >> >
>> >> > The way I thought about doing this is having a route for each message
>> >> > group. Since I don't know upfront how many and what groups there will be,
>> >> > I would need to create routes dynamically. If a message arrives
>> >> > belonging to a group that doesn't have its handling route, then I could
>> >> > create it (is that even possible??). Then, if there are no messages coming
>> >> > for a given group for some time, I could remove the route for the group to
>> >> > clean up (is that possible?).
>> >> >
>> >> > New to Camel
>> >> >
>> >> > Thx!
>> >> > Artur
>> >>
>> >>
>> >>
>> >> --
>> >> Zoran Regvart
>> >>
>>
>>
>>
>> --
>> Zoran Regvart
>>
>
>
