ode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mr...@apache.org
Subject svn commit: r551300 - in /incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao: BpelDAOConnectionImpl.java ProcessDaoImpl.java
Date Wed, 27 Jun 2007 19:52:36 GMT
Author: mriou
Date: Wed Jun 27 12:52:35 2007
New Revision: 551300

URL: http://svn.apache.org/viewvc?view=rev&rev=551300
Log:
A bit more careful about in-memory. Making sure processes don't pile up.

Modified:
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java?view=diff&rev=551300&r1=551299&r2=551300
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
(original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
Wed Jun 27 12:52:35 2007
@@ -46,6 +46,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentHashMap;
 
 
 /**
@@ -53,11 +54,13 @@
  */
 class BpelDAOConnectionImpl implements BpelDAOConnection {
     private static final Log __log = LogFactory.getLog(BpelDAOConnectionImpl.class);
+    public static long TIME_TO_LIVE = 10*60*1000;
 
     private Scheduler _scheduler;
     private Map<QName, ProcessDaoImpl> _store;
     private List<BpelEvent> _events = new LinkedList<BpelEvent>();
     private static Map<String,MessageExchangeDAO> _mexStore = Collections.synchronizedMap(new
HashMap<String,MessageExchangeDAO>());
+    protected static Map<String, Long> _mexAge = new ConcurrentHashMap<String, Long>();
     private static AtomicLong counter = new AtomicLong(Long.MAX_VALUE / 2);
 
     BpelDAOConnectionImpl(Map<QName, ProcessDaoImpl> store, Scheduler scheduler) {
@@ -172,9 +175,28 @@
     }
 
     public MessageExchangeDAO createMessageExchange(char dir) {
-        String id = Long.toString(counter.getAndIncrement());
+        final String id = Long.toString(counter.getAndIncrement());
         MessageExchangeDAO mex = new MessageExchangeDAOImpl(dir,id);
         _mexStore.put(id,mex);
+        _mexAge.put(id, System.currentTimeMillis());
+
+        ArrayList<String> removals = new ArrayList<String>();
+        for (Map.Entry<String, Long> entry : _mexAge.entrySet()) {
+            if (System.currentTimeMillis() - entry.getValue() > TIME_TO_LIVE) {
+                removeMessageExchange(entry.getKey());
+                removals.add(entry.getKey());
+            }
+        }
+        for (String removal : removals) _mexAge.remove(removal);
+
+        // Removing right away on rollback
+        onRollback(new Runnable() {
+            public void run() {
+                removeMessageExchange(id);
+                _mexAge.remove(id);
+            }
+        });
+
         return mex;
     }
 
@@ -297,7 +319,7 @@
         __log.debug("Removing mex " + mexId + " from memory store.");
         MessageExchangeDAO mex = _mexStore.remove(mexId);
         if (mex == null)
-            __log.warn("Couldn't find mex " + mexId + " for cleanup.");
+            __log.debug("Couldn't find mex " + mexId + " for cleanup.");
     }
 
     public void defer(final Runnable runnable) {
@@ -306,6 +328,15 @@
             }
             public void beforeCompletion() {
                 runnable.run();
+            }
+        });
+    }
+    public void onRollback(final Runnable runnable) {
+        _scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
+            public void afterCompletion(boolean success) {
+                if (!success) runnable.run();
+            }
+            public void beforeCompletion() {
             }
         });
     }

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java?view=diff&rev=551300&r1=551299&r2=551300
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
(original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
Wed Jun 27 12:52:35 2007
@@ -47,6 +47,7 @@
     private long _version;
     final Map<String, CorrelatorDaoImpl> _correlators = new ConcurrentHashMap<String,
CorrelatorDaoImpl>();
     protected final Map<Long, ProcessInstanceDAO> _instances = new ConcurrentHashMap<Long,
ProcessInstanceDAO>();
+    protected final Map<Long, Long> _instancesAge = new ConcurrentHashMap<Long,
Long>();
     protected final Map<Integer, PartnerLinkDAO> _plinks = new ConcurrentHashMap<Integer,
PartnerLinkDAO>();
     private Map<QName, ProcessDaoImpl> _store;
     private BpelDAOConnectionImpl _conn;
@@ -107,6 +108,28 @@
                 _instances.put(newInstance.getInstanceId(), newInstance);
             }
         });
+        _instancesAge.put(newInstance.getInstanceId(), System.currentTimeMillis());
+
+        // Checking for old instances that could still be around because of a failure
+        // or completion problem
+        ArrayList<Long> removals = new ArrayList<Long>();
+        for (Map.Entry<Long, Long> entry : _instancesAge.entrySet()) {
+            if (System.currentTimeMillis() - entry.getValue() > BpelDAOConnectionImpl.TIME_TO_LIVE)
{
+                ProcessInstanceDAO idao = _instances.remove(entry.getKey());
+                removals.add(entry.getKey());
+            }
+        }
+        for (Long removal : removals) _instancesAge.remove(removal);
+
+        // Removing right away on rollback
+        final Long iid = newInstance.getInstanceId();
+        _conn.onRollback(new Runnable() {
+            public void run() {
+                _instances.remove(iid);
+                _instancesAge.remove(iid);
+            }
+        });
+
         _executionCount++;
         return newInstance;
     }
@@ -114,7 +137,6 @@
     public ProcessInstanceDAO getInstance(Long instanceId) {
         return _instances.get(instanceId);
     }
-
 
     public Collection<ProcessInstanceDAO> findInstance(CorrelationKey key) {
         ArrayList<ProcessInstanceDAO> result = new ArrayList<ProcessInstanceDAO>();



Mime
View raw message