ode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mszef...@apache.org
Subject svn commit: r551417 - in /incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel: engine/ memdao/
Date Thu, 28 Jun 2007 00:43:49 GMT
Author: mszefler
Date: Wed Jun 27 17:43:48 2007
New Revision: 551417

URL: http://svn.apache.org/viewvc?view=rev&rev=551417
Log:
BART tweaks. 

Modified:
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java?view=diff&rev=551417&r1=551416&r2=551417
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java Wed Jun 27 17:43:48 2007
@@ -9,6 +9,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.iapi.MessageExchange.Status;
 
 /**
@@ -20,14 +21,17 @@
 public class AsyncMyRoleMessageExchangeImpl extends MyRoleMessageExchangeImpl {
     private static final Log __log = LogFactory.getLog(ReliableMyRoleMessageExchangeImpl.class);
 
-    private static Map<String, ResponseFuture> _waitingFutures = new ConcurrentHashMap<String, ResponseFuture>();
-
+    ResponseFuture _future;
+    
     public AsyncMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) {
         super(engine, mexId);
     }
 
     public Future<Status> invokeAsync() {
-        ResponseFuture future = new ResponseFuture();
+        if (_future != null)
+            return _future;
+        
+        _future = new ResponseFuture();
 
         BpelProcess target = _engine.route(_callee, _request);
         if (target == null) {
@@ -36,23 +40,30 @@
 
             _cstatus = CorrelationStatus.UKNOWN_ENDPOINT;
             setFailure(FailureType.UNKNOWN_ENDPOINT, null, null);
-            future.done(_status);
+            _future.done(_status);
 
-            return future;
+            return _future;
         }
 
-        scheduleInvoke(target);
-      
-        if (getOperation().getOutput() != null) {
-            _waitingFutures.put(getMessageExchangeId(), future);
+        if (target.isInMemory()) {
+            target.invokeProcess(this);
         } else {
-            future.done(getStatus());
+            scheduleInvoke(target);
+        }
+      
+        if (getOperation().getOutput() == null) {
+            _future.done(getStatus());
         }
 
-        return future;
+        return _future;
 
     }
 
+    protected void onMessageExchangeComplete(MessageExchangeDAO mexdao) {
+        load(mexdao);
+        _future.done(getStatus());         
+    }
+    
     private static class ResponseFuture implements Future<Status> {
         private Status _status;
 

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java?view=diff&rev=551417&r1=551416&r2=551417
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java Wed Jun 27 17:43:48 2007
@@ -1,11 +1,17 @@
 package org.apache.ode.bpel.engine;
 
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.ode.bpel.iapi.BpelEngineException;
-import org.apache.ode.bpel.iapi.MessageExchange.Status;
 
+/**
+ * Non-transaction blocking MyRole message-exchange implementation.
+ * 
+ * @author Maciej Szefler 
+ */
 public class BlockingMyRoleMessageExchangeImpl extends AsyncMyRoleMessageExchangeImpl {
     Future<Status> _future;
     boolean _done = false;
@@ -23,12 +29,17 @@
     public Status invokeBlocking() throws BpelEngineException, TimeoutException {
         if (_done) 
             return _status;
-        if (_future != null)
-            _future.get();
-        Future<Status> future = super.invokeAsync();
-        
-        future.get(timeout, unit)
-    }
 
-    
+        Future<Status> future = _future != null ? _future : super.invokeAsync();
+        
+        try {
+            future.get(Math.max(System.currentTimeMillis()-_timeout.getTime(),1), TimeUnit.MILLISECONDS);
+            _done = true;
+            return _status;
+        } catch (InterruptedException e) {
+            throw new BpelEngineException(e);
+        } catch (ExecutionException e) {
+            throw new BpelEngineException(e.getCause());
+        } 
+    }    
 }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?view=diff&rev=551417&r1=551416&r2=551417
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java Wed Jun 27 17:43:48 2007
@@ -25,7 +25,6 @@
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
 import org.apache.ode.bpel.evt.BpelEvent;
-import org.apache.ode.bpel.iapi.BpelEngine;
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.ContextException;
 import org.apache.ode.bpel.iapi.Endpoint;
@@ -58,13 +57,12 @@
 import java.util.concurrent.TimeUnit;
 
 /**
- * Implementation of the {@link BpelEngine} interface: provides the server methods that should be invoked in the context of a
- * transaction.
+ * Transaction process engine.
  * 
  * @author mszefler
  * @author Matthieu Riou <mriou at apache dot org>
  */
-public class BpelEngineImpl implements BpelEngine {
+class BpelEngineImpl {
     private static final Log __log = LogFactory.getLog(BpelEngineImpl.class);
 
     /** RNG, for delays */
@@ -72,226 +70,18 @@
 
     private static double _delayMean = 0;
 
-    static {
-        try {
-            String delay = System.getenv("ODE_DEBUG_TX_DELAY");
-            if (delay != null && delay.length() > 0) {
-                _delayMean = Double.valueOf(delay);
-                __log.info("Stochastic debugging delay activated. Delay (Mean)=" + _delayMean + "ms.");
-            }
-        } catch (Throwable t) {
-            if (__log.isDebugEnabled()) {
-                __log.debug("Could not read ODE_DEBUG_TX_DELAY environment variable; assuming 0 (mean) delay", t);
-            } else {
-                __log.info("Could not read ODE_DEBUG_TX_DELAY environment variable; assuming 0 (mean) delay");
-            }
-        }
-    }
-
     private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
 
     /** Maximum number of tries for async jobs. */
     private static final int MAX_RETRIES = 3;
 
-    /** Active processes, keyed by process id. */
-    final HashMap<QName, BpelProcess> _activeProcesses = new HashMap<QName, BpelProcess>();
-
-    /** Mapping from myrole endpoint name to active process. */
-    private final HashMap<Endpoint, BpelProcess> _serviceMap = new HashMap<Endpoint, BpelProcess>();
-
-    /** Manage instance-level locks. */
-    private final InstanceLockManager _instanceLockManager = new InstanceLockManager();
-
     final Contexts _contexts;
 
-    public BpelEngineImpl(Contexts contexts) {
+    BpelEngineImpl(Contexts contexts) {
         _contexts = contexts;
     }
 
-    MyRoleMessageExchange createMessageExchange(final InvocationStyle istyle, final QName targetService, final String operation,
-            final String clientKey) throws BpelEngineException {
-
-        final BpelProcess target = route(targetService, null);
-
-        if (target == null)
-            throw new BpelEngineException("NoSuchService: " + targetService);
-
-        Callable<String> createDao = new Callable<String>() {
-
-            public String call() throws Exception {
-                MessageExchangeDAO dao;
-                if (target.isInMemory()) {
-                    dao = _contexts.inMemDao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
-                } else {
-                    dao = _contexts.dao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
-                }
-                dao.setInvocationStyle(istyle.toString());
-                dao.setCorrelationId(clientKey);
-                dao.setCorrelationStatus(CorrelationStatus.UKNOWN_ENDPOINT.toString());
-                dao.setPattern(MessageExchangePattern.UNKNOWN.toString());
-                dao.setCallee(targetService);
-                dao.setStatus(Status.NEW.toString());
-                dao.setOperation(operation);
-                return dao.getMessageExchangeId();
-            }
-
-        };
-
-        MyRoleMessageExchangeImpl mex;
-        String mexId;
-        switch (istyle) {
-        case ASYNC:
-            try {
-                mexId = _contexts.scheduler.execIsolatedTransaction(createDao).get();
-            } catch (Exception e) {
-                __log.error("Internal Error: could not execute isolated transaction.", e);
-                throw new BpelEngineException("Internal Error", e);
-            }
-            mex = new AsyncMyRoleMessageExchangeImpl(this, mexId);
-            break;
-        case BLOCKING:
-            try {
-                mexId = _contexts.scheduler.execIsolatedTransaction(createDao).get();
-            } catch (Exception e) {
-                __log.error("Internal Error: could not execute isolated transaction.", e);
-                throw new BpelEngineException("Internal Error", e);
-            }
-            mex = new BlockingMyRoleMessageExchangeImpl(this, mexId);
-            break;
-            
-        case RELIABLE:
-            assertTransaction();
-            try {
-                mexId = createDao.call();
-            } catch (Exception e) {
-                __log.error("Internal Error: could not execute DB calls.", e);
-                throw new BpelEngineException("Internal Error", e);
-            }
-            mex = new ReliableMyRoleMessageExchangeImpl(this, mexId);
-            break;
-        case TRANSACTED:
-            assertTransaction();
-            try {
-                mexId = createDao.call();
-            } catch (Exception e) {
-                __log.error("Internal Error: could not execute DB calls.", e);
-                throw new BpelEngineException("Internal Error", e);
-            }
-            mex = new TransactedMyRoleMessageExchangeImpl(this, mexId);
-        default:
-            throw new Error("Internal Error: unknown InvocationStyle: " + istyle);
-        }
-
-        target.initMyRoleMex(mex);
-
-        return mex;
-    }
-
-    MessageExchange getMessageExchange(String mexId) throws BpelEngineException {
-        MessageExchangeDAO mexdao = _contexts.inMemDao.getConnection().getMessageExchange(mexId);
-        if (mexdao == null)
-            mexdao = _contexts.dao.getConnection().getMessageExchange(mexId);
-        if (mexdao == null)
-            return null;
-
-        ProcessDAO pdao = mexdao.getProcess();
-        BpelProcess process = pdao == null ? null : _activeProcesses.get(pdao.getProcessId());
-
-        MessageExchangeImpl mex;
-        switch (mexdao.getDirection()) {
-        case MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE:
-            if (process == null) {
-                String errmsg = __msgs.msgProcessNotActive(pdao.getProcessId());
-                __log.error(errmsg);
-                // TODO: Perhaps we should define a checked exception for this
-                // condition.
-                throw new BpelEngineException(errmsg);
-            }
-            {
-                OPartnerLink plink = (OPartnerLink) process.getOProcess().getChild(mexdao.getPartnerLinkModelId());
-                PortType ptype = plink.partnerRolePortType;
-                Operation op = plink.getPartnerRoleOperation(mexdao.getOperation());
-                // TODO: recover Partner's EPR
-                mex = new PartnerRoleMessageExchangeImpl(this, mexdao, ptype, op, null, plink.hasMyRole() ? process
-                        .getInitialMyRoleEPR(plink) : null, process.getPartnerRoleChannel(plink));
-            }
-            break;
-        case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
-            mex = new ReliableMyRoleMessageExchangeImpl(this, mexdao);
-            if (process != null) {
-                OPartnerLink plink = (OPartnerLink) process.getOProcess().getChild(mexdao.getPartnerLinkModelId());
-                PortType ptype = plink.myRolePortType;
-                Operation op = plink.getMyRoleOperation(mexdao.getOperation());
-                mex.setPortOp(ptype, op);
-            }
-            break;
-        default:
-            String errmsg = "BpelEngineImpl: internal error, invalid MexDAO direction: " + mexId;
-            __log.fatal(errmsg);
-            throw new BpelEngineException(errmsg);
-        }
-
-        return mex;
-    }
-
-    BpelProcess unregisterProcess(QName process) {
-        BpelProcess p = _activeProcesses.remove(process);
-        if (p != null) {
-            if (__log.isDebugEnabled())
-                __log.debug("Deactivating process " + p.getPID());
-
-            p.deactivate();
-            while (_serviceMap.values().remove(p))
-                ;
-        }
-        return p;
-    }
-
-    boolean isProcessRegistered(QName pid) {
-        return _activeProcesses.containsKey(pid);
-    }
-
-    /**
-     * Register a process with the engine.
-     * 
-     * @param process
-     *            the process to register
-     */
-    void registerProcess(BpelProcess process) {
-        _activeProcesses.put(process.getPID(), process);
-        for (Endpoint e : process.getServiceNames()) {
-            __log.debug("Register process: serviceId=" + e + ", process=" + process);
-            _serviceMap.put(e, process);
-        }
-        process.activate(this);
-    }
-
-    /**
-     * Route to a process using the service id. Note, that we do not need the endpoint name here, we are assuming that two processes
-     * would not be registered under the same service qname but different endpoint.
-     * 
-     * @param service
-     *            target service id
-     * @param request
-     *            request message
-     * @return process corresponding to the targetted service, or <code>null</code> if service identifier is not recognized.
-     */
-    BpelProcess route(QName service, Message request) {
-        // TODO: use the message to route to the correct service if more than
-        // one service is listening on the same endpoint.
-
-        BpelProcess routed = null;
-        for (Endpoint endpoint : _serviceMap.keySet()) {
-            if (endpoint.serviceName.equals(service))
-                routed = _serviceMap.get(endpoint);
-        }
-        if (__log.isDebugEnabled())
-            __log.debug("Routed: svcQname " + service + " --> " + routed);
-        return routed;
-
-    }
-
-    OProcess getOProcess(QName processId) {
+     OProcess getOProcess(QName processId) {
         BpelProcess process = _activeProcesses.get(processId);
 
         if (process == null)
@@ -300,85 +90,7 @@
         return process.getOProcess();
     }
 
-    public void onScheduledJob(Scheduler.JobInfo jobInfo) throws Scheduler.JobProcessorException {
-        final WorkEvent we = new WorkEvent(jobInfo.jobDetail);
-
-        // We lock the instance to prevent concurrent transactions and prevent unnecessary rollbacks,
-        // Note that we don't want to wait too long here to get our lock, since we are likely holding
-        // on to scheduler's locks of various sorts.
-        try {
-            _instanceLockManager.lock(we.getIID(), 1, TimeUnit.MICROSECONDS);
-            _contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
-                public void afterCompletion(boolean success) {
-                    _instanceLockManager.unlock(we.getIID());
-                }
-
-                public void beforeCompletion() {
-                }
-            });
-        } catch (InterruptedException e) {
-            // Retry later.
-            __log.debug("Thread interrupted, job will be rescheduled: " + jobInfo);
-            throw new Scheduler.JobProcessorException(true);
-        } catch (org.apache.ode.bpel.engine.InstanceLockManager.TimeoutException e) {
-            __log.debug("Instance " + we.getIID() + " is busy, rescheduling job.");
-            // TODO: This should really be more of something like the exponential backoff algorithm in ethernet.
-            _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, new Date(System.currentTimeMillis()
-                    + Math.min(randomExp(1000), 10000)));
-            return;
-        }
-        // DONT PUT CODE HERE-need this method real tight in a try/catch block, we need to handle
-        // all types of failure here, the scheduler is not going to know how to handle our errors,
-        // ALSO we have to release the lock obtained above (IMPORTANT), lest the whole system come
-        // to a grinding halt.
-        try {
-
-            BpelProcess process;
-            if (we.getProcessId() != null) {
-                process = _activeProcesses.get(we.getProcessId());
-            } else {
-                ProcessInstanceDAO instance;
-                if (we.isInMem())
-                    instance = _contexts.inMemDao.getConnection().getInstance(we.getIID());
-                else
-                    instance = _contexts.dao.getConnection().getInstance(we.getIID());
-
-                if (instance == null) {
-                    __log.error(__msgs.msgScheduledJobReferencesUnknownInstance(we.getIID()));
-                    // nothing we can do, this instance is not in the database, it will
-                    // always
-                    // fail.
-                    return;
-                }
-                ProcessDAO processDao = instance.getProcess();
-                process = _activeProcesses.get(processDao.getProcessId());
-            }
-
-            if (process == null) {
-                // If the process is not active, it means that we should not be
-                // doing any work on its behalf, therefore we will reschedule the
-                // events for some time in the future (1 minute).
-                Date future = new Date(System.currentTimeMillis() + (60 * 1000));
-                __log.info(__msgs.msgReschedulingJobForInactiveProcess(we.getProcessId(), jobInfo.jobName, future));
-                _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, future);
-                return;
-            }
-
-            process.handleWorkEvent(jobInfo.jobDetail);
-            debuggingDelay();
-        } catch (BpelEngineException bee) {
-            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), bee);
-            throw new Scheduler.JobProcessorException(bee, checkRetry(jobInfo, bee));
-        } catch (ContextException ce) {
-            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), ce);
-            throw new Scheduler.JobProcessorException(ce, checkRetry(jobInfo, ce));
-        } catch (RuntimeException rte) {
-            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), rte);
-            throw new Scheduler.JobProcessorException(rte, checkRetry(jobInfo, rte));
-        } catch (Throwable t) {
-            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), t);
-            throw new Scheduler.JobProcessorException(false);
-
+    public void processJob(WorkEvent we) throws BpelEngineException {
         }
     }
 
@@ -470,9 +182,5 @@
         return _contexts.globalIntereceptors;
     }
 
-    protected void assertTransaction() {
-        if (!_contexts.scheduler.isTransacted())
-            throw new BpelEngineException("Operation must be performed in a transaction!");
-    }
 
 }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=551417&r1=551416&r2=551417
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Wed Jun 27 17:43:48 2007
@@ -26,7 +26,10 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
+import javax.wsdl.Operation;
+import javax.wsdl.PortType;
 import javax.xml.namespace.QName;
 
 import org.apache.commons.logging.Log;
@@ -44,9 +47,12 @@
 import org.apache.ode.bpel.iapi.ContextException;
 import org.apache.ode.bpel.iapi.Endpoint;
 import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.ProcessConf;
+import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
 import org.apache.ode.bpel.o.OElementVarType;
@@ -70,7 +76,7 @@
 /**
  * Entry point into the runtime of a BPEL process.
  * 
- * @author mszefler
+ * @author Maciej Szefler 
  * @author Matthieu Riou <mriou at apache dot org>
  */
 public class BpelProcess {
@@ -118,6 +124,11 @@
     /** Latch-like thing to control hydration/dehydration. */
     private HydrationLatch _hydrationLatch;
 
+    private Contexts _contexts;
+    
+    /** Manage instance-level locks. */
+    private final InstanceLockManager _instanceLockManager = new InstanceLockManager();
+
     public BpelProcess(ProcessConf conf, BpelEventListener debugger) {
         _pid = conf.getProcessId();
         _pconf = conf;
@@ -148,9 +159,11 @@
      * 
      * @param mex
      */
-    void invokeProcess(ReliableMyRoleMessageExchangeImpl mex) {
+    void invokeProcess(MyRoleMessageExchangeImpl mex) {
         _hydrationLatch.latch(1);
         try {
+            MessageExchangeDAO mexdao = getDAO(mex);
+
             PartnerLinkMyRoleImpl target = getMyRoleForService(mex.getServiceName());
             if (target == null) {
                 String errmsg = __msgs.msgMyRoleRoutingFailure(mex.getMessageExchangeId());
@@ -174,12 +187,12 @@
         }
 
         // For a one way, once the engine is done, the mex can be safely released.
-        if (mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) {
+        if (mex.getMessageExchangePattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) {
             mex.release();
         }
     }
 
-    private MessageExchangeDAO getDAO(ReliableMyRoleMessageExchangeImpl mex) {
+    private MessageExchangeDAO getDAO(MyRoleMessageExchangeImpl mex) {
 
     }
 
@@ -199,7 +212,10 @@
                 target = getEndpointToMyRoleMap().get(endpoint);
         }
         if (target != null) {
-            mex.setPortOp(target._plinkDef.myRolePortType, target._plinkDef.getMyRoleOperation(mex.getOperationName()));
+            Operation op = target._plinkDef.getMyRoleOperation(mex.getOperationName());
+            MessageExchange.MessageExchangePattern pattern = op.getOutput() == null ? MessageExchange.MessageExchangePattern.REQUEST_ONLY
+                    : MessageExchange.MessageExchangePattern.REQUEST_RESPONSE;
+            mex.init(target._plinkDef.myRolePortType, op, pattern);
         } else {
             __log.warn("Couldn't find endpoint from service " + mex.getServiceName() + " when initializing a myRole mex.");
         }
@@ -274,7 +290,7 @@
      *            message exchange
      * @return <code>true</code> if execution should continue, <code>false</code> otherwise
      */
-    boolean processInterceptors(ReliableMyRoleMessageExchangeImpl mex, InterceptorInvoker invoker) {
+    boolean processInterceptors(MyRoleMessageExchangeImpl mex, InterceptorInvoker invoker) {
         InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(), getProcessDAO(), _pconf);
 
         for (MessageExchangeInterceptor i : _mexInterceptors)
@@ -287,6 +303,48 @@
         return true;
     }
 
+    
+    /*
+
+        // DONT PUT CODE HERE-need this method real tight in a try/catch block, we need to handle
+        // all types of failure here, the scheduler is not going to know how to handle our errors,
+        // ALSO we have to release the lock obtained above (IMPORTANT), lest the whole system come
+        // to a grinding halt.
+        try {
+
+            ProcessInstanceDAO instance;
+            if (process.isInMemory())
+                instance = _contexts.inMemDao.getConnection().getInstance(we.getIID());
+            else
+                instance = _contexts.dao.getConnection().getInstance(we.getIID());
+
+            if (instance == null) {
+                __log.error(__msgs.msgScheduledJobReferencesUnknownInstance(we.getIID()));
+                // nothing we can do, this instance is not in the database, it will
+                // always
+                // fail.
+                return;
+            }
+            ProcessDAO processDao = instance.getProcess();
+            process = _activeProcesses.get(processDao.getProcessId());
+
+            process.handleWorkEvent(we.getDetail());
+            debuggingDelay();
+        } catch (BpelEngineException bee) {
+            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), bee);
+            throw new Scheduler.JobProcessorException(bee, checkRetry(jobInfo, bee));
+        } catch (ContextException ce) {
+            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), ce);
+            throw new Scheduler.JobProcessorException(ce, checkRetry(jobInfo, ce));
+        } catch (RuntimeException rte) {
+            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), rte);
+            throw new Scheduler.JobProcessorException(rte, checkRetry(jobInfo, rte));
+        } catch (Throwable t) {
+            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), t);
+            throw new Scheduler.JobProcessorException(false);
+
+
+     */
     /**
      * @see org.apache.ode.bpel.engine.BpelProcess#handleWorkEvent(java.util.Map<java.lang.String,java.lang.Object>)
      */
@@ -299,18 +357,43 @@
                 __log.debug(ObjectPrinter.stringifyMethodEnter("handleWorkEvent", new Object[] { "jobData", jobData }));
             }
 
-
-            WorkEvent we = new WorkEvent(jobData);
+            final WorkEvent we = new WorkEvent(jobData);
 
             // Process level events
             if (we.getType().equals(WorkEvent.Type.INVOKE_INTERNAL)) {
                 if (__log.isDebugEnabled()) {
                     __log.debug("InvokeInternal event for mexid " + we.getMexId());
                 }
-                ReliableMyRoleMessageExchangeImpl mex = (ReliableMyRoleMessageExchangeImpl) _engine.getMessageExchange(we.getMexId());
+                ReliableMyRoleMessageExchangeImpl mex = (ReliableMyRoleMessageExchangeImpl) _engine.getMessageExchange(we
+                        .getMexId());
                 invokeProcess(mex);
             } else {
                 // Instance level events
+                // We lock the instance to prevent concurrent transactions and prevent unnecessary rollbacks,
+                // Note that we don't want to wait too long here to get our lock, since we are likely holding
+                // on to scheduler's locks of various sorts.
+                try {
+                    _instanceLockManager.lock(we.getIID(), 1, TimeUnit.MICROSECONDS);
+                    _contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
+                        public void afterCompletion(boolean success) {
+                            _instanceLockManager.unlock(we.getIID());
+                        }
+
+                        public void beforeCompletion() {
+                        }
+                    });
+                } catch (InterruptedException e) {
+                    // Retry later.
+                    __log.debug("Thread interrupted, job will be rescheduled: " + jobInfo);
+                    throw new Scheduler.JobProcessorException(true);
+                } catch (org.apache.ode.bpel.engine.InstanceLockManager.TimeoutException e) {
+                    __log.debug("Instance " + we.getIID() + " is busy, rescheduling job.");
+                    // TODO: This should really be more of something like the exponential backoff algorithm in ethernet.
+                    _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, new Date(System.currentTimeMillis()
+                            + Math.min(randomExp(1000), 10000)));
+                    return;
+                }
+
                 ProcessInstanceDAO procInstance = getProcessDAO().getInstance(we.getIID());
                 if (procInstance == null) {
                     if (__log.isDebugEnabled()) {
@@ -437,14 +520,14 @@
         return endpoints;
     }
 
-    void activate(BpelEngineImpl engine) {
-        _engine = engine;
+    void activate(Contexts contexts) {
+        _contexts = contexts;
         _debugger = new DebuggerSupport(this);
 
         __log.debug("Activating " + _pid);
         // Activate all the my-role endpoints.
         for (Map.Entry<String, Endpoint> entry : _pconf.getProvideEndpoints().entrySet()) {
-            EndpointReference initialEPR = _engine._contexts.bindingContext.activateMyRoleEndpoint(_pid, entry.getValue());
+            EndpointReference initialEPR = _contexts.bindingContext.activateMyRoleEndpoint(_pid, entry.getValue());
             __log.debug("Activated " + _pid + " myrole " + entry.getKey() + ": EPR is " + initialEPR);
             _myEprs.put(entry.getValue(), initialEPR);
         }
@@ -558,7 +641,7 @@
             _hydrationLatch.release(1);
         }
     }
-    
+
     OProcess getOProcess() {
         _hydrationLatch.latch(1);
         try {
@@ -568,6 +651,61 @@
         }
     }
 
+    /**
+     * Rebuild a my-role message exchange object from its persisted DAO.
+     *
+     * @param mexdao
+     *            DAO describing the exchange (id, partner link model id, operation, pattern, invocation style)
+     * @return message exchange initialized with the my-role port type, operation and pattern
+     */
+    MyRoleMessageExchangeImpl createMyRoleMex(MessageExchangeDAO mexdao) {
+        // The parsed style is not used below; the call is kept because valueOf() rejects an unrecognized style name.
+        // NOTE(review): a reliable exchange is created whatever the stored style is -- confirm this is intended.
+        InvocationStyle.valueOf(mexdao.getInvocationStyle());
+
+        _hydrationLatch.latch(1);
+        try {
+            MyRoleMessageExchangeImpl exchange = new ReliableMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
+            OPartnerLink partnerLink = (OPartnerLink) _oprocess.getChild(mexdao.getPartnerLinkModelId());
+            exchange.init(partnerLink.myRolePortType, partnerLink.getMyRoleOperation(mexdao.getOperation()),
+                    MessageExchangePattern.valueOf(mexdao.getPattern()));
+            return exchange;
+        } finally {
+            _hydrationLatch.release(1);
+        }
+    }
+
+    /**
+     * Rebuild a partner-role message exchange object from its persisted DAO, choosing the implementation class that
+     * matches the invocation style stored in the DAO.
+     *
+     * @param mexdao
+     *            DAO describing the exchange (id, partner link model id, operation, invocation style)
+     * @return style-specific partner-role message exchange
+     * @throws BpelEngineException
+     *             if the stored invocation style is not one of the four handled below
+     */
+    PartnerRoleMessageExchangeImpl createPartnerRoleMex(MessageExchangeDAO  mexdao) {
+        InvocationStyle style = InvocationStyle.valueOf(mexdao.getInvocationStyle());
+        _hydrationLatch.latch(1);
+        try {
+            OPartnerLink partnerLink = (OPartnerLink) _oprocess.getChild(mexdao.getPartnerLinkModelId());
+            PortType portType = partnerLink.partnerRolePortType;
+            Operation operation = partnerLink.getPartnerRoleOperation(mexdao.getOperation());
+
+            // Every branch passes the same arguments; only the concrete class differs.
+            switch (style) {
+            case BLOCKING:
+                return new BlockingPartnerRoleMessageExchangeImpl(_engine, mexdao.getMessageExchangeId(), portType,
+                        operation, isInMemory(), null, /* EPR todo */
+                        partnerLink.hasMyRole() ? getInitialMyRoleEPR(partnerLink) : null,
+                        getPartnerRoleChannel(partnerLink));
+            case ASYNC:
+                return new AsyncPartnerRoleMessageExchangeImpl(_engine, mexdao.getMessageExchangeId(), portType,
+                        operation, isInMemory(), null, /* EPR todo */
+                        partnerLink.hasMyRole() ? getInitialMyRoleEPR(partnerLink) : null,
+                        getPartnerRoleChannel(partnerLink));
+            case TRANSACTED:
+                return new TransactedPartnerRoleMessageExchangeImpl(_engine, mexdao.getMessageExchangeId(), portType,
+                        operation, isInMemory(), null, /* EPR todo */
+                        partnerLink.hasMyRole() ? getInitialMyRoleEPR(partnerLink) : null,
+                        getPartnerRoleChannel(partnerLink));
+            case RELIABLE:
+                return new ReliablePartnerRoleMessageExchangeImpl(_engine, mexdao.getMessageExchangeId(), portType,
+                        operation, isInMemory(), null, /* EPR todo */
+                        partnerLink.hasMyRole() ? getInitialMyRoleEPR(partnerLink) : null,
+                        getPartnerRoleChannel(partnerLink));
+            default:
+                throw new BpelEngineException("Unexpected InvocationStyle: " + style);
+            }
+        } finally {
+            _hydrationLatch.release(1);
+        }
+    }
+
     private Map<Endpoint, PartnerLinkMyRoleImpl> getEndpointToMyRoleMap() {
         _hydrationLatch.latch(1);
         try {
@@ -612,7 +750,7 @@
 
     /** Create a version-appropriate runtime context. */
     BpelRuntimeContextImpl createRuntimeContext(ProcessInstanceDAO dao, PROCESS template,
-            ReliableMyRoleMessageExchangeImpl instantiatingMessageExchange) {
+            MessageExchangeDAO instantiatingMessageExchange) {
         return new BpelRuntimeContextImpl(this, dao, template, instantiatingMessageExchange);
 
     }
@@ -741,14 +879,13 @@
                 }
             }
 
-
             if (isInMemory()) {
                 bounceProcessDAO(_engine._contexts.inMemDao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
             } else if (_engine._contexts.scheduler.isTransacted()) {
                 // If we have a transaction, we do this in the current transaction.
                 bounceProcessDAO(_engine._contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
             } else {
-                // If we do not have a transaction we need to create one. 
+                // If we do not have a transaction we need to create one.
                 try {
                     _engine._contexts.scheduler.execIsolatedTransaction(new Callable<Object>() {
                         public Object call() throws Exception {

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=551417&r1=551416&r2=551417
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Wed Jun 27 17:43:48 2007
@@ -127,6 +127,7 @@
     /** List of ASYNC invocations that need to be deferred until the end of the current TX. */
     private List<PartnerRoleMessageExchange> _todoAsyncCalls = new LinkedList<PartnerRoleMessageExchange>();
     
+    
     private BpelProcess _bpelProcess;
 
     /** Five second maximum for continous execution. */

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?view=diff&rev=551417&r1=551416&r2=551417
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Wed Jun 27 17:43:48 2007
@@ -18,11 +18,17 @@
  */
 package org.apache.ode.bpel.engine;
 
+import java.lang.ref.WeakReference;
 import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -32,17 +38,26 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.dao.BpelDAOConnection;
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.evt.BpelEvent;
 import org.apache.ode.bpel.iapi.BindingContext;
-import org.apache.ode.bpel.iapi.BpelEngine;
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.BpelEventListener;
 import org.apache.ode.bpel.iapi.BpelServer;
+import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.Endpoint;
 import org.apache.ode.bpel.iapi.EndpointReferenceContext;
+import org.apache.ode.bpel.iapi.InvocationStyle;
+import org.apache.ode.bpel.iapi.Message;
+import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MessageExchangeContext;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.ProcessConf;
 import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
 import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
 import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
 import org.apache.ode.bpel.iapi.Scheduler.Synchronizer;
@@ -56,45 +71,51 @@
  * <p>
  * The BPEL server implementation.
  * </p>
- *
+ * 
  * <p>
- * This implementation is intended to be thread safe. The key concurrency
- * mechanism is a "management" read/write lock that synchronizes all management
- * operations (they require "write" access) and prevents concurrent management
- * operations and processing (processing requires "read" access). Write access
- * to the lock is scoped to the method, while read access is scoped to a
+ * This implementation is intended to be thread safe. The key concurrency mechanism is a "management" read/write lock that
+ * synchronizes all management operations (they require "write" access) and prevents concurrent management operations and processing
+ * (processing requires "read" access). Write access to the lock is scoped to the method, while read access is scoped to a
  * transaction.
  * </p>
- *
+ * 
  * @author Maciej Szefler <mszefler at gmail dot com>
  * @author Matthieu Riou <mriou at apache dot org>
  */
 public class BpelServerImpl implements BpelServer, Scheduler.JobProcessor {
 
     private static final Log __log = LogFactory.getLog(BpelServerImpl.class);
+
     private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
 
+    private final List<WeakReference<MessageExchangeStateListener>> _mexStateListeners = new ArrayList<WeakReference<MessageExchangeStateListener>>();
+
     /** Maximum age of a process before it is quiesced */
     private static Long __processMaxAge;
 
-    /** 
-     * Set of processes that are registered with the server. Includes hydrated and dehydrated processes.
-     * Guarded by _mngmtLock.writeLock(). 
+    /**
+     * Set of processes that are registered with the server. Includes hydrated and dehydrated processes. Guarded by
+     * _mngmtLock.writeLock().
      */
-    private final Set<BpelProcess> _registeredProcesses = new HashSet<BpelProcess>();
+    private final HashMap<QName, BpelProcess> _registeredProcesses = new HashMap<QName, BpelProcess>();
+
+    /** Mapping from myrole endpoint name to active process. */
+    private final HashMap<Endpoint, BpelProcess> _serviceMap = new HashMap<Endpoint, BpelProcess>();
 
     private State _state = State.SHUTDOWN;
+
     private Contexts _contexts = new Contexts();
+
     private DehydrationPolicy _dehydrationPolicy;
+
     private Properties _configProperties;
-    
-    BpelEngineImpl _engine;
+
+
     BpelDatabase _db;
 
     /**
-     * Management lock for synchronizing management operations and preventing
-     * processing (transactions) from occuring while management operations are
-     * in progress.
+     * Management lock for synchronizing management operations and preventing processing (transactions) from occurring while
+     * management operations are in progress.
      */
     private ReadWriteLock _mngmtLock = new ReentrantReadWriteLock();
 
@@ -121,7 +142,7 @@
 
     public BpelServerImpl() {
     }
-    
+
     public void start() {
         _mngmtLock.writeLock().lock();
         try {
@@ -135,43 +156,45 @@
             _contexts.scheduler.start();
             _state = State.RUNNING;
             __log.info(__msgs.msgServerStarted());
-            if (_dehydrationPolicy != null) new Thread(new ProcessDefReaper()).start();
+            if (_dehydrationPolicy != null)
+                new Thread(new ProcessDefReaper()).start();
         } finally {
             _mngmtLock.writeLock().unlock();
         }
     }
 
     /**
-     * Register a global listener to receive {@link BpelEvent}s froom all
-     * processes.
+     * Register a global listener to receive {@link BpelEvent}s from all processes.
+     * 
      * @param listener
      */
     public void registerBpelEventListener(BpelEventListener listener) {
         // Do not synchronize, eventListeners is copy-on-write array.
-    	listener.startup(_configProperties);
-    	_contexts.eventListeners.add(listener);
+        listener.startup(_configProperties);
+        _contexts.eventListeners.add(listener);
     }
 
     /**
-     * Unregister a global listener from receive {@link BpelEvent}s from all
-     * processes.
+     * Unregister a global listener from receiving {@link BpelEvent}s from all processes.
+     * 
      * @param listener
      */
     public void unregisterBpelEventListener(BpelEventListener listener) {
         // Do not synchronize, eventListeners is copy-on-write array.
-    	try {
-    		listener.shutdown();
-    	} catch (Exception e) {
-    		__log.warn("Stopping BPEL event listener " + listener.getClass().getName() + " failed, nevertheless it has been unregistered.", e);
-    	} finally {
-    		_contexts.eventListeners.remove(listener);
-    	}
+        try {
+            listener.shutdown();
+        } catch (Exception e) {
+            __log.warn("Stopping BPEL event listener " + listener.getClass().getName()
+                    + " failed, nevertheless it has been unregistered.", e);
+        } finally {
+            _contexts.eventListeners.remove(listener);
+        }
     }
-    
+
     private void unregisterBpelEventListeners() {
-    	for (BpelEventListener l : _contexts.eventListeners) {
-    		unregisterBpelEventListener(l);
-    	}
+        for (BpelEventListener l : _contexts.eventListeners) {
+            unregisterBpelEventListener(l);
+        }
     }
 
     public void stop() {
@@ -185,7 +208,6 @@
             __log.debug("BPEL SERVER STOPPING");
 
             _contexts.scheduler.stop();
-            _engine = null;
             _state = State.INIT;
             __log.info(__msgs.msgServerStopped());
         } finally {
@@ -203,8 +225,6 @@
 
             _db = new BpelDatabase(_contexts.dao, _contexts.scheduler);
             _state = State.INIT;
-            
-            _engine = new BpelEngineImpl(_contexts);
 
         } finally {
             _mngmtLock.writeLock().unlock();
@@ -218,7 +238,6 @@
             unregisterBpelEventListeners();
 
             _db = null;
-            _engine = null;
             _state = State.SHUTDOWN;
         } finally {
             _mngmtLock.writeLock().unlock();
@@ -226,26 +245,6 @@
 
     }
 
-    public BpelEngine getEngine() {
-        boolean registered = false;
-        _mngmtLock.readLock().lock();
-        try {
-            _contexts.scheduler.registerSynchronizer(new Synchronizer() {
-                public void afterCompletion(boolean success) {
-                    _mngmtLock.readLock().unlock();
-                }
-                public void beforeCompletion() {
-                }
-            });
-            registered = true;
-        } finally {
-            // If we failed to register the synchro,then there was an ex/throwable; we need to unlock now.
-            if (!registered)
-                _mngmtLock.readLock().unlock();
-        }
-        return _engine;
-    }
-
     public void register(ProcessConf conf) {
         if (conf == null)
             throw new NullPointerException("must specify non-null process configuration.");
@@ -263,7 +262,7 @@
 
         try {
             // If the process is already active, do nothing.
-            if (_engine.isProcessRegistered(conf.getProcessId())) {
+            if (_registeredProcesses.containsKey(conf.getProcessId())) {
                 __log.debug("skipping doRegister" + conf.getProcessId() + ") -- process is already registered");
                 return;
             }
@@ -272,8 +271,14 @@
 
             BpelProcess process = new BpelProcess(conf, null);
 
-            _engine.registerProcess(process);
-            _registeredProcesses.add(process);
+            for (Endpoint e : process.getServiceNames()) {
+                __log.debug("Register process: serviceId=" + e + ", process=" + process);
+                _serviceMap.put(e, process);
+            }
+            
+            process.activate(_contexts);
+
+            _registeredProcesses.put(process.getPID(), process);
             process.hydrate();
 
             __log.info(__msgs.msgProcessRegistered(conf.getProcessId()));
@@ -294,11 +299,10 @@
         }
 
         try {
-            BpelProcess p = null;
-            if (_engine != null) {
-                _engine.unregisterProcess(pid);
-                _registeredProcesses.remove(p);
-            }
+            BpelProcess p = _registeredProcesses.remove(pid);
+            p.deactivate();
+            while (_serviceMap.values().remove(p))
+                ;
 
             __log.info(__msgs.msgProcessUnregistered(pid));
 
@@ -312,7 +316,9 @@
 
     /**
      * Register a global message exchange interceptor.
-     * @param interceptor message-exchange interceptor
+     * 
+     * @param interceptor
+     *            message-exchange interceptor
      */
     public void registerMessageExchangeInterceptor(MessageExchangeInterceptor interceptor) {
         // NOTE: do not synchronize, globalInterceptors is copy-on-write.
@@ -321,7 +327,9 @@
 
     /**
      * Unregister a global message exchange interceptor.
-     * @param interceptor message-exchange interceptor
+     * 
+     * @param interceptor
+     *            message-exchange interceptor
      */
     public void unregisterMessageExchangeInterceptor(MessageExchangeInterceptor interceptor) {
         // NOTE: do not synchronize, globalInterceptors is copy-on-write.
@@ -329,6 +337,31 @@
     }
 
     /**
+     * Route to a process using the service id. Note, that we do not need the endpoint name here, we are assuming that two processes
+     * would not be registered under the same service qname but different endpoint.
+     * 
+     * @param service
+     *            target service id
+     * @param request
+     *            request message
+     * @return process corresponding to the targeted service, or <code>null</code> if service identifier is not recognized.
+     */
+    BpelProcess route(QName service, Message request) {
+        // TODO: use the message to route to the correct service if more than
+        // one service is listening on the same endpoint.
+
+        // Scan every registered endpoint; when several share the service name, the last one visited wins.
+        BpelProcess target = null;
+        for (java.util.Map.Entry<Endpoint, BpelProcess> mapping : _serviceMap.entrySet()) {
+            if (mapping.getKey().serviceName.equals(service))
+                target = mapping.getValue();
+        }
+
+        if (__log.isDebugEnabled())
+            __log.debug("Routed: svcQname " + service + " --> " + target);
+        return target;
+
+    }
+
+    /**
      * Check a state transition from state "i" to state "j".
      */
     private final boolean checkState(State i, State j) {
@@ -361,43 +394,25 @@
     }
 
     public void onScheduledJob(JobInfo jobInfo) throws JobProcessorException {
-        getEngine().onScheduledJob(jobInfo);
-    }
-    
-    private class ProcessDefReaper implements Runnable {
-        public void run() {
-            __log.debug("Starting process definition reaper thread.");
-            long pollingTime = 10000;
-            try {
-                while (true) {
-                    Thread.sleep(pollingTime);
-                    _mngmtLock.writeLock().lockInterruptibly();
-                    try { 
-                        __log.debug("Kicking reaper, OProcess instances: " + OProcess.instanceCount);
-                        // Copying the runnning process list to avoid synchronization
-                        // problems and a potential mess if a policy modifies the list
-                        List<BpelProcess> candidates = new ArrayList<BpelProcess>(_registeredProcesses);
-                        CollectionsX.remove_if(candidates, new MemberOfFunction<BpelProcess>() {
-                            public boolean isMember(BpelProcess o) {
-                                return !o.hintIsHydrated();
-                            }
-                            
-                        });
-
-                        // And the happy winners are...
-                        List<BpelProcess> ripped = _dehydrationPolicy.markForDehydration(candidates);
-                        // Bye bye
-                        for (BpelProcess process : ripped) {
-                            __log.debug("Dehydrating process " + process.getPID());
-                            process.dehydrate();
-                        }
-                    } finally {
-                        _mngmtLock.writeLock().unlock();
-                    }
-                }
-            } catch (InterruptedException e) {
-                __log.info(e);
+        _mngmtLock.readLock().lock();
+        try {
+            WorkEvent we = new WorkEvent(jobInfo.jobDetail);
+            BpelProcess process = _registeredProcesses.get(we.getProcessId());
+            if (process == null) {
+                // If the process is not active, it means that we should not be
+                // doing any work on its behalf, therefore we will reschedule the
+                // events for some time in the future (1 minute).
+                Date future = new Date(System.currentTimeMillis() + (60 * 1000));
+                __log.info(__msgs.msgReschedulingJobForInactiveProcess(we.getProcessId(), jobInfo.jobName, future));
+                _contexts.scheduler.schedulePersistedJob(we.getDetail(), future);
+                return;
             }
+
+            process.handleWorkEvent(jobInfo.jobDetail);
+        } catch (Exception ex) {
+            throw new JobProcessorException(ex, true);
+        } finally {
+            _mngmtLock.readLock().unlock();
         }
     }
 
@@ -406,9 +421,9 @@
     }
 
     public void setConfigProperties(Properties configProperties) {
-    	_configProperties = configProperties;
+        _configProperties = configProperties;
     }
-    
+
     public void setMessageExchangeContext(MessageExchangeContext mexContext) throws BpelEngineException {
         _contexts.mexContext = mexContext;
     }
@@ -422,9 +437,8 @@
     }
 
     /**
-     * Set the DAO connection factory. The DAO is used by the BPEL engine to
-     * persist information about active processes.
-     *
+     * Set the DAO connection factory. The DAO is used by the BPEL engine to persist information about active processes.
+     * 
      * @param daoCF
      *            {@link BpelDAOConnectionFactory} implementation.
      */
@@ -440,4 +454,213 @@
         _contexts.bindingContext = bc;
     }
 
+    /**
+     * Create a new my-role message exchange targeting the process that provides the given service.
+     *
+     * <p>
+     * The backing DAO is created in the in-memory store when the target process is in-memory, otherwise in the
+     * persistent store. For ASYNC and BLOCKING styles the DAO is created in an isolated transaction run by the
+     * scheduler; for RELIABLE and TRANSACTED styles the caller must already be in a transaction and the DAO is
+     * created inline.
+     * </p>
+     *
+     * @param istyle
+     *            invocation style requested by the caller
+     * @param targetService
+     *            name of the service being invoked
+     * @param operation
+     *            name of the operation being invoked
+     * @param clientKey
+     *            client-supplied key, stored as the exchange's correlation id
+     * @return the newly created message exchange, initialized by the target process
+     * @throws BpelEngineException
+     *             if no process is registered for the service, if the DAO could not be created, or if a
+     *             RELIABLE/TRANSACTED exchange is requested outside of a transaction
+     */
+    public MyRoleMessageExchange createMessageExchange(final InvocationStyle istyle, final QName targetService,
+            final String operation, final String clientKey) throws BpelEngineException {
+
+        _mngmtLock.readLock().lock();
+        try {
+            final BpelProcess target = route(targetService, null);
+
+            if (target == null)
+                throw new BpelEngineException("NoSuchService: " + targetService);
+
+            // DAO creation is wrapped in a Callable so that it can run either inline or in an isolated transaction.
+            Callable<String> createDao = new Callable<String>() {
+
+                public String call() throws Exception {
+                    MessageExchangeDAO dao;
+                    if (target.isInMemory()) {
+                        dao = _contexts.inMemDao.getConnection().createMessageExchange(
+                                MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
+                    } else {
+                        dao = _contexts.dao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
+                    }
+                    dao.setInvocationStyle(istyle.toString());
+                    dao.setCorrelationId(clientKey);
+                    dao.setCorrelationStatus(CorrelationStatus.UKNOWN_ENDPOINT.toString());
+                    dao.setPattern(MessageExchangePattern.UNKNOWN.toString());
+                    dao.setCallee(targetService);
+                    dao.setStatus(Status.NEW.toString());
+                    dao.setOperation(operation);
+                    return dao.getMessageExchangeId();
+                }
+
+            };
+
+            MyRoleMessageExchangeImpl mex;
+            String mexId;
+            switch (istyle) {
+            case ASYNC:
+                try {
+                    mexId = _contexts.scheduler.execIsolatedTransaction(createDao).get();
+                } catch (Exception e) {
+                    __log.error("Internal Error: could not execute isolated transaction.", e);
+                    throw new BpelEngineException("Internal Error", e);
+                }
+                mex = new AsyncMyRoleMessageExchangeImpl(this, mexId);
+                break;
+            case BLOCKING:
+                try {
+                    mexId = _contexts.scheduler.execIsolatedTransaction(createDao).get();
+                } catch (Exception e) {
+                    __log.error("Internal Error: could not execute isolated transaction.", e);
+                    throw new BpelEngineException("Internal Error", e);
+                }
+                mex = new BlockingMyRoleMessageExchangeImpl(this, mexId);
+                break;
+
+            case RELIABLE:
+                assertTransaction();
+                try {
+                    mexId = createDao.call();
+                } catch (Exception e) {
+                    __log.error("Internal Error: could not execute DB calls.", e);
+                    throw new BpelEngineException("Internal Error", e);
+                }
+                mex = new ReliableMyRoleMessageExchangeImpl(this, mexId);
+                break;
+            case TRANSACTED:
+                assertTransaction();
+                try {
+                    mexId = createDao.call();
+                } catch (Exception e) {
+                    __log.error("Internal Error: could not execute DB calls.", e);
+                    throw new BpelEngineException("Internal Error", e);
+                }
+                mex = new TransactedMyRoleMessageExchangeImpl(this, mexId);
+                // Without this break the TRANSACTED case fell through to "default" and always threw.
+                break;
+            default:
+                throw new Error("Internal Error: unknown InvocationStyle: " + istyle);
+            }
+
+            target.initMyRoleMex(mex);
+
+            return mex;
+        } finally {
+            _mngmtLock.readLock().unlock();
+        }
+    }
+
+    public MessageExchange getMessageExchange(final String mexId) throws BpelEngineException {
+
+        _mngmtLock.readLock().lock();
+        try {
+            final MessageExchangeDAO inmemdao = _contexts.inMemDao.getConnection().getMessageExchange(mexId);
+
+            Callable<MessageExchange> loadMex = new Callable<MessageExchange>() {
+
+                public MessageExchange call() {
+                    MessageExchangeDAO mexdao = (inmemdao == null) ? mexdao = _contexts.dao.getConnection().getMessageExchange(
+                            mexId) : inmemdao;
+                    if (mexdao == null)
+                        return null;
+
+                    ProcessDAO pdao = mexdao.getProcess();
+                    BpelProcess process = pdao == null ? null : _engine._activeProcesses.get(pdao.getProcessId());
+
+                    if (process == null) {
+                        String errmsg = __msgs.msgProcessNotActive(pdao.getProcessId());
+                        __log.error(errmsg);
+                        // TODO: Perhaps we should define a checked exception for this
+                        // condition.
+                        throw new BpelEngineException(errmsg);
+                    }
+
+                    InvocationStyle istyle = InvocationStyle.valueOf(mexdao.getInvocationStyle());
+                    if (istyle == InvocationStyle.RELIABLE || istyle == InvocationStyle.TRANSACTED)
+                        assertTransaction();
+
+                    switch (mexdao.getDirection()) {
+                    case MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE:
+                        return process.createPartnerRoleMex(mexdao);
+                    case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
+                        return process.createMyRoleMex(mexdao);
+                    default:
+                        String errmsg = "BpelEngineImpl: internal error, invalid MexDAO direction: " + mexId;
+                        __log.fatal(errmsg);
+                        throw new BpelEngineException(errmsg);
+                    }
+                }
+            };
+
+            try {
+                if (inmemdao != null)
+                    return loadMex.call();
+
+                return _contexts.scheduler.execIsolatedTransaction(loadMex).get();
+            } catch (ContextException e) {
+                throw new BpelEngineException(e);
+            } catch (Exception e) {
+                throw new BpelEngineException(e);
+            }
+
+        } finally {
+            _mngmtLock.readLock().unlock();
+        }
+
+    }
+
+    public MessageExchange getMessageExchangeByForeignKey(String foreignKey) throws BpelEngineException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    /**
+     * Report the invocation styles that may be used when creating a message exchange for the given service.
+     *
+     * @param serviceId
+     *            name of the target service
+     * @return set of usable invocation styles
+     * @throws BpelEngineException
+     *             if no process is registered for the service
+     */
+    public Set<InvocationStyle> getSupportedInvocationStyle(QName serviceId) {
+
+        _mngmtLock.readLock().lock();
+        try {
+            if (route(serviceId, null) == null)
+                throw new BpelEngineException("NoSuchService: " + serviceId);
+
+            // NOTE(review): the original body returned nothing and so did not compile. Every style is reported
+            // here as a placeholder -- TODO confirm whether some processes (e.g. in-memory ones) must restrict it.
+            Set<InvocationStyle> styles = new HashSet<InvocationStyle>();
+            for (InvocationStyle style : InvocationStyle.values())
+                styles.add(style);
+            return styles;
+        } finally {
+            _mngmtLock.readLock().unlock();
+        }
+    }
+
+    
+    /**
+     * Register a listener for message exchange state changes. Only a weak reference to the listener is kept, so
+     * registration by itself does not keep the listener from being garbage collected.
+     *
+     * @param mexStateListener
+     *            listener to register
+     */
+    void registerMessageExchangeStateListener(MessageExchangeStateListener mexStateListener) {
+        WeakReference<MessageExchangeStateListener> ref = new WeakReference<MessageExchangeStateListener>(mexStateListener);
+        // Previously the reference was created and then dropped, so the listener was never actually registered.
+        // NOTE(review): _mexStateListeners is a plain ArrayList -- confirm which lock callers hold when registering.
+        _mexStateListeners.add(ref);
+    }
+
+    /**
+     * Guard for operations that require a transaction.
+     *
+     * @throws BpelEngineException
+     *             if the scheduler reports that no transaction is in progress
+     */
+    protected void assertTransaction() {
+        boolean inTransaction = _contexts.scheduler.isTransacted();
+        if (inTransaction) {
+            return;
+        }
+        throw new BpelEngineException("Operation must be performed in a transaction!");
+    }
+
+    /**
+     * Background task that wakes up every ten seconds, asks the dehydration policy which of the currently hydrated
+     * processes should be dehydrated, and dehydrates them. Each pass runs under the management write lock. The loop
+     * ends when the thread is interrupted.
+     */
+    private class ProcessDefReaper implements Runnable {
+        public void run() {
+            __log.debug("Starting process definition reaper thread.");
+            long pollingTime = 10000;
+            try {
+                while (true) {
+                    Thread.sleep(pollingTime);
+                    _mngmtLock.writeLock().lockInterruptibly();
+                    try {
+                        __log.debug("Kicking reaper, OProcess instances: " + OProcess.instanceCount);
+                        // Copying the running process list to avoid synchronization
+                        // problems and a potential mess if a policy modifies the list
+                        List<BpelProcess> candidates = new ArrayList<BpelProcess>(_registeredProcesses.values());
+                        // Only hydrated processes are candidates; drop the rest.
+                        CollectionsX.remove_if(candidates, new MemberOfFunction<BpelProcess>() {
+                            public boolean isMember(BpelProcess o) {
+                                return !o.hintIsHydrated();
+                            }
+
+                        });
+
+                        // And the happy winners are...
+                        List<BpelProcess> ripped = _dehydrationPolicy.markForDehydration(candidates);
+                        // Bye bye
+                        for (BpelProcess process : ripped) {
+                            __log.debug("Dehydrating process " + process.getPID());
+                            process.dehydrate();
+                        }
+                    } finally {
+                        _mngmtLock.writeLock().unlock();
+                    }
+                }
+            } catch (InterruptedException e) {
+                __log.info(e);
+                // Restore the interrupt status so that whoever runs this Runnable can still observe it.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    
+    
 }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=551417&r1=551416&r2=551417
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Wed Jun 27 17:43:48 2007
@@ -127,6 +127,11 @@
         _mexId = mexId;
     }
 
+    /** Equal iff {@code other} is a MessageExchangeImpl with the same message exchange id; null and foreign types are unequal. */
+    @Override
+    public boolean equals(Object other) {
+        return other instanceof MessageExchangeImpl && _mexId.equals(((MessageExchangeImpl) other)._mexId);
+    }
     void load(MessageExchangeDAO dao) {
         if (dao.getMessageExchangeId().equals(_mexId))
             throw new IllegalArgumentException("MessageExchangeId mismatch!");
@@ -147,6 +152,8 @@
             _status = Status.valueOf(dao.getStatus());
         if (_istyle == null)
             _istyle = InvocationStyle.valueOf(dao.getInvocationStyle());
+        if (_timeout == null)  // TODO: custom timeout
+            _timeout = new Date(dao.getCreateTime().getTime() + 60000); 
     }
 
     public void save(MessageExchangeDAO dao) {
@@ -269,11 +276,12 @@
         });
     }
 
-    void setPortOp(PortType portType, Operation operation) {
+    void init(PortType portType, Operation operation, MessageExchangePattern pattern) {
         if (__log.isTraceEnabled())
             __log.trace("Mex[" + getMessageExchangeId() + "].setPortOp(" + portType + "," + operation + ")");
         _portType = portType;
         _operation = operation;
+        _pattern = pattern;
     }
 
     void setFault(QName faultType, Message outputFaultMessage) throws BpelEngineException {
@@ -421,4 +429,6 @@
         public T call(MessageExchangeDAO mexdao);
     }
 
+    
 }
+

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=551417&r1=551416&r2=551417
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Wed Jun 27 17:43:48 2007
@@ -7,13 +7,11 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.dao.MessageDAO;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.bpel.intercept.AbortMessageExchangeException;
 import org.apache.ode.bpel.intercept.FaultMessageExchangeException;
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
@@ -115,21 +113,16 @@
         we1.setMexId(_mexId);
         
         setStatus(Status.ASYNC);
-        if (target.isInMemory()) {
-            _engine._contexts.scheduler.scheduleVolatileJob(true, we.getDetail());
-            _engine._contexts.scheduler.scheduleVolatileJob(true, we1.getDetail());
-        } else {
-            doInTX(new InDbAction<Void>() {
-
-                public Void call(MessageExchangeDAO mexdao) {
-                    _engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
-                    _engine._contexts.scheduler.schedulePersistedJob(we1.getDetail(), null);
-                    return null;
-                }
+        doInTX(new InDbAction<Void>() {
 
-            });
+            public Void call(MessageExchangeDAO mexdao) {
+                _engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
+                _engine._contexts.scheduler.schedulePersistedJob(we1.getDetail(), null);
+                return null;
+            }
+
+        });
 
-        }
     }
     
 
@@ -168,5 +161,12 @@
         return true;
     }
 
-    
+
+    /**
+     * Callback hook with an empty body in this class; judging by the name it is meant to run when the exchange completes - TODO confirm against callers.
+     * 
+     * @param mexdao DAO of the message exchange this callback is invoked for
+     */
+    protected void onMessageExchangeComplete(MessageExchangeDAO mexdao) {
+    }
 }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java?view=diff&rev=551417&r1=551416&r2=551417
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java Wed Jun 27 17:43:48 2007
@@ -24,6 +24,7 @@
 import org.apache.ode.bpel.common.FaultException;
 import org.apache.ode.bpel.common.InvalidMessageException;
 import org.apache.ode.bpel.dao.CorrelatorDAO;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.dao.MessageRouteDAO;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
@@ -34,6 +35,7 @@
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.ProcessState;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
 import org.apache.ode.bpel.o.OMessageVarType;
 import org.apache.ode.bpel.o.OPartnerLink;
@@ -55,6 +57,7 @@
  */
 class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl {
     private static final Log __log = LogFactory.getLog(BpelProcess.class);
+
     private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
 
     /** The local endpoint for this "myrole". */
@@ -79,28 +82,24 @@
 
     /**
      * Called when an input message has been received.
-     *
+     * 
      * @param mex
      *            exchange to which the message is related
      */
-    public void invokeMyRole(ReliableMyRoleMessageExchangeImpl mex) {
+    public void invokeMyRole(MessageExchangeDAO mex) {
         if (__log.isTraceEnabled()) {
-            __log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[] {
-                    "messageExchange", mex }));
+            __log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[] { "messageExchange", mex }));
         }
 
-        Operation operation = getMyRoleOperation(mex.getOperationName());
+        Operation operation = getMyRoleOperation(mex.getOperation());
         if (operation == null) {
-            __log.error(__msgs.msgUnknownOperation(mex.getOperationName(), _plinkDef.myRolePortType.getQName()));
-            mex.setFailure(MessageExchange.FailureType.UNKNOWN_OPERATION, mex.getOperationName(), null);
+            __log.error(__msgs.msgUnknownOperation(mex.getOperation(), _plinkDef.myRolePortType.getQName()));
+            mex.setStatus(Status.FAILURE.toString());
+            mex.setFailureType(MessageExchange.FailureType.UNKNOWN_OPERATION.toString());
+            mex.setFaultExplanation(mex.getOperation());
             return;
         }
 
-        mex.getDAO().setPartnerLinkModelId(_plinkDef.getId());
-        mex.setPortOp(_plinkDef.myRolePortType, operation);
-        mex.setPattern(operation.getOutput() == null ? MessageExchange.MessageExchangePattern.REQUEST_ONLY
-                : MessageExchange.MessageExchangePattern.REQUEST_RESPONSE);
-
         // Is this a /possible/ createInstance Operation?
         boolean isCreateInstnace = _plinkDef.isCreateInstanceOperation(operation);
 
@@ -118,26 +117,28 @@
         MessageRouteDAO messageRoute = null;
 
         // We need to compute the correlation keys (based on the operation
-        // we can  infer which correlation keys to compute - this is merely a set
+        // we can infer which correlation keys to compute - this is merely a set
         // consisting of each correlationKey used in each correlation sets
         // that is ever referenced in an <receive>/<onMessage> on this
         // partnerlink/operation.
         try {
-            keys = computeCorrelationKeys(mex);
+            keys = computeCorrelationKeys(mex, operation);
         } catch (InvalidMessageException ime) {
             // We'd like to do a graceful exit here, no sense in rolling back due to a
             // a message format problem.
-            __log.debug("Unable to evaluate correlation keys, invalid message format. ",ime);
-            mex.setFailure(MessageExchange.FailureType.FORMAT_ERROR, ime.getMessage(), null);
+            __log.debug("Unable to evaluate correlation keys, invalid message format. ", ime);
+            mex.setFailureType(MessageExchange.FailureType.FORMAT_ERROR.toString());
+            mex.setStatus(Status.FAILURE.toString());
+            mex.setFaultExplanation(ime.getMessage());
+
             return;
         }
 
         String mySessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
         String partnerSessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
         if (__log.isDebugEnabled()) {
-            __log.debug("INPUTMSG: " + correlatorId + ": MSG RCVD keys="
-                    + ArrayUtils.makeCollection(HashSet.class, keys) + " mySessionId=" + mySessionId
-                    + " partnerSessionId=" + partnerSessionId);
+            __log.debug("INPUTMSG: " + correlatorId + ": MSG RCVD keys=" + ArrayUtils.makeCollection(HashSet.class, keys)
+                    + " mySessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId);
         }
 
         CorrelationKey matchedKey = null;
@@ -169,26 +170,25 @@
                 throw new InvalidProcessException("Process is retired.", InvalidProcessException.RETIRED_CAUSE_CODE);
             }
 
-            if (!_process.processInterceptors(mex, InterceptorInvoker.__onNewInstanceInvoked)) {
-                __log.debug("Not creating a new instance for mex " + mex + "; interceptor prevented!");
-                return;
-            }
+            // if (!_process.processInterceptors(mex, InterceptorInvoker.__onNewInstanceInvoked)) {
+            // __log.debug("Not creating a new instance for mex " + mex + "; interceptor prevented!");
+            // return;
+            // }
 
             ProcessInstanceDAO newInstance = processDAO.createInstance(correlator);
 
-            BpelRuntimeContextImpl instance = _process
-                    .createRuntimeContext(newInstance, new PROCESS(_process.getOProcess()), mex);
+            BpelRuntimeContextImpl instance = _process.createRuntimeContext(newInstance, new PROCESS(_process.getOProcess()), mex);
 
             // send process instance event
-            NewProcessInstanceEvent evt = new NewProcessInstanceEvent(new QName(_process.getOProcess().targetNamespace,
-                    _process.getOProcess().getName()), _process.getProcessDAO().getProcessId(), newInstance.getInstanceId());
-            evt.setPortType(mex.getPortType().getQName());
+            NewProcessInstanceEvent evt = new NewProcessInstanceEvent(new QName(_process.getOProcess().targetNamespace, _process
+                    .getOProcess().getName()), _process.getProcessDAO().getProcessId(), newInstance.getInstanceId());
+            evt.setPortType(mex.getPortType());
             evt.setOperation(operation.getName());
             evt.setMexId(mex.getMessageExchangeId());
             _process._debugger.onEvent(evt);
             _process.saveEvent(evt, newInstance);
-            mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.CREATE_INSTANCE);
-            mex.getDAO().setInstance(newInstance);
+            mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.CREATE_INSTANCE.toString());
+            mex.setInstance(newInstance);
 
             instance.execute();
         } else if (messageRoute != null) {
@@ -208,10 +208,9 @@
             correlator.removeRoutes(messageRoute.getGroupId(), instanceDao);
 
             // send process instance event
-            CorrelationMatchEvent evt = new CorrelationMatchEvent(new QName(_process.getOProcess().targetNamespace,
-                    _process.getOProcess().getName()), _process.getProcessDAO().getProcessId(),
-                    instanceDao.getInstanceId(), matchedKey);
-            evt.setPortType(mex.getPortType().getQName());
+            CorrelationMatchEvent evt = new CorrelationMatchEvent(new QName(_process.getOProcess().targetNamespace, _process
+                    .getOProcess().getName()), _process.getProcessDAO().getProcessId(), instanceDao.getInstanceId(), matchedKey);
+            evt.setPortType(mex.getPortType());
             evt.setOperation(operation.getName());
             evt.setMexId(mex.getMessageExchangeId());
 
@@ -219,35 +218,32 @@
             // store event
             _process.saveEvent(evt, instanceDao);
 
-            // EXPERIMENTAL -- LOCK
-            // instanceDao.lock();
-
-            mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.MATCHED);
-            mex.getDAO().setInstance(messageRoute.getTargetInstance());
+            mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.MATCHED.toString());
+            mex.setInstance(messageRoute.getTargetInstance());
             instance.execute();
         } else {
             if (__log.isDebugEnabled()) {
                 __log.debug("INPUTMSG: " + correlatorId + ": SAVING to DB (no match) ");
             }
 
-            if (!mex.isAsynchronous()) {
-                mex.setFailure(MessageExchange.FailureType.NOMATCH, "No process instance matching correlation keys.", null);
-
-            } else {
-                // send event
-                CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType().getQName(), mex
-                        .getOperation().getName(), mex.getMessageExchangeId(), keys);
-
-                evt.setProcessId(_process.getProcessDAO().getProcessId());
-                evt.setProcessName(new QName(_process.getOProcess().targetNamespace, _process.getOProcess().getName()));
-                _process._debugger.onEvent(evt);
+            // TODO: Revisit (BART)
+            // if (!mex.isAsynchronous()) {
+            // mex.setFailure(MessageExchange.FailureType.NOMATCH, "No process instance matching correlation keys.", null);
+            //
+            // } else {
+            // send event
+            CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType(), mex.getOperation(),
+                    mex.getMessageExchangeId(), keys);
 
-                mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.QUEUED);
+            evt.setProcessId(_process.getProcessDAO().getProcessId());
+            evt.setProcessName(new QName(_process.getOProcess().targetNamespace, _process.getOProcess().getName()));
+            _process._debugger.onEvent(evt);
 
-                // No match, means we add message exchange to the queue.
-                correlator.enqueueMessage(mex.getDAO(), keys);
+            mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.QUEUED.toString());
 
-            }
+            // No match, means we add message exchange to the queue.
+            correlator.enqueueMessage(mex, keys);
+            // }
         }
 
         // Now we have to update our message exchange status. If the <reply>
@@ -255,8 +251,8 @@
         // invocation, then we will be in the "REQUEST" phase which means
         // that either this was a one-way
         // or a two-way that needs to delivery the reply asynchronously.
-        if (mex.getStatus() == MessageExchange.Status.REQUEST) {
-            mex.setStatus(MessageExchange.Status.ASYNC);
+        if (Status.valueOf(mex.getStatus()) == MessageExchange.Status.REQUEST) {
+            mex.setStatus(MessageExchange.Status.ASYNC.toString());
         }
     }
 
@@ -266,17 +262,16 @@
         return op;
     }
 
-    private CorrelationKey[] computeCorrelationKeys(ReliableMyRoleMessageExchangeImpl mex) {
-        Operation operation = mex.getOperation();
-        Element msg = mex.getRequest().getMessage();
+    private CorrelationKey[] computeCorrelationKeys(MessageExchangeDAO mex, Operation operation) {
+        Element msg = mex.getRequest().getData();
         javax.wsdl.Message msgDescription = operation.getInput().getMessage();
         List<CorrelationKey> keys = new ArrayList<CorrelationKey>();
 
         Set<OScope.CorrelationSet> csets = _plinkDef.getCorrelationSetsForOperation(operation);
 
         for (OScope.CorrelationSet cset : csets) {
-            CorrelationKey key = computeCorrelationKey(cset,
-                    _process.getOProcess().messageTypes.get(msgDescription.getQName()), msg);
+            CorrelationKey key = computeCorrelationKey(cset, _process.getOProcess().messageTypes.get(msgDescription.getQName()),
+                    msg);
             keys.add(key);
         }
 
@@ -288,8 +283,7 @@
         return keys.toArray(new CorrelationKey[keys.size()]);
     }
 
-    private CorrelationKey computeCorrelationKey(OScope.CorrelationSet cset, OMessageVarType messagetype,
-            Element msg) {
+    private CorrelationKey computeCorrelationKey(OScope.CorrelationSet cset, OMessageVarType messagetype, Element msg) {
         String[] values = new String[cset.properties.size()];
 
         int jIdx = 0;
@@ -300,8 +294,8 @@
             if (alias == null) {
                 // TODO: Throw a real exception! And catch this at compile
                 // time.
-                throw new IllegalArgumentException("No alias matching property '" + property.name
-                        + "' with message type '" + messagetype + "'");
+                throw new IllegalArgumentException("No alias matching property '" + property.name + "' with message type '"
+                        + messagetype + "'");
             }
 
             String value;

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java?view=diff&rev=551417&r1=551416&r2=551417
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java Wed Jun 27 17:43:48 2007
@@ -63,7 +63,7 @@
         _myRoleEPR = myRoleEPR;
         _partnerRoleChannel = channel;
         _inMem = inMem;
-        setPortOp(portType, operation);    
+        init(portType, operation);    
     }
 
     @Override

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java?view=diff&rev=551417&r1=551416&r2=551417
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java Wed Jun 27 17:43:48 2007
@@ -8,8 +8,10 @@
 
 public class ReliablePartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl {
 
-    ReliablePartnerRoleMessageExchangeImpl();
+    public ReliablePartnerRoleMessageExchangeImpl(BpelEngineImpl impl, String messageExchangeId, PortType ptype, Operation op, boolean b, Object object, EndpointReference reference, PartnerRoleChannel partnerRoleChannel) {
         // TODO Auto-generated constructor stub
     }
+
+
 
 }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java?view=diff&rev=551417&r1=551416&r2=551417
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java Wed Jun 27 17:43:48 2007
@@ -19,6 +19,14 @@
 
 package org.apache.ode.bpel.memdao;
 
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import javax.xml.namespace.QName;
+
 import org.apache.ode.bpel.dao.MessageDAO;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.dao.PartnerLinkDAO;
@@ -26,13 +34,6 @@
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
 import org.w3c.dom.Element;
 
-import javax.xml.namespace.QName;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-
 public class MessageExchangeDAOImpl extends DaoBaseImpl implements MessageExchangeDAO {
 
     private String messageExchangeId;
@@ -58,7 +59,9 @@
 	private QName callee;
 	private Properties properties = new Properties();
     private PartnerLinkDAOImpl _plink;
-    private String pipedMessageExchangeId;
+    private String _istyle;
+    private MessageExchangeDAO _pipedExchange;
+    private String _failureType;
 
 	public MessageExchangeDAOImpl(char direction, String messageEchangeId){
 		this.direction = direction;
@@ -263,13 +266,6 @@
         return retVal;
     }
 
-    public String getPipedMessageExchangeId() {
-        return pipedMessageExchangeId;
-    }
-
-    public void setPipedMessageExchangeId(String pipedMessageExchangeId) {
-        this.pipedMessageExchangeId = pipedMessageExchangeId;
-    }
 
     public void release() {
         instance = null;
@@ -282,5 +278,28 @@
 
     public String toString() {
         return "mem.mex(direction=" + direction + " id=" + messageExchangeId + ")";
+    }
+    // Accessors added by this commit: each one only reads or writes a private field of this in-memory DAO.
+    public String getInvocationStyle() {
+        return _istyle;
+    }
+
+    public MessageExchangeDAO getPipedMessageExchange() {
+        return _pipedExchange;
+    }
+
+    public void setFailureType(String failureType) {
+        _failureType = failureType;
+        
+    }
+
+    public void setInvocationStyle(String invocationStyle) {
+        _istyle = invocationStyle;
+        
+    }
+
+    public void setPipedMessageExchange(MessageExchangeDAO mex) {
+        _pipedExchange = mex;
+        
     }
 }



Mime
View raw message