airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject airavata git commit: Revert "Moved job model to taskContext from Processcontext"
Date Wed, 04 Nov 2015 20:37:11 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 98f077ed5 -> 5eb3c26ad


Revert "Moved job model to taskContext from Processcontext"

This reverts commit 98f077ed5a9f007b834b6efd98b84f5aea750d4d.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/5eb3c26a
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/5eb3c26a
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/5eb3c26a

Branch: refs/heads/master
Commit: 5eb3c26ad7f30fec0c899e507099379dbfe2be85
Parents: 98f077e
Author: Shameera Rathnayaka <shameerainfo@gmail.com>
Authored: Wed Nov 4 15:35:56 2015 -0500
Committer: Shameera Rathnayaka <shameerainfo@gmail.com>
Committed: Wed Nov 4 15:35:56 2015 -0500

----------------------------------------------------------------------
 .../gfac/core/context/ProcessContext.java       | 15 ++++++++++++
 .../airavata/gfac/core/context/TaskContext.java | 24 +-------------------
 .../airavata/gfac/impl/GFacEngineImpl.java      | 24 +++++++++++++-------
 .../impl/task/SSHForkJobSubmissionTask.java     | 12 +++++-----
 .../gfac/impl/task/SSHJobSubmissionTask.java    | 10 ++++----
 .../gfac/monitor/email/EmailBasedMonitor.java   |  2 +-
 6 files changed, 45 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/5eb3c26a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 3a1a8d1..1a3a236 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -75,6 +75,7 @@ public class ProcessContext {
 	private String stderrLocation;
 	private JobSubmissionProtocol jobSubmissionProtocol;
 	private DataMovementProtocol dataMovementProtocol;
+	private JobModel jobModel;
 	private ComputeResourcePreference computeResourcePreference;
 	private MonitorMode monitorMode;
 	private ResourceJobManager resourceJobManager;
@@ -316,6 +317,20 @@ public class ProcessContext {
         return taskMap;
     }
 
+	public JobModel getJobModel() {
+		if (jobModel == null) {
+			jobModel = new JobModel();
+			jobModel.setProcessId(processId);
+			jobModel.setWorkingDir(getWorkingDir());
+			jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+		}
+		return jobModel;
+	}
+
+	public void setJobModel(JobModel jobModel) {
+		this.jobModel = jobModel;
+	}
+
 	public ComputeResourcePreference getComputeResourcePreference() {
 		return computeResourcePreference;
 	}

http://git-wip-us.apache.org/repos/asf/airavata/blob/5eb3c26a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
index 156ada0..ae92ba1 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
@@ -20,12 +20,9 @@
  */
 package org.apache.airavata.gfac.core.context;
 
-import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.model.application.io.InputDataObjectType;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
-import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.status.TaskState;
 import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.TaskModel;
@@ -42,9 +39,8 @@ public class TaskContext {
     private InputDataObjectType processInput;
     private OutputDataObjectType processOutput;
     private Object subTaskModel = null;
-    private JobModel jobModel;
 
-    public TaskModel getTaskModel() {
+	public TaskModel getTaskModel() {
 		return taskModel;
 	}
 
@@ -121,22 +117,4 @@ public class TaskContext {
         }
         return subTaskModel;
     }
-
-    public RemoteCluster getRemoteCluster() {
-        return getParentProcessContext().getRemoteCluster();
-    }
-
-    public JobModel getJobModel() {
-        if (jobModel == null) {
-            jobModel = new JobModel();
-            jobModel.setProcessId(getProcessId());
-            jobModel.setWorkingDir(getWorkingDir());
-            jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
-        }
-        return jobModel;
-    }
-
-    public void setJobModel(JobModel jobModel) {
-        this.jobModel = jobModel;
-    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/5eb3c26a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index d160a5a..4b67ffd 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -136,6 +136,13 @@ public class GFacEngineImpl implements GFacEngine {
                         processContext.getProcessId());
             }
 
+            List<Object> jobModels = expCatalog.get(ExperimentCatalogModelType.JOB, "processId", processId);
+            if (jobModels != null && !jobModels.isEmpty()) {
+                if (jobModels.size() > 1) {
+                    log.warn("Process has more than one job model, take first one");
+                }
+                processContext.setJobModel(((JobModel) jobModels.get(0)));
+            }
             return processContext;
         } catch (AppCatalogException e) {
             throw new GFacException("App catalog access exception ", e);
@@ -288,10 +295,10 @@ public class GFacEngineImpl implements GFacEngine {
 
             MonitorTaskModel monitorTaskModel = ((MonitorTaskModel) taskContext.getSubTaskModel());
             monitorService = Factory.getMonitorService(monitorTaskModel.getMonitorMode());
-            if (!monitorService.isMonitoring(taskContext.getJobModel().getJobId())) {
-                monitorService.monitor(taskContext.getJobModel().getJobId(), taskContext);
+            if (!monitorService.isMonitoring(processContext.getJobModel().getJobId())) {
+                monitorService.monitor(processContext.getJobModel().getJobId(), taskContext);
             } else {
-                log.warn("Jobid: {}, already in monitoring map", taskContext.getJobModel().getJobId());
+                log.warn("Jobid: {}, already in monitoring map", processContext.getJobModel().getJobId());
             }
         } catch (AiravataException | TException e) {
             taskStatus = new TaskStatus(TaskState.FAILED);
@@ -300,8 +307,8 @@ public class GFacEngineImpl implements GFacEngine {
             taskContext.setTaskStatus(taskStatus);
             GFacUtils.saveAndPublishTaskStatus(taskContext);
 
-            String errorMsg = new StringBuilder("expId: ").append(taskContext.getExperimentId()).append(", processId: ")
-                    .append(taskContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
+            String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ")
+                    .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
                     .append(", type: ").append(taskContext.getTaskType().name()).append(" :- Input staging failed. Reason: ")
                     .append(taskStatus.getReason()).toString();
             ErrorModel errorModel = new ErrorModel();
@@ -550,12 +557,13 @@ public class GFacEngineImpl implements GFacEngine {
 
             if (oldJobStatus != null && oldJobStatus.getJobState() == JobState.QUEUED) {
                 JobMonitor monitorService = Factory.getMonitorService(taskContext.getParentProcessContext().getMonitorMode());
-                monitorService.stopMonitor(taskContext.getJobModel().getJobId(), true);
+                monitorService.stopMonitor(taskContext.getParentProcessContext().getJobModel().getJobId(), true);
                 JobStatus newJobStatus = new JobStatus(JobState.CANCELED);
                 newJobStatus.setReason("Job cancelled");
                 newJobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-                taskContext.getJobModel().setJobStatus(newJobStatus);
-                GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), taskContext.getJobModel());
+                taskContext.getParentProcessContext().getJobModel().setJobStatus(newJobStatus);
+                GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), taskContext.getParentProcessContext()
+                        .getJobModel());
             }
         } catch (TaskException e) {
             throw new GFacException("Error while cancelling job");

http://git-wip-us.apache.org/repos/asf/airavata/blob/5eb3c26a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
index cde3032..3a14b2e 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
@@ -60,9 +60,9 @@ public class SSHForkJobSubmissionTask implements JobSubmissionTask {
         TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
         try {
             ProcessContext processContext = taskContext.getParentProcessContext();
-            JobModel jobModel = taskContext.getJobModel();
+            JobModel jobModel = processContext.getJobModel();
             jobModel.setTaskId(taskContext.getTaskId());
-            RemoteCluster remoteCluster = taskContext.getRemoteCluster();
+            RemoteCluster remoteCluster = processContext.getRemoteCluster();
             JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext, taskContext);
             jobModel.setJobName(jobDescriptor.getJobName());
             ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
@@ -75,7 +75,7 @@ public class SSHForkJobSubmissionTask implements JobSubmissionTask {
 	        if (jobFile != null && jobFile.exists()) {
                 jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
 	            JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(),
-			            taskContext.getWorkingDir());
+			            processContext.getWorkingDir());
 	            jobModel.setExitCode(jobSubmissionOutput.getExitCode());
 	            jobModel.setStdErr(jobSubmissionOutput.getStdErr());
 	            jobModel.setStdOut(jobSubmissionOutput.getStdOut());
@@ -84,8 +84,8 @@ public class SSHForkJobSubmissionTask implements JobSubmissionTask {
                     jobModel.setJobId(jobId);
                     GFacUtils.saveJobModel(processContext, jobModel);
                     jobStatus.setJobState(JobState.SUBMITTED);
-                    jobStatus.setReason("Successfully Submitted to "
-                            + processContext.getComputeResourceDescription().getHostName());
+                    jobStatus.setReason("Successfully Submitted to " + taskContext.getParentProcessContext()
+                            .getComputeResourceDescription().getHostName());
                     jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                     jobModel.setJobStatus(jobStatus);
                     GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
@@ -93,7 +93,7 @@ public class SSHForkJobSubmissionTask implements JobSubmissionTask {
                     taskStatus.setReason("Submitted job to compute resource");
                 }
                 if (jobId == null || jobId.isEmpty()) {
-                    String msg = "expId:" + taskContext.getExperimentId() + " Couldn't find " +
+                    String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find " +
                             "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " +
                             "doesn't return a valid JobId. " + "Hence changing experiment state to Failed";
                     log.error(msg);

http://git-wip-us.apache.org/repos/asf/airavata/blob/5eb3c26a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
index ee02312..4cc041c 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
@@ -60,7 +60,7 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
 	    TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED); // set to completed.
 	    try {
 		    ProcessContext processContext = taskContext.getParentProcessContext();
-		    JobModel jobModel = taskContext.getJobModel();
+		    JobModel jobModel = processContext.getJobModel();
 		    jobModel.setTaskId(taskContext.getTaskId());
 		    RemoteCluster remoteCluster = processContext.getRemoteCluster();
 		    JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext,taskContext);
@@ -239,7 +239,8 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
 
     @Override
     public TaskStatus recover(TaskContext taskContext) {
-            JobModel jobModel = taskContext.getJobModel();
+            ProcessContext processContext = taskContext.getParentProcessContext();
+            JobModel jobModel = processContext.getJobModel();
             // original job failed before submitting
             if (jobModel == null || jobModel.getJobId() == null ){
                 return execute(taskContext);
@@ -256,8 +257,9 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
 
 	@Override
 	public JobStatus cancel(TaskContext taskcontext) throws TaskException {
-		RemoteCluster remoteCluster = taskcontext.getRemoteCluster();
-		JobModel jobModel = taskcontext.getJobModel();
+		ProcessContext processContext = taskcontext.getParentProcessContext();
+		RemoteCluster remoteCluster = processContext.getRemoteCluster();
+		JobModel jobModel = processContext.getJobModel();
 		int retryCount = 0;
 		if (jobModel != null) {
 			try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/5eb3c26a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index ed4c9ac..f983d63 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -302,7 +302,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
 	    // TODO : update job state on process context
         boolean runOutflowTasks = false;
 	    JobStatus jobStatus = new JobStatus();
-	    JobModel jobModel = taskContext.getJobModel();
+	    JobModel jobModel = taskContext.getParentProcessContext().getJobModel();
         String jobDetails = "JobName : " + jobStatusResult.getJobName() + ", JobId : " + jobStatusResult.getJobId();
         // TODO - Handle all other valid JobStates
         if (resultState == JobState.COMPLETE) {


Mime
View raw message