camel-users mailing list archives

From Taariq Levack <>
Subject Re: Socket-based Asynchronous Calls...
Date Wed, 17 Aug 2011 05:48:10 GMT
James, I think the rest of your puzzle is solved by Camel's async API.
You might have to check whether your task is done; maybe your
requestResponse route populates some collection of responses and
provides some API to return the response for a given correlationID.
Stare at the async docs [1] a few more times and I'm sure you'll find
your answer.
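
For illustration only, the "collection of responses keyed by
correlationID" idea could look roughly like the sketch below. It is plain
Java with no Camel dependency and assumes a JDK that has
CompletableFuture; the class name ResponseCorrelator and its methods are
hypothetical, not part of Camel or CXF.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory store that pairs a response with its pending request. */
final class ResponseCorrelator<T> {

    // Requests that have been sent but not yet answered, keyed by correlation ID.
    private final ConcurrentMap<String, CompletableFuture<T>> pending =
            new ConcurrentHashMap<>();

    /** Call before sending the request; the caller can poll isDone() or wait on the future. */
    CompletableFuture<T> register(String correlationId) {
        CompletableFuture<T> created = new CompletableFuture<>();
        CompletableFuture<T> existing = pending.putIfAbsent(correlationId, created);
        // If the ID was already registered, hand back the future that is already waiting.
        return existing != null ? existing : created;
    }

    /** Call when a response arrives; returns false if nobody is waiting for that ID. */
    boolean complete(String correlationId, T response) {
        CompletableFuture<T> future = pending.remove(correlationId);
        return future != null && future.complete(response);
    }

    /** Call on timeout; cancels the pending future and returns false if the ID is unknown. */
    boolean discard(String correlationId) {
        CompletableFuture<T> future = pending.remove(correlationId);
        return future != null && future.cancel(false);
    }
}
```

Under these assumptions, the code handling the CXF request would call
register(id) before sending to the socket, and the route consuming the
socket responses would call complete(id, response) once the header with
the correlation ID has been set.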



On Tue, Aug 16, 2011 at 11:16 PM, James Carman
<> wrote:
> No worries!  Thank you for your help.  It helped me understand a bit
> more about how these aggregators work.  However, I still don't
> understand how to take care of my problem.  I guess I'm going to have
> to roll my own processor or something.
> On Tue, Aug 16, 2011 at 4:50 PM, Taariq Levack <> wrote:
>> Hmmm.
>> Maybe others can help with that if it's possible, I haven't had to wrestle with it.
>> In my case it is actually a cxf service too, but it's asynchronous and I send the
>> response once I have it, indicating either timeout or the actual response.
>> Sorry I responded to your question without going back to see your other posts.
>> Taariq
>> On 16 Aug 2011, at 10:33 PM, James Carman <> wrote:
>>> In my case, the originating request comes from CXF.  How do I send the
>>> aggregated response back to CXF?
>>> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <> wrote:
>>>> The consumer that handles the aggregated/timed-out request or response.
>>>> I have to resend a few times if it's the request, I simply feed it back into
>>>> "direct:socketRequestRoute" with the header for the number of retry attempts incremented.
>>>> If it's the response I can forward to some process.
>>>> Taariq
>>>> On 16 Aug 2011, at 10:18 PM, James Carman <>
>>>>> What's listening on the:
>>>>> to("direct:requestResponse")
>>>>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <>
>>>>>> Sure
>>>>>> You can of course solve what I've described many ways, but I'll
>>>>>> explain using 3 routes as that's what I used.
>>>>>> This first route is the main route I mentioned earlier, so you send
>>>>>> your socket messages here and it's multicast to both the aggregator
>>>>>> and to the socket.
>>>>>> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator",
>>>>>>  "someOutboundSocketEndpoint");
>>>>>> This next route will aggregate, both requests and responses are sent
>>>>>> here as you envisaged.
>>>>>> from("direct:requestResponseAggregator")
>>>>>>                .aggregate(header("someCorrellationId"), requestResponseAggregator)
>>>>>>                .completionSize(2)
>>>>>>                .completionTimeout(5000)
>>>>>>                .to("direct:requestResponse");
>>>>>> // Here you can send the "aggregated" message, in my case it's only the
>>>>>> // response I forward unless there's a timeout, then I forward the request
>>>>>> // of course.
>>>>>> Finally the route that consumes the socket responses.
>>>>>> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator");
>>>>>>   //this headerEnricher doesn't have to be a processor, you have
>>>>>> options to add a header.
>>>>>> If that's not clear feel free to ask.
>>>>>> Taariq
>>>>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>>>>>> <> wrote:
>>>>>>> Care to share an example?  I'm not picturing it.
>>>>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <>
>>>>>>>> Hi James
>>>>>>>> I did that too for what it's worth.
>>>>>>>> I send the message to a route that forwards to both the aggregator
>>>>>>>> and to the socket.
>>>>>>>> When the response comes in I use an enricher to add the ID
>>>>>>>> to the headers and then forward to the aggregator.
>>>>>>>> Taariq
>>>>>>>> On 16 Aug 2011, at 8:55 PM, James Carman <>
>>>>>>>>> Willem,
>>>>>>>>> Thank you for your help.  I don't think this is doing exactly what I
>>>>>>>>> need, though.  The real trick here is the asynchronous nature of the
>>>>>>>>> "server" on the other end of this situation.  I thought about using an
>>>>>>>>> aggregator to make sure the response gets matched up with the request
>>>>>>>>> using a correlation id.  The aggregator wouldn't aggregate
>>>>>>>>> responses together into one, it would just make sure it matches the
>>>>>>>>> correct response with its request.  Does this sound like a valid
>>>>>>>>> approach?  If so, how the heck do I go about it? :)
>>>>>>>>> Thanks,
>>>>>>>>> James
>>>>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <>
>>>>>>>>>> Hi James,
>>>>>>>>>> Camel's async processing engine already provides what you want.
>>>>>>>>>> You can take a look at the camel-cxf code[1][2] for some examples.
>>>>>>>>>> [1]
>>>>>>>>>> [2]
>>>>>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian<>
>>>>>>>>>>>  wrote:
>>>>>>>>>>>> Hi James,
>>>>>>>>>>>> I hope I understand your scenario correctly. Here are a few thoughts. I
>>>>>>>>>>>> assume you want to use camel-netty [1] to send messages to your server (if
>>>>>>>>>>>> you have your own code that does that, you can use it too, but you'd have
>>>>>>>>>>>> to write your own Processor or Component). IIUC, your scenario is
>>>>>>>>>>>> converting a 2x in-only to a 1x in-out async MEP. You should then treat
>>>>>>>>>>>> your exchange as an async in-out and let your framework (Camel) decompose
>>>>>>>>>>>> it and compose it back again. I would not keep threads blocked, so I
>>>>>>>>>>>> believe your best bet is using the Camel async messaging [2] and Futures
>>>>>>>>>>>> (look at the examples using asyncSend* and asyncCallback*). The issue is
>>>>>>>>>>>> that Camel is stateless, so you'll need a correlationId, which you must
>>>>>>>>>>>> have already, and something to keep your state. A good bet would be jms
>>>>>>>>>>>> [3], or you could write your own. If you used jms you would need to use
>>>>>>>>>>>> both a correlationId and a replyTo queue.
>>>>>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>>>>>>>>>> from("netty:input").to("jms:replyTo-queue");
>>>>>>>>>>> Perhaps a bit more information might be appropriate here.  Eventually,
>>>>>>>>>>> I'd like to "expose" this route via web services (using CXF of
>>>>>>>>>>> course).  So, I would need to either block the request thread, waiting
>>>>>>>>>>> for a reply or perhaps check out the new Servlet 3.0 asynchronous
>>>>>>>>>>> processing stuff (I'm thinking this might help us get more done with
>>>>>>>>>>> fewer http request threads) to do more of a continuation
>>>>>>>>>>> We already have a correlation id.  The "protocol" requires one and the
>>>>>>>>>>> server process just echoes it back in the response
>>>>>>>>>>>> You may have to play a bit with the correlationId and if you cannot use
>>>>>>>>>>>> the same you can do a second transformation/correlation using a
>>>>>>>>>>>> claim-check sort of pattern. If you don't want to use jms you can
>>>>>>>>>>>> implement your own (in memory) persistence and correlation. You can also
>>>>>>>>>>>> use a resequencer [4] if you want to enforce the order. If you use
>>>>>>>>>>>> asyncCallback, you get the replies when they become available, and you
>>>>>>>>>>>> can control
>>>>>>>>>>> I don't think a resequencer is necessary.  I don't want to guarantee
>>>>>>>>>>> the ordering.  I'm mostly interested in throughput here.  So, if a
>>>>>>>>>>> message comes in after another, but it can be processed faster, so be
>>>>>>>>>>> it.
>>>>>>>>>>>> It's an interesting scenario, I'll definitely give it more thought, but I
>>>>>>>>>>>> hope this helps.
>>>>>>>>>>>> Hadrian
>>>>>>>>>>> You have been very helpful.  Thank you for taking the time!
>>>>>>>>>> --
>>>>>>>>>> Willem
>>>>>>>>>> ----------------------------------
>>>>>>>>>> FuseSource
>>>>>>>>>> Twitter: willemjiang
>>>>>>>>>> Weibo: willemjiang
