cxf-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Daniel Kulp <dk...@apache.org>
Subject Re: Jetty Continuations in CXF
Date Tue, 11 Nov 2008 17:31:24 GMT
On Friday 07 November 2008 8:56:46 am Sergey Beryozkin wrote:
> That said :-), even with this extremely small time window I'm getting log
> warnings. I guess it's due to a nature of my tests. I have contol threads
> waiting in the application code which do continuation.resume() as soon as
> they're notified that continuation.suspend() has occured.
>
> I have 10 threads involved, 5 control ones + 5 application ones, I see a
> loss of message approximately once in 5 cases. The fact that cont.resume()
> is done virtually immediately after cont.suspend() can explain it.

Without seeing your code, I cannot really offer valid suggestions, but I'll 
try....   :-)

One thought was in the Continuation object, record if "resume()" has been 
called and if it's been callled by the time the stack unwinds back into the 
Http transport, just re-dispatch immediately.   Either that or have the 
resume block until the http transport sets a "ready to resume" flag just 
before it allows the exception to flow back into jetty. 


Dan

>
> Cheers, Sergey
>
> > That said, I'm now trying to inject a message as a custom continuation
> > object (while preserving the original one if any, both ways) as early as
> > possible, in AbstractInvoker, so the time window at which the race
> > condition I talked about earlier can cause the loss of the original
> > message, is extremely small the time it taked for the
> > continuation.suspend() exception to reach a catch block in
> > AbstractInvoker.
> >
> > Cheers, Sergey
> >
> >> Hi,
> >>
> >> I did some system testing with Jetty continuations and it's going not
> >> too bad. Here's one issue which I've encountered which might or might
> >> not be a problem in cases where continuations are ustilized directly
> >> (that is without our wrappers), as in case of say ServiceMix CXF binding
> >> component.
> >>
> >> The problem is that when continuation.suspend(timeout) has been called,
> >> a resulting RuntimeException might not reach CXF JettyDestination (such
> >> that the original message with its phase chain can be preserved until
> >> the request is resumed) if some other application thread calls
> >> continuation.resume() or continuation suspend timeout expires.
> >>
> >> In case of ServiceMix the latter is a theoretical possibility at the
> >> least. I can see in its code this timeout is configured, but if this
> >> timeout is in the region of up to 1 sec or so then it's feasible that
> >> with a heavy  workload the race condition described above might come to
> >> life.
> >>
> >> That said, as part of my test, I found that even when such condition
> >> occurs, the 'worst' thing which can happen is that a new message and a
> >> new chain are created, that is, the request is not resumed from a
> >> 'suspended' ServiceInvokerInterceptor, but starts as if it was a new
> >> request alltogether, but it all works nonetheless, as all the stack
> >> variables used in various interceptors in my given test at least are all
> >> obtained from a message. The only downside is that that the work which
> >> has already been done earlier as part of handling the suspended request
> >> is repeated again by the interceptors. It can cause issues though in
> >> cases when some interceptors have sideeffects as part of handling a
> >> given input request, say modify a db, etc
> >>
> >> Now, this race condition can be safely avoided if a wrapper proposed by
> >> Dan is used by a server application code as the message can be preserved
> >> immediately at a point a user calls suspend on our wrapper, so without
> >> further doubts I've prototyped it too. It's not possible for SMX
> >> components though
> >>
> >> Comments ?
> >>
> >> Cheers, Sergey
> >>
> >>> I guess my thinking was to tie the continutations directly to the
> >>> PhaseInterceptorChain (since that is going to need to know about them
> >>> anyway).   However, I suppose it could easily be done with a new
> >>> interface. Probably the best thing to do is to stub out a sample
> >>> usecase.   So here goes.....
> >>>
> >>> Lets take a "GreetMe" web service that in the greetMe method will call
> >>> off asynchrously to some JMS service to actually get the result.
> >>>
> >>> @Resource(name = "jmsClient")
> >>> Greeter jmsGreeter
> >>> @Resource
> >>> WebServiceContext context;
> >>> public String greetMe(String arg) {
> >>>     ContinuationSupport contSupport = (ContinuationSupport)
> >>>              context.get(ContinuationSupport.class.getName());
> >>>     if (contSupport == null) {
> >>>          //continuations not supported, must wait
> >>>          return jmsGreeter.greetMe(arg);
> >>>     }
> >>>     Continuation cont = contSupport.getContinuation();
> >>>     if (cont.isResumed()) {
> >>> AsyncHandler<GreetMeResponse> handler = cont.getObject();
> >>>        return handler.get().getReturn();
> >>>     } else {
> >>>         AsyncHandler<GreetMeResponse> handler = new Handler(cont);
> >>>         jmsGreeter.greetMeAsync(arg, handler);
> >>>         cont.suspend(handler);
> >>> return null;   //won't actually get here as suspend will throw a
> >>> ContinuationException
> >>>     }
> >>> }
> >>>
> >>> The Handler would look something like:
> >>> class Handler implements AsyncHandler<GreetMeResponse> {
> >>> GreetMeResponse resp;
> >>>        Continuation cont;
> >>> public Handler(Continuation cont) {
> >>>            this.cont = cont;
> >>>        }
> >>>        public void handleResponse(Response<GreetMeLaterResponse>
> >>> response) { resp = response.get();
> >>>              cont.resume();
> >>>       }
> >>> }
> >>>
> >>> Basically, the HTTP/Jetty transport could provide an implementation of
> >>> ContinuationSupport that wrappers the jetty stuff.    JMS could provide
> >>> one that's pretty much a null op.   Transports that cannot support it
> >>> (like servlet) just wouldn't provide an implementation.
> >>>
> >>>
> >>> Does that make sense?   Other ideas?
> >>>
> >>> Dan
> >>>
> >>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
> >>>> > No.   We don't want that.   Whatever we do should work for other
> >>>> > transports as well like JMS.  Thus, this shouldn't be tied to jetty
> >>>> > continuations directly.
> >>>>
> >>>> No, I'm not suggesting to tie it up to jetty continuations.
> >>>> Ex.
> >>>>
> >>>> try {
> >>>>   invoke(); // continuation.suspend() somehow by the code being
> >>>> invoked upon }
> >>>> catch (RuntimeException ex) {
> >>>>
> >>>> if
> >>>> (ex.getClass().getName().equals("jetty.JettyContinuationException"))
> >>>> throw new SuspendedFault(ex);
> >>>>     // or PhaseInterceptorChain.suspend()
> >>>> }
> >>>> }
> >>>>
> >>>> > Most likely, we could add a "suspend()" method to
> >>>> > PhaseInterceptorChain that would do something very similar and
throw
> >>>> > a "SuspendException" or something in the same package as
> >>>> > PhaseInterceptorChain.
> >>>>
> >>>> When do we trigger this PhaseInterceptorChain.suspend() call though
?
> >>>>
> >>>> >   That would get propogated
> >>>> > back to the JettyDestination that could then call the jetty things.

> >>>> >  The JMS transport could just catch it and more or less ignore
it.  
> >>>> >  We'd then have to add a "resume()" method to the chain which would
> >>>> > call back onto a listener that the transport provides.   Jetty
would
> >>>> > just call the jetty resume stuff. JMS would probably put a runnable
> >>>> > on the workqueue to restart the chain.
> >>>>
> >>>> ok
> >>>>
> >>>> > Also, suspend() would need to check if there is a listener.  If
not,
> >>>> > it should not throw the exception.   Thus, the servlet transport
and
> >>>> > CORBA stuff that couldn't do this would pretty much just ignore
it.
> >>>>
> >>>> ok, not sure I understand about the listener but I think I see what
> >>>> you mean...
> >>>>
> >>>> > Basically, this needs to be done in such a way that it CAN work
for
> >>>> > the non-jetty cases.   However, it also needs to be done in a way
> >>>> > that doesn't affect existing transports.
> >>>>
> >>>> +1
> >>>>
> >>>> Cheers, Sergey
> >>>>
> >>>> > Dan
> >>>> >
> >>>> >> 2. Now, if the above can be figured out, the next problem arises:
> >>>> >> when the "trigger" to wake up the continuation occurs
> >>>> >>
> >>>> >> I think we can can do in JettyDestination omething similar
to what
> >>>> >> is done in SMX. When getting a SuspendedFault exception, we
can
> >>>> >> extract from it the original continuation instance or else
we can
> >>>> >> do ContinuationSupport.getContinuation(request) which should
return
> >>>> >> us the instance. At this point we can use it as a ket to store
the
> >>>> >> current exchange plus all the other info we may need.
> >>>> >>
> >>>> >> When the user/application code does continuation.resume(),
the
> >>>> >> Jetty thread will come back and we will use the
> >>>> >> ContinuationSupport.getContinuation(request) to get us the
active
> >>>> >> continuation and use it to extract the suspended exchange and
> >>>> >> proceed from there, say we'll call PhaseInterceptorPhase.resume(),
> >>>> >> etc, something along the lines you suggested
> >>>> >>
> >>>> >>
> >>>> >> 3. Basically, to do this "right", we'd need to audit pretty
much
> >>>> >> everything to make sure nothing is stored on the stack and
is
> >>>> >> "resumable". Once that is done, the rest is relatively easy.
> >>>> >>
> >>>> >> Yea - probably can be the quite challenging
> >>>> >>
> >>>> >>
> >>>> >> Thoughts ?
> >>>> >>
> >>>> >> Cheers, Sergey
> >>>> >>
> >>>> >>
> >>>> >>
> >>>> >>
> >>>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
> >>>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
> >>>> >> [3]
> >>>> >> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=126
> >>>> >>42361 #ac tion_12642361
> >>>> >
> >>>> > --
> >>>> > Daniel Kulp
> >>>> > dkulp@apache.org
> >>>> > http://dankulp.com/blog
> >>>
> >>> --
> >>> Daniel Kulp
> >>> dkulp@apache.org
> >>> http://dankulp.com/blog



-- 
Daniel Kulp
dkulp@apache.org
http://dankulp.com/blog

Mime
View raw message