ode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mszef...@apache.org
Subject svn commit: r550983 - /incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/
Date Tue, 26 Jun 2007 23:21:17 GMT
Author: mszefler
Date: Tue Jun 26 16:21:01 2007
New Revision: 550983

URL: http://svn.apache.org/viewvc?view=rev&rev=550983
Log:
Implement MyRole MessageExchange objects. 

Added:
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
  (with props)
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java
  (with props)
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java
  (with props)
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java
  (with props)
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
  (with props)
Modified:
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java

Added: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java?view=auto&rev=550983
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
(added)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
Tue Jun 26 16:21:01 2007
@@ -0,0 +1,105 @@
+package org.apache.ode.bpel.engine;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
+
+/**
+ * For invoking the engine using ASYNC style.
+ * 
+ * @author Maciej Szefler
+ * 
+ */
+public class AsyncMyRoleMessageExchangeImpl extends MyRoleMessageExchangeImpl {
+    private static final Log __log = LogFactory.getLog(ReliableMyRoleMessageExchangeImpl.class);
+
+    private static Map<String, ResponseFuture> _waitingFutures = new ConcurrentHashMap<String,
ResponseFuture>();
+
+    public AsyncMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) {
+        super(engine, mexId);
+    }
+
+    public Future<Status> invokeAsync() {
+        ResponseFuture future = new ResponseFuture();
+
+        BpelProcess target = _engine.route(_callee, _request);
+        if (target == null) {
+            if (__log.isWarnEnabled())
+                __log.warn(__msgs.msgUnknownEPR("" + _epr));
+
+            _cstatus = CorrelationStatus.UKNOWN_ENDPOINT;
+            setFailure(FailureType.UNKNOWN_ENDPOINT, null, null);
+            future.done(_status);
+
+            return future;
+        }
+
+        scheduleInvoke(target);
+      
+        if (getOperation().getOutput() != null) {
+            _waitingFutures.put(getMessageExchangeId(), future);
+        } else {
+            future.done(getStatus());
+        }
+
+        return future;
+
+    }
+
+    private static class ResponseFuture implements Future<Status> {
+        private Status _status;
+
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return false;
+        }
+
+        public Status get() throws InterruptedException, ExecutionException {
+            try {
+                return get(0, TimeUnit.MILLISECONDS);
+            } catch (TimeoutException e) {
+                // If it's thrown it's definitely a bug
+                throw new RuntimeException(e);
+            }
+        }
+
+        public Status get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException {
+
+            synchronized (this) {
+                if (_status != null)
+                    return _status;
+
+                while (_status == null) {
+                    this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
+                }
+
+                if (_status == null)
+                    throw new TimeoutException();
+
+                return _status;
+            }
+        }
+
+        public boolean isCancelled() {
+            return false;
+        }
+
+        public boolean isDone() {
+            return _status != null;
+        }
+
+        void done(Status status) {
+            synchronized (this) {
+                _status = status;
+                this.notifyAll();
+            }
+        }
+    }
+
+}

Propchange: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java?view=auto&rev=550983
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java
(added)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java
Tue Jun 26 16:21:01 2007
@@ -0,0 +1,100 @@
+package org.apache.ode.bpel.engine;
+
+import javax.wsdl.Operation;
+import javax.wsdl.PortType;
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.BpelEngineException;
+import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.InvocationStyle;
+import org.apache.ode.bpel.iapi.Message;
+import org.apache.ode.bpel.iapi.PartnerRoleChannel;
+import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.w3c.dom.Element;
+
+/**
+ * Implementation of the {@link PartnerRoleMessageExchange} interface that is used when the
ASYNC invocation 
+ * style is being used (see {@link InvocationStyle#ASYNC}). The basic idea here is that with
this style, the
+ * IL does not get the "message" (i.e. this object) until the ODE transaction has committed,
and it does not
+ * block during the performance of the operation. Hence, when a reply becomes available,
we'll need to 
+ * schedule a transaction to process it. 
+ * 
+ * @author Maciej Szefler
+ *
+ */
+public class AsyncPartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl {
+
+    private static final Log __log = LogFactory.getLog(AsyncPartnerRoleMessageExchangeImpl.class);
+    
+    AsyncPartnerRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId, PortType portType,
Operation operation, boolean inMem, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel
channel) {
+        super(engine, mexId, portType, operation, inMem, epr, myRoleEPR, channel);
+    }
+    
+    public void replyWithFault(QName faultType, Message outputFaultMessage) throws BpelEngineException
{
+        if(!isAsync())
+            throw new BpelEngineException("Invalid action, message-exchange is not in ASYNC
state!");
+        
+        super.replyWithFault(faultType,outputFaultMessage);
+        scheduleContinuation();
+    }
+
+    public void reply(Message response) throws BpelEngineException {
+        if(!isAsync())
+            throw new BpelEngineException("Invalid action, message-exchange is not in ASYNC
state!");
+
+        super.reply(response);
+        scheduleContinuation();
+
+    }
+
+    public void replyWithFailure(FailureType type, String description, Element details) throws
BpelEngineException {
+        if(!isAsync())
+            throw new BpelEngineException("Invalid action, message-exchange is not in ASYNC
state!");
+        super.replyWithFailure(type, description, details);
+        scheduleContinuation();
+    }
+        
+
+    /**
+     * Check if we are in the ASYNC state. 
+     * 
+     * @return
+     */
+    private boolean isAsync() {
+        return getStatus() == Status.ASYNC;
+    }
+
+
+    /**
+     * Continue from the ASYNC state by scheduling a continuation to process a response/fault/failure.

+     */
+    private void scheduleContinuation() {
+        // If there is no channel waiting for us, there is nothing to do.
+        if (getPartnerRoleChannel() == null) {
+            if (__log.isDebugEnabled()) {
+                __log.debug("no channel on mex=" + getMessageExchangeId());
+            }
+            return;
+        }
+        
+
+        WorkEvent we = new WorkEvent();
+        we.setIID(_iid);
+        we.setType(WorkEvent.Type.INVOKE_RESPONSE);
+        we.setInMem(_inMem);
+        we.setChannel(_responseChannel);
+        we.setMexId(_mexId);
+
+        if (__log.isDebugEnabled()) {
+            __log.debug("scheduleContinuation: scheduling WorkEvent " + we);
+        }
+        
+        if (_inMem)
+            _contexts.scheduler.scheduleVolatileJob(true, we.getDetail());
+        else
+            _contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
+    }
+
+}

Propchange: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java?view=auto&rev=550983
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java
(added)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java
Tue Jun 26 16:21:01 2007
@@ -0,0 +1,34 @@
+package org.apache.ode.bpel.engine;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.ode.bpel.iapi.BpelEngineException;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
+
+public class BlockingMyRoleMessageExchangeImpl extends AsyncMyRoleMessageExchangeImpl {
+    Future<Status> _future;
+    boolean _done = false;
+    
+    public BlockingMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) {
+        super(engine, mexId);
+    }
+
+    @Override
+    public Future<Status> invokeAsync() {
+        throw new BpelEngineException("Invalid invocation style, use invokeBlocking() instead.");
+    }
+
+    @Override
+    public Status invokeBlocking() throws BpelEngineException, TimeoutException {
+        if (_done) 
+            return _status;
+        if (_future != null)
+            _future.get();
+        Future<Status> future = super.invokeAsync();
+        
+        future.get(timeout, unit)
+    }
+
+    
+}

Propchange: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java?view=auto&rev=550983
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java
(added)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java
Tue Jun 26 16:21:01 2007
@@ -0,0 +1,27 @@
+package org.apache.ode.bpel.engine;
+
+import javax.wsdl.Operation;
+import javax.wsdl.PortType;
+
+import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.MessageExchangeContext;
+import org.apache.ode.bpel.iapi.PartnerRoleChannel;
+
+/**
+ * Implementation of the {@link PartnerRoleMessageExchange} interface that is passed to the
IL when the 
+ * BLOCKING invocation style is used (see {@link InvocationStyle#BLOCKING}). The basic idea
here is that 
+ * with this style, the IL performs the operation while blocking in the 
+ * {@link MessageExchangeContext#invokePartner(org.apache.ode.bpel.iapi.PartnerRoleMessageExchange)}
method.
+ *
+ * This InvocationStyle makes this class rather trivial. 
+ *  
+ * @author Maciej Szefler
+ *
+ */
+public class BlockingPartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl
{
+
+    BlockingPartnerRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId, PortType
portType, Operation operation, boolean inMem, EndpointReference epr, EndpointReference myRoleEPR,
PartnerRoleChannel channel) {
+        super(engine, mexId, portType, operation, inMem, epr, myRoleEPR, channel);
+    }
+
+}

Propchange: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?view=diff&rev=550983&r1=550982&r2=550983
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
(original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
Tue Jun 26 16:21:01 2007
@@ -54,12 +54,13 @@
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
  * Implementation of the {@link BpelEngine} interface: provides the server methods that should
be invoked in the context of a
  * transaction.
- *
+ * 
  * @author mszefler
  * @author Matthieu Riou <mriou at apache dot org>
  */
@@ -107,40 +108,89 @@
         _contexts = contexts;
     }
 
-    MyRoleMessageExchange createMessageExchange(InvocationStyle istyle, QName targetService,
String operation, String clientKey)
-            throws BpelEngineException {
+    MyRoleMessageExchange createMessageExchange(final InvocationStyle istyle, final QName
targetService, final String operation,
+            final String clientKey) throws BpelEngineException {
 
-        // TODO: for now, invocation of the engine is only supported in RELIABLE mode.
-        if (istyle != InvocationStyle.RELIABLE)
-            throw new BpelEngineException("Unsupported InvocationStyle: " + istyle);
-        
-        BpelProcess target = route(targetService, null);
-
-        MessageExchangeDAO dao;
-        if (target == null || target.isInMemory()) {
-            dao = _contexts.inMemDao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
-        } else {
-            dao = _contexts.dao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
-        }
-        dao.setCorrelationId(clientKey);
-        dao.setCorrelationStatus(CorrelationStatus.UKNOWN_ENDPOINT.toString());
-        dao.setPattern(MessageExchangePattern.UNKNOWN.toString());
-        dao.setCallee(targetService);
-        dao.setStatus(Status.NEW.toString());
-        dao.setOperation(operation);
-        dao.setPipedMessageExchangeId(pipedMexId);
-        ReliableMyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(this,
dao);
+        final BpelProcess target = route(targetService, null);
+
+        if (target == null)
+            throw new BpelEngineException("NoSuchService: " + targetService);
+
+        Callable<String> createDao = new Callable<String>() {
+
+            public String call() throws Exception {
+                MessageExchangeDAO dao;
+                if (target.isInMemory()) {
+                    dao = _contexts.inMemDao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
+                } else {
+                    dao = _contexts.dao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
+                }
+                dao.setInvocationStyle(istyle.toString());
+                dao.setCorrelationId(clientKey);
+                dao.setCorrelationStatus(CorrelationStatus.UKNOWN_ENDPOINT.toString());
+                dao.setPattern(MessageExchangePattern.UNKNOWN.toString());
+                dao.setCallee(targetService);
+                dao.setStatus(Status.NEW.toString());
+                dao.setOperation(operation);
+                return dao.getMessageExchangeId();
+            }
+
+        };
 
-        if (target != null) {
-            target.initMyRoleMex(mex);
+        MyRoleMessageExchangeImpl mex;
+        String mexId;
+        switch (istyle) {
+        case ASYNC:
+            try {
+                mexId = _contexts.scheduler.execIsolatedTransaction(createDao).get();
+            } catch (Exception e) {
+                __log.error("Internal Error: could not execute isolated transaction.", e);
+                throw new BpelEngineException("Internal Error", e);
+            }
+            mex = new AsyncMyRoleMessageExchangeImpl(this, mexId);
+            break;
+        case BLOCKING:
+            try {
+                mexId = _contexts.scheduler.execIsolatedTransaction(createDao).get();
+            } catch (Exception e) {
+                __log.error("Internal Error: could not execute isolated transaction.", e);
+                throw new BpelEngineException("Internal Error", e);
+            }
+            mex = new BlockingMyRoleMessageExchangeImpl(this, mexId);
+            break;
+            
+        case RELIABLE:
+            assertTransaction();
+            try {
+                mexId = createDao.call();
+            } catch (Exception e) {
+                __log.error("Internal Error: could not execute DB calls.", e);
+                throw new BpelEngineException("Internal Error", e);
+            }
+            mex = new ReliableMyRoleMessageExchangeImpl(this, mexId);
+            break;
+        case TRANSACTED:
+            assertTransaction();
+            try {
+                mexId = createDao.call();
+            } catch (Exception e) {
+                __log.error("Internal Error: could not execute DB calls.", e);
+                throw new BpelEngineException("Internal Error", e);
+            }
+            mex = new TransactedMyRoleMessageExchangeImpl(this, mexId);
+        default:
+            throw new Error("Internal Error: unknown InvocationStyle: " + istyle);
         }
 
+        target.initMyRoleMex(mex);
+
         return mex;
     }
 
     MessageExchange getMessageExchange(String mexId) throws BpelEngineException {
         MessageExchangeDAO mexdao = _contexts.inMemDao.getConnection().getMessageExchange(mexId);
-        if (mexdao == null) mexdao = _contexts.dao.getConnection().getMessageExchange(mexId);
+        if (mexdao == null)
+            mexdao = _contexts.dao.getConnection().getMessageExchange(mexId);
         if (mexdao == null)
             return null;
 
@@ -203,7 +253,9 @@
 
     /**
      * Register a process with the engine.
-     * @param process the process to register
+     * 
+     * @param process
+     *            the process to register
      */
     void registerProcess(BpelProcess process) {
         _activeProcesses.put(process.getPID(), process);
@@ -217,7 +269,7 @@
     /**
      * Route to a process using the service id. Note, that we do not need the endpoint name
here, we are assuming that two processes
      * would not be registered under the same service qname but different endpoint.
-     *
+     * 
      * @param service
      *            target service id
      * @param request
@@ -260,7 +312,9 @@
                 public void afterCompletion(boolean success) {
                     _instanceLockManager.unlock(we.getIID());
                 }
-                public void beforeCompletion() { }
+
+                public void beforeCompletion() {
+                }
             });
         } catch (InterruptedException e) {
             // Retry later.
@@ -284,8 +338,10 @@
                 process = _activeProcesses.get(we.getProcessId());
             } else {
                 ProcessInstanceDAO instance;
-                if (we.isInMem()) instance = _contexts.inMemDao.getConnection().getInstance(we.getIID());
-                else instance = _contexts.dao.getConnection().getInstance(we.getIID());
+                if (we.isInMem())
+                    instance = _contexts.inMemDao.getConnection().getInstance(we.getIID());
+                else
+                    instance = _contexts.dao.getConnection().getInstance(we.getIID());
 
                 if (instance == null) {
                     __log.error(__msgs.msgScheduledJobReferencesUnknownInstance(we.getIID()));
@@ -344,13 +400,13 @@
                     _contexts.scheduler.execIsolatedTransaction(new Callable<Void>()
{
                         public Void call() throws Exception {
                             jobInfo.jobDetail.put("final", true);
-                            _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail,
-                                    new Date(System.currentTimeMillis() + 60 * 1000));
+                            _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, new
Date(
+                                    System.currentTimeMillis() + 60 * 1000));
                             return null;
                         }
                     });
             } catch (Exception ex) {
-                __log.error("Error rescheduling problematic job: " + jobInfo,ex);
+                __log.error("Error rescheduling problematic job: " + jobInfo, ex);
                 saveToDisk = true;
             }
         } else {
@@ -363,12 +419,11 @@
                 ObjectOutputStream fos = new ObjectOutputStream(new FileOutputStream(f));
                 fos.writeObject(jobInfo);
                 fos.close();
-                __log.error("Saved problematic job to disk (last resort): " + jobInfo +"
in file " + f);
+                __log.error("Saved problematic job to disk (last resort): " + jobInfo + "
in file " + f);
             } catch (Exception ex) {
                 __log.error("Could not save bad job; it will be lost: " + jobInfo, ex);
             }
 
-
         // No more retries.
         return false;
     }
@@ -408,11 +463,16 @@
 
     /**
      * Get the list of globally-registered message-exchange interceptors.
-     *
+     * 
      * @return list
      */
     List<MessageExchangeInterceptor> getGlobalInterceptors() {
         return _contexts.globalIntereceptors;
+    }
+
+    protected void assertTransaction() {
+        if (!_contexts.scheduler.isTransacted())
+            throw new BpelEngineException("Operation must be performed in a transaction!");
     }
 
 }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=550983&r1=550982&r2=550983
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
(original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
Tue Jun 26 16:21:01 2007
@@ -191,7 +191,7 @@
         return null;
     }
 
-    void initMyRoleMex(ReliableMyRoleMessageExchangeImpl mex) {
+    void initMyRoleMex(MyRoleMessageExchangeImpl mex) {
         markused();
         PartnerLinkMyRoleImpl target = null;
         for (Endpoint endpoint : getEndpointToMyRoleMap().keySet()) {

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=550983&r1=550982&r2=550983
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
(original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
Tue Jun 26 16:21:01 2007
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.Callable;
 
 import javax.wsdl.Operation;
 import javax.wsdl.PortType;
@@ -41,15 +42,15 @@
 import org.w3c.dom.Element;
 
 /**
- * Base implementation of the {@link MessageExchange} interface. This interfaces is exposed
to the Integration Layer (IL)
- * to allow it to implement incoming (via {@link ReliableMyRoleMessageExchangeImpl}) and
outgoing (via {@link PartnerRoleMessageExchangeImpl})
- * communications. 
+ * Base implementation of the {@link MessageExchange} interface. This interfaces is exposed
to the Integration Layer (IL) to allow
+ * it to implement incoming (via {@link ReliableMyRoleMessageExchangeImpl}) and outgoing
(via
+ * {@link PartnerRoleMessageExchangeImpl}) communications.
  * 
  * It should be noted that this class and its derived classes are in NO WAY THREADSAFE. It
is imperative that the integration layer
- * not attempt to use {@link MessageExchange} objects from multiple threads. 
+ * not attempt to use {@link MessageExchange} objects from multiple threads.
  * 
  * @author Maciej Szefler
- *
+ * 
  */
 abstract class MessageExchangeImpl implements MessageExchange {
 
@@ -92,49 +93,44 @@
 
     Contexts _contexts;
 
-    QName _callee;
-    
     BpelEngineImpl _engine;
 
     boolean _associated;
-    
+
     InvocationStyle _istyle;
-    
+
     /** The point at which this message-exchange will time out. */
     Date _timeout;
-   
-    enum Change { 
-        EPR,
-        RESPONSE, 
-        RELEASE
+
+    enum Change {
+        EPR, RESPONSE, RELEASE, REQUEST
     }
 
     final HashSet<Change> _changes = new HashSet<Change>();
-    
+
     /** Properties that have been retrieved from the database. */
-    final HashMap<String, String> _properties = new HashMap<String,String>();
-    
+    final HashMap<String, String> _properties = new HashMap<String, String>();
+
     /** Names of properties that have been retrieved from the database. */
     final HashSet<String> _loadedProperties = new HashSet<String>();
 
     /** Names of proprties that have been modified. */
     final HashSet<String> _modifiedProperties = new HashSet<String>();
-    
+
     private FailureType _failureType;
 
     private Set<String> _propNames;
-    
+
     public MessageExchangeImpl(BpelEngineImpl engine, String mexId) {
         _contexts = engine._contexts;
         _engine = engine;
         _mexId = mexId;
     }
 
-
     void load(MessageExchangeDAO dao) {
         if (dao.getMessageExchangeId().equals(_mexId))
             throw new IllegalArgumentException("MessageExchangeId mismatch!");
-        
+
         if (_pattern == null)
             _pattern = MessageExchangePattern.valueOf(dao.getPattern());
         if (_opname == null)
@@ -149,37 +145,44 @@
             _explanation = dao.getFaultExplanation();
         if (_status == null)
             _status = Status.valueOf(dao.getStatus());
-        if (_callee == null)
-            _callee = dao.getCallee();
         if (_istyle == null)
             _istyle = InvocationStyle.valueOf(dao.getInvocationStyle());
     }
-    
+
     public void save(MessageExchangeDAO dao) {
         dao.setStatus(_status.toString());
         dao.setInvocationStyle(_istyle.toString());
         dao.setFault(_fault);
         dao.setFaultExplanation(_explanation);
-        //todo: set failureType
-        
+        // todo: set failureType
+
         if (_changes.contains(Change.RESPONSE)) {
             MessageDAO responseDao = dao.createMessage(_response.getType());
             responseDao.setData(_response.getMessage());
         }
-        
+
         if (_changes.contains(Change.EPR)) {
             if (_epr != null)
                 dao.setEPR(_epr.toXML().getDocumentElement());
             else
                 dao.setEPR(null);
         }
-        
+
         for (String modprop : _modifiedProperties) {
             dao.setProperty(modprop, _properties.get(modprop));
         }
 
     }
-    
+
+    void save() {
+        doInTX(new InDbAction<Void>() {
+            public Void call(MessageExchangeDAO mexdao) {
+                save(mexdao);
+                return null;
+            }
+        });
+    }
+
     public InvocationStyle getInvocationStyle() {
         return _istyle;
     }
@@ -240,12 +243,12 @@
         if (_request != null)
             return _request;
 
-        return _request = doInDb(new InDbAction<MessageImpl>() {
+        return _request = doInTX(new InDbAction<MessageImpl>() {
             public MessageImpl call(MessageExchangeDAO dao) {
                 MessageDAO req = dao.getRequest();
                 if (req == null)
                     return null;
-                return new MemBackedMessageImpl(req.getData(),req.getType(),true);
+                return new MemBackedMessageImpl(req.getData(), req.getType(), true);
             }
         });
 
@@ -255,13 +258,13 @@
         if (_response != null)
             return _response;
 
-        return _response = doInDb(new InDbAction<MessageImpl>() {
+        return _response = doInTX(new InDbAction<MessageImpl>() {
             public MessageImpl call(MessageExchangeDAO dao) {
                 MessageDAO req = dao.getResponse();
                 if (req == null)
                     return null;
-                return new MemBackedMessageImpl(req.getData(),req.getType(),true);
-                
+                return new MemBackedMessageImpl(req.getData(), req.getType(), true);
+
             }
         });
     }
@@ -277,7 +280,7 @@
         setStatus(Status.FAULT);
         _fault = faultType;
         _response = (MessageImpl) outputFaultMessage;
-        
+
         _changes.add(Change.RESPONSE);
     }
 
@@ -295,7 +298,7 @@
         _response = (MessageImpl) outputMessage;
         _response.makeReadOnly();
         _changes.add(Change.RESPONSE);
-        
+
     }
 
     void setFailure(FailureType type, String reason, Element details) throws BpelEngineException
{
@@ -303,7 +306,7 @@
         setStatus(Status.FAILURE);
         _failureType = type;
         _explanation = reason;
-        
+
         _changes.add(Change.RESPONSE);
     }
 
@@ -312,7 +315,7 @@
     }
 
     public Message createMessage(javax.xml.namespace.QName msgType) {
-        return new MemBackedMessageImpl(null,msgType,false);
+        return new MemBackedMessageImpl(null, msgType, false);
     }
 
     public void setEndpointReference(EndpointReference ref) {
@@ -324,7 +327,7 @@
         if (_epr != null)
             return _epr;
 
-        return _epr = doInDb(new InDbAction<EndpointReference>() {
+        return _epr = doInTX(new InDbAction<EndpointReference>() {
 
             public EndpointReference call(MessageExchangeDAO mexdao) {
                 Element eprdao = mexdao.getEPR();
@@ -335,14 +338,13 @@
 
     }
 
-
     public String getProperty(final String key) {
         if (!_loadedProperties.contains(key)) {
-            _properties.put(key, doInDb(new InDbAction<String> () {
+            _properties.put(key, doInTX(new InDbAction<String>() {
                 public String call(MessageExchangeDAO mexdao) {
                     return mexdao.getProperty(key);
                 }
-                
+
             }));
             _loadedProperties.add(key);
         }
@@ -351,7 +353,7 @@
     }
 
     public void setProperty(String key, String value) {
-        _properties.put(key,value);
+        _properties.put(key, value);
         _loadedProperties.add(key);
         _modifiedProperties.add(key);
     }
@@ -359,13 +361,13 @@
     public Set<String> getPropertyNames() {
         if (_propNames != null)
             return _propNames;
-        
-        return _propNames = doInDb(new InDbAction<Set<String>>() {
+
+        return _propNames = doInTX(new InDbAction<Set<String>>() {
             public Set<String> call(MessageExchangeDAO mexdao) {
                 return mexdao.getPropertyNames();
             }
         });
-        
+
     }
 
     public void release() {
@@ -376,22 +378,47 @@
     public String toString() {
         return "MEX[" + _mexId + "]";
     }
-    
+
     protected void assertTransaction() {
         if (!_contexts.scheduler.isTransacted())
             throw new BpelEngineException("Operation must be performed in a transaction!");
     }
 
-    protected <T> T doInDb(InDbAction<T> action) {
+    protected <T> T doInTX(final InDbAction<T> action) {
         if (_txflag) {
-            MessageExchangeDAO mexDao;
-            action.call(mexDao);
+            assertTransaction();
+            return action.call(getDAO());
         } else {
+            try {
+                return _contexts.scheduler.execIsolatedTransaction(new Callable<T>()
{
+                    public T call() throws Exception {
+                        assertTransaction();
+                        return action.call(getDAO());
+                    }
+
+                }).get();
+            } catch (Exception ie) {
+                __log.error("Internal error executing transaction.", ie);
+                throw new BpelEngineException("Internal Error",ie);
+            }
         }
     }
 
+    /**
+     * Get the DAO object. Note, we can do this only when we are running in a transaction.
+     * 
+     * @return 
+     */
+    protected MessageExchangeDAO getDAO() {
+        assertTransaction();
+        MessageExchangeDAO mexdao = _contexts.inMemDao.getConnection().getMessageExchange(_mexId);
+        if (mexdao == null)
+            mexdao = _contexts.dao.getConnection().getMessageExchange(_mexId);
+        return mexdao;
+    }
+
     interface InDbAction<T> {
         public T call(MessageExchangeDAO mexdao);
     }
-    
+
 }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java?view=diff&rev=550983&r1=550982&r2=550983
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
(original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
Tue Jun 26 16:21:01 2007
@@ -95,20 +95,8 @@
             __log.debug("invoke() EPR= " + _epr + " ==> " + target);
         setStatus(Status.REQUEST);
         save(getDAO());
-        
         scheduleInvoke(target);
-                            
     }
 
-    /**
-     * Get the DAO object. Note, we can do this for RELIABLE, since we are guaranteed to
be running in 
-     * a transaction.
-     * 
-     * @return
-     */
-    MessageExchangeDAO getDAO() {
-        MessageExchangeDAO mexdao = _contexts.inMemDao.getConnection().getMessageExchange(_mexId);
-        if (mexdao == null) mexdao = _contexts.dao.getConnection().getMessageExchange(_mexId);
-        return mexdao;        
-    }
+
 }

Added: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java?view=auto&rev=550983
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
(added)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
Tue Jun 26 16:21:01 2007
@@ -0,0 +1,16 @@
+package org.apache.ode.bpel.engine;
+
+/**
+ * Transacted my-role message exchange. 
+ * 
+ * TODO: IMPLEMENT!
+ * @author Maciej Szefler 
+ *
+ */
+public class TransactedMyRoleMessageExchangeImpl extends MyRoleMessageExchangeImpl {
+
+    public TransactedMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) {
+        super(engine, mexId);
+    }
+
+}

Propchange: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java?view=diff&rev=550983&r1=550982&r2=550983
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
(original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
Tue Jun 26 16:21:01 2007
@@ -60,7 +60,7 @@
     }
 
     public enum Type {
-        TIMER, RESUME, INVOKE_RESPONSE, MATCHER, INVOKE_INTERNAL
+        TIMER, RESUME, INVOKE_RESPONSE, MATCHER, INVOKE_INTERNAL, INVOKE_TIMEOUT
     }
 
     public String getChannel() {



Mime
View raw message