cxf-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Daniel Kulp <dk...@apache.org>
Subject Re: NPE in system JMS test
Date Tue, 11 Nov 2008 18:20:53 GMT

Yea, I think it's caused by the move from Spring 2.5.4 to 2.5.6.    The Spring 
JMS stuff doesn't seem to tolerate an ungraceful shutdown which is what I 
think is happening.   I want to dig in a bit furthur and possibly log a bug 
with Spring, just haven't had the time.

Dan


On Tuesday 11 November 2008 1:15:43 pm Sergey Beryozkin wrote:
> Hi, seeing this NPE with the latest trunk :
>
>  T E S T S
> -------------------------------------------------------
> Running
> org.apache.cxf.systest.multitransport.MultiTransportClientServerTest
> Exception in thread "DefaultMessageListenerContainer-1"
> java.lang.NullPointerExc eption
>         at java.lang.String.indexOf(String.java:1564)
>         at java.lang.String.indexOf(String.java:1546)
>         at
> org.springframework.jms.support.JmsUtils.buildExceptionMessage(JmsUti
> ls.java:255)
>         at
> org.springframework.jms.listener.DefaultMessageListenerContainer.refr
> eshConnectionUntilSuccessful(DefaultMessageListenerContainer.java:799) at
> org.springframework.jms.listener.DefaultMessageListenerContainer.reco
> verAfterListenerSetupFailure(DefaultMessageListenerContainer.java:767) at
> org.springframework.jms.listener.DefaultMessageListenerContainer$Asyn
> cMessageListenerInvoker.run(DefaultMessageListenerContainer.java:898) at
> java.lang.Thread.run(Thread.java:595)
> Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 22.425 sec
>
>
> looks like it's swallowed - can someone else see it ?
>
> Sergey
>
>
> ----- Original Message -----
> From: "Sergey Beryozkin" <sergey.beryozkin@progress.com>
> To: "Daniel Kulp" <dkulp@apache.org>; <dev@cxf.apache.org>
> Sent: Tuesday, November 11, 2008 5:51 PM
> Subject: Re: Jetty Continuations in CXF
>
> > Hi
> >
> >>> I have 10 threads involved, 5 control ones + 5 application ones, I see
> >>> a loss of message approximately once in 5 cases. The fact that
> >>> cont.resume() is done virtually immediately after cont.suspend() can
> >>> explain it.
> >>
> >> Without seeing your code, I cannot really offer valid suggestions, but
> >> I'll try....   :-)
> >
> > I guess having it all on a branch would be handy then :-)
> >
> >> One thought was in the Continuation object, record if "resume()" has
> >> been called and if it's been callled by the time the stack unwinds back
> >> into the Http transport, just re-dispatch immediately.   Either that or
> >> have the resume block until the http transport sets a "ready to resume"
> >> flag just before it allows the exception to flow back into jetty.
> >
> > I have 2 tests.
> >
> > In one test an application server code interacts with a wrapper, both
> > when getting a continuation instance and when calling suspend/resume on
> > it (as suggested by yourself earlier in this thread). In this case, under
> > the hood, an inbound message is associated with a continuation instance
> > before suspend() is called on it. Thus even if the resulting exception
> > does not reach Jetty Destination in time before continuation.resume() is
> > called by a control thread, the message is not lost when the HTTP request
> > is resumed as that HTTP request had this continuation instance associated
> > with it at a time ContinuationsSupport.getContinuations(request) was
> > called.
> >
> > In other test which I believe represents an integration scenario with SMX
> > better, an application server code calls Jetty
> > ContinuationsSupport.getContinuations(request) followed by
> > continuation.suspend(). Now, in this case, before a (Jetty RetryRequest)
> > runtime exception reaches a catch block in AbstractInvoker (where I try
> > to associate a message with continuation), one or two control threads
> > manage to squeeze in and call resume() before catch block has even been
> > processed. So by the time the wrapped exception reaches JettyDestination
> > a request with a resumed continuation has already come back...
> >
> > Does this explanation for a second case and the associated race condition
> > sounds reasonable ?
> >
> > Cheers, Sergey
> >
> >> Dan
> >>
> >>> Cheers, Sergey
> >>>
> >>> > That said, I'm now trying to inject a message as a custom
> >>> > continuation object (while preserving the original one if any, both
> >>> > ways) as early as possible, in AbstractInvoker, so the time window
at
> >>> > which the race condition I talked about earlier can cause the loss
of
> >>> > the original message, is extremely small the time it taked for the
> >>> > continuation.suspend() exception to reach a catch block in
> >>> > AbstractInvoker.
> >>> >
> >>> > Cheers, Sergey
> >>> >
> >>> >> Hi,
> >>> >>
> >>> >> I did some system testing with Jetty continuations and it's going
> >>> >> not too bad. Here's one issue which I've encountered which might
or
> >>> >> might not be a problem in cases where continuations are ustilized
> >>> >> directly (that is without our wrappers), as in case of say
> >>> >> ServiceMix CXF binding component.
> >>> >>
> >>> >> The problem is that when continuation.suspend(timeout) has been
> >>> >> called, a resulting RuntimeException might not reach CXF
> >>> >> JettyDestination (such that the original message with its phase
> >>> >> chain can be preserved until the request is resumed) if some other
> >>> >> application thread calls continuation.resume() or continuation
> >>> >> suspend timeout expires.
> >>> >>
> >>> >> In case of ServiceMix the latter is a theoretical possibility at
the
> >>> >> least. I can see in its code this timeout is configured, but if
this
> >>> >> timeout is in the region of up to 1 sec or so then it's feasible
> >>> >> that with a heavy  workload the race condition described above
might
> >>> >> come to life.
> >>> >>
> >>> >> That said, as part of my test, I found that even when such condition
> >>> >> occurs, the 'worst' thing which can happen is that a new message
and
> >>> >> a new chain are created, that is, the request is not resumed from
a
> >>> >> 'suspended' ServiceInvokerInterceptor, but starts as if it was
a new
> >>> >> request alltogether, but it all works nonetheless, as all the stack
> >>> >> variables used in various interceptors in my given test at least
are
> >>> >> all obtained from a message. The only downside is that that the
work
> >>> >> which has already been done earlier as part of handling the
> >>> >> suspended request is repeated again by the interceptors. It can
> >>> >> cause issues though in cases when some interceptors have sideeffects
> >>> >> as part of handling a given input request, say modify a db, etc
> >>> >>
> >>> >> Now, this race condition can be safely avoided if a wrapper proposed
> >>> >> by Dan is used by a server application code as the message can
be
> >>> >> preserved immediately at a point a user calls suspend on our
> >>> >> wrapper, so without further doubts I've prototyped it too. It's
not
> >>> >> possible for SMX components though
> >>> >>
> >>> >> Comments ?
> >>> >>
> >>> >> Cheers, Sergey
> >>> >>
> >>> >>> I guess my thinking was to tie the continutations directly
to the
> >>> >>> PhaseInterceptorChain (since that is going to need to know
about
> >>> >>> them anyway).   However, I suppose it could easily be done
with a
> >>> >>> new interface. Probably the best thing to do is to stub out
a
> >>> >>> sample usecase.   So here goes.....
> >>> >>>
> >>> >>> Lets take a "GreetMe" web service that in the greetMe method
will
> >>> >>> call off asynchrously to some JMS service to actually get the
> >>> >>> result.
> >>> >>>
> >>> >>> @Resource(name = "jmsClient")
> >>> >>> Greeter jmsGreeter
> >>> >>> @Resource
> >>> >>> WebServiceContext context;
> >>> >>> public String greetMe(String arg) {
> >>> >>>     ContinuationSupport contSupport = (ContinuationSupport)
> >>> >>>              context.get(ContinuationSupport.class.getName());
> >>> >>>     if (contSupport == null) {
> >>> >>>          //continuations not supported, must wait
> >>> >>>          return jmsGreeter.greetMe(arg);
> >>> >>>     }
> >>> >>>     Continuation cont = contSupport.getContinuation();
> >>> >>>     if (cont.isResumed()) {
> >>> >>> AsyncHandler<GreetMeResponse> handler = cont.getObject();
> >>> >>>        return handler.get().getReturn();
> >>> >>>     } else {
> >>> >>>         AsyncHandler<GreetMeResponse> handler = new Handler(cont);
> >>> >>>         jmsGreeter.greetMeAsync(arg, handler);
> >>> >>>         cont.suspend(handler);
> >>> >>> return null;   //won't actually get here as suspend will throw
a
> >>> >>> ContinuationException
> >>> >>>     }
> >>> >>> }
> >>> >>>
> >>> >>> The Handler would look something like:
> >>> >>> class Handler implements AsyncHandler<GreetMeResponse>
{
> >>> >>> GreetMeResponse resp;
> >>> >>>        Continuation cont;
> >>> >>> public Handler(Continuation cont) {
> >>> >>>            this.cont = cont;
> >>> >>>        }
> >>> >>>        public void handleResponse(Response<GreetMeLaterResponse>
> >>> >>> response) { resp = response.get();
> >>> >>>              cont.resume();
> >>> >>>       }
> >>> >>> }
> >>> >>>
> >>> >>> Basically, the HTTP/Jetty transport could provide an implementation
> >>> >>> of ContinuationSupport that wrappers the jetty stuff.    JMS
could
> >>> >>> provide one that's pretty much a null op.   Transports that
cannot
> >>> >>> support it (like servlet) just wouldn't provide an implementation.
> >>> >>>
> >>> >>>
> >>> >>> Does that make sense?   Other ideas?
> >>> >>>
> >>> >>> Dan
> >>> >>>
> >>> >>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
> >>> >>>> > No.   We don't want that.   Whatever we do should
work for other
> >>> >>>> > transports as well like JMS.  Thus, this shouldn't
be tied to
> >>> >>>> > jetty continuations directly.
> >>> >>>>
> >>> >>>> No, I'm not suggesting to tie it up to jetty continuations.
> >>> >>>> Ex.
> >>> >>>>
> >>> >>>> try {
> >>> >>>>   invoke(); // continuation.suspend() somehow by the code
being
> >>> >>>> invoked upon }
> >>> >>>> catch (RuntimeException ex) {
> >>> >>>>
> >>> >>>> if
> >>> >>>> (ex.getClass().getName().equals("jetty.JettyContinuationException"
> >>> >>>>)) throw new SuspendedFault(ex);
> >>> >>>>     // or PhaseInterceptorChain.suspend()
> >>> >>>> }
> >>> >>>> }
> >>> >>>>
> >>> >>>> > Most likely, we could add a "suspend()" method to
> >>> >>>> > PhaseInterceptorChain that would do something very
similar and
> >>> >>>> > throw a "SuspendException" or something in the same
package as
> >>> >>>> > PhaseInterceptorChain.
> >>> >>>>
> >>> >>>> When do we trigger this PhaseInterceptorChain.suspend()
call
> >>> >>>> though ?
> >>> >>>>
> >>> >>>> >   That would get propogated
> >>> >>>> > back to the JettyDestination that could then call
the jetty
> >>> >>>> > things. The JMS transport could just catch it and
more or less
> >>> >>>> > ignore it. We'd then have to add a "resume()" method
to the
> >>> >>>> > chain which would call back onto a listener that the
transport
> >>> >>>> > provides.   Jetty would just call the jetty resume
stuff. JMS
> >>> >>>> > would probably put a runnable on the workqueue to
restart the
> >>> >>>> > chain.
> >>> >>>>
> >>> >>>> ok
> >>> >>>>
> >>> >>>> > Also, suspend() would need to check if there is a
listener.  If
> >>> >>>> > not, it should not throw the exception.   Thus, the
servlet
> >>> >>>> > transport and CORBA stuff that couldn't do this would
pretty
> >>> >>>> > much just ignore it.
> >>> >>>>
> >>> >>>> ok, not sure I understand about the listener but I think
I see
> >>> >>>> what you mean...
> >>> >>>>
> >>> >>>> > Basically, this needs to be done in such a way that
it CAN work
> >>> >>>> > for the non-jetty cases.   However, it also needs
to be done in
> >>> >>>> > a way that doesn't affect existing transports.
> >>> >>>>
> >>> >>>> +1
> >>> >>>>
> >>> >>>> Cheers, Sergey
> >>> >>>>
> >>> >>>> > Dan
> >>> >>>> >
> >>> >>>> >> 2. Now, if the above can be figured out, the next
problem
> >>> >>>> >> arises: when the "trigger" to wake up the continuation
occurs
> >>> >>>> >>
> >>> >>>> >> I think we can can do in JettyDestination omething
similar to
> >>> >>>> >> what is done in SMX. When getting a SuspendedFault
exception,
> >>> >>>> >> we can extract from it the original continuation
instance or
> >>> >>>> >> else we can do ContinuationSupport.getContinuation(request)
> >>> >>>> >> which should return us the instance. At this point
we can use
> >>> >>>> >> it as a ket to store the current exchange plus
all the other
> >>> >>>> >> info we may need.
> >>> >>>> >>
> >>> >>>> >> When the user/application code does continuation.resume(),
the
> >>> >>>> >> Jetty thread will come back and we will use the
> >>> >>>> >> ContinuationSupport.getContinuation(request) to
get us the
> >>> >>>> >> active continuation and use it to extract the
suspended
> >>> >>>> >> exchange and proceed from there, say we'll call
> >>> >>>> >> PhaseInterceptorPhase.resume(), etc, something
along the lines
> >>> >>>> >> you suggested
> >>> >>>> >>
> >>> >>>> >>
> >>> >>>> >> 3. Basically, to do this "right", we'd need to
audit pretty
> >>> >>>> >> much everything to make sure nothing is stored
on the stack and
> >>> >>>> >> is "resumable". Once that is done, the rest is
relatively easy.
> >>> >>>> >>
> >>> >>>> >> Yea - probably can be the quite challenging
> >>> >>>> >>
> >>> >>>> >>
> >>> >>>> >> Thoughts ?
> >>> >>>> >>
> >>> >>>> >> Cheers, Sergey
> >>> >>>> >>
> >>> >>>> >>
> >>> >>>> >>
> >>> >>>> >>
> >>> >>>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
> >>> >>>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
> >>> >>>> >> [3]
> >>> >>>> >> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId
> >>> >>>> >>=126 42361 #ac tion_12642361
> >>> >>>> >
> >>> >>>> > --
> >>> >>>> > Daniel Kulp
> >>> >>>> > dkulp@apache.org
> >>> >>>> > http://dankulp.com/blog
> >>> >>>
> >>> >>> --
> >>> >>> Daniel Kulp
> >>> >>> dkulp@apache.org
> >>> >>> http://dankulp.com/blog
> >>
> >> --
> >> Daniel Kulp
> >> dkulp@apache.org
> >> http://dankulp.com/blog



-- 
Daniel Kulp
dkulp@apache.org
http://dankulp.com/blog

Mime
View raw message