airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject airavata git commit: Implemented process cancel, AIRAVATA-1798
Date Wed, 02 Sep 2015 21:22:53 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 6938f907f -> 662396778


Implemented process cancel, AIRAVATA-1798


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/66239677
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/66239677
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/66239677

Branch: refs/heads/master
Commit: 6623967781bb7e50a86bf37051a544396c4583eb
Parents: 6938f90
Author: Shameera Rathanyaka <shameerainfo@gmail.com>
Authored: Wed Sep 2 17:22:46 2015 -0400
Committer: Shameera Rathanyaka <shameerainfo@gmail.com>
Committed: Wed Sep 2 17:22:46 2015 -0400

----------------------------------------------------------------------
 .../apache/airavata/gfac/core/GFacUtils.java    | 17 +++++
 .../airavata/gfac/core/context/GFacContext.java |  2 +-
 .../gfac/core/context/ProcessContext.java       | 14 +++-
 .../airavata/gfac/impl/GFacEngineImpl.java      | 72 +++++++++++++++-----
 .../apache/airavata/gfac/impl/GFacWorker.java   | 16 +++--
 .../impl/watcher/CancelRequestWatcherImpl.java  | 38 +++++++++++
 .../watcher/RedeliveryRequestWatcherImpl.java   |  5 +-
 7 files changed, 140 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/66239677/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index 5880fcc..784d214 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -1134,4 +1134,21 @@ public class GFacUtils {
 		}
 	}
 
+	public static void handleProcessInterrupt(ProcessContext processContext) throws GFacException {
+		if (processContext.isCancel()) {
+			ProcessStatus pStatus = new ProcessStatus(ProcessState.CANCELLING);
+			pStatus.setReason("Process Cancel triggered");
+			saveAndPublishProcessStatus(processContext);
+			// do cancel operation here
+
+			pStatus.setState(ProcessState.CANCELED);
+			saveAndPublishProcessStatus(processContext);
+		}else if (processContext.isHandOver()) {
+
+		} else {
+			log.error("expId: {}, processId: {} :- Unknown process interrupt", processContext.getExperimentId(),
+					processContext.getProcessId());
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/66239677/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/GFacContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/GFacContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/GFacContext.java
index 9bca09f..36edd07 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/GFacContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/GFacContext.java
@@ -54,7 +54,7 @@ public class GFacContext {
 		return processes.get(processId);
 	}
 
-	public void remoteProcess(String processId) {
+	public void removeProcess(String processId) {
 		processes.remove(processId);
 	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/66239677/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 553a388..8e6056e 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -78,7 +78,7 @@ public class ProcessContext {
 	private MonitorMode monitorMode;
 	private ResourceJobManager resourceJobManager;
 	private boolean handOver;
-
+	private boolean cancel;
 	/**
 	 * Note: process context property use lazy loading approach. In runtime you will see some properties as null
 	 * unless you have access it previously. Once that property access using the api,it will be set to correct value.
@@ -349,4 +349,16 @@ public class ProcessContext {
 	public void setHandOver(boolean handOver) {
 		this.handOver = handOver;
 	}
+
+	public boolean isCancel() {
+		return cancel;
+	}
+
+	public void setCancel(boolean cancel) {
+		this.cancel = cancel;
+	}
+
+	public boolean isInterrupted(){
+		return this.cancel || this.handOver;
+	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/66239677/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 5a63871..39dffb9 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -127,7 +127,8 @@ public class GFacEngineImpl implements GFacEngine {
 
 	@Override
 	public void executeProcess(ProcessContext processContext) throws GFacException {
-		if (processContext.isHandOver()) {
+		if (processContext.isInterrupted()) {
+			GFacUtils.handleProcessInterrupt(processContext);
 			return;
 		}
 //		List<TaskContext> taskChain = new ArrayList<>();
@@ -139,20 +140,23 @@ public class GFacEngineImpl implements GFacEngine {
 		// exit if process is handed orver to another instance while job submission.
 		if (executeJobSubmission(processContext)) return;
 //		processContext.setTaskChain(taskChain);
-		if (processContext.isHandOver()) {
+		if (processContext.isInterrupted()) {
+			GFacUtils.handleProcessInterrupt(processContext);
 			return;
 		}
 	}
 
 	private boolean executeJobSubmission(ProcessContext processContext) throws GFacException {
-		if (processContext.isHandOver()) {
+		if (processContext.isInterrupted()) {
+			GFacUtils.handleProcessInterrupt(processContext);
 			return true;
 		}
 		TaskContext taskCtx;
 		TaskStatus taskStatus;
 		processContext.setProcessStatus(new ProcessStatus(ProcessState.EXECUTING));
 		JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol());
-		if (processContext.isHandOver()) {
+		if (processContext.isInterrupted()) {
+			GFacUtils.handleProcessInterrupt(processContext);
 			return true;
 		}
 		GFacUtils.saveAndPublishProcessStatus(processContext);
@@ -163,11 +167,16 @@ public class GFacEngineImpl implements GFacEngine {
 		if (taskStatus.getState() == TaskState.FAILED) {
 			throw new GFacException("Job submission task failed");
 		}
-		return processContext.isHandOver();
+		if (processContext.isInterrupted()) {
+			GFacUtils.handleProcessInterrupt(processContext);
+			return true;
+		}
+		return false;
 	}
 
 	private boolean inputDataStaging(ProcessContext processContext, boolean recover) throws GFacException {
-		if (processContext.isHandOver()) {
+		if (processContext.isInterrupted()) {
+			GFacUtils.handleProcessInterrupt(processContext);
 			return true;
 		}
 		TaskContext taskCtx;
@@ -178,7 +187,8 @@ public class GFacEngineImpl implements GFacEngine {
 		sortByInputOrder(processInputs);
 		if (processInputs != null) {
 			for (InputDataObjectType processInput : processInputs) {
-				if (processContext.isHandOver()) {
+				if (processContext.isInterrupted()) {
+					GFacUtils.handleProcessInterrupt(processContext);
 					return true;
 				}
 				DataType type = processInput.getType();
@@ -211,11 +221,16 @@ public class GFacEngineImpl implements GFacEngine {
 				}
 			}
 		}
-		return processContext.isHandOver();
+		if (processContext.isInterrupted()) {
+			GFacUtils.handleProcessInterrupt(processContext);
+			return true;
+		}
+		return false;
 	}
 
 	private boolean configureWorkspace(ProcessContext processContext, boolean recover) throws GFacException {
-		if (processContext.isHandOver()) {
+		if (processContext.isInterrupted()) {
+			GFacUtils.handleProcessInterrupt(processContext);
 			return true;
 		}
 		TaskContext taskCtx;
@@ -234,7 +249,11 @@ public class GFacEngineImpl implements GFacEngine {
 					().name(), taskStatus.getReason());
 			throw new GFacException("Error while environment setup");
 		}
-		return processContext.isHandOver();
+		if (processContext.isInterrupted()) {
+			GFacUtils.handleProcessInterrupt(processContext);
+			return true;
+		}
+		return false;
 	}
 
 
@@ -266,37 +285,58 @@ public class GFacEngineImpl implements GFacEngine {
 
 	@Override
 	public void runProcessOutflow(ProcessContext processContext) throws GFacException {
-		if (processContext.isHandOver()) {
+		if (processContext.isInterrupted()) {
+			GFacUtils.handleProcessInterrupt(processContext);
 			return;
 		}
 		// exit if process is handed over to another instance while output staging.
 		if (outpuDataStaging(processContext, false)) return;
 
-		if (processContext.isHandOver()) {
+		if (processContext.isInterrupted()) {
+			GFacUtils.handleProcessInterrupt(processContext);
 			return;
 		}
 
 		postProcessing(processContext,false);
 
-		if (processContext.isHandOver()) {
-			return;
+		if (processContext.isInterrupted()) {
+			GFacUtils.handleProcessInterrupt(processContext);
 		}
 	}
 
+	/**
+	 *
+	 * @param processContext
+	 * @param recovery
+	 * @return <code>true</code> if you need to interrupt processing <code>false</code> otherwise.
+	 * @throws GFacException
+	 */
 	private boolean postProcessing(ProcessContext processContext, boolean recovery) throws GFacException {
 		processContext.setProcessStatus(new ProcessStatus(ProcessState.POST_PROCESSING));
 		GFacUtils.saveAndPublishProcessStatus(processContext);
 //		taskCtx = getEnvCleanupTaskContext(processContext);
-		return processContext.isHandOver();
+		if (processContext.isInterrupted()) {
+			GFacUtils.handleProcessInterrupt(processContext);
+			return true;
+		}
+		return false;
 	}
 
+	/**
+	 *
+	 * @param processContext
+	 * @param recovery
+	 * @return <code>true</code> if process execution interrupted , <code>false</code> otherwise.
+	 * @throws GFacException
+	 */
 	private boolean outpuDataStaging(ProcessContext processContext, boolean recovery) throws GFacException {
 		TaskContext taskCtx;
 		processContext.setProcessStatus(new ProcessStatus(ProcessState.OUTPUT_DATA_STAGING));
 		GFacUtils.saveAndPublishProcessStatus(processContext);
 		List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
 		for (OutputDataObjectType processOutput : processOutputs) {
-			if (processContext.isHandOver()) {
+			if (processContext.isInterrupted()) {
+				GFacUtils.handleProcessInterrupt(processContext);
 				return true;
 			}
 			DataType type = processOutput.getType();

http://git-wip-us.apache.org/repos/asf/airavata/blob/66239677/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 8d3c5e0..49dc45d 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -74,10 +74,11 @@ public class GFacWorker implements Runnable {
 
 	@Override
 	public void run() {
-		if (processContext.isHandOver()) {
-			return;
-		}
 		try {
+			if (processContext.isInterrupted()) {
+				GFacUtils.handleProcessInterrupt(processContext);
+				return;
+			}
 			ProcessState processState = processContext.getProcessStatus().getState();
 			switch (processState) {
 				case CREATED:
@@ -113,6 +114,10 @@ public class GFacWorker implements Runnable {
 				default:
 					throw new GFacException("process Id : " + processId + " Couldn't identify process type");
 			}
+			if (processContext.isCancel()) {
+				sendAck();
+				Factory.getGfacContext().removeProcess(processContext.getProcessId());
+			}
 		} catch (GFacException e) {
 			log.error("GFac Worker throws an exception", e);
 			ProcessStatus status = new ProcessStatus(ProcessState.FAILED);
@@ -132,7 +137,7 @@ public class GFacWorker implements Runnable {
 		processContext.setProcessStatus(new ProcessStatus(ProcessState.COMPLETED));
 		GFacUtils.saveAndPublishProcessStatus(processContext);
 		sendAck();
-		Factory.getGfacContext().remoteProcess(processContext.getProcessId());
+		Factory.getGfacContext().removeProcess(processContext.getProcessId());
 	}
 
 	private void recoverProcessOutflow() throws GFacException {
@@ -151,7 +156,8 @@ public class GFacWorker implements Runnable {
 	}
 
 	private void executeProcess() throws GFacException {
-		if (processContext.isHandOver()) {
+		if (processContext.isInterrupted()) {
+			GFacUtils.handleProcessInterrupt(processContext);
 			return;
 		}
 		engine.executeProcess(processContext);

http://git-wip-us.apache.org/repos/asf/airavata/blob/66239677/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
index bc49316..bfeac89 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
@@ -20,12 +20,50 @@
  */
 package org.apache.airavata.gfac.impl.watcher;
 
+import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.model.status.ExperimentState;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CancelRequestWatcherImpl implements CancelRequestWatcher {
+	private static final Logger log = LoggerFactory.getLogger(CancelRequestWatcherImpl.class);
+
 	@Override
 	public void process(WatchedEvent watchedEvent) throws Exception {
 		// this watcher change data in cancel listener node in the experiment node
+		String path = watchedEvent.getPath();
+		Watcher.Event.EventType type = watchedEvent.getType();
+		CuratorFramework curatorClient = Factory.getCuratorClient();
+		switch (type) {
+			case NodeDataChanged:
+				byte[] bytes = curatorClient.getData().forPath(path);
+				String processId = path.substring(path.lastIndexOf("/") + 1);
+				String action = new String(bytes);
+				if (action.equalsIgnoreCase("CANCEL")) {
+					ProcessContext processContext = Factory.getGfacContext().getProcess(processId);
+					if (processContext != null) {
+						processContext.setCancel(true);
+						log.info("procesId : {}, Cancelling process", processId);
+					} else {
+						log.info("Cancel request came for processId {} but couldn't find process context");
+					}
+				} else {
+					curatorClient.getData().usingWatcher(this).forPath(path);
+				}
+				break;
+			case NodeCreated:
+			case NodeChildrenChanged:
+			case NodeDeleted:
+				curatorClient.getData().usingWatcher(this).forPath(path);
+				break;
+			default:
+				curatorClient.getData().usingWatcher(this).forPath(path);
+				break;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/66239677/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
index c459f0c..4738edb 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
@@ -38,9 +38,9 @@ public class RedeliveryRequestWatcherImpl implements RedeliveryRequestWatcher {
 		String path = watchedEvent.getPath();
 		Watcher.Event.EventType eventType = watchedEvent.getType();
 		log.info("Redelivery request came for zk path {} event type {} ", path, eventType.name());
+		CuratorFramework curatorClient = Factory.getCuratorClient();
 		switch (eventType) {
 			case NodeDataChanged:
-				CuratorFramework curatorClient = Factory.getCuratorClient();
 				byte[] bytes = curatorClient.getData().forPath(path);
 				String serverName = new String(bytes);
 				String processId = path.substring(path.lastIndexOf("/") + 1);
@@ -61,8 +61,11 @@ public class RedeliveryRequestWatcherImpl implements RedeliveryRequestWatcher {
 			case NodeCreated:
 			case NodeChildrenChanged:
 			case None:
+				curatorClient.getData().usingWatcher(this).forPath(path);
+				break;
 				// not yet implemented
 			default:
+				curatorClient.getData().usingWatcher(this).forPath(path);
 				break;
 		}
 	}


Mime
View raw message