ode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From boisv...@apache.org
Subject svn commit: r545337 - in /incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel: engine/BpelRuntimeContextImpl.java memdao/ProcessInstanceDaoImpl.java
Date Thu, 07 Jun 2007 22:29:13 GMT
Author: boisvert
Date: Thu Jun  7 15:29:09 2007
New Revision: 545337

URL: http://svn.apache.org/viewvc?view=rev&rev=545337
Log:
Avoid serialization of process state for in-memory instances


Modified:
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=545337&r1=545336&r2=545337
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Thu Jun  7 15:29:09 2007
@@ -49,6 +49,7 @@
 import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.memdao.ProcessInstanceDaoImpl;
 import org.apache.ode.bpel.o.OMessageVarType;
 import org.apache.ode.bpel.o.OMessageVarType.Part;
 import org.apache.ode.bpel.o.OPartnerLink;
@@ -90,9 +91,6 @@
 
     private static final Log __log = LogFactory.getLog(BpelRuntimeContextImpl.class);
 
-    // private static final Messages __msgs =
-    // MessageBundle.getMessages(Messages.class);
-
     /** Data-access object for process instance. */
     private ProcessInstanceDAO _dao;
 
@@ -100,14 +98,14 @@
     private final Long _iid;
 
     /** JACOB VPU */
-    protected JacobVPU vpu;
+    protected JacobVPU _vpu;
 
     /** JACOB ExecutionQueue (state) */
-    protected ExecutionQueueImpl soup;
+    protected ExecutionQueueImpl _soup;
 
     private MyRoleMessageExchangeImpl _instantiatingMessageExchange;
 
-    private OutstandingRequestManager _outstandingRequests = new OutstandingRequestManager();
+    private OutstandingRequestManager _outstandingRequests;
 
     private BpelProcess _bpelProcess;
 
@@ -120,25 +118,41 @@
         _dao = dao;
         _iid = dao.getInstanceId();
         _instantiatingMessageExchange = instantiatingMessageExchange;
-        initVPU();
+        _vpu = new JacobVPU();
+        _vpu.registerExtension(BpelRuntimeContext.class, this);
 
+        if (bpelProcess.isInMemory()) {
+            ProcessInstanceDaoImpl inmem = (ProcessInstanceDaoImpl) _dao;
+            if (inmem.getSoup() != null) {
+                _soup = (ExecutionQueueImpl) inmem.getSoup();
+            }
+        } else {
         byte[] daoState = dao.getExecutionState();
         if (daoState != null) {
             ByteArrayInputStream iis = new ByteArrayInputStream(daoState);
             try {
-                soup.read(iis);
-                _outstandingRequests = (OutstandingRequestManager) soup.getGlobalData();
+                    _soup.read(iis);
             } catch (Exception ex) {
                 throw new RuntimeException(ex);
             }
         }
+        }
+
+        if (_soup == null) {
+            _soup = new ExecutionQueueImpl(null);
+            _soup.setReplacementMap(_bpelProcess.getReplacementMap());
+            _outstandingRequests = new OutstandingRequestManager();
+        } else {
+            _outstandingRequests = (OutstandingRequestManager) _soup.getGlobalData();
+        }
+        _vpu.setContext(_soup);
 
         if (PROCESS != null) {
-            vpu.inject(PROCESS);
+            _vpu.inject(PROCESS);
         }
 
         if (BpelProcess.__log.isDebugEnabled()) {
-            __log.debug("BpelRuntimeContextImpl created for instance " + _iid + ". INDEXED STATE=" + soup.getIndex());
+            __log.debug("BpelRuntimeContextImpl created for instance " + _iid + ". INDEXED STATE=" + _soup.getIndex());
         }
     }
 
@@ -818,20 +832,26 @@
         long maxTime = System.currentTimeMillis() + _maxReductionTimeMs;
         boolean canReduce = true;
         while (ProcessState.canExecute(_dao.getState()) && System.currentTimeMillis() < maxTime && canReduce) {
-            canReduce = vpu.execute();
+            canReduce = _vpu.execute();
         }
         _dao.setLastActiveTime(new Date());
         if (!ProcessState.isFinished(_dao.getState())) {
+            if (__log.isDebugEnabled()) __log.debug("Setting execution state on instance " + _iid);
+            _soup.setGlobalData(_outstandingRequests);
+            
+            if (_bpelProcess.isInMemory()) {
+                // don't serialize in-memory processes
+                ((ProcessInstanceDaoImpl) _dao).setSoup(_soup);
+            } else {
             ByteArrayOutputStream bos = new ByteArrayOutputStream(10000);
             try {
-                soup.setGlobalData(_outstandingRequests);
-                soup.write(bos);
+                    _soup.write(bos);
                 bos.close();
             } catch (Exception ex) {
                 throw new RuntimeException(ex);
             }
             _dao.setExecutionState(bos.toByteArray());
-            __log.debug("Setting execution state on instance " + _iid);
+            }
 
             if (ProcessState.canExecute(_dao.getState()) && canReduce) {
                 // Max time exceeded (possibly an infinite loop).
@@ -874,7 +894,7 @@
         _outstandingRequests.associate(responsechannel, mex.getMessageExchangeId());
 
         final String mexId = mex.getMessageExchangeId();
-        vpu.inject(new JacobRunnable() {
+        _vpu.inject(new JacobRunnable() {
             private static final long serialVersionUID = 3168964409165899533L;
 
             public void run() {
@@ -895,7 +915,7 @@
             return;
         }
 
-        vpu.inject(new JacobRunnable() {
+        _vpu.inject(new JacobRunnable() {
             private static final long serialVersionUID = -7767141033611036745L;
 
             public void run() {
@@ -913,7 +933,7 @@
         _dao.getProcess().removeRoutes(id, _dao);
         _outstandingRequests.cancel(id);
 
-        vpu.inject(new JacobRunnable() {
+        _vpu.inject(new JacobRunnable() {
             private static final long serialVersionUID = 6157913683737696396L;
 
             public void run() {
@@ -936,7 +956,7 @@
         if (BpelProcess.__log.isDebugEnabled()) {
             __log.debug("Invoking message response for mexid " + mexid + " and channel " + responseChannelId);
         }
-        vpu.inject(new BpelJacobRunnable() {
+        _vpu.inject(new BpelJacobRunnable() {
             private static final long serialVersionUID = -1095444335740879981L;
 
             public void run() {
@@ -999,14 +1019,6 @@
         _bpelProcess.saveEvent(event, _dao);
     }
 
-    private void initVPU() {
-        vpu = new JacobVPU();
-        vpu.registerExtension(BpelRuntimeContext.class, this);
-        soup = new ExecutionQueueImpl(null);
-        soup.setReplacementMap(_bpelProcess.getReplacementMap());
-        vpu.setContext(soup);
-    }
-
     /**
      * We record all values of properties of a 'MessageType' variable for
      * efficient lookup.
@@ -1242,7 +1254,7 @@
     }
 
     public void recoverActivity(final String channel, final long activityId, final String action, final FaultData fault) {
-        vpu.inject(new JacobRunnable() {
+        _vpu.inject(new JacobRunnable() {
             private static final long serialVersionUID = 3168964409165899533L;
 
             public void run() {

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java?view=diff&rev=545337&r1=545336&r2=545337
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
(original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
Thu Jun  7 15:29:09 2007
@@ -48,14 +48,14 @@
  * A very simple, in-memory implementation of the {@link ProcessInstanceDAO}
  * interface.
  */
-class ProcessInstanceDaoImpl extends DaoBaseImpl implements ProcessInstanceDAO {
+public class ProcessInstanceDaoImpl extends DaoBaseImpl implements ProcessInstanceDAO {
     private static final Collection<ScopeDAO> EMPTY_SCOPE_DAOS = Collections.emptyList();
 
     private short _previousState;
     private short _state;
     private Long _instanceId;
     private ProcessDaoImpl _processDao;
-    private byte[] _jacobState;
+    private Object _soup;
     private Map<Long, ScopeDAO> _scopes = new HashMap<Long, ScopeDAO>();
     private Map<String, List<ScopeDAO>> _scopesByName = new HashMap<String, List<ScopeDAO>>();
     private Map<String, byte[]> _messageExchanges = new HashMap<String, byte[]>();
@@ -76,7 +76,7 @@
         _state = 0;
         _processDao = processDao;
         _instantiatingCorrelator = correlator;
-        _jacobState = null;
+        _soup = null;
         _instanceId = IdGen.newProcessId();
         _conn = conn;
     }
@@ -125,11 +125,19 @@
      * @see ProcessInstanceDAO#getExecutionState()
      */
     public byte[] getExecutionState() {
-        return _jacobState;
+        throw new IllegalStateException("In-memory instances are never serialized");
     }
 
     public void setExecutionState(byte[] bytes) {
-        _jacobState = bytes;
+        throw new IllegalStateException("In-memory instances are never serialized");
+    }
+
+    public Object getSoup() {
+        return _soup;
+    }
+
+    public void setSoup(Object soup) {
+        _soup = soup;
     }
 
     public byte[] getMessageExchange(String identifier) {



Mime
View raw message