airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject [1/3] git commit: ading initial version of terminate experiment
Date Thu, 14 Aug 2014 20:44:54 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 404913acd -> d62a957db


ading initial version of terminate experiment


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/1c554482
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/1c554482
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/1c554482

Branch: refs/heads/master
Commit: 1c554482790423daa242325d1167d5df8b326409
Parents: 538e37f
Author: lahiru <lahiru@apache.org>
Authored: Thu Aug 14 22:05:53 2014 +0530
Committer: lahiru <lahiru@apache.org>
Committed: Thu Aug 14 22:05:53 2014 +0530

----------------------------------------------------------------------
 .../client/samples/CreateLaunchExperiment.java  |   6 +
 .../server/OrchestratorServerHandler.java       | 153 +++++++++++++++----
 .../core/impl/GFACEmbeddedJobSubmitter.java     |   4 +
 .../core/impl/GFACServiceJobSubmitter.java      |  15 +-
 .../orchestrator/core/job/JobSubmitter.java     |   9 ++
 .../airavata/orchestrator/cpi/Orchestrator.java |   7 +-
 .../cpi/impl/SimpleOrchestratorImpl.java        |   3 +-
 7 files changed, 160 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 967577c..0729478 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -795,6 +795,12 @@ public class CreateLaunchExperiment {
             String sshTokenId = "61abd2ff-f92b-4901-a077-07b51abe2c5d";
             String gsisshTokenId = "61abd2ff-f92b-4901-a077-07b51abe2c5d";
             client.launchExperiment(expId, sshTokenId);
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            client.terminateExperiment(expId);
         } catch (ExperimentNotFoundException e) {
             logger.error("Error occured while launching the experiment...", e.getMessage());
             throw new ExperimentNotFoundException(e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 3bffe23..d371d4c 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -47,11 +47,7 @@ import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentD
 import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
 import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
 import org.apache.airavata.model.error.LaunchValidationException;
-import org.apache.airavata.model.workspace.experiment.Experiment;
-import org.apache.airavata.model.workspace.experiment.ExperimentState;
-import org.apache.airavata.model.workspace.experiment.ExperimentStatus;
-import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.model.workspace.experiment.*;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.orchestrator.cpi.OrchestratorService;
 import org.apache.airavata.orchestrator.cpi.orchestrator_cpi_serviceConstants;
@@ -188,29 +184,29 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 	 * @param experimentId
 	 */
 	public boolean launchExperiment(String experimentId) throws TException {
-		Experiment experiment = null;
-		try {
-			List<String> ids = registry.getIds(
+        Experiment experiment = null; // this will inside the bottom catch statement
+        try {
+            experiment = (Experiment) registry.get(
+                    RegistryModelType.EXPERIMENT, experimentId);
+            if (experiment == null) {
+                log.error("Error retrieving the Experiment by the given experimentID: "
+                        + experimentId);
+                return false;
+            }
+            List<String> ids = registry.getIds(
 					RegistryModelType.WORKFLOW_NODE_DETAIL,
 					WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
-			for (String workflowNodeId : ids) {
-				WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry
+            for (String workflowNodeId : ids) {
+                WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry
 						.get(RegistryModelType.WORKFLOW_NODE_DETAIL,
 								workflowNodeId);
-				List<Object> taskDetailList = registry.get(
+                List<Object> taskDetailList = registry.get(
 						RegistryModelType.TASK_DETAIL,
 						TaskDetailConstants.NODE_ID, workflowNodeId);
-				for (Object o : taskDetailList) {
-					TaskDetails taskID = (TaskDetails) o;
-					// iterate through all the generated tasks and performs the
-					// job submisssion+monitoring
-					experiment = (Experiment) registry.get(
-							RegistryModelType.EXPERIMENT, experimentId);
-					if (experiment == null) {
-						log.error("Error retrieving the Experiment by the given experimentID: "
-								+ experimentId);
-						return false;
-					}
+                for (Object o : taskDetailList) {
+                    TaskDetails taskID = (TaskDetails) o;
+                    // iterate through all the generated tasks and performs the
+                    // job submisssion+monitoring
 					ExperimentStatus status = new ExperimentStatus();
 					status.setExperimentState(ExperimentState.LAUNCHED);
 					status.setTimeOfStateChange(Calendar.getInstance()
@@ -309,13 +305,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 	 * @throws TException
 	 */
 	public boolean terminateExperiment(String experimentId) throws TException {
-		try {
-			orchestrator.cancelExperiment(experimentId);
-		} catch (OrchestratorException e) {
-			log.error("Error canceling experiment " + experimentId, e);
-			return false;
-		}
-		return true;
+        return validateStatesAndCancel(experimentId);
 	}
 
 	/**
@@ -516,4 +506,109 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 		String selectedModuleId=applicationModules.get(0);
 		return selectedModuleId;
 	}
+
+    private boolean validateStatesAndCancel(String experimentId)throws TException{
+        try {
+            Experiment experiment = (Experiment) registry.get(
+                    RegistryModelType.EXPERIMENT, experimentId);
+            if (experiment == null) {
+                log.error("Error retrieving the Experiment by the given experimentID: "
+                        + experimentId);
+                throw new OrchestratorException("Error retrieving the Experiment by the given
experimentID:\n" +
+                        experimentId);
+            }
+            switch (experiment.getExperimentStatus().getExperimentState()){
+                case COMPLETED:
+                    throw new OrchestratorException("Experiment is already finished cannot
cancel the experiment");
+                case CANCELED:
+                    throw new OrchestratorException("Experiment is already canceled, cannot
perform cancel again !!!");
+                case CANCELING:
+                    throw new OrchestratorException("Experiment is  cancelling, cannot perform
cancel again !!!!");
+                case SUSPENDED:
+                    throw new OrchestratorException("Experiment is  suspended, cannot perform
cancel !!!!");
+                case FAILED:
+                    throw new OrchestratorException("Experiment is  failed,cannot perform
cancel !!!!");
+                case UNKNOWN:
+                    throw new OrchestratorException("Experiment is inconsistent,cannot perform
cancel, !!!!");
+            }
+
+            ExperimentStatus status = new ExperimentStatus();
+            status.setExperimentState(ExperimentState.CANCELING);
+            status.setTimeOfStateChange(Calendar.getInstance()
+                    .getTimeInMillis());
+            experiment.setExperimentStatus(status);
+            registry.update(RegistryModelType.EXPERIMENT, experiment,
+                    experimentId);
+
+            List<String> ids = registry.getIds(
+                    RegistryModelType.WORKFLOW_NODE_DETAIL,
+                    WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
+            for (String workflowNodeId : ids) {
+                WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry
+                        .get(RegistryModelType.WORKFLOW_NODE_DETAIL,
+                                workflowNodeId);
+                if (workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().getValue()
> 1) {
+                    log.error(workflowNodeDetail.getNodeName() + " Workflow Node status cannot
mark as cancelled, because " +
+                            "current status is " + workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().toString());
+                    continue; // this continue is very useful not to process deeper loops
if the upper layers have non-cancel states
+                }else {
+                    WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus();
+                    workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELING);
+                    workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
+                            .getTimeInMillis());
+                    workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
+                    registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail,
+                            workflowNodeId);
+                }
+                List<Object> taskDetailList = registry.get(
+                        RegistryModelType.TASK_DETAIL,
+                        TaskDetailConstants.NODE_ID, workflowNodeId);
+                for (Object o : taskDetailList) {
+                    TaskDetails taskDetails = (TaskDetails) o;
+                    TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus();
+                    if(taskStatus.getExecutionState().getValue()>7){
+                        log.error(((TaskDetails) o).getTaskID() + " Task status cannot mark
as cancelled, because " +
+                                "current task state is "+((TaskDetails) o).getTaskStatus().getExecutionState().toString());
+                        continue;// this continue is very useful not to process deeper loops
if the upper layers have non-cancel states
+                    }else{
+                        taskStatus.setExecutionState(TaskState.CANCELING);
+                        taskStatus.setTimeOfStateChange(Calendar.getInstance()
+                                .getTimeInMillis());
+                        taskDetails.setTaskStatus(taskStatus);
+                        registry.update(RegistryModelType.TASK_DETAIL, o,
+                                taskDetails);
+                    }
+                    // iterate through all the generated tasks and performs the
+                    // job submisssion+monitoring
+                    // launching the experiment
+                    orchestrator.cancelExperiment(experiment,
+                            workflowNodeDetail, taskDetails, null);
+                    taskStatus.setExecutionState(TaskState.CANCELED);
+                    taskStatus.setTimeOfStateChange(Calendar.getInstance()
+                            .getTimeInMillis());
+                    taskDetails.setTaskStatus(taskStatus);
+                    registry.update(RegistryModelType.TASK_DETAIL, o,
+                            taskDetails);
+                }
+                WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus();
+                workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED);
+                workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
+                        .getTimeInMillis());
+                workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
+                registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail,
+                        workflowNodeId);
+            }
+            status = new ExperimentStatus();
+            status.setExperimentState(ExperimentState.CANCELED);
+            status.setTimeOfStateChange(Calendar.getInstance()
+                    .getTimeInMillis());
+            experiment.setExperimentStatus(status);
+            registry.update(RegistryModelType.EXPERIMENT, experiment,
+                    experimentId);
+
+        } catch (Exception e) {
+            throw new TException(e);
+        }
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
index ff6b5e8..cde0135 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
@@ -76,6 +76,10 @@ public class GFACEmbeddedJobSubmitter implements JobSubmitter {
         }
     }
 
+    public boolean terminate(String experimentID, String taskID) throws OrchestratorException
{
+        return false;
+    }
+
     public GFac getGfac() {
         return gfac;
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
index a2c153e..0ca95ec 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
@@ -46,6 +46,8 @@ import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.naming.OperationNotSupportedException;
+
 /*
  * this class is responsible for submitting a job to gfac in service mode,
  * it will select a gfac instance based on the incoming request and submit to that
@@ -88,10 +90,9 @@ public class GFACServiceJobSubmitter implements JobSubmitter, Watcher {
 			List<String> children = zk.getChildren(gfacServer, this);
 			
 			if (children.size() == 0) {
-				// Zookeeper data need cleaning
-				GfacService.Client localhost = GFacClientFactory.createGFacClient(ServerSettings.getSetting(Constants.GFAC_SERVER_HOST),
Integer.parseInt(ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)));
-				return localhost.submitJob(experimentID, taskID, ServerSettings.getSetting(Constants.GATEWAY_NAME));
-			} else {
+                // Zookeeper data need cleaning
+                throw new OrchestratorException("There is no active GFac instance to route
the request");
+            } else {
 				String pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size());
 				// here we are not using an index because the getChildren does not return the same order
everytime
 				String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild,
false, null));
@@ -122,7 +123,11 @@ public class GFACServiceJobSubmitter implements JobSubmitter, Watcher
{
 		return false;
 	}
 
-	synchronized public void process(WatchedEvent event) {
+    public boolean terminate(String experimentID, String taskID) throws OrchestratorException
{
+        throw new OrchestratorException(new OperationNotSupportedException("terminate method
is not yet implemented"));
+    }
+
+    synchronized public void process(WatchedEvent event) {
 		synchronized (mutex) {
 			switch (event.getState()) {
 			case SyncConnected:

http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
index ff81ac7..6682fa1 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
@@ -63,4 +63,13 @@ public interface JobSubmitter {
      * @throws OrchestratorException
      */
     boolean submit(String experimentID,String taskID,String tokenId) throws OrchestratorException;
+
+    /**
+     * This can be used to terminate the experiment
+     * @param experimentID
+     * @param taskID
+     * @return
+     * @throws OrchestratorException
+     */
+    boolean terminate(String experimentID,String taskID)throws OrchestratorException;
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
index ddda0f6..89393a8 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
@@ -74,10 +74,13 @@ public interface Orchestrator {
      * experimentID as the handler to the experiment, during the launchExperiment
      * We just have to give the experimentID
      *
-     * @param experimentID
+     * @param experiment
+     * @param workflowNode
+     * @param task
+     * @param tokenId
      * @throws OrchestratorException
      */
-    void cancelExperiment(String experimentID) throws OrchestratorException;
+    void cancelExperiment(Experiment experiment, WorkflowNodeDetails workflowNode, TaskDetails
task,String tokenId) throws OrchestratorException;
     //todo have to add another method to handle failed or jobs to be recovered by orchestrator
     //todo if you don't add these this is not an orchestrator, its just an intemediate component
which invoke gfac
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index bf7af51..83dccf0 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -163,8 +163,9 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         }
     }
 
-    public void cancelExperiment(String experimentID)
+    public void cancelExperiment(Experiment experiment, WorkflowNodeDetails workflowNode,
TaskDetails task,String tokenId)
 			throws OrchestratorException {
+
         throw new OrchestratorException(new OperationNotSupportedException());
     }
 


Mime
View raw message