airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject [1/2] airavata git commit: Fixed issues with experiment cancel
Date Fri, 18 Sep 2015 20:21:43 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 9ce24a559 -> ba8538ddf


Fixed issues with experiment cancel


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/44e5ed14
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/44e5ed14
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/44e5ed14

Branch: refs/heads/master
Commit: 44e5ed14adbe756fcdcd7a541656dd968a61f642
Parents: 1302c30
Author: Shameera Rathanyaka <shameerainfo@gmail.com>
Authored: Fri Sep 18 16:21:06 2015 -0400
Committer: Shameera Rathanyaka <shameerainfo@gmail.com>
Committed: Fri Sep 18 16:21:06 2015 -0400

----------------------------------------------------------------------
 .../org/apache/airavata/gfac/impl/Factory.java  |  8 +--
 .../impl/watcher/CancelRequestWatcherImpl.java  | 27 +++++++--
 .../watcher/RedeliveryRequestWatcherImpl.java   | 28 +++++++--
 .../airavata/gfac/server/GfacServerHandler.java |  5 +-
 .../server/OrchestratorServerHandler.java       | 63 +++++++++++++++++---
 .../orchestrator/util/OrchestratorUtils.java    |  6 ++
 6 files changed, 111 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/44e5ed14/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index bb53802..fbba17b 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -301,12 +301,12 @@ public abstract class Factory {
 		return getMonitorService(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR);
 	}
 
-	public static RedeliveryRequestWatcher getRedeliveryReqeustWatcher() {
-		return new RedeliveryRequestWatcherImpl();
+	public static RedeliveryRequestWatcher getRedeliveryReqeustWatcher(String experimentId, String processId) {
+		return new RedeliveryRequestWatcherImpl(experimentId, processId);
 	}
 
-	public static CancelRequestWatcher getCancelRequestWatcher() {
-		return new CancelRequestWatcherImpl();
+	public static CancelRequestWatcher getCancelRequestWatcher(String experimentId, String processId) {
+		return new CancelRequestWatcherImpl(experimentId, processId);
 	}
 
 	public static Session getSSHSession(AuthenticationInfo authenticationInfo, ServerInfo serverInfo) throws AiravataException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/44e5ed14/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
index 58d2817..ba87587 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
@@ -33,6 +33,13 @@ import org.slf4j.LoggerFactory;
 
 public class CancelRequestWatcherImpl implements CancelRequestWatcher {
 	private static final Logger log = LoggerFactory.getLogger(CancelRequestWatcherImpl.class);
+	private final String processId;
+	private final String experimentId;
+
+	public CancelRequestWatcherImpl(String experimentId, String processId) {
+		this.experimentId = experimentId;
+		this.processId = processId;
+	}
 
 	@Override
 	public void process(WatchedEvent watchedEvent) throws Exception {
@@ -43,15 +50,15 @@ public class CancelRequestWatcherImpl implements CancelRequestWatcher {
 		switch (type) {
 			case NodeDataChanged:
 				byte[] bytes = curatorClient.getData().forPath(path);
-				String processId = path.substring(path.lastIndexOf("/") + 1);
 				String action = new String(bytes);
 				if (action.equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST)) {
 					ProcessContext processContext = Factory.getGfacContext().getProcess(processId);
 					if (processContext != null) {
 						processContext.setCancel(true);
-						log.info("procesId : {}, Cancelling process", processId);
+						log.info("expId {}, processId : {}, Cancelling process", experimentId, processId);
 					} else {
-						log.info("Cancel request came for processId {} but couldn't find process context");
+						log.info("expId: {}, Cancel request came for processId {} but couldn't find process context",
+								experimentId, processId);
 					}
 				} else {
 					curatorClient.getData().usingWatcher(this).forPath(path);
@@ -59,14 +66,24 @@ public class CancelRequestWatcherImpl implements CancelRequestWatcher {
 				break;
 			case NodeDeleted:
 				//end of experiment execution, ignore this event
+				log.info("expId: {}, cancel watcher trigger for process {} with event type {}", experimentId,
+						processId, type.name());
 				break;
 			case NodeCreated:
 			case NodeChildrenChanged:
 			case None:
-				curatorClient.getData().usingWatcher(this).forPath(path);
+				log.info("expId: {}, Cancel watcher trigger for process {} with event type {}", experimentId,
+						processId, type.name());
+				if (path != null) {
+					curatorClient.getData().usingWatcher(this).forPath(path);
+				}
 				break;
 			default:
-				curatorClient.getData().usingWatcher(this).forPath(path);
+				log.info("expId: {}, Cancel watcher trigger for process {} with event type {}", experimentId,
+						processId, type.name());
+				if (path != null) {
+					curatorClient.getData().usingWatcher(this).forPath(path);
+				}
 				break;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/airavata/blob/44e5ed14/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
index dc5317f..8341855 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
@@ -32,6 +32,13 @@ import org.slf4j.Logger;
 public class RedeliveryRequestWatcherImpl implements RedeliveryRequestWatcher {
 
 	private static final Logger log = org.slf4j.LoggerFactory.getLogger(RedeliveryRequestWatcherImpl.class);
+	private final String processId;
+	private final String experimentId;
+
+	public RedeliveryRequestWatcherImpl(String experimentId, String procesId) {
+		this.experimentId = experimentId;
+		this.processId = procesId;
+	}
 
 	@Override
 	public void process(WatchedEvent watchedEvent) throws Exception {
@@ -43,31 +50,40 @@ public class RedeliveryRequestWatcherImpl implements RedeliveryRequestWatcher {
 			case NodeDataChanged:
 				byte[] bytes = curatorClient.getData().forPath(path);
 				String serverName = new String(bytes);
-				String processId = path.substring(path.lastIndexOf("/") + 1);
 				if (ServerSettings.getGFacServerName().trim().equals(serverName)) {
 					curatorClient.getData().usingWatcher(this).forPath(path);
-					log.info("processId: {}, change data with same server name : {}" , processId, serverName);
+					log.info("processId: {},event type {}, change data with same server name : {}", processId,
+							eventType, serverName);
 				} else {
 					ProcessContext processContext = Factory.getGfacContext().getProcess(processId);
 					if (processContext != null) {
 						processContext.setHandOver(true);
-						log.info("procesId : {}, handing over to new server instance : {}", processId, serverName);
+						log.info("processId : {}, event type {}, handing over to new server instance : {}", processId,
+								eventType, serverName);
 					} else {
-						log.info("Redelivery request came for processId {} but couldn't find process context");
+						log.info("Redelivery request came for processId {}, with event type {}, but couldn't find " +
+								"process context", processId, eventType.name());
 					}
 				}
 				break;
 			case NodeDeleted:
 				//end of experiment execution, ignore this event
+				log.info("Redelivery watcher trigger for process {} with event type {}", processId, eventType.name());
 				break;
 			case NodeCreated:
 			case NodeChildrenChanged:
 			case None:
-				curatorClient.getData().usingWatcher(this).forPath(path);
+				if (path != null) {
+					curatorClient.getData().usingWatcher(this).forPath(path);
+					log.info("Redelivery watcher trigger for process {} with event type {}", processId, eventType.name());
+				}
 				break;
 				// not yet implemented
 			default:
-				curatorClient.getData().usingWatcher(this).forPath(path);
+				if (path != null) {
+					curatorClient.getData().usingWatcher(this).forPath(path);
+					log.info("Redelivery watcher trigger for process {} with event type {}", processId, eventType.name());
+				}
 				break;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/airavata/blob/44e5ed14/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 59b7a54..53665fd 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -312,7 +312,7 @@ public class GfacServerHandler implements GfacService.Iface {
 		String zkProcessNodePath = ZKPaths.makePath(experimentNodePath, processId);
 		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkProcessNodePath);
 		curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, gfacServerName.getBytes());
-		curatorClient.getData().usingWatcher(Factory.getRedeliveryReqeustWatcher()).forPath(zkProcessNodePath);
+		curatorClient.getData().usingWatcher(Factory.getRedeliveryReqeustWatcher(experimentId, processId)).forPath(zkProcessNodePath);
 
 		// create /experiments/{experimentId}/{processId}/deliveryTag node and set data - deliveryTag
 		String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE);
@@ -331,8 +331,7 @@ public class GfacServerHandler implements GfacService.Iface {
 
 		// create /experiments/{experimentId}/cancel node and set watcher for data changes
 		String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
-		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentCancelNode);
-		curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath (experimentCancelNode);
+		curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher(experimentId, processId)).forPath (experimentCancelNode);
 
 	}
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/44e5ed14/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 109faef..6c04ed7 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -154,7 +154,12 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
                 log.info("Couldn't identify the gateway Id using the credential token, Use default gateway Id");
 //                throw new AiravataException("Couldn't identify the gateway Id using the credential token");
             }
-            ExperimentType executionType = experiment.getExperimentType();
+	        String experimentNodePath = GFacUtils.getExperimentNodePath (experimentId);
+	        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentNodePath);
+	        String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+	        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentCancelNode);
+
+	        ExperimentType executionType = experiment.getExperimentType();
             if (executionType == ExperimentType.SINGLE_APPLICATION) {
                 //its an single application execution experiment
                 log.debug(experimentId, "Launching single application experiment {}.", experimentId);
@@ -318,8 +323,14 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 	    if (stat != null) {
 		    curatorClient.setData().withVersion(-1).forPath(expCancelNodePath, ZkConstants.ZOOKEEPER_CANCEL_REQEUST
 				    .getBytes());
+		    ExperimentStatus status = new ExperimentStatus(ExperimentState.CANCELING);
+		    status.setReason("Experiment cancel request processed");
+		    status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+		    OrchestratorUtils.updageExperimentStatus(experimentId, status);
+		    log.info("expId : " + experimentId + " :- Experiment status updated to " + status.getState());
+		    return true;
 	    }
-	    return true;
+	    return false;
     }
 
     private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken) throws TException {
@@ -419,8 +430,20 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 //						case CREATED:
 //						case VALIDATED:
 						case STARTED:
-							status.setState(ExperimentState.EXECUTING);
-							status.setReason("process  started");
+							try {
+								ExperimentStatus stat = OrchestratorUtils.getExperimentStatus(processIdentity
+										.getExperimentId());
+								if (stat.getState() == ExperimentState.CANCELING) {
+									status.setState(ExperimentState.CANCELING);
+									status.setReason("Process competed but experiment cancelling is triggered");
+								} else {
+									status.setState(ExperimentState.EXECUTING);
+									status.setReason("process  started");
+								}
+							} catch (RegistryException e) {
+								status.setState(ExperimentState.EXECUTING);
+								status.setReason("process  started");
+							}
 							break;
 //						case PRE_PROCESSING:
 //							break;
@@ -433,12 +456,36 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 //						case CANCELLING:
 //							break;
 						case COMPLETED:
-							status.setState(ExperimentState.COMPLETED);
-							status.setReason("process  completed");
+							try {
+								ExperimentStatus stat = OrchestratorUtils.getExperimentStatus(processIdentity
+										.getExperimentId());
+								if (stat.getState() == ExperimentState.CANCELING) {
+									status.setState(ExperimentState.CANCELED);
+									status.setReason("Process competed but experiment cancelling is triggered");
+								} else {
+									status.setState(ExperimentState.COMPLETED);
+									status.setReason("process  completed");
+								}
+							} catch (RegistryException e) {
+								status.setState(ExperimentState.COMPLETED);
+								status.setReason("process  completed");
+							}
 							break;
 						case FAILED:
-							status.setState(ExperimentState.FAILED);
-							status.setReason("process  failed");
+							try {
+								ExperimentStatus stat = OrchestratorUtils.getExperimentStatus(processIdentity
+										.getExperimentId());
+								if (stat.getState() == ExperimentState.CANCELING) {
+									status.setState(ExperimentState.CANCELED);
+									status.setReason("Process failed but experiment cancelling is triggered");
+								} else {
+									status.setState(ExperimentState.FAILED);
+									status.setReason("process  failed");
+								}
+							} catch (RegistryException e) {
+								status.setState(ExperimentState.FAILED);
+								status.setReason("process  failed");
+							}
 							break;
 						case CANCELED:
 							status.setState(ExperimentState.CANCELED);

http://git-wip-us.apache.org/repos/asf/airavata/blob/44e5ed14/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java
index 4d4b04e..834d3b6 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java
@@ -40,4 +40,10 @@ public class OrchestratorUtils {
 			log.error("expId : " + experimentId + " Error while updating experiment status to " + status.toString(), e);
 		}
 	}
+
+	public static ExperimentStatus getExperimentStatus(String experimentId) throws RegistryException {
+		return ((ExperimentStatus) RegistryFactory.getDefaultExpCatalog().get(ExperimentCatalogModelType
+						.EXPERIMENT_STATUS,
+				experimentId));
+	}
 }


Mime
View raw message