airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject [1/2] airavata git commit: check monitor mode before create monitoring task
Date Fri, 06 Nov 2015 22:26:04 GMT
Repository: airavata
Updated Branches:
  refs/heads/master b4e3c33fd -> 85fb6b694


check monitor mode before create monitoring task


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/687d8126
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/687d8126
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/687d8126

Branch: refs/heads/master
Commit: 687d812617a0fb236a7c44a2a894d71c641e51d6
Parents: 5eb3c26
Author: Shameera Rathnayaka <shameerainfo@gmail.com>
Authored: Fri Nov 6 17:25:22 2015 -0500
Committer: Shameera Rathnayaka <shameerainfo@gmail.com>
Committed: Fri Nov 6 17:25:22 2015 -0500

----------------------------------------------------------------------
 .../cpi/impl/SimpleOrchestratorImpl.java        | 61 ++++++++++++--------
 .../server/OrchestratorServerHandler.java       |  4 +-
 2 files changed, 39 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/687d8126/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index 1e2ad58..ee4d2c6 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -20,14 +20,11 @@
 */
 package org.apache.airavata.orchestrator.cpi.impl;
 
-import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.gfac.core.task.TaskException;
 import org.apache.airavata.model.appcatalog.computeresource.*;
 import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
-import org.apache.airavata.model.appcatalog.gatewayprofile.DataStoragePreference;
 import org.apache.airavata.model.application.io.DataType;
 import org.apache.airavata.model.application.io.InputDataObjectType;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
@@ -288,6 +285,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
             List<String> taskIdList = createAndSaveEnvSetupTask(gatewayId, processModel,
experimentCatalog);
             taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel, gatewayId));
 
+            JobSubmissionInterface preferredJobSubmissionInterface = OrchestratorUtils.getPreferredJobSubmissionInterface(orchestratorContext,
processModel, gatewayId);
             if (autoSchedule) {
                 List<BatchQueue> definedBatchQueues = computeResource.getBatchQueues();
                 for (BatchQueue batchQueue : definedBatchQueues) {
@@ -298,19 +296,19 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
                             // need to create more job submissions
                             int numOfMaxWallTimeJobs = ((int) Math.floor(userGivenWallTime
/ maxRunTime));
                             for (int i = 1; i <= numOfMaxWallTimeJobs; i++) {
-                                taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,
processModel, maxRunTime));
+                                taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface,
processModel, maxRunTime));
                             }
                             int leftWallTime = userGivenWallTime % maxRunTime;
                             if (leftWallTime != 0) {
-                                taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,
processModel, leftWallTime));
+                                taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface,
processModel, leftWallTime));
                             }
                         } else {
-                            taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId, processModel,
userGivenWallTime));
+                            taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface,
processModel, userGivenWallTime));
                         }
                     }
                 }
             } else {
-                taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId, processModel, userGivenWallTime));
+                taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface,
processModel, userGivenWallTime));
             }
             taskIdList.addAll(createAndSaveOutputDataStagingTasks(processModel, gatewayId));
             // update process scheduling
@@ -442,8 +440,19 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         return dataStagingTaskIds;
     }
 
-    private List<String> createAndSaveSubmissionTasks(String gatewayId, ProcessModel
processModel, int wallTime)
-            throws TException, RegistryException {
+    private List<String> createAndSaveSubmissionTasks(String gatewayId, JobSubmissionInterface
jobSubmissionInterface, ProcessModel processModel, int wallTime)
+            throws TException, RegistryException, OrchestratorException {
+
+        JobSubmissionProtocol jobSubmissionProtocol = jobSubmissionInterface.getJobSubmissionProtocol();
+        MonitorMode monitorMode = null;
+        if (jobSubmissionProtocol == JobSubmissionProtocol.SSH || jobSubmissionProtocol ==
JobSubmissionProtocol.SSH_FORK) {
+            SSHJobSubmission sshJobSubmission = OrchestratorUtils.getSSHJobSubmission(orchestratorContext,
jobSubmissionInterface.getJobSubmissionInterfaceId());
+            monitorMode = sshJobSubmission.getMonitorMode();
+        }else {
+            logger.error("expId : {}, processId : {} :- Unsupported Job submission protocol
{}.",
+                    processModel.getExperimentId(), processModel.getProcessId(), jobSubmissionProtocol.name());
+            throw new OrchestratorException("Unsupported Job Submission Protocol " + jobSubmissionProtocol.name());
+        }
         List<String> submissionTaskIds = new ArrayList<>();
         TaskModel taskModel = new TaskModel();
         taskModel.setParentProcessId(processModel.getProcessId());
@@ -454,7 +463,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         taskModel.setTaskStatus(taskStatus);
         taskModel.setTaskType(TaskTypes.JOB_SUBMISSION);
         JobSubmissionTaskModel submissionSubTask = new JobSubmissionTaskModel();
-        submissionSubTask.setMonitorMode(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR);
+        submissionSubTask.setMonitorMode(monitorMode);
         submissionSubTask.setJobSubmissionProtocol(
                 OrchestratorUtils.getPreferredJobSubmissionProtocol(orchestratorContext,
processModel, gatewayId));
         submissionSubTask.setWallTime(wallTime);
@@ -465,21 +474,23 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         taskModel.setTaskId(taskId);
         submissionTaskIds.add(taskModel.getTaskId());
 
-        // create monitor task for this job
-        TaskModel monitorTaskModel = new TaskModel();
-        monitorTaskModel.setParentProcessId(processModel.getProcessId());
-        monitorTaskModel.setCreationTime(new Date().getTime());
-        monitorTaskModel.setLastUpdateTime(monitorTaskModel.getCreationTime());
-        TaskStatus monitorTaskStatus = new TaskStatus(TaskState.CREATED);
-        monitorTaskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-        monitorTaskModel.setTaskStatus(monitorTaskStatus);
-        monitorTaskModel.setTaskType(TaskTypes.MONITORING);
-        MonitorTaskModel monitorSubTaskModel = new MonitorTaskModel();
-        monitorSubTaskModel.setMonitorMode(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR);
-        monitorTaskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(monitorSubTaskModel));
-        String mTaskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK,
monitorTaskModel, processModel.getProcessId());
-        monitorTaskModel.setTaskId(mTaskId);
-        submissionTaskIds.add(monitorTaskModel.getTaskId());
+        // create monitor task for this Email based monitor mode job
+        if (monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
+            TaskModel monitorTaskModel = new TaskModel();
+            monitorTaskModel.setParentProcessId(processModel.getProcessId());
+            monitorTaskModel.setCreationTime(new Date().getTime());
+            monitorTaskModel.setLastUpdateTime(monitorTaskModel.getCreationTime());
+            TaskStatus monitorTaskStatus = new TaskStatus(TaskState.CREATED);
+            monitorTaskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            monitorTaskModel.setTaskStatus(monitorTaskStatus);
+            monitorTaskModel.setTaskType(TaskTypes.MONITORING);
+            MonitorTaskModel monitorSubTaskModel = new MonitorTaskModel();
+            monitorSubTaskModel.setMonitorMode(monitorMode);
+            monitorTaskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(monitorSubTaskModel));
+            String mTaskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK,
monitorTaskModel, processModel.getProcessId());
+            monitorTaskModel.setTaskId(mTaskId);
+            submissionTaskIds.add(monitorTaskModel.getTaskId());
+        }
 
         return submissionTaskIds;
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/687d8126/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index f75c91e..c1e9d65 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -154,7 +154,9 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface
{
                 log.error("Validating process fails for given experiment Id : {}", experimentId);
                 return false;
             }
-            ComputeResourcePreference computeResourcePreference = appCatalog.getGatewayProfile().getComputeResourcePreference(gatewayId,
experiment.getUserConfigurationData().getComputationalResourceScheduling().getResourceHostId());
+            ComputeResourcePreference computeResourcePreference = appCatalog.getGatewayProfile().
+					getComputeResourcePreference(gatewayId,
+							experiment.getUserConfigurationData().getComputationalResourceScheduling().getResourceHostId());
             String token = computeResourcePreference.getResourceSpecificCredentialStoreToken();
             if (token == null || token.isEmpty()){
                 // try with gateway profile level token


Mime
View raw message