cxf-dev mailing list archives

From "Sergey Beryozkin" <sergey.beryoz...@progress.com>
Subject Re: Jetty Continuations in CXF
Date Wed, 12 Nov 2008 10:21:31 GMT
Hi,

I have had a look. At the moment I don't see why we would have to do this sort of sophisticated
handling of continuations in the CXF JettyDestination. With CXF, it's the code being invoked
further down the line (be it SMX CXF binding components or application code) which needs to
worry about either suspending or resuming continuations.

As far as CXF is concerned, it only needs to be able to associate a given inbound message
with a continuation instance. I reckon saving it as a continuation user object (preserving the
previously set one, if any) is a lighter/simpler alternative to introducing maps in the
JettyDestination.
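A minimal sketch of the user-object approach, assuming a simplified continuation that exposes only `getObject()`/`setObject()` in the style of Jetty 6 continuations. `MessageHolder`, `ContinuationMessages` and `SimpleContinuation` are hypothetical names, not CXF or Jetty classes.

```java
// Simplified stand-in for a Jetty 6 style continuation (assumption): only the
// user-object accessors are modelled here.
interface Continuation {
    Object getObject();

    void setObject(Object object);
}

// Hypothetical holder pairing the suspended CXF message with whatever user
// object the application had already stored on the continuation.
final class MessageHolder {
    final Object message;
    final Object previousUserObject;

    MessageHolder(Object message, Object previousUserObject) {
        this.message = message;
        this.previousUserObject = previousUserObject;
    }
}

// Hypothetical helpers the transport would call before and after suspension.
final class ContinuationMessages {
    private ContinuationMessages() {
    }

    // Save the inbound message, keeping the application's object inside the holder.
    static void attach(Continuation cont, Object message) {
        cont.setObject(new MessageHolder(message, cont.getObject()));
    }

    // On re-dispatch: restore the application's object and return the saved
    // message, or null if no message was attached (i.e. treat as a new request).
    static Object detach(Continuation cont) {
        Object current = cont.getObject();
        if (!(current instanceof MessageHolder)) {
            return null;
        }
        MessageHolder holder = (MessageHolder) current;
        cont.setObject(holder.previousUserObject);
        return holder.message;
    }
}

// In-memory continuation used only to exercise the helpers.
class SimpleContinuation implements Continuation {
    private Object object;

    public Object getObject() {
        return object;
    }

    public void setObject(Object object) {
        this.object = object;
    }
}
```

One caveat of this design: if the application calls `setObject()` while the holder is attached, it replaces the holder, so the application's own accesses would also need to be unwrapped.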

However, as I said a few times earlier in this thread, there's a race condition which I observe
under certain conditions. Specifically, I have a test where a continuation is resumed virtually
immediately after it's been suspended, so continuation.resume() has already occurred before the
code that associates this suspended continuation with the inbound message has had a chance to
do so. In the CXF case I believe this can happen irrespective of how we write the code dealing
with continuations under the hood. It won't happen if continuation wrappers are used by the
application code.

Do you have any comments about this race condition? Or how the code you linked to can help
to avoid it?

Cheers, Sergey




> I would really encourage you to take a look at the SMX code for
> handling continuations.
> We've had quite a hard time handling race conditions, timeouts, etc.,
> because the continuation has a timeout and when the message is
> received back around the timeout, things can become a bit tricky.
>
> https://svn.apache.org/repos/asf/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
>
> We use one concurrent hash map to associate a message id with a
> continuation, and multiple synchronization blocks on the continuation
> itself.
> Also, the above code can be used with standard servlet servers (i.e.
> when the continuation is a blocking continuation), which is IMHO a good
> thing.
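The pattern described above can be sketched roughly as follows. This is not the ServiceMix code; `PendingRequest` and `ContinuationRegistry` are hypothetical stand-ins, with the per-request object playing the role of the continuation. The map finds the request by message id, while synchronizing on the request decides whether the response or the timeout gets to complete it.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical per-request state; in the real code this role is played by
// the continuation itself.
final class PendingRequest {
    Object response;   // set if the response path wins
    boolean timedOut;  // set if the timeout path wins
    boolean done;      // true once either path has completed the request
}

// Sketch: a concurrent map finds the request by message id, and a
// synchronized block on the request decides which path completes it.
final class ContinuationRegistry {
    private final ConcurrentMap<String, PendingRequest> pending =
            new ConcurrentHashMap<>();

    PendingRequest register(String messageId) {
        PendingRequest request = new PendingRequest();
        pending.put(messageId, request);
        return request;
    }

    // Response thread: returns false if the request is unknown or already done.
    boolean onResponse(String messageId, Object response) {
        PendingRequest request = pending.get(messageId);
        if (request == null) {
            return false;
        }
        synchronized (request) {
            if (request.done) {
                return false; // the timeout got there first
            }
            request.response = response;
            request.done = true;
        }
        pending.remove(messageId, request);
        return true;
    }

    // Timeout path: returns false if a response already completed the request.
    boolean onTimeout(String messageId) {
        PendingRequest request = pending.get(messageId);
        if (request == null) {
            return false;
        }
        synchronized (request) {
            if (request.done) {
                return false; // the response got there first
            }
            request.timedOut = true;
            request.done = true;
        }
        pending.remove(messageId, request);
        return true;
    }
}
```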
>
> On Tue, Nov 11, 2008 at 6:51 PM, Sergey Beryozkin
> <sergey.beryozkin@progress.com> wrote:
>> Hi
>>
>>>>
>>>> I have 10 threads involved, 5 control ones + 5 application ones, and I see
>>>> a message lost approximately once in 5 cases. The fact that cont.resume()
>>>> is done virtually immediately after cont.suspend() can explain it.
>>>
>>> Without seeing your code, I cannot really offer valid suggestions, but
>>> I'll try....   :-)
>>
>> I guess having it all on a branch would be handy then :-)
>>
>>>
>>> One thought was to record in the Continuation object whether "resume()" has
>>> been called, and if it has been called by the time the stack unwinds back
>>> into the HTTP transport, just re-dispatch immediately. Either that or have
>>> resume() block until the HTTP transport sets a "ready to resume" flag just
>>> before it allows the exception to flow back into Jetty.
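A sketch of the first option, assuming a hypothetical wrapper (`RecordingContinuation`) and a transport-supplied `Runnable` that re-dispatches the request; neither is an existing CXF or Jetty API.

```java
// Hypothetical wrapper implementing the "remember an early resume()" idea.
final class RecordingContinuation {
    private final Runnable redispatch; // supplied by the transport (assumption)
    private boolean parked;            // transport has finished suspending the request
    private boolean resumeRequested;   // resume() has been called at least once

    RecordingContinuation(Runnable redispatch) {
        this.redispatch = redispatch;
    }

    // Called by application or callback threads.
    void resume() {
        synchronized (this) {
            if (resumeRequested) {
                return; // repeated resume() calls are harmless
            }
            resumeRequested = true;
            if (!parked) {
                return; // stack still unwinding: the transport will notice
            }
        }
        redispatch.run(); // request is parked: wake it up now
    }

    // Called by the HTTP transport once the suspend exception reaches it.
    // Returns true if resume() already happened, so the transport should
    // re-dispatch immediately instead of parking the request.
    synchronized boolean stackUnwound() {
        if (resumeRequested) {
            return true;
        }
        parked = true;
        return false;
    }
}
```

The second option would instead make `resume()` wait (for example on a `java.util.concurrent.CountDownLatch`) until the transport sets its "ready to resume" flag; that removes the extra check in the transport at the cost of blocking the resuming thread.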
>>
>> I have 2 tests.
>>
>> In one test, the application server code interacts with a wrapper, both when
>> getting a continuation instance and when calling suspend/resume on it (as
>> you suggested earlier in this thread). In this case, under the hood, the
>> inbound message is associated with the continuation instance before
>> suspend() is called on it. Thus even if the resulting exception does not
>> reach the JettyDestination before continuation.resume() is called by a
>> control thread, the message is not lost when the HTTP request is resumed, as
>> that HTTP request already had this continuation instance associated with it
>> at the time ContinuationSupport.getContinuation(request) was called.
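The wrapper avoids the lost message because of ordering: the association is recorded before the suspension is signalled. A hypothetical sketch (`ContinuationWrapper`, `SuspendedInvocationException` and the shared map are illustrative names only); a real wrapper would delegate to the underlying Jetty continuation's suspend rather than throw its own exception.

```java
import java.util.Map;

// Hypothetical exception used by the wrapper to unwind the invocation.
final class SuspendedInvocationException extends RuntimeException {
    SuspendedInvocationException() {
        super("invocation suspended");
    }
}

// Sketch of the wrapper idea: because the application only ever calls the
// wrapper, the inbound message is recorded before the suspension is signalled.
final class ContinuationWrapper {
    private final Object requestKey; // e.g. the HTTP request
    private final Object inboundMessage;
    private final Map<Object, Object> suspendedMessages;

    ContinuationWrapper(Object requestKey, Object inboundMessage,
                        Map<Object, Object> suspendedMessages) {
        this.requestKey = requestKey;
        this.inboundMessage = inboundMessage;
        this.suspendedMessages = suspendedMessages;
    }

    void suspend() {
        // 1. Record the association first ...
        suspendedMessages.put(requestKey, inboundMessage);
        // 2. ... then unwind. A resume() racing ahead of the exception can no
        //    longer find the request without its message.
        throw new SuspendedInvocationException();
    }
}
```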
>>
>> In the other test, which I believe better represents an integration scenario
>> with SMX, the application server code calls Jetty
>> ContinuationSupport.getContinuation(request) followed by
>> continuation.suspend(). Now, in this case, before the (Jetty RetryRequest)
>> runtime exception reaches a catch block in AbstractInvoker (where I try to
>> associate the message with the continuation), one or two control threads
>> manage to squeeze in and call resume() before the catch block has even been
>> processed. So by the time the wrapped exception reaches the JettyDestination,
>> a request with a resumed continuation has already come back...
>>
>> Does this explanation for the second case and the associated race condition
>> sound reasonable?
>>
>> Cheers, Sergey
>>
>>
>>
>>
>>
>>>
>>>
>>> Dan
>>>
>>>>
>>>> Cheers, Sergey
>>>>
>>>> > That said, I'm now trying to inject the message as a custom continuation
>>>> > object (while preserving the original one, if any, both ways) as early
>>>> > as possible, in AbstractInvoker, so that the time window in which the
>>>> > race condition I talked about earlier can cause the loss of the original
>>>> > message is extremely small: just the time it takes for the exception
>>>> > thrown by continuation.suspend() to reach a catch block in
>>>> > AbstractInvoker.
>>>> >
>>>> > Cheers, Sergey
>>>> >
>>>> >> Hi,
>>>> >>
>>>> >> I did some system testing with Jetty continuations and it's not going
>>>> >> too badly. Here's one issue I've encountered which might or might not
>>>> >> be a problem in cases where continuations are utilized directly (that
>>>> >> is, without our wrappers), as in the case of, say, the ServiceMix CXF
>>>> >> binding component.
>>>> >>
>>>> >> The problem is that when continuation.suspend(timeout) has been called,
>>>> >> the resulting RuntimeException might not reach the CXF JettyDestination
>>>> >> in time (where the original message with its phase chain can be
>>>> >> preserved until the request is resumed) if some other application
>>>> >> thread calls continuation.resume() first or the continuation's suspend
>>>> >> timeout expires.
>>>> >>
>>>> >> In the case of ServiceMix the latter is a theoretical possibility, at
>>>> >> least. I can see in its code that this timeout is configured, but if
>>>> >> this timeout is in the region of up to 1 sec or so, then it's feasible
>>>> >> that under a heavy workload the race condition described above might
>>>> >> come to life.
>>>> >>
>>>> >> That said, as part of my test, I found that even when such a condition
>>>> >> occurs, the 'worst' thing which can happen is that a new message and a
>>>> >> new chain are created; that is, the request is not resumed from a
>>>> >> 'suspended' ServiceInvokerInterceptor, but starts as if it were a new
>>>> >> request altogether. It all works nonetheless, as all the stack
>>>> >> variables used in various interceptors (in my given test, at least)
>>>> >> are obtained from a message. The only downside is that the work which
>>>> >> has already been done earlier as part of handling the suspended
>>>> >> request is repeated by the interceptors. This can cause issues, though,
>>>> >> in cases where some interceptors have side effects as part of handling
>>>> >> a given input request, say modifying a db, etc.
>>>> >>
>>>> >> Now, this race condition can be safely avoided if the wrapper proposed
>>>> >> by Dan is used by the server application code, as the message can be
>>>> >> preserved immediately at the point a user calls suspend on our wrapper,
>>>> >> so without further doubts I've prototyped it too. It's not possible for
>>>> >> SMX components, though.
>>>> >>
>>>> >> Comments?
>>>> >>
>>>> >> Cheers, Sergey
>>>> >>
>>>> >>> I guess my thinking was to tie the continuations directly to the
>>>> >>> PhaseInterceptorChain (since that is going to need to know about them
>>>> >>> anyway). However, I suppose it could easily be done with a new
>>>> >>> interface. Probably the best thing to do is to stub out a sample
>>>> >>> use case. So here goes.....
>>>> >>>
>>>> >>> Let's take a "GreetMe" web service that, in the greetMe method, will
>>>> >>> call off asynchronously to some JMS service to actually get the result.
>>>> >>>
>>>> >>> @Resource(name = "jmsClient")
>>>> >>> Greeter jmsGreeter;
>>>> >>> @Resource
>>>> >>> WebServiceContext context;
>>>> >>>
>>>> >>> public String greetMe(String arg) {
>>>> >>>     // ContinuationSupport/Continuation are the proposed (not yet existing) types
>>>> >>>     ContinuationSupport contSupport = (ContinuationSupport)
>>>> >>>         context.getMessageContext().get(ContinuationSupport.class.getName());
>>>> >>>     if (contSupport == null) {
>>>> >>>         // continuations not supported, must wait
>>>> >>>         return jmsGreeter.greetMe(arg);
>>>> >>>     }
>>>> >>>     Continuation cont = contSupport.getContinuation();
>>>> >>>     if (cont.isResumed()) {
>>>> >>>         // second pass: the JMS reply has arrived
>>>> >>>         Handler handler = (Handler) cont.getObject();
>>>> >>>         return handler.getResponse().getReturn();
>>>> >>>     } else {
>>>> >>>         Handler handler = new Handler(cont);
>>>> >>>         jmsGreeter.greetMeAsync(arg, handler);
>>>> >>>         cont.suspend(handler);
>>>> >>>         return null;   // won't actually get here as suspend will throw a
>>>> >>>                        // ContinuationException
>>>> >>>     }
>>>> >>> }
>>>> >>>
>>>> >>> The Handler would look something like:
>>>> >>> class Handler implements AsyncHandler<GreetMeResponse> {
>>>> >>>     private final Continuation cont;
>>>> >>>     private volatile GreetMeResponse resp;
>>>> >>>     private volatile Exception error;
>>>> >>>
>>>> >>>     public Handler(Continuation cont) {
>>>> >>>         this.cont = cont;
>>>> >>>     }
>>>> >>>
>>>> >>>     public GreetMeResponse getResponse() {
>>>> >>>         if (error != null) {
>>>> >>>             throw new WebServiceException(error);  // surface the async failure
>>>> >>>         }
>>>> >>>         return resp;
>>>> >>>     }
>>>> >>>
>>>> >>>     public void handleResponse(Response<GreetMeResponse> response) {
>>>> >>>         try {
>>>> >>>             resp = response.get();
>>>> >>>         } catch (Exception e) {
>>>> >>>             error = e;  // reported on the resumed pass
>>>> >>>         }
>>>> >>>         cont.resume();
>>>> >>>     }
>>>> >>> }
>>>> >>>
>>>> >>> Basically, the HTTP/Jetty transport could provide an implementation of
>>>> >>> ContinuationSupport that wraps the Jetty stuff. JMS could provide one
>>>> >>> that's pretty much a null op. Transports that cannot support it (like
>>>> >>> servlet) just wouldn't provide an implementation.
>>>> >>>
>>>> >>>
>>>> >>> Does that make sense?   Other ideas?
>>>> >>>
>>>> >>> Dan
>>>> >>>
>>>> >>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
>>>> >>>> > No. We don't want that. Whatever we do should work for other
>>>> >>>> > transports as well, like JMS. Thus, this shouldn't be tied to
>>>> >>>> > Jetty continuations directly.
>>>> >>>>
>>>> >>>> No, I'm not suggesting tying it to Jetty continuations.
>>>> >>>> Ex.
>>>> >>>>
>>>> >>>> try {
>>>> >>>>     invoke(); // continuation.suspend() is called somehow by the code being invoked upon
>>>> >>>> } catch (RuntimeException ex) {
>>>> >>>>     if (ex.getClass().getName().equals("jetty.JettyContinuationException")) {
>>>> >>>>         // or PhaseInterceptorChain.suspend()
>>>> >>>>         throw new SuspendedFault(ex);
>>>> >>>>     }
>>>> >>>>     throw ex; // not a continuation: propagate unchanged
>>>> >>>> }
>>>> >>>>
>>>> >>>> > Most likely, we could add a "suspend()" method to
>>>> >>>> > PhaseInterceptorChain that would do something very similar and
>>>> >>>> > throw a "SuspendException" or something in the same package as
>>>> >>>> > PhaseInterceptorChain.
>>>> >>>>
>>>> >>>> When do we trigger this PhaseInterceptorChain.suspend() call, though?
>>>> >>>>
>>>> >>>> > That would get propagated back to the JettyDestination, which could
>>>> >>>> > then call the Jetty things. The JMS transport could just catch it and
>>>> >>>> > more or less ignore it. We'd then have to add a "resume()" method to
>>>> >>>> > the chain which would call back onto a listener that the transport
>>>> >>>> > provides. Jetty would just call the Jetty resume stuff. JMS would
>>>> >>>> > probably put a runnable on the workqueue to restart the chain.
>>>> >>>>
>>>> >>>> ok
>>>> >>>>
>>>> >>>> > Also, suspend() would need to check if there is a listener. If not,
>>>> >>>> > it should not throw the exception. Thus, the servlet transport and
>>>> >>>> > CORBA stuff that couldn't do this would pretty much just ignore it.
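A rough sketch of the proposed chain `suspend()`/`resume()` contract, assuming a transport-provided listener. `SuspendableChain`, `ResumeListener`, `SuspendException` and `WorkQueueResumeListener` are hypothetical names; only the behaviour (throw when a listener exists, do nothing otherwise, JMS-style restart via a work queue) comes from the discussion.

```java
import java.util.concurrent.Executor;

// Hypothetical exception thrown by suspend() when the transport can resume.
final class SuspendException extends RuntimeException {
    SuspendException() {
        super("interceptor chain suspended");
    }
}

// Hypothetical callback registered by a transport that supports resumption.
interface ResumeListener {
    void resume(Runnable restartChain);
}

// Sketch of the proposed suspend()/resume() contract on the chain.
final class SuspendableChain {
    private final ResumeListener listener; // null: servlet, CORBA, ...
    private final Runnable restartChain;   // re-enters the chain where it stopped

    SuspendableChain(ResumeListener listener, Runnable restartChain) {
        this.listener = listener;
        this.restartChain = restartChain;
    }

    // Throws only when a listener exists; otherwise returns false and the
    // caller carries on synchronously, so existing transports are unaffected.
    boolean suspend() {
        if (listener == null) {
            return false;
        }
        throw new SuspendException();
    }

    void resume() {
        if (listener != null) {
            listener.resume(restartChain);
        }
    }
}

// JMS-style listener: put a runnable on the work queue to restart the chain.
final class WorkQueueResumeListener implements ResumeListener {
    private final Executor workQueue;

    WorkQueueResumeListener(Executor workQueue) {
        this.workQueue = workQueue;
    }

    public void resume(Runnable restartChain) {
        workQueue.execute(restartChain);
    }
}
```

A Jetty-side listener would call the Jetty continuation's resume instead of using a work queue.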
>>>> >>>>
>>>> >>>> OK, not sure I understand about the listener, but I think I see what
>>>> >>>> you mean...
>>>> >>>>
>>>> >>>> > Basically, this needs to be done in such a way that it CAN work for
>>>> >>>> > the non-Jetty cases. However, it also needs to be done in a way that
>>>> >>>> > doesn't affect existing transports.
>>>> >>>>
>>>> >>>> +1
>>>> >>>>
>>>> >>>> Cheers, Sergey
>>>> >>>>
>>>> >>>> > Dan
>>>> >>>> >
>>>> >>>> >> 2. Now, if the above can be figured out, the next problem arises:
>>>> >>>> >> when the "trigger" to wake up the continuation occurs
>>>> >>>> >>
>>>> >>>> >> I think we can do something in JettyDestination similar to what
>>>> >>>> >> is done in SMX. When getting a SuspendedFault exception, we can
>>>> >>>> >> extract the original continuation instance from it, or else we can
>>>> >>>> >> do ContinuationSupport.getContinuation(request), which should return
>>>> >>>> >> us the instance. At this point we can use it as a key to store the
>>>> >>>> >> current exchange plus all the other info we may need.
>>>> >>>> >>
>>>> >>>> >> When the user/application code does continuation.resume(), the
>>>> >>>> >> Jetty thread will come back and we will use
>>>> >>>> >> ContinuationSupport.getContinuation(request) to get the active
>>>> >>>> >> continuation and use it to extract the suspended exchange and
>>>> >>>> >> proceed from there; say, we'll call PhaseInterceptorChain.resume(),
>>>> >>>> >> etc., something along the lines you suggested.
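Storing the exchange keyed by the continuation instance could look roughly like this. `SuspendedExchange` and `SuspendedExchangeStore` are hypothetical; an identity map is used on the assumption that continuation objects don't override `equals()`. A `null` result on re-dispatch corresponds to the fallback described elsewhere in the thread, where the request is processed as a brand-new one.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical bundle of what the destination needs to continue later.
final class SuspendedExchange {
    final Object message;
    final Object interceptorChain;

    SuspendedExchange(Object message, Object interceptorChain) {
        this.message = message;
        this.interceptorChain = interceptorChain;
    }
}

// Sketch: the continuation instance itself is the key. Identity semantics are
// used on the assumption that continuations do not override equals().
final class SuspendedExchangeStore {
    private final Map<Object, SuspendedExchange> byContinuation =
            Collections.synchronizedMap(new IdentityHashMap<Object, SuspendedExchange>());

    // When the suspend fault reaches the destination.
    void onSuspended(Object continuation, SuspendedExchange exchange) {
        byContinuation.put(continuation, exchange);
    }

    // When Jetty re-dispatches the request: null means nothing was saved,
    // so the request has to be processed as a new one.
    SuspendedExchange onRedispatch(Object continuation) {
        return byContinuation.remove(continuation);
    }
}
```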
>>>> >>>> >>
>>>> >>>> >>
>>>> >>>> >> 3. Basically, to do this "right", we'd need to audit pretty much
>>>> >>>> >> everything to make sure nothing is stored on the stack and that
>>>> >>>> >> everything is "resumable". Once that is done, the rest is relatively
>>>> >>>> >> easy.
>>>> >>>> >>
>>>> >>>> >> Yeah, that can probably be quite challenging.
>>>> >>>> >>
>>>> >>>> >>
>>>> >>>> >> Thoughts?
>>>> >>>> >>
>>>> >>>> >> Cheers, Sergey
>>>> >>>> >>
>>>> >>>> >>
>>>> >>>> >>
>>>> >>>> >>
>>>> >>>> >
>>>> >>>> > --
>>>> >>>> > Daniel Kulp
>>>> >>>> > dkulp@apache.org
>>>> >>>> > http://dankulp.com/blog
>>>> >>>
>>>> >>> --
>>>> >>> Daniel Kulp
>>>> >>> dkulp@apache.org
>>>> >>> http://dankulp.com/blog
>>>
>>>
>>>
>>> --
>>> Daniel Kulp
>>> dkulp@apache.org
>>> http://dankulp.com/blog
>>
>>
>
>
>
> -- 
> Cheers,
> Guillaume Nodet
> ------------------------
> Blog: http://gnodet.blogspot.com/
> ------------------------
> Open Source SOA
> http://fusesource.com 

