airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject [27/54] [abbrv] airavata git commit: Fixed issues with cancel experiment operation
Date Fri, 04 Dec 2015 21:01:52 GMT
Fixed issues with cancel experiment operation


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/497178de
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/497178de
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/497178de

Branch: refs/heads/master
Commit: 497178dea0deb5b9e8911c6bb869da4b32facdd1
Parents: bb5be4b
Author: Shameera Rathnayaka <shameerainfo@gmail.com>
Authored: Wed Nov 25 15:05:19 2015 -0500
Committer: Shameera Rathnayaka <shameerainfo@gmail.com>
Committed: Wed Nov 25 15:05:19 2015 -0500

----------------------------------------------------------------------
 .../airavata/gfac/core/monitor/JobMonitor.java  |  6 +++
 .../airavata/gfac/impl/GFacEngineImpl.java      | 28 ++++++-------
 .../apache/airavata/gfac/impl/GFacWorker.java   | 15 -------
 .../impl/task/DefaultJobSubmissionTask.java     | 17 ++++++--
 .../impl/watcher/CancelRequestWatcherImpl.java  | 37 ++++++++++++-----
 .../gfac/monitor/email/EmailBasedMonitor.java   | 42 ++++++++++++++++----
 6 files changed, 95 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/497178de/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
index 4b2ecb2..5e83feb 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
@@ -40,4 +40,10 @@ public interface JobMonitor {
      * Return <code>true</code> if jobId is already monitoring by this Monitor, <code>false</code> if not
      */
     boolean isMonitoring(String jobId);
+
+	/**
+	 * make monitor service aware of cancelled jobs; in case job monitor details don't come within the predefined time,
+	 * it will move job to CANCELED state and call output
+	 */
+	void canceledJob(String jobId);
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/497178de/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index db2fb50..380349e 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -255,11 +255,7 @@ public class GFacEngineImpl implements GFacEngine {
                     processContext.setProcessStatus(status);
                     GFacUtils.saveAndPublishProcessStatus(processContext);
                     executeJobSubmission(taskContext, processContext.isRecovery());
-                    // checkpoint
-                    if (processContext.isInterrupted()) {
-                        GFacUtils.handleProcessInterrupt(processContext);
-                        return;
-                    }
+                    // Don't put any checkpoint in between JobSubmission and Monitoring tasks
 
                     JobStatus jobStatus = processContext.getJobModel().getJobStatus();
                     if (jobStatus != null && (jobStatus.getJobState() == JobState.SUBMITTED
@@ -583,9 +579,8 @@ public class GFacEngineImpl implements GFacEngine {
     @Override
     public void cancelProcess(ProcessContext processContext) throws GFacException {
         if (processContext != null) {
-            processContext.setCancel(true);
             switch (processContext.getProcessState()) {
-                case MONITORING:
+                case MONITORING: case EXECUTING:
                     // get job submission task and invoke cancel
                     JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol());
                     TaskContext taskCtx = getJobSubmissionTaskContext(processContext);
@@ -614,16 +609,21 @@ public class GFacEngineImpl implements GFacEngine {
         try {
             JobStatus oldJobStatus = jSTask.cancel(taskContext);
 
-            if (oldJobStatus != null && oldJobStatus.getJobState() == JobState.QUEUED) {
-                JobMonitor monitorService = Factory.getMonitorService(taskContext.getParentProcessContext().getMonitorMode());
-                monitorService.stopMonitor(taskContext.getParentProcessContext().getJobModel().getJobId(), true);
+/*            if (oldJobStatus != null && oldJobStatus.getJobState() == JobState.QUEUED) {
+                ProcessContext pc = taskContext.getParentProcessContext();
                 JobStatus newJobStatus = new JobStatus(JobState.CANCELED);
                 newJobStatus.setReason("Job cancelled");
                 newJobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-                taskContext.getParentProcessContext().getJobModel().setJobStatus(newJobStatus);
-                GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), taskContext.getParentProcessContext()
-                        .getJobModel());
-            }
+                pc.getJobModel().setJobStatus(newJobStatus);
+                GFacUtils.saveJobStatus(pc, pc.getJobModel());
+                JobMonitor monitorService = Factory.getMonitorService(pc.getMonitorMode());
+                monitorService.stopMonitor(pc.getJobModel().getJobId(), true);
+            }*/
+
+            ProcessContext pc = taskContext.getParentProcessContext();
+            JobMonitor monitorService = Factory.getMonitorService(pc.getMonitorMode());
+            monitorService.canceledJob(pc.getJobModel().getJobId());
+
         } catch (TaskException e) {
             throw new GFacException("Error while cancelling job");
         } catch (AiravataException e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/497178de/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 970cbf0..596baab 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -193,21 +193,6 @@ public class GFacWorker implements Runnable {
     }
 
 	private void recoverProcess() throws GFacException {
-
-        String taskDag = processContext.getProcessModel().getTaskDag();
-        List<String> taskExecutionOrder = GFacUtils.parseTaskDag(taskDag);
-        processContext.setTaskExecutionOrder(taskExecutionOrder);
-        Map<String, TaskModel> taskMap = processContext.getTaskMap();
-        String recoverTaskId = null;
-        for (String taskId : taskExecutionOrder) {
-            TaskModel taskModel = taskMap.get(taskId);
-            TaskState state = taskModel.getTaskStatus().getState();
-            if (state == TaskState.CREATED || state == TaskState.EXECUTING) {
-                recoverTaskId = taskId;
-                break;
-            }
-        }
-
         engine.recoverProcess(processContext);
         if (processContext.isInterrupted()) {
             return;

http://git-wip-us.apache.org/repos/asf/airavata/blob/497178de/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
index 9ccaa94..d435b5f 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
@@ -34,11 +34,9 @@ import org.apache.airavata.gfac.impl.Factory;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.job.JobModel;
-import org.apache.airavata.model.status.JobState;
-import org.apache.airavata.model.status.JobStatus;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.status.*;
 import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.core.experiment.catalog.model.Process;
 import org.apache.airavata.registry.cpi.AppCatalogException;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -262,6 +260,17 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
 		JobModel jobModel = processContext.getJobModel();
 		int retryCount = 0;
 		if (jobModel != null) {
+			if (processContext.getProcessState() == ProcessState.EXECUTING) {
+				while (jobModel.getJobId() == null) {
+					log.info("Cancellation pause until process get jobId");
+					try {
+						Thread.sleep(1000);
+					} catch (InterruptedException e) {
+						// ignore
+					}
+				}
+			}
+
 			try {
 				JobStatus oldJobStatus = remoteCluster.getJobStatus(jobModel.getJobId());
 				while (oldJobStatus == null && retryCount <= 5) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/497178de/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
index 8a2dce3..595380f 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
@@ -21,6 +21,7 @@
 package org.apache.airavata.gfac.impl.watcher;
 
 import org.apache.airavata.common.utils.ZkConstants;
+import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher;
 import org.apache.airavata.gfac.impl.Factory;
@@ -35,6 +36,7 @@ public class CancelRequestWatcherImpl implements CancelRequestWatcher {
 	private static final Logger log = LoggerFactory.getLogger(CancelRequestWatcherImpl.class);
 	private final String processId;
 	private final String experimentId;
+	private final int max_retry = 3;
 
 	public CancelRequestWatcherImpl(String experimentId, String processId) {
 		this.experimentId = experimentId;
@@ -47,20 +49,13 @@ public class CancelRequestWatcherImpl implements CancelRequestWatcher {
 		String path = watchedEvent.getPath();
 		Watcher.Event.EventType type = watchedEvent.getType();
 		CuratorFramework curatorClient = Factory.getCuratorClient();
+		log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>> cancel watcher triggered process id {}<<<<<<<<<<<<<<<<<<<<<<<<<<<", processId);
 		switch (type) {
 			case NodeDataChanged:
 				byte[] bytes = curatorClient.getData().forPath(path);
 				String action = new String(bytes);
 				if (action.equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST)) {
-					ProcessContext processContext = Factory.getGfacContext().getProcess(processId);
-					if (processContext != null) {
-						processContext.setCancel(true);
-						Factory.getGFacEngine().cancelProcess(processContext);
-						log.info("expId {}, processId : {}, Cancelling process", experimentId, processId);
-					} else {
-						log.info("expId: {}, Cancel request came for processId {} but couldn't find process context",
-								experimentId, processId);
-					}
+					cancelProcess(0);
 				} else {
 					curatorClient.getData().usingWatcher(this).forPath(path);
 				}
@@ -88,4 +83,28 @@ public class CancelRequestWatcherImpl implements CancelRequestWatcher {
 				break;
 		}
 	}
+
+	private void cancelProcess(int retryAttempt) throws GFacException {
+		ProcessContext processContext = Factory.getGfacContext().getProcess(processId);
+		if (processContext != null) {
+            processContext.setCancel(true);
+            log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>> calling process cancelling operation <<<<<<<<<<<<<<<<<<<<<<<<<<<");
+            Factory.getGFacEngine().cancelProcess(processContext);
+            log.info("expId {}, processId : {}, Cancelling process", experimentId, processId);
+        } else {
+			if (retryAttempt < max_retry) {
+				log.info("expId: {}, Cancel request came for processId {} but couldn't find process context. " +
+						"retry in {} ms ", experimentId, processId, retryAttempt);
+				try {
+					Thread.sleep(retryAttempt++*1000);
+				} catch (InterruptedException e) {
+					// ignore we don't care this exception.
+				}
+				cancelProcess(retryAttempt);
+			} else {
+				log.info("expId: {}, Cancel request came for processId {} but couldn't find process context.",
+						experimentId, processId);
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/497178de/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index 7e9e505..64b9be7 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -51,12 +51,7 @@ import javax.mail.Session;
 import javax.mail.Store;
 import javax.mail.search.FlagTerm;
 import javax.mail.search.SearchTerm;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class EmailBasedMonitor implements JobMonitor, Runnable{
@@ -77,9 +72,11 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
     private Map<ResourceJobManagerType, EmailParser> emailParserMap = new HashMap<ResourceJobManagerType, EmailParser>();
 	private Map<String, ResourceJobManagerType> addressMap = new HashMap<>();
 	private Message[] flushUnseenMessages;
+    private Map<String, Boolean> canceledJobs = new ConcurrentHashMap<>();
+    private Timer timer;
 
 
-	public EmailBasedMonitor(Map<ResourceJobManagerType, ResourceConfig> resourceConfigs) throws AiravataException {
+    public EmailBasedMonitor(Map<ResourceJobManagerType, ResourceConfig> resourceConfigs) throws AiravataException {
 		init();
 		populateAddressAndParserMap(resourceConfigs);
 	}
@@ -96,6 +93,9 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
         }
         properties = new Properties();
         properties.put("mail.store.protocol", storeProtocol);
+        timer = new Timer("CancelJobHandler", true);
+        long period = 1000*60*5; // five minute delay between successive task executions.
+        timer.schedule(new CancelTimerTask(), 0 , period);
     }
 
 	private void populateAddressAndParserMap(Map<ResourceJobManagerType, ResourceConfig> resourceConfigs) throws AiravataException {
@@ -142,6 +142,11 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
         return jobMonitorMap.containsKey(jobId);
     }
 
+    @Override
+    public void canceledJob(String jobId) {
+        canceledJobs.put(jobId, Boolean.FALSE);
+    }
+
     private JobStatusResult parse(Message message) throws MessagingException, AiravataException {
         Address fromAddress = message.getFrom()[0];
         String addressStr = fromAddress.toString();
@@ -309,6 +314,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
 	    JobStatus jobStatus = new JobStatus();
 	    JobModel jobModel = taskContext.getParentProcessContext().getJobModel();
         String jobDetails = "JobName : " + jobStatusResult.getJobName() + ", JobId : " + jobStatusResult.getJobId();
+        canceledJobs.remove(jobStatusResult.getJobId());
         // TODO - Handle all other valid JobStates
         if (resultState == JobState.COMPLETE) {
             jobMonitorMap.remove(jobStatusResult.getJobId());
@@ -338,7 +344,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
             log.info("[EJM]: Job failed email received , removed job from job monitoring. " + jobDetails);
         }else if (resultState == JobState.CANCELED) {
             jobMonitorMap.remove(jobStatusResult.getJobId());
-	        jobStatus.setJobState(JobState.CANCELED);
+            jobStatus.setJobState(JobState.CANCELED);
 	        jobStatus.setReason("Canceled email received");
             jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
 	        log.info("[EJM]: Job canceled mail received, removed job from job monitoring. "
+ jobDetails);
@@ -395,5 +401,25 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
         this.monitorStartDate = date;
     }
 
+    private class CancelTimerTask extends TimerTask {
 
+        @Override
+        public void run() {
+            if (!canceledJobs.isEmpty()) {
+                Iterator<Map.Entry<String, Boolean>> cancelJobIter = canceledJobs.entrySet().iterator();
+                while (cancelJobIter.hasNext()) {
+                    Map.Entry<String, Boolean> cancelJobIdWithFlag = cancelJobIter.next();
+                    if (!cancelJobIdWithFlag.getValue()) {
+                        cancelJobIdWithFlag.setValue(Boolean.TRUE);
+                    } else {
+                        TaskContext taskContext = jobMonitorMap.get(cancelJobIdWithFlag.getKey());
+                        if (taskContext != null) {
+                            stopMonitor(cancelJobIdWithFlag.getKey(), true);
+                        }
+                        cancelJobIter.remove();
+                    }
+                }
+            }
+        }
+    }
 }


Mime
View raw message