ode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mszef...@apache.org
Subject svn commit: r549166 - in /incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine: BpelProcess.java BpelServerImpl.java
Date Wed, 20 Jun 2007 17:40:24 GMT
Author: mszefler
Date: Wed Jun 20 10:40:23 2007
New Revision: 549166

URL: http://svn.apache.org/viewvc?view=rev&rev=549166
Log:
Merge issue with hydration latch.

Modified:
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=549166&r1=549165&r2=549166
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
(original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
Wed Jun 20 10:40:23 2007
@@ -543,6 +543,17 @@
 
     }
 
+    void hydrate() {
+        _hydrationLatch.latch(1);
+
+        try {
+            // We don't actually need to do anything, the latch will run the doHydrate method
+            // when necessary..
+        } finally {
+            _hydrationLatch.release(1);
+        }
+    }
+    
     OProcess getOProcess() {
         _hydrationLatch.latch(1);
         try {

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?view=diff&rev=549166&r1=549165&r2=549166
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
(original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
Wed Jun 20 10:40:23 2007
@@ -18,6 +18,15 @@
  */
 package org.apache.ode.bpel.engine;
 
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.xml.namespace.QName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.dao.BpelDAOConnection;
@@ -39,14 +48,8 @@
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
 import org.apache.ode.bpel.o.OProcess;
 import org.apache.ode.utils.msg.MessageBundle;
-
-import javax.xml.namespace.QName;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ode.utils.stl.CollectionsX;
+import org.apache.ode.utils.stl.MemberOfFunction;
 
 /**
  * <p>
@@ -65,7 +68,7 @@
  * @author Maciej Szefler <mszefler at gmail dot com>
  * @author Matthieu Riou <mriou at apache dot org>
  */
-public class BpelServerImpl implements BpelServer, Scheduler.JobProcessor, ProcessLifecycleCallback
{
+public class BpelServerImpl implements BpelServer, Scheduler.JobProcessor {
 
     private static final Log __log = LogFactory.getLog(BpelServerImpl.class);
     private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
@@ -73,7 +76,11 @@
     /** Maximum age of a process before it is quiesced */
     private static Long __processMaxAge;
 
-    private final Set<BpelProcess> _runningProcesses = new HashSet<BpelProcess>();
+    /** 
+     * Set of processes that are registered with the server. Includes hydrated and dehydrated
processes.
+     * Guarded by _mngmtLock.writeLock(). 
+     */
+    private final Set<BpelProcess> _registeredProcesses = new HashSet<BpelProcess>();
 
     private State _state = State.SHUTDOWN;
     private Contexts _contexts = new Contexts();
@@ -219,7 +226,6 @@
             if (!registered)
                 _mngmtLock.readLock().unlock();
         }
-        assert registered;
         return _engine;
     }
 
@@ -247,13 +253,10 @@
 
             __log.debug("Registering process " + conf.getProcessId() + " with server.");
 
-            BpelProcess process = new BpelProcess(conf, null, this);
+            BpelProcess process = new BpelProcess(conf, null);
 
             _engine.registerProcess(process);
-            
-            synchronized(_runningProcesses) {
-                _runningProcesses.add(process);
-            }
+            _registeredProcesses.add(process);
             process.hydrate();
 
             __log.info(__msgs.msgProcessRegistered(conf.getProcessId()));
@@ -277,9 +280,7 @@
             BpelProcess p = null;
             if (_engine != null) {
                 _engine.unregisterProcess(pid);
-                synchronized(_runningProcesses) {
-                    _runningProcesses.remove(p);
-                }
+                _registeredProcesses.remove(p);
             }
 
             __log.info(__msgs.msgProcessUnregistered(pid));
@@ -293,50 +294,6 @@
     }
 
     /**
-     * If necessary, create an object in the data store to represent the
-     * process. We'll re-use an existing object if it already exists and matches
-     * the GUID.
-     */
-    private void bounceProcessDAO(BpelDAOConnection conn, final QName pid,
-                                  final long version, final OProcess oprocess) {
-        __log.debug("Creating process DAO for " + pid + " (guid=" + oprocess.guid + ")");
-        try {
-            boolean create = true;
-            ProcessDAO old = conn.getProcess(pid);
-            if (old != null) {
-                __log.debug("Found ProcessDAO for " + pid + " with GUID " + old.getGuid());
-                if (oprocess.guid == null) {
-                    // No guid, old version assume its good
-                    create = false;
-                } else {
-                    if (old.getGuid().equals(oprocess.guid)) {
-                        // Guids match, no need to create
-                        create = false;
-                    } else {
-                        // GUIDS dont match, delete and create new
-                        String errmsg = "ProcessDAO GUID " + old.getGuid() + " does not match
"
-                                + oprocess.guid + "; replacing.";
-                        __log.debug(errmsg);
-                        old.delete();
-                    }
-                }
-            }
-
-            if (create) {
-                ProcessDAO newDao = conn.createProcess(pid, oprocess.getQName(), oprocess.guid,
(int)version);
-                for (String correlator : oprocess.getCorrelators()) {
-                    newDao.addCorrelator(correlator);
-                }
-            }
-        } catch (BpelEngineException ex) {
-            throw ex;
-        } catch (Exception dce) {
-            __log.error("DbError", dce);
-            throw new BpelEngineException("DbError", dce);
-        }
-    }
-
-    /**
      * Register a global message exchange interceptor.
      * @param interceptor message-exchange interceptor
      */
@@ -389,34 +346,7 @@
     public void onScheduledJob(JobInfo jobInfo) throws JobProcessorException {
         getEngine().onScheduledJob(jobInfo);
     }
-
-    public void hydrated(final BpelProcess process) {
-        // Recreating the process DAO if the definition has changed, shouldn't do anything
-        // except after a redeploy
-        if (_contexts.scheduler.isTransacted() || process.isInMemory())
-            bounceProcessDAO(process.isInMemory() ? _contexts.inMemDao.getConnection() :
_contexts.dao.getConnection(),
-                    process.getPID(), process._pconf.getVersion(), process.getOProcess());
-        else {
-            try {
-                _db.exec(new BpelDatabase.Callable<Object>() {
-                    public Object run(BpelDAOConnection conn) throws Exception {
-                        bounceProcessDAO(conn, process.getPID(),
-                                process._pconf.getVersion(), process.getOProcess());
-                        return null;
-                    }
-                });
-            } catch (Exception ex) {
-                String errmsg = "DbError";
-                __log.error(errmsg, ex);
-                throw new BpelEngineException(errmsg, ex);
-            }
-        }
-
-        synchronized (_runningProcesses) {
-            _runningProcesses.add(process);
-        }
-    }
-
+    
     private class ProcessDefReaper implements Runnable {
         public void run() {
             __log.debug("Starting process definition reaper thread.");
@@ -429,19 +359,20 @@
                         __log.debug("Kicking reaper, OProcess instances: " + OProcess.instanceCount);
                         // Copying the runnning process list to avoid synchronization
                         // problems and a potential mess if a policy modifies the list
-                        List<BpelProcess> runningProcesses;
-                        synchronized(_runningProcesses) {
-                            runningProcesses = new ArrayList<BpelProcess>(_runningProcesses);
-                        }
+                        List<BpelProcess> candidates = new ArrayList<BpelProcess>(_registeredProcesses);
+                        CollectionsX.remove_if(candidates, new MemberOfFunction<BpelProcess>()
{
+                            public boolean isMember(BpelProcess o) {
+                                return !o.hintIsHydrated();
+                            }
+                            
+                        });
+
                         // And the happy winners are...
-                        List<BpelProcess> ripped = _dehydrationPolicy.markForDehydration(runningProcesses);
+                        List<BpelProcess> ripped = _dehydrationPolicy.markForDehydration(candidates);
                         // Bye bye
                         for (BpelProcess process : ripped) {
                             __log.debug("Dehydrating process " + process.getPID());
                             process.dehydrate();
-                            synchronized (_runningProcesses) {
-                                _runningProcesses.remove(process);
-                            }
                         }
                     } finally {
                         _mngmtLock.writeLock().unlock();



Mime
View raw message