cxf-dev mailing list archives

From Seumas Soltysik <SSOLT...@progress.com>
Subject RE: Support for using JMS MessageID as CorrelationID
Date Sat, 17 Apr 2010 02:04:45 GMT
There are a couple of elements in the JMSConduit code on the trunk that are troublesome.

First of all there is the use of the outstandingAsync data member. Effectively it keeps track
of outstanding async calls. If the number of outstanding async calls is 0 then it proceeds
to shut down the async JMSListener. So if a client were to make a series of async calls that
did not overlap, each call would result in the creation and destruction of the JMSListener,
which seems a bit excessive. I am not sure why this counter was introduced but I think it
may have to do with trying to avoid conflicts with the jmsTemplate.receiveSelected() call
for synchronous invocations. I think the idea was to try to avoid having two listeners open
that might interfere with each other if an async and sync call were made consecutively. Just
a guess.
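
A minimal sketch of the lifecycle described above, assuming the counter simply starts the
listener on the first outstanding call and stops it when the count returns to 0. The class
and method names are hypothetical stand-ins, not the JMSConduit source:

```java
// Hypothetical model of the outstandingAsync bookkeeping; not the CXF code.
class AsyncListenerLifecycle {
    private int outstandingAsync;     // async calls sent but not yet answered
    private boolean listenerRunning;  // models whether the async JMSListener exists
    private int listenersCreated;     // how many times a listener had to be created

    // Called when an async request is sent.
    synchronized void requestSent() {
        if (!listenerRunning) {
            listenerRunning = true;   // models creating the JMSListener
            listenersCreated++;
        }
        outstandingAsync++;
    }

    // Called when the reply for an async request arrives.
    synchronized void replyReceived() {
        outstandingAsync--;
        if (outstandingAsync == 0) {
            listenerRunning = false;  // models shutting the JMSListener down
        }
    }

    synchronized int listenersCreated() {
        return listenersCreated;
    }

    // Each call completes before the next one starts (no overlap).
    static int sequentialCalls(int calls) {
        AsyncListenerLifecycle c = new AsyncListenerLifecycle();
        for (int i = 0; i < calls; i++) {
            c.requestSent();
            c.replyReceived();
        }
        return c.listenersCreated();
    }

    // All requests are sent before any reply arrives (full overlap).
    static int overlappingCalls(int calls) {
        AsyncListenerLifecycle c = new AsyncListenerLifecycle();
        for (int i = 0; i < calls; i++) {
            c.requestSent();
        }
        for (int i = 0; i < calls; i++) {
            c.replyReceived();
        }
        return c.listenersCreated();
    }
}
```

Under this model, sequentialCalls(n) creates n listeners while overlappingCalls(n) creates
only one, which is the churn in question.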

The other issue relates to when the MessageID is used as the CorrelationID. The following
code determines whether the MessageID will be used or not:
            } else if (!jmsConfig.isSetConduitSelectorPrefix()
                       && (exchange.isSynchronous() || exchange.isOneWay())
                       && (!jmsConfig.isSetUseConduitIdSelector() 
                           || !jmsConfig.isUseConduitIdSelector())) {
                messageIdPattern = true;
            } 

It turns out that with absolutely no client config set, messageIdPattern resolves to true.
This means that the default case is to use the MessageID as the CorrelationID as opposed to
a generated CorrelationID. I am not sure that this is a bad thing but I am just checking whether
it is intentional or not.
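
To make the default case concrete, here is a small sketch that evaluates the same boolean
expression outside the conduit. It assumes a null value models "not set" for each config
property, and it only covers this one branch (the original is an else-if, so earlier branches
that are not quoted here may take precedence). The class and parameter names are hypothetical:

```java
// Hypothetical stand-in for the quoted condition; null models an unset config value.
class MessageIdPatternDefault {
    static boolean usesMessageId(String conduitSelectorPrefix,
                                 Boolean useConduitIdSelector,
                                 boolean synchronous,
                                 boolean oneWay) {
        boolean isSetConduitSelectorPrefix = conduitSelectorPrefix != null;
        boolean isSetUseConduitIdSelector = useConduitIdSelector != null;
        // Short-circuit keeps the unboxing safe when the value is unset.
        boolean isUseConduitIdSelector = isSetUseConduitIdSelector && useConduitIdSelector;

        return !isSetConduitSelectorPrefix
            && (synchronous || oneWay)
            && (!isSetUseConduitIdSelector || !isUseConduitIdSelector);
    }
}
```

With nothing configured, usesMessageId(null, null, true, false) is true for a synchronous
call, and usesMessageId(null, null, false, false) is false for an async two-way call.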

This may have been on purpose because as a result of this logic, most of the time jmsTemplate.receiveSelected()
is using the MessageID in the selector string. This ensures that if a JMSListener is active
because of an ongoing async call, the JMSListener, which is looking for JMS messages with
a specific prefix, will not grab the message away from the JMSTemplate receiving thread. However,
in the event that the JMSTemplate is using a message selector based upon the conduit ID and
there is a JMSListener active, the listener could grab the returning message first.
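
The overlap can be sketched without a broker. The exact-match selector below uses the same
form as the snippet quoted further down in this thread; the LIKE form of the listener's prefix
selector, the prefix value, and the class name are assumptions for illustration only:

```java
// Hypothetical illustration of when two consumers compete for the same reply.
class SelectorOverlap {
    // Selector used by the synchronous receiveSelected() call.
    static String exactSelector(String correlationId) {
        return "JMSCorrelationID = '" + correlationId + "'";
    }

    // Assumed shape of the async listener's selector; the real string may differ.
    static String prefixSelector(String prefix) {
        return "JMSCorrelationID LIKE '" + prefix + "%'";
    }

    // True if a reply with this correlation ID also matches the prefix selector,
    // meaning the listener could consume it before the receiving thread does.
    static boolean listenerMatches(String prefix, String correlationId) {
        return correlationId.startsWith(prefix);
    }
}
```

JMS message IDs start with "ID:", so listenerMatches("conduit-7", "ID:broker-42") is false
and the two consumers do not compete. A conduit-ID-based correlation ID such as
"conduit-7-0001" does match, which is the case where the listener can win the race.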

At this point it all "works" but it does seem a bit convoluted. To ensure that there is no
conflict between the two listeners, I think that the MessageID should always be used as the
CorrelationID except when the user specifies a correlation ID or the call is async. This assumes
that MessageIDs are truly unique.


________________________________________
From: Seumas Soltysik [SSOLTYSI@progress.com]
Sent: Friday, April 16, 2010 2:31 PM
To: Daniel Kulp; dev@cxf.apache.org
Subject: RE: Support for using JMS MessageID as CorrelationID

Dan,
Understood. However, in the case when the MessageID is used as the CorrelationID you must
have a listener per invocation to support the asynch case as you must select specifically
for the exact MessageID of the invocation.
So it seems that we do one of the following 3 things:

1) Not support MessageID as CorrelationID for the asynch case.

2) Have a special case where a listener thread is created for each request when the request
is asynch and MessageID is being used as CorrelationID.

3) Have a special listener that does not cache the consumer of the JMSListener and rotates
through the correlationMap updating the message selector of the JMSListener, essentially periodically
polling for asynch messages based upon what is in the correlation map.
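
Option 3 could look roughly like the following: a sketch of only the selector rotation, with
no JMS calls. The class, its methods, and the round-robin order are hypothetical; the idea is
that each poll would create a fresh consumer with the selector returned by nextSelector():

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of rotating a selector over outstanding MessageIDs.
class RotatingSelector {
    private final Map<String, Object> correlationMap = new ConcurrentHashMap<>();
    private int cursor;

    // Record an outstanding async request keyed by its MessageID.
    void register(String messageId, Object exchange) {
        correlationMap.put(messageId, exchange);
    }

    // Remove the entry once its reply has been handled.
    void complete(String messageId) {
        correlationMap.remove(messageId);
    }

    // Selector for the next poll, or null when nothing is outstanding.
    synchronized String nextSelector() {
        List<String> ids = new ArrayList<>(correlationMap.keySet());
        if (ids.isEmpty()) {
            return null;
        }
        Collections.sort(ids);            // fixed order so the rotation is predictable
        cursor = cursor % ids.size();     // wrap around, also after removals
        String id = ids.get(cursor);
        cursor++;
        return "JMSCorrelationID = '" + id + "'";
    }
}
```

The cost of this approach grows with the number of outstanding requests, since each one waits
for its turn in the rotation.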

Seumas
________________________________________
From: Daniel Kulp [dkulp@apache.org]
Sent: Friday, April 16, 2010 1:03 PM
To: dev@cxf.apache.org
Cc: Seumas Soltysik
Subject: Re: Support for using JMS MessageID as CorrelationID

Seumas,

The point is that in an async case, we do NOT want threads being consumed for
a receive if at all possible.   At Progress, we had a particular customer that
would have several THOUSAND outstanding async calls at any given time.   We
could not have threads and such stuck on those.   That is why we don't use the
receive method for the async calls.

Basically, if at all possible, for async calls, we want to make sure there are
not threads sitting around waiting (other than a couple threads that the JMS
listener itself may have).
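
A minimal sketch of that idea, assuming replies are matched through a correlation map and
delivered by whichever thread the shared listener calls back on. The class and method names
are hypothetical and no JMS API is used; onReply stands in for the listener's onMessage():

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical dispatcher: many outstanding async calls, no thread parked per call.
class AsyncReplyDispatcher {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called by the sending thread; returns immediately without blocking.
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Called from the shared listener's callback when a reply arrives.
    // Returns false if no outstanding call matches the correlation ID.
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(body);
        return true;
    }

    int outstanding() {
        return pending.size();
    }
}
```

Several thousand registered calls cost only map entries here. The listener side needs a
selector that matches all of them, which is why this fits a shared prefix better than
per-message MessageIDs.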

Dan


On Friday 16 April 2010 10:30:04 am Seumas Soltysik wrote:
> Actually, if you set the JMSListener to the appropriate cache level, it
> will not cache the consumer which means a consumer is created for every
> receive(). This allows for dynamically modifying the message selector
> which is used when a new consumer is created. There is a bit of overhead
> to do it this way, but it is probably better than creating an entirely new
> JMSListener for each async request. Perhaps this is a debatable issue.
>
> You raise an interesting point regarding the use of a workqueue to handle
> the receive() call for asynch situations. For each message sent in
> sendExchange(), a JMSTemplate is created to send the message. Why isn't this
> same template object used for the receive()? Currently on the trunk, the
> JMSTemplate is being used to receive synchronous responses. It could
> just as easily be used for asynch responses by putting the template on a
> workqueue and calling receive(). Doing it this way would seem to do away with
> the necessity of having a JMSListener for the JMSConduit. This seems like
> it would make the JMSConduit code much simpler.
> ________________________________________
> From: Willem Jiang [willem.jiang@gmail.com]
> Sent: Friday, April 16, 2010 9:48 AM
> To: dev@cxf.apache.org
> Subject: Re: Support for using JMS MessageID as CorrelationID
>
> If the JMSListener is the Spring DefaultMessageListenerContainer, I
> doubt you can change the listener's message selector at runtime.
>
> So I suggest creating a receive task with a JMS message selector to
> receive the response, and using a thread pool to run these receive tasks.
>
> Willem
>
> Seumas Soltysik wrote:
> > Hi Willem,
> > One more thing. With respect to using a workqueue I don't think it really
> > solves the issue. The JMSListener essentially already uses its own
> > workqueue to listen asynchronously for replies. The real issue is to
> > minimize the number of JMSListeners required to handle the asynch
> > scenario. As opposed to having a listener per thread, it would be better
> > to have a pool of listeners which could be allocated as individual
> > threads make asynch calls. A workqueue is not going to help with this
> > issue.
> > Regards,
> > Seumas
> > ________________________________________
> > From: Willem Jiang [willem.jiang@gmail.com]
> > Sent: Thursday, April 15, 2010 10:24 PM
> > To: dev@cxf.apache.org
> > Subject: Re: Support for using JMS MessageID as CorrelationID
> >
> > Hi Seumas,
> >
> > Please see my comments in the mail.
> >
> > Seumas Soltysik wrote:
> >> I am trying to get support for using the JMS MessageID as the JMS
> >> CorrelationID as specified in
> >> https://issues.apache.org/jira/browse/CXF-2760 . After putting some
> >> work/thought into this issue, I became aware that this feature is
> >> available on the trunk but was not back-merged to the 2.1.x and 2.2.x
> >> branches. I am in the process of trying to take what is done on trunk and
> >> implement something similar on 2.1 and 2.2. However I have a couple of
> >> issues with the implementation on trunk that I want to sort out before
> >> back-porting.
> >>
> >> 1) There is no attribute in the clientConfig schema to specify that the
> >> user wants to use the MessageID in lieu of the CorrelationID. Currently the
> >> logic for deciding whether to use the MessageID instead of a generated
> >> CorrelationID looks like this:
> >>             } else if (!jmsConfig.isSetConduitSelectorPrefix()
> >>                        && (exchange.isSynchronous() || exchange.isOneWay())
> >>                        && (!jmsConfig.isSetUseConduitIdSelector()
> >>                            || !jmsConfig.isUseConduitIdSelector())) {
> >>                 messageIdPattern = true;
> >>
> >> This is quite a bit of mumbo-jumbo which could be sorted out by
> >> specifying a config attribute.
> >
> > Yes, a simple config attribute could help us.
> >
> >> 2) There is a bit of code which seems left over from a previous
> >> implementation that has no value:
> >>             if (exchange.isSynchronous()) {
> >>                 synchronized (exchange) {
> >>                     exchange.put(CORRELATED, Boolean.TRUE);
> >>                     exchange.notifyAll();
> >>                 }
> >>             }
> >>
> >> I don't see the current purpose of this as I don't see any code which
> >> has another thread waiting on the exchange mutex.
> >
> > It's useless.
> >
> >> 3) The biggest issue with the current implementation on the trunk is the
> >> fact that using the MessageID as CorrelationID is not supported for
> >> asynchronous calls. I don't know if this was purposeful or not but the
> >> MessageID as CorrelationID paradigm is only implemented for synchronous calls.
> >> Here is the source of the problem:
> >>         if (!exchange.isOneWay()) {
> >>             synchronized (exchange) {
> >>                 jmsTemplate.send(jmsConfig.getTargetDestination(), messageCreator);
> >>                 if (messageIdPattern) {
> >>                     correlationId = messageCreator.getMessageID();
> >>                 }
> >>                 headers.setJMSMessageID(messageCreator.getMessageID());
> >>
> >>                 final String messageSelector = "JMSCorrelationID = '" + correlationId + "'";
> >>                 if (exchange.isSynchronous()) {
> >>                     javax.jms.Message replyMessage =
> >>                         jmsTemplate.receiveSelected(replyToDestination, messageSelector);
> >>                     if (replyMessage == null) {
> >>                         throw new RuntimeException("Timeout receiving message with correlationId "
> >>                                                    + correlationId);
> >>                     } else {
> >>                         doReplyMessage(exchange, replyMessage);
> >>                     }
> >>                 }
> >>             }
> >>
> >> In this situation the MessageID is never put into the correlationMap for
> >> future correlation in onMessage(). Furthermore if the call is async,
> >> there is no JMSListener set up to receive the reply using a selector
> >> which selects for the CorrelationID equal to the MessageID. So the
> >> JMSConduit will never receive the async callback. In order to support
> >> the async scenario, the JMSListener needs to dynamically set the
> >> MessageSelector after the message is sent and the MessageID is
> >> available. Furthermore, in a multi-threaded environment, there has to
> >> be one of these listeners per thread so that threads don't modify the
> >> same message selector when making concurrent calls.
> >
> > As I recall, last summer we wanted to keep the JMSConduit simple and also
> > support the messageIdPattern, so we changed the code like this and did not
> > support async calls for the messageIdPattern.
> >
> > If you take another look at the first huge if condition, you can see
> > that.
> > I don't like implementing a listener per thread for async calls with
> > the messageIdPattern by using a thread local; it looks a little messy.
> > How about using a work queue to take on the job of receiving the
> > response?
> >
> >> Feedback on these issues is appreciated so that I can move ahead with
> >> modifying trunk/2.2.x/2.1.x.
> >>
> >> Regards,
> >> Seumas
> >
> > Willem

--
Daniel Kulp
dkulp@apache.org
http://dankulp.com/blog
