camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From James Carman <>
Subject Re: Socket-based Asynchronous Calls...
Date Wed, 24 Aug 2011 10:43:24 GMT
I have read the source:

Take a look at the process() method.  In there, there is a block of
code that does:

        ChannelFuture channelFuture;
        final Channel channel;
        try {
            channelFuture = openConnection(exchange, callback);
            channel = openChannel(channelFuture);
        } catch (Exception e) {
            return true;

This is not inside an if block or anything and the openConnection()
method does actually open it, it isn't just returning a
previously-opened connection or anything.

Perhaps I'm missing something (entirely possible), but it appears that
it's opening the connection every time the process() method is called.

On Tue, Aug 23, 2011 at 11:09 PM, Taariq Levack <> wrote:
> That doesn't sound right, what have you read? Logs/docs?
> And are you using keep-alive?
> Taariq
> On 24 Aug 2011, at 12:12 AM, James Carman <> wrote:
>> Well, it looks like the camel-netty component won't work for me.  It
>> appears that it opens the connection for each exchange.  Am I reading
>> that right?  What I need is a persistent connection with automatic
>> reconnects.  Oh well, back to the drawing board.
>> On Wed, Aug 17, 2011 at 7:59 AM, James Carman
>> <> wrote:
>>> That's what I've been staring at! :)  Here's what I'm thinking I'm
>>> going to need to write.  I need an async processor that remembers the
>>> AsyncCallback and associates it with a correlation id.  Then, when
>>> another exchange comes in that has the same correlation id, it will
>>> lookup the previous callback and say that it's done.  I have a lot of
>>> questions, though.  I've never had to get so "down and dirty" with
>>> Camel before.  The components have just worked for me "off the shelf."
>>> 1.  Do I just copy the input message of the Exchange that comes in
>>> second to the output message of the originating exchange?
>>> 2.  How do I do a timeout for the original caller (the CXF request)?
>>> 3.  How do I detect that the caller has timed out if they do?
>>> I'm sure I'll have more questions, but these are the ones off the top
>>> of my head.
>>> On Wed, Aug 17, 2011 at 1:48 AM, Taariq Levack <> wrote:
>>>> James I think the rest of your puzzle is solved by Camel's async API,
>>>> you might have to check if your task is done, maybe your
>>>> requestResponse populates some collection of responses and provides
>>>> some API to return the response given a correlationID.
>>>> Stare at the async docs [1] a few more times and I'm sure you'll find
>>>> your answer.
>>>> [1]
>>>> Taariq
>>>> On Tue, Aug 16, 2011 at 11:16 PM, James Carman
>>>> <> wrote:
>>>>> No worries!  Thank you for your help.  It helped me understand a bit
>>>>> more about how these aggregators work..  However, I still don't
>>>>> understand how to take care of my problem.  I guess I'm going to have
>>>>> to roll my own processor or something.
>>>>> On Tue, Aug 16, 2011 at 4:50 PM, Taariq Levack <>
>>>>>> Hmmm.
>>>>>> Maybe others can help with that if it's possible, I haven't had to
wrestle with it.
>>>>>> In my case it is actually a cxf service too, but it's asynchronous
 and I send the response once I have it, indicating either timeout or the actual response.
>>>>>> Sorry I responded to your question without going back to see your
other posts.
>>>>>> Taariq
>>>>>> On 16 Aug 2011, at 10:33 PM, James Carman <>
>>>>>>> In my case, the originating request comes from CXF.  How do
I send the
>>>>>>> aggregated response back to CXF?
>>>>>>> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <>
>>>>>>>> The consumer that handles the aggregated/timed-out request
or response.
>>>>>>>> I have to resend a few times if it's the request, I simply
feed it back into "direct:socketRequestRoute" with the header for the number of retry attempts
>>>>>>>> If it's the response I can forward to some process.
>>>>>>>> Taariq
>>>>>>>> On 16 Aug 2011, at 10:18 PM, James Carman <>
>>>>>>>>> What's listening on the:
>>>>>>>>> to("direct:requestResponse")
>>>>>>>>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <>
>>>>>>>>>> Sure
>>>>>>>>>> You can of course solve what I've described many
ways, but I'll
>>>>>>>>>> explain using 3 routes as that's what I used.
>>>>>>>>>> This first route is the main route I mentioned earlier,
so you send
>>>>>>>>>> your socket messages here and it's multicast to both
the aggregator
>>>>>>>>>> and to the socket.
>>>>>>>>>> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator",
>>>>>>>>>>  "someOutboundSocketEndpoint");
>>>>>>>>>> This next route will aggregate, both requests and
responses are sent
>>>>>>>>>> here as you envisaged.
>>>>>>>>>> from("direct:requestResponseAggregator").
>>>>>>>>>>                .aggregate(header("someCorrellationId"),
>>>>>>>>>> requestResponseAggregator)
>>>>>>>>>>                .completionSize(2)
>>>>>>>>>>                .completionTimeout(5000)
>>>>>>>>>>                .to("direct:requestResponse");
//Here you can send the
>>>>>>>>>> "aggregated" message, in my case it's only the response
I forward
>>>>>>>>>> unless there's a timeout, then I forward the request
of course.
>>>>>>>>>> Finally the route that consumes the socket responses.
>>>>>>>>>> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator");
>>>>>>>>>>   //this headerEnricher doesn't have to be a processor,
you have many
>>>>>>>>>> options to add a header.
>>>>>>>>>> If that's not clear feel free to ask.
>>>>>>>>>> Taariq
>>>>>>>>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>>>>>>>>>> <> wrote:
>>>>>>>>>>> Care to share an example?  I'm not picturing
>>>>>>>>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack
<> wrote:
>>>>>>>>>>>> Hi James
>>>>>>>>>>>> I did that too for what it's worth.
>>>>>>>>>>>> I send the message to a route that forwards
to both the aggregator and to the socket.
>>>>>>>>>>>> When the response comes in I use an enricher
to add the ID to the headers and then forward to the aggregator.
>>>>>>>>>>>> Taariq
>>>>>>>>>>>> On 16 Aug 2011, at 8:55 PM, James Carman
<> wrote:
>>>>>>>>>>>>> Willem,
>>>>>>>>>>>>> Thank you for your help.  I don't think
this is doing exactly what I
>>>>>>>>>>>>> need, though.  The real trick here is
the asynchronous nature of the
>>>>>>>>>>>>> "server" on the other end of this situation.
 I thought about using an
>>>>>>>>>>>>> aggregator to make sure the response
gets matched up with the request
>>>>>>>>>>>>> using a correlation id.  The aggregator
wouldn't aggregate multiple
>>>>>>>>>>>>> responses together into one, it would
just make sure it matches the
>>>>>>>>>>>>> correct response with its request.  Does
this sound like a valid
>>>>>>>>>>>>> approach?  If so, how the heck do I
go about it? :)
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> James
>>>>>>>>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem
Jiang <> wrote:
>>>>>>>>>>>>>> Hi James,
>>>>>>>>>>>>>> Camel async process engine already
provides the way that you want.
>>>>>>>>>>>>>> You can take a look at the camel-cxf
code[1][2] for some example.
>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>>>>>>>>>>>>> On Sat, Aug 6, 2011 at 10:33
AM, Zbarcea Hadrian<>
>>>>>>>>>>>>>>>  wrote:
>>>>>>>>>>>>>>>> Hi James,
>>>>>>>>>>>>>>>> I hope I understand your
scenario correctly. Here are a few thoughts. I
>>>>>>>>>>>>>>>> assume want to use camel-netty
[1] to send messages to your sever (if you
>>>>>>>>>>>>>>>> have your own code that does
that, you can use it too, but you'd have to
>>>>>>>>>>>>>>>> write your own Processor
or Component). Iiuic, your scenario is converting a
>>>>>>>>>>>>>>>> 2x in-only to a 1x in-out
async mep. You should then treat your exchange as
>>>>>>>>>>>>>>>> an async in-out and let your
framework (Camel) decompose it and compose it
>>>>>>>>>>>>>>>> back again. I would not keep
threads blocked so I believe your best bet is
>>>>>>>>>>>>>>>> using the Camel async messaging
[2] and Futures (look at the examples using
>>>>>>>>>>>>>>>> asyncSend* and asyncCallback*).
The issue is that Camel is stateless so
>>>>>>>>>>>>>>>> you'll need a correlationId,
which you must have already and something to
>>>>>>>>>>>>>>>> keep your state. A good bet
would be jms [3], or you could write your own.
>>>>>>>>>>>>>>>> If you used jms you would
need to use both a correlationId and a replyTo
>>>>>>>>>>>>>>>> queue.
>>>>>>>>>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>>>>>>>>>>>>>> from("netty:input).to("jms:replyTo-queue")
>>>>>>>>>>>>>>> Perhaps a bit more information
might be appropriate here.  Eventually,
>>>>>>>>>>>>>>> I'd like to "expose" this route
via web services (using CXF of
>>>>>>>>>>>>>>> course).  So, I would need to
either block the request thread, waiting
>>>>>>>>>>>>>>> for a reply or perhaps check
out the new Servlet 3.0 asynchronous
>>>>>>>>>>>>>>> processing stuff (I'm thinking
this might help us get more done with
>>>>>>>>>>>>>>> less http request threads) to
do more of a continuation thing.
>>>>>>>>>>>>>>> We already have a correlation
id.  The "protocol" requires one and the
>>>>>>>>>>>>>>> server process just echos it
back in the response message.
>>>>>>>>>>>>>>>> You may have to play a bit
with the correlationId and if you cannot use
>>>>>>>>>>>>>>>> the same you can do a second
transformation/correlation using a claim-check
>>>>>>>>>>>>>>>> sort of pattern. If you don't
want to use jms you can implement your own (in
>>>>>>>>>>>>>>>> memory) persistence and correlation.
You can also use a resequencer [4] if
>>>>>>>>>>>>>>>> you want to enforce the order.
If you use asyncCallback, you get the replies
>>>>>>>>>>>>>>>> when they become available,
and you can control that.
>>>>>>>>>>>>>>> I don't think a resequencer is
necessary.  I don't want to guarantee
>>>>>>>>>>>>>>> the ordering.  I'm mostly interested
in throughput here.  So, if a
>>>>>>>>>>>>>>> message comes in after another,
but it can be processed faster, so be
>>>>>>>>>>>>>>> it.
>>>>>>>>>>>>>>>> It's an interesting scenario,
I'll definitely give it more thought, but I
>>>>>>>>>>>>>>>> hope this helps.
>>>>>>>>>>>>>>>> Hadrian
>>>>>>>>>>>>>>> You have been very helpful.  Thank
you for taking the time!
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Willem
>>>>>>>>>>>>>> ----------------------------------
>>>>>>>>>>>>>> FuseSource
>>>>>>>>>>>>>> Web:
>>>>>>>>>>>>>> Blog:
>>>>>>>>>>>>>> Twitter: willemjiang
>>>>>>>>>>>>>> Weibo: willemjiang

View raw message