camel-users mailing list archives

From James Carman <jcar...@carmanconsulting.com>
Subject Re: Socket-based Asynchronous Calls...
Date Mon, 29 Aug 2011 10:39:51 GMT
I am a commons committer so don't think it hasn't crossed my mind. :)
On Aug 28, 2011 11:48 PM, "Claus Ibsen" <claus.ibsen@gmail.com> wrote:
> On Sat, Aug 27, 2011 at 10:19 AM, James Carman
> <james@carmanconsulting.com> wrote:
>> Well, I can tell you that it certainly didn't seem to work the way I
>> need it to work.  I need a persistent connection (with automatic
>> reconnects).  I also need it to be in/out, but asynchronous (the
>> current incoming message may or may not correspond to the most
>> recently sent message).  For now, I've resorted to just rolling my own
>> solution by directly coding to the Netty API.  I will re-visit with
>> Camel later I'm sure.  Thanks for your help.
>>
>
> Well, you could also consider contributing improvements to the Camel
> components.
>
> The Camel community loves contributions:
> http://camel.apache.org/contributing.html
>
>> On Thu, Aug 25, 2011 at 1:16 AM, Taariq Levack <taariql@gmail.com> wrote:
>>> I expect that the connection will only be closed if the header
>>> NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE is true.
>>>
>>> Glancing at the code I see what you mean, it's quite unlike MINA's
>>> producer which checks the session to see if it's connected and reuses
>>> it, but it may be that under the hood, yet further under the hood, hehe,
>>> way further down into netty's ClientBootstrap and beyond, the connection
>>> is being reused. I don't know for sure.
>>>
>>> This is from Netty front-page, "True connectionless datagram socket
>>> support (since 3.1)":
>>> And glancing at that bit elsewhere I think it's possible to do without
>>> this sort of plumbing, but you'd have to jump into netty code or docs to
>>> confirm.
>>>
>>> Depending on timing and other factors I would go ahead with a POC
>>> because it either works or it will work, a failing test from your POC
>>> will be most welcome.
>>>
>>> Taariq
>>>
>>>
>>> On Wed, Aug 24, 2011 at 12:43 PM, James Carman
>>> <james@carmanconsulting.com> wrote:
>>>
>>>> I have read the source:
>>>>
>>>> http://svn.apache.org/repos/asf/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
>>>>
>>>> Take a look at the process() method.  In there, there is a block of
>>>> code that does:
>>>>
>>>>        ChannelFuture channelFuture;
>>>>        final Channel channel;
>>>>        try {
>>>>            channelFuture = openConnection(exchange, callback);
>>>>            channel = openChannel(channelFuture);
>>>>        } catch (Exception e) {
>>>>            exchange.setException(e);
>>>>            callback.done(true);
>>>>            return true;
>>>>        }
>>>>
>>>>
>>>> This is not inside an if block or anything and the openConnection()
>>>> method does actually open it, it isn't just returning a
>>>> previously-opened connection or anything.
>>>>
>>>> Perhaps I'm missing something (entirely possible), but it appears that
>>>> it's opening the connection every time the process() method is called.
>>>>
>>>> On Tue, Aug 23, 2011 at 11:09 PM, Taariq Levack <taariql@gmail.com> wrote:
>>>> > That doesn't sound right, what have you read? Logs/docs?
>>>> > And are you using keep-alive?
>>>> >
>>>> > Taariq
>>>> >
>>>> >
>>>> > On 24 Aug 2011, at 12:12 AM, James Carman <james@carmanconsulting.com> wrote:
>>>> >
>>>> >> Well, it looks like the camel-netty component won't work for me.  It
>>>> >> appears that it opens the connection for each exchange.  Am I reading
>>>> >> that right?  What I need is a persistent connection with automatic
>>>> >> reconnects.  Oh well, back to the drawing board.
>>>> >>
>>>> >> On Wed, Aug 17, 2011 at 7:59 AM, James Carman
>>>> >> <james@carmanconsulting.com> wrote:
>>>> >>> That's what I've been staring at! :)  Here's what I'm thinking I'm
>>>> >>> going to need to write.  I need an async processor that remembers the
>>>> >>> AsyncCallback and associates it with a correlation id.  Then, when
>>>> >>> another exchange comes in that has the same correlation id, it will
>>>> >>> look up the previous callback and say that it's done.  I have a lot of
>>>> >>> questions, though.  I've never had to get so "down and dirty" with
>>>> >>> Camel before.  The components have just worked for me "off the shelf."
>>>> >>>
>>>> >>> 1.  Do I just copy the input message of the Exchange that comes in
>>>> >>> second to the output message of the originating exchange?
>>>> >>> 2.  How do I do a timeout for the original caller (the CXF request)?
>>>> >>> 3.  How do I detect that the caller has timed out if they do?
>>>> >>>
>>>> >>> I'm sure I'll have more questions, but these are the ones off the top
>>>> >>> of my head.
>>>> >>>
>>>> >>> On Wed, Aug 17, 2011 at 1:48 AM, Taariq Levack <taariql@gmail.com> wrote:
>>>> >>>> James I think the rest of your puzzle is solved by Camel's async API,
>>>> >>>> you might have to check if your task is done, maybe your
>>>> >>>> requestResponse populates some collection of responses and provides
>>>> >>>> some API to return the response given a correlationID.
>>>> >>>> Stare at the async docs [1] a few more times and I'm sure you'll find
>>>> >>>> your answer.
>>>> >>>>
>>>> >>>> [1] http://camel.apache.org/async.html
>>>> >>>>
>>>> >>>> Taariq
>>>> >>>>
>>>> >>>> On Tue, Aug 16, 2011 at 11:16 PM, James Carman
>>>> >>>> <james@carmanconsulting.com> wrote:
>>>> >>>>> No worries!  Thank you for your help.  It helped me understand a bit
>>>> >>>>> more about how these aggregators work.  However, I still don't
>>>> >>>>> understand how to take care of my problem.  I guess I'm going to have
>>>> >>>>> to roll my own processor or something.
>>>> >>>>>
>>>> >>>>> On Tue, Aug 16, 2011 at 4:50 PM, Taariq Levack <taariql@gmail.com> wrote:
>>>> >>>>>> Hmmm.
>>>> >>>>>> Maybe others can help with that if it's possible, I haven't had to
>>>> >>>>>> wrestle with it.
>>>> >>>>>>
>>>> >>>>>> In my case it is actually a cxf service too, but it's asynchronous
>>>> >>>>>> and I send the response once I have it, indicating either timeout or
>>>> >>>>>> the actual response.
>>>> >>>>>>
>>>> >>>>>> Sorry I responded to your question without going back to see your
>>>> >>>>>> other posts.
>>>> >>>>>> Taariq
>>>> >>>>>>
>>>> >>>>>> On 16 Aug 2011, at 10:33 PM, James Carman <james@carmanconsulting.com> wrote:
>>>> >>>>>>
>>>> >>>>>>> In my case, the originating request comes from CXF.  How do I send
>>>> >>>>>>> the aggregated response back to CXF?
>>>> >>>>>>>
>>>> >>>>>>> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <taariql@gmail.com> wrote:
>>>> >>>>>>>> The consumer that handles the aggregated/timed-out request or
>>>> >>>>>>>> response.
>>>> >>>>>>>>
>>>> >>>>>>>> I have to resend a few times if it's the request, I simply feed it
>>>> >>>>>>>> back into "direct:socketRequestRoute" with the header for the number
>>>> >>>>>>>> of retry attempts incremented.
>>>> >>>>>>>> If it's the response I can forward to some process.
>>>> >>>>>>>>
>>>> >>>>>>>> Taariq
>>>> >>>>>>>>
>>>> >>>>>>>> On 16 Aug 2011, at 10:18 PM, James Carman <james@carmanconsulting.com> wrote:
>>>> >>>>>>>>
>>>> >>>>>>>>> What's listening on the:
>>>> >>>>>>>>>
>>>> >>>>>>>>> to("direct:requestResponse")
>>>> >>>>>>>>>
>>>> >>>>>>>>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <taariql@gmail.com> wrote:
>>>> >>>>>>>>>> Sure
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> You can of course solve what I've described many ways, but I'll
>>>> >>>>>>>>>> explain using 3 routes as that's what I used.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> This first route is the main route I mentioned earlier, so you send
>>>> >>>>>>>>>> your socket messages here and it's multicast to both the aggregator
>>>> >>>>>>>>>> and to the socket.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> from("direct:socketRequestRoute")
>>>> >>>>>>>>>>     .multicast()
>>>> >>>>>>>>>>     .to("direct:requestResponseAggregator", "someOutboundSocketEndpoint");
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> This next route will aggregate, both requests and responses are sent
>>>> >>>>>>>>>> here as you envisaged.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> from("direct:requestResponseAggregator")
>>>> >>>>>>>>>>     .aggregate(header("someCorrellationId"), requestResponseAggregator)
>>>> >>>>>>>>>>     .completionSize(2)
>>>> >>>>>>>>>>     .completionTimeout(5000)
>>>> >>>>>>>>>>     .to("direct:requestResponse");
>>>> >>>>>>>>>> // Here you can send the "aggregated" message, in my case it's only
>>>> >>>>>>>>>> // the response I forward unless there's a timeout, then I forward
>>>> >>>>>>>>>> // the request of course.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Finally the route that consumes the socket responses.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> from(someInboundSocketEndpoint)
>>>> >>>>>>>>>>     .processRef("headerEnricher")
>>>> >>>>>>>>>>     .to("direct:requestResponseAggregator");
>>>> >>>>>>>>>> // this headerEnricher doesn't have to be a processor, you have many
>>>> >>>>>>>>>> // options to add a header.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> If that's not clear feel free to ask.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Taariq
>>>> >>>>>>>>>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>>>> >>>>>>>>>> <james@carmanconsulting.com> wrote:
>>>> >>>>>>>>>>> Care to share an example?  I'm not picturing it.
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <taariql@gmail.com> wrote:
>>>> >>>>>>>>>>>> Hi James
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> I did that too for what it's worth.
>>>> >>>>>>>>>>>> I send the message to a route that forwards to both the aggregator
>>>> >>>>>>>>>>>> and to the socket.
>>>> >>>>>>>>>>>> When the response comes in I use an enricher to add the ID to the
>>>> >>>>>>>>>>>> headers and then forward to the aggregator.
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> Taariq
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> On 16 Aug 2011, at 8:55 PM, James Carman <james@carmanconsulting.com> wrote:
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>>> Willem,
>>>> >>>>>>>>>>>>>
>>>> >>>>>>>>>>>>> Thank you for your help.  I don't think this is doing exactly what I
>>>> >>>>>>>>>>>>> need, though.  The real trick here is the asynchronous nature of the
>>>> >>>>>>>>>>>>> "server" on the other end of this situation.  I thought about using an
>>>> >>>>>>>>>>>>> aggregator to make sure the response gets matched up with the request
>>>> >>>>>>>>>>>>> using a correlation id.  The aggregator wouldn't aggregate multiple
>>>> >>>>>>>>>>>>> responses together into one, it would just make sure it matches the
>>>> >>>>>>>>>>>>> correct response with its request.  Does this sound like a valid
>>>> >>>>>>>>>>>>> approach?  If so, how the heck do I go about it? :)
>>>> >>>>>>>>>>>>>
>>>> >>>>>>>>>>>>> Thanks,
>>>> >>>>>>>>>>>>>
>>>> >>>>>>>>>>>>> James
>>>> >>>>>>>>>>>>>
>>>> >>>>>>>>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <willem.jiang@gmail.com> wrote:
>>>> >>>>>>>>>>>>>> Hi James,
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>> Camel's async processing engine already provides what you want.
>>>> >>>>>>>>>>>>>> You can take a look at the camel-cxf code [1][2] for an example.
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>> [1] http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
>>>> >>>>>>>>>>>>>> [2] http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian <hzbarcea@gmail.com> wrote:
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>> Hi James,
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>> I hope I understand your scenario correctly. Here are a few thoughts. I
>>>> >>>>>>>>>>>>>>>> assume you want to use camel-netty [1] to send messages to your server
>>>> >>>>>>>>>>>>>>>> (if you have your own code that does that, you can use it too, but you'd
>>>> >>>>>>>>>>>>>>>> have to write your own Processor or Component). IIUC, your scenario is
>>>> >>>>>>>>>>>>>>>> converting a 2x in-only to a 1x in-out async MEP. You should then treat
>>>> >>>>>>>>>>>>>>>> your exchange as an async in-out and let your framework (Camel)
>>>> >>>>>>>>>>>>>>>> decompose it and compose it back again. I would not keep threads blocked
>>>> >>>>>>>>>>>>>>>> so I believe your best bet is using the Camel async messaging [2] and
>>>> >>>>>>>>>>>>>>>> Futures (look at the examples using asyncSend* and asyncCallback*). The
>>>> >>>>>>>>>>>>>>>> issue is that Camel is stateless so you'll need a correlationId, which
>>>> >>>>>>>>>>>>>>>> you must have already, and something to keep your state. A good bet
>>>> >>>>>>>>>>>>>>>> would be jms [3], or you could write your own. If you used jms you would
>>>> >>>>>>>>>>>>>>>> need to use both a correlationId and a replyTo queue.
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>> >>>>>>>>>>>>>>>> from("netty:input").to("jms:replyTo-queue");
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> Perhaps a bit more information might be appropriate here.  Eventually,
>>>> >>>>>>>>>>>>>>> I'd like to "expose" this route via web services (using CXF of
>>>> >>>>>>>>>>>>>>> course).  So, I would need to either block the request thread, waiting
>>>> >>>>>>>>>>>>>>> for a reply or perhaps check out the new Servlet 3.0 asynchronous
>>>> >>>>>>>>>>>>>>> processing stuff (I'm thinking this might help us get more done with
>>>> >>>>>>>>>>>>>>> fewer http request threads) to do more of a continuation thing.
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> We already have a correlation id.  The "protocol" requires one and the
>>>> >>>>>>>>>>>>>>> server process just echoes it back in the response message.
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>> You may have to play a bit with the correlationId and if you cannot use
>>>> >>>>>>>>>>>>>>>> the same you can do a second transformation/correlation using a
>>>> >>>>>>>>>>>>>>>> claim-check sort of pattern. If you don't want to use jms you can
>>>> >>>>>>>>>>>>>>>> implement your own (in memory) persistence and correlation. You can also
>>>> >>>>>>>>>>>>>>>> use a resequencer [4] if you want to enforce the order. If you use
>>>> >>>>>>>>>>>>>>>> asyncCallback, you get the replies when they become available, and you
>>>> >>>>>>>>>>>>>>>> can control that.
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> I don't think a resequencer is necessary.  I don't want to guarantee
>>>> >>>>>>>>>>>>>>> the ordering.  I'm mostly interested in throughput here.  So, if a
>>>> >>>>>>>>>>>>>>> message comes in after another, but it can be processed faster, so be
>>>> >>>>>>>>>>>>>>> it.
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>> It's an interesting scenario, I'll definitely give it more thought, but
>>>> >>>>>>>>>>>>>>>> I hope this helps.
>>>> >>>>>>>>>>>>>>>> Hadrian
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> You have been very helpful.  Thank you for taking the time!
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>> --
>>>> >>>>>>>>>>>>>> Willem
>>>> >>>>>>>>>>>>>> ----------------------------------
>>>> >>>>>>>>>>>>>> FuseSource
>>>> >>>>>>>>>>>>>> Web: http://www.fusesource.com
>>>> >>>>>>>>>>>>>> Blog:    http://willemjiang.blogspot.com (English)
>>>> >>>>>>>>>>>>>>         http://jnn.javaeye.com (Chinese)
>>>> >>>>>>>>>>>>>> Twitter: willemjiang
>>>> >>>>>>>>>>>>>> Weibo: willemjiang
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>
>>>> >>>>>>
>>>> >>>>>
>>>> >>>>
>>>> >>>
>>>> >
>>>>
>>>
>>
>
>
>
> --
> Claus Ibsen
> -----------------
> FuseSource
> Email: cibsen@fusesource.com
> Web: http://fusesource.com
> Twitter: davsclaus, fusenews
> Blog: http://davsclaus.blogspot.com/
> Author of Camel in Action: http://www.manning.com/ibsen/
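
A minimal sketch of the correlation-id bookkeeping discussed in this thread: remember the pending request, complete it when a reply carrying the same id arrives, and time the caller out otherwise. It uses only the JDK (Java 9+ for orTimeout), not Camel or Netty. The class name CorrelationRegistry and its methods are hypothetical and are not part of any Camel API. In a Camel async processor, the completion step is presumably where the stored AsyncCallback would be marked done, but that wiring is an assumption and is not shown here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper (not a Camel class): matches replies that may arrive
 * out of order to their pending requests by correlation id.
 */
class CorrelationRegistry<R> {

    // One pending future per in-flight correlation id.
    private final ConcurrentMap<String, CompletableFuture<R>> pending =
            new ConcurrentHashMap<>();

    /** Registers a request before it is written to the socket. */
    CompletableFuture<R> register(String correlationId, long timeoutMillis) {
        CompletableFuture<R> future = new CompletableFuture<>();
        if (pending.putIfAbsent(correlationId, future) != null) {
            throw new IllegalStateException("Duplicate correlation id: " + correlationId);
        }
        // Fails the future with a TimeoutException if no reply arrives in time.
        future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        // Whatever the outcome (reply, timeout, cancel), drop the map entry.
        future.whenComplete((reply, error) -> pending.remove(correlationId, future));
        return future;
    }

    /**
     * Called by the inbound side when a reply arrives.
     * Returns false if nobody is waiting (unknown id, or the caller timed out).
     */
    boolean complete(String correlationId, R reply) {
        CompletableFuture<R> future = pending.remove(correlationId);
        return future != null && future.complete(reply);
    }

    /** Number of requests still waiting for a reply. */
    int pendingCount() {
        return pending.size();
    }
}
```

A caller would invoke register(id, timeout) before sending and the inbound handler would invoke complete(id, reply). A false return from complete is one way to detect that the original caller has already given up.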
