oodt-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mattm...@apache.org
Subject svn commit: r1332505 - in /oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine: PrioritizedQueueBasedWorkflowEngine.java TaskQuerier.java WorkflowProcessorQueue.java
Date Tue, 01 May 2012 02:47:12 GMT
Author: mattmann
Date: Tue May  1 02:47:12 2012
New Revision: 1332505

URL: http://svn.apache.org/viewvc?rev=1332505&view=rev
Log:
- OODT-310 and OODT-215 WIP: TaskQuerier fully fleshed out, need to work on processorQueue
next, and also likely on ChangeType interface within PrioritizedQueueBasedWorkflowEngine.

Added:
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorQueue.java
Modified:
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java?rev=1332505&r1=1332504&r2=1332505&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
(original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
Tue May  1 02:47:12 2012
@@ -61,6 +61,7 @@ public class PrioritizedQueueBasedWorkfl
   private Thread queuerThread;
   private WorkflowInstanceRepository repo;  
   private PrioritySorter prioritizer;
+  private WorkflowProcessorQueue processorQueue;
   private URL wmgrUrl;  
   private long conditionWait;
   
@@ -86,7 +87,7 @@ public class PrioritizedQueueBasedWorkfl
     }*/
     
     // Task QUEUER thread
-    queuerThread = new Thread(new TaskQuerier());
+    queuerThread = new Thread(new TaskQuerier(processorQueue, prioritizer));
     queuerThread.start();
   }
   

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java?rev=1332505&r1=1332504&r2=1332505&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
(original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
Tue May  1 02:47:12 2012
@@ -15,78 +15,127 @@
  * limitations under the License.
  */
 
-
 package org.apache.oodt.cas.workflow.engine;
 
+//JDK imports
 import java.util.List;
 import java.util.Vector;
 
+//OODT imports
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycle;
+import org.apache.oodt.cas.workflow.structs.PrioritySorter;
+
 /**
- *
- * Describe your class here.
- *
+ * 
+ * The purpose of this class is to constantly pop off tasks that are ready to run
+ * and made available by the {@link WorkflowProcessorQueue}, and then to set
+ * their state to Executing (running Category), so they will be picked up on the
+ * next WorkflowState change, and end up executing.
+ * 
  * @author mattmann
+ * @author bfoster
  * @version $Revision$
- *
+ * 
  */
 public class TaskQuerier implements Runnable {
-  
+
   private boolean running;
-  
-  
+
+  private WorkflowProcessorQueue processorQueue;
+
+  private List<WorkflowProcessor> runnableProcessors;
+
+  private PrioritySorter prioritizer;
+
+  /**
+   * Constructs a new TaskQuerier with the given {@link WorkflowProcessorQueue},
+   * and with the associated {@link PrioritySorter} which acts as a sorter of
+   * the runnable {@link WorkflowProcessor}s.
+   * 
+   * @param processorQueue
+   *          The associated set of queued Workflow Tasks.
+   * @param prioritizer
+   *          The prioritizer to use to sort the ready-to-run Workflow Tasks.
+   */
+  public TaskQuerier(WorkflowProcessorQueue processorQueue,
+      PrioritySorter prioritizer) {
+    this.running = true;
+    this.processorQueue = processorQueue;
+    this.runnableProcessors = new Vector<WorkflowProcessor>();
+    this.prioritizer = prioritizer;
+  }
+
+  /**
+   * Marches through the set of processors that are currently in the Processor
+   * queue, and checks that each one is in neither the done state nor the
+   * holding state. If that is the case, the processor is popped off the
+   * queue and added to the runnableProcessors list (in a synchronized
+   * fashion), and the state of its runnable tasks is set to Executing
+   * (category, running).
+   * 
+   * Finally the runnableProcessors list is sorted according to the given
+   * {@link #prioritizer}.
+   */
   public void run() {
-    /*while(running) {
-      try {
-        Vector<CachedWorkflowProcessor> processors = null; 
-        synchronized(PrioritizedQueueBasedWorkflowEngine.this.processorQueue) {
-          processors = new Vector<CachedWorkflowProcessor>(PrioritizedQueueBasedWorkflowEngine.this.processorQueue.values());
-        }
-        List<WorkflowProcessor> runnableProcessors = new Vector<WorkflowProcessor>();
-        for (CachedWorkflowProcessor cachedWP : processors) {
-          if (!allowQueuerToWork)
-            break;
-          if (!(cachedWP.getStub().getState().getCategory().equals(WorkflowState.Category.DONE) || cachedWP.getStub().getState().getCategory().equals(WorkflowState.Category.HOLDING))) {
-            cachedWP.uncache();
-            if (!PrioritizedQueueBasedWorkflowEngine.this.debugMode) {
-              processorLock.lock(cachedWP.getInstanceId());
-              WorkflowProcessor wp = cachedWP.getWorkflowProcessor();
-              for (TaskProcessor tp : wp.getRunnableWorkflowProcessors()) {
-                tp.setState(new WaitingOnResourcesState("Added to Runnable queue", new ExecutingState("")));
-                runnableProcessors.add(tp.getStub());
-              }
-              processorLock.unlock(cachedWP.getInstanceId());
+    while (running) {
+      List<WorkflowProcessor> processors = processorQueue.getProcessors();
+      synchronized (runnableProcessors) {
+        runnableProcessors.clear();
+      }
+      for (WorkflowProcessor processor : processors) {
+        // OK now get its lifecycle
+        WorkflowLifecycle lifecycle = getLifecycleForProcessor(processor);
+        if (!(processor.getState().getCategory().getName().equals("done") || processor
+            .getState().getCategory().getName().equals("holding"))) {
+          for (TaskProcessor tp : processor.getRunnableWorkflowProcessors()) {
+            synchronized (runnableProcessors) {
+              tp.setState(lifecycle.createState("Executing", "running",
+                  "Added to Runnable queue"));
+              runnableProcessors.add(processor);
             }
-            cachedWP.cache();
-          }else {
-            continue;
           }
 
-          if (runnableProcessors.size() > 0) {
-            synchronized (PrioritizedQueueBasedWorkflowEngine.this.runnableTasks) {
-              PrioritizedQueueBasedWorkflowEngine.this.runnableTasks.addAll(runnableProcessors);
-              PrioritizedQueueBasedWorkflowEngine.this.priorityManager.sort(PrioritizedQueueBasedWorkflowEngine.this.runnableTasks);
-            }
+          // now prioritize the runnable processors
+          synchronized (runnableProcessors) {
+            prioritizer.sort(runnableProcessors);
           }
-          runnableProcessors.clear();
-          
-          //take a breather
-          try {
-            synchronized(this) {
-              this.wait(1);
-            }
-          }catch (Exception e){}
+
+        } else {
+          continue;
         }
-      }catch (Exception e) {
-        e.printStackTrace();
       }
     }
-    
-    try {
-      synchronized(this) {
-        this.wait(2000);
-      }
-    }catch (Exception e){}
-  }*/
-}
-  
+  }
+
+  /**
+   * @return the running
+   */
+  public boolean isRunning() {
+    return running;
+  }
+
+  /**
+   * @param running
+   *          the running to set
+   */
+  public void setRunning(boolean running) {
+    this.running = running;
+  }
+
+  /**
+   * @return the runnableProcessors
+   */
+  public synchronized List<WorkflowProcessor> getRunnableProcessors() {
+    return runnableProcessors;
+  }
+
+  private WorkflowLifecycle getLifecycleForProcessor(WorkflowProcessor processor) {
+    if (processor.getWorkflowInstance() != null
+        && processor.getWorkflowInstance().getParentChildWorkflow() != null) {
+      return processor.getLifecycleManager().getLifecycleForWorkflow(
+          processor.getWorkflowInstance().getParentChildWorkflow());
+    } else
+      return processor.getLifecycleManager().getDefaultLifecycle();
+  }
+
 }

Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorQueue.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorQueue.java?rev=1332505&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorQueue.java
(added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorQueue.java
Tue May  1 02:47:12 2012
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.util.List;
+
+/**
+ * 
+ * The queue of available {@link WorkflowTask}s, that will be fed into the
+ * {@link TaskQuerier}.
+ * 
+ * @author mattmann
+ * @version $Revision$
+ * 
+ */
+public class WorkflowProcessorQueue {
+
+  /**
+   * Should return the list of available, Queued, {@link WorkflowProcessor}s.
+   * 
+   * @return the list of available, Queued, {@link WorkflowProcessor}s.
+   */
+  public synchronized List<WorkflowProcessor> getProcessors() {
+    return null;
+  }
+
+}



Mime
View raw message