ode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mszef...@apache.org
Subject svn commit: r550650 - in /incubator/ode/branches/bart: bpel-api/src/main/java/org/apache/ode/bpel/iapi/ bpel-runtime/src/main/java/org/apache/ode/bpel/engine/
Date Tue, 26 Jun 2007 00:58:16 GMT
Author: mszefler
Date: Mon Jun 25 17:58:15 2007
New Revision: 550650

URL: http://svn.apache.org/viewvc?view=rev&rev=550650
Log:
BART

Added:
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
      - copied, changed from r550560, incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
Removed:
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
Modified:
    incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelEngine.java
    incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelServer.java
    incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java

Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelEngine.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelEngine.java?view=diff&rev=550650&r1=550649&r2=550650
==============================================================================
--- incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelEngine.java
(original)
+++ incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelEngine.java
Mon Jun 25 17:58:15 2007
@@ -19,7 +19,6 @@
 
 package org.apache.ode.bpel.iapi;
 
-import javax.xml.namespace.QName;
 
 /**
  * Interface exposing the BPEL "engine". Basically, this interface facilitates
@@ -34,26 +33,5 @@
  */
 public interface BpelEngine extends Scheduler.JobProcessor {
 
-    /**
-     * Create a "my role" message exchange for invoking a BPEL process.
-     * 
-     * @param serviceId
-     *            the service id of the process being called, if known
-     * @param operation
-     *            name of the operation
-     * 
-     * @return {@link MyRoleMessageExchange} the newly created message exchange
-     */
-    MyRoleMessageExchange createMessageExchange(String clientKey, QName serviceId, String operation)
-            throws BpelEngineException;
-
-    /**
-     * Retrieve a message identified by the given identifer.
-     * 
-     * @param mexId
-     *            message exhcange identifier
-     * @return associated message exchange
-     */
-    MessageExchange getMessageExchange(String mexId);
-
+   
 }

Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelServer.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelServer.java?view=diff&rev=550650&r1=550649&r2=550650
==============================================================================
--- incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelServer.java
(original)
+++ incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelServer.java
Mon Jun 25 17:58:15 2007
@@ -18,12 +18,13 @@
  */
 package org.apache.ode.bpel.iapi;
 
+import java.util.Set;
+
 import javax.xml.namespace.QName;
 
 
 /**
- * Interface implemented by the BPEL server. Provides methods for
- * life-cycle management.
+ * Interface implemented by the BPEL server. Provides methods for life-cycle management and process invocation.
  * 
  * @author Maciej Szefler - m s z e f l e r @ g m a i l . c o m
  */
@@ -116,5 +117,39 @@
      * @throws BpelEngineException
      */
     void unregister(QName pid) throws BpelEngineException;
+
+    
+    /**
+     * Inquire of the engine the invocation styles that are supported for a given service.

+     * @param serviceId service identifier 
+     * @return set of supported {@link InvocationStyle}s
+     */
+    Set<InvocationStyle> getSupportedInvocationStyle(QName serviceId);
+    
+    /**
+     * Create a "my role" message exchange for invoking a BPEL process.
+     * 
+     * @param serviceId
+     *            the service id of the process being called, if known
+     * @param operation
+     *            name of the operation
+     * 
+     * @return {@link MyRoleMessageExchange} the newly created message exchange
+     */
+    MyRoleMessageExchange createMessageExchange(InvocationStyle istyle, 
+            QName serviceId, 
+            String operation,
+            String clientKey)
+            throws BpelEngineException;
+
+    /**
+     * Retrieve a message identified by the given identifer.
+     * 
+     * @param mexId
+     *            message exhcange identifier
+     * @return associated message exchange
+     */
+    MessageExchange getMessageExchange(String mexId) 
+        throws BpelEngineException;
 
 }

Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java?view=diff&rev=550650&r1=550649&r2=550650
==============================================================================
--- incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java
(original)
+++ incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java
Mon Jun 25 17:58:15 2007
@@ -56,6 +56,8 @@
      */
     CorrelationStatus getCorrelationStatus();
 
+    void setRequest(Message request);
+    
     /**
      * "Invoke" a process hosted by the BPEL engine. The state of the invocation
      * may be obtained by a call to the {@link MessageExchange#getStatus()}
@@ -68,20 +70,19 @@
      * {@link MessageExchangeContext#onAsyncReply(MyRoleMessageExchange)} when
      * the response become available.
      */
-    Future<MessageExchange.Status> invoke(Message request);
+    void invokeBlocking();
 
+    void invokeReliable();
+    
+    void invokeAsync();
+    
+    void invokeTransacted();
+    
     /**
      * Complete the message, exchange: indicates that the client has receive the
      * response (if any).
      */
     void complete();
-
-    /**
-     * Associate a client key with this message exchange.
-     * 
-     * @param clientKey
-     */
-    void setClientId(String clientKey);
 
     /**
      * Get the previously associated client key for this exchange.

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?view=diff&rev=550650&r1=550649&r2=550650
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
(original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
Mon Jun 25 17:58:15 2007
@@ -29,6 +29,7 @@
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.ContextException;
 import org.apache.ode.bpel.iapi.Endpoint;
+import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
@@ -106,10 +107,13 @@
         _contexts = contexts;
     }
 
-    public MyRoleMessageExchange createMessageExchange(String clientKey, QName targetService,
-                                                       String operation, String pipedMexId)
+    MyRoleMessageExchange createMessageExchange(InvocationStyle istyle, QName targetService, String operation, String clientKey)
             throws BpelEngineException {
 
+        // TODO: for now, invocation of the engine is only supported in RELIABLE mode.
+        if (istyle != InvocationStyle.RELIABLE)
+            throw new BpelEngineException("Unsupported InvocationStyle: " + istyle);
+        
         BpelProcess target = route(targetService, null);
 
         MessageExchangeDAO dao;
@@ -125,7 +129,7 @@
         dao.setStatus(Status.NEW.toString());
         dao.setOperation(operation);
         dao.setPipedMessageExchangeId(pipedMexId);
-        MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(this, dao);
+        ReliableMyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(this, dao);
 
         if (target != null) {
             target.initMyRoleMex(mex);
@@ -134,11 +138,7 @@
         return mex;
     }
 
-    public MyRoleMessageExchange createMessageExchange(String clientKey, QName targetService, String operation) {
-        return createMessageExchange(clientKey, targetService, operation, null);        
-    }
-
-    public MessageExchange getMessageExchange(String mexId) throws BpelEngineException {
+    MessageExchange getMessageExchange(String mexId) throws BpelEngineException {
         MessageExchangeDAO mexdao = _contexts.inMemDao.getConnection().getMessageExchange(mexId);
         if (mexdao == null) mexdao = _contexts.dao.getConnection().getMessageExchange(mexId);
         if (mexdao == null)
@@ -167,7 +167,7 @@
             }
             break;
         case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
-            mex = new MyRoleMessageExchangeImpl(this, mexdao);
+            mex = new ReliableMyRoleMessageExchangeImpl(this, mexdao);
             if (process != null) {
                 OPartnerLink plink = (OPartnerLink) process.getOProcess().getChild(mexdao.getPartnerLinkModelId());
                 PortType ptype = plink.myRolePortType;

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=550650&r1=550649&r2=550650
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
(original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
Mon Jun 25 17:58:15 2007
@@ -148,7 +148,7 @@
      * 
      * @param mex
      */
-    void invokeProcess(MyRoleMessageExchangeImpl mex) {
+    void invokeProcess(ReliableMyRoleMessageExchangeImpl mex) {
         _hydrationLatch.latch(1);
         try {
             PartnerLinkMyRoleImpl target = getMyRoleForService(mex.getServiceName());
@@ -179,7 +179,7 @@
         }
     }
 
-    private MessageExchangeDAO getDAO(MyRoleMessageExchangeImpl mex) {
+    private MessageExchangeDAO getDAO(ReliableMyRoleMessageExchangeImpl mex) {
 
     }
 
@@ -191,7 +191,7 @@
         return null;
     }
 
-    void initMyRoleMex(MyRoleMessageExchangeImpl mex) {
+    void initMyRoleMex(ReliableMyRoleMessageExchangeImpl mex) {
         markused();
         PartnerLinkMyRoleImpl target = null;
         for (Endpoint endpoint : getEndpointToMyRoleMap().keySet()) {
@@ -274,7 +274,7 @@
      *            message exchange
      * @return <code>true</code> if execution should continue, <code>false</code>
otherwise
      */
-    boolean processInterceptors(MyRoleMessageExchangeImpl mex, InterceptorInvoker invoker) {
+    boolean processInterceptors(ReliableMyRoleMessageExchangeImpl mex, InterceptorInvoker invoker) {
         InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(),
getProcessDAO(), _pconf);
 
         for (MessageExchangeInterceptor i : _mexInterceptors)
@@ -307,7 +307,7 @@
                 if (__log.isDebugEnabled()) {
                     __log.debug("InvokeInternal event for mexid " + we.getMexId());
                 }
-                MyRoleMessageExchangeImpl mex = (MyRoleMessageExchangeImpl) _engine.getMessageExchange(we.getMexId());
+                ReliableMyRoleMessageExchangeImpl mex = (ReliableMyRoleMessageExchangeImpl) _engine.getMessageExchange(we.getMexId());
                 invokeProcess(mex);
             } else {
                 // Instance level events
@@ -612,7 +612,7 @@
 
     /** Create a version-appropriate runtime context. */
     BpelRuntimeContextImpl createRuntimeContext(ProcessInstanceDAO dao, PROCESS template,
-            MyRoleMessageExchangeImpl instantiatingMessageExchange) {
+            ReliableMyRoleMessageExchangeImpl instantiatingMessageExchange) {
         return new BpelRuntimeContextImpl(this, dao, template, instantiatingMessageExchange);
 
     }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=550650&r1=550649&r2=550650
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
(original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
Mon Jun 25 17:58:15 2007
@@ -591,7 +591,7 @@
 
         } else /* IL-mediated communication */  {
             // TODO: distinguish between different kinds of my-role mexss
-            MyRoleMessageExchangeImpl myRoleMex = new MyRoleMessageExchangeImpl();
+            ReliableMyRoleMessageExchangeImpl myRoleMex = new ReliableMyRoleMessageExchangeImpl();
             _bpelProcess._engine._contexts.mexContext.onAsyncReply(myRoleMex);
         }
 
@@ -1121,7 +1121,7 @@
         for (String mexId : mexRefs) {
             MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId);
             if (mexDao != null) {
-                MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess._engine,
mexDao);
+                ReliableMyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(_bpelProcess._engine,
mexDao);
                 switch (mex.getStatus()) {
                     case ASYNC:
                     case RESPONSE:
@@ -1146,7 +1146,7 @@
         for (String mexId : mexRefs) {
             MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId);
             if (mexDao != null) {
-                MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess._engine,
mexDao);
+                ReliableMyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(_bpelProcess._engine,
mexDao);
                 _bpelProcess.initMyRoleMex(mex);
 
                 Message message = mex.createMessage(faultData.getFaultName());
@@ -1165,7 +1165,7 @@
         String[] mexRefs = _outstandingRequests.releaseAll();
         for (String mexId : mexRefs) {
             MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId);
-            MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess._engine,
mexDao);
+            ReliableMyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(_bpelProcess._engine,
mexDao);
             _bpelProcess.initMyRoleMex(mex);
             mex.setFailure(FailureType.OTHER, "No response.", null);
             _bpelProcess._engine._contexts.mexContext.onAsyncReply(mex);

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=550650&r1=550649&r2=550650
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
(original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
Mon Jun 25 17:58:15 2007
@@ -42,7 +42,7 @@
 
 /**
  * Base implementation of the {@link MessageExchange} interface. This interfaces is exposed
to the Integration Layer (IL)
- * to allow it to implement incoming (via {@link MyRoleMessageExchangeImpl}) and outgoing
(via {@link PartnerRoleMessageExchangeImpl})
+ * to allow it to implement incoming (via {@link ReliableMyRoleMessageExchangeImpl}) and
outgoing (via {@link PartnerRoleMessageExchangeImpl})
  * communications. 
  * 
  * It should be noted that this class and its derived classes are in NO WAY THREADSAFE. It
is imperative that the integration layer
@@ -236,9 +236,6 @@
         return _portType;
     }
 
-    QName getServiceName() {
-        return _callee;
-    }
     public Message getRequest() {
         if (_request != null)
             return _request;
@@ -379,9 +376,18 @@
     public String toString() {
         return "MEX[" + _mexId + "]";
     }
+    
+    protected void assertTransaction() {
+        if (!_contexts.scheduler.isTransacted())
+            throw new BpelEngineException("Operation must be performed in a transaction!");
+    }
 
-    protected <T> T doInDb(InDbAction<T> name) {
-        throw new UnsupportedOperationException();
+    protected <T> T doInDb(InDbAction<T> action) {
+        if (_txflag) {
+            MessageExchangeDAO mexDao;
+            action.call(mexDao);
+        } else {
+        }
     }
 
     interface InDbAction<T> {

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java?view=diff&rev=550650&r1=550649&r2=550650
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
(original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
Mon Jun 25 17:58:15 2007
@@ -83,7 +83,7 @@
      * @param mex
      *            exchange to which the message is related
      */
-    public void invokeMyRole(MyRoleMessageExchangeImpl mex) {
+    public void invokeMyRole(ReliableMyRoleMessageExchangeImpl mex) {
         if (__log.isTraceEnabled()) {
             __log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[]
{
                     "messageExchange", mex }));
@@ -266,7 +266,7 @@
         return op;
     }
 
-    private CorrelationKey[] computeCorrelationKeys(MyRoleMessageExchangeImpl mex) {
+    private CorrelationKey[] computeCorrelationKeys(ReliableMyRoleMessageExchangeImpl mex) {
         Operation operation = mex.getOperation();
         Element msg = mex.getRequest().getMessage();
         javax.wsdl.Message msgDescription = operation.getInput().getMessage();

Copied: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
(from r550560, incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java)
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java?view=diff&rev=550650&p1=incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java&r1=550560&p2=incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java&r2=550650
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
(original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
Mon Jun 25 17:58:15 2007
@@ -21,12 +21,15 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.MessageDAO;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.iapi.BpelEngineException;
+import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
 import org.apache.ode.bpel.intercept.AbortMessageExchangeException;
 import org.apache.ode.bpel.intercept.FaultMessageExchangeException;
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
@@ -41,25 +44,49 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-class MyRoleMessageExchangeImpl extends MessageExchangeImpl implements MyRoleMessageExchange {
+/**
+ * Provides an implementation of the {@link MyRoleMessageExchange} inteface for interactions
performed in the
+ * {@link InvocationStyle#RELIABLE} style.
+ * 
+ * @author Maciej Szefler
+ */
+class ReliableMyRoleMessageExchangeImpl extends MessageExchangeImpl implements MyRoleMessageExchange {
+
+    private static final Log __log = LogFactory.getLog(ReliableMyRoleMessageExchangeImpl.class);
 
-    private static final Log __log = LogFactory.getLog(MyRoleMessageExchangeImpl.class);
     public static final int TIMEOUT = 2 * 60 * 1000;
 
-    private static Map<String, ResponseFuture> _waitingFutures =
-            new ConcurrentHashMap<String, ResponseFuture>();
+    private static Map<String, ResponseFuture> _waitingFutures = new ConcurrentHashMap<String,
ResponseFuture>();
+
+    private CorrelationStatus _cstatus;
+
+    private String _clientId;
 
+    public ReliableMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) {
+        super(engine, mexId);
 
-    public MyRoleMessageExchangeImpl() {
-        super(engine, mexdao);
+        // RELIABLE means we are bound to a transaction
+        _txflag = true;
     }
 
     public CorrelationStatus getCorrelationStatus() {
-        return CorrelationStatus.valueOf(getDAO().getCorrelationStatus());
+        return _cstatus;
     }
 
-    void setCorrelationStatus(CorrelationStatus status) {
-        getDAO().setCorrelationStatus(status.toString());
+    @Override
+    void load(MessageExchangeDAO dao) {
+        super.load(dao);
+        if (_cstatus == null)
+            _cstatus = CorrelationStatus.valueOf(dao.getCorrelationStatus());
+        if (_clientId == null)
+            _clientId = dao.getCorrelationId();
+    }
+
+    @Override
+    public void save(MessageExchangeDAO dao) {
+        super.save(dao);
+        dao.setCorrelationStatus(_cstatus.toString());
+        dao.setCorrelationId(_clientId);
     }
 
     /**
@@ -67,21 +94,19 @@
      * 
      * @param mex
      *            message exchange
-     * @return <code>true</code> if execution should continue,
-     *         <code>false</code> otherwise
+     * @return <code>true</code> if execution should continue, <code>false</code>
otherwise
      */
-    private boolean processInterceptors(MyRoleMessageExchangeImpl mex, InterceptorInvoker
invoker) {
-        InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(),

-                mex._dao.getProcess(), null);
+    private boolean processInterceptors(InterceptorInvoker invoker, MessageExchangeDAO mexDao)
{
+        InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(),
mexDao.getProcess(), null);
 
         for (MessageExchangeInterceptor i : _engine.getGlobalInterceptors())
-            if (!processInterceptor(i, mex, ictx, invoker))
+            if (!processInterceptor(i, this, ictx, invoker))
                 return false;
 
         return true;
     }
 
-    boolean processInterceptor(MessageExchangeInterceptor i, MyRoleMessageExchangeImpl mex,
InterceptorContext ictx,
+    boolean processInterceptor(MessageExchangeInterceptor i, ReliableMyRoleMessageExchangeImpl
mex, InterceptorContext ictx,
             InterceptorInvoker invoker) {
         __log.debug(invoker + "--> interceptor " + i);
         try {
@@ -99,40 +124,54 @@
         return true;
     }
 
-    public Future<MessageExchange.Status> invoke(Message request) {
+    public Future<MessageExchange.Status> invoke(final Message request) {
         if (request == null) {
             String errmsg = "Must pass non-null message to invoke()!";
-            __log.fatal(errmsg);
             throw new NullPointerException(errmsg);
         }
 
-        _dao.setRequest(((MessageImpl) request)._dao);
-        setStatus(MessageExchange.Status.REQUEST);
-
-        if (!processInterceptors(this, InterceptorInvoker.__onBpelServerInvoked)) {
-            throw new BpelEngineException("Intercepted.");
-        }
-
-        BpelProcess target = _engine.route(getDAO().getCallee(), request);
-
-        if (__log.isDebugEnabled())
-            __log.debug("invoke() EPR= " + _epr + " ==> " + target);
+        // For reliable, we MUST HAVE A TRANSACTION!
+        assertTransaction();
 
-        
-        ResponseFuture future = new ResponseFuture();
-        
+        BpelProcess target = _engine.route(_callee, request);
         if (target == null) {
             if (__log.isWarnEnabled())
                 __log.warn(__msgs.msgUnknownEPR("" + _epr));
 
-            setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.UKNOWN_ENDPOINT);
+            ResponseFuture future = new ResponseFuture();
+
+            _cstatus = MyRoleMessageExchange.CorrelationStatus.UKNOWN_ENDPOINT;
             setFailure(MessageExchange.FailureType.UNKNOWN_ENDPOINT, null, null);
-            future.done(_lastStatus);
-        } else {
+            future.done(_status);
+
+            return future;
+        }
+
+        doInDb(new InDbAction<Void>() {
+
+            public Void call(MessageExchangeDAO mexdao) {
+                // TODO: perhaps we should check if already backed by DB?
+                MessageDAO msgDao = mexdao.createMessage(request.getType());
+                msgDao.setData(request.getMessage());
+                setStatus(MessageExchange.Status.REQUEST);
+
+                if (!processInterceptors(this, InterceptorInvoker.__onBpelServerInvoked)) {
+                    throw new BpelEngineException("Intercepted.");
+                }
+
+                if (__log.isDebugEnabled())
+                    __log.debug("invoke() EPR= " + _epr + " ==> " + target);
+
+            }
+
+        });
+
+        {
             // Schedule a new job for invocation
             WorkEvent we = new WorkEvent();
             we.setType(WorkEvent.Type.INVOKE_INTERNAL);
-            if (target.isInMemory()) we.setInMem(true);
+            if (target.isInMemory())
+                we.setInMem(true);
             we.setProcessId(target.getPID());
             we.setMexId(getDAO().getMessageExchangeId());
 
@@ -144,7 +183,6 @@
                 future.done(_lastStatus);
             }
 
-
             if (target.isInMemory())
                 _engine._contexts.scheduler.scheduleVolatileJob(true, we.getDetail());
             else
@@ -158,16 +196,8 @@
     public void complete() {
     }
 
-    public QName getServiceName() {
-        return getDAO().getCallee();
-    }
-
-    public void setClientId(String clientKey) {
-        getDAO().setCorrelationId(clientKey);
-    }
-
     public String getClientId() {
-        return getDAO().getCorrelationId();
+        return _clientId;
     }
 
     public String toString() {
@@ -179,11 +209,6 @@
         }
     }
 
-    public boolean isAsynchronous() {
-        return true;
-    }
-
-
     protected void responseReceived() {
         final String mexid = getMessageExchangeId();
         _engine._contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
@@ -192,18 +217,19 @@
                 ResponseFuture callback = _waitingFutures.remove(mexid);
                 callback.done(_lastStatus);
             }
+
             public void beforeCompletion() {
             }
         });
     }
-    
+
     private static class ResponseFuture implements Future<Status> {
         private Status _status;
 
         public boolean cancel(boolean mayInterruptIfRunning) {
             return false;
         }
-        
+
         public Status get() throws InterruptedException, ExecutionException {
             try {
                 return get(0, TimeUnit.MILLISECONDS);
@@ -212,22 +238,20 @@
                 throw new RuntimeException(e);
             }
         }
-        
-        public Status get(long timeout, TimeUnit unit) 
-            throws InterruptedException, ExecutionException, TimeoutException {
-            
-            
-            synchronized(this) {
+
+        public Status get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+
+            synchronized (this) {
                 if (_status != null)
                     return _status;
-                
+
                 while (_status == null) {
                     this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
                 }
-    
+
                 if (_status == null)
                     throw new TimeoutException();
-                
+
                 return _status;
             }
         }
@@ -235,13 +259,13 @@
         public boolean isCancelled() {
             return false;
         }
-        
+
         public boolean isDone() {
             return _status != null;
         }
-        
+
         void done(Status status) {
-            synchronized(this) {
+            synchronized (this) {
                 _status = status;
                 this.notifyAll();
             }



Mime
View raw message