camel-users mailing list archives

From Taariq Levack <>
Subject Re: Socket-based Asynchronous Calls...
Date Tue, 16 Aug 2011 20:15:13 GMT
Indeed it is.

On 16 Aug 2011, at 10:12 PM, James Carman <> wrote:

> Is your socket endpoint set up to be async?
> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <> wrote:
>> Sure
>> You can of course solve what I've described many ways, but I'll
>> explain using 3 routes as that's what I used.
>> This first route is the main route I mentioned earlier, so you send
>> your socket messages here and it's multicast to both the aggregator
>> and to the socket.
>> from("direct:socketRequestRoute")
>>     .multicast()
>>     .to("direct:requestResponseAggregator", "someOutboundSocketEndpoint");
>> This next route does the aggregation; both requests and responses are
>> sent here, as you envisaged.
>> from("direct:requestResponseAggregator")
>>     .aggregate(header("someCorrellationId"), requestResponseAggregator)
>>     .completionSize(2)
>>     .completionTimeout(5000)
>>     // Here you can send the "aggregated" message. In my case I forward
>>     // only the response, unless there's a timeout, in which case I
>>     // forward the request.
>>     .to("direct:requestResponse");
>> Finally the route that consumes the socket responses.
>> from(someInboundSocketEndpoint)
>>     // this headerEnricher doesn't have to be a processor, you have many
>>     // options to add a header.
>>     .processRef("headerEnricher")
>>     .to("direct:requestResponseAggregator");
>> If that's not clear feel free to ask.
>> Taariq
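
The pairing behaviour described for the aggregator route can be modelled without Camel. What follows is only a rough plain-Java sketch, not the requestResponseAggregator from the routes above: the class and method names (PairingAggregator, accept, timeout) are hypothetical, and it assumes the request always reaches the aggregator before its response.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, Camel-free model of the aggregation step.
class PairingAggregator {
    // Messages held per correlation ID until the pair completes or times out.
    private final Map<String, String> held = new HashMap<>();

    // Feed either a request or a response. Returns the message to forward
    // once the second message for an ID arrives (analogue of completionSize(2)).
    Optional<String> accept(String correlationId, String message) {
        String previous = held.remove(correlationId);
        if (previous == null) {
            held.put(correlationId, message); // first of the pair: hold it
            return Optional.empty();
        }
        return Optional.of(message); // second of the pair: forward the latest
    }

    // Analogue of completionTimeout: release whatever is still held for the ID.
    Optional<String> timeout(String correlationId) {
        return Optional.ofNullable(held.remove(correlationId));
    }
}

class PairingAggregatorDemo {
    public static void main(String[] args) {
        PairingAggregator aggregator = new PairingAggregator();
        aggregator.accept("42", "request-42");
        System.out.println(aggregator.accept("42", "response-42")); // Optional[response-42]
        aggregator.accept("43", "request-43");
        System.out.println(aggregator.timeout("43")); // Optional[request-43]
    }
}
```

When the pair completes, the response is forwarded; when the timeout fires first, the held request is what comes out, which matches the behaviour described for the aggregator route.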
>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>> <> wrote:
>>> Care to share an example?  I'm not picturing it.
>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <> wrote:
>>>> Hi James
>>>> I did that too for what it's worth.
>>>> I send the message to a route that forwards to both the aggregator
>>>> and to the socket.
>>>> When the response comes in I use an enricher to add the ID to the
>>>> headers and then forward to the aggregator.
>>>> Taariq
>>>> On 16 Aug 2011, at 8:55 PM, James Carman <> wrote:
>>>>> Willem,
>>>>> Thank you for your help.  I don't think this is doing exactly what I
>>>>> need, though.  The real trick here is the asynchronous nature of the
>>>>> "server" on the other end of this situation.  I thought about using an
>>>>> aggregator to make sure the response gets matched up with the request
>>>>> using a correlation id.  The aggregator wouldn't aggregate multiple
>>>>> responses together into one, it would just make sure it matches the
>>>>> correct response with its request.  Does this sound like a valid
>>>>> approach?  If so, how the heck do I go about it? :)
>>>>> Thanks,
>>>>> James
>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <> wrote:
>>>>>> Hi James,
>>>>>> Camel's async processing engine already provides what you want.
>>>>>> You can take a look at the camel-cxf code [1][2] for an example.
>>>>>> [1]
>>>>>> [2]
>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian <> wrote:
>>>>>>>> Hi James,
>>>>>>>> I hope I understand your scenario correctly. Here are a few
>>>>>>>> thoughts. I assume you want to use camel-netty [1] to send
>>>>>>>> messages to your server (if you have your own code that does
>>>>>>>> that, you can use it too, but you'd have to write your own
>>>>>>>> Processor or Component). IIUC, your scenario is converting a
>>>>>>>> 2x in-only to a 1x in-out async MEP. You should then treat your
>>>>>>>> exchange as an async in-out and let your framework (Camel)
>>>>>>>> decompose it and compose it back again. I would not keep
>>>>>>>> threads blocked, so I believe your best bet is using the Camel
>>>>>>>> async messaging [2] and Futures (look at the examples using
>>>>>>>> asyncSend* and asyncCallback*). The issue is that Camel is
>>>>>>>> stateless, so you'll need a correlationId, which you must have
>>>>>>>> already, and something to keep your state. A good bet would be
>>>>>>>> jms [3], or you could write your own. If you used jms you would
>>>>>>>> need to use both a correlationId and a replyTo queue.
>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>>>>>> from("netty:input").to("jms:replyTo-queue");
>>>>>>> Perhaps a bit more information might be appropriate here.  Eventually,
>>>>>>> I'd like to "expose" this route via web services (using CXF of
>>>>>>> course).  So, I would need to either block the request thread,
>>>>>>> waiting for a reply, or perhaps check out the new Servlet 3.0
>>>>>>> asynchronous processing stuff (I'm thinking this might help us get
>>>>>>> more done with fewer HTTP request threads) to do more of a
>>>>>>> continuation thing.
>>>>>>> We already have a correlation id.  The "protocol" requires one and
>>>>>>> the server process just echoes it back in the response message.
>>>>>>>> You may have to play a bit with the correlationId and if you
>>>>>>>> cannot use the same you can do a second
>>>>>>>> transformation/correlation using a claim-check sort of pattern.
>>>>>>>> If you don't want to use jms you can implement your own (in
>>>>>>>> memory) persistence and correlation. You can also use a
>>>>>>>> resequencer [4] if you want to enforce the order. If you use
>>>>>>>> asyncCallback, you get the replies when they become available,
>>>>>>>> and you can control that.
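
The "implement your own (in memory) persistence and correlation" option, combined with futures so that no thread blocks, might look roughly like the sketch below. It is plain Java rather than Camel API, assumes Java 9 or later (for CompletableFuture.orTimeout), and every name in it (PendingReplies, register, complete) is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory correlation store: one pending future per correlation ID.
class PendingReplies {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Register before sending the request. The caller receives a future,
    // so no thread has to block while the server works.
    CompletableFuture<String> register(String correlationId, long timeoutMillis) {
        CompletableFuture<String> reply = new CompletableFuture<String>()
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        pending.put(correlationId, reply);
        // Drop the entry once the future completes, normally or by timeout.
        reply.whenComplete((value, error) -> pending.remove(correlationId, reply));
        return reply;
    }

    // Called by whatever reads the socket. Returns false for unknown or expired IDs.
    boolean complete(String correlationId, String response) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        return reply != null && reply.complete(response);
    }
}

class PendingRepliesDemo {
    public static void main(String[] args) {
        PendingReplies replies = new PendingReplies();
        replies.register("a", 5000).thenAccept(r -> System.out.println("a got " + r));
        replies.register("b", 5000).thenAccept(r -> System.out.println("b got " + r));
        // Responses may arrive in any order; the echoed ID routes each to its caller.
        replies.complete("b", "reply-b");
        replies.complete("a", "reply-a");
    }
}
```

Because the server echoes the correlation id back, the socket reader only needs to extract it and call complete; ordering is never enforced, which suits a throughput-oriented setup.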
>>>>>>> I don't think a resequencer is necessary.  I don't want to guarantee
>>>>>>> the ordering.  I'm mostly interested in throughput here.  So, if a
>>>>>>> message comes in after another, but it can be processed faster, so
>>>>>>> be it.
>>>>>>>> It's an interesting scenario, I'll definitely give it more
>>>>>>>> thought, but I hope this helps.
>>>>>>>> Hadrian
>>>>>>> You have been very helpful.  Thank you for taking the time!
>>>>>> --
>>>>>> Willem
>>>>>> ----------------------------------
>>>>>> FuseSource
>>>>>> Web:
>>>>>> Blog: (English)
>>>>>> (Chinese)
>>>>>> Twitter: willemjiang
>>>>>> Weibo: willemjiang
