ode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mszef...@apache.org
Subject svn commit: r550631 - in /incubator/ode/branches/bart: bpel-api/src/main/java/org/apache/ode/bpel/iapi/ bpel-dao/src/main/java/org/apache/ode/bpel/dao/ bpel-runtime/src/main/java/org/apache/ode/bpel/engine/
Date Mon, 25 Jun 2007 22:46:19 GMT
Author: mszefler
Date: Mon Jun 25 15:46:18 2007
New Revision: 550631

URL: http://svn.apache.org/viewvc?view=rev&rev=550631
Log:
BART

Modified:
    incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java
    incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleChannel.java
    incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleMessageExchange.java
    incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java

Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java?view=diff&rev=550631&r1=550630&r2=550631
==============================================================================
--- incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java
(original)
+++ incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java
Mon Jun 25 15:46:18 2007
@@ -113,6 +113,13 @@
     String getMessageExchangeId()
             throws BpelEngineException;
 
+
+    /**
+     * Get the invocation style for this message exchange. 
+     * @return
+     */
+    InvocationStyle getInvocationStyle();
+    
     /**
      * Get the name of the operation (WSDL 1.1) / message exchange (WSDL 1.2?).
      *

Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleChannel.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleChannel.java?view=diff&rev=550631&r1=550630&r2=550631
==============================================================================
--- incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleChannel.java
(original)
+++ incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleChannel.java
Mon Jun 25 15:46:18 2007
@@ -31,39 +31,13 @@
 public interface PartnerRoleChannel {
 
     /**
-     * Style of invocation supported on the given channel. 
+     * Get the supported invocation styles (see {@link InvocationStyle}) for invoking the EPR provided 
+     * via this channel.
      * 
-     * @author Maciej Szefler
+     * @param partnerEpr partner's EPR
+     * @return supported invocation styles
      */
-    public enum InvocationStyle {
-        /** 
-         * The very ordinary blocking IO style --- the IL will block until the operation
is complete, or until
-         * a timeout is reached. 
-         */
-        BLOCKING, 
-        
-        /**
-         * Asynchrnous style -- the IL will "queue" the invocation, and call-back asynchrnously
when the response
-         * is available. 
-         */
-        ASYNC, 
-        
-        /**
-         * Reliable style -- the IL will queue the invocation using the current transaction.
The response will be
-         * delivered when available using a separate transaction. 
-         */
-        RELIABLE,
-        
-        
-        /**
-         * Transacted style -- the IL will enroll the operation with the current transaction.
The IL will block until the
-         * operation completes. 
-         */
-        TRANSACTED
-    }
-
-    
-    Set<InvocationStyle> getSupportedInvocationStyle();
+    Set<InvocationStyle> getSupportedInvocationStyle(EndpointReference partnerEpr);
     
     /**
      * Return the endpoint reference to the endpoint with which the

Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleMessageExchange.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleMessageExchange.java?view=diff&rev=550631&r1=550630&r2=550631
==============================================================================
--- incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleMessageExchange.java
(original)
+++ incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleMessageExchange.java
Mon Jun 25 15:46:18 2007
@@ -31,11 +31,6 @@
  */
 public interface PartnerRoleMessageExchange extends MessageExchange {
 
-    /**
-     * Get the invocation style for this message exchange. 
-     * @return
-     */
-    PartnerRoleChannel.InvocationStyle getInvocationStyle();
 
     /**
      * Get the identifier of the process that created this message exchange.

Modified: incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java?view=diff&rev=550631&r1=550630&r2=550631
==============================================================================
--- incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
(original)
+++ incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
Mon Jun 25 15:46:18 2007
@@ -18,12 +18,13 @@
  */
 package org.apache.ode.bpel.dao;
 
-import org.w3c.dom.Element;
-
-import javax.xml.namespace.QName;
 import java.util.Date;
 import java.util.Set;
 
+import javax.xml.namespace.QName;
+
+import org.w3c.dom.Element;
+
 /**
  * Data access object for a message exchange.
  */
@@ -41,6 +42,19 @@
     String getMessageExchangeId();
 
     /**
+     * Get the invocation style. 
+     * @return
+     */
+    String getInvocationStyle();
+    
+    /**
+     * Set the invocation style. 
+     * @param invocationStyle
+     */
+    void setInvocationStyle(String invocationStyle);
+    
+    
+    /**
      * Get output message (could be fault message)
      *
      * @return output message DAO
@@ -234,13 +248,16 @@
     PartnerLinkDAO getPartnerLink();
 
     /**
-     * Gets the mex id for the message exchange that has been piped with
-     * this one in a process to process interaction. 
-     * @return
-     */
-    String getPipedMessageExchangeId();
-    void setPipedMessageExchangeId(String mexId);
+     * Gets the message exchange that has been piped with this one in a process to process interaction.
+     *  
+     * @return other side of the message pipe 
+     */
+    MessageExchangeDAO getPipedMessageExchange();
+    
+    void setPipedMessageExchange(MessageExchangeDAO mexId);
 
     void release();
+
+    void setFailureType(String failureType);
 
 }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=550631&r1=550630&r2=550631
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
(original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
Mon Jun 25 15:46:18 2007
@@ -337,7 +337,7 @@
                     if (__log.isDebugEnabled()) {
                         __log.debug("InvokeResponse event for iid " + we.getIID());
                     }
-                    processInstance.invocationResponse(we.getMexId(), we.getChannel());
+                    processInstance.injectPartnerResponse(we.getMexId(), we.getChannel());
                     processInstance.execute();
                     break;
                 case MATCHER:

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=550631&r1=550630&r2=550631
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
(original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
Mon Jun 25 15:46:18 2007
@@ -43,13 +43,14 @@
 import org.apache.ode.bpel.iapi.ContextException;
 import org.apache.ode.bpel.iapi.Endpoint;
 import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MessageExchangeContext;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
 import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
-import org.apache.ode.bpel.iapi.PartnerRoleChannel.InvocationStyle;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
 import org.apache.ode.bpel.memdao.ProcessInstanceDaoImpl;
@@ -109,8 +110,9 @@
     /** JACOB ExecutionQueue (state) */
     protected ExecutionQueueImpl _soup;
 
-    private MyRoleMessageExchangeImpl _instantiatingMessageExchange;
+    private MessageExchangeDAO _instantiatingMessageExchange;
 
+    /** Object for keeping track of all the outstanding <invoke>s. */
     private OutstandingRequestManager _outstandingRequests;
 
     /** List of BLOCKING invocations that need to be deferred until the end of the current TX */
@@ -127,7 +129,7 @@
 
 
     public BpelRuntimeContextImpl(BpelProcess bpelProcess, ProcessInstanceDAO dao, PROCESS PROCESS,
-                                  MyRoleMessageExchangeImpl instantiatingMessageExchange) {
+                                  MessageExchangeDAO instantiatingMessageExchange) {
         _bpelProcess = bpelProcess;
         _dao = dao;
         _iid = dao.getInstanceId();
@@ -544,61 +546,63 @@
         evt.setOperation(opName);
         evt.setPortType(plinkInstnace.partnerLink.myRolePortType.getQName());
 
-        MessageExchangeDAO mex = _dao.getConnection().getMessageExchange(mexRef);
+        // Get the "my-role" mex from the DB. 
+        MessageExchangeDAO myrolemex = _dao.getConnection().getMessageExchange(mexRef);
 
-        MessageDAO message = mex.createMessage(plinkInstnace.partnerLink.getMyRoleOperation(opName).getOutput()
+        // TODO: add some checks here/could get npe
+        MessageDAO message = myrolemex.createMessage(plinkInstnace.partnerLink.getMyRoleOperation(opName).getOutput()
                 .getMessage().getQName());
         message.setData(msg);
 
-        MyRoleMessageExchangeImpl m = new MyRoleMessageExchangeImpl(_bpelProcess._engine, mex);
-        _bpelProcess.initMyRoleMex(m);
-        m.setResponse(new MessageImpl(message));
+        myrolemex.setResponse(message);
 
+        Status status;
+        
         if (fault != null) {
-            mex.setStatus(MessageExchange.Status.FAULT.toString());
-            mex.setFault(fault);
+            status = Status.FAULT;
+            myrolemex.setFault(fault);
             evt.setAspect(ProcessMessageExchangeEvent.PROCESS_FAULT);
         } else {
-            mex.setStatus(MessageExchange.Status.RESPONSE.toString());
+            status = Status.RESPONSE;
             evt.setAspect(ProcessMessageExchangeEvent.PROCESS_OUTPUT);
         }
+        
+        myrolemex.setStatus(status.toString());
+
+        
+        if (myrolemex.getPipedMessageExchange() != null) /* p2p case */ {
+            MessageExchangeDAO pmex = myrolemex.getPipedMessageExchange();
 
-        if (mex.getPipedMessageExchangeId() != null) {
-            PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _bpelProcess
-                    .getEngine().getMessageExchange(mex.getPipedMessageExchangeId());
             if (BpelProcess.__log.isDebugEnabled()) {
-                __log.debug("Replying to a p2p mex, myrole " + m + " - partnerole " + pmex);
-            }
-            try {
-                switch (m.getStatus()) {
-                    case FAILURE:
-                        // We can't seem to get the failure out of the myrole mex?
-                        pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "operation failed", null);
-                        break;
-                    case FAULT:
-                        Message faultRes = pmex.createMessage(pmex.getOperation().getFault(m.getFault().getLocalPart())
-                                .getMessage().getQName());
-                        faultRes.setMessage(m.getResponse().getMessage());
-                        pmex.replyWithFault(m.getFault(), faultRes);
-                        break;
-                    case RESPONSE:
-                        Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
-                        response.setMessage(m.getResponse().getMessage());
-                        pmex.reply(response);
-                        break;
-                    default:
-                        __log.warn("Unexpected state: " + m.getStatus());
-                        break;
-                }
-            } finally {
-                mex.release();
+                __log.debug("Replying to a p2p mex, myrole " + myrolemex + " - partnerole " + pmex);
             }
-        } else _bpelProcess._engine._contexts.mexContext.onAsyncReply(m);
 
-        // send event
+            // In the p2p case we copy the status/response from one mex object into the other.
+            pmex.setResponse(myrolemex.getResponse());
+            pmex.setStatus(myrolemex.getStatus());
+            continuePartnerReplied(pmex);
+            myrolemex.release();
+
+        } else /* IL-mediated communication */  {
+            // TODO: distinguish between different kinds of my-role mexss
+            MyRoleMessageExchangeImpl myRoleMex = new MyRoleMessageExchangeImpl();
+            _bpelProcess._engine._contexts.mexContext.onAsyncReply(myRoleMex);
+        }
+
+
         sendEvent(evt);
     }
 
+
+    /**
+     * Continue a process due to a reply being received from a partner. 
+     * 
+     * @param pmex partner-role message exchange where the response was received
+     */
+    private void continuePartnerReplied(MessageExchangeDAO pmex) {
+        
+    }
+
     /**
      * @see BpelRuntimeContext#writeCorrelation(org.apache.ode.bpel.runtime.CorrelationSetInstance,
      *      org.apache.ode.bpel.common.CorrelationKey)
@@ -699,16 +703,18 @@
 
     public String invoke(PartnerLinkInstance partnerLink, Operation operation, Element outgoingMessage,
                          InvokeResponseChannel channel) throws FaultException {
-
+        
+        // Get the Integration Layer's communication channel for the partnerlink.
+        PartnerRoleChannel partnerRoleChannel = _bpelProcess.getPartnerRoleChannel(partnerLink.partnerLink);
+        
         PartnerLinkDAO plinkDAO = fetchPartnerLinkDAO(partnerLink);
-        // The target (partner endpoint) -- if it has not been explicitly
-        // initialized
-        // then use the value from bthe deployment descriptor ..
+        
         Element partnerEPR = plinkDAO.getPartnerEPR();
+        
         EndpointReference partnerEpr;
 
         if (partnerEPR == null) {
-            partnerEpr = _bpelProcess.getInitialPartnerRoleEPR(partnerLink.partnerLink);
+            partnerEpr = partnerRoleChannel.getInitialEndpointReference();
             // In this case, the partner link has not been initialized.
             if (partnerEpr == null)
                 throw new FaultException(partnerLink.partnerLink.getOwner().constants.qnUninitializedPartnerRole);
@@ -740,127 +746,135 @@
                 : MessageExchangePattern.REQUEST_ONLY).toString());
         mexDao.setChannel(channel == null ? null : channel.export());
 
-        PartnerRoleChannel partnerRoleChannel = _bpelProcess.getPartnerRoleChannel(partnerLink.partnerLink);
         
-//      PartnerRoleMessageExchangeImpl mex = new PartnerRoleMessageExchangeImpl(_bpelProcess._engine,
mexDao,
-//      partnerLink.partnerLink.partnerRolePortType, operation, partnerEpr, myRoleEndpoint,
);
-
-        // Properties used by stateful-exchange protocol.
-        String mySessionId = plinkDAO.getMySessionId();
-        String partnerSessionId = plinkDAO.getPartnerSessionId();
-
-        if ( mySessionId != null )
-            mexDao.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, mySessionId);
-        if ( partnerSessionId != null )
-            mexDao.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, partnerSessionId);
-
-        if (__log.isDebugEnabled())
-            __log.debug("INVOKE PARTNER (SEP): sessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId);
-
         MessageDAO message = mexDao.createMessage(operation.getInput().getMessage().getQName());
         mexDao.setRequest(message);
         message.setData(outgoingMessage);
         message.setType(operation.getInput().getMessage().getQName());
 
-        // Get he my-role EPR (if myrole exists) for optional use by partner
-        // (for callback mechanism).
-        EndpointReference myRoleEndpoint = partnerLink.partnerLink.hasMyRole() ? _bpelProcess
-                .getInitialMyRoleEPR(partnerLink.partnerLink) : null;
-
+        
         BpelProcess p2pProcess = null;
         Endpoint partnerEndpoint = _bpelProcess.getInitialPartnerRoleEndpoint(partnerLink.partnerLink);
         if (partnerEndpoint != null)
             p2pProcess = _bpelProcess.getEngine().route(partnerEndpoint.serviceName, new
MemBackedMessageImpl(message.getData(),message.getType(), true));
 
-        if (p2pProcess != null) {
-            // Creating a my mex using the same message id as partner mex to "pipe" them
-            MyRoleMessageExchange myRoleMex = _bpelProcess.getEngine().createMessageExchange(
-                    mexDao.getMessageExchangeId(), partnerEndpoint.serviceName,
-                    operation.getName(), mexDao.getMessageExchangeId());
-
-            if (BpelProcess.__log.isDebugEnabled()) {
-                __log.debug("Invoking in a p2p interaction, partnerrole " + mexDao.getMessageExchangeId() + " - myrole " + myRoleMex);
-            }
-
-            Message odeRequest = myRoleMex.createMessage(operation.getInput().getMessage().getQName());
-            odeRequest.setMessage(outgoingMessage);
-
-            if (BpelProcess.__log.isDebugEnabled()) {
-                __log.debug("Setting myRoleMex session ids for p2p interaction, mySession "
-                        + partnerSessionId + " - partnerSess " + mySessionId);
-            }
-            if ( partnerSessionId != null )
-                myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, partnerSessionId);
-            if ( mySessionId != null )
-                myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID,
mySessionId);
-
-            mexDao.setStatus(MessageExchange.Status.ASYNC.toString());
-            myRoleMex.invoke(odeRequest);
-        } else /* NOT p2p, need to call out to IL */ {
-            if (partnerEpr != null) {
-                // If we couldn't find the endpoint, then there is no sense
-                // in asking the IL to invoke.
-                mexDao.setEPR(partnerEpr.toXML().getDocumentElement());
-                mexDao.setStatus(MessageExchange.Status.REQUEST.toString());
-                Set<InvocationStyle> supportedStyles = partnerRoleChannel.getSupportedInvocationStyle();
-                if (!_bpelProcess.isInMemory()) {
-                    if (supportedStyles.contains(InvocationStyle.RELIABLE)) {
-                        // If RELIABLE is supported, this is easy, we just do it in-line.
-                        _bpelProcess._engine._contexts.mexContext.invokePartner();
-                    } else if (supportedStyles.contains(InvocationStyle.TRANSACTED)){
-                        // If TRANSACTED is supported, this is again easy, do it in-line.
-                        _bpelProcess._engine._contexts.mexContext.invokePartner(mex, MessageExchangeContext.InvocationStyle.TRANSACTED);
-                    } else if (supportedStyles.contains(InvocationStyle.BLOCKING)) {
-                        // For BLOCKING invocation, we defer the call until after commit
(unless idempotent). 
-                        _todoBlockingCalls.add(mex);
-                    } else if (supportedStyles.contains(InvocationStyle.ASYNC)) {
-                        // For ASYNC style, we defer the call until after commit (unless
idempotent). 
-                        _todoAsyncCalls.add(mex);
-                    } else {
-                        // This really should not happen, indicates IL is screwy.
-                        __log.error("Integration layer did not agree to any known invocation
style for EPR " + DOMUtils.domToString(partnerEPR));
-                        mex.setFailure(FailureType.COMMUNICATION_ERROR, "NoMatchingStyle",partnerEPR);
-                    }
-                } else  /* in-memory */ {
-                    
-                }
-            } else {
-                __log.error("Couldn't find endpoint for partner EPR " + DOMUtils.domToString(partnerEPR));
-                mex.setFailure(FailureType.UNKNOWN_ENDPOINT, "UnknownEndpoint", partnerEPR);
-            }
-        }
-
         evt.setMexId(mexDao.getMessageExchangeId());
         sendEvent(evt);
 
-        // MEX pattern is request only, at this point the status can only be a one way
-        if (mexDao.getPattern().equals(MessageExchangePattern.REQUEST_ONLY.toString())) {
-            mexDao.setStatus(MessageExchange.Status.ASYNC.toString());
-            // This mex can now be released
-            mexDao.release();
-        }
-        // Check if there is a synchronous response, if so, we need to inject the
-        // message on the response channel.
-        switch (mex.getStatus()) {
-            case NEW:
-                throw new AssertionError("Impossible!");
+        if (p2pProcess != null) {
+            /* P2P (process-to-process) invocation, special logic */
+            invokeP2P(operation, outgoingMessage, mexDao);
+        } else {
+            /* NOT p2p, need to call out to IL */ 
+            invokeIL(partnerRoleChannel, partnerEpr, mexDao);
+        }
+
+        // In case a response/fault was available right away, which will happen for BLOCKING/TRANSACTED invocations, 
+        // we need to inject a message on the response channel, so that the process continues.

+        switch (Status.valueOf(mexDao.getStatus())){
             case ASYNC:
                 break;
             case RESPONSE:
             case FAULT:
             case FAILURE:
-                invocationResponse(mex);
+                injectPartnerResponse(mexDao.getMessageExchangeId(), mexDao.getChannel());
                 break;
             default:
-                __log.error("Partner did not acknowledge message exchange: " + mex);
-                mex.setFailure(FailureType.NO_RESPONSE, "Partner did not acknowledge.", null);
-                invocationResponse(mex);
+                __log.error("Partner did not acknowledge message exchange: " + mexDao.getMessageExchangeId());
+                mexDao.setStatus(Status.FAILURE.toString());
+                mexDao.setFailureType(FailureType.NO_RESPONSE.toString());
+                injectPartnerResponse(mexDao.getMessageExchangeId(), mexDao.getChannel());
         }
 
+        
         return mexDao.getMessageExchangeId();
 
     }
 
+    private void invokeIL(
+            PartnerLinkInstance partnerLink, 
+            Operation operation, 
+            Element outgoingMessage,
+            PartnerRoleChannel partnerRoleChannel, 
+            EndpointReference partnerEpr, 
+            MessageExchangeDAO mexDao) {
+        if (partnerEpr != null) {
+            // If we couldn't find the endpoint, then there is no sense
+            // in asking the IL to invoke.
+            mexDao.setEPR(partnerEpr.toXML().getDocumentElement());
+            mexDao.setStatus(MessageExchange.Status.REQUEST.toString());
+            Set<InvocationStyle> supportedStyles = partnerRoleChannel.getSupportedInvocationStyle(partnerEpr);
+            if (supportedStyles.contains(InvocationStyle.RELIABLE)) {
+                // If RELIABLE is supported, this is easy, we just do it in-line.
+                throw new UnsupportedOperationException(); // TODO
+                ReliablePartnerRoleMessageExchangeImpl reliableMex = new ReliablePartnerRoleMessageExchangeImpl();
+                _bpelProcess._engine._contexts.mexContext.invokePartner(reliableMex);
+            } else if (supportedStyles.contains(InvocationStyle.TRANSACTED)){
+                // If TRANSACTED is supported, this is again easy, do it in-line.
+                throw new UnsupportedOperationException(); // TODO
+                TransactedPartnerRoleMessageExchangeImpl transactedMex = new TransactedPartnerRoleMessageExchangeImpl();
+                _bpelProcess._engine._contexts.mexContext.invokePartner(transactedMex);
+            } else if (supportedStyles.contains(InvocationStyle.BLOCKING)) {
+                // For BLOCKING invocation, we defer the call until after commit (unless idempotent). 
+                BlockingPartnerRoleMessageExchangeImpl blockingMex = new BlockingPartnerRoleMessageExchangeImpl();
+                _todoBlockingCalls.add(blockingMex);
+            } else if (supportedStyles.contains(InvocationStyle.ASYNC)) {
+                // For ASYNC style, we defer the call until after commit (unless idempotent).
+                AsyncPartnerRoleMessageExchangeImpl asyncMex = new AsyncPartnerRoleMessageExchangeImpl();
+                _todoAsyncCalls.add(asyncMex);
+            } else {
+                // This really should not happen, indicates IL is screwy.
+                __log.error("Integration layer did not agree to any known invocation style for EPR " + DOMUtils.domToString(partnerEPR));
+                mexDao.setFailureType(FailureType.COMMUNICATION_ERROR.toString());
+                mexDao.setStatus(Status.FAILURE.toString());
+                mexDao.setFaultExplanation("NoMatchingStyle");
+            }
+
+        } else {
+            __log.error("Couldn't find endpoint for partner EPR " + DOMUtils.domToString(partnerEPR));
+            mexDao.setFailureType(FailureType.UNKNOWN_ENDPOINT.toString());
+            mexDao.setFaultExplanation("UnknownEndpoint");
+            mexDao.setStatus(Status.FAILURE.toString());
+            //, partnerEPR);
+        }
+    }
+
+    private void invokeP2P(Operation operation, Element outgoingMessage, MessageExchangeDAO mexDao) {
+        if (BpelProcess.__log.isDebugEnabled()) {
+            __log.debug("Invoking in a p2p interaction, partnerrole " + mexDao.getMessageExchangeId() + " - myrole " + myRoleMex);
+        }
+
+        // Properties used by stateful-exchange protocol.
+        String mySessionId = mexDao.getPartnerLink().getMySessionId();
+        String partnerSessionId = mexDao.getPartnerLink().getPartnerSessionId();
+
+
+        if ( mySessionId != null )
+            mexDao.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, mySessionId);
+        if ( partnerSessionId != null )
+            mexDao.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, partnerSessionId);
+
+        if (__log.isDebugEnabled())
+            __log.debug("INVOKE PARTNER (SEP): sessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId);
+
+
+
+        Message odeRequest = myRoleMex.createMessage(operation.getInput().getMessage().getQName());
+        odeRequest.setMessage(outgoingMessage);
+
+        if (BpelProcess.__log.isDebugEnabled()) {
+            __log.debug("Setting myRoleMex session ids for p2p interaction, mySession "
+                    + partnerSessionId + " - partnerSess " + mySessionId);
+        }
+        if ( partnerSessionId != null )
+            myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, partnerSessionId);
+        if ( mySessionId != null )
+            myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, mySessionId);
+
+        mexDao.setStatus(MessageExchange.Status.ASYNC.toString());
+        myRoleMex.invoke(odeRequest);
+    }
+
     void execute() {
         long maxTime = System.currentTimeMillis() + _maxReductionTimeMs;
         
@@ -911,7 +925,7 @@
         }
     }
 
-    void inputMsgMatch(final String responsechannel, final int idx, MyRoleMessageExchangeImpl mex) {
+    void inputMsgMatch(final String responsechannel, final int idx, MessageExchangeDAO mexdao) {
         // if we have a message match, this instance should be marked
         // active if it isn't already
         if (_dao.getState() == ProcessState.STATE_READY) {
@@ -928,9 +942,9 @@
             sendEvent(evt);
         }
 
-        _outstandingRequests.associate(responsechannel, mex.getMessageExchangeId());
+        _outstandingRequests.associate(responsechannel, mexdao.getMessageExchangeId());
 
-        final String mexId = mex.getMessageExchangeId();
+        final String mexId = mexdao.getMessageExchangeId();
         _vpu.inject(new JacobRunnable() {
             private static final long serialVersionUID = 3168964409165899533L;
 
@@ -980,11 +994,7 @@
         });
     }
 
-    void invocationResponse(PartnerRoleMessageExchangeImpl mex) {
-        invocationResponse(mex.getDAO().getMessageExchangeId(), mex.getDAO().getChannel());
-    }
-
-    void invocationResponse(final String mexid, final String responseChannelId) {
+    void injectPartnerResponse(final String mexid, final String responseChannelId) {
         if (responseChannelId == null)
             throw new NullPointerException("Null responseChannelId");
         if (mexid == null)
@@ -1365,9 +1375,7 @@
                         + " on CKEY " + ckey);
             }
 
-            MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess._engine,
mexdao);
-
-            inputMsgMatch(mroute.getGroupId(), mroute.getIndex(), mex);
+            inputMsgMatch(mroute.getGroupId(), mroute.getIndex(), mexdao);
             execute();
         } else {
             __log.debug("MatcherEvent handling: nothing to do, no matching message in DB");

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=550631&r1=550630&r2=550631
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
(original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
Mon Jun 25 15:46:18 2007
@@ -19,6 +19,7 @@
 
 package org.apache.ode.bpel.engine;
 
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
@@ -33,6 +34,7 @@
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.utils.msg.MessageBundle;
@@ -41,7 +43,7 @@
 /**
  * Base implementation of the {@link MessageExchange} interface. This interfaces is exposed to the Integration Layer (IL)
  * to allow it to implement incoming (via {@link MyRoleMessageExchangeImpl}) and outgoing (via {@link PartnerRoleMessageExchangeImpl})
- * communications.
+ * communications. 
  * 
  * It should be noted that this class and its derived classes are in NO WAY THREADSAFE. It is imperative that the integration layer
  * not attempt to use {@link MessageExchange} objects from multiple threads. 
@@ -95,6 +97,11 @@
     BpelEngineImpl _engine;
 
     boolean _associated;
+    
+    InvocationStyle _istyle;
+    
+    /** The point at which this message-exchange will time out. */
+    Date _timeout;
    
     enum Change { 
         EPR,
@@ -123,11 +130,11 @@
         _mexId = mexId;
     }
 
-    public MessageExchangeImpl(BpelEngineImpl engine, MessageExchangeDAO dao) {
-        load(dao);
-    }
 
     void load(MessageExchangeDAO dao) {
+        if (dao.getMessageExchangeId().equals(_mexId))
+            throw new IllegalArgumentException("MessageExchangeId mismatch!");
+        
         if (_pattern == null)
             _pattern = MessageExchangePattern.valueOf(dao.getPattern());
         if (_opname == null)
@@ -144,11 +151,13 @@
             _status = Status.valueOf(dao.getStatus());
         if (_callee == null)
             _callee = dao.getCallee();
-        
+        if (_istyle == null)
+            _istyle = InvocationStyle.valueOf(dao.getInvocationStyle());
     }
     
     public void save(MessageExchangeDAO dao) {
         dao.setStatus(_status.toString());
+        dao.setInvocationStyle(_istyle.toString());
         dao.setFault(_fault);
         dao.setFaultExplanation(_explanation);
         //todo: set failureType
@@ -170,6 +179,10 @@
         }
 
     }
+    
+    public InvocationStyle getInvocationStyle() {
+        return _istyle;
+    }
 
     public boolean isSafe() {
         Object val = getOperation().getExtensionAttribute(SAFE_ATTRIBUTE);
@@ -209,10 +222,6 @@
 
     public String getFaultExplanation() {
         return _explanation;
-    }
-
-    public MessageExchangePattern getPattern() {
-        return _pattern;
     }
 
     public Status getStatus() {

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java?view=diff&rev=550631&r1=550630&r2=550631
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java Mon Jun 25 15:46:18 2007
@@ -45,13 +45,14 @@
     
     PartnerRoleMessageExchangeImpl(
             BpelEngineImpl engine, 
+            String mexId,
             PortType portType,
             Operation operation, 
             boolean inMem,
             EndpointReference epr,
             EndpointReference myRoleEPR,
             PartnerRoleChannel channel) {
-        super(engine);
+        super(engine, mexId);
         _myRoleEPR = myRoleEPR;
         _channel = channel;
         _inMem = inMem;

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java?view=diff&rev=550631&r1=550630&r2=550631
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java Mon Jun 25 15:46:18 2007
@@ -1,5 +1,15 @@
 package org.apache.ode.bpel.engine;
 
+import javax.wsdl.Operation;
+import javax.wsdl.PortType;
+
+import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.PartnerRoleChannel;
+
 public class ReliablePartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl {
+
+    ReliablePartnerRoleMessageExchangeImpl();
+        // TODO Auto-generated constructor stub
+    }
 
 }



Mime
View raw message