Return-Path: X-Original-To: apmail-camel-users-archive@www.apache.org Delivered-To: apmail-camel-users-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5B7DE89D6 for ; Wed, 24 Aug 2011 10:44:38 +0000 (UTC) Received: (qmail 10279 invoked by uid 500); 24 Aug 2011 10:44:36 -0000 Delivered-To: apmail-camel-users-archive@camel.apache.org Received: (qmail 9470 invoked by uid 500); 24 Aug 2011 10:44:18 -0000 Mailing-List: contact users-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: users@camel.apache.org Delivered-To: mailing list users@camel.apache.org Received: (qmail 9430 invoked by uid 99); 24 Aug 2011 10:44:13 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Aug 2011 10:44:13 +0000 X-ASF-Spam-Status: No, hits=-0.0 required=5.0 tests=RCVD_IN_DNSWL_LOW,SPF_NEUTRAL X-Spam-Check-By: apache.org Received-SPF: neutral (nike.apache.org: local policy) Received: from [209.85.160.173] (HELO mail-gy0-f173.google.com) (209.85.160.173) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Aug 2011 10:44:05 +0000 Received: by gyd12 with SMTP id 12so831367gyd.32 for ; Wed, 24 Aug 2011 03:43:44 -0700 (PDT) Received: by 10.236.179.72 with SMTP id g48mr29633738yhm.50.1314182624316; Wed, 24 Aug 2011 03:43:44 -0700 (PDT) MIME-Version: 1.0 Sender: jcarman@carmanconsulting.com Received: by 10.147.136.15 with HTTP; Wed, 24 Aug 2011 03:43:24 -0700 (PDT) In-Reply-To: References: <2338A988-ACD0-493A-A68F-CA1BB772A6D2@gmail.com> <4E3F35F0.3080108@gmail.com> <2B28C623-6717-4BA3-806B-2FAB4259D34A@gmail.com> <27E994C1-09F6-4DF7-912B-7BC4DBBBB9E1@gmail.com> From: James Carman Date: Wed, 24 Aug 2011 06:43:24 -0400 X-Google-Sender-Auth: udT4a11OPXjaikKQ-mljYhJRFTo Message-ID: Subject: Re: Socket-based Asynchronous Calls... To: users@camel.apache.org Content-Type: text/plain; charset=ISO-8859-1 Content-Transfer-Encoding: quoted-printable X-Virus-Checked: Checked by ClamAV on apache.org I have read the source: http://svn.apache.org/repos/asf/camel/trunk/components/camel-netty/src/main= /java/org/apache/camel/component/netty/NettyProducer.java Take a look at the process() method. In there, there is a block of code that does: ChannelFuture channelFuture; final Channel channel; try { channelFuture =3D openConnection(exchange, callback); channel =3D openChannel(channelFuture); } catch (Exception e) { exchange.setException(e); callback.done(true); return true; } This is not inside an if block or anything and the openConnection() method does actually open it, it isn't just returning a previously-opened connection or anything. Perhaps I'm missing something (entirely possible), but it appears that it's opening the connection every time the process() method is called. On Tue, Aug 23, 2011 at 11:09 PM, Taariq Levack wrote: > That doesn't sound right, what have you read? Logs/docs? > And are you using keep-alive? > > Taariq > > > On 24 Aug 2011, at 12:12 AM, James Carman wr= ote: > >> Well, it looks like the camel-netty component won't work for me. =A0It >> appears that it opens the connection for each exchange. =A0Am I reading >> that right? =A0What I need is a persistent connection with automatic >> reconnects. =A0Oh well, back to the drawing board. >> >> On Wed, Aug 17, 2011 at 7:59 AM, James Carman >> wrote: >>> That's what I've been staring at! :) =A0Here's what I'm thinking I'm >>> going to need to write. =A0I need an async processor that remembers the >>> AsyncCallback and associates it with a correlation id. =A0Then, when >>> another exchange comes in that has the same correlation id, it will >>> lookup the previous callback and say that it's done. =A0I have a lot of >>> questions, though. =A0I've never had to get so "down and dirty" with >>> Camel before. =A0The components have just worked for me "off the shelf.= " >>> >>> 1. =A0Do I just copy the input message of the Exchange that comes in >>> second to the output message of the originating exchange? >>> 2. =A0How do I do a timeout for the original caller (the CXF request)? >>> 3. =A0How do I detect that the caller has timed out if they do? >>> >>> I'm sure I'll have more questions, but these are the ones off the top >>> of my head. >>> >>> On Wed, Aug 17, 2011 at 1:48 AM, Taariq Levack wrot= e: >>>> James I think the rest of your puzzle is solved by Camel's async API, >>>> you might have to check if your task is done, maybe your >>>> requestResponse populates some collection of responses and provides >>>> some API to return the response given a correlationID. >>>> Stare at the async docs [1] a few more times and I'm sure you'll find >>>> your answer. >>>> >>>> [1] http://camel.apache.org/async.html >>>> >>>> Taariq >>>> >>>> On Tue, Aug 16, 2011 at 11:16 PM, James Carman >>>> wrote: >>>>> No worries! =A0Thank you for your help. =A0It helped me understand a = bit >>>>> more about how these aggregators work.. =A0However, I still don't >>>>> understand how to take care of my problem. =A0I guess I'm going to ha= ve >>>>> to roll my own processor or something. >>>>> >>>>> On Tue, Aug 16, 2011 at 4:50 PM, Taariq Levack wr= ote: >>>>>> Hmmm. >>>>>> Maybe others can help with that if it's possible, I haven't had to w= restle with it. >>>>>> >>>>>> In my case it is actually a cxf service too, but it's asynchronous = =A0and I send the response once I have it, indicating either timeout or the= actual response. >>>>>> >>>>>> Sorry I responded to your question without going back to see your ot= her posts. >>>>>> >>>>>> Taariq >>>>>> >>>>>> On 16 Aug 2011, at 10:33 PM, James Carman wrote: >>>>>> >>>>>>> In my case, the originating request comes from CXF. =A0How do I sen= d the >>>>>>> aggregated response back to CXF? >>>>>>> >>>>>>> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack = wrote: >>>>>>>> The consumer that handles the aggregated/timed-out request or resp= onse. >>>>>>>> >>>>>>>> I have to resend a few times if it's the request, I simply feed it= back into "direct:socketRequestRoute" with the header for the number of re= try attempts incremented. >>>>>>>> If it's the response I can forward to some process. >>>>>>>> >>>>>>>> Taariq >>>>>>>> >>>>>>>> On 16 Aug 2011, at 10:18 PM, James Carman wrote: >>>>>>>> >>>>>>>>> What's listening on the: >>>>>>>>> >>>>>>>>> to("direct:requestResponse") >>>>>>>>> >>>>>>>>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack wrote: >>>>>>>>>> Sure >>>>>>>>>> >>>>>>>>>> You can of course solve what I've described many ways, but I'll >>>>>>>>>> explain using 3 routes as that's what I used. >>>>>>>>>> >>>>>>>>>> This first route is the main route I mentioned earlier, so you s= end >>>>>>>>>> your socket messages here and it's multicast to both the aggrega= tor >>>>>>>>>> and to the socket. >>>>>>>>>> >>>>>>>>>> from("direct:socketRequestRoute").multicast().to("direct:request= ResponseAggregator", >>>>>>>>>> =A0"someOutboundSocketEndpoint"); >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> This next route will aggregate, both requests and responses are = sent >>>>>>>>>> here as you envisaged. >>>>>>>>>> from("direct:requestResponseAggregator"). >>>>>>>>>> =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0.aggregate(header("someCorrellati= onId"), >>>>>>>>>> requestResponseAggregator) >>>>>>>>>> =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0.completionSize(2) >>>>>>>>>> =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0.completionTimeout(5000) >>>>>>>>>> =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0.to("direct:requestResponse"); //= Here you can send the >>>>>>>>>> "aggregated" message, in my case it's only the response I forwar= d >>>>>>>>>> unless there's a timeout, then I forward the request of course. >>>>>>>>>> >>>>>>>>>> Finally the route that consumes the socket responses. >>>>>>>>>> from(someInboundSocketEndpoint).processRef("headerEnricher").to(= "direct:requestResponseAggregator"); >>>>>>>>>> =A0 //this headerEnricher doesn't have to be a processor, you ha= ve many >>>>>>>>>> options to add a header. >>>>>>>>>> >>>>>>>>>> If that's not clear feel free to ask. >>>>>>>>>> >>>>>>>>>> Taariq >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman >>>>>>>>>> wrote: >>>>>>>>>>> Care to share an example? =A0I'm not picturing it. >>>>>>>>>>> >>>>>>>>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack wrote: >>>>>>>>>>>> Hi James >>>>>>>>>>>> >>>>>>>>>>>> I did that too for what it's worth. >>>>>>>>>>>> I send the message to a route that forwards to both the aggreg= ator and to the socket. >>>>>>>>>>>> When the response comes in I use an enricher to add the ID to = the headers and then forward to the aggregator. >>>>>>>>>>>> >>>>>>>>>>>> Taariq >>>>>>>>>>>> >>>>>>>>>>>> On 16 Aug 2011, at 8:55 PM, James Carman wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Willem, >>>>>>>>>>>>> >>>>>>>>>>>>> Thank you for your help. =A0I don't think this is doing exact= ly what I >>>>>>>>>>>>> need, though. =A0The real trick here is the asynchronous natu= re of the >>>>>>>>>>>>> "server" on the other end of this situation. =A0I thought abo= ut using an >>>>>>>>>>>>> aggregator to make sure the response gets matched up with the= request >>>>>>>>>>>>> using a correlation id. =A0The aggregator wouldn't aggregate = multiple >>>>>>>>>>>>> responses together into one, it would just make sure it match= es the >>>>>>>>>>>>> correct response with its request. =A0Does this sound like a = valid >>>>>>>>>>>>> approach? =A0If so, how the heck do I go about it? :) >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks, >>>>>>>>>>>>> >>>>>>>>>>>>> James >>>>>>>>>>>>> >>>>>>>>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang wrote: >>>>>>>>>>>>>> Hi James, >>>>>>>>>>>>>> >>>>>>>>>>>>>> Camel async process engine already provides the way that you= want. >>>>>>>>>>>>>> You can take a look at the camel-cxf code[1][2] for some exa= mple. >>>>>>>>>>>>>> >>>>>>>>>>>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel= -cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=3Dm= arkup >>>>>>>>>>>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel= -cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=3Dm= arkup >>>>>>>>>>>>>> >>>>>>>>>>>>>> On 8/7/11 1:29 AM, James Carman wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian >>>>>>>>>>>>>>> =A0wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi James, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I hope I understand your scenario correctly. Here are a fe= w thoughts. I >>>>>>>>>>>>>>>> assume want to use camel-netty [1] to send messages to you= r sever (if you >>>>>>>>>>>>>>>> have your own code that does that, you can use it too, but= you'd have to >>>>>>>>>>>>>>>> write your own Processor or Component). Iiuic, your scenar= io is converting a >>>>>>>>>>>>>>>> 2x in-only to a 1x in-out async mep. You should then treat= your exchange as >>>>>>>>>>>>>>>> an async in-out and let your framework (Camel) decompose i= t and compose it >>>>>>>>>>>>>>>> back again. I would not keep threads blocked so I believe = your best bet is >>>>>>>>>>>>>>>> using the Camel async messaging [2] and Futures (look at t= he examples using >>>>>>>>>>>>>>>> asyncSend* and asyncCallback*). The issue is that Camel is= stateless so >>>>>>>>>>>>>>>> you'll need a correlationId, which you must have already a= nd something to >>>>>>>>>>>>>>>> keep your state. A good bet would be jms [3], or you could= write your own. >>>>>>>>>>>>>>>> If you used jms you would need to use both a correlationId= and a replyTo >>>>>>>>>>>>>>>> queue. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> from("jms:request-queue").to("netty:output?=3DcorrelationI= d"); >>>>>>>>>>>>>>>> from("netty:input).to("jms:replyTo-queue") >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Perhaps a bit more information might be appropriate here. = =A0Eventually, >>>>>>>>>>>>>>> I'd like to "expose" this route via web services (using CXF= of >>>>>>>>>>>>>>> course). =A0So, I would need to either block the request th= read, waiting >>>>>>>>>>>>>>> for a reply or perhaps check out the new Servlet 3.0 asynch= ronous >>>>>>>>>>>>>>> processing stuff (I'm thinking this might help us get more = done with >>>>>>>>>>>>>>> less http request threads) to do more of a continuation thi= ng. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> We already have a correlation id. =A0The "protocol" require= s one and the >>>>>>>>>>>>>>> server process just echos it back in the response message. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> You may have to play a bit with the correlationId and if y= ou cannot use >>>>>>>>>>>>>>>> the same you can do a second transformation/correlation us= ing a claim-check >>>>>>>>>>>>>>>> sort of pattern. If you don't want to use jms you can impl= ement your own (in >>>>>>>>>>>>>>>> memory) persistence and correlation. You can also use a re= sequencer [4] if >>>>>>>>>>>>>>>> you want to enforce the order. If you use asyncCallback, y= ou get the replies >>>>>>>>>>>>>>>> when they become available, and you can control that. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I don't think a resequencer is necessary. =A0I don't want t= o guarantee >>>>>>>>>>>>>>> the ordering. =A0I'm mostly interested in throughput here. = =A0So, if a >>>>>>>>>>>>>>> message comes in after another, but it can be processed fas= ter, so be >>>>>>>>>>>>>>> it. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> It's an interesting scenario, I'll definitely give it more= thought, but I >>>>>>>>>>>>>>>> hope this helps. >>>>>>>>>>>>>>>> Hadrian >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> You have been very helpful. =A0Thank you for taking the tim= e! >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> -- >>>>>>>>>>>>>> Willem >>>>>>>>>>>>>> ---------------------------------- >>>>>>>>>>>>>> FuseSource >>>>>>>>>>>>>> Web: http://www.fusesource.com >>>>>>>>>>>>>> Blog: =A0 =A0http://willemjiang.blogspot.com (English) >>>>>>>>>>>>>> =A0 =A0 =A0 =A0 http://jnn.javaeye.com (Chinese) >>>>>>>>>>>>>> Twitter: willemjiang >>>>>>>>>>>>>> Weibo: willemjiang >>>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>> >>>>> >>>> >>> >