Return-Path: X-Original-To: apmail-airavata-commits-archive@www.apache.org Delivered-To: apmail-airavata-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EB6A518658 for ; Tue, 3 Nov 2015 19:47:26 +0000 (UTC) Received: (qmail 86239 invoked by uid 500); 3 Nov 2015 19:47:26 -0000 Delivered-To: apmail-airavata-commits-archive@airavata.apache.org Received: (qmail 86170 invoked by uid 500); 3 Nov 2015 19:47:26 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 85909 invoked by uid 99); 3 Nov 2015 19:47:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Nov 2015 19:47:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3729CE095E; Tue, 3 Nov 2015 19:47:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chathuri@apache.org To: commits@airavata.apache.org Date: Tue, 03 Nov 2015 19:47:29 -0000 Message-Id: <943e72a97cf7460181117db1eff947a1@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [04/51] [abbrv] airavata git commit: AutoScheduling gfac side changes http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java index 946055e..52cd395 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java @@ -21,22 +21,24 @@ package org.apache.airavata.gfac.impl; -import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.gfac.core.GFacEngine; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.GFacUtils; import org.apache.airavata.gfac.core.context.ProcessContext; -import org.apache.airavata.gfac.core.monitor.JobMonitor; import org.apache.airavata.model.commons.ErrorModel; import org.apache.airavata.model.status.ProcessState; import org.apache.airavata.model.status.ProcessStatus; +import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.task.TaskModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.PrintWriter; import java.io.StringWriter; import java.text.MessageFormat; +import java.util.List; +import java.util.Map; public class GFacWorker implements Runnable { @@ -46,7 +48,7 @@ public class GFacWorker implements Runnable { private String processId; private String gatewayId; private String tokenId; - private boolean runOutflow = false; + private boolean continueTaskFlow = false; /** @@ -61,7 +63,7 @@ public class GFacWorker implements Runnable { this.tokenId = processContext.getTokenId(); engine = Factory.getGFacEngine(); this.processContext = processContext; - runOutflow = true; + continueTaskFlow = true; } /** @@ -98,19 +100,15 @@ public class GFacWorker implements Runnable { case CONFIGURING_WORKSPACE: case INPUT_DATA_STAGING: case EXECUTING: - recoverProcess(); - break; case MONITORING: - if (runOutflow) { - runProcessOutflow(); - } else { - monitorProcess(); - } - break; - case OUTPUT_DATA_STAGING: - case POST_PROCESSING: - recoverProcessOutflow(); - break; + case OUTPUT_DATA_STAGING: + case POST_PROCESSING: + if (continueTaskFlow) { + continueTaskExecution(); + } else { + recoverProcess(); + } + break; case COMPLETED: completeProcess(); break; @@ -160,56 +158,94 @@ public class GFacWorker implements Runnable { Factory.getGfacContext().removeProcess(processContext.getProcessId()); } - private void recoverProcessOutflow() throws GFacException { - engine.recoverProcessOutflow(processContext); - if (processContext.isInterrupted()) { - return; - } - completeProcess(); - } + private void continueTaskExecution() throws GFacException { + // checkpoint + if (processContext.isInterrupted()) { + return; + } + processContext.setPauseTaskExecution(false); + List taskExecutionOrder = processContext.getTaskExecutionOrder(); + String currentExecutingTaskId = processContext.getCurrentExecutingTaskId(); + boolean found = false; + String nextTaskId = null; + for (String taskId : taskExecutionOrder) { + if (!found) { + if (taskId.equalsIgnoreCase(currentExecutingTaskId)) { + found = true; + } + continue; + } else { + nextTaskId = taskId; + break; + } + } + if (nextTaskId != null) { + engine.continueProcess(processContext, nextTaskId); + } + // checkpoint + if (processContext.isInterrupted()) { + return; + } - private void runProcessOutflow() throws GFacException { - engine.runProcessOutflow(processContext); - if (processContext.isInterrupted()) { - return; - } - completeProcess(); - } + if (processContext.isComplete()) { + completeProcess(); + } + } private void recoverProcess() throws GFacException { - engine.recoverProcess(processContext); - if (processContext.isInterrupted()) { - return; - } - monitorProcess(); - } + + String taskDag = processContext.getProcessModel().getTaskDag(); + List taskExecutionOrder = GFacUtils.parseTaskDag(taskDag); + Map taskMap = processContext.getTaskMap(); + String recoverTaskId = null; + for (String taskId : taskExecutionOrder) { + TaskModel taskModel = taskMap.get(taskId); + TaskState state = taskModel.getTaskStatus().getState(); + if (state == TaskState.CREATED || state == TaskState.EXECUTING) { + recoverTaskId = taskId; + break; + } + } + + engine.recoverProcess(processContext, recoverTaskId); + if (processContext.isInterrupted()) { + return; + } + + if (processContext.isComplete()) { + completeProcess(); + } + } private void executeProcess() throws GFacException { engine.executeProcess(processContext); if (processContext.isInterrupted()) { return; } - monitorProcess(); - } - private void monitorProcess() throws GFacException { - try { - JobMonitor monitorService = Factory.getMonitorService(processContext.getMonitorMode()); - if (monitorService != null) { - monitorService.monitor(processContext.getJobModel().getJobId(), processContext); - ProcessStatus status = new ProcessStatus(ProcessState.MONITORING); - status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - processContext.setProcessStatus(status); - GFacUtils.saveAndPublishProcessStatus(processContext); - } else { - // we directly invoke outflow - runProcessOutflow(); - } - } catch (AiravataException e) { - throw new GFacException("Error while retrieving moniot service", e); - } + if (processContext.isComplete()) { + completeProcess(); + } } +// private void monitorProcess() throws GFacException { +// try { +// JobMonitor monitorService = Factory.getMonitorService(processContext.getMonitorMode()); +// if (monitorService != null) { +// monitorService.monitor(processContext.getJobModel().getJobId(), processContext); +// ProcessStatus status = new ProcessStatus(ProcessState.MONITORING); +// status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); +// processContext.setProcessStatus(status); +// GFacUtils.saveAndPublishProcessStatus(processContext); +// } else { +// // we directly invoke outflow +// continueTaskExecution(); +// } +// } catch (AiravataException e) { +// throw new GFacException("Error while retrieving moniot service", e); +// } +// } + private void sendAck() { try { long processDeliveryTag = GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(), http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java index 359d39c..c4d4676 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java @@ -32,10 +32,6 @@ import org.apache.airavata.gfac.core.monitor.EmailParser; import org.apache.airavata.gfac.core.monitor.JobMonitor; import org.apache.airavata.gfac.core.monitor.JobStatusResult; import org.apache.airavata.gfac.impl.GFacWorker; -import org.apache.airavata.gfac.monitor.email.parser.LSFEmailParser; -import org.apache.airavata.gfac.monitor.email.parser.PBSEmailParser; -import org.apache.airavata.gfac.monitor.email.parser.SLURMEmailParser; -import org.apache.airavata.gfac.monitor.email.parser.UGEEmailParser; import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType; import org.apache.airavata.model.job.JobModel; import org.apache.airavata.model.status.JobState; @@ -52,7 +48,6 @@ import javax.mail.Session; import javax.mail.Store; import javax.mail.search.FlagTerm; import javax.mail.search.SearchTerm; -import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; @@ -124,6 +119,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ public void monitor(String jobId, ProcessContext processContext) { log.info("[EJM]: Added monitor Id : " + jobId + " to email based monitor map"); jobMonitorMap.put(jobId, processContext); + processContext.setPauseTaskExecution(true); } @Override http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java index b892a00..4468a1c 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java @@ -371,7 +371,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ taskModel.setTaskStatus(taskStatus); taskModel.setTaskType(TaskTypes.JOB_SUBMISSION); JobSubmissionTaskModel submissionSubTask = new JobSubmissionTaskModel(); - submissionSubTask.setMonitorMode(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR); +// submissionSubTask.setMonitorMode(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR); byte[] bytes = ThriftUtils.serializeThriftObject(submissionSubTask); taskModel.setSubTaskModel(bytes); orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, taskModel,