airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chath...@apache.org
Subject [41/51] [abbrv] airavata git commit: Saved correct task status in correct place in order to fix recovery issues
Date Tue, 03 Nov 2015 19:48:06 GMT
Saved correct task status in correct place in order to fix recovery issues


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/72fb57bc
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/72fb57bc
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/72fb57bc

Branch: refs/heads/master
Commit: 72fb57bc280690885974547ec0c9e1fd4102601c
Parents: 138c994
Author: Shameera Rathnayaka <shameerainfo@gmail.com>
Authored: Fri Oct 23 11:04:00 2015 -0400
Committer: Shameera Rathnayaka <shameerainfo@gmail.com>
Committed: Fri Oct 23 11:04:00 2015 -0400

----------------------------------------------------------------------
 .../airavata/gfac/core/monitor/JobMonitor.java  |   5 +
 .../airavata/gfac/impl/GFacEngineImpl.java      | 115 ++++++++++++++-----
 .../apache/airavata/gfac/impl/GFacWorker.java   |   1 +
 .../impl/task/AdvancedSCPDataStageTask.java     |   2 +-
 .../gfac/monitor/email/EmailBasedMonitor.java   |   5 +
 5 files changed, 101 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/72fb57bc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
index a909791..a3f62cf 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
@@ -35,4 +35,9 @@ public interface JobMonitor {
 	 * Stop monitoring for given jobId
 	 */
 	void stopMonitor(String jobId, boolean runOutFlow);
+
+    /**
+     * Checks whether the given job is already being monitored by this monitor, so that callers
+     * can avoid handing the same job over to the monitor more than once.
+     *
+     * @param jobId id of the job to check
+     * @return {@code true} if this monitor is already monitoring {@code jobId}, {@code false} otherwise
+     */
+    boolean isMonitoring(String jobId);
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/72fb57bc/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index e213a9b..76e5ae2 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -259,18 +259,11 @@ public class GFacEngineImpl implements GFacEngine {
                     break;
 
                 case MONITORING:
-                    JobMonitor monitorService = null;
-                    try {
-                        MonitorTaskModel monitorTaskModel = ((MonitorTaskModel) taskContext.getSubTaskModel());
-                        status = new ProcessStatus(ProcessState.MONITORING);
-                        status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-                        processContext.setProcessStatus(status);
-                        GFacUtils.saveAndPublishProcessStatus(processContext);
-                        monitorService = Factory.getMonitorService(monitorTaskModel.getMonitorMode());
-                        monitorService.monitor(processContext.getJobModel().getJobId(), processContext);
-                    } catch (AiravataException | TException e) {
-                        throw new GFacException(e);
-                    }
+                    status = new ProcessStatus(ProcessState.MONITORING);
+                    status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                    processContext.setProcessStatus(status);
+                    GFacUtils.saveAndPublishProcessStatus(processContext);
+                    executeJobMonitoring(taskContext, processContext.isRecovery());
                     break;
 
                 case ENV_CLEANUP:
@@ -290,22 +283,70 @@ public class GFacEngineImpl implements GFacEngine {
         processContext.setComplete(true);
     }
 
-    private boolean executeJobSubmission(TaskContext taskContext, boolean recovery) throws
GFacException {
+    /**
+     * Hands the process's job over to the monitor service selected by the task's
+     * {@link MonitorTaskModel}, recording EXECUTING, then COMPLETED (or FAILED) task status.
+     *
+     * @param taskContext monitoring task context; its parent process context supplies the job id
+     * @param recovery    whether the process is being recovered; not consulted directly because the
+     *                    {@code isMonitoring} check already prevents handing the same job over twice
+     * @throws GFacException if the monitor service cannot be obtained or the handover fails
+     */
+    private void executeJobMonitoring(TaskContext taskContext, boolean recovery) throws GFacException {
+        ProcessContext processContext = taskContext.getParentProcessContext();
+        TaskStatus taskStatus = new TaskStatus(TaskState.EXECUTING);
+        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        taskContext.setTaskStatus(taskStatus);
+        GFacUtils.saveAndPublishTaskStatus(taskContext);
+
+        // Declared outside the try so the failure message can report what was being handed over.
+        String jobId = null;
+        MonitorTaskModel monitorTaskModel = null;
+        try {
+            jobId = processContext.getJobModel().getJobId();
+            monitorTaskModel = (MonitorTaskModel) taskContext.getSubTaskModel();
+            JobMonitor monitorService = Factory.getMonitorService(monitorTaskModel.getMonitorMode());
+            // On recovery the job may already be registered with the monitor; don't register it twice.
+            if (!monitorService.isMonitoring(jobId)) {
+                monitorService.monitor(jobId, processContext);
+            }
+        } catch (AiravataException | TException e) {
+            // Fill in the real values instead of storing unformatted "{}" placeholders.
+            String reason = "Couldn't handover jobId " + jobId + " to monitor service, monitor service type "
+                    + (monitorTaskModel != null ? monitorTaskModel.getMonitorMode() : null)
+                    + ". Cause: " + e.getMessage();
+            taskStatus = new TaskStatus(TaskState.FAILED);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            taskStatus.setReason(reason);
+            taskContext.setTaskStatus(taskStatus);
+            GFacUtils.saveAndPublishTaskStatus(taskContext);
+
+            String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId())
+                    .append(", processId: ").append(processContext.getProcessId())
+                    .append(", taskId: ").append(taskContext.getTaskId())
+                    .append(", type: ").append(taskContext.getTaskType().name())
+                    .append(" :- Job monitoring failed. Reason: ").append(reason).toString();
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setUserFriendlyMessage("Error while handing over job to monitor service");
+            errorModel.setActualErrorMessage(errorMsg);
+            GFacUtils.saveTaskError(taskContext, errorModel);
+            throw new GFacException(e);
+        }
+        taskStatus = new TaskStatus(TaskState.COMPLETED);
+        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        taskStatus.setReason("Successfully handed over job id to job monitor service.");
+        taskContext.setTaskStatus(taskStatus);
+        GFacUtils.saveAndPublishTaskStatus(taskContext);
+    }
+
+    private boolean executeJobSubmission(TaskContext taskContext, boolean recovery) throws
GFacException {
+        TaskStatus taskStatus = new TaskStatus(TaskState.EXECUTING);
+        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        taskContext.setTaskStatus(taskStatus);
+        GFacUtils.saveAndPublishTaskStatus(taskContext);
         try {
             JobSubmissionTaskModel jobSubmissionTaskModel = ((JobSubmissionTaskModel) taskContext.getSubTaskModel());
             JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(jobSubmissionTaskModel.getJobSubmissionProtocol());
 
             ProcessContext processContext = taskContext.getParentProcessContext();
             taskStatus = executeTask(taskContext, jobSubmissionTask, recovery);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            taskContext.setTaskStatus(taskStatus);
+            GFacUtils.saveAndPublishTaskStatus(taskContext);
+
             if (taskStatus.getState() == TaskState.FAILED) {
                 log.error("expId: {}, processId: {}, taskId: {} type: {},:- Job submission
task failed, " +
                         "reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(),
taskContext
                         .getParentProcessContext().getProcessId(), taskContext.getTaskId(),
jobSubmissionTask.getType
                         ().name(), taskStatus.getReason());
-                String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- Job submission
task failed, " +
-                        "reason:" + " {}" + processContext.getExperimentId() + processContext.getProcessId()
+
-                        taskContext.getTaskId() + jobSubmissionTask.getType().name() + taskStatus.getReason();
+                String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(",
processId: ")
+                        .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
+                        .append(", type: ").append(taskContext.getTaskType().name()).append("
:- Job submission task failed. Reason: ")
+                        .append(taskStatus.getReason()).toString();
                 ErrorModel errorModel = new ErrorModel();
                 errorModel.setUserFriendlyMessage("Job submission task failed");
                 errorModel.setActualErrorMessage(errorMsg);
@@ -343,8 +384,11 @@ public class GFacEngineImpl implements GFacEngine {
                         "reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(),
taskContext
                         .getParentProcessContext().getProcessId(), taskContext.getTaskId(),
envSetupTask.getType
                         ().name(), taskStatus.getReason());
-                String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- Input
staging failed, " +
-                        "reason:" + " {}" + taskContext.getExperimentId() + taskContext.getProcessId()
+ taskContext.getTaskId() + envSetupTask.getType().name() + taskStatus.getReason();
+                ProcessContext processContext = taskContext.getParentProcessContext();
+                String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(",
processId: ")
+                        .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
+                        .append(", type: ").append(taskContext.getTaskType().name()).append("
:- Environment Setup failed. Reason: ")
+                        .append(taskStatus.getReason()).toString();
                 ErrorModel errorModel = new ErrorModel();
                 errorModel.setUserFriendlyMessage("Error while environment setup");
                 errorModel.setActualErrorMessage(errorMsg);
@@ -358,18 +402,27 @@ public class GFacEngineImpl implements GFacEngine {
     }
 
     private boolean inputDataStaging(TaskContext taskContext, boolean recover) throws GFacException
{
-        TaskStatus taskStatus;// execute process inputs
+        TaskStatus taskStatus = new TaskStatus(TaskState.EXECUTING);
+        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        taskContext.setTaskStatus(taskStatus);
+        GFacUtils.saveAndPublishTaskStatus(taskContext);
+
         ProcessContext processContext = taskContext.getParentProcessContext();
         Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
         taskStatus = executeTask(taskContext, dMoveTask, false);
+        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        taskContext.setTaskStatus(taskStatus);
+        GFacUtils.saveAndPublishTaskStatus(taskContext);
+
         if (taskStatus.getState() == TaskState.FAILED) {
             log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed,
" +
                     "reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(),
taskContext
                     .getParentProcessContext().getProcessId(), taskContext.getTaskId(), dMoveTask.getType
                     ().name(), taskStatus.getReason());
-            String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- Input staging
failed, " +
-                    "reason:" + " {}" + processContext.getExperimentId() + processContext.getProcessId()
+
-                    taskContext.getTaskId() + dMoveTask.getType().name() + taskStatus.getReason();
+            String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(",
processId: ")
+                    .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
+                    .append(", type: ").append(taskContext.getTaskType().name()).append("
:- Input staging failed. Reason: ")
+                    .append(taskStatus.getReason()).toString();
             ErrorModel errorModel = new ErrorModel();
             errorModel.setUserFriendlyMessage("Error while staging input data");
             errorModel.setActualErrorMessage(errorMsg);
@@ -430,18 +483,28 @@ public class GFacEngineImpl implements GFacEngine {
      * @throws GFacException
      */
     private boolean outputDataStaging(TaskContext taskContext, boolean recovery) throws GFacException
{
+        TaskStatus taskStatus = new TaskStatus(TaskState.EXECUTING);
+        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        taskContext.setTaskStatus(taskStatus);
+        GFacUtils.saveAndPublishTaskStatus(taskContext);
+
         ProcessContext processContext = taskContext.getParentProcessContext();
         Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
-        TaskStatus taskStatus = executeTask(taskContext, dMoveTask, recovery);
+        taskStatus = executeTask(taskContext, dMoveTask, recovery);
+        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        taskContext.setTaskStatus(taskStatus);
+        GFacUtils.saveAndPublishTaskStatus(taskContext);
+
         if (taskStatus.getState() == TaskState.FAILED) {
             log.error("expId: {}, processId: {}, taskId: {} type: {},:- output staging failed,
" +
                     "reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(),
taskContext
                     .getParentProcessContext().getProcessId(), taskContext.getTaskId(), dMoveTask.getType
                     ().name(), taskStatus.getReason());
 
-            String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- output staging
failed, " +
-                    "reason:" + " {}" + processContext.getExperimentId() + processContext.getProcessId()
+
-                    taskContext.getTaskId() + dMoveTask.getType().name() + taskStatus.getReason();
+            String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(",
processId: ")
+                    .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
+                    .append(", type: ").append(taskContext.getTaskType().name()).append("
:- Output staging failed. Reason: ")
+                    .append(taskStatus.getReason()).toString();
             ErrorModel errorModel = new ErrorModel();
             errorModel.setUserFriendlyMessage("Error while staging output data");
             errorModel.setActualErrorMessage(errorMsg);

http://git-wip-us.apache.org/repos/asf/airavata/blob/72fb57bc/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 52cd395..c71c8e7 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -196,6 +196,7 @@ public class GFacWorker implements Runnable {
 
         String taskDag = processContext.getProcessModel().getTaskDag();
         List<String> taskExecutionOrder = GFacUtils.parseTaskDag(taskDag);
+        processContext.setTaskExecutionOrder(taskExecutionOrder);
         Map<String, TaskModel> taskMap = processContext.getTaskMap();
         String recoverTaskId = null;
         for (String taskId : taskExecutionOrder) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/72fb57bc/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
index e200546..029da77 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
@@ -91,7 +91,7 @@ public class AdvancedSCPDataStageTask implements Task {
 
     @Override
     public TaskStatus execute(TaskContext taskContext) {
-        TaskStatus status = new TaskStatus(TaskState.CREATED);
+        TaskStatus status = new TaskStatus(TaskState.EXECUTING);
         AuthenticationInfo authenticationInfo = null;
         DataStagingTaskModel subTaskModel = null;
         String localDataDir = null;

http://git-wip-us.apache.org/repos/asf/airavata/blob/72fb57bc/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index c4d4676..b3a1398 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -134,6 +134,11 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
 		}
 	}
 
+    /**
+     * Reports whether {@code jobId} is currently a key in {@code jobMonitorMap}.
+     * NOTE(review): presumably entries are added when a job is handed to monitor(...) and removed
+     * by stopMonitor(...) — confirm against the rest of this class.
+     */
+    @Override
+    public boolean isMonitoring(String jobId) {
+        return jobMonitorMap.containsKey(jobId);
+    }
+
     private JobStatusResult parse(Message message) throws MessagingException, AiravataException
{
         Address fromAddress = message.getFrom()[0];
         String addressStr = fromAddress.toString();


Mime
View raw message