oodt-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bfos...@apache.org
Subject svn commit: r1052143 [2/11] - in /oodt/branches/wengine-branch: ./ src/ src/main/ src/main/assembly/ src/main/bin/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/oodt/ src/main/java/org/apache/oodt/cas/ src/main/ja...
Date Thu, 23 Dec 2010 02:44:31 GMT
Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocalFactory.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocalFactory.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocalFactory.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocalFactory.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.util.List;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.engine.runner.EngineRunnerFactory;
+import org.apache.oodt.cas.workflow.event.repo.WorkflowEngineEventRepositoryFactory;
+import org.apache.oodt.cas.workflow.instance.repo.WorkflowInstanceRepositoryFactory;
+import org.apache.oodt.cas.workflow.model.repo.WorkflowModelRepositoryFactory;
+import org.apache.oodt.cas.workflow.priority.PriorityManagerFactory;
+import org.apache.oodt.cas.workflow.processor.map.WorkflowProcessorMapFactory;
+import org.apache.oodt.cas.workflow.processor.repo.WorkflowProcessorRepositoryFactory;
+import org.apache.oodt.cas.workflow.server.channel.CommunicationChannelClientFactory;
+
+
+/**
+ * Factory for creating local WorkflowEngine
+ * <p>
+ * Every collaborator factory is supplied through its setter; {@link #createEngine()}
+ * asks each of them for a component and passes the results to the
+ * WorkflowEngineLocal constructor.
+ * 
+ * @author bfoster
+ * @version $Revision$
+ *
+ */
+public class WorkflowEngineLocalFactory implements WorkflowEngineFactory {
+
+	// Fully qualified so that no additional import is needed in this file
+	private static final java.util.logging.Logger LOG = java.util.logging.Logger.getLogger(WorkflowEngineLocalFactory.class.getName());
+
+	private WorkflowModelRepositoryFactory modelRepoFactory;
+	private WorkflowProcessorRepositoryFactory processorRepoFactory;
+	private WorkflowInstanceRepositoryFactory instanceRepoFactory;
+	private WorkflowProcessorMapFactory processorMapFactory;
+	private PriorityManagerFactory priorityManagerFactory;
+	private EngineRunnerFactory runnerFactory;
+	private CommunicationChannelClientFactory communicationChannelClientFactory;
+	private WorkflowEngineEventRepositoryFactory eventRepoFactory;
+	private List<String> metadataKeysToCache;
+	private boolean debug;
+	
+	/**
+	 * Builds a WorkflowEngineLocal from the configured factories.
+	 * 
+	 * @return the new engine, or null when any factory is missing or fails
+	 *         (the failure is logged at SEVERE level)
+	 */
+	public WorkflowEngineLocal createEngine() {
+		try {
+			// Argument order matches the WorkflowEngineLocal constructor and the original evaluation order
+			return new WorkflowEngineLocal(
+					this.modelRepoFactory.createModelRepository(),
+					this.processorRepoFactory.createRepository(),
+					this.instanceRepoFactory.createRepo(),
+					this.eventRepoFactory.createRepo(),
+					this.processorMapFactory.createProcessorMap(),
+					this.priorityManagerFactory.createPriorityManager(),
+					this.runnerFactory.createRunner(),
+					this.communicationChannelClientFactory.createCommunicationChannelClient(),
+					this.metadataKeysToCache,
+					this.debug);
+		}catch (Exception e) {
+			LOG.log(java.util.logging.Level.SEVERE, "Failed to create WorkflowEngineLocal : " + e.getMessage(), e);
+			return null;
+		}
+	}
+
+	public WorkflowModelRepositoryFactory getModelRepoFactory() {
+		return modelRepoFactory;
+	}
+
+	public void setModelRepoFactory(WorkflowModelRepositoryFactory modelRepoFactory) {
+		this.modelRepoFactory = modelRepoFactory;
+	}
+
+	public WorkflowProcessorRepositoryFactory getProcessorRepoFactory() {
+		return processorRepoFactory;
+	}
+
+	public void setProcessorRepoFactory(WorkflowProcessorRepositoryFactory processorRepoFactory) {
+		this.processorRepoFactory = processorRepoFactory;
+	}
+
+	public WorkflowInstanceRepositoryFactory getInstanceRepoFactory() {
+		return instanceRepoFactory;
+	}
+
+	public void setInstanceRepoFactory(WorkflowInstanceRepositoryFactory instanceRepoFactory) {
+		this.instanceRepoFactory = instanceRepoFactory;
+	}
+
+	public WorkflowEngineEventRepositoryFactory getEventRepoFactory() {
+		return eventRepoFactory;
+	}
+
+	public void setEventRepoFactory(
+			WorkflowEngineEventRepositoryFactory eventRepoFactory) {
+		this.eventRepoFactory = eventRepoFactory;
+	}
+
+	public WorkflowProcessorMapFactory getProcessorMapFactory() {
+		return processorMapFactory;
+	}
+
+	public void setProcessorMapFactory(WorkflowProcessorMapFactory processorMapFactory) {
+		this.processorMapFactory = processorMapFactory;
+	}
+
+	public PriorityManagerFactory getPriorityManagerFactory() {
+		return priorityManagerFactory;
+	}
+
+	public void setPriorityManagerFactory(PriorityManagerFactory priorityManagerFactory) {
+		this.priorityManagerFactory = priorityManagerFactory;
+	}
+
+	public EngineRunnerFactory getRunnerFactory() {
+		return runnerFactory;
+	}
+
+	public void setRunnerFactory(EngineRunnerFactory runnerFactory) {
+		this.runnerFactory = runnerFactory;
+	}
+
+	public CommunicationChannelClientFactory getCommunicationChannelClientFactory() {
+		return communicationChannelClientFactory;
+	}
+
+	public void setCommunicationChannelClientFactory(CommunicationChannelClientFactory communicationChannelClientFactory) {
+		this.communicationChannelClientFactory = communicationChannelClientFactory;
+	}
+	
+	public List<String> getMetadataKeysToCache() {
+		return metadataKeysToCache;
+	}
+
+	public void setMetadataKeysToCache(List<String> metadataKeysToCache) {
+		this.metadataKeysToCache = metadataKeysToCache;
+	}
+
+	public boolean getDebug() {
+		return this.debug;
+	}
+	
+	public void setDebug(boolean debug) {
+		this.debug = debug;
+	}
+
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,665 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.engine.queue;
+
+//OODT imports
+import org.apache.oodt.cas.catalog.page.PageInfo;
+import org.apache.oodt.cas.catalog.page.ProcessedPageInfo;
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.instance.TaskInstance;
+import org.apache.oodt.cas.workflow.page.PageFilter;
+import org.apache.oodt.cas.workflow.page.QueuePage;
+import org.apache.oodt.cas.workflow.page.RunnablesPage;
+import org.apache.oodt.cas.workflow.priority.HighestPriorityFirstManager;
+import org.apache.oodt.cas.workflow.priority.Priority;
+import org.apache.oodt.cas.workflow.priority.PriorityManager;
+import org.apache.oodt.cas.workflow.processor.ProcessorStub;
+import org.apache.oodt.cas.workflow.processor.TaskProcessor;
+import org.apache.oodt.cas.workflow.processor.WorkflowProcessor;
+import org.apache.oodt.cas.workflow.processor.repo.WorkflowProcessorRepository;
+import org.apache.oodt.cas.workflow.state.RevertableWorkflowState;
+import org.apache.oodt.cas.workflow.state.WorkflowState;
+import org.apache.oodt.cas.workflow.state.running.ExecutingState;
+import org.apache.oodt.cas.workflow.state.waiting.QueuedState;
+import org.apache.oodt.cas.workflow.state.waiting.WaitingOnResourcesState;
+import org.apache.oodt.cas.workflow.util.WorkflowUtils;
+
+//JDK imports
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.Vector;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manages caching and uncaching, scheduling, and queuing of WorkflowProcessors
+ * 
+ * @author bfoster
+ * @version $Revision$
+ *
+ */
+public class QueueManager {
+	
+	private static final Logger LOG = Logger.getLogger(QueueManager.class.getName());
+	
+	// Every workflow known to this manager, keyed by instance id
+	private Map<String, CachedWorkflowProcessor> processorQueue;
+	// Stubs of tasks waiting to be handed out by getNext(), kept sorted by the priority manager
+	private List<ProcessorStub> runnableTasks;
+	// Stubs of tasks handed out by getNext(), keyed by "instanceId:modelId"
+	private Map<String, ProcessorStub> executingTasks;
+	private WorkflowProcessorRepository processorRepo;
+	private WorkflowProcessorLock processorLock;
+	private PriorityManager priorityManager;
+	private List<String> metadataKeysToCache;
+	private boolean debugMode;
+	// Written by shutdown() and polled by the queuer thread; volatile so the
+	// stop request is reliably visible to that thread
+	private volatile boolean allowQueuerToWork;
+	
+	private Thread queuerThread;
+	
+	/**
+	 * Creates a QueueManager with the default priority manager, no cached
+	 * metadata keys and debug mode off.
+	 */
+	public QueueManager(WorkflowProcessorRepository processorRepo) {
+		this(processorRepo, (PriorityManager) null, (List<String>) null, false);
+	}
+	
+	/**
+	 * Creates a QueueManager with the given priority manager, no cached
+	 * metadata keys and debug mode off.
+	 */
+	public QueueManager(WorkflowProcessorRepository processorRepo, PriorityManager priorityManager) {
+		this(processorRepo, priorityManager, (List<String>) null, false);
+	}
+	
+	/**
+	 * Creates a QueueManager with the default priority manager, no cached
+	 * metadata keys and the given debug mode.
+	 */
+	public QueueManager(WorkflowProcessorRepository processorRepo, boolean debugMode) {
+		this(processorRepo, (PriorityManager) null, (List<String>) null, debugMode);
+	}
+
+	/**
+	 * Creates a QueueManager with the default priority manager, the given
+	 * metadata keys to cache and debug mode off.
+	 */
+	public QueueManager(WorkflowProcessorRepository processorRepo, List<String> metadataKeysToCache) {
+		this(processorRepo, (PriorityManager) null, metadataKeysToCache, false);
+	}
+	
+	public QueueManager(WorkflowProcessorRepository processorRepo, PriorityManager priorityManager, List<String> metadataKeysToCache, boolean debugMode) {
+		this.processorQueue = Collections.synchronizedMap(new HashMap<String, CachedWorkflowProcessor>());
+		this.runnableTasks = new Vector<ProcessorStub>();
+		this.executingTasks = Collections.synchronizedMap(new HashMap<String, ProcessorStub>());
+		this.processorLock = new WorkflowProcessorLock();
+		this.processorRepo = processorRepo;
+		this.priorityManager = priorityManager == null ? new HighestPriorityFirstManager() : priorityManager;
+		if (metadataKeysToCache != null)
+			this.metadataKeysToCache = new Vector<String>(metadataKeysToCache);
+		this.debugMode = debugMode;
+		this.allowQueuerToWork = true;
+		
+		try {
+			this.loadProcessorRepo();
+		}catch (Exception e) {
+			e.printStackTrace();
+		}
+		
+		// Task QUEUER thread
+		queuerThread = new Thread(new Runnable() {
+			public void run() {
+				while(allowQueuerToWork) {
+					try {
+						Vector<CachedWorkflowProcessor> processors = null; 
+						synchronized(QueueManager.this.processorQueue) {
+							processors = new Vector<CachedWorkflowProcessor>(QueueManager.this.processorQueue.values());
+						}
+						List<ProcessorStub> runnableProcessors = new Vector<ProcessorStub>();
+						for (CachedWorkflowProcessor cachedWP : processors) {
+							if (!allowQueuerToWork)
+								break;
+							if (!(cachedWP.getStub().getState().getCategory().equals(WorkflowState.Category.DONE) || cachedWP.getStub().getState().getCategory().equals(WorkflowState.Category.HOLDING))) {
+								cachedWP.uncache();
+								if (!QueueManager.this.debugMode) {
+									processorLock.lock(cachedWP.getInstanceId());
+									WorkflowProcessor wp = cachedWP.getWorkflowProcessor();
+									for (TaskProcessor tp : wp.getRunnableWorkflowProcessors()) {
+										tp.setState(new WaitingOnResourcesState("Added to Runnable queue", new ExecutingState("")));
+										runnableProcessors.add(tp.getStub());
+									}
+									processorLock.unlock(cachedWP.getInstanceId());
+								}
+								cachedWP.cache();
+							}else {
+								continue;
+							}
+
+							if (runnableProcessors.size() > 0) {
+								synchronized (QueueManager.this.runnableTasks) {
+									QueueManager.this.runnableTasks.addAll(runnableProcessors);
+									QueueManager.this.priorityManager.sort(QueueManager.this.runnableTasks);
+								}
+							}
+							runnableProcessors.clear();
+							
+							//take a breather
+							try {
+								synchronized(this) {
+									this.wait(1);
+								}
+							}catch (Exception e){}
+						}
+					}catch (Exception e) {
+						e.printStackTrace();
+					}
+				}
+				
+				try {
+					synchronized(this) {
+						this.wait(2000);
+					}
+				}catch (Exception e){}
+			}
+		});
+		queuerThread.start();
+	}
+		
+	public void loadProcessorRepo() throws Exception {
+		if (this.processorRepo != null) {
+			for (String instanceId : this.processorRepo.getStoredInstanceIds())
+				this.processorQueue.put(instanceId, new CachedWorkflowProcessor(instanceId));
+		}
+	}
+	
+	/**
+	 * Asks the queuer thread to stop and waits up to five seconds for it to finish.
+	 * If the calling thread is interrupted while waiting, its interrupt status
+	 * is restored so the caller can still observe it.
+	 */
+	public void shutdown() {
+		this.allowQueuerToWork = false;
+		try {
+			this.queuerThread.join(5000);
+		}catch(InterruptedException e) {
+			// Do not swallow the interruption: hand it back to the caller
+			Thread.currentThread().interrupt();
+		}
+	}
+	
+	public void addToQueue(WorkflowProcessor workflowProcessor) throws Exception {
+		workflowProcessor.setStateRecur(new QueuedState(""));
+		this.processorQueue.put(workflowProcessor.getInstanceId(), new CachedWorkflowProcessor(workflowProcessor));
+	}
+	
+	/**
+	 * Removes the first stub from the runnable queue, builds a TaskInstance for
+	 * it and records the task as executing.
+	 * <p>
+	 * Stubs whose workflow is no longer in the processor queue (for example
+	 * because it was deleted after the stub was queued) are skipped instead of
+	 * failing with a NullPointerException. The processor lock and the
+	 * uncache/cache pairing are released even when building the instance fails.
+	 * 
+	 * @return the next task to run, or null when the runnable queue is empty
+	 * @throws Exception if the task cannot be located or instantiated
+	 */
+	public TaskInstance getNext() throws Exception {
+		while (true) {
+			ProcessorStub stub = null;
+			synchronized (this.runnableTasks) {
+				if (!this.runnableTasks.isEmpty()) 
+					stub = this.runnableTasks.remove(0);
+			}
+			if (stub == null)
+				return null;
+			
+			CachedWorkflowProcessor cachedWP = this.processorQueue.get(stub.getInstanceId());
+			if (cachedWP == null)
+				continue; // stale stub: its workflow is gone, try the next one
+			
+			cachedWP.uncache();
+			try {
+				processorLock.lock(cachedWP.getInstanceId());
+				try {
+					TaskProcessor taskProcessor = (TaskProcessor) WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), stub.getModelId());
+					TaskInstance taskInstance = this.makeInstance(taskProcessor);
+					this.executingTasks.put(taskProcessor.getInstanceId() + ":" + taskProcessor.getModelId(), taskProcessor.getStub());
+					return taskInstance;
+				}finally {
+					processorLock.unlock(cachedWP.getInstanceId());
+				}
+			}finally {
+				cachedWP.cache();
+			}
+		}
+	}
+	
+	/**
+	 * Instantiates the processor's TaskInstance class and copies the instance id,
+	 * metadata and model id onto it. When the processor has no job id yet, a
+	 * random UUID is generated and stored on both the instance and the processor.
+	 * 
+	 * @param taskProcessor source of the instance class and its values
+	 * @return the populated task instance
+	 */
+	private TaskInstance makeInstance(TaskProcessor taskProcessor) throws InstantiationException, IllegalAccessException {
+		TaskInstance instance = taskProcessor.getInstanceClass().newInstance();
+		instance.setInstanceId(taskProcessor.getInstanceId());
+		instance.setDynamicMetadata(taskProcessor.getDynamicMetadata());
+		instance.setStaticMetadata(taskProcessor.getStaticMetadata());
+		instance.setModelId(taskProcessor.getModelId());
+		if (taskProcessor.getJobId() != null) {
+			instance.setJobId(taskProcessor.getJobId());
+		}else {
+			String generatedJobId = UUID.randomUUID().toString();
+			instance.setJobId(generatedJobId);
+			taskProcessor.setJobId(instance.getJobId());
+		}
+		return instance;
+	}
+	
+	/**
+	 * Reverts the state of a whole workflow (modelId == null) or of one of its
+	 * sub-processors, then re-validates the workflow. Unknown instance ids are
+	 * ignored; failures are logged rather than thrown.
+	 * <p>
+	 * The processor lock and the uncache/cache pairing are released in finally
+	 * blocks so that a failure cannot leave the workflow locked.
+	 * 
+	 * @param instanceId id of the workflow instance
+	 * @param modelId id of the sub-processor to revert, or null for the root processor
+	 */
+	public void revertState(String instanceId, String modelId) {
+		try {
+			CachedWorkflowProcessor cachedWP = this.processorQueue.get(instanceId);
+			if (cachedWP != null) {
+				cachedWP.uncache();
+				try {
+					processorLock.lock(cachedWP.getInstanceId());
+					try {
+						if (modelId != null)
+							WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId).revertState();
+						else
+							cachedWP.getWorkflowProcessor().revertState();
+						WorkflowUtils.validateWorkflowProcessor(cachedWP.getWorkflowProcessor());
+					}finally {
+						processorLock.unlock(cachedWP.getInstanceId());
+					}
+				}finally {
+					cachedWP.cache();
+				}
+			}
+		}catch (Exception e) {
+			LOG.log(Level.SEVERE, "Failed to revert state for workflow [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " + e.getMessage(), e);
+		}
+	}
+	
+	/**
+	 * Sets the state of a whole workflow (modelId == null) or of one of its
+	 * sub-processors. A RevertableWorkflowState first receives the processor's
+	 * current state as its previous state. For task processors the executing
+	 * map and the runnable queue are kept in step with the new state. Unknown
+	 * instance ids are ignored; failures are logged rather than thrown.
+	 * <p>
+	 * The processor lock and the uncache/cache pairing are released in finally
+	 * blocks so that a failure cannot leave the workflow locked.
+	 * 
+	 * @param instanceId id of the workflow instance
+	 * @param modelId id of the sub-processor, or null for the root processor
+	 * @param state the state to apply
+	 */
+	public void setState(String instanceId, String modelId, WorkflowState state) {
+		try {
+			CachedWorkflowProcessor cachedWP = this.processorQueue.get(instanceId);
+			if (cachedWP != null) {
+				cachedWP.uncache();
+				try {
+					processorLock.lock(cachedWP.getInstanceId());
+					try {
+						WorkflowProcessor wp = (modelId == null) ? cachedWP.getWorkflowProcessor() : WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+						if (state instanceof RevertableWorkflowState)
+							((RevertableWorkflowState) state).setPrevState(wp.getState());
+						wp.setState(state);
+						if (wp instanceof TaskProcessor) {
+							String executingKey = instanceId + ":" + modelId;
+							if (this.executingTasks.containsKey(executingKey)) {
+								// An executing task either refreshes its stub or leaves the executing map
+								if (!(state instanceof ExecutingState))
+									this.executingTasks.remove(executingKey);
+								else
+									this.executingTasks.put(executingKey, wp.getStub());
+							}else {
+								this.updateRunnableStub(wp);
+							}
+						}
+					}finally {
+						processorLock.unlock(cachedWP.getInstanceId());
+					}
+				}finally {
+					cachedWP.cache();
+				}
+			}
+		}catch (Exception e) {
+			LOG.log(Level.SEVERE, "Failed to set state for workflow [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " + e.getMessage(), e);
+		}
+	}
+	
+	/**
+	 * Recursively sets the priority of a whole workflow (modelId == null) or of
+	 * one of its sub-processors and refreshes its stub in the runnable queue.
+	 * Tasks currently in the executing map are left untouched (a warning is
+	 * logged). Unknown instance ids are ignored; failures are logged rather
+	 * than thrown.
+	 * <p>
+	 * The processor lock and the uncache/cache pairing are released in finally
+	 * blocks so that a failure cannot leave the workflow locked.
+	 * 
+	 * @param instanceId id of the workflow instance
+	 * @param modelId id of the sub-processor, or null for the root processor
+	 * @param priority the priority to apply
+	 */
+	public void setPriority(String instanceId, String modelId, Priority priority) {
+		try {
+			if (this.executingTasks.containsKey(instanceId + ":" + modelId)) {
+				LOG.log(Level.WARNING, "Can't change the priority of an executing task [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : ");
+				return;
+			}
+			CachedWorkflowProcessor cachedWP = this.processorQueue.get(instanceId);
+			if (cachedWP != null) {
+				cachedWP.uncache();
+				try {
+					processorLock.lock(cachedWP.getInstanceId());
+					try {
+						WorkflowProcessor wp = (modelId == null) ? cachedWP.getWorkflowProcessor() : WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+						wp.setPriorityRecur(priority);
+						if (wp instanceof TaskProcessor) 
+							this.updateRunnableStub(wp);
+					}finally {
+						processorLock.unlock(cachedWP.getInstanceId());
+					}
+				}finally {
+					cachedWP.cache();
+				}
+			}
+		}catch (Exception e) {
+			LOG.log(Level.SEVERE, "Failed to set priority for workflow [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " + e.getMessage(), e);
+		}
+	}
+	
+	/**
+	 * Replaces the dynamic metadata of a whole workflow (modelId == null) or of
+	 * one of its sub-processors. Unknown instance ids are ignored; failures are
+	 * logged rather than thrown.
+	 * <p>
+	 * The processor lock and the uncache/cache pairing are released in finally
+	 * blocks so that a failure cannot leave the workflow locked.
+	 * 
+	 * @param instanceId id of the workflow instance
+	 * @param modelId id of the sub-processor, or null for the root processor
+	 * @param metadata the dynamic metadata to set
+	 */
+	public void setMetadata(String instanceId, String modelId, Metadata metadata) {
+		try {
+			CachedWorkflowProcessor cachedWP = this.processorQueue.get(instanceId);
+			if (cachedWP != null) {
+				cachedWP.uncache();
+				try {
+					processorLock.lock(cachedWP.getInstanceId());
+					try {
+						WorkflowProcessor wp = modelId == null ? cachedWP.getWorkflowProcessor() : WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+						wp.setDynamicMetadata(metadata);
+					}finally {
+						processorLock.unlock(cachedWP.getInstanceId());
+					}
+				}finally {
+					cachedWP.cache();
+				}
+			}
+		}catch (Exception e) {
+			LOG.log(Level.SEVERE, "Failed to set metadata for workflow [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " + e.getMessage(), e);
+		}
+	}
+	
+	/**
+	 * Looks up the workflow with the given instance id and returns its
+	 * WorkflowProcessor, loading it from the repository if needed.
+	 * 
+	 * @param instanceId id of the workflow instance
+	 * @return the processor, or null when the id is unknown
+	 */
+	public WorkflowProcessor getWorkflowProcessor(String instanceId) {
+		CachedWorkflowProcessor entry = this.processorQueue.get(instanceId);
+		if (entry == null)
+			return null;
+		entry.uncache();
+		processorLock.lock(instanceId);
+		WorkflowProcessor loaded = entry.getWorkflowProcessor();
+		processorLock.unlock(instanceId);
+		entry.cache();
+		return loaded;
+	}
+	
+	public boolean containsWorkflow(String instanceId) {
+		return this.processorQueue.containsKey(instanceId);
+	}
+	
+	/**
+	 * Removes a workflow from the processor queue, deletes it from the
+	 * repository, drops its lock and removes all of its stubs from the
+	 * runnable queue. Unknown instance ids are ignored.
+	 * 
+	 * @param instanceId id of the workflow instance to delete
+	 */
+	public void deleteWorkflowProcessor(String instanceId) {
+		CachedWorkflowProcessor removed = this.processorQueue.remove(instanceId);
+		if (removed == null)
+			return;
+		removed.delete();
+		this.processorLock.delete(instanceId);
+		synchronized (this.runnableTasks) {
+			// Walk backwards so removing an element never shifts one that is still to be checked
+			for (int idx = this.runnableTasks.size() - 1; idx >= 0; idx--) {
+				if (this.runnableTasks.get(idx).getInstanceId().equals(instanceId))
+					this.runnableTasks.remove(idx);
+			}
+		}
+	}
+	
+    /**
+     * Returns one page of the stubs currently in the executing map.
+     * <p>
+     * The snapshot is taken while holding the map's monitor, as required when
+     * traversing the views of a Collections.synchronizedMap and as done for
+     * the processor queue elsewhere in this class.
+     * 
+     * @param pageInfo page number (pages start at 1) and page size
+     * @return the requested page together with the total number of executing tasks
+     */
+    public RunnablesPage getExecutingPage(PageInfo pageInfo) {
+		List<ProcessorStub> executing = null;
+		synchronized (this.executingTasks) {
+			executing = new Vector<ProcessorStub>(this.executingTasks.values());
+		}
+		Vector<ProcessorStub> pageWPs = new Vector<ProcessorStub>();
+		int startIndex = (pageInfo.getPageNum() - 1) * pageInfo.getPageSize();
+		for (int i = startIndex; i < startIndex + pageInfo.getPageSize() && i < executing.size(); i++) 
+			pageWPs.add(executing.get(i));
+		return new RunnablesPage(this.getProcessedPageInfo(pageInfo, executing.size()), pageWPs);
+    }
+	
+    public RunnablesPage getRunnablesPage(PageInfo pageInfo) {
+		List<ProcessorStub> runnables = new Vector<ProcessorStub>(this.runnableTasks);
+    	Vector<ProcessorStub> pageWPs = new Vector<ProcessorStub>();
+		int startIndex = (pageInfo.getPageNum() - 1) * pageInfo.getPageSize();
+		for (int i = startIndex; i < startIndex + pageInfo.getPageSize() && i < runnables.size(); i++) 
+			pageWPs.add(runnables.get(i));
+		return new RunnablesPage(this.getProcessedPageInfo(pageInfo, runnables.size()), pageWPs);
+	}
+    	
+    /**
+     * Returns one page of all queued workflows in unsorted order; delegates to
+     * the comparator overload with a null comparator.
+     */
+    public QueuePage getPage(PageInfo pageInfo) {
+		Comparator<ProcessorStub> noOrdering = null;
+		return this.getPage(pageInfo, noOrdering);
+    }
+    
+    public QueuePage getPage(PageInfo pageInfo, PageFilter filter) {
+    	Vector<CachedWorkflowProcessor> acceptedWPs = new Vector<CachedWorkflowProcessor>();
+    	Vector<CachedWorkflowProcessor> cachedWPs = null;
+    	synchronized(processorQueue) {
+    		cachedWPs = new Vector<CachedWorkflowProcessor>(this.processorQueue.values());
+    	}
+		if (filter != null) 
+			for (CachedWorkflowProcessor cachedWP : cachedWPs) 
+				if (filter.accept(cachedWP.getStub(), cachedWP.getCachedMetadata()))
+					acceptedWPs.add(cachedWP);
+		return new QueuePage(this.getProcessedPageInfo(pageInfo, acceptedWPs.size()), this.getPage(pageInfo, acceptedWPs), filter);
+    }
+    
+    public QueuePage getPage(PageInfo pageInfo, Comparator<ProcessorStub> comparator) {
+		Vector<CachedWorkflowProcessor> sortedCachedWPs = null;
+		synchronized(this.processorQueue) {
+			sortedCachedWPs = new Vector<CachedWorkflowProcessor>(this.processorQueue.values());
+		}
+		if (comparator != null) {
+			final Comparator<ProcessorStub> comparatorFinal = comparator;
+			Collections.sort(sortedCachedWPs, new Comparator<CachedWorkflowProcessor>() {
+				public int compare(CachedWorkflowProcessor o1,
+						CachedWorkflowProcessor o2) {
+					return comparatorFinal.compare(o1.getStub(), o2.getStub());
+				}
+			});
+		}
+		return new QueuePage(this.getProcessedPageInfo(pageInfo, sortedCachedWPs.size()), this.getPage(pageInfo, sortedCachedWPs), comparator);
+    }
+	
+	public QueuePage getPage(PageInfo pageInfo, WorkflowState state) {
+		List<CachedWorkflowProcessor> processorsOfGivenState = new Vector<CachedWorkflowProcessor>();
+		Vector<CachedWorkflowProcessor> processorQueueValues = null;
+		synchronized(this.processorQueue) {
+			processorQueueValues = new Vector<CachedWorkflowProcessor>(this.processorQueue.values());
+		}
+		for (CachedWorkflowProcessor cachedWP : processorQueueValues) 
+			if (cachedWP.getStub().getState().equals(state))
+				processorsOfGivenState.add(cachedWP);
+		return new QueuePage(this.getProcessedPageInfo(pageInfo, processorsOfGivenState.size()), this.getPage(pageInfo, processorsOfGivenState), state);
+	}
+	
+    public QueuePage getPage(PageInfo pageInfo, WorkflowState.Category category) {
+		List<CachedWorkflowProcessor> processorsOfGivenCategory = new Vector<CachedWorkflowProcessor>();
+		Vector<CachedWorkflowProcessor> processorQueueValues = null;
+		synchronized(this.processorQueue) {
+			processorQueueValues = new Vector<CachedWorkflowProcessor>(this.processorQueue.values());
+		}
+		for (CachedWorkflowProcessor cachedWP : processorQueueValues) 
+			if (cachedWP.getStub().getState().getCategory().equals(category))
+				processorsOfGivenCategory.add(cachedWP);
+		return new QueuePage(this.getProcessedPageInfo(pageInfo, processorsOfGivenCategory.size()), this.getPage(pageInfo, processorsOfGivenCategory), category);
+    }
+    
+    public QueuePage getPage(PageInfo pageInfo, String modelId) {
+		List<CachedWorkflowProcessor> processorsOfGivenModelId = new Vector<CachedWorkflowProcessor>();
+		Vector<CachedWorkflowProcessor> processorQueueValues = null;
+		synchronized(this.processorQueue) {
+			processorQueueValues = new Vector<CachedWorkflowProcessor>(this.processorQueue.values());
+		}
+		for (CachedWorkflowProcessor cachedWP : processorQueueValues) 
+			if (cachedWP.getStub().getModelId().equals(modelId))
+				processorsOfGivenModelId.add(cachedWP);
+		return new QueuePage(this.getProcessedPageInfo(pageInfo, processorsOfGivenModelId.size()), this.getPage(pageInfo, processorsOfGivenModelId), modelId);
+    }
+    
+    /**
+     * Returns one page of the workflows whose cached metadata matches at least
+     * one of the given key/values pairs: the cached value of the key must be
+     * contained in the pair's list of values.
+     * <p>
+     * Each workflow is listed at most once, even when several pairs match, so
+     * the reported hit count equals the number of matching workflows.
+     * 
+     * @param pageInfo page number (pages start at 1) and page size
+     * @param keyValPairs metadata keys mapped to the values accepted for that key
+     */
+    public QueuePage getPage(PageInfo pageInfo, Map<String, List<String>> keyValPairs) {
+		List<CachedWorkflowProcessor> queryResults = new Vector<CachedWorkflowProcessor>();
+		Vector<CachedWorkflowProcessor> processorQueueValues = null;
+		synchronized(this.processorQueue) {
+			processorQueueValues = new Vector<CachedWorkflowProcessor>(this.processorQueue.values());
+		}
+		for (CachedWorkflowProcessor cachedWP : processorQueueValues) {
+			Metadata cachedMetadata = cachedWP.getCachedMetadata();
+			if (cachedMetadata.getAllKeys().size() > 0) {
+				for (Entry<String, List<String>> entry : keyValPairs.entrySet()) {
+					String value = cachedMetadata.getMetadata(entry.getKey());
+					if (value != null && entry.getValue().contains(value)) {
+						queryResults.add(cachedWP);
+						// One matching pair is enough; stop so the workflow is not added again
+						break;
+					}
+				}
+			}
+		}
+		return new QueuePage(this.getProcessedPageInfo(pageInfo, queryResults.size()), this.getPage(pageInfo, queryResults), keyValPairs);
+    }
+    
+    /**
+     * Combines the requested page size and page number with the total number of hits.
+     * 
+     * @param pageInfo the requested page
+     * @param numOfHits total number of results the page was cut from
+     */
+    protected ProcessedPageInfo getProcessedPageInfo(PageInfo pageInfo, int numOfHits) {
+		ProcessedPageInfo processedInfo = new ProcessedPageInfo(pageInfo.getPageSize(), pageInfo.getPageNum(), numOfHits);
+		return processedInfo;
+    }
+    
+    /**
+     * Cuts the requested page out of the given list and returns the stubs of
+     * the workflows on it. Each stub is read while holding that workflow's
+     * processor lock; the lock is released in a finally block so a failing
+     * read cannot leave the workflow locked.
+     * 
+     * @param pageInfo page number (pages start at 1) and page size
+     * @param cachedWPs the full, already filtered/sorted result list
+     * @return the stubs on the requested page (empty when the page lies past the end)
+     */
+    protected List<ProcessorStub> getPage(PageInfo pageInfo, List<CachedWorkflowProcessor> cachedWPs) {
+		Vector<ProcessorStub> pageWPs = new Vector<ProcessorStub>();
+		int startIndex = (pageInfo.getPageNum() - 1) * pageInfo.getPageSize();
+		for (int i = startIndex; i < startIndex + pageInfo.getPageSize() && i < cachedWPs.size(); i++) {
+			CachedWorkflowProcessor cachedWP = cachedWPs.get(i);
+			processorLock.lock(cachedWP.getInstanceId());
+			try {
+				pageWPs.add(cachedWP.getStub());
+			}finally {
+				processorLock.unlock(cachedWP.getInstanceId());
+			}
+		}
+		return pageWPs;
+    }
+    
+    /**
+     * Refreshes the runnable-queue entry of the given processor: its stub is
+     * removed and, if the processor is still in a WaitingOnResourcesState,
+     * re-added and the queue re-sorted. Processors that were not in the
+     * runnable queue are left alone.
+     * <p>
+     * The whole remove/add/sort sequence runs under the runnable queue's
+     * monitor so that other threads using that monitor never see the queue
+     * with the stub temporarily missing or unsorted.
+     * 
+     * @param wp the processor whose stub should be refreshed
+     */
+    protected void updateRunnableStub(WorkflowProcessor wp) {
+		synchronized (this.runnableTasks) {
+			if (this.runnableTasks.remove(wp.getStub())) {
+				if (wp.getState() instanceof WaitingOnResourcesState) {
+					this.runnableTasks.add(wp.getStub());
+					this.priorityManager.sort(this.runnableTasks);
+				}
+			}
+		}
+    }
+    
+    /**
+     * Counts the queued workflows whose processor stub has already been set.
+     * 
+     * @return number of entries with a non-null stub
+     */
+    public int getNumOfLoadedProcessors() {
+		Vector<CachedWorkflowProcessor> snapshot = null;
+		synchronized(this.processorQueue) {
+			snapshot = new Vector<CachedWorkflowProcessor>(this.processorQueue.values());
+		}
+		int loadedCount = 0;
+		for (CachedWorkflowProcessor entry : snapshot) {
+			if (entry.processorStub == null)
+				continue;
+			loadedCount++;
+		}
+		return loadedCount;
+    }
+    
+    public int getNumOfProcessors() {
+    	return this.processorQueue.size();
+    }
+	
+	/**
+	 * Wrapper around one workflow that keeps its stub (and optionally a subset
+	 * of its metadata) in memory while the full WorkflowProcessor may live only
+	 * in the processor repository.
+	 * <p>
+	 * uncache() and cache() are meant to be called in pairs: uncachedCalls
+	 * counts the outstanding uncache() calls, and cache() drops the in-memory
+	 * processor only once that count reaches zero and a repository is configured.
+	 */
+	private class CachedWorkflowProcessor {
+		
+		private String instanceId;
+		private ProcessorStub processorStub;
+		private Metadata cachedMetadata;
+		// The loaded processor; null while the workflow is cached
+		private WorkflowProcessor wp;
+		private int uncachedCalls;
+		// True until the processor has been validated after its first uncache
+		private boolean firstUncache;
+		
+		/** Creates an entry for a stored workflow that has not been loaded yet. */
+		public CachedWorkflowProcessor(String instanceId) {
+			this.instanceId = instanceId;
+			this.processorStub = null;
+			this.cachedMetadata = null;
+			this.uncachedCalls = 0;
+			this.firstUncache = true;
+		}
+		
+		/** Creates an entry for the given processor and caches it immediately. */
+		public CachedWorkflowProcessor(WorkflowProcessor workflowProcessor) {
+			this(workflowProcessor, true);
+		}
+		
+		/**
+		 * Creates an entry for the given processor.
+		 * 
+		 * @param cache true to cache it right away, false to only save it and keep it in memory
+		 */
+		public CachedWorkflowProcessor(WorkflowProcessor workflowProcessor, boolean cache) {
+			this(workflowProcessor.getInstanceId());
+			this.firstUncache = false;
+			this.wp = workflowProcessor;
+			if (cache)
+				this.cache();
+			else
+				this.save();
+		}
+		
+		/** @return true when no processor is currently held in memory */
+		public synchronized boolean isCached() {
+			return this.wp == null;
+		}
+		
+		/** Deletes the workflow from the repository, if one is configured; failures are logged. */
+		public synchronized void delete() {
+			try {
+				if (QueueManager.this.processorRepo != null)
+					QueueManager.this.processorRepo.delete(this.instanceId);
+			}catch (Exception e) {
+				LOG.log(Level.WARNING, "Failed to delete " + this.instanceId + " : " + e.getMessage(), e);
+			}
+		}
+		
+		/**
+		 * Refreshes the stub and cached metadata from the in-memory processor and
+		 * stores the processor in the repository, if one is configured; failures are logged.
+		 */
+		public synchronized void save() {
+			try {
+				this.processorStub = this.wp.getStub();
+				this.loadCachedMetadata();
+				if (QueueManager.this.processorRepo != null)
+					QueueManager.this.processorRepo.store(this.wp);
+			}catch (Exception e) {
+				LOG.log(Level.WARNING, "Failed to cache " + this.instanceId + " : " + e.getMessage(), e);
+			}
+		}
+		
+		/**
+		 * Saves the processor and releases one uncache() call; the in-memory
+		 * processor is dropped when no calls remain and a repository is configured.
+		 */
+		public synchronized void cache() {
+			this.save();
+			if (this.uncachedCalls > 0) 
+				this.uncachedCalls--;
+			if (this.uncachedCalls == 0) {
+				if (QueueManager.this.processorRepo != null)
+					this.wp = null;	
+			}
+		}
+		
+		/**
+		 * Loads the processor from the repository if it is not in memory and
+		 * registers one more user. On the first uncache of a workflow that is
+		 * neither DONE nor HOLDING the processor is validated. Failures are logged.
+		 */
+		public synchronized void uncache() {
+			try {
+				if (this.isCached() && QueueManager.this.processorRepo != null)
+					this.wp = QueueManager.this.processorRepo.load(this.instanceId);
+				this.uncachedCalls++;
+				if (this.firstUncache && !(this.getStub().getState().getCategory().equals(WorkflowState.Category.DONE) || this.getStub().getState().getCategory().equals(WorkflowState.Category.HOLDING))) {
+					WorkflowUtils.validateWorkflowProcessor(this.wp);
+					this.firstUncache = false;
+				}
+			}catch (Exception e) {
+				LOG.log(Level.WARNING, "Failed to uncache " + this.instanceId + " : " + e.getMessage(), e);
+			}
+		}
+		
+		public synchronized String getInstanceId() {
+			return this.instanceId;
+		}
+		
+		/**
+		 * @return the processor's stub: read fresh from the in-memory processor when
+		 *         one is loaded, otherwise the remembered stub (loading the processor
+		 *         once if no stub has been remembered yet)
+		 */
+		public synchronized ProcessorStub getStub() {
+			if (this.isCached()) {
+				if (this.processorStub == null) {
+					this.uncache();
+					this.processorStub = this.wp.getStub();
+					this.cache();
+				}
+			}else {
+				this.processorStub = this.wp.getStub();
+			}
+			return this.processorStub;
+		}
+		
+		/**
+		 * @return the cached subset of the workflow's metadata, never null (an empty
+		 *         Metadata is returned when nothing is cached)
+		 */
+		public synchronized Metadata getCachedMetadata() {
+			if (this.isCached()) {
+				if (this.cachedMetadata == null) {
+					this.uncache();
+					this.loadCachedMetadata();
+					this.cache();
+				}
+			}else {
+				this.loadCachedMetadata();
+			}
+			return this.cachedMetadata != null ? this.cachedMetadata : new Metadata();
+		}
+		
+		// Rebuilds the cached metadata, but only when keys to cache were configured
+		private void loadCachedMetadata() {
+			if (QueueManager.this.metadataKeysToCache != null)
+				this.cachedMetadata = this._getCachedMetadata();
+		}
+		
+		// Collects the configured keys, taking values from the dynamic metadata
+		// and falling back to the static metadata when the former returns null
+		private Metadata _getCachedMetadata() {
+			Metadata m = new Metadata();
+			for (String key : QueueManager.this.metadataKeysToCache) {
+				List<String> values = this.wp.getDynamicMetadata().getAllMetadata(key);
+				if (values == null)
+					values = this.wp.getStaticMetadata().getAllMetadata(key);
+				if (values != null)
+					m.addMetadata(key, values);
+			}
+			return m;
+		}
+		
+		/**
+		 * @return the in-memory processor, or null while the workflow is cached
+		 */
+		public synchronized WorkflowProcessor getWorkflowProcessor() {
+			// Reading a field cannot throw, so the former try/catch was unreachable
+			return this.wp;
+		}
+		
+	}
+	
+	/**
+	 * Keeps one ReentrantReadWriteLock per workflow instance id, created on the
+	 * first lock() call for that id. Only the write lock is used, so each
+	 * workflow is locked exclusively.
+	 */
+	private class WorkflowProcessorLock {
+		
+		private Map<String, ReadWriteLock> lockedProcessors;
+		
+		public WorkflowProcessorLock() {
+			this.lockedProcessors = new HashMap<String, ReadWriteLock>();
+		}
+		
+		/**
+		 * Acquires the write lock for the given instance id, creating the lock if
+		 * necessary. Blocks until the lock is available.
+		 */
+		public void lock(String instanceId) {
+			ReadWriteLock lock = null;
+			synchronized(this.lockedProcessors) {
+				lock = this.lockedProcessors.get(instanceId);
+				if (lock == null)
+					this.lockedProcessors.put(instanceId, lock = new ReentrantReadWriteLock());
+			}
+			// Acquire outside the map monitor so a held workflow lock never blocks other ids
+			lock.writeLock().lock();
+		}
+		
+		/**
+		 * Releases the write lock for the given instance id. Does nothing when no
+		 * lock is registered for the id, e.g. because delete() removed it in the
+		 * meantime; previously this case failed with a NullPointerException.
+		 */
+		public void unlock(String instanceId) {
+			ReadWriteLock lock = null;
+			synchronized(this.lockedProcessors) {
+				lock = this.lockedProcessors.get(instanceId);
+			}
+			if (lock != null)
+				lock.writeLock().unlock();
+		}
+		
+		/** Forgets the lock registered for the given instance id, if any. */
+		public void delete(String instanceId) {
+			synchronized(this.lockedProcessors) {
+				this.lockedProcessors.remove(instanceId);
+			}
+		}
+		
+	}
+	
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.engine.runner;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.instance.TaskInstance;
+
+/**
+ * 
+ * @author bfoster
+ * @version $Revision$
+ *
+ * Responsible for executing TaskInstances
+ *
+ */
+public abstract class EngineRunner {
+
+	/**
+	 * Executes the given task instance.
+	 * 
+	 * @param workflowInstance the task instance to run
+	 * @throws Exception if the runner fails to execute the task
+	 */
+	public abstract void execute(TaskInstance workflowInstance) throws Exception;
+
+	/**
+	 * @return the number of open slots reported by this runner
+	 * @throws Exception if the slot count cannot be determined
+	 */
+	public abstract int getOpenSlots() throws Exception;
+
+	/**
+	 * @return whether this runner reports any open slot
+	 * @throws Exception if the slot state cannot be determined
+	 */
+	public abstract boolean hasOpenSlots() throws Exception;
+
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunnerFactory.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunnerFactory.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunnerFactory.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunnerFactory.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.engine.runner;
+
+/**
+ * 
+ * @author bfoster
+ * @version $Revision$
+ *
+ *	Factory for creating EngineRunners
+ *
+ */
+public interface EngineRunnerFactory {
+
+	/**
+	 * Creates an EngineRunner.
+	 * 
+	 * @return the runner built by the implementation
+	 */
+	EngineRunner createRunner();
+
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.engine.runner;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.instance.TaskInstance;
+
+/**
+ * 
+ * @author mattmann
+ * @author bfoster
+ * @version $Revision$
+ *
+ *	Multi-Threaded Runner which executes TaskInstances in local JVM
+ *
+ */
+public class LocalEngineRunner extends EngineRunner {
+	
+	private static final int NUM_OF_SLOTS = 6;
+	private int usedSlots = 0;
+	
+	public void execute(final TaskInstance workflowInstance) throws Exception {
+		incrSlots();
+		new Thread(new Runnable() {
+			public void run() {
+				try {
+					workflowInstance.execute();
+				}finally {
+					decrSlots();
+				}
+			}
+		}).start();
+	}
+
+	@Override
+	public synchronized int getOpenSlots() throws Exception {
+		return NUM_OF_SLOTS - usedSlots;
+	}
+
+	@Override
+	public boolean hasOpenSlots() throws Exception {
+		return this.getOpenSlots() > 0;
+	}
+	
+	private synchronized void incrSlots() {
+		usedSlots++;
+	}
+	
+	private synchronized void decrSlots() {
+		usedSlots--;
+	}
+		
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.engine.runner;
+
+/**
+ * 
+ * @author mattmann
+ * @author bfoster
+ * @version $Revision$
+ * 
+ * Factory for creating local EngineRunners
+ *
+ */
+public class LocalEngineRunnerFactory implements EngineRunnerFactory {
+
+	public LocalEngineRunner createRunner() {
+		return new LocalEngineRunner();
+	}
+
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceJobInput.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceJobInput.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceJobInput.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceJobInput.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.engine.runner;
+
+//OODT imports
+import org.apache.oodt.cas.resource.structs.JobInput;
+import org.apache.oodt.cas.workflow.instance.TaskInstance;
+import org.apache.oodt.cas.workflow.util.Serializer;
+
+//JDK imports
+import java.util.Hashtable;
+import java.util.Properties;
+
+/**
+ * 
+ * @author mattmann
+ * @author bfoster
+ * @version $Revision$
+ *
+ *	A TaskInstance JobInput for running in CAS-Resource
+ *
+ */
+public class ResourceJobInput implements JobInput {
+
+	// Key under which the serialized task instance is stored in the table.
+	private static final String KEY = "WorkflowInstanceXml";
+
+	protected TaskInstance workflowInstance;
+
+	/**
+	 * @return the model id of the wrapped task instance
+	 */
+	public String getId() {
+		return this.workflowInstance.getModelId();
+	}
+
+	/**
+	 * Restores the wrapped task instance from a table produced by write().
+	 * 
+	 * @param object a Hashtable holding the serialized instance under KEY
+	 */
+	public void read(Object object) {
+		Hashtable<?, ?> table = (Hashtable<?, ?>) object;
+		String serialized = (String) table.get(KEY);
+		this.workflowInstance = new Serializer().deserializeObject(TaskInstance.class, serialized);
+	}
+
+	/**
+	 * Serializes the wrapped task instance.
+	 * 
+	 * @return a Hashtable holding the serialized instance under KEY
+	 */
+	public Object write() {
+		Hashtable<String, String> result = new Hashtable<String, String>();
+		String serialized = new Serializer().serializeObject(this.workflowInstance);
+		result.put(KEY, serialized);
+		return result;
+	}
+
+	/**
+	 * Ignores the given properties; this input needs no configuration.
+	 */
+	public void configure(Properties properties) {
+		//do nothing
+	}
+
+}
\ No newline at end of file

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceJobInstance.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceJobInstance.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceJobInstance.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceJobInstance.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.engine.runner;
+
+//OODT imports
+import org.apache.oodt.cas.resource.structs.JobInput;
+import org.apache.oodt.cas.resource.structs.JobInstance;
+import org.apache.oodt.cas.resource.structs.exceptions.JobInputException;
+import org.apache.oodt.cas.workflow.state.transition.ExecutionCompleteState;
+
+/**
+ * @author mattmann
+ * @author bfoster
+ * @version $Revision$
+ * 
+ * WorkflowManager TaskInstance equivalent in CAS-Resource
+ *
+ */
+public class ResourceJobInstance implements JobInstance {
+
+	public boolean execute(JobInput input) throws JobInputException {
+		((ResourceJobInput) input).workflowInstance.execute();
+		return ((ResourceJobInput) input).workflowInstance.getState() instanceof ExecutionCompleteState;
+	}
+	
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.engine.runner;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
+import org.apache.oodt.cas.workflow.instance.TaskInstance;
+import static org.apache.oodt.cas.workflow.metadata.ResourceMetKeys.*;
+
+/**
+ * @author mattmann
+ * @author bfoster
+ * @version $Revision$
+ * 
+ * CAS-Resource EngineRunner
+ *
+ */
+public class ResourceRunner extends EngineRunner {
+
+	private XmlRpcResourceManagerClient rsManagerClient;
+	
+	public ResourceRunner(XmlRpcResourceManagerClient rsManagerClient) {
+		super();
+		this.rsManagerClient = rsManagerClient;
+	}
+	
+	@Override
+	public void execute(TaskInstance workflowInstance) throws Exception {
+		ResourceJobInput input = new ResourceJobInput();
+		input.workflowInstance = workflowInstance;
+		Job job = new Job();
+		job.setId(workflowInstance.getJobId());
+		job.setName(workflowInstance.getModelId());
+		job.setJobInputClassName(ResourceJobInput.class.getCanonicalName());
+		job.setJobInstanceClassName(ResourceJobInstance.class.getCanonicalName());
+		Metadata m = workflowInstance.getMetadata();
+		if (m.getMetadata(LOAD) != null)
+			job.setLoadValue(Integer.parseInt(m.getMetadata(LOAD)));
+		else 
+			job.setLoadValue(2);
+		if (m.getMetadata(QUEUE_NAME) != null)
+			job.setQueueName(m.getMetadata(QUEUE_NAME));
+		else
+			throw new Exception("Must specify 'QueueName' for task [instanceId = '" + workflowInstance.getInstanceId() + "', modelId = '" + workflowInstance.getModelId() + "']");
+		this.rsManagerClient.submitJob(job, input);
+	}
+
+	@Override
+	/**
+	 * Additional '-1' is a workaround of a bug in resource manager
+	 */
+	public int getOpenSlots() throws Exception {
+		return this.rsManagerClient.getJobQueueCapacity() - this.rsManagerClient.getJobQueueSize() - 1;
+	}
+
+	@Override
+	public boolean hasOpenSlots() throws Exception {
+		return this.getOpenSlots() > 0;
+	}
+
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunnerFactory.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunnerFactory.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunnerFactory.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunnerFactory.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.engine.runner;
+
+//JDK imports
+import java.net.URL;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//OODT imports
+import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
+
+/**
+ * @author mattmann
+ * @author bfoster
+ * @version $Revision$
+ * 
+ * Factory for creating ResourceRunners
+ *
+ */
+public class ResourceRunnerFactory implements EngineRunnerFactory {
+
+	private static final Logger LOG = Logger.getLogger(ResourceRunnerFactory.class.getName());
+	
+	private String resourceManagerUrl;
+	
+	public ResourceRunner createRunner() {
+		try {
+			return new ResourceRunner(new XmlRpcResourceManagerClient(new URL(this.resourceManagerUrl)));
+		}catch (Exception e) {
+			LOG.log(Level.SEVERE, "Failed to create ResourceRunner : " + e.getMessage(), e);
+			return null;
+		}
+	}
+
+	public void setResourceManagerUrl(String resourceManagerUrl) {
+		this.resourceManagerUrl = resourceManagerUrl;
+	}
+	
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/DeleteWorkflowsByCategory.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/DeleteWorkflowsByCategory.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/DeleteWorkflowsByCategory.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/DeleteWorkflowsByCategory.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.event;
+
+//OODT imports
+import org.apache.oodt.cas.catalog.page.PageInfo;
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.engine.WorkflowEngineLocal;
+import org.apache.oodt.cas.workflow.page.QueuePage;
+import org.apache.oodt.cas.workflow.processor.ProcessorStub;
+import org.apache.oodt.cas.workflow.state.StateUtils;
+
+/**
+ * @author bfoster
+ * @version $Revision$
+ * 
+ * Event for deleting all workflows in a given category
+ *
+ */
+public class DeleteWorkflowsByCategory extends WorkflowEngineEvent {
+	
+	private static final String CATEGORY = "Category";
+	
+	@Override
+	public void performAction(WorkflowEngineLocal engine, Metadata inputMetadata)
+			throws Exception {
+		String category = inputMetadata.getMetadata(CATEGORY);
+		if (category == null)
+			throw new Exception("Must set '" + CATEGORY + "' metadata field!");
+		try {
+			QueuePage page = null; 
+			do {
+				page = engine.getPage(new PageInfo(50, PageInfo.FIRST_PAGE), StateUtils.getCategoryByName(engine.getSupportedStates(), category));
+				for (ProcessorStub stub : page.getStubs()) 
+					engine.deleteWorkflow(stub.getInstanceId());
+			}while (!page.getPageInfo().isLastPage());		
+		}catch (Exception e) {
+			throw new Exception("Failed to delete workflows by category '" + category + "' : " + e.getMessage(), e);
+		}
+	}
+	
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/DeleteWorkflowsByState.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/DeleteWorkflowsByState.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/DeleteWorkflowsByState.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/DeleteWorkflowsByState.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.event;
+
+//OODT imports
+import org.apache.oodt.cas.catalog.page.PageInfo;
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.engine.WorkflowEngineLocal;
+import org.apache.oodt.cas.workflow.page.QueuePage;
+import org.apache.oodt.cas.workflow.processor.ProcessorStub;
+import org.apache.oodt.cas.workflow.state.StateUtils;
+
+/**
+ * @author bfoster
+ * @version $Revision$
+ * 
+ * Event for deleting all workflows in a given state
+ *
+ */
+public class DeleteWorkflowsByState extends WorkflowEngineEvent {
+	
+	private static final String STATE = "State";
+	
+	@Override
+	public void performAction(WorkflowEngineLocal engine, Metadata inputMetadata)
+			throws Exception {
+		String state = inputMetadata.getMetadata(STATE);
+		if (state == null)
+			throw new Exception("Must set '" + STATE + "' metadata field!");
+		try {
+			QueuePage page = null; 
+			do {
+				page = engine.getPage(new PageInfo(50, PageInfo.FIRST_PAGE), StateUtils.getStateByName(engine.getSupportedStates(), state));
+				for (ProcessorStub stub : page.getStubs()) 
+					engine.deleteWorkflow(stub.getInstanceId());
+			}while (!page.getPageInfo().isLastPage());		
+		}catch (Exception e) {
+			throw new Exception("Failed to delete workflows by state '" + state + "' : " + e.getMessage(), e);
+		}
+	}
+	
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/GeneratePerformanceReport.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/GeneratePerformanceReport.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/GeneratePerformanceReport.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/GeneratePerformanceReport.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.event;
+
+//JDK imports
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.text.DecimalFormat;
+import java.util.Date;
+import java.util.HashMap;
+
+//OODT imports
+import org.apache.oodt.cas.catalog.page.PageInfo;
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.engine.WorkflowEngineLocal;
+import org.apache.oodt.cas.workflow.page.QueuePage;
+import org.apache.oodt.cas.workflow.processor.ProcessorStub;
+import org.apache.oodt.cas.workflow.server.action.GetSortedPage;
+import org.apache.oodt.cas.workflow.state.done.SuccessState;
+
+/**
+ * @author bfoster
+ * @version $Revision$
+ * 
+ * Event which writes out workflow performance to a file
+ *
+ */
+public class GeneratePerformanceReport extends WorkflowEngineEvent {
+
+	private File reportFile;
+	
+	@Override
+	public void performAction(WorkflowEngineLocal engine, Metadata inputMetadata)
+			throws Exception {
+		Report overallReport = new Report();
+		HashMap<String, Report> workflowBasedReports = new HashMap<String, Report>();
+		QueuePage firstWorkflow = engine.getPage(new PageInfo(1, PageInfo.FIRST_PAGE), GetSortedPage.COMPARATOR.CreationDate.getComparator());
+		QueuePage page = engine.getPage(new PageInfo(Integer.MAX_VALUE, PageInfo.FIRST_PAGE), new SuccessState(""));
+		for (ProcessorStub stub : page.getStubs()) {
+			long runtime = (stub.getProcessorInfo().getCompletionDate().getTime() - stub.getProcessorInfo().getCreationDate().getTime()) / 1000 / 60;
+			overallReport.minRuntime = Math.min(overallReport.minRuntime, runtime);
+			overallReport.maxRuntime = Math.max(overallReport.maxRuntime, runtime);
+			overallReport.totalRuntime += runtime; 
+			overallReport.totalWorkflows++; 
+			Report workflowReport = workflowBasedReports.get(stub.getModelId());
+			if (workflowReport == null)
+				workflowReport = new Report();
+			workflowReport.minRuntime = Math.min(workflowReport.minRuntime, runtime);
+			workflowReport.maxRuntime = Math.max(workflowReport.maxRuntime, runtime);
+			workflowReport.totalRuntime += runtime; 
+			workflowReport.totalWorkflows++; 
+			workflowBasedReports.put(stub.getModelId(), workflowReport);
+		}
+		PrintStream ps = null;
+		try {
+			ps = new PrintStream(new FileOutputStream(this.reportFile));
+			ps.println();
+			ps.println("Performance Report Generated on: " + new Date());
+			if (overallReport.totalWorkflows > 0) {
+				double hoursUp = (new Date().getTime() - firstWorkflow.getStubs().get(0).getProcessorInfo().getCreationDate().getTime()) / 1000.0 / 60.0 / 60.0;
+				ps.println("Time Up: " + new DecimalFormat("#.###").format(hoursUp) + " hours");
+				ps.println();
+				ps.println("**** Overall Performance Report ****");
+				ps.println(" - Total Workflows Analyzed: " + overallReport.totalWorkflows);
+				ps.println(" - Workflow Throughput: " + (int) ((double) overallReport.totalWorkflows / hoursUp) + " workflows per hour");
+				ps.println(" - Max Workflow Runtime: " + overallReport.maxRuntime + " mins");
+				ps.println(" - Min Workflow Runtime: " + overallReport.minRuntime + " mins");
+				ps.println(" - Average Workflow Runtime: " + (overallReport.totalRuntime / overallReport.totalWorkflows) + " mins");
+				for (String modelId : workflowBasedReports.keySet()) {
+					Report workflowReport = workflowBasedReports.get(modelId);
+					ps.println();
+					ps.println("**** '" + modelId + "' Performance Report ****");
+					ps.println(" - Total Workflows Analyzed: " + workflowReport.totalWorkflows);
+					ps.println(" - Workflow Throughput: " + (int) ((double) workflowReport.totalWorkflows / hoursUp) + " workflows per hour");
+					ps.println(" - Max Workflow Runtime: " + workflowReport.maxRuntime + " mins");
+					ps.println(" - Min Workflow Runtime: " + workflowReport.minRuntime + " mins");
+					ps.println(" - Average Workflow Runtime: " + (workflowReport.totalRuntime / workflowReport.totalWorkflows) + " mins");
+				}
+			}else {
+				ps.println("No Workflows In Success State");
+			}
+			ps.println();
+		}catch (Exception e) {
+			throw new Exception("Failed generate report file : " + e.getMessage(), e);
+		}finally {
+			try { ps.close(); } catch(Exception e) {}
+		}
+	}
+
+	public void setReportFile(String reportFile) {
+		this.reportFile = new File(reportFile).getAbsoluteFile();
+		if (!this.reportFile.getParentFile().exists())
+			this.reportFile.getParentFile().mkdirs();
+	}
+	
+	private class Report {
+		long totalWorkflows = 0;
+		long totalRuntime = 0;
+		long minRuntime = Long.MAX_VALUE;
+		long maxRuntime = 0;
+	}
+	
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/StartWorkflowsEvent.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/StartWorkflowsEvent.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/StartWorkflowsEvent.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/StartWorkflowsEvent.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.event;
+
+//JDK imports
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//JAVAX imports
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+//DOM imports
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.engine.WorkflowEngineLocal;
+
+/**
+ * Event which starts a list of workflows, identified by model id.
+ * <p>
+ * The model ids are read either from an XML file (see
+ * {@link #setXmlFile(String)}) or from an explicit list (see
+ * {@link #setModelIds(List)}). When both are set the XML file wins.
+ *
+ * @author bfoster
+ * @version $Revision$
+ */
+public class StartWorkflowsEvent extends WorkflowEngineEvent {
+
+	private static final Logger LOG = Logger.getLogger(StartWorkflowsEvent.class.getName());
+
+	private String xmlFile;
+	private List<String> modelIds;
+
+	/**
+	 * Starts one workflow per configured model id, passing the same input
+	 * metadata to each.
+	 *
+	 * @param engine engine on which the workflows are started
+	 * @param inputMetadata metadata handed to every started workflow
+	 * @throws Exception if neither an xml file nor a list of model ids has
+	 *         been set, if the xml file cannot be loaded, or if the engine
+	 *         fails to start a workflow
+	 */
+	@Override
+	public void performAction(WorkflowEngineLocal engine, Metadata inputMetadata) throws Exception {
+		List<String> idsToStart;
+		if (xmlFile != null) {
+			idsToStart = getModelIdsFromXmlFile(xmlFile);
+		}else if (modelIds != null) {
+			idsToStart = this.modelIds;
+		}else {
+			throw new Exception("Must Specify an xml file or list of model ids!");
+		}
+		for (String modelId : idsToStart)
+			engine.startWorkflow(modelId, inputMetadata);
+	}
+
+	/**
+	 * @return path of the XML file listing the workflows, or null if unset
+	 */
+	public String getXmlFile() {
+		return xmlFile;
+	}
+
+	/**
+	 * @param xmlFile path of the XML file listing the workflows to start
+	 */
+	public void setXmlFile(String xmlFile) {
+		this.xmlFile = xmlFile;
+	}
+
+	/**
+	 * @return the explicitly configured model ids, or null if unset
+	 */
+	public List<String> getModelIds() {
+		return modelIds;
+	}
+
+	/**
+	 * @param modelIds model ids to start when no XML file is configured
+	 */
+	public void setModelIds(List<String> modelIds) {
+		this.modelIds = modelIds;
+	}
+
+	/**
+	 * Collects the "id" attribute of every "workflow" element found below an
+	 * "event" element of the given XML file.
+	 *
+	 * @param xmlFile path of the XML file to read
+	 * @return the model ids in document order; empty if none are declared
+	 * @throws Exception if the file cannot be opened or parsed
+	 */
+	private List<String> getModelIdsFromXmlFile(String xmlFile) throws Exception {
+		Document eventRoot = getDocumentRoot(new File(xmlFile).getAbsolutePath());
+		// Fail with a meaningful message instead of a NullPointerException below
+		if (eventRoot == null)
+			throw new Exception("Failed to load model ids from xml file '" + xmlFile + "'");
+
+		List<String> modelIdsFromXml = new Vector<String>();
+		// getElementsByTagName returns an empty NodeList, never null, on no match
+		NodeList eventElemList = eventRoot.getDocumentElement().getElementsByTagName("event");
+		for (int j = 0; j < eventElemList.getLength(); j++) {
+			Element eventElem = (Element) eventElemList.item(j);
+			NodeList workflowNodeList = eventElem.getElementsByTagName("workflow");
+			for (int k = 0; k < workflowNodeList.getLength(); k++) {
+				Element workflowElement = (Element) workflowNodeList.item(k);
+				modelIdsFromXml.add(workflowElement.getAttribute("id"));
+			}
+		}
+		return modelIdsFromXml;
+	}
+
+	/**
+	 * Parses the given XML file into a DOM document.
+	 *
+	 * @param xmlFile path of the XML file to parse
+	 * @return the parsed document, or null if the file could not be opened or
+	 *         parsed (the failure is logged at WARNING level)
+	 */
+	private Document getDocumentRoot(String xmlFile) {
+		InputStream xmlInputStream = null;
+		try {
+			// toURI().toURL() escapes characters that the deprecated File.toURL() leaves raw
+			xmlInputStream = new File(xmlFile).toURI().toURL().openStream();
+		} catch (IOException e) {
+			LOG.log(Level.WARNING,
+					"IOException when getting input stream from [" + xmlFile
+							+ "]: returning null document root", e);
+			return null;
+		}
+
+		try {
+			DocumentBuilder parser = DocumentBuilderFactory.newInstance().newDocumentBuilder();
+			return parser.parse(new InputSource(xmlInputStream));
+		} catch (Exception e) {
+			LOG.log(Level.WARNING, "Unable to parse xml file [" + xmlFile + "]."
+					+ "Reason is [" + e + "]", e);
+			return null;
+		} finally {
+			// always release the file handle, whether or not parsing succeeded
+			try {
+				xmlInputStream.close();
+			} catch (IOException ignored) {
+				// nothing useful can be done if close fails
+			}
+		}
+	}
+
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/WorkflowEngineEvent.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/WorkflowEngineEvent.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/WorkflowEngineEvent.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/WorkflowEngineEvent.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.event;
+
+//OODT imports
+import org.apache.oodt.commons.spring.SpringSetIdInjectionType;
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.engine.WorkflowEngineLocal;
+import org.apache.oodt.cas.workflow.precondition.PreConditionedComponent;
+
+//Spring imports
+import org.springframework.beans.factory.annotation.Required;
+
+/**
+ * Base class for events which can be registered with a WorkflowEngine. An
+ * event represents a series of actions performed on a WorkflowEngine; events
+ * are run on the server side.
+ *
+ * @author mattmann
+ * @author bfoster
+ * @version $Revision$
+ */
+public abstract class WorkflowEngineEvent extends PreConditionedComponent implements SpringSetIdInjectionType {
+
+	protected String id;
+	protected String description;
+
+	/**
+	 * Carries out the actions of this event against the given engine.
+	 *
+	 * @param engine engine the event acts on
+	 * @param inputMetadata metadata supplied with the event
+	 * @throws Exception if the event cannot be performed
+	 */
+	public abstract void performAction(WorkflowEngineLocal engine, Metadata inputMetadata) throws Exception;
+
+	/**
+	 * @param id identifier of this event
+	 */
+	public void setId(String id) {
+		this.id = id;
+	}
+
+	/**
+	 * @return identifier of this event
+	 */
+	public String getId() {
+		return this.id;
+	}
+
+	/**
+	 * @param description human readable description; marked as a required
+	 *        property for Spring
+	 */
+	@Required
+	public void setDescription(String description) {
+		this.description = description;
+	}
+
+	/**
+	 * @return human readable description of this event
+	 */
+	public String getDescription() {
+		return description;
+	}
+
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/repo/SpringBasedEngineEventRepository.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/repo/SpringBasedEngineEventRepository.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/repo/SpringBasedEngineEventRepository.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/repo/SpringBasedEngineEventRepository.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.event.repo;
+
+//JDK imports
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.event.WorkflowEngineEvent;
+
+//Spring imports
+import org.springframework.context.support.FileSystemXmlApplicationContext;
+
+/**
+ * 
+ * @author bfoster
+ * @version $Revision$
+ *
+ *	Spring based Event repo - READ ONLY
+ *	
+ */
+public class SpringBasedEngineEventRepository implements
+		WorkflowEngineEventRepository {
+
+	private Map<String, WorkflowEngineEvent> events;
+	
+	public SpringBasedEngineEventRepository(String beanRepo) {
+		events = new FileSystemXmlApplicationContext(new String[] { beanRepo }).getBeansOfType(WorkflowEngineEvent.class);
+	}
+	
+	public WorkflowEngineEvent getEventById(String id) throws Exception {
+		return events.get(id);
+	}
+
+	public List<String> getEventIds() throws Exception {
+		return new Vector<String>(events.keySet());
+	}
+
+	public void storeEvent(WorkflowEngineEvent event) throws Exception {
+		throw new Exception("Modification not allowed during runtime");
+	}
+
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/repo/SpringBasedEngineEventRepositoryFactory.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/repo/SpringBasedEngineEventRepositoryFactory.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/repo/SpringBasedEngineEventRepositoryFactory.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/repo/SpringBasedEngineEventRepositoryFactory.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.event.repo;
+
+/**
+ * 
+ * @author bfoster
+ * @version $Revision$
+ * 
+ * Factory for creating Spring event repos
+ *
+ */
+public class SpringBasedEngineEventRepositoryFactory implements
+		WorkflowEngineEventRepositoryFactory {
+
+	private String beanRepo;
+	
+	public SpringBasedEngineEventRepository createRepo() {
+		return new SpringBasedEngineEventRepository(this.beanRepo);
+	}
+
+	public String getBeanRepo() {
+		return beanRepo;
+	}
+
+	public void setBeanRepo(String beanRepo) {
+		this.beanRepo = beanRepo;
+	}
+	
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/repo/WorkflowEngineEventRepository.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/repo/WorkflowEngineEventRepository.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/repo/WorkflowEngineEventRepository.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/repo/WorkflowEngineEventRepository.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.event.repo;
+
+//JDK imports
+import java.util.List;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.event.WorkflowEngineEvent;
+
+/**
+ * Repository for storing and retrieving workflow engine events.
+ *
+ * @author bfoster
+ * @version $Revision$
+ */
+public interface WorkflowEngineEventRepository {
+
+	/**
+	 * Looks up a single event.
+	 *
+	 * @param id identifier of the wanted event
+	 * @return the event registered under the given id
+	 * @throws Exception if the lookup fails
+	 */
+	WorkflowEngineEvent getEventById(String id) throws Exception;
+
+	/**
+	 * @return the identifiers of all events held by this repository
+	 * @throws Exception if the identifiers cannot be retrieved
+	 */
+	List<String> getEventIds() throws Exception;
+
+	/**
+	 * Adds an event to this repository.
+	 *
+	 * @param event event to store
+	 * @throws Exception if the event cannot be stored
+	 */
+	void storeEvent(WorkflowEngineEvent event) throws Exception;
+
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/repo/WorkflowEngineEventRepositoryFactory.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/repo/WorkflowEngineEventRepositoryFactory.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/repo/WorkflowEngineEventRepositoryFactory.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/event/repo/WorkflowEngineEventRepositoryFactory.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.event.repo;
+
+/**
+ * Factory for creating {@link WorkflowEngineEventRepository} instances.
+ *
+ * @author bfoster
+ * @version $Revision$
+ */
+public interface WorkflowEngineEventRepositoryFactory {
+
+	/**
+	 * @return an event repository built by this factory
+	 */
+	WorkflowEngineEventRepository createRepo();
+
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/exceptions/EngineException.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/exceptions/EngineException.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/exceptions/EngineException.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/exceptions/EngineException.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.exceptions;
+
+/**
+ * An exception thrown by the Workflow Engine.
+ *
+ * @author mattmann
+ * @version $Revision$
+ */
+public class EngineException extends Exception {
+
+    /* serial version UID */
+    private static final long serialVersionUID = 3690762773826910000L;
+
+    /**
+     * Creates an exception with neither a message nor a cause.
+     */
+    public EngineException() {
+    }
+
+    /**
+     * Creates an exception with a detail message.
+     *
+     * @param message description of the failure
+     */
+    public EngineException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception which wraps another throwable.
+     *
+     * @param cause the underlying failure
+     */
+    public EngineException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates an exception with a detail message and an underlying cause.
+     *
+     * @param message description of the failure
+     * @param cause the underlying failure
+     */
+    public EngineException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/exceptions/WorkflowTaskInstanceException.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/exceptions/WorkflowTaskInstanceException.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/exceptions/WorkflowTaskInstanceException.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/exceptions/WorkflowTaskInstanceException.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.exceptions;
+
+/**
+ * An exception thrown by workflow task instances.
+ *
+ * @author bfoster
+ * @version $Revision$
+ */
+public class WorkflowTaskInstanceException extends Exception {
+
+    private static final long serialVersionUID = 2666346581066070156L;
+
+    /**
+     * Creates an exception with neither a message nor a cause.
+     */
+    public WorkflowTaskInstanceException() {
+        super();
+    }
+
+    /**
+     * @param msg description of the failure
+     */
+    public WorkflowTaskInstanceException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * @param throwable the underlying failure
+     */
+    public WorkflowTaskInstanceException(Throwable throwable) {
+        super(throwable);
+    }
+
+    /**
+     * @param msg description of the failure
+     * @param throwable the underlying failure
+     */
+    public WorkflowTaskInstanceException(String msg, Throwable throwable) {
+        super(msg, throwable);
+    }
+
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/instance/HelloWorldInstance.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/instance/HelloWorldInstance.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/instance/HelloWorldInstance.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/instance/HelloWorldInstance.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.instance;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.metadata.ControlMetadata;
+import org.apache.oodt.cas.workflow.state.results.ResultsState;
+import org.apache.oodt.cas.workflow.state.results.ResultsSuccessState;
+
+/**
+ * 
+ * @author bfoster
+ * @version $Revision$
+ * 
+ * Dummy HelloWorld TaskInstance
+ *
+ */
+public class HelloWorldInstance extends TaskInstance {
+
+	@Override
+	protected ResultsState performExecution(ControlMetadata crtlMetadata) {
+		System.out.println("Hello World");
+		return new ResultsSuccessState("");
+	}
+
+}

Added: oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/instance/RequiredMetadata.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/instance/RequiredMetadata.java?rev=1052143&view=auto
==============================================================================
--- oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/instance/RequiredMetadata.java (added)
+++ oodt/branches/wengine-branch/src/main/java/org/apache/oodt/cas/workflow/instance/RequiredMetadata.java Thu Dec 23 02:44:23 2010
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.instance;
+
+//JDK imports
+import java.util.List;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.metadata.ControlMetadata;
+import org.apache.oodt.cas.workflow.state.results.ResultsFailureState;
+import org.apache.oodt.cas.workflow.state.results.ResultsState;
+import org.apache.oodt.cas.workflow.state.results.ResultsSuccessState;
+
+/**
+ * Task instance which verifies that a configured set of metadata keys is
+ * present in the control metadata.
+ *
+ * @author bfoster
+ */
+public class RequiredMetadata extends TaskInstance {
+
+	/** metadata key whose values name the keys that must be present */
+	public static final String REQUIRED_METADATA = "RequiredMetadata";
+
+	/**
+	 * Checks every key listed under {@link #REQUIRED_METADATA}.
+	 *
+	 * @param crtlMetadata control metadata holding both the list of required
+	 *        keys and the values to check
+	 * @return a failure state naming the first missing key, otherwise a
+	 *         success state (also when no keys are required)
+	 */
+	@Override
+	protected ResultsState performExecution(ControlMetadata crtlMetadata) {
+		List<String> requiredKeys = crtlMetadata.getAllMetadata(REQUIRED_METADATA);
+		// nothing configured means there is nothing to verify
+		if (requiredKeys == null || requiredKeys.isEmpty())
+			return new ResultsSuccessState("No Required Metadata");
+
+		for (String key : requiredKeys) {
+			if (crtlMetadata.getMetadata(key) == null)
+				return new ResultsFailureState("Missing required metadata key '" + key + "'");
+		}
+		return new ResultsSuccessState("All Required Metadata has been accounted for");
+	}
+
+}



Mime
View raw message