camel-users mailing list archives

From Taariq Levack <taar...@gmail.com>
Subject Re: Socket-based Asynchronous Calls...
Date Tue, 16 Aug 2011 20:29:07 GMT
The consumer that handles the aggregated or timed-out exchange, which is either the request or the response.

If it's the request (i.e. a timeout), I have to resend it a few times, so I simply feed it back into "direct:socketRequestRoute"
with the header for the number of retry attempts incremented.
If it's the response, I can forward it to some process.
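
A minimal sketch of that decision in plain Java (deliberately not Camel API). The header name "retryAttempts", the cap of 3 retries, and the give-up branch are assumptions for illustration; none of them is specified in this thread.

```java
import java.util.HashMap;
import java.util.Map;

class RetryDecision {
    // Hypothetical header name and retry cap; the thread does not specify either.
    static final String RETRY_HEADER = "retryAttempts";
    static final int MAX_RETRIES = 3;

    enum Action { FORWARD_RESPONSE, RESEND_REQUEST, GIVE_UP }

    /**
     * Decides what to do with the exchange that leaves the aggregator.
     * A response is forwarded; a timed-out request is resent with the
     * retry header incremented, until the (assumed) cap is reached.
     */
    static Action decide(boolean isResponse, Map<String, Object> headers) {
        if (isResponse) {
            return Action.FORWARD_RESPONSE;
        }
        int attempts = (Integer) headers.getOrDefault(RETRY_HEADER, 0);
        if (attempts >= MAX_RETRIES) {
            return Action.GIVE_UP;
        }
        headers.put(RETRY_HEADER, attempts + 1);
        return Action.RESEND_REQUEST;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        Action action;
        // Simulate a request that keeps timing out.
        while ((action = decide(false, headers)) == Action.RESEND_REQUEST) {
            System.out.println("resend, attempt " + headers.get(RETRY_HEADER));
        }
        System.out.println(action);
    }
}
```

In a route, RESEND_REQUEST would correspond to sending the exchange back to "direct:socketRequestRoute".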

Taariq

On 16 Aug 2011, at 10:18 PM, James Carman <james@carmanconsulting.com> wrote:

> What's listening on the:
> 
> to("direct:requestResponse")
> 
> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <taariql@gmail.com> wrote:
>> Sure
>> 
>> You can of course solve what I've described many ways, but I'll
>> explain using 3 routes as that's what I used.
>> 
>> This first route is the main route I mentioned earlier, so you send
>> your socket messages here and it's multicast to both the aggregator
>> and to the socket.
>> 
>> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator",
>>  "someOutboundSocketEndpoint");
>> 
>> 
>> This next route will aggregate; both requests and responses are sent
>> here as you envisaged.
>> from("direct:requestResponseAggregator")
>>     .aggregate(header("someCorrellationId"), requestResponseAggregator)
>>     .completionSize(2)
>>     .completionTimeout(5000)
>>     // Here you can send the "aggregated" message. In my case it's only the
>>     // response I forward, unless there's a timeout, in which case I forward
>>     // the request of course.
>>     .to("direct:requestResponse");
>> 
>> Finally the route that consumes the socket responses.
>> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator");
>> // This headerEnricher doesn't have to be a processor; you have many
>> // options to add a header.
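
To make the pairing behaviour of the aggregator route concrete, here is a rough plain-Java sketch of the idea behind completionSize(2) plus completionTimeout(5000): a request waits for the response with the same correlation id, and if none arrives in time the request itself is emitted. This is an illustration only, not Camel's implementation; the class and method names are hypothetical, and time is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class PairingAggregator {
    /** What goes downstream: the response, or the request if it timed out. */
    static final class Result {
        final String correlationId;
        final String body;
        final boolean timedOut;

        Result(String correlationId, String body, boolean timedOut) {
            this.correlationId = correlationId;
            this.body = body;
            this.timedOut = timedOut;
        }
    }

    private static final class Pending {
        final String request;
        final long deadline;

        Pending(String request, long deadline) {
            this.request = request;
            this.deadline = deadline;
        }
    }

    private final long timeoutMillis;
    private final Map<String, Pending> pending = new HashMap<>();

    PairingAggregator(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records an outbound request and starts its timeout window. */
    void onRequest(String correlationId, String request, long nowMillis) {
        pending.put(correlationId, new Pending(request, nowMillis + timeoutMillis));
    }

    /** Completes the pair; returns null for a late or unknown response. */
    Result onResponse(String correlationId, String response) {
        Pending p = pending.remove(correlationId);
        return p == null ? null : new Result(correlationId, response, false);
    }

    /** Emits every request whose deadline has passed without a response. */
    List<Result> expire(long nowMillis) {
        List<Result> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Pending>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Pending> e = it.next();
            if (e.getValue().deadline <= nowMillis) {
                expired.add(new Result(e.getKey(), e.getValue().request, true));
                it.remove();
            }
        }
        return expired;
    }
}
```

A timed-out Result carries the original request, which is what makes the resend possible downstream.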
>> 
>> If that's not clear feel free to ask.
>> 
>> Taariq
>> 
>> 
>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>> <james@carmanconsulting.com> wrote:
>>> Care to share an example?  I'm not picturing it.
>>> 
>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <taariql@gmail.com> wrote:
>>>> Hi James
>>>> 
>>>> I did that too for what it's worth.
>>>> I send the message to a route that forwards to both the aggregator and to the socket.
>>>> When the response comes in I use an enricher to add the ID to the headers and then forward to the aggregator.
>>>> 
>>>> Taariq
>>>> 
>>>> On 16 Aug 2011, at 8:55 PM, James Carman <james@carmanconsulting.com> wrote:
>>>> 
>>>>> Willem,
>>>>> 
>>>>> Thank you for your help.  I don't think this is doing exactly what I
>>>>> need, though.  The real trick here is the asynchronous nature of the
>>>>> "server" on the other end of this situation.  I thought about using an
>>>>> aggregator to make sure the response gets matched up with the request
>>>>> using a correlation id.  The aggregator wouldn't aggregate multiple
>>>>> responses together into one, it would just make sure it matches the
>>>>> correct response with its request.  Does this sound like a valid
>>>>> approach?  If so, how the heck do I go about it? :)
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> James
>>>>> 
>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <willem.jiang@gmail.com> wrote:
>>>>>> Hi James,
>>>>>> 
>>>>>> Camel async process engine already provides the way that you want.
>>>>>> You can take a look at the camel-cxf code[1][2] for some example.
>>>>>> 
>>>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
>>>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
>>>>>> 
>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>>>>> 
>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian<hzbarcea@gmail.com>
>>>>>>>  wrote:
>>>>>>>> 
>>>>>>>> Hi James,
>>>>>>>> 
>>>>>>>> I hope I understand your scenario correctly. Here are a few thoughts. I
>>>>>>>> assume you want to use camel-netty [1] to send messages to your server (if
>>>>>>>> you have your own code that does that, you can use it too, but you'd have
>>>>>>>> to write your own Processor or Component). IIUC, your scenario is
>>>>>>>> converting a 2x in-only to a 1x in-out async MEP. You should then treat
>>>>>>>> your exchange as an async in-out and let your framework (Camel) decompose
>>>>>>>> it and compose it back again. I would not keep threads blocked, so I
>>>>>>>> believe your best bet is using the Camel async messaging [2] and Futures
>>>>>>>> (look at the examples using asyncSend* and asyncCallback*). The issue is
>>>>>>>> that Camel is stateless, so you'll need a correlationId, which you must
>>>>>>>> have already, and something to keep your state. A good bet would be jms
>>>>>>>> [3], or you could write your own. If you used jms you would need to use
>>>>>>>> both a correlationId and a replyTo queue.
>>>>>>>> 
>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>>>>>> from("netty:input").to("jms:replyTo-queue");
>>>>>>>> 
>>>>>>> 
>>>>>>> Perhaps a bit more information might be appropriate here.  Eventually,
>>>>>>> I'd like to "expose" this route via web services (using CXF of
>>>>>>> course).  So, I would need to either block the request thread, waiting
>>>>>>> for a reply, or perhaps check out the new Servlet 3.0 asynchronous
>>>>>>> processing stuff (I'm thinking this might help us get more done with
>>>>>>> fewer HTTP request threads) to do more of a continuation thing.
>>>>>>> 
>>>>>>> We already have a correlation id.  The "protocol" requires one and the
>>>>>>> server process just echoes it back in the response message.
>>>>>>> 
>>>>>>>> You may have to play a bit with the correlationId, and if you cannot use
>>>>>>>> the same one you can do a second transformation/correlation using a
>>>>>>>> claim-check sort of pattern. If you don't want to use jms you can
>>>>>>>> implement your own (in memory) persistence and correlation. You can also
>>>>>>>> use a resequencer [4] if you want to enforce the order. If you use
>>>>>>>> asyncCallback, you get the replies when they become available, and you
>>>>>>>> can control that.
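
One way the "own (in memory) persistence and correlation" idea could look, sketched with plain JDK classes rather than any Camel API: the caller registers a future under the correlation id before sending, and the code that reads the socket completes it when the echoed id comes back. The class name ReplyCorrelator and its methods are hypothetical, and orTimeout assumes Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

class ReplyCorrelator {
    // Futures waiting for a reply, keyed by correlation id.
    private final ConcurrentMap<String, CompletableFuture<String>> waiting =
            new ConcurrentHashMap<>();

    /** Call before sending the request; the future completes with the reply. */
    CompletableFuture<String> register(String correlationId, long timeoutMillis) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (waiting.putIfAbsent(correlationId, reply) != null) {
            throw new IllegalStateException("duplicate correlation id: " + correlationId);
        }
        // Fail the future with a TimeoutException if no reply arrives in time,
        // and drop the map entry however the future ends up completing.
        reply.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
             .whenComplete((value, error) -> waiting.remove(correlationId, reply));
        return reply;
    }

    /** Call from the socket reader; returns false for unknown or late replies. */
    boolean complete(String correlationId, String response) {
        CompletableFuture<String> reply = waiting.remove(correlationId);
        return reply != null && reply.complete(response);
    }
}
```

No thread blocks while waiting: the caller can attach a callback to the returned future, or block on it only where a synchronous answer is really needed.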
>>>>>>>> 
>>>>>>> 
>>>>>>> I don't think a resequencer is necessary.  I don't want to guarantee
>>>>>>> the ordering.  I'm mostly interested in throughput here.  So, if a
>>>>>>> message comes in after another, but it can be processed faster, so be
>>>>>>> it.
>>>>>>> 
>>>>>>>> It's an interesting scenario, I'll definitely give it more thought, but I
>>>>>>>> hope this helps.
>>>>>>>> Hadrian
>>>>>>>> 
>>>>>>> 
>>>>>>> You have been very helpful.  Thank you for taking the time!
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> Willem
>>>>>> ----------------------------------
>>>>>> FuseSource
>>>>>> Web: http://www.fusesource.com
>>>>>> Blog:    http://willemjiang.blogspot.com (English)
>>>>>>         http://jnn.javaeye.com (Chinese)
>>>>>> Twitter: willemjiang
>>>>>> Weibo: willemjiang
>>>>>> 
>>>> 
>>> 
>> 
