airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject [1/2] airavata git commit: Updated email based monitoring to work with ProcessContext
Date Tue, 16 Jun 2015 20:06:13 GMT
Repository: airavata
Updated Branches:
  refs/heads/master a2b6bdfd9 -> d05c0a166


Updated email based monitoring to work with ProcessContext


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/754bc5c4
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/754bc5c4
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/754bc5c4

Branch: refs/heads/master
Commit: 754bc5c4e90dded69fe14233e5bad5dac5e7dbca
Parents: d9b2df0
Author: Shameera Rathanyaka <shameerainfo@gmail.com>
Authored: Tue Jun 16 16:05:47 2015 -0400
Committer: Shameera Rathanyaka <shameerainfo@gmail.com>
Committed: Tue Jun 16 16:05:47 2015 -0400

----------------------------------------------------------------------
 .../gfac/core/monitor/JobStatusResult.java      |   3 +-
 .../gfac/monitor/email/EmailBasedMonitor.java   | 103 +++++++------------
 .../monitor/email/parser/LSFEmailParser.java    |   2 +-
 .../monitor/email/parser/PBSEmailParser.java    |   2 +-
 .../monitor/email/parser/SLURMEmailParser.java  |   2 +-
 .../monitor/email/parser/UGEEmailParser.java    |   2 +-
 6 files changed, 41 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/754bc5c4/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java
index 9c4fcc3..dfc77ac 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java
@@ -20,7 +20,8 @@
 */
 package org.apache.airavata.gfac.core.monitor;
 
-import org.apache.airavata.model.experiment.JobState;
+
+import org.apache.airavata.model.status.JobState;
 
 public class JobStatusResult {
     private JobState state;

http://git-wip-us.apache.org/repos/asf/airavata/blob/754bc5c4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index 5ef0e88..08f8423 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -21,29 +21,20 @@
 package org.apache.airavata.gfac.monitor.email;
 
 import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.logger.AiravataLogger;
-import org.apache.airavata.common.logger.AiravataLoggerFactory;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.GFacThreadPoolExecutor;
 import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.monitor.EmailParser;
 import org.apache.airavata.gfac.core.monitor.JobMonitor;
 import org.apache.airavata.gfac.core.monitor.JobStatusResult;
-import org.apache.airavata.gfac.core.monitor.EmailParser;
-import org.apache.airavata.gfac.impl.OutHandlerWorker;
+import org.apache.airavata.gfac.impl.GFacWorker;
 import org.apache.airavata.gfac.monitor.email.parser.LSFEmailParser;
 import org.apache.airavata.gfac.monitor.email.parser.PBSEmailParser;
 import org.apache.airavata.gfac.monitor.email.parser.SLURMEmailParser;
 import org.apache.airavata.gfac.monitor.email.parser.UGEEmailParser;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
-import org.apache.airavata.model.messaging.event.JobIdentifier;
-import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
-import org.apache.airavata.model.experiment.CorrectiveAction;
-import org.apache.airavata.model.experiment.ErrorCategory;
-import org.apache.airavata.model.experiment.JobState;
-import org.apache.airavata.model.experiment.JobStatus;
+import org.apache.airavata.model.status.JobState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,7 +67,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
     private Store store;
     private Folder emailFolder;
     private Properties properties;
-    private Map<String, JobExecutionContext> jobMonitorMap = new ConcurrentHashMap<String, JobExecutionContext>();
+    private Map<String, ProcessContext> jobMonitorMap = new ConcurrentHashMap<>();
     private String host, emailAddress, password, storeProtocol, folderName ;
     private Date monitorStartDate;
     private Map<ResourceJobManagerType, EmailParser> emailParserMap = new HashMap<ResourceJobManagerType, EmailParser>();
@@ -99,18 +90,16 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
         properties.put("mail.store.protocol", storeProtocol);
     }
 
-    public void addToJobMonitorMap(JobExecutionContext jobExecutionContext) {
-        String monitorId = jobExecutionContext.getJobDetails().getJobID();
-        if (monitorId == null || monitorId.isEmpty()) {
-            monitorId = jobExecutionContext.getJobDetails().getJobName();
-        }
-        addToJobMonitorMap(monitorId, jobExecutionContext);
-    }
+	@Override
+	public void monitor(String jobId, ProcessContext processContext) {
+		log.info("[EJM]: Added monitor Id : " + jobId + " to email based monitor map");
+		jobMonitorMap.put(jobId, processContext);
+	}
 
-    public void addToJobMonitorMap(String monitorId, JobExecutionContext jobExecutionContext) {
-        log.info("[EJM]: Added monitor Id : " + monitorId + " to email based monitor map");
-        jobMonitorMap.put(monitorId, jobExecutionContext);
-    }
+	@Override
+	public void stopMonitor(String jobId) {
+		jobMonitorMap.remove(jobId);
+	}
 
     private JobStatusResult parse(Message message) throws MessagingException, AiravataException {
         Address fromAddress = message.getFrom()[0];
@@ -132,7 +121,8 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
                     emailParser = new UGEEmailParser();
                     break;
                 default:
-                    throw new AiravataException("[EJM]: Un-handle resource job manager type: " + jobMonitorType.toString() + " for email monitoring -->  " + addressStr);
+	                throw new AiravataException("[EJM]: Un-handle resource job manager type: " + jobMonitorType
+			                .toString() + " for email monitoring -->  " + addressStr);
             }
 
             emailParserMap.put(jobMonitorType, emailParser);
@@ -218,12 +208,12 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
         for (Message message : searchMessages) {
             try {
                 JobStatusResult jobStatusResult = parse(message);
-                JobExecutionContext jEC = jobMonitorMap.get(jobStatusResult.getJobId());
-                if (jEC == null) {
-                    jEC = jobMonitorMap.get(jobStatusResult.getJobName());
+                ProcessContext processContext = jobMonitorMap.get(jobStatusResult.getJobId());
+                if (processContext == null) {
+	                processContext = jobMonitorMap.get(jobStatusResult.getJobName());
                 }
-                if (jEC != null) {
-                    process(jobStatusResult, jEC);
+                if (processContext != null) {
+                    process(jobStatusResult, processContext);
                     processedMessages.add(message);
                 } else {
                     // we can get JobExecutionContext null in multiple Gfac instances environment,
@@ -272,15 +262,15 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
         }
     }
 
-    private void process(JobStatusResult jobStatusResult, JobExecutionContext jEC){
+    private void process(JobStatusResult jobStatusResult, ProcessContext processContext){
         JobState resultState = jobStatusResult.getState();
-        jEC.getJobDetails().setJobStatus(new JobStatus(resultState));
-        boolean runOutHandlers = false;
+	    // TODO : update job state on process context
+        boolean runOutflowTasks = false;
         String jobDetails = "JobName : " + jobStatusResult.getJobName() + ", JobId : " + jobStatusResult.getJobId();
         // TODO - Handle all other valid JobStates
         if (resultState == JobState.COMPLETE) {
             jobMonitorMap.remove(jobStatusResult.getJobId());
-            runOutHandlers = true;
+            runOutflowTasks = true;
             log.info("[EJM]: Job Complete email received , removed job from job monitoring. " + jobDetails);
         }else if (resultState == JobState.QUEUED) {
             // nothing special thing to do, update the status change to rabbit mq at the end of this method.
@@ -290,44 +280,29 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
             log.info("[EJM]: Job Active email received, " + jobDetails);
         }else if (resultState == JobState.FAILED) {
             jobMonitorMap.remove(jobStatusResult.getJobId());
-            runOutHandlers = true;
+            runOutflowTasks = true;
             log.info("[EJM]: Job failed email received , removed job from job monitoring. " + jobDetails);
-            try {
-                GFacUtils.saveErrorDetails(jEC, "Job runs on remote compute resource failed", CorrectiveAction.RETRY_SUBMISSION, ErrorCategory.APPLICATION_FAILURE);
-            } catch (GFacException e) {
-                log.info("[EJM]: Error while saving error details for jobId:{}, expId: {}", jEC.getJobDetails().getJobID(), jEC.getExperimentID());
-            }
         }else if (resultState == JobState.CANCELED) {
             jobMonitorMap.remove(jobStatusResult.getJobId());
-            runOutHandlers = false; // Do we need to run out handlers in canceled case?
+            runOutflowTasks = false; // Do we need to run out handlers in canceled case?
             log.info("[EJM]: Job canceled mail received, removed job from job monitoring. " + jobDetails);
 
         }
         log.info("[EJM]: Publishing status changes to amqp. " + jobDetails);
-        publishJobStatusChange(jEC);
+        publishJobStatusChange(processContext);
 
-        if (runOutHandlers) {
+        if (runOutflowTasks) {
             log.info("[EJM]: Calling Out Handler chain of " + jobDetails);
-            GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(jEC));
+	        try {
+		        GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(processContext));
+	        } catch (GFacException e) {
+		        log.info("[EJM]: Error while running output tasks", e);
+	        }
         }
     }
 
-    private void publishJobStatusChange(JobExecutionContext jobExecutionContext) {
-        JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent();
-        JobIdentifier jobIdentity = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(),
-                jobExecutionContext.getTaskData().getTaskID(),
-                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                jobExecutionContext.getExperimentID(),
-                jobExecutionContext.getGatewayID());
-        jobStatus.setJobIdentity(jobIdentity);
-        jobStatus.setState(jobExecutionContext.getJobDetails().getJobStatus().getJobState());
-        // we have this JobStatus class to handle amqp monitoring
-        log.debugId(jobStatus.getJobIdentity().getJobId(), "[EJM]: Published job status(" +
-                        jobExecutionContext.getJobDetails().getJobStatus().getJobState().toString() + ") change request, " +
-                        "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(),
-                jobStatus.getJobIdentity().getTaskId());
-
-        jobExecutionContext.getLocalEventPublisher().publish(jobStatus);
+    private void publishJobStatusChange(ProcessContext processContext) {
+	    // TODO : implement this
     }
 
     private void writeEnvelopeOnError(Message m) throws MessagingException {
@@ -355,13 +330,5 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
         this.monitorStartDate = date;
     }
 
-	@Override
-	public void monitor(String jobId, ProcessContext processContext) {
 
-	}
-
-	@Override
-	public void stopMonitor(String jobId) {
-
-	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/754bc5c4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java
index dc7999c..d6e396e 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java
@@ -23,7 +23,7 @@ package org.apache.airavata.gfac.monitor.email.parser;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.gfac.core.monitor.EmailParser;
 import org.apache.airavata.gfac.core.monitor.JobStatusResult;
-import org.apache.airavata.model.experiment.JobState;
+import org.apache.airavata.model.status.JobState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/754bc5c4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java
index 6f2446b..3879daf 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java
@@ -23,7 +23,7 @@ package org.apache.airavata.gfac.monitor.email.parser;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.gfac.core.monitor.EmailParser;
 import org.apache.airavata.gfac.core.monitor.JobStatusResult;
-import org.apache.airavata.model.experiment.JobState;
+import org.apache.airavata.model.status.JobState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/754bc5c4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java
index db7521c..3b0e32a 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java
@@ -23,7 +23,7 @@ package org.apache.airavata.gfac.monitor.email.parser;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.gfac.core.monitor.EmailParser;
 import org.apache.airavata.gfac.core.monitor.JobStatusResult;
-import org.apache.airavata.model.experiment.JobState;
+import org.apache.airavata.model.status.JobState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/754bc5c4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/UGEEmailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/UGEEmailParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/UGEEmailParser.java
index e147d73..266456e 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/UGEEmailParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/UGEEmailParser.java
@@ -23,7 +23,7 @@ package org.apache.airavata.gfac.monitor.email.parser;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.gfac.core.monitor.EmailParser;
 import org.apache.airavata.gfac.core.monitor.JobStatusResult;
-import org.apache.airavata.model.experiment.JobState;
+import org.apache.airavata.model.status.JobState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 


Mime
View raw message