airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject [3/3] airavata git commit: Saved taskDag in process model and added monitor task after job submission task
Date Mon, 19 Oct 2015 15:37:03 GMT
Saved taskDag in process model and added monitor task after job submission task


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e7cbd9f8
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e7cbd9f8
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e7cbd9f8

Branch: refs/heads/orchestratorTaskBreakdown
Commit: e7cbd9f81c5c5a0eef6b3c2e6b784d7e3a13f07c
Parents: f532a9d
Author: Shameera Rathnayaka <shameerainfo@gmail.com>
Authored: Mon Oct 19 11:36:52 2015 -0400
Committer: Shameera Rathnayaka <shameerainfo@gmail.com>
Committed: Mon Oct 19 11:36:52 2015 -0400

----------------------------------------------------------------------
 .../cpi/impl/SimpleOrchestratorImpl.java        | 105 +++++++++++++------
 .../server/OrchestratorServerHandler.java       |   4 +-
 2 files changed, 76 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/e7cbd9f8/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index b34f64d..ae08cf5 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -22,19 +22,10 @@ package org.apache.airavata.orchestrator.cpi.impl;
 
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.gfac.core.cluster.ServerInfo;
-import org.apache.airavata.gfac.core.context.ProcessContext;
-import org.apache.airavata.gfac.core.context.TaskContext;
-import org.apache.airavata.gfac.core.task.Task;
-import org.apache.airavata.gfac.core.task.TaskException;
 import org.apache.airavata.model.appcatalog.computeresource.*;
 import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
-import org.apache.airavata.model.appcatalog.gatewayprofile.DataStoragePreference;
 import org.apache.airavata.model.application.io.DataType;
 import org.apache.airavata.model.application.io.InputDataObjectType;
-import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.error.LaunchValidationException;
 import org.apache.airavata.model.error.ValidationResults;
@@ -42,8 +33,6 @@ import org.apache.airavata.model.error.ValidatorResult;
 import org.apache.airavata.model.process.ProcessModel;
 import org.apache.airavata.model.experiment.*;
 import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
-import org.apache.airavata.model.status.ProcessState;
-import org.apache.airavata.model.status.ProcessStatus;
 import org.apache.airavata.model.status.TaskState;
 import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.*;
@@ -60,15 +49,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 
 public class SimpleOrchestratorImpl extends AbstractOrchestrator{
     private final static Logger logger = LoggerFactory.getLogger(SimpleOrchestratorImpl.class);
     private ExecutorService executor;
-    
+
     // this is going to be null unless the thread count is 0
     private JobSubmitter jobSubmitter = null;
 
@@ -277,7 +264,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         return processModels;
     }
 
-    public void createAndSaveTasks (String gatewayId, ExperimentModel experimentModel, ProcessModel
processModel) throws OrchestratorException {
+    public String createAndSaveTasks(String gatewayId, ExperimentModel experimentModel, ProcessModel
processModel) throws OrchestratorException {
         try {
             ExperimentCatalog experimentCatalog = orchestratorContext.getRegistry().getExperimentCatalog();
             AppCatalog appCatalog = orchestratorContext.getRegistry().getAppCatalog();
@@ -291,31 +278,56 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
             }
             ComputeResourceDescription computeResource = appCatalog.getComputeResource().getComputeResource(resourceHostId);
 
-            createAndSaveEnvSetupTask(gatewayId, processModel, experimentCatalog);
-            createAndSaveDataStagingTasks(processModel);
-            if (autoSchedule){
+            List<String> taskIdList = createAndSaveEnvSetupTask(gatewayId, processModel,
experimentCatalog);
+            taskIdList.addAll(createAndSaveDataStagingTasks(processModel));
+
+            if (autoSchedule) {
                 List<BatchQueue> definedBatchQueues = computeResource.getBatchQueues();
-                for (BatchQueue batchQueue : definedBatchQueues){
-                    if (batchQueue.getQueueName().equals(userGivenQueueName)){
+                for (BatchQueue batchQueue : definedBatchQueues) {
+                    if (batchQueue.getQueueName().equals(userGivenQueueName)) {
                         int maxRunTime = batchQueue.getMaxRunTime();
-                        if (maxRunTime < userGivenWallTime){
+                        if (maxRunTime < userGivenWallTime) {
                             // need to create more job submissions
-                            int i = (int)maxRunTime / userGivenWallTime;
-                            for (int k=0; k < i; i++){
-                                createAndSaveJobSubmissionTask(gatewayId, processModel);
+                            int numOfMaxWallTimeJobs = ((int) Math.floor(userGivenWallTime
/ maxRunTime));
+                            for (int i = 1; i <= numOfMaxWallTimeJobs; i++) {
+                                taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,
processModel, maxRunTime));
+                            }
+                            int leftWallTime = userGivenWallTime % maxRunTime;
+                            if (leftWallTime != 0) {
+                                taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,
processModel, leftWallTime));
                             }
-                        }else {
-                            createAndSaveJobSubmissionTask(gatewayId, processModel);
+                        } else {
+                            taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId, processModel,
userGivenWallTime));
                         }
                     }
                 }
+            } else {
+                taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId, processModel, userGivenWallTime));
             }
+
+            return getTaskDag(taskIdList);
         } catch (Exception e) {
             throw new OrchestratorException("Error during creating process");
         }
     }
 
-    private void createAndSaveEnvSetupTask(String gatewayId, ProcessModel processModel, ExperimentCatalog
experimentCatalog) throws RegistryException, TException {
+    private String getTaskDag(List<String> taskIdList) {
+        if (taskIdList.isEmpty()) {
+            return "";
+        }
+        StringBuilder sb = new StringBuilder();
+        for (String s : taskIdList) {
+            sb.append(s).append(","); // comma separated values
+        }
+        String dag = sb.toString();
+        return dag.substring(0, dag.length() - 1); // remove last comma
+    }
+
+    private List<String> createAndSaveEnvSetupTask(String gatewayId,
+                                                   ProcessModel processModel,
+                                                   ExperimentCatalog experimentCatalog)
+            throws RegistryException, TException {
+        List<String> envTaskIds = new ArrayList<>();
         TaskModel envSetupTask = new TaskModel();
         envSetupTask.setTaskType(TaskTypes.ENV_SETUP);
         envSetupTask.setTaskStatus(new TaskStatus(TaskState.CREATED));
@@ -329,11 +341,14 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         envSetupSubModel.setLocation(workingDir);
         byte[] envSetupSub = ThriftUtils.serializeThriftObject(envSetupSubModel);
         envSetupTask.setSubTaskModel(envSetupSub);
-        String envSetupTaskId = (String)experimentCatalog.add(ExpCatChildDataType.TASK, envSetupTask,
processModel.getProcessId());
+        String envSetupTaskId = (String) experimentCatalog.add(ExpCatChildDataType.TASK,
envSetupTask, processModel.getProcessId());
         envSetupTask.setTaskId(envSetupTaskId);
+        envTaskIds.add(envSetupTaskId);
+        return envTaskIds;
     }
 
-    public void createAndSaveDataStagingTasks (ProcessModel processModel) throws RegistryException{
+    public List<String> createAndSaveDataStagingTasks (ProcessModel processModel) throws
RegistryException {
+        List<String> dataStagingTaskIds = new ArrayList<>();
         List<InputDataObjectType> processInputs = processModel.getProcessInputs();
         sortByInputOrder(processInputs);
         if (processInputs != null) {
@@ -347,8 +362,10 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
                     case URI:
                         try {
                             TaskModel inputDataStagingTask = getInputDataStagingTask(processModel,
processInput);
-                            orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK,
inputDataStagingTask,
+                            String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK,
inputDataStagingTask,
                                     processModel.getProcessId());
+                            inputDataStagingTask.setTaskId(taskId);
+                            dataStagingTaskIds.add(inputDataStagingTask.getTaskId());
                         } catch (TException e) {
                             throw new RegistryException("Error while serializing data staging
sub task model");
                         }
@@ -359,9 +376,12 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
                 }
             }
         }
+        return dataStagingTaskIds;
     }
 
-    private void createAndSaveJobSubmissionTask(String gatewayId, ProcessModel processModel)
throws TException, RegistryException {
+    private List<String> createAndSaveSubmissionTasks(String gatewayId, ProcessModel
processModel, int wallTime)
+            throws TException, RegistryException {
+        List<String> submissionTaskIds = new ArrayList<>();
         TaskModel taskModel = new TaskModel();
         taskModel.setParentProcessId(processModel.getProcessId());
         taskModel.setCreationTime(new Date().getTime());
@@ -374,10 +394,31 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         submissionSubTask.setMonitorMode(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR);
         submissionSubTask.setJobSubmissionProtocol(
                 OrchestratorUtils.getPreferredJobSubmissionProtocol(orchestratorContext,
processModel, gatewayId));
+        submissionSubTask.setWallTime(wallTime);
         byte[] bytes = ThriftUtils.serializeThriftObject(submissionSubTask);
         taskModel.setSubTaskModel(bytes);
-        orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK,
taskModel,
+        String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK,
taskModel,
                 processModel.getProcessId());
+        taskModel.setTaskId(taskId);
+        submissionTaskIds.add(taskModel.getTaskId());
+
+        // create monitor task for this job
+        TaskModel monitorTaskModel = new TaskModel();
+        monitorTaskModel.setParentProcessId(processModel.getProcessId());
+        monitorTaskModel.setCreationTime(new Date().getTime());
+        monitorTaskModel.setLastUpdateTime(monitorTaskModel.getCreationTime());
+        TaskStatus monitorTaskStatus = new TaskStatus(TaskState.CREATED);
+        monitorTaskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        monitorTaskModel.setTaskStatus(monitorTaskStatus);
+        taskModel.setTaskType(TaskTypes.MONITORING);
+        MonitorTaskModel monitorSubTaskModel = new MonitorTaskModel();
+        monitorSubTaskModel.setMonitorMode(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR);
+        monitorTaskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(monitorSubTaskModel));
+        String mTaskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK,
monitorTaskModel, processModel.getProcessId());
+        monitorTaskModel.setTaskId(mTaskId);
+        submissionTaskIds.add(monitorTaskModel.getTaskId());
+
+        return submissionTaskIds;
     }
 
     private void sortByInputOrder(List<InputDataObjectType> processInputs) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/e7cbd9f8/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 4eb4736..f75c91e 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -141,7 +141,9 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface
{
             List<ProcessModel> processes = orchestrator.createProcesses(experimentId,
gatewayId);
             experiment = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT,
experimentId);
             for (ProcessModel processModel : processes){
-                orchestrator.createAndSaveTasks(gatewayId, experiment, processModel);
+                String taskDag = orchestrator.createAndSaveTasks(gatewayId, experiment, processModel);
+                processModel.setTaskDag(taskDag);
+                experimentCatalog.update(ExperimentCatalogModelType.PROCESS,processModel,
processModel.getProcessId());
             }
             if (experiment == null) {
                 log.error(experimentId, "Error retrieving the Experiment by the given experimentID:
{} ", experimentId);


Mime
View raw message