oodt-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mattm...@apache.org
Subject svn commit: r1157889 - in /oodt/trunk: ./ workflow/src/main/java/org/apache/oodt/cas/workflow/engine/
Date Mon, 15 Aug 2011 15:31:37 GMT
Author: mattmann
Date: Mon Aug 15 15:31:36 2011
New Revision: 1157889

URL: http://svn.apache.org/viewvc?rev=1157889&view=rev
Log:
- fix for OODT-309 SequentialWorkflowProcessor doesn't need to be a Thread

Modified:
    oodt/trunk/CHANGES.txt
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/NonBlockingIterativeWorkflowProcessorThread.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/NonBlockingShepardThread.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/NonBlockingThreadPoolWorkflowEngine.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialWorkflowProcessor.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java

Modified: oodt/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/oodt/trunk/CHANGES.txt?rev=1157889&r1=1157888&r2=1157889&view=diff
==============================================================================
--- oodt/trunk/CHANGES.txt (original)
+++ oodt/trunk/CHANGES.txt Mon Aug 15 15:31:36 2011
@@ -4,7 +4,10 @@ Apache OODT Change Log
 Release 0.4: Current Development
 --------------------------------------------
 
-* OODT-215 WorkflowInstances should have pre-conditions as 
+* OODT-309 SequentialWorkflowProcessor doesn't need to be a 
+  Thread (mattmann)
+
+* OODT-205 WorkflowInstances should have pre-conditions as 
   well (mattmann)
 
 * OODT-306 Added FileManager Tool Aliases (mattmann, goodale)

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/NonBlockingIterativeWorkflowProcessorThread.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/NonBlockingIterativeWorkflowProcessorThread.java?rev=1157889&r1=1157888&r2=1157889&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/NonBlockingIterativeWorkflowProcessorThread.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/NonBlockingIterativeWorkflowProcessorThread.java Mon Aug 15 15:31:36 2011
@@ -113,12 +113,12 @@ public class NonBlockingIterativeWorkflo
       running = true;
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see java.lang.Runnable#run()
+    /* (non-Javadoc)
+     * @see org.apache.oodt.cas.workflow.engine.WorkflowProcessor#start()
      */
-    public void run() {
+    @Override
+    public void start() {
+     
         /*
          * okay, we got into the run method, mark the start date time for the
          * workflow instance here

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/NonBlockingShepardThread.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/NonBlockingShepardThread.java?rev=1157889&r1=1157888&r2=1157889&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/NonBlockingShepardThread.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/NonBlockingShepardThread.java Mon Aug 15 15:31:36 2011
@@ -17,34 +17,33 @@
 
 package org.apache.oodt.cas.workflow.engine;
 
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.Vector;
 
-import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
-import org.apache.oodt.cas.workflow.structs.WorkflowConditionInstance;
 import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
 import org.apache.oodt.cas.workflow.structs.WorkflowStatus;
 import org.apache.oodt.cas.workflow.structs.WorkflowTask;
 import org.apache.oodt.cas.workflow.structs.Workflow;
 import org.apache.oodt.cas.workflow.structs.exceptions.EngineException;
 import org.apache.oodt.cas.workflow.metadata.CoreMetKeys;
-import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
 import org.apache.oodt.cas.metadata.Metadata;
 
 public class NonBlockingShepardThread implements WorkflowStatus, CoreMetKeys, Runnable {
 
 	private NonBlockingThreadPoolWorkflowEngine engine;
 	
+	private ConditionEvaluator evalutor;
+	
 	/* our log stream */
 	 private static final Logger LOG = Logger
      .getLogger(ThreadPoolWorkflowEngine.class.getName());
 	
 	public NonBlockingShepardThread(NonBlockingThreadPoolWorkflowEngine engine){
 		this.engine = engine;
+		this.evalutor = new ConditionEvaluator();
 	}
 	
 	public void run() {
@@ -68,7 +67,7 @@ public class NonBlockingShepardThread im
 				
 				//check preconditions on current task
 				if (task.getConditions() != null) {
-	                if(!satisfied(wInst)) {
+	                if(!this.evalutor.satisfied(task.getConditions(), task.getTaskId(), wInst.getSharedContext())) {
 
 	                    /*LOG.log(Level.FINEST, "Pre-conditions for task: "
 	                            + task.getTaskName() + " unsatisfied.");*/
@@ -123,54 +122,6 @@ public class NonBlockingShepardThread im
         return true;
     }
 	
-	private boolean satisfied(WorkflowInstance wInst) {
-    	String taskId = wInst.getCurrentTaskId();
-    	WorkflowTask task = getTaskById(wInst.getWorkflow(), taskId);
-    	List conditionList = task.getConditions(); 
-    	
-        for (Iterator i = conditionList.iterator(); i.hasNext();) {
-            WorkflowCondition c = (WorkflowCondition) i.next();
-            WorkflowConditionInstance cInst = null;
-
-            // see if we've already cached this condition instance
-            if (engine.CONDITION_CACHE.get(taskId) != null) {
-                HashMap conditionMap = (HashMap) engine.CONDITION_CACHE.get(taskId);
-
-                /*
-                 * okay we have some conditions cached for this task, see if we
-                 * have the one we need
-                 */
-                if (conditionMap.get(c.getConditionId()) != null) {
-                    cInst = (WorkflowConditionInstance) conditionMap.get(c
-                            .getConditionId());
-                }
-                /* if not, then go ahead and create it and cache it */
-                else {
-                    cInst = GenericWorkflowObjectFactory
-                            .getConditionObjectFromClassName(c
-                                    .getConditionInstanceClassName());
-                    conditionMap.put(c.getConditionId(), cInst);
-                }
-            }
-            /* no conditions cached yet, so set everything up */
-            else {
-                HashMap conditionMap = new HashMap();
-                cInst = GenericWorkflowObjectFactory
-                        .getConditionObjectFromClassName(c
-                                .getConditionInstanceClassName());
-                conditionMap.put(c.getConditionId(), cInst);
-                engine.CONDITION_CACHE.put(taskId, conditionMap);
-            }
-
-            // actually perform the evaluation
-            if (!cInst.evaluate(wInst.getSharedContext(), c.getTaskConfig())) {
-                return false;
-            }
-        }
-
-        return true;
-    }
-	
 	
 	  private WorkflowTask getTaskById(Workflow w, String Id){
 	    	List tasks = w.getTasks();

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/NonBlockingThreadPoolWorkflowEngine.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/NonBlockingThreadPoolWorkflowEngine.java?rev=1157889&r1=1157888&r2=1157889&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/NonBlockingThreadPoolWorkflowEngine.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/NonBlockingThreadPoolWorkflowEngine.java Mon Aug 15 15:31:36 2011
@@ -30,7 +30,7 @@ import org.apache.oodt.cas.workflow.stru
 import org.apache.oodt.cas.workflow.structs.exceptions.EngineException;
 import org.apache.oodt.cas.workflow.structs.exceptions.InstanceRepositoryException;
 import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
-import org.apache.oodt.cas.workflow.engine.SequentialWorkflowProcessor;
+import org.apache.oodt.cas.workflow.engine.NonBlockingIterativeWorkflowProcessorThread;
 import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
 import org.apache.oodt.cas.workflow.engine.NonBlockingShepardThread;
 import org.apache.oodt.commons.util.DateConvert;
@@ -155,13 +155,13 @@ public class NonBlockingThreadPoolWorkfl
       .getLong(
               "org.apache.oodt.cas.workflow.engine.resourcemgr.pollingWaitTime",
               10).longValue();
-    	SequentialWorkflowProcessor worker = new SequentialWorkflowProcessor(
+    	NonBlockingIterativeWorkflowProcessorThread worker = new NonBlockingIterativeWorkflowProcessorThread(
                 wInst, instRep, this.wmgrUrl, pollingWaitTime);
         worker.setRClient(rClient);
         workerMap.put(wInst.getId(), worker);
 
         try {
-            pool.execute(worker);
+            pool.execute(new ThreadedProcessor(worker));
         } catch (InterruptedException e) {
         	LOG.log(Level.WARNING, "Error running workflow: "
                     + wInst.getWorkflow().getId());
@@ -235,8 +235,8 @@ public class NonBlockingThreadPoolWorkfl
      */
     public synchronized void pauseWorkflowInstance(String workflowInstId) {
         // okay, try and look up that worker thread in our hash map
-        SequentialWorkflowProcessor worker = (SequentialWorkflowProcessor) workerMap
-                .get(workflowInstId);
+      NonBlockingIterativeWorkflowProcessorThread worker = ((ThreadedProcessor) workerMap
+          .get(workflowInstId)).getProcessor();
         if (worker == null) {
             LOG
                     .log(
@@ -259,8 +259,8 @@ public class NonBlockingThreadPoolWorkfl
      */
     public synchronized void resumeWorkflowInstance(String workflowInstId) {
         // okay, try and look up that worker thread in our hash map
-        SequentialWorkflowProcessor worker = (SequentialWorkflowProcessor) workerMap
-                .get(workflowInstId);
+      NonBlockingIterativeWorkflowProcessorThread worker = ((ThreadedProcessor) workerMap
+          .get(workflowInstId)).getProcessor();
         if (worker == null) {
             LOG.log(Level.WARNING,
                     "WorkflowEngine: Attempt to resume workflow instance id: "
@@ -374,8 +374,8 @@ public class NonBlockingThreadPoolWorkfl
      */
     public synchronized void stopWorkflow(String workflowInstId) {
         // okay, try and look up that worker thread in our hash map
-        SequentialWorkflowProcessor worker = (SequentialWorkflowProcessor) workerMap
-                .get(workflowInstId);
+      NonBlockingIterativeWorkflowProcessorThread worker = ((ThreadedProcessor) workerMap
+          .get(workflowInstId)).getProcessor();
         if (worker == null) {
             LOG.log(Level.WARNING,
                     "WorkflowEngine: Attempt to stop workflow instance id: "
@@ -642,6 +642,41 @@ public class NonBlockingThreadPoolWorkfl
     	
     	return null;
     }
+    
+    class ThreadedProcessor implements Runnable {
+
+      private NonBlockingIterativeWorkflowProcessorThread processor;
+
+      public ThreadedProcessor(NonBlockingIterativeWorkflowProcessorThread processor) {
+        this.processor = processor;
+      }
+
+      /*
+       * (non-Javadoc)
+       * 
+       * @see java.lang.Runnable#run()
+       */
+      @Override
+      public void run() {
+        processor.start();
+      }
+
+      /**
+       * @return the processor
+       */
+      public NonBlockingIterativeWorkflowProcessorThread getProcessor() {
+        return processor;
+      }
+
+      /**
+       * @param processor
+       *          the processor to set
+       */
+      public void setProcessor(NonBlockingIterativeWorkflowProcessorThread processor) {
+        this.processor = processor;
+      }
+
+    }    
 	
 
 }

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialWorkflowProcessor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialWorkflowProcessor.java?rev=1157889&r1=1157888&r2=1157889&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialWorkflowProcessor.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SequentialWorkflowProcessor.java Mon Aug 15 15:31:36 2011
@@ -53,7 +53,7 @@ import java.util.logging.Logger;
  */
 
 public class SequentialWorkflowProcessor extends WorkflowProcessor implements
-    WorkflowStatus, CoreMetKeys, Runnable {
+    WorkflowStatus, CoreMetKeys {
 
   private Iterator<WorkflowTask> taskIterator;
 
@@ -70,12 +70,11 @@ public class SequentialWorkflowProcessor
     this.conditionEvaluator = new ConditionEvaluator();
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see java.lang.Runnable#run()
+  /* (non-Javadoc)
+   * @see org.apache.oodt.cas.workflow.engine.WorkflowProcessor#start()
    */
-  public void run() {
+  @Override
+  public void start() {
 
     String startDateTimeIsoStr = DateConvert.isoFormat(new Date());
     this.workflowInstance.setStartDateTimeIsoStr(startDateTimeIsoStr);

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java?rev=1157889&r1=1157888&r2=1157889&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java Mon Aug 15 15:31:36 2011
@@ -137,8 +137,8 @@ public class ThreadPoolWorkflowEngine im
    */
   public synchronized void pauseWorkflowInstance(String workflowInstId) {
     // okay, try and look up that worker thread in our hash map
-    SequentialWorkflowProcessor worker = (SequentialWorkflowProcessor) workerMap
-        .get(workflowInstId);
+    SequentialWorkflowProcessor worker = ((ThreadedProcessor) workerMap
+        .get(workflowInstId)).getProcessor();
     if (worker == null) {
       LOG.log(Level.WARNING,
           "WorkflowEngine: Attempt to pause workflow instance id: "
@@ -161,8 +161,8 @@ public class ThreadPoolWorkflowEngine im
    */
   public synchronized void resumeWorkflowInstance(String workflowInstId) {
     // okay, try and look up that worker thread in our hash map
-    SequentialWorkflowProcessor worker = (SequentialWorkflowProcessor) workerMap
-        .get(workflowInstId);
+    SequentialWorkflowProcessor worker = ((ThreadedProcessor) workerMap
+        .get(workflowInstId)).getProcessor();
     if (worker == null) {
       LOG.log(Level.WARNING,
           "WorkflowEngine: Attempt to resume workflow instance id: "
@@ -217,7 +217,7 @@ public class ThreadPoolWorkflowEngine im
     persistWorkflowInstance(wInst);
 
     try {
-      pool.execute(worker);
+      pool.execute(new ThreadedProcessor(worker));
     } catch (InterruptedException e) {
       throw new EngineException(e);
     }
@@ -245,8 +245,8 @@ public class ThreadPoolWorkflowEngine im
    */
   public synchronized boolean updateMetadata(String workflowInstId, Metadata met) {
     // okay, try and look up that worker thread in our hash map
-    SequentialWorkflowProcessor worker = (SequentialWorkflowProcessor) workerMap
-        .get(workflowInstId);
+    SequentialWorkflowProcessor worker = ((ThreadedProcessor) workerMap
+        .get(workflowInstId)).getProcessor();
     if (worker == null) {
       LOG.log(Level.WARNING,
           "WorkflowEngine: Attempt to update metadata context "
@@ -290,8 +290,8 @@ public class ThreadPoolWorkflowEngine im
    */
   public synchronized void stopWorkflow(String workflowInstId) {
     // okay, try and look up that worker thread in our hash map
-    SequentialWorkflowProcessor worker = (SequentialWorkflowProcessor) workerMap
-        .get(workflowInstId);
+    SequentialWorkflowProcessor worker = ((ThreadedProcessor) workerMap
+        .get(workflowInstId)).getProcessor();
     if (worker == null) {
       LOG.log(Level.WARNING,
           "WorkflowEngine: Attempt to stop workflow instance id: "
@@ -324,8 +324,8 @@ public class ThreadPoolWorkflowEngine im
    */
   public Metadata getWorkflowInstanceMetadata(String workflowInstId) {
     // okay, try and look up that worker thread in our hash map
-    SequentialWorkflowProcessor worker = (SequentialWorkflowProcessor) workerMap
-        .get(workflowInstId);
+    SequentialWorkflowProcessor worker = ((ThreadedProcessor) workerMap
+        .get(workflowInstId)).getProcessor();
     if (worker == null) {
       // try and get the metadata
       // from the workflow instance repository (as it was persisted)
@@ -477,4 +477,39 @@ public class ThreadPoolWorkflowEngine im
     }
   }
 
+  class ThreadedProcessor implements Runnable {
+
+    private SequentialWorkflowProcessor processor;
+
+    public ThreadedProcessor(SequentialWorkflowProcessor processor) {
+      this.processor = processor;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.lang.Runnable#run()
+     */
+    @Override
+    public void run() {
+      processor.start();
+    }
+
+    /**
+     * @return the processor
+     */
+    public SequentialWorkflowProcessor getProcessor() {
+      return processor;
+    }
+
+    /**
+     * @param processor
+     *          the processor to set
+     */
+    public void setProcessor(SequentialWorkflowProcessor processor) {
+      this.processor = processor;
+    }
+
+  }
+
 }

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java?rev=1157889&r1=1157888&r2=1157889&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java Mon Aug 15 15:31:36 2011
@@ -99,7 +99,7 @@ public abstract class WorkflowProcessor 
    * <p>
    * Stops once and for all the thread from processing the workflow. This method
    * should not maintain the state of the workflow, it should gracefully shut
-   * down the WorkflowProcessorThread and any of its subsequent resources.
+   * down the WorkflowProcessor and any of its subsequent resources.
    * </p>
    * 
    */
@@ -108,7 +108,7 @@ public abstract class WorkflowProcessor 
   /**
    * <p>
    * Resumes execution of a {@link #pause}d {@link WorkflowInstace} by this
-   * WorkflowProcessorThread.
+   * WorkflowProcessor.
    * </p>
    * 
    */
@@ -117,11 +117,17 @@ public abstract class WorkflowProcessor 
   /**
    * <p>
    * Pauses exectuion of a {@link WorkflowInstace} being handled by this
-   * WorkflowProcessorThread.
+   * WorkflowProcessor.
    * </p>
    * 
    */
   public abstract void pause();
+  
+  
+  /**
+   * Starts execution of the subordinate {@link WorkflowProcessor}.
+   */
+  public abstract void start();
 
   /**
    * Returns the identifier of the current {@link WorkflowTask} being processed



Mime
View raw message