airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From machris...@apache.org
Subject [29/45] airavata git commit: handling race conditions with multip
Date Wed, 08 Mar 2017 17:13:09 GMT
handling race conditions with multip


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/af39d44b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/af39d44b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/af39d44b

Branch: refs/heads/ansible-testing-0.17
Commit: af39d44b2d0bef340399e45f8c7e27dd7b1a6bc1
Parents: 3c453ae
Author: scnakandala <supun.nakandala@gmail.com>
Authored: Sat Mar 4 20:58:41 2017 -0500
Committer: scnakandala <supun.nakandala@gmail.com>
Committed: Sat Mar 4 20:58:41 2017 -0500

----------------------------------------------------------------------
 .../server/src/main/resources/gfac-config.yaml  |   5 +
 .../gfac/core/monitor/JobStatusResult.java      |   9 ++
 .../gfac/monitor/email/EmailBasedMonitor.java   | 103 +++++++++++++------
 .../email/parser/AiravataCustomMailParser.java  |   8 ++
 4 files changed, 96 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/af39d44b/modules/configuration/server/src/main/resources/gfac-config.yaml
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/gfac-config.yaml b/modules/configuration/server/src/main/resources/gfac-config.yaml
index cea54a6..edb7922 100644
--- a/modules/configuration/server/src/main/resources/gfac-config.yaml
+++ b/modules/configuration/server/src/main/resources/gfac-config.yaml
@@ -113,3 +113,8 @@ resources:
 
   - jobManagerType: FORK
     commandOutputParser: org.apache.airavata.gfac.impl.job.ForkOutputParser
+
+  - jobManagerType: AIRAVATA
+    emailParser: org.apache.airavata.gfac.monitor.email.parser.AiravataCustomMailParser
+    resourceEmailAddresses:
+      - gw56jobs@scigap.org

http://git-wip-us.apache.org/repos/asf/airavata/blob/af39d44b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java
index 3e43b49..9c80c9d 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java
@@ -25,6 +25,7 @@ import org.apache.airavata.model.status.JobState;
 public class JobStatusResult {
     private JobState state;
     private String jobId;
+    private boolean authoritative = true;
 
     public String getJobName() {
         return jobName;
@@ -51,5 +52,13 @@ public class JobStatusResult {
     public void setJobId(String jobId) {
         this.jobId = jobId;
     }
+
+    public boolean isAuthoritative() {
+        return authoritative;
+    }
+
+    public void setAuthoritative(boolean authoritative) {
+        this.authoritative = authoritative;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/af39d44b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index bbcd635..02dfa00 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -337,42 +337,87 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
         ProcessContext parentProcessContext = taskContext.getParentProcessContext();
         JobModel jobModel = parentProcessContext.getJobModel();
         String jobDetails = "JobName : " + jobStatusResult.getJobName() + ", JobId : " +
jobStatusResult.getJobId();
+
+        JobState currentState = null;
+        List<JobStatus> jobStatusList = jobModel.getJobStatuses();
+        if (jobStatusList != null && jobStatusList.size() > 0) {
+            JobStatus lastStatus = jobStatusList.get(0);
+            for (JobStatus temp : jobStatusList) {
+                if (temp.getTimeOfStateChange() >= lastStatus.getTimeOfStateChange())
{
+                    lastStatus = temp;
+                }
+            }
+            currentState = lastStatus.getJobState();
+        }
+
         // TODO - Handle all other valid JobStates
+        // FIXME - What if non-authoritative email comes later (getting accumulated in the
email account)
         if (resultState == JobState.COMPLETE) {
-            jobMonitorMap.remove(jobStatusResult.getJobId());
-	        jobStatus.setJobState(JobState.COMPLETE);
-	        jobStatus.setReason("Complete email received");
-            jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-	        runOutflowTasks = true;
-            log.info("[EJM]: Job Complete email received , removed job from job monitoring.
" + jobDetails);
+            if (jobStatusResult.isAuthoritative()) {
+                if (currentState != null && currentState == JobState.COMPLETE) {
+                    jobMonitorMap.remove(jobStatusResult.getJobId());
+                    runOutflowTasks = false;
+                    log.info("[EJM]: Job Complete email received , removed job from job monitoring.
" + jobDetails);
+                } else {
+                    jobMonitorMap.remove(jobStatusResult.getJobId());
+                    runOutflowTasks = true;
+                    jobStatus.setJobState(JobState.COMPLETE);
+                    jobStatus.setReason("Complete email received");
+                    jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                    log.info("[EJM]: Job Complete email received , removed job from job monitoring.
" + jobDetails);
+                }
+            } else {
+                runOutflowTasks = true;
+                jobStatus.setJobState(JobState.COMPLETE);
+                jobStatus.setReason("Complete email received");
+                jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                log.info("[EJM]: Non Authoritative Job Complete email received. " + jobDetails);
+            }
         }else if (resultState == JobState.QUEUED) {
-	        // nothing special thing to do, update the status change to rabbit mq at the end
of this method.
-	        jobStatus.setJobState(JobState.QUEUED);
-	        jobStatus.setReason("Queue email received");
-            jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-	        log.info("[EJM]: Job Queued email received, " + jobDetails);
+            //It is possible that we will get an early complete message from custom Airavata
emails instead from the
+            //scheduler
+            if (currentState != JobState.COMPLETE) {
+                // nothing special thing to do, update the status change to rabbit mq at
the end of this method.
+                jobStatus.setJobState(JobState.QUEUED);
+                jobStatus.setReason("Queue email received");
+                jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                log.info("[EJM]: Job Queued email received, " + jobDetails);
+            }
         }else if (resultState == JobState.ACTIVE) {
-            // nothing special thing to do, update the status change to rabbit mq at the
end of this method.
-	        jobStatus.setJobState(JobState.ACTIVE);
-	        jobStatus.setReason("Active email received");
-            jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-            log.info("[EJM]: Job Active email received, " + jobDetails);
+            //It is possible that we will get an early complete message from custom Airavata
emails instead from the
+            //scheduler
+            if (currentState != JobState.COMPLETE) {
+                // nothing special thing to do, update the status change to rabbit mq at
the end of this method.
+                jobStatus.setJobState(JobState.ACTIVE);
+                jobStatus.setReason("Active email received");
+                jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                log.info("[EJM]: Job Active email received, " + jobDetails);
+            }
         }else if (resultState == JobState.FAILED) {
-            jobMonitorMap.remove(jobStatusResult.getJobId());
-            runOutflowTasks = true;
-	        jobStatus.setJobState(JobState.FAILED);
-	        jobStatus.setReason("Failed email received");
-            jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-            log.info("[EJM]: Job failed email received , removed job from job monitoring.
" + jobDetails);
+            //It is possible that we will get an early complete message from custom Airavata
emails instead from the
+            //scheduler
+            if (currentState != JobState.COMPLETE) {
+                jobMonitorMap.remove(jobStatusResult.getJobId());
+                runOutflowTasks = true;
+                jobStatus.setJobState(JobState.FAILED);
+                jobStatus.setReason("Failed email received");
+                jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                log.info("[EJM]: Job failed email received , removed job from job monitoring.
" + jobDetails);
+            }
         }else if (resultState == JobState.CANCELED) {
-            jobMonitorMap.remove(jobStatusResult.getJobId());
-            jobStatus.setJobState(JobState.CANCELED);
-	        jobStatus.setReason("Canceled email received");
-            jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-	        log.info("[EJM]: Job canceled mail received, removed job from job monitoring. "
+ jobDetails);
-	        runOutflowTasks = true; // we run out flow and this will move process to cancel
state.
+            //It is possible that we will get an early complete message from custom Airavata
emails instead from the
+            //scheduler
+            if (currentState != JobState.COMPLETE) {
+                jobMonitorMap.remove(jobStatusResult.getJobId());
+                jobStatus.setJobState(JobState.CANCELED);
+                jobStatus.setReason("Canceled email received");
+                jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                log.info("[EJM]: Job canceled mail received, removed job from job monitoring.
" + jobDetails);
+                runOutflowTasks = true; // we run out flow and this will move process to
cancel state.
+            }
         }
-	    if (jobStatus.getJobState() != null) {
+
+        if (jobStatus.getJobState() != null) {
 		    try {
 			    jobModel.setJobStatuses(Arrays.asList(jobStatus));
 			    log.info("[EJM]: Publishing status changes to amqp. " + jobDetails);

http://git-wip-us.apache.org/repos/asf/airavata/blob/af39d44b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/AiravataCustomMailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/AiravataCustomMailParser.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/AiravataCustomMailParser.java
index 8810814..9c773b1 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/AiravataCustomMailParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/AiravataCustomMailParser.java
@@ -55,6 +55,14 @@ public class AiravataCustomMailParser implements EmailParser {
             jobStatusResult.setJobId(matcher.group(JOBID));
             jobStatusResult.setJobName(matcher.group(JOBNAME));
             jobStatusResult.setState(getJobState(matcher.group(STATUS)));
+            jobStatusResult.setAuthoritative(false);
+
+            try {
+                //Waiting some time for the scheduler to move the job from completing to
completed.
+                Thread.sleep(5000);
+            } catch (Exception ex) {
+            }
+
         } else {
             log.error("[EJM]: No matched found for subject -> " + subject);
         }


Mime
View raw message