Return-Path: X-Original-To: apmail-airavata-commits-archive@www.apache.org Delivered-To: apmail-airavata-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4FFAD18E7E for ; Fri, 8 Jan 2016 18:36:58 +0000 (UTC) Received: (qmail 47997 invoked by uid 500); 8 Jan 2016 18:36:58 -0000 Delivered-To: apmail-airavata-commits-archive@airavata.apache.org Received: (qmail 47954 invoked by uid 500); 8 Jan 2016 18:36:58 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 47945 invoked by uid 99); 8 Jan 2016 18:36:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Jan 2016 18:36:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1F318E000F; Fri, 8 Jan 2016 18:36:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: shameera@apache.org To: commits@airavata.apache.org Message-Id: <903102d5734545d8b37b0b943bdc0fe2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: airavata git commit: Fixed edge case issues with cancel & recovery Date: Fri, 8 Jan 2016 18:36:58 +0000 (UTC) Repository: airavata Updated Branches: refs/heads/master 545e75344 -> 4792eac6e Fixed edge case issues with cancel & recovery Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4792eac6 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4792eac6 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4792eac6 Branch: refs/heads/master Commit: 4792eac6e173c9ec81bab4f510ebb45da341a3ad Parents: 545e753 Author: Shameera Rathnayaka Authored: Fri Jan 8 13:36:44 2016 -0500 Committer: Shameera Rathnayaka Committed: Fri Jan 8 13:36:44 2016 -0500 ---------------------------------------------------------------------- .../airavata/gfac/impl/GFacEngineImpl.java | 28 +++++++++------ .../apache/airavata/gfac/impl/GFacWorker.java | 36 +++++++++++++++----- .../gfac/monitor/email/EmailBasedMonitor.java | 18 ++++++---- 3 files changed, 56 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/4792eac6/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java index 00d920d..f264e6c 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java @@ -215,7 +215,7 @@ public class GFacEngineImpl implements GFacEngine { private void executeTaskListFrom(ProcessContext processContext, String startingTaskId) throws GFacException { // checkpoint - if (processContext.isInterrupted()) { + if (processContext.isInterrupted() && processContext.getProcessState() != ProcessState.MONITORING) { GFacUtils.handleProcessInterrupt(processContext); return; } @@ -552,7 +552,12 @@ public class GFacEngineImpl implements GFacEngine { cancelJobSubmission(processContext, rTaskId, pTaskId); } continueProcess(processContext, recoverTaskId); + } else { + log.error("expId: {}, processId: {}, Error while recovering process, couldn't find recovery task", + processContext.getExperimentId(), processContext.getProcessId()); } + + } private void cancelJobSubmission(ProcessContext processContext, String rTaskId, String pTaskId) { @@ -577,12 +582,17 @@ public class GFacEngineImpl implements GFacEngine { if (jobModels != null && !jobModels.isEmpty()) { JobModel jobModel = (JobModel) jobModels.get(jobModels.size() - 1); - processContext.setJobModel(jobModel); - log.info("expId: {}, processId: {}, Canceling jobId {}", processContext.getExperimentId(), - processContext.getProcessId(), jobModel.getJobId()); - cancelProcess(processContext); - log.info("expId: {}, processId: {}, Canceled jobId {}", processContext.getExperimentId(), - processContext.getProcessId(), jobModel.getJobId()); + if (jobModel.getJobId() != null) { + processContext.setJobModel(jobModel); + log.info("expId: {}, processId: {}, Canceling jobId {}", processContext.getExperimentId(), + processContext.getProcessId(), jobModel.getJobId()); + cancelProcess(processContext); + log.info("expId: {}, processId: {}, Canceled jobId {}", processContext.getExperimentId(), + processContext.getProcessId(), jobModel.getJobId()); + } else { + log.error("expId: {}, processId: {}, Couldn't find jobId in jobModel, aborting process recovery", + processContext.getExperimentId(), processContext.getProcessId()); + } } } catch (GFacException e) { log.error("expId: {}, processId: {}, Error while canceling process which is in recovery mode", @@ -606,10 +616,6 @@ public class GFacEngineImpl implements GFacEngine { @Override public void continueProcess(ProcessContext processContext, String taskId) throws GFacException { - if (processContext.isInterrupted()) { - GFacUtils.handleProcessInterrupt(processContext); - return; - } executeTaskListFrom(processContext, taskId); } http://git-wip-us.apache.org/repos/asf/airavata/blob/4792eac6/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java index e0664a5..fd6dad3 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java @@ -29,6 +29,7 @@ import org.apache.airavata.gfac.core.context.ProcessContext; import org.apache.airavata.model.commons.ErrorModel; import org.apache.airavata.model.status.ProcessState; import org.apache.airavata.model.status.ProcessStatus; +import org.apache.airavata.registry.core.experiment.catalog.model.Process; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +54,7 @@ public class GFacWorker implements Runnable { */ public GFacWorker(ProcessContext processContext) throws GFacException { if (processContext == null) { - throw new GFacException("Worker must initialize with valide processContext, Process context is null"); + throw new GFacException("Worker must initialize with valid processContext, Process context is null"); } this.processId = processContext.getProcessId(); this.gatewayId = processContext.getGatewayId(); @@ -78,7 +79,7 @@ public class GFacWorker implements Runnable { @Override public void run() { try { - ProcessState processState = processContext.getProcessStatus().getState(); + ProcessState processState = processContext.getProcessState(); switch (processState) { case CREATED: case VALIDATED: @@ -101,6 +102,9 @@ public class GFacWorker implements Runnable { case COMPLETED: completeProcess(); break; + case CANCELLING: + cancelProcess(); + break; case CANCELED: // TODO - implement cancel scenario break; @@ -111,12 +115,18 @@ public class GFacWorker implements Runnable { throw new GFacException("process Id : " + processId + " Couldn't identify process type"); } if (processContext.isCancel()) { - if (processContext.getProcessState() == ProcessState.MONITORING - || processContext.getProcessState() == ProcessState.EXECUTING) { - // don't send ack if the process is in MONITORING state, wait until cancel email comes to airavata. - } else { - sendAck(); - Factory.getGfacContext().removeProcess(processContext.getProcessId()); + processState = processContext.getProcessState(); + switch (processState) { + case MONITORING: case EXECUTING: + // don't send ack if the process is in MONITORING or EXECUTING states, wait until cancel email comes to airavata + break; + case CANCELLING: + cancelProcess(); + break; + default: + sendAck(); + Factory.getGfacContext().removeProcess(processContext.getProcessId()); + break; } } } catch (GFacException e) { @@ -143,6 +153,16 @@ public class GFacWorker implements Runnable { } } + private void cancelProcess() throws GFacException { + // do cleanup works before cancel the process. + ProcessStatus processStatus = new ProcessStatus(ProcessState.CANCELED); + processStatus.setReason("Process cancellation has been triggered"); + processContext.setProcessStatus(processStatus); + GFacUtils.saveAndPublishProcessStatus(processContext); + sendAck(); + Factory.getGfacContext().removeProcess(processContext.getProcessId()); + } + private void completeProcess() throws GFacException { ProcessStatus status = new ProcessStatus(ProcessState.COMPLETED); status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); http://git-wip-us.apache.org/repos/asf/airavata/blob/4792eac6/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java index 144465b..c7a6875 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java @@ -35,10 +35,7 @@ import org.apache.airavata.gfac.core.monitor.JobStatusResult; import org.apache.airavata.gfac.impl.GFacWorker; import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType; import org.apache.airavata.model.job.JobModel; -import org.apache.airavata.model.status.JobState; -import org.apache.airavata.model.status.JobStatus; -import org.apache.airavata.model.status.TaskState; -import org.apache.airavata.model.status.TaskStatus; +import org.apache.airavata.model.status.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -333,7 +330,8 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ // TODO : update job state on process context boolean runOutflowTasks = false; JobStatus jobStatus = new JobStatus(); - JobModel jobModel = taskContext.getParentProcessContext().getJobModel(); + ProcessContext parentProcessContext = taskContext.getParentProcessContext(); + JobModel jobModel = parentProcessContext.getJobModel(); String jobDetails = "JobName : " + jobStatusResult.getJobName() + ", JobId : " + jobStatusResult.getJobId(); // TODO - Handle all other valid JobStates if (resultState == JobState.COMPLETE) { @@ -374,7 +372,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ try { jobModel.setJobStatus(jobStatus); log.info("[EJM]: Publishing status changes to amqp. " + jobDetails); - GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel); + GFacUtils.saveJobStatus(parentProcessContext, jobModel); } catch (GFacException e) { log.error("expId: {}, processId: {}, taskId: {}, jobId: {} :- Error while save and publishing Job " + "status {}", taskContext.getExperimentId(), taskContext.getProcessId(), jobModel @@ -390,7 +388,13 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ taskStatus.setReason("Job monitoring completed with final state: " + TaskState.COMPLETED.name()); taskContext.setTaskStatus(taskStatus); GFacUtils.saveAndPublishTaskStatus(taskContext); - GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(taskContext.getParentProcessContext())); + if (parentProcessContext.isCancel()) { + ProcessStatus processStatus = new ProcessStatus(ProcessState.CANCELLING); + processStatus.setReason("Process has been cancelled"); + parentProcessContext.setProcessStatus(processStatus); + GFacUtils.saveAndPublishProcessStatus(parentProcessContext); + } + GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(parentProcessContext)); } catch (GFacException e) { log.info("[EJM]: Error while running output tasks", e); }