ode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mszef...@apache.org
Subject svn commit: r550560 - in /incubator/ode/branches/bart: axis2/src/main/java/org/apache/ode/axis2/ bpel-api/src/main/java/org/apache/ode/bpel/iapi/ bpel-epr/src/main/java/org/apache/ode/il/ bpel-runtime/src/main/java/org/apache/ode/bpel/elang/xpath20/run...
Date Mon, 25 Jun 2007 17:34:21 GMT
Author: mszefler
Date: Mon Jun 25 10:34:20 2007
New Revision: 550560

URL: http://svn.apache.org/viewvc?view=rev&rev=550560
Log:
BART, initial changes.

Modified:
    incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ExternalService.java
    incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java
    incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
    incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
    incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java
    incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchangeContext.java
    incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java
    incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleChannel.java
    incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleMessageExchange.java
    incubator/ode/branches/bart/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/elang/xpath20/runtime/XPath20ExpressionRuntime.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java

Modified: incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ExternalService.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ExternalService.java?view=diff&rev=550560&r1=550559&r2=550560
==============================================================================
--- incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ExternalService.java (original)
+++ incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ExternalService.java Mon Jun 25 10:34:20 2007
@@ -19,6 +19,12 @@
 
 package org.apache.ode.axis2;
 
+import java.util.concurrent.Callable;
+
+import javax.wsdl.Definition;
+import javax.wsdl.Operation;
+import javax.xml.namespace.QName;
+
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
@@ -38,20 +44,14 @@
 import org.apache.ode.bpel.iapi.BpelServer;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
-import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
 import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
 import org.apache.ode.utils.DOMUtils;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 
-import javax.wsdl.Definition;
-import javax.wsdl.Operation;
-import javax.xml.namespace.QName;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-
 /**
  * Acts as a service not provided by ODE. Used mainly for invocation as a way to maintain the WSDL decription of used
  * services.
@@ -66,7 +66,6 @@
 
     private static ThreadLocal<CachedServiceClient> _cachedClients = new ThreadLocal<CachedServiceClient>();
 
-    private ExecutorService _executorService;
     private Definition _definition;
     private QName _serviceName;
     private String _portName;
@@ -76,12 +75,11 @@
     private Scheduler _sched;
     private BpelServer _server;
 
-    public ExternalService(Definition definition, QName serviceName, String portName, ExecutorService executorService,
+    public ExternalService(Definition definition, QName serviceName, String portName, 
             AxisConfiguration axisConfig, Scheduler sched, BpelServer server) throws AxisFault {
         _definition = definition;
         _serviceName = serviceName;
         _portName = portName;
-        _executorService = executorService;
         _axisConfig = axisConfig;
         _sched = sched;
         _converter = new SoapMessageConverter(definition, serviceName, portName, _isReplicateEmptyNS);
@@ -131,51 +129,23 @@
                 final String mexId = odeMex.getMessageExchangeId();
                 final Operation operation = odeMex.getOperation();
 
-                // Defer the invoke until the transaction commits.
-                _sched.registerSynchronizer(new Scheduler.Synchronizer() {
-
-                    public void afterCompletion(boolean success) {
-                        // If the TX is rolled back, then we don't send the request.
-                        if (!success)
-                            return;
-
-                        // The invocation must happen in a separate thread, holding on the afterCompletion
-                        // blocks other operations that could have been listed there as well.
-                        _executorService.submit(new Callable<Object>() {
-                            public Object call() throws Exception {
-                                try {
-                                    operationClient.execute(true);
-                                    MessageContext response = operationClient.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
-                                    MessageContext flt = operationClient.getMessageContext(WSDLConstants.MESSAGE_LABEL_FAULT_VALUE);
-                                    if (flt != null) {
-                                        reply(mexId, operation, flt, true);
-                                    } else {
-                                        reply(mexId, operation, response, false);
-                                    }
-                                } catch (Throwable t) {
-                                    String errmsg = "Error sending message (mex=" + odeMex + "): " + t.getMessage();
-                                    __log.error(errmsg, t);
-                                    replyWithFailure(mexId, MessageExchange.FailureType.COMMUNICATION_ERROR, errmsg, null);
-                                }
-                                return null;
-                            }
-                        });
-                    }
-
-                    public void beforeCompletion() {
-                    }
-
-                });
-                odeMex.replyAsync();
 
-            } else /** one-way case * */
-            {
-                _executorService.submit(new Callable<Object>() {
-                    public Object call() throws Exception {
-                        operationClient.execute(false);
-                        return null;
+                try {
+                    operationClient.execute(true);
+                    MessageContext response = operationClient.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+                    MessageContext flt = operationClient.getMessageContext(WSDLConstants.MESSAGE_LABEL_FAULT_VALUE);
+                    if (flt != null) {
+                        reply(mexId, operation, flt, true);
+                    } else {
+                        reply(mexId, operation, response, false);
                     }
-                });
+                } catch (Throwable t) {
+                    String errmsg = "Error sending message to Axis2 for ODE mex " + odeMex;
+                    __log.error(errmsg, t);
+                    replyWithFailure(mexId, MessageExchange.FailureType.COMMUNICATION_ERROR, errmsg, null);
+                }
+            } else /* one-way case */{
+                operationClient.execute(false);
                 odeMex.replyOneWayOk();
             }
         } catch (AxisFault axisFault) {

Modified: incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java?view=diff&rev=550560&r1=550559&r2=550560
==============================================================================
--- incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java (original)
+++ incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java Mon Jun 25 10:34:20 2007
@@ -36,13 +36,10 @@
 
     private static final Log __log = LogFactory.getLog(MessageExchangeContextImpl.class);
 
-    private ODEServer _server;
-
     public MessageExchangeContextImpl(ODEServer server) {
-        _server = server;
     }
 
-    public void invokePartner(PartnerRoleMessageExchange partnerRoleMessageExchange) throws ContextException {
+    public void invokePartner(PartnerRoleMessageExchange partnerRoleMessageExchange, InvocationStyle style) throws ContextException {
         if (__log.isDebugEnabled())
             __log.debug("Invoking a partner operation: " + partnerRoleMessageExchange.getOperationName());
 
@@ -58,5 +55,10 @@
 
         // Nothing to do, no callback is necessary, the client just synchornizes itself with the
         // mex reply when invoking the engine.
+    }
+
+    public boolean isStyleSupported(PartnerRoleMessageExchange mex, InvocationStyle style) {
+        // Currently, we only support BLOCKING invokes. 
+        return style == InvocationStyle.BLOCKING;
     }
 }

Modified: incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java?view=diff&rev=550560&r1=550559&r2=550560
==============================================================================
--- incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java (original)
+++ incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java Mon Jun 25 10:34:20 2007
@@ -19,6 +19,17 @@
 
 package org.apache.ode.axis2;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.StringTokenizer;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import javax.wsdl.Definition;
+import javax.xml.namespace.QName;
+
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.description.AxisService;
@@ -47,18 +58,6 @@
 import org.apache.ode.store.ProcessStoreImpl;
 import org.apache.ode.utils.fs.TempFileManager;
 
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletException;
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-import javax.wsdl.Definition;
-import javax.xml.namespace.QName;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.util.StringTokenizer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
 /**
  * Server class called by our Axis hooks to handle all ODE lifecycle management.
  * 
@@ -87,8 +86,6 @@
 
     protected BpelDAOConnectionFactory _daoCF;
 
-    protected ExecutorService _executorService;
-
     protected Scheduler _scheduler;
 
     protected Database _db;
@@ -320,7 +317,7 @@
             return extService;
 
         try {
-            extService = new ExternalService(def, serviceName, portName, _executorService, _axisConfig, _scheduler, _server);
+            extService = new ExternalService(def, serviceName, portName, _axisConfig, _scheduler, _server);
         } catch (Exception ex) {
             __log.error("Could not create external service.", ex);
             throw new ContextException("Error creating external service.", ex);
@@ -429,7 +426,6 @@
 
     protected Scheduler createScheduler() {
         QuartzSchedulerImpl scheduler = new QuartzSchedulerImpl();
-        scheduler.setExecutorService(_executorService, 20);
         scheduler.setTransactionManager(_txMgr);
         scheduler.setDataSource(_db.getDataSource());
         scheduler.init();
@@ -440,10 +436,6 @@
         if (__log.isDebugEnabled()) {
             __log.debug("ODE initializing");
         }
-        if (_odeConfig.getThreadPoolMaxSize() == 0)
-            _executorService = Executors.newCachedThreadPool();
-        else
-            _executorService = Executors.newFixedThreadPool(_odeConfig.getThreadPoolMaxSize());
 
         _server = new BpelServerImpl();
         _scheduler = createScheduler();

Modified: incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEService.java?view=diff&rev=550560&r1=550559&r2=550560
==============================================================================
--- incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEService.java (original)
+++ incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEService.java Mon Jun 25 10:34:20 2007
@@ -19,6 +19,17 @@
 
 package org.apache.ode.axis2;
 
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.transaction.TransactionManager;
+import javax.wsdl.Definition;
+import javax.wsdl.Port;
+import javax.wsdl.Service;
+import javax.wsdl.extensions.UnknownExtensibilityElement;
+import javax.wsdl.extensions.soap.SOAPAddress;
+import javax.xml.namespace.QName;
+
 import org.apache.axiom.om.OMElement;
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axiom.soap.SOAPFactory;
@@ -41,16 +52,6 @@
 import org.apache.ode.utils.Namespaces;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
-
-import javax.transaction.TransactionManager;
-import javax.wsdl.Definition;
-import javax.wsdl.Port;
-import javax.wsdl.Service;
-import javax.wsdl.extensions.UnknownExtensibilityElement;
-import javax.wsdl.extensions.soap.SOAPAddress;
-import javax.xml.namespace.QName;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 /**
  * A running service, encapsulates the Axis service, its receivers and our

Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java?view=diff&rev=550560&r1=550559&r2=550560
==============================================================================
--- incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java (original)
+++ incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java Mon Jun 25 10:34:20 2007
@@ -147,8 +147,12 @@
      */
     Message createMessage(QName msgType);
 
-    boolean isTransactionPropagated()
-            throws BpelEngineException;
+    /**
+     * Indicates whether a transactions in associated with the message exchange. If this is the case, then the object must be used
+     * from a context (i.e. thread) that is associated with the same transaction. 
+     * @return <code>true<code> if there is a transaction associated with the object, <code>false</code> otherwise.
+     */
+    boolean isTransactional();
 
     /**
      * Get the message exchange status.
@@ -223,6 +227,13 @@
      */
     public Set<String> getPropertyNames();
 
+    /**
+     * Report whether the operation is "safe" in the sense of the WSDL1.2 meaning of the term. That is,
+     * is the operation side-effect free?
+     * @return <code>true</code> if the operation is safe, <code>false</code> otherwise. 
+     */
+    public boolean isSafe();
+    
     /**
      * Should be called by the external partner when it's done with the
      * message exchange. Ncessary for a better resource management and

Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchangeContext.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchangeContext.java?view=diff&rev=550560&r1=550559&r2=550560
==============================================================================
--- incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchangeContext.java (original)
+++ incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchangeContext.java Mon Jun 25 10:34:20 2007
@@ -21,12 +21,10 @@
 import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
 import org.w3c.dom.Element;
 
-
 /**
  * <p>
- * Context provided by the integration layer exposing partner communication
- * to the BPEL engine. The BPEL engine may only invoke methods on this 
- * interface from a transactional context provided by the integration layer.
+ * Context provided by the integration layer exposing partner communication to the BPEL engine. The BPEL engine may only invoke
+ * methods on this interface from a transactional context provided by the integration layer.
  * </p>
  * 
  * <p>
@@ -36,66 +34,61 @@
  * <li>partner is accessible via an unreliable transport (e.g. HTTP)</li>
  * <li>partner participates in the transaction (e.g. WS-TX)</li>
  * </ol>
- * It is important to note that each usage scenario is identical from the
- * point of view of the BPEL engine. However, the integration layer must
- * handle each of these scenarios in a different manner. See the method 
- * documentation for details.
+ * It is important to note that each usage scenario is identical from the point of view of the BPEL engine. However, the integration
+ * layer must handle each of these scenarios in a different manner. See the method documentation for details.
  * </p>
  */
 public interface MessageExchangeContext {
 
-  /**
-   * <p>Invoke a partner. This method is invoked by the BPEL engine when an 
-   * <code>&lt;invoke&gt;</code> construct is encountered. The BPEL engine
-   * will only invoke this method from a transactional context. This method 
-   * MUST NOT block for extended periods (as it is called from within a 
-   * transaction): to this end, actual invocation may be deferred or a 
-   * synchronous operation may be decomposed into two asynchronous "legs".
-   * The integration layer must provide a response to the message exchange
-   * via the {@link PartnerRoleMessageExchange#reply(Message)}, 
-   * {@link PartnerRoleMessageExchange#replyOneWayOk()}, 
-   * {@link PartnerRoleMessageExchange#replyWithFailure(FailureType, String, Element)}
-   * {@link PartnerRoleMessageExchange#replyWithFault(javax.xml.namespace.QName, Message)},
-   * or {@link PartnerRoleMessageExchange#replyAsync()} methods. </p>
-   * 
-   * <p>Invocation of reliable, unreliable, and transactional transports should 
-   * be treated differently. A brief description of how each of these scenarios
-   * could be handled follows.</p>
-   * 
-   * <p>Reliable transports are transports such as JMS or WS-RM. For these
-   * transports, the request should be enrolled in the current transaction. This
-   * necessarily implies that the request is deferred until the transaction is
-   * committed. It follows that for reliable request-response invocations
-   * the response to the invocation will necessarily be processed in a separate 
-   * transaction. </p>
-   * 
-   * <p>Unreliable transports are transports such as HTTP. For these transports,
-   * where the operation is not idempotent it is typically required that "at 
-   * most once" semantics are achieved. To this end the invocation could be 
-   * noted and deferred until after the transaction is committed. </p> 
-   *  
-   * <p>Transactional transports are those transports that support transaction
-   * propagation. For these transports, the invocation can be processed
-   * immediately and the response provided to the engine via the 
-   * {@link PartnerRoleMessageExchange#reply(Message)} method. </p>
-   * 
-   * @param mex engine-provided partner role message exchange representation,
-   *        this object is valid only for the duration of the transaction 
-   *        from which the {@link #invokePartner(PartnerRoleMessageExchange)}
-   *        method is invoked
-   * @throws ContextException if the port does not support the
-   *         operation
-   */
-  void invokePartner(PartnerRoleMessageExchange mex)
-    throws ContextException;
-  
-  /**
-   * Method used to asynchronously deliver to the integration layer the BPEL 
-   * engine's response to an invocation that could not complete synchronously. 
-   * @see MyRoleMessageExchange#invoke(Message)
-   */
-  void onAsyncReply(MyRoleMessageExchange myRoleMex)
-    throws BpelEngineException; 
+    /**
+     * <p>
+     * Invoke a partner. This method is invoked by the BPEL engine when an <code>&lt;invoke&gt;</code> construct is encountered.
+     * The BPEL engine will only invoke this method from a transactional context. This method MUST NOT block for extended periods
+     * (as it is called from within a transaction): to this end, actual invocation may be deferred or a synchronous operation may be
+     * decomposed into two asynchronous "legs". The integration layer must provide a response to the message exchange via the
+     * {@link PartnerRoleMessageExchange#reply(Message)}, {@link PartnerRoleMessageExchange#replyOneWayOk()},
+     * {@link PartnerRoleMessageExchange#replyWithFailure(FailureType, String, Element)}
+     * {@link PartnerRoleMessageExchange#replyWithFault(javax.xml.namespace.QName, Message)}, or
+     * {@link PartnerRoleMessageExchange#replyAsync()} methods.
+     * </p>
+     * 
+     * <p>
+     * Invocation of reliable, unreliable, and transactional transports should be treated differently. A brief description of how
+     * each of these scenarios could be handled follows.
+     * </p>
+     * 
+     * <p>
+     * Reliable transports are transports such as JMS or WS-RM. For these transports, the request should be enrolled in the current
+     * transaction. This necessarily implies that the request is deferred until the transaction is committed. It follows that for
+     * reliable request-response invocations the response to the invocation will necessarily be processed in a separate transaction.
+     * </p>
+     * 
+     * <p>
+     * Unreliable transports are transports such as HTTP. For these transports, where the operation is not idempotent it is
+     * typically required that "at most once" semantics are achieved. To this end the invocation could be noted and deferred until
+     * after the transaction is committed.
+     * </p>
+     * 
+     * <p>
+     * Transactional transports are those transports that support transaction propagation. For these transports, the invocation can
+     * be processed immediately and the response provided to the engine via the {@link PartnerRoleMessageExchange#reply(Message)}
+     * method.
+     * </p>
+     * 
+     * @param mex
+     *            engine-provided partner role message exchange representation, this object is valid only for the duration of the
+     *            transaction from which the {@link #invokePartner(PartnerRoleMessageExchange)} method is invoked
+     * @throws ContextException
+     *             if the port does not support the operation
+     */
+    void invokePartner(PartnerRoleMessageExchange mex) throws ContextException;
 
+    /**
+     * Method used to asynchronously deliver to the integration layer the BPEL engine's response to an invocation that could not
+     * complete synchronously.
+     * 
+     * @see MyRoleMessageExchange#invoke(Message)
+     */
+    void onAsyncReply(MyRoleMessageExchange myRoleMex) throws BpelEngineException;
 
 }

Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java?view=diff&rev=550560&r1=550559&r2=550560
==============================================================================
--- incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java (original)
+++ incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java Mon Jun 25 10:34:20 2007
@@ -68,7 +68,7 @@
      * {@link MessageExchangeContext#onAsyncReply(MyRoleMessageExchange)} when
      * the response become available.
      */
-    Future invoke(Message request);
+    Future<MessageExchange.Status> invoke(Message request);
 
     /**
      * Complete the message, exchange: indicates that the client has receive the

Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleChannel.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleChannel.java?view=diff&rev=550560&r1=550559&r2=550560
==============================================================================
--- incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleChannel.java (original)
+++ incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleChannel.java Mon Jun 25 10:34:20 2007
@@ -19,6 +19,8 @@
 
 package org.apache.ode.bpel.iapi;
 
+import java.util.Set;
+
 /**
  * Representation of a communication link to a partner or partners. Objects of this
  * type generally represent a physical resource in the integration layer that is used
@@ -28,6 +30,41 @@
  */
 public interface PartnerRoleChannel {
 
+    /**
+     * Style of invocation supported on the given channel. 
+     * 
+     * @author Maciej Szefler
+     */
+    public enum InvocationStyle {
+        /** 
+         * The very ordinary blocking IO style --- the IL will block until the operation is complete, or until
+         * a timeout is reached. 
+         */
+        BLOCKING, 
+        
+        /**
+         * Asynchrnous style -- the IL will "queue" the invocation, and call-back asynchrnously when the response
+         * is available. 
+         */
+        ASYNC, 
+        
+        /**
+         * Reliable style -- the IL will queue the invocation using the current transaction. The response will be
+         * delivered when available using a separate transaction. 
+         */
+        RELIABLE,
+        
+        
+        /**
+         * Transacted style -- the IL will enroll the operation with the current transaction. The IL will block until the
+         * operation completes. 
+         */
+        TRANSACTED
+    }
+
+    
+    Set<InvocationStyle> getSupportedInvocationStyle();
+    
     /**
      * Return the endpoint reference to the endpoint with which the
      * channel was initialized or <code>null</code> if the channel

Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleMessageExchange.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleMessageExchange.java?view=diff&rev=550560&r1=550559&r2=550560
==============================================================================
--- incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleMessageExchange.java (original)
+++ incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleMessageExchange.java Mon Jun 25 10:34:20 2007
@@ -19,83 +19,89 @@
 
 package org.apache.ode.bpel.iapi;
 
-
 import javax.xml.namespace.QName;
 
 import org.w3c.dom.Element;
 
 /**
  * A message exchange orginating from the BPEL server and targeting some external partner.
+ * 
  * @author Maciej Szefler - m s z e f l e r @ g m a i l . c o m
- *
+ * 
  */
 public interface PartnerRoleMessageExchange extends MessageExchange {
 
-
-  /**
-   * Get the identifier of the process that created this message exchange.
-   * @return
-   */
-  QName getCaller();
-  
-  /**
-   * Get the communication channel.
-   * @return communication channel
-   */
-  PartnerRoleChannel getChannel();
-  
-  
-  /**
-   * Indicate that the partner faulted in processing the message exchange.
-   * 
-   * @param faultType fault type
-   * @param outputFaultMessage the input message
-   *
-   * @throws IllegalStateException if delivering this type of message is
-   *         inappropriate at the present point.
-   */
-  void replyWithFault(QName faultType, Message outputFaultMessage)
-    throws BpelEngineException;
-
-  /**
-   * Indicate that the partner has responded to the message exchange.
-   * 
-   * @param response the response from the partner
-   *
-   * @throws IllegalStateException if delivering this type of message is
-   *         inappropriate at the present point.
-   */
-  void reply(Message response)
-    throws BpelEngineException;
-
-  /**
-   * Indicate that the partner has failed to process the message exchange. 
-   * 
-   * @param type type of failure
-   * @param description description of failure
-   */
-  void replyWithFailure(FailureType type, String description, Element details) 
-    throws BpelEngineException;  
-  
-  /**
-   * Indicate that the partner processed the one-way invocation successfully.
-   */
-  void replyOneWayOk();
-  
-  /**
-   * Indicate that the response to the request/response operation 
-   * is not yet available and that the response will be delivered
-   * asynchronously.
-   */
-  void replyAsync();
-
-  /**
-   * Get the {@link EndpointReference} associated with the my-role of the partner link to which this message
-   * exchange belongs. This method is typically used to provide protocol-specific "callback" mechanisms. 
-   * @return endpoint reference associate with the corresponding my-role, or null if no my-role is defined
-   */
-  EndpointReference getMyRoleEndpointReference();
-
-
+    /**
+     * Get the invocation style for this message exchange. 
+     * @return
+     */
+    PartnerRoleChannel.InvocationStyle getInvocationStyle();
+
+    /**
+     * Get the identifier of the process that created this message exchange.
+     * 
+     * @return
+     */
+    QName getCaller();
+
+    /**
+     * Get the communication channel.
+     * 
+     * @return communication channel, an object created by the IL
+     */
+    PartnerRoleChannel getChannel();
+
+    /**
+     * Indicate that the partner faulted in processing the message exchange.
+     * 
+     * @param faultType
+     *            fault type
+     * @param outputFaultMessage
+     *            the input message
+     * 
+     * @throws IllegalStateException
+     *             if delivering this type of message is inappropriate at the present point.
+     */
+    void replyWithFault(QName faultType, Message outputFaultMessage) throws BpelEngineException;
+
+    /**
+     * Indicate that the partner has responded to the message exchange.
+     * 
+     * @param response
+     *            the response from the partner
+     * 
+     * @throws IllegalStateException
+     *             if delivering this type of message is inappropriate at the present point.
+     */
+    void reply(Message response) throws BpelEngineException;
+
+    /**
+     * Indicate that the partner has failed to process the message exchange.
+     * 
+     * @param type
+     *            type of failure
+     * @param description
+     *            description of failure
+     */
+    void replyWithFailure(FailureType type, String description, Element details) throws BpelEngineException;
+
+    /**
+     * Indicate that the partner processed the one-way invocation successfully.
+     */
+    void replyOneWayOk();
+
+    /**
+     * Indicate that the response to the request/response operation is not yet available and that the response will be delivered
+     * asynchronously.
+     */
+    void replyAsync();
+
+    /**
+     * Get the {@link EndpointReference} associated with the my-role of the partner link to which this message exchange belongs.
+     * This method is typically used to provide protocol-specific "callback" mechanisms.
+     * 
+     * @return endpoint reference associate with the corresponding my-role, or null if no my-role is defined
+     */
+    EndpointReference getMyRoleEndpointReference();
 
 }

Modified: incubator/ode/branches/bart/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java?view=diff&rev=550560&r1=550559&r2=550560
==============================================================================
--- incubator/ode/branches/bart/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java (original)
+++ incubator/ode/branches/bart/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java Mon Jun 25 10:34:20 2007
@@ -176,6 +176,8 @@
                 throw new RuntimeException(e);
             }
         } else {
+            if (_transacted.get() == Boolean.TRUE)
+                throw new RuntimeException("Transaction active.");
             _synchros.get().clear();
         }
         _transacted.set(Boolean.TRUE);

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/elang/xpath20/runtime/XPath20ExpressionRuntime.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/elang/xpath20/runtime/XPath20ExpressionRuntime.java?view=diff&rev=550560&r1=550559&r2=550560
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/elang/xpath20/runtime/XPath20ExpressionRuntime.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/elang/xpath20/runtime/XPath20ExpressionRuntime.java Mon Jun 25 10:34:20 2007
@@ -141,8 +141,8 @@
         try {
             return ISO8601DateParser.parseCal(literal);
         } catch (Exception ex) {
-            String errmsg = "Invalid date: " + literal;
-            __log.error(errmsg, ex);
+            String errmsg = "Invalid date format: " + literal;
+            __log.error(errmsg);
             throw new FaultException(cexp.getOwner().constants.qnInvalidExpressionValue,errmsg);
         }
     }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=550560&r1=550559&r2=550560
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Mon Jun 25 10:34:20 2007
@@ -33,6 +33,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.common.FaultException;
 import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
 import org.apache.ode.bpel.evt.ProcessInstanceEvent;
@@ -158,7 +159,7 @@
                 return;
             }
 
-            mex.getDAO().setProcess(getProcessDAO());
+            getDAO(mex).setProcess(getProcessDAO());
 
             if (!processInterceptors(mex, InterceptorInvoker.__onProcessInvoked)) {
                 __log.debug("Aborting processing of mex " + mex + " due to interceptors.");
@@ -176,6 +177,10 @@
         if (mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) {
             mex.release();
         }
+    }
+
+    private MessageExchangeDAO getDAO(MyRoleMessageExchangeImpl mex) {
+
     }
 
     private PartnerLinkMyRoleImpl getMyRoleForService(QName serviceName) {

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=550560&r1=550559&r2=550560
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Mon Jun 25 10:34:20 2007
@@ -45,8 +45,11 @@
 import org.apache.ode.bpel.iapi.EndpointReference;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.iapi.MessageExchangeContext;
+import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
 import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
+import org.apache.ode.bpel.iapi.PartnerRoleChannel.InvocationStyle;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
 import org.apache.ode.bpel.memdao.ProcessInstanceDaoImpl;
@@ -75,6 +78,7 @@
 import org.apache.ode.utils.GUID;
 import org.apache.ode.utils.Namespaces;
 import org.apache.ode.utils.ObjectPrinter;
+import org.omg.CORBA._PolicyStub;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 
@@ -85,7 +89,9 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 
 class BpelRuntimeContextImpl implements BpelRuntimeContext {
 
@@ -107,11 +113,19 @@
 
     private OutstandingRequestManager _outstandingRequests;
 
+    /** List of BLOCKING invocations that need to be deferred until the end of the current TX */
+    private List<PartnerRoleMessageExchange> _todoBlockingCalls = new LinkedList<PartnerRoleMessageExchange>();
+    
+    /** List of ASYNC invocations that need to be deferred until the end of the current TX. */
+    private List<PartnerRoleMessageExchange> _todoAsyncCalls = new LinkedList<PartnerRoleMessageExchange>();
+    
     private BpelProcess _bpelProcess;
 
     /** Five second maximum for continous execution. */
     private long _maxReductionTimeMs = 2000000;
 
+
+
     public BpelRuntimeContextImpl(BpelProcess bpelProcess, ProcessInstanceDAO dao, PROCESS PROCESS,
                                   MyRoleMessageExchangeImpl instantiatingMessageExchange) {
         _bpelProcess = bpelProcess;
@@ -726,6 +740,11 @@
                 : MessageExchangePattern.REQUEST_ONLY).toString());
         mexDao.setChannel(channel == null ? null : channel.export());
 
+        PartnerRoleChannel partnerRoleChannel = _bpelProcess.getPartnerRoleChannel(partnerLink.partnerLink);
+        
+//      PartnerRoleMessageExchangeImpl mex = new PartnerRoleMessageExchangeImpl(_bpelProcess._engine, mexDao,
+//      partnerLink.partnerLink.partnerRolePortType, operation, partnerEpr, myRoleEndpoint, );
+
         // Properties used by stateful-exchange protocol.
         String mySessionId = plinkDAO.getMySessionId();
         String partnerSessionId = plinkDAO.getPartnerSessionId();
@@ -747,23 +766,20 @@
         // (for callback mechanism).
         EndpointReference myRoleEndpoint = partnerLink.partnerLink.hasMyRole() ? _bpelProcess
                 .getInitialMyRoleEPR(partnerLink.partnerLink) : null;
-        PartnerRoleMessageExchangeImpl mex = new PartnerRoleMessageExchangeImpl(_bpelProcess._engine, mexDao,
-                partnerLink.partnerLink.partnerRolePortType, operation, partnerEpr, myRoleEndpoint, _bpelProcess
-                .getPartnerRoleChannel(partnerLink.partnerLink));
 
         BpelProcess p2pProcess = null;
         Endpoint partnerEndpoint = _bpelProcess.getInitialPartnerRoleEndpoint(partnerLink.partnerLink);
         if (partnerEndpoint != null)
-            p2pProcess = _bpelProcess.getEngine().route(partnerEndpoint.serviceName, mex.getRequest());
+            p2pProcess = _bpelProcess.getEngine().route(partnerEndpoint.serviceName, new MemBackedMessageImpl(message.getData(),message.getType(), true));
 
         if (p2pProcess != null) {
             // Creating a my mex using the same message id as partner mex to "pipe" them
             MyRoleMessageExchange myRoleMex = _bpelProcess.getEngine().createMessageExchange(
-                    mex.getMessageExchangeId(), partnerEndpoint.serviceName,
-                    operation.getName(), mex.getMessageExchangeId());
+                    mexDao.getMessageExchangeId(), partnerEndpoint.serviceName,
+                    operation.getName(), mexDao.getMessageExchangeId());
 
             if (BpelProcess.__log.isDebugEnabled()) {
-                __log.debug("Invoking in a p2p interaction, partnerrole " + mex + " - myrole " + myRoleMex);
+                __log.debug("Invoking in a p2p interaction, partnerrole " + mexDao.getMessageExchangeId() + " - myrole " + myRoleMex);
             }
 
             Message odeRequest = myRoleMex.createMessage(operation.getInput().getMessage().getQName());
@@ -778,18 +794,36 @@
             if ( mySessionId != null )
                 myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, mySessionId);
 
-            mex.setStatus(MessageExchange.Status.REQUEST);
+            mexDao.setStatus(MessageExchange.Status.ASYNC.toString());
             myRoleMex.invoke(odeRequest);
-
-            // Can't expect any sync response
-            mex.replyAsync();
-        } else {
-            // If we couldn't find the endpoint, then there is no sense
-            // in asking the IL to invoke.
+        } else /* NOT p2p, need to call out to IL */ {
             if (partnerEpr != null) {
+                // If we couldn't find the endpoint, then there is no sense
+                // in asking the IL to invoke.
                 mexDao.setEPR(partnerEpr.toXML().getDocumentElement());
-                mex.setStatus(MessageExchange.Status.REQUEST);
-                _bpelProcess._engine._contexts.mexContext.invokePartner(mex);
+                mexDao.setStatus(MessageExchange.Status.REQUEST.toString());
+                Set<InvocationStyle> supportedStyles = partnerRoleChannel.getSupportedInvocationStyle();
+                if (!_bpelProcess.isInMemory()) {
+                    if (supportedStyles.contains(InvocationStyle.RELIABLE)) {
+                        // If RELIABLE is supported, this is easy, we just do it in-line.
+                        _bpelProcess._engine._contexts.mexContext.invokePartner();
+                    } else if (supportedStyles.contains(InvocationStyle.TRANSACTED)){
+                        // If TRANSACTED is supported, this is again easy, do it in-line.
+                        _bpelProcess._engine._contexts.mexContext.invokePartner(mex, MessageExchangeContext.InvocationStyle.TRANSACTED);
+                    } else if (supportedStyles.contains(InvocationStyle.BLOCKING)) {
+                        // For BLOCKING invocation, we defer the call until after commit (unless idempotent). 
+                        _todoBlockingCalls.add(mex);
+                    } else if (supportedStyles.contains(InvocationStyle.ASYNC)) {
+                        // For ASYNC style, we defer the call until after commit (unless idempotent). 
+                        _todoAsyncCalls.add(mex);
+                    } else {
+                        // This really should not happen, indicates IL is screwy.
+                        __log.error("Integration layer did not agree to any known invocation style for EPR " + DOMUtils.domToString(partnerEPR));
+                        mex.setFailure(FailureType.COMMUNICATION_ERROR, "NoMatchingStyle",partnerEPR);
+                    }
+                } else  /* in-memory */ {
+                    
+                }
             } else {
                 __log.error("Couldn't find endpoint for partner EPR " + DOMUtils.domToString(partnerEPR));
                 mex.setFailure(FailureType.UNKNOWN_ENDPOINT, "UnknownEndpoint", partnerEPR);
@@ -829,10 +863,14 @@
 
     void execute() {
         long maxTime = System.currentTimeMillis() + _maxReductionTimeMs;
+        
+        // Execute the process state reductions
         boolean canReduce = true;
         while (ProcessState.canExecute(_dao.getState()) && System.currentTimeMillis() < maxTime && canReduce) {
             canReduce = _vpu.execute();
         }
+        
+        
         _dao.setLastActiveTime(new Date());
         if (!ProcessState.isFinished(_dao.getState())) {
             if (__log.isDebugEnabled()) __log.debug("Setting execution state on instance " + _iid);

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=550560&r1=550559&r2=550560
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Mon Jun 25 10:34:20 2007
@@ -19,6 +19,14 @@
 
 package org.apache.ode.bpel.engine;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.wsdl.Operation;
+import javax.wsdl.PortType;
+import javax.xml.namespace.QName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.dao.MessageDAO;
@@ -30,95 +38,185 @@
 import org.apache.ode.utils.msg.MessageBundle;
 import org.w3c.dom.Element;
 
-import javax.wsdl.Operation;
-import javax.wsdl.PortType;
-import javax.xml.namespace.QName;
-import java.util.Set;
-
+/**
+ * Base implementation of the {@link MessageExchange} interface. This interfaces is exposed to the Integration Layer (IL)
+ * to allow it to implement incoming (via {@link MyRoleMessageExchangeImpl}) and outgoing (via {@link PartnerRoleMessageExchangeImpl})
+ * communications.
+ * 
+ * It should be noted that this class and its derived classes are in NO WAY THREADSAFE. It is imperative that the integration layer
+ * not attempt to use {@link MessageExchange} objects from multiple threads. 
+ * 
+ * @author Maciej Szefler
+ *
+ */
 abstract class MessageExchangeImpl implements MessageExchange {
 
+    /** Namespace for WSDL (2.0) extenions; we adopt them in WSDL 1.1 as well. */
+    static final String WSDL2_EXTENSIONS_NS = "http://www.w3.org/ns/wsdl-extensions";
+
+    /** WSDL extension attribute indicating whether operation is "safe". */
+    static final QName SAFE_ATTRIBUTE = new QName(WSDL2_EXTENSIONS_NS, "safe");
+
     private static final Log __log = LogFactory.getLog(MessageExchangeImpl.class);
+
     protected static final Messages __msgs = MessageBundle.getMessages(Messages.class);
 
-    /** Process-Instance identifier.*/
-    protected Long _iid;
+    /** Instance identifier. */
+    Long _iid;
 
-    protected PortType _portType;
-    protected Operation _operation;
+    PortType _portType;
 
-    protected final BpelEngineImpl _engine;
+    Operation _operation;
 
-    protected EndpointReference _epr;
-
-    protected MessageExchangeDAO _dao;
-
-    /**
-     * Constructor: requires the minimal information for a message exchange.
-     */
-    MessageExchangeImpl(BpelEngineImpl engine,
-                        MessageExchangeDAO dao,
-                        MessageExchangePattern pattern,
-                        String opname,
-                        EndpointReference epr) {
-        _engine = engine;
-        _dao = dao;
-        _epr = epr;
+    EndpointReference _epr;
 
-        getDAO().setPattern(pattern.toString());
-        getDAO().setOperation(opname);
-        if (epr != null)
-            getDAO().setEPR(epr.toXML().getDocumentElement());
+    Status _status;
+
+    MessageExchangePattern _pattern;
+
+    String _opname;
+
+    String _mexId;
+
+    Boolean _txflag;
+
+    QName _fault;
+
+    String _explanation;
+
+    MessageImpl _response;
+
+    MessageImpl _request;
+
+    Contexts _contexts;
+
+    QName _callee;
+    
+    BpelEngineImpl _engine;
+
+    boolean _associated;
+   
+    enum Change { 
+        EPR,
+        RESPONSE, 
+        RELEASE
     }
 
-    public MessageExchangeImpl(BpelEngineImpl engine,
-                               MessageExchangeDAO dao) {
+    final HashSet<Change> _changes = new HashSet<Change>();
+    
+    /** Properties that have been retrieved from the database. */
+    final HashMap<String, String> _properties = new HashMap<String,String>();
+    
+    /** Names of properties that have been retrieved from the database. */
+    final HashSet<String> _loadedProperties = new HashSet<String>();
+
+    /** Names of proprties that have been modified. */
+    final HashSet<String> _modifiedProperties = new HashSet<String>();
+    
+    private FailureType _failureType;
+
+    private Set<String> _propNames;
+    
+    public MessageExchangeImpl(BpelEngineImpl engine, String mexId) {
+        _contexts = engine._contexts;
         _engine = engine;
-        _dao = dao;
+        _mexId = mexId;
+    }
+
+    public MessageExchangeImpl(BpelEngineImpl engine, MessageExchangeDAO dao) {
+        load(dao);
     }
 
+    void load(MessageExchangeDAO dao) {
+        if (_pattern == null)
+            _pattern = MessageExchangePattern.valueOf(dao.getPattern());
+        if (_opname == null)
+            _opname = dao.getOperation();
+        if (_mexId == null)
+            _mexId = dao.getMessageExchangeId();
+        if (_txflag == null)
+            _txflag = dao.getPropagateTransactionFlag();
+        if (_fault == null)
+            _fault = dao.getFault();
+        if (_explanation == null)
+            _explanation = dao.getFaultExplanation();
+        if (_status == null)
+            _status = Status.valueOf(dao.getStatus());
+        if (_callee == null)
+            _callee = dao.getCallee();
+        
+    }
+    
+    public void save(MessageExchangeDAO dao) {
+        dao.setStatus(_status.toString());
+        dao.setFault(_fault);
+        dao.setFaultExplanation(_explanation);
+        //todo: set failureType
+        
+        if (_changes.contains(Change.RESPONSE)) {
+            MessageDAO responseDao = dao.createMessage(_response.getType());
+            responseDao.setData(_response.getMessage());
+        }
+        
+        if (_changes.contains(Change.EPR)) {
+            if (_epr != null)
+                dao.setEPR(_epr.toXML().getDocumentElement());
+            else
+                dao.setEPR(null);
+        }
+        
+        for (String modprop : _modifiedProperties) {
+            dao.setProperty(modprop, _properties.get(modprop));
+        }
+
+    }
+
+    public boolean isSafe() {
+        Object val = getOperation().getExtensionAttribute(SAFE_ATTRIBUTE);
+        if (val == null)
+            return false;
+        try {
+            return new Boolean(val.toString());
+        } catch (Exception ex) {
+            return false;
+        }
+
+    }
 
     public String getMessageExchangeId() throws BpelEngineException {
-        return getDAO().getMessageExchangeId();
+        return _mexId;
     }
 
     public String getOperationName() throws BpelEngineException {
-        return getDAO().getOperation();
+        return _opname;
     }
 
     public MessageExchangePattern getMessageExchangePattern() {
-        return MessageExchangePattern.valueOf(getDAO().getPattern());
-    }
-
-    public boolean isTransactionPropagated() throws BpelEngineException {
-        return getDAO().getPropagateTransactionFlag();
+        return _pattern;
     }
 
-    public Message getResponse() {
-        return new MessageImpl(getDAO().getResponse());
+    public boolean isTransactional() throws BpelEngineException {
+        return _txflag;
     }
 
     public QName getFault() {
-        return getDAO().getFault();
+        return _fault;
     }
 
     public Message getFaultResponse() {
-        return getResponse();
+        return _fault == null ? null : getResponse();
     }
 
     public String getFaultExplanation() {
-        return getDAO().getFaultExplanation();
+        return _explanation;
     }
 
     public MessageExchangePattern getPattern() {
-        return MessageExchangePattern.valueOf(getDAO().getPattern());
+        return _pattern;
     }
 
     public Status getStatus() {
-        return Status.valueOf(getDAO().getStatus());
-    }
-
-    public Message getRequest() {
-        return new MessageImpl(getDAO().getRequest());
+        return _status;
     }
 
     public Operation getOperation() {
@@ -129,112 +227,156 @@
         return _portType;
     }
 
-    /**
-     * Update the pattern of this message exchange.
-     * @param pattern
-     */
-    void setPattern(MessageExchangePattern pattern) {
-        if (__log.isTraceEnabled())
-            __log.trace("Mex[" + getMessageExchangeId() + "].setPattern("+pattern+")");
-        getDAO().setPattern(pattern.toString());
+    QName getServiceName() {
+        return _callee;
+    }
+    public Message getRequest() {
+        if (_request != null)
+            return _request;
+
+        return _request = doInDb(new InDbAction<MessageImpl>() {
+            public MessageImpl call(MessageExchangeDAO dao) {
+                MessageDAO req = dao.getRequest();
+                if (req == null)
+                    return null;
+                return new MemBackedMessageImpl(req.getData(),req.getType(),true);
+            }
+        });
+
     }
 
+    public Message getResponse() {
+        if (_response != null)
+            return _response;
+
+        return _response = doInDb(new InDbAction<MessageImpl>() {
+            public MessageImpl call(MessageExchangeDAO dao) {
+                MessageDAO req = dao.getResponse();
+                if (req == null)
+                    return null;
+                return new MemBackedMessageImpl(req.getData(),req.getType(),true);
+                
+            }
+        });
+    }
 
     void setPortOp(PortType portType, Operation operation) {
         if (__log.isTraceEnabled())
-            __log.trace("Mex[" + getMessageExchangeId()  + "].setPortOp("+portType+","+operation+")");
+            __log.trace("Mex[" + getMessageExchangeId() + "].setPortOp(" + portType + "," + operation + ")");
         _portType = portType;
         _operation = operation;
     }
 
-    MessageExchangeDAO getDAO() {
-        return _dao;
-    }
-
     void setFault(QName faultType, Message outputFaultMessage) throws BpelEngineException {
         setStatus(Status.FAULT);
-        getDAO().setFault(faultType);
-        getDAO().setResponse(((MessageImpl)outputFaultMessage)._dao);
+        _fault = faultType;
+        _response = (MessageImpl) outputFaultMessage;
+        
+        _changes.add(Change.RESPONSE);
     }
 
     void setFaultExplanation(String explanation) {
-        getDAO().setFaultExplanation(explanation);
+        _explanation = explanation;
     }
 
     void setResponse(Message outputMessage) throws BpelEngineException {
-        if (getStatus() != Status.REQUEST && getStatus()!=Status.ASYNC)
+        if (getStatus() != Status.REQUEST && getStatus() != Status.ASYNC)
             throw new IllegalStateException("Not in REQUEST state!");
 
         setStatus(Status.RESPONSE);
-        getDAO().setFault(null);
-        getDAO().setResponse(((MessageImpl)outputMessage)._dao);
-
-        // Meant to be overriden by subclasses when needed
-        responseReceived();
+        _fault = null;
+        _explanation = null;
+        _response = (MessageImpl) outputMessage;
+        _response.makeReadOnly();
+        _changes.add(Change.RESPONSE);
+        
     }
 
     void setFailure(FailureType type, String reason, Element details) throws BpelEngineException {
         // TODO not using FailureType, nor details
         setStatus(Status.FAILURE);
-        getDAO().setFaultExplanation(reason);
+        _failureType = type;
+        _explanation = reason;
+        
+        _changes.add(Change.RESPONSE);
     }
 
     void setStatus(Status status) {
-        getDAO().setStatus(status.toString());
+        _status = status;
     }
 
     public Message createMessage(javax.xml.namespace.QName msgType) {
-        MessageDAO mdao = getDAO().createMessage(msgType);
-        return new MessageImpl(mdao);
+        return new MemBackedMessageImpl(null,msgType,false);
     }
 
     public void setEndpointReference(EndpointReference ref) {
         _epr = ref;
-        if (ref != null)
-            getDAO().setEPR(ref.toXML().getDocumentElement());
+        _changes.add(Change.EPR);
     }
 
     public EndpointReference getEndpointReference() throws BpelEngineException {
-        if (_epr != null) return _epr;
-        if (getDAO().getEPR() == null)
-            return null;
+        if (_epr != null)
+            return _epr;
 
-        return _epr = _engine._contexts.eprContext.resolveEndpointReference(getDAO().getEPR());
-    }
+        return _epr = doInDb(new InDbAction<EndpointReference>() {
 
+            public EndpointReference call(MessageExchangeDAO mexdao) {
+                Element eprdao = mexdao.getEPR();
+                return _epr = eprdao == null ? null : _contexts.eprContext.resolveEndpointReference(mexdao.getEPR());
+            }
+
+        });
 
-    QName getServiceName() {
-        return getDAO().getCallee();
     }
 
-    public String getProperty(String key) {
-        String val = getDAO().getProperty(key);
-        if (__log.isDebugEnabled())
-            __log.debug("GET MEX property " + key + " = " + val);
-        return val;
+
+    public String getProperty(final String key) {
+        if (!_loadedProperties.contains(key)) {
+            _properties.put(key, doInDb(new InDbAction<String> () {
+                public String call(MessageExchangeDAO mexdao) {
+                    return mexdao.getProperty(key);
+                }
+                
+            }));
+            _loadedProperties.add(key);
+        }
+
+        return _properties.get(key);
     }
 
     public void setProperty(String key, String value) {
-        getDAO().setProperty(key,value);
-        if (__log.isDebugEnabled())
-            __log.debug("SET MEX property " + key + " = " + value);
+        _properties.put(key,value);
+        _loadedProperties.add(key);
+        _modifiedProperties.add(key);
     }
 
     public Set<String> getPropertyNames() {
-        return getDAO().getPropertyNames();
+        if (_propNames != null)
+            return _propNames;
+        
+        return _propNames = doInDb(new InDbAction<Set<String>>() {
+            public Set<String> call(MessageExchangeDAO mexdao) {
+                return mexdao.getPropertyNames();
+            }
+        });
+        
     }
 
     public void release() {
         __log.debug("Releasing mex " + getMessageExchangeId());
-        _dao.release();
-        _dao = null;
+        _changes.add(Change.RELEASE);
     }
 
     public String toString() {
-        return "MEX["+getDAO().getMessageExchangeId() +"]";
+        return "MEX[" + _mexId + "]";
+    }
+
+    protected <T> T doInDb(InDbAction<T> name) {
+        throw new UnsupportedOperationException();
     }
 
-    protected void responseReceived() {
-        // Nothing to do here, just opening the possibility of overriding
+    interface InDbAction<T> {
+        public T call(MessageExchangeDAO mexdao);
     }
+    
 }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageImpl.java?view=diff&rev=550560&r1=550559&r2=550560
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageImpl.java Mon Jun 25 10:34:20 2007
@@ -31,51 +31,55 @@
 import org.apache.ode.bpel.dao.MessageDAO;
 import org.apache.ode.bpel.iapi.Message;
 
-public class MessageImpl implements Message {
+/**
+ * Implementation of the {@link Message} interface. 
+ * 
+ * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
+ */
+abstract class MessageImpl implements Message {
+
+    boolean _readOnly = false;
+
+    public Element getPart(String partName) {
+        Element message = getMessage();
+        NodeList eltList = message.getElementsByTagName(partName);
+        if (eltList.getLength() == 0)
+            return null;
+        else
+            return (Element) eltList.item(0);
+    }
+
+    public void setMessagePart(String partName, Element content) {
+        Element message = getMessage();
+        message.appendChild(message.getOwnerDocument().importNode(content, true));
+        setMessage(message);
+    }
+
+    public abstract void setMessage(Element msg);
 
-  MessageDAO _dao;
+    public abstract Element getMessage();
 
-  public MessageImpl(MessageDAO message) {
-    if (message == null)
-      throw new NullPointerException("null message!");
-    _dao = message;
-  }
-
-  public Element getPart(String partName) {
-    Element message = getMessage();
-    NodeList eltList = message.getElementsByTagName(partName);
-    if (eltList.getLength() == 0) return null;
-    else return (Element) eltList.item(0);
-  }
-
-  public void setMessagePart(String partName, Element content) {
-    Element message = getMessage();
-    message.appendChild(message.getOwnerDocument().importNode(content, true));
-    setMessage(message);
-  }
-
-  public void setMessage(Element msg) {
-    _dao.setData(msg);
-  }
-
-  public Element getMessage() {
-    return _dao.getData();
-  }
-
-  public QName getType() {
-    return _dao.getType();
-  }
-
-  public List<String> getParts() {
-    ArrayList<String> parts = new ArrayList<String>();
-    Element message = getMessage();
-    NodeList nodeList = message.getChildNodes();
-    for (int m = 0; m < nodeList.getLength(); m++) {
-      Node node = nodeList.item(m);
-      if (node.getNodeType() == Node.ELEMENT_NODE)
-        parts.add(node.getLocalName());
+    public abstract QName getType();
+
+    public List<String> getParts() {
+        ArrayList<String> parts = new ArrayList<String>();
+        Element message = getMessage();
+        NodeList nodeList = message.getChildNodes();
+        for (int m = 0; m < nodeList.getLength(); m++) {
+            Node node = nodeList.item(m);
+            if (node.getNodeType() == Node.ELEMENT_NODE)
+                parts.add(node.getLocalName());
+        }
+        return parts;
     }
-    return parts;
-  }
 
+    
+    protected void makeReadOnly() {
+        _readOnly = true;
+    }
+    
+    protected void checkWrite() {
+        if (_readOnly)
+            throw new IllegalStateException("write attempted to read-only message.");
+    }
 }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=550560&r1=550559&r2=550560
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Mon Jun 25 10:34:20 2007
@@ -22,6 +22,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
@@ -45,11 +46,11 @@
     private static final Log __log = LogFactory.getLog(MyRoleMessageExchangeImpl.class);
     public static final int TIMEOUT = 2 * 60 * 1000;
 
-    private static Map<String, ResponseCallback> _waitingCallbacks =
-            new ConcurrentHashMap<String, ResponseCallback>();
+    private static Map<String, ResponseFuture> _waitingFutures =
+            new ConcurrentHashMap<String, ResponseFuture>();
 
 
-    public MyRoleMessageExchangeImpl(BpelEngineImpl engine, MessageExchangeDAO mexdao) {
+    public MyRoleMessageExchangeImpl() {
         super(engine, mexdao);
     }
 
@@ -98,7 +99,7 @@
         return true;
     }
 
-    public Future invoke(Message request) {
+    public Future<MessageExchange.Status> invoke(Message request) {
         if (request == null) {
             String errmsg = "Must pass non-null message to invoke()!";
             __log.fatal(errmsg);
@@ -106,23 +107,27 @@
         }
 
         _dao.setRequest(((MessageImpl) request)._dao);
-        _dao.setStatus(MessageExchange.Status.REQUEST.toString());
+        setStatus(MessageExchange.Status.REQUEST);
 
-        if (!processInterceptors(this, InterceptorInvoker.__onBpelServerInvoked))
-            return null;
+        if (!processInterceptors(this, InterceptorInvoker.__onBpelServerInvoked)) {
+            throw new BpelEngineException("Intercepted.");
+        }
 
         BpelProcess target = _engine.route(getDAO().getCallee(), request);
 
         if (__log.isDebugEnabled())
             __log.debug("invoke() EPR= " + _epr + " ==> " + target);
 
+        
+        ResponseFuture future = new ResponseFuture();
+        
         if (target == null) {
             if (__log.isWarnEnabled())
                 __log.warn(__msgs.msgUnknownEPR("" + _epr));
 
             setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.UKNOWN_ENDPOINT);
             setFailure(MessageExchange.FailureType.UNKNOWN_ENDPOINT, null, null);
-            return null;
+            future.done(_lastStatus);
         } else {
             // Schedule a new job for invocation
             WorkEvent we = new WorkEvent();
@@ -131,18 +136,23 @@
             we.setProcessId(target.getPID());
             we.setMexId(getDAO().getMessageExchangeId());
 
+            setStatus(Status.ASYNC);
+
             if (getOperation().getOutput() != null) {
-                ResponseCallback callback = new ResponseCallback();
-                _waitingCallbacks.put(getClientId(), callback);
+                _waitingFutures.put(getMessageExchangeId(), future);
+            } else {
+                future.done(_lastStatus);
             }
 
-            setStatus(Status.ASYNC);
+
             if (target.isInMemory())
                 _engine._contexts.scheduler.scheduleVolatileJob(true, we.getDetail());
             else
                 _engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
-            return new ResponseFuture(getClientId());
+
         }
+
+        return future;
     }
 
     public void complete() {
@@ -173,80 +183,68 @@
         return true;
     }
 
-    static class ResponseFuture implements Future {
-        private String _clientId;
-        private boolean _done = false;
 
-        public ResponseFuture(String clientId) {
-            _clientId = clientId;
-        }
+    protected void responseReceived() {
+        final String mexid = getMessageExchangeId();
+        _engine._contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
+            public void afterCompletion(boolean success) {
+                __log.debug("Received myrole mex response callback");
+                ResponseFuture callback = _waitingFutures.remove(mexid);
+                callback.done(_lastStatus);
+            }
+            public void beforeCompletion() {
+            }
+        });
+    }
+    
+    private static class ResponseFuture implements Future<Status> {
+        private Status _status;
 
         public boolean cancel(boolean mayInterruptIfRunning) {
-            throw new UnsupportedOperationException();
+            return false;
         }
-        public Object get() throws InterruptedException, ExecutionException {
+        
+        public Status get() throws InterruptedException, ExecutionException {
             try {
                 return get(0, TimeUnit.MILLISECONDS);
             } catch (TimeoutException e) {
                 // If it's thrown it's definitely a bug
-                throw new ExecutionException(e);
+                throw new RuntimeException(e);
             }
         }
-        public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-            ResponseCallback callback = _waitingCallbacks.get(_clientId);
-            if (callback != null) {
-                callback.waitResponse(timeout);
-                _done = true;
-                if (callback._timedout)
-                    throw new TimeoutException("Message exchange " + this + " timed out when waiting for a response!");
+        
+        public Status get(long timeout, TimeUnit unit) 
+            throws InterruptedException, ExecutionException, TimeoutException {
+            
+            
+            synchronized(this) {
+                if (_status != null)
+                    return _status;
+                
+                while (_status == null) {
+                    this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
+                }
+    
+                if (_status == null)
+                    throw new TimeoutException();
+                
+                return _status;
             }
-            return null;
         }
+
         public boolean isCancelled() {
             return false;
         }
+        
         public boolean isDone() {
-            return _done;
+            return _status != null;
         }
-    }
-
-    protected void responseReceived() {
-        final String cid = getClientId();
-        _engine._contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
-            public void afterCompletion(boolean success) {
-                __log.debug("Received myrole mex response callback");
-                ResponseCallback callback = _waitingCallbacks.remove(cid);
-                if (callback != null) callback.responseReceived();
-            }
-            public void beforeCompletion() {
-            }
-        });
-    }
-
-    static class ResponseCallback {
-        private boolean _timedout;
-        private boolean _waiting = true;
-
-        synchronized boolean responseReceived() {
-            if (_timedout) {
-                return false;
-            }
-            _waiting = false;
-            this.notify();
-            return true;
-        }
-
-        synchronized void waitResponse(long timeout) {
-            long etime = timeout == 0 ? Long.MAX_VALUE : System.currentTimeMillis() + timeout;
-            long ctime;
-            try {
-                while (_waiting && (ctime = System.currentTimeMillis()) < etime) {
-                    this.wait(etime - ctime);
-                }
-            } catch (InterruptedException ie) {
-                // ignore
+        
+        void done(Status status) {
+            synchronized(this) {
+                _status = status;
+                this.notifyAll();
             }
-            _timedout = _waiting;
         }
     }
 

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java?view=diff&rev=550560&r1=550559&r2=550560
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java Mon Jun 25 10:34:20 2007
@@ -37,18 +37,35 @@
 class PartnerRoleMessageExchangeImpl extends MessageExchangeImpl implements PartnerRoleMessageExchange {
     private static final Log LOG = LogFactory.getLog(PartnerRoleMessageExchangeImpl.class);
 
-    private PartnerRoleChannel _channel;
+    private final PartnerRoleChannel _channel;
     private EndpointReference _myRoleEPR;
+    private boolean _inMem;
+
+    private QName _caller;
     
-    PartnerRoleMessageExchangeImpl(BpelEngineImpl engine, MessageExchangeDAO dao, PortType portType,
+    PartnerRoleMessageExchangeImpl(
+            BpelEngineImpl engine, 
+            PortType portType,
             Operation operation, 
+            boolean inMem,
             EndpointReference epr,
             EndpointReference myRoleEPR,
             PartnerRoleChannel channel) {
-        super(engine, dao);
+        super(engine);
         _myRoleEPR = myRoleEPR;
-        setPortOp(portType, operation);
         _channel = channel;
+        _inMem = inMem;
+        setPortOp(portType, operation);    
+    }
+
+    @Override
+    void load(MessageExchangeDAO dao) {
+        super.load(dao);
+    }
+
+    @Override
+    public void save(MessageExchangeDAO dao) {
+        super.save(dao);
     }
 
     public void replyOneWayOk() {
@@ -113,16 +130,16 @@
             LOG.debug("create work event for mex=" + getMessageExchangeId());
         }
         WorkEvent we = new WorkEvent();
-        we.setIID(getDAO().getInstance().getInstanceId());
+        we.setIID(_iid);
         we.setType(Type.INVOKE_RESPONSE);
-        if (_engine._activeProcesses.get(getDAO().getProcess().getProcessId()).isInMemory())
+        if (_inMem)
             we.setInMem(true);
         we.setChannel(getDAO().getChannel());
-        we.setMexId(getDAO().getMessageExchangeId());
-        if (we.isInMem())
-            _engine._contexts.scheduler.scheduleVolatileJob(true, we.getDetail());
+        we.setMexId(_mexId);
+        if (_inMem)
+            _contexts.scheduler.scheduleVolatileJob(true, we.getDetail());
         else
-            _engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
+            _contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
     }
 
     /**
@@ -135,13 +152,13 @@
     }
 
     public QName getCaller() {
-        return _dao.getProcess().getProcessId();
+        return _caller;
     }
 
     public String toString() {
         try {
-            return "{PartnerRoleMex#" + getMessageExchangeId() + " [PID " + getCaller() + "] calling " + _epr + "."
-                    + getOperationName() + "(...)}";
+            return "{PartnerRoleMex#" + _mexId  + " [PID " + getCaller() + "] calling " + _epr + "."
+                    + _opname + "(...)}";
 
         } catch (Throwable t) {
             return "{PartnerRoleMex#????}";

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java?view=diff&rev=550560&r1=550559&r2=550560
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java Mon Jun 25 10:34:20 2007
@@ -50,7 +50,6 @@
     protected final Map<Integer, PartnerLinkDAO> _plinks = new ConcurrentHashMap<Integer, PartnerLinkDAO>();
     private Map<QName, ProcessDaoImpl> _store;
     private BpelDAOConnectionImpl _conn;
-    private int _executionCount = 0;
     private Collection<Long> _instancesToRemove = new ConcurrentLinkedQueue<Long>();
 
     private String _guid;
@@ -107,7 +106,6 @@
                 _instances.put(newInstance.getInstanceId(), newInstance);
             }
         });
-        _executionCount++;
         return newInstance;
     }
 
@@ -174,8 +172,7 @@
     }
 
     public int getNumInstances() {
-        // Instances are removed after execution, using a counter instead
-        return _executionCount;
+        return _instances.size();
     }
 
     public ProcessInstanceDAO getInstanceWithLock(Long iid) {



Mime
View raw message