camel-users mailing list archives

From Artur Jablonski <ajablon...@ravenpack.com>
Subject Re: Curious routing case
Date Tue, 11 Apr 2017 13:31:17 GMT
I guess one thing that comes to my mind is to hide all this parallel stuff
inside a processor that would just spit out, on the other end, the result of
processing all those messages. It would handle the grouping and serializing.
I guess that would reduce the complexity of the route at the cost of
complexity in the processor. I have no better ideas anyway, so I will give
it a go.
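A minimal plain-JDK sketch of the "hide it in a processor" idea, assuming the group can be extracted from each message as a String key. `KeyedSerialExecutor` is a hypothetical helper, not a Camel class: it chains each group's tasks on a `CompletableFuture` so they run one at a time, while tasks of different groups share one thread pool. A Camel `Processor` could hand work to something like this.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

/**
 * Hypothetical helper (not a Camel class): tasks with the same key run one
 * after another; tasks with different keys may run in parallel on the pool.
 */
class KeyedSerialExecutor {
    private final Executor pool;
    // Last queued task per key; the next task for that key is chained after it.
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    KeyedSerialExecutor(Executor pool) {
        this.pool = pool;
    }

    CompletableFuture<Void> submit(String key, Runnable task) {
        // compute() is atomic per key, so two submits for the same key
        // can never chain onto the same tail.
        final CompletableFuture<Void> next = tails.compute(key, (k, tail) -> {
            CompletableFuture<Void> prev =
                    (tail == null) ? CompletableFuture.completedFuture(null) : tail;
            // handle(...) swallows a failure of the previous task so the chain keeps going.
            return prev.handle((r, e) -> null).thenRunAsync(task, pool);
        });
        // Forget the key if this task is still the tail when it finishes,
        // so groups that go quiet do not accumulate in the map.
        next.whenComplete((r, e) -> tails.remove(key, next));
        return next;
    }
}
```

This keeps per-group arrival order, which is stronger than the thread strictly needs (only mutual exclusion within a group is required), but it comes for free with future chaining.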

On Tue, Apr 11, 2017 at 10:45 AM, Artur Jablonski <ajablonski@ravenpack.com>
wrote:

> Hello,
>
> I don't think this route definition fits my use case, though I
> learnt a thing or two about the interesting patterns linked. Thanks!
>
> Ok, so let me try to clarify the use case.
>
>
> 1. The stream is infinite; it's not a batch job. The messages keep on
> coming from SQS 'all the time'.
>
> 2. The more important point is parallel processing.
>
> Let A1 denote message 1 from group A, B2 message 2 from group B, and so on.
>
> Let's say this is the order in which the messages happen to arrive
> in the route from SQS:
>
> A1, A2, B1, C1, B2, A3, C2, B3
>
> What I am trying to achieve is grouping the messages that have to be
> processed sequentially (order doesn't matter as long as no two messages
> from the same group are processed at the same time).
> So I am trying to somehow get these streams:
>
> A1, A2, A3
>
> B1, B2, B3
>
> C1, C2
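The partitioning described above can be checked on a finite prefix of the stream. This sketch assumes, as in the example, that the group is the leading letter of each message id; `GroupStreams` is a hypothetical name. On the real, infinite SQS stream you would dispatch message by message rather than collect.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupStreams {
    // Assumed convention from the example: "A1" belongs to group "A".
    static String groupOf(String msg) {
        return msg.substring(0, 1);
    }

    // Splits the arrival sequence into one list per group, keeping
    // first-seen group order and arrival order within each group.
    static Map<String, List<String>> split(List<String> arrivals) {
        return arrivals.stream().collect(Collectors.groupingBy(
                GroupStreams::groupOf, LinkedHashMap::new, Collectors.toList()));
    }
}
```

For `split(List.of("A1", "A2", "B1", "C1", "B2", "A3", "C2", "B3"))` the result prints as `{A=[A1, A2, A3], B=[B1, B2, B3], C=[C1, C2]}`.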
>
>
> So A1, B1 and C1 can be processed in parallel because they are from
> different groups, but the messages within each group need to be processed
> one by one.
>
> In my example there are 3 groups, but there can be many, and I don't know
> what they are in advance. The processing logic is similar across groups
> and is a function of the group, so I can get a processor for group A from
> a method call getProcessor(A), for B from getProcessor(B), etc.
>
> I am stuck on how to do that in Camel: since I don't know the
> groups in advance, I would need to create processing routes dynamically.
>
> Say the system starts and A1 arrives. There can't be any processor for
> group A yet, since it's the first message from the group, so I need to
> somehow dynamically add processing capability for group A to the route.
> Then, if messages from group A stop arriving for some time, that
> processor could perhaps be removed.
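A sketch of the lifecycle described above (create a group's handler on its first message, drop it after a quiet period), in plain Java rather than Camel routes. `GroupRegistry`, `dispatch` and `evictIdle` are hypothetical names; the `factory` function plays the role of `getProcessor(group)`. Time is passed in explicitly to keep the example deterministic, the handler runs on the caller's thread, and a race between `dispatch` and `evictIdle` is deliberately not handled.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;

class GroupRegistry {
    private static final class Entry {
        final Consumer<String> handler;
        volatile long lastSeenMillis;

        Entry(Consumer<String> handler, long nowMillis) {
            this.handler = handler;
            this.lastSeenMillis = nowMillis;
        }
    }

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();
    // Stands in for getProcessor(group): builds a handler for a group on demand.
    private final Function<String, Consumer<String>> factory;

    GroupRegistry(Function<String, Consumer<String>> factory) {
        this.factory = factory;
    }

    void dispatch(String group, String message, long nowMillis) {
        // The first message of a group creates its handler; later ones reuse it.
        Entry e = entries.computeIfAbsent(group, g -> new Entry(factory.apply(g), nowMillis));
        e.lastSeenMillis = nowMillis;
        e.handler.accept(message);
    }

    /** Drops groups quiet for longer than idleMillis; returns how many were removed. */
    int evictIdle(long nowMillis, long idleMillis) {
        int before = entries.size();
        entries.values().removeIf(e -> nowMillis - e.lastSeenMillis > idleMillis);
        return before - entries.size();
    }

    int size() {
        return entries.size();
    }
}
```

In practice `evictIdle` would be called periodically, for example from a `ScheduledExecutorService`.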
>
> How to add the parallel part across the groups is also blurry to me.
> One way I was thinking of doing this was to multicast to all the
> dynamically created per-group processing routes, with a filter in front
> of each so that only messages from that particular group can go through.
> From the multicast page:
>
> from("direct:a").multicast().parallelProcessing().to("direct:x", "direct:y", "direct:z");
>
> But here the x, y, z endpoints are hardcoded. I suppose I could write some
> custom multicast that looks up the routes in the CamelContext... not sure.
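To see what the multicast-plus-filter idea costs, here is a plain-Java model of it, run sequentially (no `parallelProcessing`): every message goes to every per-group route, and each route's filter decides whether to accept it. `FilteredMulticast` is a hypothetical name, not a Camel class. Two properties fall out: each message costs one filter check per registered group, and a message for a group that has no route yet is accepted by nobody, so route creation would still have to happen before the multicast.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

class FilteredMulticast {
    private static final class Route {
        final Predicate<String> filter;
        final Consumer<String> body;

        Route(Predicate<String> filter, Consumer<String> body) {
            this.filter = filter;
            this.body = body;
        }
    }

    private final List<Route> routes = new ArrayList<>();
    private int filterChecks = 0;

    void addRoute(Predicate<String> filter, Consumer<String> body) {
        routes.add(new Route(filter, body));
    }

    void send(String msg) {
        for (Route r : routes) {
            filterChecks++;             // every route receives a copy of the message...
            if (r.filter.test(msg)) {   // ...but only the matching group's filter lets it through
                r.body.accept(msg);
            }
        }
    }

    int filterChecks() {
        return filterChecks;
    }
}
```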
>
> Thanks
> Artur
>
>
>
>
>
> On Mon, Apr 3, 2017 at 1:36 PM, Artur Jablonski <ajablonski@ravenpack.com>
> wrote:
>
>> Hi Zoran,
>>
>> Thank you for such a detailed response. This looks very promising. I will
>> need to get my head around the aggregator pattern.
>> This week I will be busy with other tasks, but I will get back to it
>> as soon as I can to see if I can get Camel to work for the use case.
>>
>> Cheerio
>> Artur
>>
>> On Mon, Apr 3, 2017 at 11:09 AM, Zoran Regvart <zoran@regvart.com> wrote:
>>
>>> Hi Artur,
>>> I was thinking that the order of the messages would be important as
>>> you need to process them sequentially.
>>>
>>> So I think you could use the dynamic message routing[1] with
>>> aggregator[2], something like:
>>>
>>>     from("aws-sqs:...")
>>>         .process("#preProcess")
>>>         .toD("direct:${header.nextRoute}");
>>>
>>>     from("direct:parallel")...;
>>>     from("direct:sequential").aggregate(simple("${header.group}")).completion..;
>>>
>>> So from your SQS queue you would use a processor to pre-process the
>>> message, whose responsibility would be to set the (custom) `nextRoute`
>>> and (custom) `group` headers. `nextRoute` would be `parallel` or
>>> `sequential`, and if `sequential` the messages would be aggregated
>>> using the `group` header.
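A sketch of the `preProcess` decision as a plain function, assuming a hypothetical body convention of `<group>:<payload>` for messages that must be serialized and a bare payload for ones that can run in parallel. The header names `nextRoute` and `group` and their values come from the message above; the body format and the `PreProcess.headersFor` helper are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

class PreProcess {
    // Hypothetical body convention: "A:payload" must be serialized within
    // group "A"; a body without a colon may be processed in parallel.
    static Map<String, String> headersFor(String body) {
        Map<String, String> headers = new HashMap<>();
        int sep = body.indexOf(':');
        if (sep > 0) {
            headers.put("nextRoute", "sequential");
            headers.put("group", body.substring(0, sep));
        } else {
            headers.put("nextRoute", "parallel");
        }
        return headers;
    }
}
```

With `toD("direct:${header.nextRoute}")` these values would resolve to `direct:sequential` or `direct:parallel`. Inside an actual Camel `Processor` each entry would be set with `exchange.getIn().setHeader(name, value)`.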
>>>
>>> You would want to define your own custom aggregation strategy or use
>>> the completion* options that are available to you. There might also be
>>> a need to use seda[3] to fine-tune any parallel processing. You might
>>> throw in a data format unmarshaller[4] instead of the `preProcess`
>>> processor and use something like `${body.xyz} == foo` in the `toD`
>>> expression.
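To make the aggregator's correlation and completion concrete, here is a plain-Java model (not Camel's `AggregateDefinition`): messages are correlated by group and a group's batch is released when it reaches a completion size, with list-append standing in for the aggregation strategy. `SizeAggregator` is a hypothetical name and is not thread-safe.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class SizeAggregator {
    private final Function<String, String> correlation; // message -> group key
    private final int completionSize;
    private final Map<String, List<String>> pending = new HashMap<>();

    SizeAggregator(Function<String, String> correlation, int completionSize) {
        this.correlation = correlation;
        this.completionSize = completionSize;
    }

    /** Adds msg to its group's batch; returns the batch once complete, otherwise null. */
    List<String> offer(String msg) {
        String key = correlation.apply(msg);
        List<String> batch = pending.computeIfAbsent(key, k -> new ArrayList<>());
        batch.add(msg); // the "aggregation strategy": append to the group's list
        if (batch.size() < completionSize) {
            return null;
        }
        pending.remove(key); // completed: the next message starts a fresh batch
        return batch;
    }

    /** Groups still waiting for more messages. */
    Map<String, List<String>> pending() {
        return pending;
    }
}
```

Fed A1, A2, B1, C1, B2, A3, C2, B3 with a completion size of 2, it releases [A1, A2], [B1, B2] and [C1, C2] and leaves A3 and B3 waiting. On an infinite stream that is why a size limit is usually paired with a time-based completion such as the aggregator's `completionTimeout`. Note also that aggregation batches a group's messages, which is more than merely keeping them from overlapping.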
>>>
>>> And I would guess that you need to examine transactions or persistence
>>> at some point as well, in case your aggregation step runs for a long time
>>> or your use case is sensitive to message loss if interrupted, which
>>> would undoubtedly lead you back to using queues to separate those two
>>> ways of processing.
>>>
>>> HTH,
>>>
>>> zoran
>>>
>>> [1] https://camel.apache.org/message-endpoint.html
>>> [2] https://camel.apache.org/aggregator2.html
>>> [3] https://camel.apache.org/seda.html
>>> [4] https://camel.apache.org/data-format.html
>>>
>>> On Sat, Apr 1, 2017 at 1:09 PM, Artur Jablonski
>>> <ajablonski@ravenpack.com> wrote:
>>> > Hey Zoran.
>>> >
>>> > I reread the patterns you mentioned. In my use case the order of
>>> > processing within a group doesn't matter as long as two messages from
>>> > the same group are never processed in parallel. So I guess the
>>> > resequencer is out of the picture, unless I misunderstood its intention.
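Since only mutual exclusion within a group is required here, not ordering, a per-group lock is enough. A minimal plain-JDK sketch, with `GroupLocks` as a hypothetical name: tasks run on any pool thread, and a `ReentrantLock` per group keeps two tasks of the same group from overlapping.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

class GroupLocks {
    // One lock per group, created on first use and never removed in this sketch.
    private final ConcurrentMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    void runExclusive(String group, Runnable task) {
        ReentrantLock lock = locks.computeIfAbsent(group, g -> new ReentrantLock());
        lock.lock(); // waits while another task of the same group is running
        try {
            task.run();
        } finally {
            lock.unlock();
        }
    }
}
```

The trade-off: a pool thread that picks up a task for a busy group blocks on the lock, so one hot group can tie up several threads. Queue-per-group designs avoid that at the cost of more bookkeeping.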
>>> >
>>> > So what we are left with is the content based router. Sure. The message
>>> > comes in and I can see which group it belongs to... and what next?
>>> > Perhaps it's the very first message from that group, so I would need to
>>> > trigger creating a route/processor for that group somehow. Or perhaps
>>> > messages from this group were processed before, in which case the
>>> > processor for the group should already exist...
>>> >
>>> >
>>> >
>>> >
>>> > On 31 Mar 2017 7:58 p.m., "Zoran Regvart" <zoran@regvart.com> wrote:
>>> >
>>> >> Hi Artur,
>>> >> have a look at the Camel EIP page[1]; what you describe sounds to me
>>> >> like the Resequencer and Content Based Router patterns,
>>> >>
>>> >> zoran
>>> >>
>>> >> [1] https://camel.apache.org/eip.html
>>> >>
>>> >> On Fri, Mar 31, 2017 at 5:08 PM, Artur Jablonski
>>> >> <ajablonski@ravenpack.com> wrote:
>>> >> > Hello.
>>> >> >
>>> >> > I wonder if someone could push me in the right direction in trying to
>>> >> > express quite a curious case as a Camel route.
>>> >> >
>>> >> > Imagine there's a stream of messages, some of which can be processed
>>> >> > in parallel and some of which have to be processed serially. You can
>>> >> > group the messages that require serial processing together by looking
>>> >> > at the message body. You don't know upfront how many groups can occur
>>> >> > in the stream.
>>> >> >
>>> >> > The way I thought about doing this is having a route for each message
>>> >> > group. Since I don't know upfront how many and which groups there will
>>> >> > be, I would need to create routes dynamically. If a message comes in
>>> >> > belonging to a group that doesn't have its handling route, then I could
>>> >> > create it (is that even possible??). Then, if no messages arrive for a
>>> >> > given group for some time, I could remove that group's route to clean
>>> >> > up (is that possible?)
>>> >> >
>>> >> > New to Camel
>>> >> >
>>> >> > Thx!
>>> >> > Artur
>>> >>
>>> >>
>>> >>
>>> >> --
>>> >> Zoran Regvart
>>> >>
>>>
>>>
>>>
>>> --
>>> Zoran Regvart
>>>
>>
>>
>
