ode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mszef...@apache.org
Subject svn commit: r551260 - in /incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi: MessageExchangeContextImpl.java OdeConsumer.java OdeConsumerAsync.java OdeConsumerSync.java OdeLifeCycle.java OdeService.java
Date Wed, 27 Jun 2007 18:35:18 GMT
Author: mszefler
Date: Wed Jun 27 11:35:17 2007
New Revision: 551260

URL: http://svn.apache.org/viewvc?view=rev&rev=551260
Log:
refactored JBI to support new(est) BART contracts

Removed:
    incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerAsync.java
    incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerSync.java
Modified:
    incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java
    incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java
    incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
    incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeService.java

Modified: incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java?view=diff&rev=551260&r1=551259&r2=551260
==============================================================================
--- incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java (original)
+++ incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java Wed Jun 27 11:35:17 2007
@@ -19,44 +19,82 @@
 
 package org.apache.ode.jbi;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.MessageExchangeContext;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
+import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
 
+
 /**
- * Implementation of the ODE {@link org.apache.ode.bpel.iapi.MessageExchangeContext}
- * interface. This class is used by the ODE engine to make invocation on JBI
- * services provided by other engines (i.e. the BPEL engine is acting as
- * client/consumer of services). 
+ * Implementation of the ODE {@link org.apache.ode.bpel.iapi.MessageExchangeContext} interface. This class is used by the ODE engine
+ * to make invocation on JBI services provided by other engines (i.e. the BPEL engine is acting as client/consumer of services).
  */
 public class MessageExchangeContextImpl implements MessageExchangeContext {
 
-  private static final Log __log = LogFactory
-      .getLog(MessageExchangeContextImpl.class);
+    private static final Log __log = LogFactory.getLog(MessageExchangeContextImpl.class);
+
+    private OdeContext _ode;
+
+    /** Supported invocation styles. For now this is fixed. */
+    private static final Set<InvocationStyle> __supported;
+    static {
+        HashSet<InvocationStyle> supported = new HashSet<InvocationStyle>();
+        supported.add(InvocationStyle.BLOCKING);
+        supported.add(InvocationStyle.ASYNC);
+        __supported = Collections.unmodifiableSet(supported);
+    }
+
+    public MessageExchangeContextImpl(OdeContext ode) {
+        _ode = ode;
+    }
+
+    public void onAsyncReply(MyRoleMessageExchange myrolemex) throws BpelEngineException
{
+        __log.error("Unexpected onAsyncReply notification: " + myrolemex);
+        // Due to JBI limitiations (i.e. we cannot recover a JBI message-exchange object)
, we don't support ASYNC invocations
+    }
+
+    public void invokePartnerAsynch(PartnerRoleMessageExchange mex) throws ContextException
{
+        _ode._consumer.invokePartner(mex);
+    }
+
+    public void invokePartnerBlocking(PartnerRoleMessageExchange mex) throws ContextException
{
+        _ode._consumer.invokePartner(mex);
+    }
 
-  private OdeContext _ode;
+    public void invokePartnerReliable(PartnerRoleMessageExchange mex) throws ContextException
{
+        throw new ContextException("Unsupported.");
+
+    }
+
+    public void invokePartnerTransacted(PartnerRoleMessageExchange mex) throws ContextException
{
+        throw new ContextException("Unsupported.");
+
+    }
+
+    public void onReliableReply(MyRoleMessageExchange myRoleMex) throws BpelEngineException
{
+        __log.error("Unexpected onRepliabeReply notification: " + myRoleMex);
+
+    }
+
+    public void cancel(PartnerRoleMessageExchange mex) throws ContextException {
+        // What can we do in JBI to cancel? --- not much. 
+        
+    }
+
+    public Set<InvocationStyle> getSupportedInvocationStyle(PartnerRoleChannel prc,
EndpointReference partnerEpr) {
+        return __supported ;
+    }
+    
 
-  public MessageExchangeContextImpl(OdeContext ode) {
-    _ode = ode;
-  }
-
-  public void onAsyncReply(MyRoleMessageExchange myrolemex)
-      throws BpelEngineException {
-    OdeService ode = _ode.getService(myrolemex.getServiceName());
-    if (ode !=  null)
-      ode.onResponse(myrolemex);
-    else {
-      __log.error("No active service for message exchange: "  + myrolemex);
-    }
-  }
-
-  public void invokePartner(PartnerRoleMessageExchange mex) throws ContextException {
-    _ode._consumer.invokePartner(mex);
-  }
+    
 }

Modified: incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java?view=diff&rev=551260&r1=551259&r2=551260
==============================================================================
--- incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java (original)
+++ incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java Wed Jun 27 11:35:17 2007
@@ -18,40 +18,37 @@
  */
 package org.apache.ode.jbi;
 
-import org.apache.ode.bpel.iapi.ContextException;
-import org.apache.ode.bpel.iapi.Message;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
-import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
-import org.apache.ode.jbi.msgmap.Mapper;
-import org.apache.ode.jbi.msgmap.MessageTranslationException;
-
 import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
 
-import javax.jbi.messaging.*;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessageExchangeFactory;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
 import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.xml.namespace.QName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.Message;
+import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
+import org.apache.ode.jbi.msgmap.Mapper;
+import org.apache.ode.jbi.msgmap.MessageTranslationException;
 
 /**
  * Bridge between ODE (consumers) and JBI (providers). An single object of this type handles all communications initiated by ODE
- * that is destined for other JBI providers. 
+ * that is destined for other JBI providers.
  */
-abstract class OdeConsumer extends ServiceBridge implements JbiMessageExchangeProcessor {
+class OdeConsumer extends ServiceBridge implements JbiMessageExchangeProcessor {
     private static final Log __log = LogFactory.getLog(OdeConsumer.class);
-    private static final long DEFAULT_RESPONSE_TIMEOUT = Long.getLong("org.apache.ode.jbi.timeout",
2 * 60 * 1000L);
 
     protected OdeContext _ode;
 
-    protected long _responseTimeout = DEFAULT_RESPONSE_TIMEOUT;
-
-
-    protected Map<String, PartnerRoleMessageExchange> _outstandingExchanges = new ConcurrentHashMap<String,
PartnerRoleMessageExchange>();
-
     OdeConsumer(OdeContext ode) {
         _ode = ode;
     }
@@ -100,35 +97,15 @@
                 NormalizedMessage nmsg = inonly.createMessage();
                 mapper.toNMS(nmsg, odeMex.getRequest(), odeMex.getOperation().getInput().getMessage(),
null);
                 inonly.setInMessage(nmsg);
-                _ode._scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
-                    public void afterCompletion(boolean success) {
-                        if (success) {
-                            doSendOneWay(odeMex, inonly);
-                        }
-                    }
-                    public void beforeCompletion() {
-                    }
-
-                });
+                doSendJBI(odeMex, inonly);
                 odeMex.replyOneWayOk();
             } else {
                 final InOut inout = (InOut) jbiMex;
                 NormalizedMessage nmsg = inout.createMessage();
                 mapper.toNMS(nmsg, odeMex.getRequest(), odeMex.getOperation().getInput().getMessage(),
null);
                 inout.setInMessage(nmsg);
-                _ode._scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
-                    public void afterCompletion(boolean success) {
-                        if (success) {
-                            doSendTwoWay(odeMex, inout);
-                        }
-                    }
-
-                    public void beforeCompletion() {
-                    }
-
-                });
-
-                odeMex.replyAsync();
+                doSendJBI(odeMex, inout);
+                odeMex.replyAsync(inout.getExchangeId());
             }
         } catch (MessagingException me) {
             String errmsg = "JBI messaging error for ODE MEX " + odeMex;
@@ -142,14 +119,9 @@
 
     }
 
-    protected abstract void doSendOneWay(PartnerRoleMessageExchange odeMex, InOnly inonly);
-
-    protected abstract void doSendTwoWay(PartnerRoleMessageExchange odeMex, InOut inout);
-
-
     public void onJbiMessageExchange(MessageExchange jbiMex) throws MessagingException {
-        if (!jbiMex.getPattern().equals(MessageExchangePattern.IN_ONLY) &&
-            !jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {
+        if (!jbiMex.getPattern().equals(MessageExchangePattern.IN_ONLY)
+                && !jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {
             __log.error("JBI MessageExchange " + jbiMex.getExchangeId() + " is of an unsupported
pattern " + jbiMex.getPattern());
             return;
         }
@@ -162,60 +134,42 @@
         } else if (jbiMex.getStatus() == ExchangeStatus.ERROR) {
             outFailure((InOut) jbiMex);
         } else if (jbiMex.getStatus() == ExchangeStatus.DONE) {
-            _outstandingExchanges.remove(jbiMex.getExchangeId());
+            ; // anything todo here? 
         } else {
             __log.error("Unexpected status " + jbiMex.getStatus() + " for JBI message exchange:
" + jbiMex.getExchangeId());
         }
     }
 
     private void outFailure(final InOut jbiMex) {
-        final PartnerRoleMessageExchange pmex = _outstandingExchanges.remove(jbiMex.getExchangeId());
+        PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _ode._server.getMessageExchangeByForeignKey(jbiMex.getExchangeId());
         if (pmex == null) {
-            __log.warn("Received a response for unknown JBI message exchange " + jbiMex.getExchangeId());
+            __log.warn("Received a response for unknown partner role message exchange " +
pmex.getMessageExchangeId());
             return;
         }
-
-        try {
-            _ode._scheduler.execTransaction(new Callable<Boolean>() {
-                public Boolean call() throws Exception {
-                    pmex.replyWithFailure(FailureType.OTHER, "Error: " + jbiMex.getError(),
null);
-                    return null;
-                }
-            });
-        } catch (Exception ex) {
-            __log.error("error delivering failure: ", ex);
-        }
-
+        
+        pmex.replyWithFailure(FailureType.OTHER, "Error: " + jbiMex.getError(), null);
     }
 
     private void outResponse(final InOut jbiMex) {
-        final PartnerRoleMessageExchange outstanding = _outstandingExchanges.remove(jbiMex.getExchangeId());
-        if (outstanding == null) {
-            __log.warn("Received a response for unknown JBI message exchange " + jbiMex.getExchangeId());
+
+        PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _ode._server.getMessageExchangeByForeignKey(jbiMex.getExchangeId());
+        if (pmex == null) {
+            __log.warn("Received a response for unknown partner role message exchange " +
pmex.getMessageExchangeId());
             return;
         }
-
-        try {
-            _ode._scheduler.execTransaction(new Callable<Boolean>() {
-                @SuppressWarnings("unchecked")
-                public Boolean call() throws Exception {
-                    // need to reload mex since we're in a different transaction
-                    PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _ode._server.getEngine().getMessageExchange(outstanding.getMessageExchangeId());
-                    if (pmex == null) {
-                        __log.warn("Received a response for unknown partner role message
exchange " + pmex.getMessageExchangeId());
-                        return Boolean.FALSE;
-                    }
-                    String mapperName = pmex.getProperty(Mapper.class.getName());
-                    Mapper mapper = mapperName == null ? _ode.getDefaultMapper() : _ode.getMapper(mapperName);
-                    if (mapper == null) {
-                        String errmsg = "Mapper not found.";
-                        __log.error(errmsg);
-                        pmex.replyWithFailure(FailureType.FORMAT_ERROR, errmsg, null);
-                    } else {
-                        try {
-                            Fault jbiFlt = jbiMex.getFault();
-                            if (jbiFlt != null) {
-                                javax.wsdl.Fault wsdlFlt = mapper.toFaultType(jbiFlt, (Collection<javax.wsdl.Fault>)
pmex.getOperation().getFaults().values());
+     
+        String mapperName = pmex.getProperty(Mapper.class.getName());
+        Mapper mapper = mapperName == null ? _ode.getDefaultMapper() : _ode.getMapper(mapperName);
+        if (mapper == null) {
+            String errmsg = "Mapper not found.";
+            __log.error(errmsg);
+            pmex.replyWithFailure(FailureType.FORMAT_ERROR, errmsg, null);
+        } else {
+            try {
+                Fault jbiFlt = jbiMex.getFault();
+                if (jbiFlt != null) {
+                    javax.wsdl.Fault wsdlFlt = mapper.toFaultType(jbiFlt, (Collection<javax.wsdl.Fault>)
pmex
+                            .getOperation().getFaults().values());
                                 if (wsdlFlt == null) {
                                     pmex.replyWithFailure(FailureType.FORMAT_ERROR, "Unrecognized
fault message.", null);
                                 } else {
@@ -231,30 +185,34 @@
                                                 + wsdlFlt.getName(), null);
                                     }
                                 }
-                            } else {
-                                Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
-                                mapper.toODE(response, jbiMex.getOutMessage(), pmex.getOperation().getOutput().getMessage());
-                                pmex.reply(response);
-                            }
-                        } catch (MessageTranslationException mte) {
-                            __log.error("Error translating message.", mte);
-                            pmex.replyWithFailure(FailureType.FORMAT_ERROR, mte.getMessage(),
null);
-                        }
-                    }
-                    return null;
+                } else {
+                    Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
+                    mapper.toODE(response, jbiMex.getOutMessage(), pmex.getOperation().getOutput().getMessage());
+                    pmex.reply(response);
                 }
-            });
-        } catch (Exception ex) {
-            __log.error("error delivering RESPONSE: ", ex);
-
+            } catch (MessageTranslationException mte) {
+                __log.error("Error translating message.", mte);
+                pmex.replyWithFailure(FailureType.FORMAT_ERROR, mte.getMessage(), null);
+            }
         }
     }
 
-    public void setResponseTimeout(long timeout) {
-    	_responseTimeout = timeout;
+    protected void doSendJBI(final PartnerRoleMessageExchange odeMex, final MessageExchange
jbiMex) {
+        try {
+            switch (odeMex.getInvocationStyle()) {
+            case ASYNC:
+                _ode.getChannel().send(jbiMex);
+                break;
+            case BLOCKING:
+                _ode.getChannel().sendSync(jbiMex, odeMex.getTimeout());
+                break;
+            default:
+                throw new ContextException("Unsupported Invocation Style: " + odeMex.getInvocationStyle());
+            }
+        } catch (MessagingException e) {
+            String errmsg = "Error sending request-only message to JBI for ODE mex " + odeMex;
+            __log.error(errmsg, e);
+        }
     }
 
-    public long getResponseTimeout() {
-    	return _responseTimeout;
-    }
 }

Modified: incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java?view=diff&rev=551260&r1=551259&r2=551260
==============================================================================
--- incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java (original)
+++ incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java Wed Jun 27 11:35:17 2007
@@ -84,12 +84,8 @@
             _ode = OdeContext.getInstance();
             _ode.setContext(context);
             
-            // Use system property to determine if DeliveryChannel.sendSync or DeliveryChannel.send
is used.
-            if (Boolean.getBoolean("org.apache.ode.jbi.sendSynch"))
-                _ode._consumer = new OdeConsumerSync(_ode);
-            else 
-                _ode._consumer = new OdeConsumerAsync(_ode);
-
+            _ode._consumer = new OdeConsumer(_ode);
+            
             if (_ode.getContext().getWorkspaceRoot() != null)
                 TempFileManager.setWorkingDirectory(new File(_ode.getContext().getWorkspaceRoot()));
 

Modified: incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeService.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeService.java?view=diff&rev=551260&r1=551259&r2=551260
==============================================================================
--- incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeService.java (original)
+++ incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeService.java Wed Jun 27 11:35:17 2007
@@ -18,16 +18,6 @@
  */
 package org.apache.ode.jbi;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.iapi.Endpoint;
-import org.apache.ode.bpel.iapi.Message;
-import org.apache.ode.bpel.iapi.MessageExchange.Status;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.jbi.msgmap.Mapper;
-import org.apache.ode.jbi.msgmap.MessageTranslationException;
-import org.w3c.dom.Element;
-
 import javax.jbi.JBIException;
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.Fault;
@@ -37,8 +27,17 @@
 import javax.jbi.messaging.NormalizedMessage;
 import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.xml.namespace.QName;
-import java.util.HashMap;
-import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.Endpoint;
+import org.apache.ode.bpel.iapi.InvocationStyle;
+import org.apache.ode.bpel.iapi.Message;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.apache.ode.jbi.msgmap.Mapper;
+import org.apache.ode.jbi.msgmap.MessageTranslationException;
+import org.w3c.dom.Element;
 
 /**
  * Bridge JBI (consumer) to ODE (provider).
@@ -47,9 +46,6 @@
 
     private static final Log __log = LogFactory.getLog(OdeService.class);
 
-    /** utility for tracking outstanding JBI message exchanges. */
-    private final JbiMexTracker _jbiMexTracker = new JbiMexTracker();
-
     /** JBI-Generated Endpoint */
     private ServiceEndpoint _internal;
 
@@ -111,14 +107,12 @@
 
         if (jbiMex.getStatus() != ExchangeStatus.ACTIVE) {
             // We can forget about the exchange.
-            __log.debug("Consuming MEX tracker " + jbiMex.getExchangeId());
-            _jbiMexTracker.consume(jbiMex.getExchangeId());
             return;
         }
 
         if (jbiMex.getOperation() == null) {
-            throw new IllegalArgumentException("Null operation in JBI message exchange id="
+ jbiMex.getExchangeId()
-                                                + " endpoint=" + _endpoint);
+            throw new IllegalArgumentException("Null operation in JBI message exchange id="
+ jbiMex.getExchangeId() + " endpoint="
+                    + _endpoint);
         }
 
         if (jbiMex.getPattern().equals(org.apache.ode.jbi.MessageExchangePattern.IN_ONLY))
{
@@ -154,18 +148,17 @@
                 __log.error("Unexpected error invoking ODE.", t);
                 err = new RuntimeException(t);
             } finally {
-                // If we got an error that wasn't sent.  
+                // If we got an error that wasn't sent.
                 if (jbiMex.getStatus() == ExchangeStatus.ACTIVE && !success) {
-                    if (err != null && jbiMex.getError() != null)  {
+                    if (err != null && jbiMex.getError() != null) {
                         jbiMex.setError(err);
                     }
-                    jbiMex.setStatus(ExchangeStatus.ERROR);     
-                    _ode.getChannel().send(jbiMex);         
-                }       
+                    jbiMex.setStatus(ExchangeStatus.ERROR);
+                    _ode.getChannel().send(jbiMex);
+                }
             }
         } else {
-            __log.error("JBI MessageExchange " + jbiMex.getExchangeId() + " is of an unsupported
pattern "
-                    + jbiMex.getPattern());
+            __log.error("JBI MessageExchange " + jbiMex.getExchangeId() + " is of an unsupported pattern " + jbiMex.getPattern());
             jbiMex.setStatus(ExchangeStatus.ERROR);
             jbiMex.setError(new Exception("Unknown message exchange pattern: " + jbiMex.getPattern()));
         }
@@ -173,20 +166,12 @@
     }
 
     /**
-     * Called from
-     * {@link MessageExchangeContextImpl#onAsyncReply(MyRoleMessageExchange)}
+     * Called from {@link MessageExchangeContextImpl#onAsyncReply(MyRoleMessageExchange)}
      * 
      * @param mex
      *            message exchenge
      */
-    public void onResponse(MyRoleMessageExchange mex) {
-        __log.debug("Consuming MEX tracker " + mex.getClientId());
-        javax.jbi.messaging.MessageExchange jbiMex = _jbiMexTracker.consume(mex.getClientId());
-        if (jbiMex == null) {
-            __log.warn("Ignoring unknown async reply: " + mex);
-            return;
-        }
-
+    public void onResponse(MyRoleMessageExchange mex, javax.jbi.messaging.MessageExchange
jbiMex) {
         switch (mex.getStatus()) {
         case FAULT:
             outResponseFault(mex, jbiMex);
@@ -210,70 +195,47 @@
      */
     private void invokeOde(javax.jbi.messaging.MessageExchange jbiMex, NormalizedMessage
request) throws Exception {
 
-        // If this has already been tracked, we will not invoke!
-        if (_jbiMexTracker.track(jbiMex)) {
-            __log.debug("Skipping JBI MEX " + jbiMex.getExchangeId() + ", already received!");
-            return;
+        MyRoleMessageExchange odeMex;
+        if (__log.isDebugEnabled()) {
+            __log.debug("invokeOde() JBI exchangeId=" + jbiMex.getExchangeId() + " endpoint="
+ _endpoint + " operation="
+                    + jbiMex.getOperation());
         }
 
-        _ode.getTransactionManager().begin();
+        odeMex = _ode._server.createMessageExchange(InvocationStyle.BLOCKING, _endpoint.serviceName,
jbiMex.getOperation()
+                .getLocalPart(), jbiMex.getExchangeId());
 
-        boolean success = false;
-        MyRoleMessageExchange odeMex = null;
-        try {
-            if (__log.isDebugEnabled()) {
-                __log.debug("invokeOde() JBI exchangeId=" + jbiMex.getExchangeId() + " endpoint="
+ _endpoint
-                        + " operation=" + jbiMex.getOperation());
-            }
-            odeMex = _ode._server.getEngine().createMessageExchange(jbiMex.getExchangeId(),
_endpoint.serviceName,
-                    jbiMex.getOperation().getLocalPart());
-
-            if (odeMex.getOperation() != null) {
-                copyMexProperties(odeMex, jbiMex);
-                javax.wsdl.Message msgdef = odeMex.getOperation().getInput().getMessage();
-                Message odeRequest = odeMex.createMessage(odeMex.getOperation().getInput().getMessage().getQName());
-                Mapper mapper = _ode.findMapper(request, odeMex.getOperation());
-                if (mapper == null) {
-                    String errmsg = "Could not find a mapper for request message for JBI
MEX " + jbiMex.getExchangeId()
-                            + "; ODE MEX " + odeMex.getMessageExchangeId() + " is failed.
";
-                    __log.error(errmsg);
-                    throw new MessageTranslationException(errmsg);
-
-                }
-                odeMex.setProperty(Mapper.class.getName(), mapper.getClass().getName());
-                mapper.toODE(odeRequest, request, msgdef);
-                odeMex.invoke(odeRequest);
-
-                // Handle the response if it is immediately available.
-                if (odeMex.getStatus() != Status.ASYNC) {
-                    __log.debug("ODE MEX " + odeMex + " completed SYNCHRONOUSLY.");
-                    onResponse(odeMex);
-                    _jbiMexTracker.consume(jbiMex.getExchangeId());
-                } else {
-                    __log.debug("ODE MEX " + odeMex + " completed ASYNCHRONOUSLY.");
-                }
-            } else {
-                __log.error("ODE MEX " + odeMex + " was unroutable.");
-                sendError(jbiMex, new IllegalArgumentException("Unroutable invocation."));
-            }
+        if (odeMex.getOperation() == null) {
+            __log.error("ODE MEX " + odeMex + " was unroutable.");
+            sendError(jbiMex, new IllegalArgumentException("Unroutable invocation."));
+            return;
+        }
 
-            success = true;
-            // For one-way invocation we do not need to maintain the association
-            if (jbiMex.getPattern().equals(org.apache.ode.jbi.MessageExchangePattern.IN_ONLY))
{
-                __log.debug("Consuming non Req/Res MEX tracker " + jbiMex.getExchangeId()
+ " with pattern " + jbiMex.getPattern());
-                _jbiMexTracker.consume(jbiMex.getExchangeId());
-            }
+        copyMexProperties(odeMex, jbiMex);
+        javax.wsdl.Message msgdef = odeMex.getOperation().getInput().getMessage();
+        Message odeRequest = odeMex.createMessage(odeMex.getOperation().getInput().getMessage().getQName());
+        Mapper mapper = _ode.findMapper(request, odeMex.getOperation());
+        if (mapper == null) {
+            String errmsg = "Could not find a mapper for request message for JBI MEX " +
jbiMex.getExchangeId() + "; ODE MEX "
+                    + odeMex.getMessageExchangeId() + " is failed. ";
+            __log.error(errmsg);
+            throw new MessageTranslationException(errmsg);
 
-        } finally {
-            if (success) {
-                __log.debug("Commiting ODE MEX " + odeMex);
-                _ode.getTransactionManager().commit();
+        }
+        odeMex.setProperty(Mapper.class.getName(), mapper.getClass().getName());
+        mapper.toODE(odeRequest, request, msgdef);
+        odeMex.setRequest(odeRequest);
+        try {
+            odeMex.invokeBlocking();
+            // Handle the response if it is immediately available.
+            if (odeMex.getStatus() != Status.ASYNC) {
+                __log.debug("ODE MEX " + odeMex + " completed SYNCHRONOUSLY.");
+                onResponse(odeMex, jbiMex);
             } else {
-                __log.debug("Rolling back ODE MEX " + odeMex);
-                _jbiMexTracker.consume(jbiMex.getExchangeId());
-                _ode.getTransactionManager().rollback();
-
+                __log.fatal("ODE MEX " + odeMex + " unexpectedly completed ASYNCHRONOUSLY.");
             }
+        } catch (Exception ex) {
+            __log.error("ODE MEX " + odeMex + " resulted in an error.");
+            sendError(jbiMex, ex);
         }
 
     }
@@ -335,7 +297,7 @@
             QName fault = mex.getFault();
             javax.wsdl.Fault wsdlFault = mex.getOperation().getFault(fault.getLocalPart());
             if (wsdlFault == null) {
-               sendError(jbiMex, new MessageTranslationException("Unmapped Fault : " + fault
+ ": " + mex.getFaultExplanation()));
+                sendError(jbiMex, new MessageTranslationException("Unmapped Fault : " + fault
+ ": " + mex.getFaultExplanation()));
             } else {
                 mapper.toNMS(flt, mex.getFaultResponse(), wsdlFault.getMessage(), fault);
                 inout.setFault(flt);
@@ -349,7 +311,7 @@
             sendError(jbiMex, mte);
         }
     }
-    
+
     private void sendError(javax.jbi.messaging.MessageExchange jbiMex, Exception error) {
         try {
             jbiMex.setError(error);
@@ -364,25 +326,4 @@
         return _endpoint;
     }
 
-    /**
-     * Class for tracking outstanding message exchanges from JBI.
-     */
-    private static class JbiMexTracker {
-        /**
-         * Outstanding JBI-initiated exchanges: mapping for JBI MEX ID to JBI
-         * MEX
-         */
-        private Map<String, javax.jbi.messaging.MessageExchange> _outstandingJbiExchanges
= new HashMap<String, javax.jbi.messaging.MessageExchange>();
-
-        synchronized boolean track(javax.jbi.messaging.MessageExchange jbiMex) {
-            boolean found = _outstandingJbiExchanges.containsKey(jbiMex.getExchangeId());
-            _outstandingJbiExchanges.put(jbiMex.getExchangeId(), jbiMex);
-            return found;
-        }
-
-        synchronized javax.jbi.messaging.MessageExchange consume(String clientId) {
-            return _outstandingJbiExchanges.remove(clientId);
-        }
-
-    }
 }



Mime
View raw message