airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject [1/3] airavata git commit: user correct version class
Date Sat, 15 Aug 2015 17:59:26 GMT
Repository: airavata
Updated Branches:
  refs/heads/master a68eb1823 -> 4f6e8c5e6


user correct version class


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/0fbb6a84
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/0fbb6a84
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/0fbb6a84

Branch: refs/heads/master
Commit: 0fbb6a84d8818a5259759f8ba8a6d3048ac3ca0a
Parents: a68eb18
Author: Shameera Rathanyaka <shameerainfo@gmail.com>
Authored: Tue Aug 4 16:40:11 2015 -0400
Committer: Shameera Rathanyaka <shameerainfo@gmail.com>
Committed: Tue Aug 4 16:40:11 2015 -0400

----------------------------------------------------------------------
 .../server/handler/AiravataServerHandler.java   |  3 +-
 .../airavata/api/airavataAPIConstants.java      | 70 -----------------
 .../apache/airavata/gfac/core/GFacUtils.java    | 21 +++--
 .../gfac/core/context/ProcessContext.java       |  7 ++
 .../gfac/impl/task/SSHJobSubmissionTask.java    | 81 ++++++++++----------
 .../gfac/monitor/email/EmailBasedMonitor.java   | 29 +++++--
 6 files changed, 80 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/0fbb6a84/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index ee1a5a8..6d09245 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -23,6 +23,7 @@ package org.apache.airavata.api.server.handler;
 
 import org.apache.airavata.api.Airavata;
 import org.apache.airavata.api.airavataAPIConstants;
+import org.apache.airavata.api.airavata_apiConstants;
 import org.apache.airavata.api.server.security.SecurityCheck;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -95,7 +96,7 @@ public class AiravataServerHandler implements Airavata.Iface {
     public String getAPIVersion(AuthzToken authzToken) throws InvalidRequestException, AiravataClientException,
             AiravataSystemException, AuthorizationException, TException {
 
-        return airavataAPIConstants.AIRAVATA_API_VERSION;
+        return airavata_apiConstants.AIRAVATA_API_VERSION;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/airavata/blob/0fbb6a84/airavata-api/airavata-api-stubs/src/main/java/org/apache/airavata/api/airavataAPIConstants.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-stubs/src/main/java/org/apache/airavata/api/airavataAPIConstants.java b/airavata-api/airavata-api-stubs/src/main/java/org/apache/airavata/api/airavataAPIConstants.java
deleted file mode 100644
index 0c82e0f..0000000
--- a/airavata-api/airavata-api-stubs/src/main/java/org/apache/airavata/api/airavataAPIConstants.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Autogenerated by Thrift Compiler (0.9.1)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- *  @generated
- */
-package org.apache.airavata.api;
-
-import org.apache.thrift.scheme.IScheme;
-import org.apache.thrift.scheme.SchemeFactory;
-import org.apache.thrift.scheme.StandardScheme;
-
-import org.apache.thrift.scheme.TupleScheme;
-import org.apache.thrift.protocol.TTupleProtocol;
-import org.apache.thrift.protocol.TProtocolException;
-import org.apache.thrift.EncodingUtils;
-import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.server.AbstractNonblockingServer.*;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.EnumMap;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.EnumSet;
-import java.util.Collections;
-import java.util.BitSet;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("all") public class airavataAPIConstants {
-
-  /**
-   * Airavata Interface Versions depend upon this Thrift Interface File. When Making changes, please edit the
-   *  Version Constants according to Semantic Versioning Specification (SemVer) http://semver.org.
-   * 
-   * Note: The Airavata API version may be different from the Airavata software release versions.
-   * 
-   * The Airavata API version is composed as a dot delimited string with major, minor, and patch level components.
-   * 
-   *  - Major: Incremented for backward incompatible changes. An example would be changes to interfaces.
-   *  - Minor: Incremented for backward compatible changes. An example would be the addition of a new optional methods.
-   *  - Patch: Incremented for bug fixes. The patch level should be increased for every edit that doesn't result
-   *              in a change to major/minor version numbers.
-   * 
-   */
-  public static final String AIRAVATA_API_VERSION = "0.15.0";
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0fbb6a84/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index af10218..45af599 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -234,24 +234,21 @@ public class GFacUtils {
         return buf.toString();
     }
 
-	public static void saveJobStatus(TaskContext taskContext,
-                                     JobModel jobModel, JobState state) throws GFacException {
+	public static void saveJobStatus(ProcessContext processContext, JobModel jobModel) throws GFacException {
 		try {
             // first we save job jobModel to the registry for sa and then save the job status.
-            ProcessContext processContext = taskContext.getParentProcessContext();
+			JobStatus jobStatus = jobModel.getJobStatus();
             ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
-            JobStatus status = new JobStatus();
-            status.setJobState(state);
-            jobModel.setJobStatus(status);
-            status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-            CompositeIdentifier ids = new CompositeIdentifier(taskContext.getTaskId(), jobModel.getJobId());
-			experimentCatalog.add(ExpCatChildDataType.JOB_STATUS, status, ids);
-            JobIdentifier identifier = new JobIdentifier(jobModel.getJobId(), taskContext.getTaskModel().getTaskId(),
+            jobModel.setJobStatus(jobStatus);
+            jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            CompositeIdentifier ids = new CompositeIdentifier(jobModel.getTaskId(), jobModel.getJobId());
+			experimentCatalog.add(ExpCatChildDataType.JOB_STATUS, jobStatus, ids);
+            JobIdentifier identifier = new JobIdentifier(jobModel.getJobId(), jobModel.getTaskId(),
                     processContext.getProcessId(), processContext.getProcessModel().getExperimentId(),
                     processContext.getGatewayId());
-            JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(state, identifier);
+            JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(jobStatus.getJobState(), identifier);
 			MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId
-					(MessageType.JOB.name()), taskContext.getParentProcessContext().getGatewayId());
+					(MessageType.JOB.name()), processContext.getGatewayId());
 			msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
 			processContext.getStatusPublisher().publish(msgCtx);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/0fbb6a84/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 9e1ac06..4004787 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -21,6 +21,7 @@
 
 package org.apache.airavata.gfac.core.context;
 
+import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.messaging.core.Publisher;
@@ -35,6 +36,7 @@ import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePrefer
 import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.status.JobStatus;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.ProcessStatus;
 import org.apache.airavata.registry.cpi.AppCatalog;
@@ -272,6 +274,11 @@ public class ProcessContext {
 	}
 
 	public JobModel getJobModel() {
+		if (jobModel == null) {
+			jobModel = new JobModel();
+			jobModel.setWorkingDir(getWorkingDir());
+			jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+		}
 		return jobModel;
 	}
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/0fbb6a84/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
index c282f17..b47e5d2 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
@@ -56,16 +56,11 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
 
     @Override
     public TaskStatus execute(TaskContext taskContext){
-	    TaskStatus status = new TaskStatus(TaskState.COMPLETED); // set to completed.
+	    TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED); // set to completed.
 	    try {
 		    ProcessContext processContext = taskContext.getParentProcessContext();
 		    JobModel jobModel = processContext.getJobModel();
-		    if (jobModel == null) {
-			    jobModel = new JobModel();
-			    jobModel.setWorkingDir(processContext.getWorkingDir());
-			    jobModel.setTaskId(taskContext.getTaskId());
-			    jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
-		    }
+		    jobModel.setTaskId(taskContext.getTaskId());
 		    RemoteCluster remoteCluster = processContext.getRemoteCluster();
 		    JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext);
 		    jobModel.setJobName(jobDescriptor.getJobName());
@@ -74,25 +69,27 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
 		    if (resourceJobManager != null) {
 			    jConfig = Factory.getJobManagerConfiguration(resourceJobManager);
 		    }
+		    JobStatus jobStatus = new JobStatus();
 		    File jobFile = GFacUtils.createJobFile(jobDescriptor, jConfig);
 		    if (jobFile != null && jobFile.exists()) {
 			    jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
 			    String jobId = remoteCluster.submitBatchJob(jobFile.getPath(), processContext.getWorkingDir());
 			    if (jobId != null && !jobId.isEmpty()) {
 				    jobModel.setJobId(jobId);
-				    GFacUtils.saveJobStatus(taskContext, jobModel, JobState.SUBMITTED);
-//                    publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
-//                            , GfacExperimentState.JOBSUBMITTED));
-				    processContext.setJobModel(jobModel);
+				    jobStatus.setJobState(JobState.SUBMITTED);
+				    jobStatus.setReason("Successfully Submitted to " + taskContext.getParentProcessContext()
+						    .getComputeResourceDescription().getHostName());
+				    jobModel.setJobStatus(jobStatus);
+				    GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
 				    if (verifyJobSubmissionByJobId(remoteCluster, jobId)) {
-//                        publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
-//                                , GfacExperimentState.JOBSUBMITTED));
-					    GFacUtils.saveJobStatus(taskContext, jobModel, JobState.QUEUED);
+					    jobStatus.setJobState(JobState.QUEUED);
+					    jobStatus.setReason("Verification step succeeded");
+					    jobModel.setJobStatus(jobStatus);
+					    GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
 				    }
-				    status = new TaskStatus(TaskState.COMPLETED);
-				    status.setReason("Submitted job to compute resource");
+				    taskStatus = new TaskStatus(TaskState.COMPLETED);
+				    taskStatus.setReason("Submitted job to compute resource");
 			    } else {
-				    processContext.setJobModel(jobModel);
 				    int verificationTryCount = 0;
 				    while (verificationTryCount++ < 3) {
 					    String verifyJobId = verifyJobSubmission(remoteCluster, jobModel);
@@ -100,13 +97,15 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
 						    // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
 						    jobId = verifyJobId;
 						    jobModel.setJobId(jobId);
-//                            publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
-//                                    , GfacExperimentState.JOBSUBMITTED));
-						    GFacUtils.saveJobStatus(taskContext, jobModel, JobState.QUEUED);
-						    status.setState(TaskState.COMPLETED);
-						    status.setReason("Submitted job to compute resource");
+						    jobStatus.setJobState(JobState.QUEUED);
+						    jobStatus.setReason("Verification step succeeded");
+						    jobModel.setJobStatus(jobStatus);
+						    GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+						    taskStatus.setState(TaskState.COMPLETED);
+						    taskStatus.setReason("Submitted job to compute resource");
 						    break;
 					    }
+					    log.info("Verify step return invalid jobId, retry verification step in {} secs", verificationTryCount);
 					    Thread.sleep(verificationTryCount * 1000);
 				    }
 			    }
@@ -117,23 +116,23 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
 						    "doesn't return a valid JobId. " + "Hence changing experiment state to Failed";
 				    log.error(msg);
 				    GFacUtils.saveErrorDetails(processContext, msg);
-				    status.setState(TaskState.FAILED);
-				    status.setReason("Couldn't find job id in both submitted and verified steps");
+				    taskStatus.setState(TaskState.FAILED);
+				    taskStatus.setReason("Couldn't find job id in both submitted and verified steps");
 			    }
 		    } else {
-			    status.setState(TaskState.FAILED);
+			    taskStatus.setState(TaskState.FAILED);
 			    if (jobFile == null) {
-				    status.setReason("JobFile is null");
+				    taskStatus.setReason("JobFile is null");
 			    } else {
-				    status.setReason("Job file doesn't exist");
+				    taskStatus.setReason("Job file doesn't exist");
 			    }
 		    }
 
 	    } catch (AppCatalogException e) {
 		    String msg = "Error while instatiating app catalog";
 		    log.error(msg, e);
-		    status.setState(TaskState.FAILED);
-		    status.setReason(msg);
+		    taskStatus.setState(TaskState.FAILED);
+		    taskStatus.setReason(msg);
 		    ErrorModel errorModel = new ErrorModel();
 		    errorModel.setActualErrorMessage(e.getMessage());
 		    errorModel.setUserFriendlyMessage(msg);
@@ -141,8 +140,8 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
 	    } catch (ApplicationSettingsException e) {
 		    String msg = "Error occurred while creating job descriptor";
 		    log.error(msg, e);
-		    status.setState(TaskState.FAILED);
-		    status.setReason(msg);
+		    taskStatus.setState(TaskState.FAILED);
+		    taskStatus.setReason(msg);
 		    ErrorModel errorModel = new ErrorModel();
 		    errorModel.setActualErrorMessage(e.getMessage());
 		    errorModel.setUserFriendlyMessage(msg);
@@ -150,8 +149,8 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
 	    } catch (GFacException e) {
 		    String msg = "Error occurred while creating job descriptor";
 		    log.error(msg, e);
-		    status.setState(TaskState.FAILED);
-		    status.setReason(msg);
+		    taskStatus.setState(TaskState.FAILED);
+		    taskStatus.setReason(msg);
 		    ErrorModel errorModel = new ErrorModel();
 		    errorModel.setActualErrorMessage(e.getMessage());
 		    errorModel.setUserFriendlyMessage(msg);
@@ -159,8 +158,8 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
 	    } catch (SSHApiException e) {
 		    String msg = "Error occurred while submitting the job";
 		    log.error(msg, e);
-		    status.setState(TaskState.FAILED);
-		    status.setReason(msg);
+		    taskStatus.setState(TaskState.FAILED);
+		    taskStatus.setReason(msg);
 		    ErrorModel errorModel = new ErrorModel();
 		    errorModel.setActualErrorMessage(e.getMessage());
 		    errorModel.setUserFriendlyMessage(msg);
@@ -168,8 +167,8 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
 	    } catch (IOException e) {
 		    String msg = "Error while reading the content of the job file";
 		    log.error(msg, e);
-		    status.setState(TaskState.FAILED);
-		    status.setReason(msg);
+		    taskStatus.setState(TaskState.FAILED);
+		    taskStatus.setReason(msg);
 		    ErrorModel errorModel = new ErrorModel();
 		    errorModel.setActualErrorMessage(e.getMessage());
 		    errorModel.setUserFriendlyMessage(msg);
@@ -177,21 +176,21 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
 	    } catch (InterruptedException e) {
 		    String msg = "Error occurred while verifying the job submission";
 		    log.error(msg, e);
-		    status.setState(TaskState.FAILED);
-		    status.setReason(msg);
+		    taskStatus.setState(TaskState.FAILED);
+		    taskStatus.setReason(msg);
 		    ErrorModel errorModel = new ErrorModel();
 		    errorModel.setActualErrorMessage(e.getMessage());
 		    errorModel.setUserFriendlyMessage(msg);
 		    taskContext.getTaskModel().setTaskError(errorModel);
 	    }
 
-	    taskContext.setTaskStatus(status);
+	    taskContext.setTaskStatus(taskStatus);
 	    try {
 		    GFacUtils.saveAndPublishTaskStatus(taskContext);
 	    } catch (GFacException e) {
 		    log.error("Error while saving task status", e);
 	    }
-	    return status;
+	    return taskStatus;
     }
 
     private boolean verifyJobSubmissionByJobId(RemoteCluster remoteCluster, String jobID) throws SSHApiException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/0fbb6a84/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index 9ecd700..efa0641 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -24,6 +24,7 @@ import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacThreadPoolExecutor;
+import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.config.ResourceConfig;
 import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.gfac.core.monitor.EmailParser;
@@ -35,7 +36,9 @@ import org.apache.airavata.gfac.monitor.email.parser.PBSEmailParser;
 import org.apache.airavata.gfac.monitor.email.parser.SLURMEmailParser;
 import org.apache.airavata.gfac.monitor.email.parser.UGEEmailParser;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -258,6 +261,8 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
         JobState resultState = jobStatusResult.getState();
 	    // TODO : update job state on process context
         boolean runOutflowTasks = false;
+	    JobStatus jobStatus = new JobStatus();
+	    JobModel jobModel = processContext.getJobModel();
         String jobDetails = "JobName : " + jobStatusResult.getJobName() + ", JobId : " + jobStatusResult.getJobId();
         // TODO - Handle all other valid JobStates
         if (resultState == JobState.COMPLETE) {
@@ -269,19 +274,33 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
             log.info("[EJM]: Job Queued email received, " + jobDetails);
         }else if (resultState == JobState.ACTIVE) {
             // nothing special thing to do, update the status change to rabbit mq at the end of this method.
+	        jobStatus.setJobState(JobState.QUEUED);
+	        jobStatus.setReason("Queued email received");
             log.info("[EJM]: Job Active email received, " + jobDetails);
         }else if (resultState == JobState.FAILED) {
             jobMonitorMap.remove(jobStatusResult.getJobId());
             runOutflowTasks = true;
+	        jobStatus.setJobState(JobState.FAILED);
+	        jobStatus.setReason("Failed email received");
             log.info("[EJM]: Job failed email received , removed job from job monitoring. " + jobDetails);
         }else if (resultState == JobState.CANCELED) {
             jobMonitorMap.remove(jobStatusResult.getJobId());
             runOutflowTasks = false; // Do we need to run out handlers in canceled case?
+	        jobStatus.setJobState(JobState.CANCELED);
+	        jobStatus.setReason("Canceled email received");
             log.info("[EJM]: Job canceled mail received, removed job from job monitoring. " + jobDetails);
-
         }
-        log.info("[EJM]: Publishing status changes to amqp. " + jobDetails);
-        publishJobStatusChange(processContext);
+	    if (jobStatus.getJobState() != null) {
+		    try {
+			    jobModel.setJobStatus(jobStatus);
+			    log.info("[EJM]: Publishing status changes to amqp. " + jobDetails);
+			    GFacUtils.saveJobStatus(processContext, jobModel);
+		    } catch (GFacException e) {
+			    log.error("expId: {}, processId: {}, taskId: {}, jobId: {} :- Error while save and publishing Job " +
+					    "status {}", processContext.getExperimentId(), processContext.getProcessId(), jobModel
+					    .getTaskId(), jobModel.getJobId(), jobStatus.getJobState());
+		    }
+	    }
 
         if (runOutflowTasks) {
             log.info("[EJM]: Calling Out Handler chain of " + jobDetails);
@@ -293,10 +312,6 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
         }
     }
 
-    private void publishJobStatusChange(ProcessContext processContext) {
-	    // TODO : implement this
-    }
-
     private void writeEnvelopeOnError(Message m) throws MessagingException {
         Address[] a;
         // FROM


Mime
View raw message