cxf-dev mailing list archives

From "Sergey Beryozkin" <sergey.beryoz...@progress.com>
Subject Re: Jetty Continuations in CXF
Date Wed, 12 Nov 2008 10:51:01 GMT
Hi


> You need to associate the continuation with the message before the
> suspend() method is called, so that whenever the message is ready, it
> will work.

Yes - agreed. But as I said we have two cases.

Case1.

Jetty -> CXF -> test code/some other code does direct jetty continuation.suspend()

In this case it is essentially user code that does it, and user code has no notion of the internal
CXF Message class. It just invokes methods on a Jetty continuation. It is this code that does the
suspend/resume, and this is the case where there is a race condition between the moment the user
(my test) code calls continuation.suspend() on one thread and the continuation.resume() that
follows immediately on another one.

See what I mean? Is this how we can expect the SMX CXF binding component to interact with Jetty
continuations (apart from it not calling resume() immediately, I guess)?

So before the suspend runtime exception reaches the nearest catch block in the CXF code, where
we get a chance to do something to preserve the state of the given invocation, resume() might
have already occurred.

Case 2.

Jetty -> CXF -> test code/some other code interacts with continuations in a transport-neutral
way through CXF-provided wrappers.
Now, in this case we do preserve the message before calling suspend(), as
you suggested, so everything goes fine.
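
To make the difference between the two cases concrete, here is a minimal sketch. It is not the
prototype code: FakeContinuation, InboundMessage, WrappedContinuation and SuspendedException are
hypothetical stand-ins rather than the real Jetty or CXF classes, and the race is simulated
deterministically on a single thread by letting resume() run at the start of the catch block,
that is, before the Case 1 code gets a chance to associate the message.

```java
// Hypothetical sketch: none of these types are the real Jetty or CXF classes.
class ContinuationRaceSketch {

    /** Thrown by suspend(), standing in for Jetty's RetryRequest. */
    static class SuspendedException extends RuntimeException { }

    /** Stand-in for the inbound CXF message. */
    static class InboundMessage {
        final String id;
        InboundMessage(String id) { this.id = id; }
    }

    /** Stand-in for a Jetty continuation with a single user object slot. */
    static class FakeContinuation {
        private volatile Object userObject;
        private volatile boolean resumed;

        void setObject(Object o) { userObject = o; }
        Object getObject() { return userObject; }
        void resume() { resumed = true; }
        boolean isResumed() { return resumed; }
        void suspend() { throw new SuspendedException(); }
    }

    /** Case 2 wrapper: associates the message first, then suspends. */
    static class WrappedContinuation {
        private final FakeContinuation delegate;
        private final InboundMessage message;

        WrappedContinuation(FakeContinuation delegate, InboundMessage message) {
            this.delegate = delegate;
            this.message = message;
        }

        void suspend() {
            delegate.setObject(message); // done before the exception is thrown
            delegate.suspend();
        }
    }

    /**
     * Returns what a resumed request would find on the continuation if
     * resume() wins the race, i.e. runs before the catch block can act.
     */
    static Object seenByResumedRequest(boolean useWrapper) {
        FakeContinuation cont = new FakeContinuation();
        InboundMessage message = new InboundMessage("msg-1");
        try {
            if (useWrapper) {
                new WrappedContinuation(cont, message).suspend();
            } else {
                cont.suspend(); // Case 1: user code suspends directly
            }
        } catch (SuspendedException ex) {
            cont.resume();                  // the control thread gets in first
            Object seen = cont.getObject(); // the resumed request looks for its message
            cont.setObject(message);        // Case 1 association, too late by now
            return seen;
        }
        throw new IllegalStateException("suspend() should have thrown");
    }

    public static void main(String[] args) {
        System.out.println("direct suspend sees: " + seenByResumedRequest(false));
        System.out.println("wrapper sees a message: " + (seenByResumedRequest(true) != null));
    }
}
```

With the direct call the resumed request finds nothing on the continuation, while with the wrapper
the message is already there, whichever thread wins.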

> Also, I think you will have to care about timeouts ...

Why? It is CXF user code that calls suspend(). In the CXF JettyDestination I attempt to get
a message from the continuation returned by ContinuationSupport.getContinuation(). If the
returned continuation is not new and it has no message associated with it, then there is
really nothing CXF can do but proceed with a new invocation, irrespective of whether
this continuation was resumed or timed out. CXF could throw an exception in this case, but for
now I prefer to log a warning, as things seem to be working anyway; it will be up to user
code to take more drastic action.
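
Roughly, the decision looks like the sketch below. This is an illustration only, not the actual
JettyDestination code: ResumeDecisionSketch, Action and decide() are hypothetical names, and the
continuation state is passed in as plain parameters instead of being read from a Jetty
continuation.

```java
import java.util.logging.Logger;

// Hypothetical sketch of the decision only; not the actual JettyDestination code.
class ResumeDecisionSketch {
    private static final Logger LOG =
        Logger.getLogger(ResumeDecisionSketch.class.getName());

    enum Action { START_NEW_INVOCATION, RESUME_SAVED_MESSAGE }

    /**
     * @param continuationIsNew whether the continuation was just created for this request
     * @param savedMessage the message found on the continuation, or null if none
     */
    static Action decide(boolean continuationIsNew, Object savedMessage) {
        if (continuationIsNew) {
            // First pass for this request: nothing to resume.
            return Action.START_NEW_INVOCATION;
        }
        if (savedMessage != null) {
            // The message was preserved before suspend(), so pick it up again.
            return Action.RESUME_SAVED_MESSAGE;
        }
        // Resumed or timed out, but no message was associated in time.
        LOG.warning("Continuation is not new but carries no message; "
            + "starting a new invocation");
        return Action.START_NEW_INVOCATION;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, null));
        System.out.println(decide(false, "saved message"));
        System.out.println(decide(false, null));
    }
}
```

Logging instead of throwing keeps the request working at the cost of redoing the interceptor
work; throwing would be the stricter alternative.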

It's likely I'm missing some subtle or even obvious details, but for now things seem to be
quite clear to me.

> Another thing: it would be nice if you could create a branch and
> commit your ongoing work there so that we can have something more
> tangible to discuss on ... ;-)  We may as well just drop it later, it
> does not really matter.

Sorry - I see that it would really help us discuss things better.

Cheers, Sergey

>
> On Wed, Nov 12, 2008 at 11:21 AM, Sergey Beryozkin
> <sergey.beryozkin@progress.com> wrote:
>> Hi,
>>
>> I have had a look. At the moment I don't see why we would have to do this
>> sort of sophisticated handling of continuations in CXF JettyDestination.
>> With CXF, it's the code being invoked further down the line (be it SMX
>> CXF binding components or application code) which needs to worry about doing
>> either suspending or resuming continuations.
>>
>> As far as CXF is concerned, it only needs to be able to associate a given
>> inbound message with a continuation instance. I reckon saving it as a
>> continuation user object (preserving the previously set one if any) is a
>> lighter/simpler alternative than introducing maps in the JettyDestination.
>>
>> However, as I said a few times earlier in this thread, there's a race
>> condition which I observe in certain conditions. Specifically, I have a test
>> where a continuation is resumed virtually immediately after it's been
>> suspended, so before the code that associates this suspended
>> continuation with the inbound message has a chance to do it,
>> continuation.resume() has already occurred. In the CXF case I believe it can
>> happen irrespective of how we write the code dealing with continuations
>> under the hood. It won't happen if continuation wrappers are used by the
>> application code.
>>
>> Do you have any comments about this race condition? Or on how the code you
>> linked to can help to avoid it?
>>
>> Cheers, Sergey
>>
>>
>>
>>
>>> I would really encourage you to take a look at the smx code for
>>> handling continuations.
>>> We've had quite a hard time to handle race conditions, timeouts etc...
>>> because the continuation has a timeout and when the message is
>>> received back around the timeout, things can become a bit tricky.
>>>
>>>
>>> https://svn.apache.org/repos/asf/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
>>>
>>> We use one concurrent hash map to associate a message id to a
>>> continuation and multiple synchronization blocks on the continuation
>>> itself.
>>> Also the above code can be used with standard servlet servers (i.e.
>>> when the continuation is a blocking continuation) which is imho a good
>>> thing.
>>>
>>> On Tue, Nov 11, 2008 at 6:51 PM, Sergey Beryozkin
>>> <sergey.beryozkin@progress.com> wrote:
>>>>
>>>> Hi
>>>>
>>>>>>
>>>>>> I have 10 threads involved, 5 control ones + 5 application ones, I see
>>>>>> a loss of message approximately once in 5 cases. The fact that
>>>>>> cont.resume() is done virtually immediately after cont.suspend() can
>>>>>> explain it.
>>>>>
>>>>> Without seeing your code, I cannot really offer valid suggestions, but
>>>>> I'll try....   :-)
>>>>
>>>> I guess having it all on a branch would be handy then :-)
>>>>
>>>>>
>>>>> One thought was in the Continuation object, record if "resume()" has
>>>>> been called and if it's been called by the time the stack unwinds back
>>>>> into the Http transport, just re-dispatch immediately.   Either that or
>>>>> have the resume block until the http transport sets a "ready to resume"
>>>>> flag just before it allows the exception to flow back into jetty.
>>>>
>>>> I have 2 tests.
>>>>
>>>> In one test the application server code interacts with a wrapper, both
>>>> when getting a continuation instance and when calling suspend/resume on
>>>> it (as suggested by yourself earlier in this thread). In this case, under
>>>> the hood, the inbound message is associated with the continuation
>>>> instance before suspend() is called on it. Thus even if the resulting
>>>> exception does not reach JettyDestination before continuation.resume()
>>>> is called by a control thread, the message is not lost when the HTTP
>>>> request is resumed, as that HTTP request had this continuation instance
>>>> associated with it at the time
>>>> ContinuationSupport.getContinuation(request) was called.
>>>>
>>>> In the other test, which I believe better represents an integration
>>>> scenario with SMX, the application server code calls Jetty
>>>> ContinuationSupport.getContinuation(request) followed by
>>>> continuation.suspend(). Now, in this case, before the (Jetty RetryRequest)
>>>> runtime exception reaches a catch block in AbstractInvoker (where I try
>>>> to associate a message with the continuation), one or two control threads
>>>> manage to squeeze in and call resume() before the catch block has even
>>>> been processed. So by the time the wrapped exception reaches
>>>> JettyDestination, a request with a resumed continuation has already come
>>>> back...
>>>>
>>>> Does this explanation of the second case and the associated race condition
>>>> sound reasonable?
>>>>
>>>> Cheers, Sergey
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>>
>>>>>
>>>>> Dan
>>>>>
>>>>>>
>>>>>> Cheers, Sergey
>>>>>>
>>>>>> > That said, I'm now trying to inject a message as a custom
>>>>>> > continuation object (while preserving the original one if any, both
>>>>>> > ways) as early as possible, in AbstractInvoker, so the time window in
>>>>>> > which the race condition I talked about earlier can cause the loss of
>>>>>> > the original message is extremely small: the time it takes for the
>>>>>> > continuation.suspend() exception to reach a catch block in
>>>>>> > AbstractInvoker.
>>>>>> >
>>>>>> > Cheers, Sergey
>>>>>> >
>>>>>> >> Hi,
>>>>>> >>
>>>>>> >> I did some system testing with Jetty continuations and it's going
>>>>>> >> not too bad. Here's one issue I've encountered which might or
>>>>>> >> might not be a problem in cases where continuations are utilized
>>>>>> >> directly (that is, without our wrappers), as in the case of, say,
>>>>>> >> the ServiceMix CXF binding component.
>>>>>> >>
>>>>>> >> The problem is that when continuation.suspend(timeout) has been
>>>>>> >> called, the resulting RuntimeException might not reach CXF
>>>>>> >> JettyDestination (such that the original message with its phase
>>>>>> >> chain can be preserved until the request is resumed) before some
>>>>>> >> other application thread calls continuation.resume() or the
>>>>>> >> continuation suspend timeout expires.
>>>>>> >>
>>>>>> >> In the case of ServiceMix the latter is a theoretical possibility at
>>>>>> >> the least. I can see in its code that this timeout is configured,
>>>>>> >> but if this timeout is in the region of up to 1 sec or so then it's
>>>>>> >> feasible that with a heavy workload the race condition described
>>>>>> >> above might come to life.
>>>>>> >>
>>>>>> >> That said, as part of my test, I found that even when such a
>>>>>> >> condition occurs, the 'worst' thing which can happen is that a new
>>>>>> >> message and a new chain are created, that is, the request is not
>>>>>> >> resumed from a 'suspended' ServiceInvokerInterceptor, but starts as
>>>>>> >> if it was a new request altogether. It all works nonetheless, as all
>>>>>> >> the stack variables used in various interceptors, in my given test
>>>>>> >> at least, are obtained from a message. The only downside is that
>>>>>> >> the work which has already been done earlier as part of handling the
>>>>>> >> suspended request is repeated by the interceptors. It can cause
>>>>>> >> issues though in cases when some interceptors have side effects as
>>>>>> >> part of handling a given input request, say modifying a db, etc.
>>>>>> >>
>>>>>> >> Now, this race condition can be safely avoided if the wrapper proposed
>>>>>> >> by Dan is used by the server application code, as the message can be
>>>>>> >> preserved immediately at the point where a user calls suspend on our
>>>>>> >> wrapper, so without further doubts I've prototyped it too. It's not
>>>>>> >> possible for SMX components though.
>>>>>> >>
>>>>>> >> Comments ?
>>>>>> >>
>>>>>> >> Cheers, Sergey
>>>>>> >>
>>>>>> >>> I guess my thinking was to tie the continuations directly to the
>>>>>> >>> PhaseInterceptorChain (since that is going to need to know about
>>>>>> >>> them anyway).   However, I suppose it could easily be done with a
>>>>>> >>> new interface. Probably the best thing to do is to stub out a
>>>>>> >>> sample usecase.   So here goes.....
>>>>>> >>>
>>>>>> >>> Let's take a "GreetMe" web service that in the greetMe method will
>>>>>> >>> call off asynchronously to some JMS service to actually get the
>>>>>> >>> result.
>>>>>> >>>
>>>>>> >>> @Resource(name = "jmsClient")
>>>>>> >>> Greeter jmsGreeter;
>>>>>> >>> @Resource
>>>>>> >>> WebServiceContext context;
>>>>>> >>>
>>>>>> >>> public String greetMe(String arg) {
>>>>>> >>>     ContinuationSupport contSupport = (ContinuationSupport)
>>>>>> >>>         context.getMessageContext().get(ContinuationSupport.class.getName());
>>>>>> >>>     if (contSupport == null) {
>>>>>> >>>         // continuations not supported, must wait
>>>>>> >>>         return jmsGreeter.greetMe(arg);
>>>>>> >>>     }
>>>>>> >>>     Continuation cont = contSupport.getContinuation();
>>>>>> >>>     if (cont.isResumed()) {
>>>>>> >>>         // the handler was stored on the continuation by suspend(handler)
>>>>>> >>>         Handler handler = (Handler) cont.getObject();
>>>>>> >>>         return handler.get().getReturn();
>>>>>> >>>     } else {
>>>>>> >>>         Handler handler = new Handler(cont);
>>>>>> >>>         jmsGreeter.greetMeAsync(arg, handler);
>>>>>> >>>         cont.suspend(handler);
>>>>>> >>>         // won't actually get here as suspend will throw a
>>>>>> >>>         // ContinuationException
>>>>>> >>>         return null;
>>>>>> >>>     }
>>>>>> >>> }
>>>>>> >>>
>>>>>> >>> The Handler would look something like:
>>>>>> >>>
>>>>>> >>> class Handler implements AsyncHandler<GreetMeResponse> {
>>>>>> >>>     GreetMeResponse resp;
>>>>>> >>>     Continuation cont;
>>>>>> >>>
>>>>>> >>>     public Handler(Continuation cont) {
>>>>>> >>>         this.cont = cont;
>>>>>> >>>     }
>>>>>> >>>
>>>>>> >>>     public void handleResponse(Response<GreetMeResponse> response) {
>>>>>> >>>         try {
>>>>>> >>>             resp = response.get();
>>>>>> >>>         } catch (Exception ex) {
>>>>>> >>>             // sketch only: resp stays null if the async call failed
>>>>>> >>>         }
>>>>>> >>>         cont.resume();
>>>>>> >>>     }
>>>>>> >>>
>>>>>> >>>     // result accessor used by greetMe() once the request is resumed
>>>>>> >>>     GreetMeResponse get() {
>>>>>> >>>         return resp;
>>>>>> >>>     }
>>>>>> >>> }
>>>>>> >>>
>>>>>> >>> Basically, the HTTP/Jetty transport could provide an implementation
>>>>>> >>> of ContinuationSupport that wraps the jetty stuff.   JMS could
>>>>>> >>> provide one that's pretty much a null op.   Transports that cannot
>>>>>> >>> support it (like servlet) just wouldn't provide an implementation.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> Does that make sense?   Other ideas?
>>>>>> >>>
>>>>>> >>> Dan
>>>>>> >>>
>>>>>> >>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
>>>>>> >>>> > No.   We don't want that.   Whatever we do should work for other
>>>>>> >>>> > transports as well like JMS.  Thus, this shouldn't be tied to
>>>>>> >>>> > jetty continuations directly.
>>>>>> >>>>
>>>>>> >>>> No, I'm not suggesting to tie it up to jetty continuations.
>>>>>> >>>> Ex.
>>>>>> >>>>
>>>>>> >>>> try {
>>>>>> >>>>     // continuation.suspend() is called somehow by the code being
>>>>>> >>>>     // invoked upon
>>>>>> >>>>     invoke();
>>>>>> >>>> } catch (RuntimeException ex) {
>>>>>> >>>>     if (ex.getClass().getName().equals("jetty.JettyContinuationException")) {
>>>>>> >>>>         throw new SuspendedFault(ex);
>>>>>> >>>>         // or PhaseInterceptorChain.suspend()
>>>>>> >>>>     }
>>>>>> >>>> }
>>>>>> >>>>
>>>>>> >>>> > Most likely, we could add a "suspend()" method to
>>>>>> >>>> > PhaseInterceptorChain that would do something very similar and
>>>>>> >>>> > throw a "SuspendException" or something in the same package as
>>>>>> >>>> > PhaseInterceptorChain.
>>>>>> >>>>
>>>>>> >>>> When do we trigger this PhaseInterceptorChain.suspend() call
>>>>>> >>>> though?
>>>>>> >>>>
>>>>>> >>>> >   That would get propagated back to the JettyDestination that
>>>>>> >>>> > could then call the jetty things.  The JMS transport could just
>>>>>> >>>> > catch it and more or less ignore it.  We'd then have to add a
>>>>>> >>>> > "resume()" method to the chain which would call back onto a
>>>>>> >>>> > listener that the transport provides.   Jetty would just call
>>>>>> >>>> > the jetty resume stuff. JMS would probably put a runnable on
>>>>>> >>>> > the workqueue to restart the chain.
>>>>>> >>>>
>>>>>> >>>> ok
>>>>>> >>>>
>>>>>> >>>> > Also, suspend() would need to check if there is a listener.  If
>>>>>> >>>> > not, it should not throw the exception.   Thus, the servlet
>>>>>> >>>> > transport and CORBA stuff that couldn't do this would pretty
>>>>>> >>>> > much just ignore it.
>>>>>> >>>>
>>>>>> >>>> ok, not sure I understand about the listener but I think I see
>>>>>> >>>> what you mean...
>>>>>> >>>>
>>>>>> >>>> > Basically, this needs to be done in such a way that it CAN work
>>>>>> >>>> > for the non-jetty cases.   However, it also needs to be done in
>>>>>> >>>> > a way that doesn't affect existing transports.
>>>>>> >>>>
>>>>>> >>>> +1
>>>>>> >>>>
>>>>>> >>>> Cheers, Sergey
>>>>>> >>>>
>>>>>> >>>> > Dan
>>>>>> >>>> >
>>>>>> >>>> >> 2. Now, if the above can be figured out, the next problem
>>>>>> >>>> >> arises: when the "trigger" to wake up the continuation occurs
>>>>>> >>>> >>
>>>>>> >>>> >> I think we can do in JettyDestination something similar to
>>>>>> >>>> >> what is done in SMX. When getting a SuspendedFault exception,
>>>>>> >>>> >> we can extract from it the original continuation instance or
>>>>>> >>>> >> else we can do ContinuationSupport.getContinuation(request)
>>>>>> >>>> >> which should return us the instance. At this point we can use
>>>>>> >>>> >> it as a key to store the current exchange plus all the other
>>>>>> >>>> >> info we may need.
>>>>>> >>>> >>
>>>>>> >>>> >> When the user/application code does continuation.resume(), the
>>>>>> >>>> >> Jetty thread will come back and we will use
>>>>>> >>>> >> ContinuationSupport.getContinuation(request) to get us the
>>>>>> >>>> >> active continuation and use it to extract the suspended
>>>>>> >>>> >> exchange and proceed from there, say we'll call
>>>>>> >>>> >> PhaseInterceptorChain.resume(), etc, something along the lines
>>>>>> >>>> >> you suggested
>>>>>> >>>> >>
>>>>>> >>>> >> 3. Basically, to do this "right", we'd need to audit pretty
>>>>>> >>>> >> much everything to make sure nothing is stored on the stack
>>>>>> >>>> >> and is "resumable". Once that is done, the rest is relatively
>>>>>> >>>> >> easy.
>>>>>> >>>> >>
>>>>>> >>>> >> Yea - that can probably be quite challenging
>>>>>> >>>> >>
>>>>>> >>>> >>
>>>>>> >>>> >> Thoughts ?
>>>>>> >>>> >>
>>>>>> >>>> >> Cheers, Sergey
>>>>>> >>>> >>
>>>>>> >>>> >>
>>>>>> >>>> >>
>>>>>> >>>> >>
>>>>>> >>>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
>>>>>> >>>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
>>>>>> >>>> >> [3] https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=12642361#action_12642361
>>>>>> >>>> >
>>>>>> >>>> > --
>>>>>> >>>> > Daniel Kulp
>>>>>> >>>> > dkulp@apache.org
>>>>>> >>>> > http://dankulp.com/blog
>>>>>> >>>
>>>>>> >>> --
>>>>>> >>> Daniel Kulp
>>>>>> >>> dkulp@apache.org
>>>>>> >>> http://dankulp.com/blog
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Daniel Kulp
>>>>> dkulp@apache.org
>>>>> http://dankulp.com/blog
>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> Cheers,
>>> Guillaume Nodet
>>> ------------------------
>>> Blog: http://gnodet.blogspot.com/
>>> ------------------------
>>> Open Source SOA
>>> http://fusesource.com
>>
>>
>
>
>
> -- 
> Cheers,
> Guillaume Nodet
> ------------------------
> Blog: http://gnodet.blogspot.com/
> ------------------------
> Open Source SOA
> http://fusesource.com 

