Return-Path: X-Original-To: apmail-airavata-commits-archive@www.apache.org Delivered-To: apmail-airavata-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8C45F11170 for ; Sat, 19 Jul 2014 14:08:30 +0000 (UTC) Received: (qmail 97419 invoked by uid 500); 19 Jul 2014 14:08:30 -0000 Delivered-To: apmail-airavata-commits-archive@airavata.apache.org Received: (qmail 97371 invoked by uid 500); 19 Jul 2014 14:08:30 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 97360 invoked by uid 99); 19 Jul 2014 14:08:30 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 19 Jul 2014 14:08:30 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id D9DF494FD0D; Sat, 19 Jul 2014 14:08:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: samindaw@apache.org To: commits@airavata.apache.org Date: Sat, 19 Jul 2014 14:08:29 -0000 Message-Id: <8d17a729976144f3b4176ede33f75636@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] git commit: removing unwanted expeirment status publishing in the gfac Repository: airavata Updated Branches: refs/heads/workflow-support 4752d1a3f -> 92838e4a8 removing unwanted expeirment status publishing in the gfac Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/601d5add Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/601d5add Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/601d5add Branch: refs/heads/workflow-support Commit: 601d5add58e51046ffffa974f0dcb3b686c7db4e Parents: 4752d1a Author: Saminda Wijeratne Authored: Sat Jul 12 16:20:46 2014 -0400 Committer: Saminda Wijeratne Committed: Sat Jul 12 16:20:46 2014 -0400 ---------------------------------------------------------------------- .../airavata/gfac/core/cpi/BetterGfacImpl.java | 50 ++++++------ .../apache/airavata/gfac/core/cpi/GFacImpl.java | 25 +++--- .../AiravataExperimentStatusUpdator.java | 45 ++++++++++- .../core/monitor/AiravataJobStatusUpdator.java | 44 ++--------- .../core/monitor/AiravataTaskStatusUpdator.java | 75 ++++++++++-------- .../AiravataWorkflowNodeStatusUpdator.java | 80 +++++++++----------- .../state/ExperimentStatusChangeRequest.java | 63 --------------- .../state/ExperimentStatusChangedEvent.java | 63 +++++++++++++++ .../monitor/state/JobStatusChangedEvent.java | 80 ++++++++++++++++++++ .../monitor/state/TaskStatusChangedEvent.java | 61 +++++++++++++++ .../state/WorkflowNodeStatusChangeRequest.java | 63 --------------- .../state/WorkflowNodeStatusChangedEvent.java | 63 +++++++++++++++ .../monitor/impl/pull/qstat/HPCPullMonitor.java | 9 ++- 13 files changed, 439 insertions(+), 282 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index 24ab0c3..62c44ab 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -56,10 +56,11 @@ import org.apache.airavata.gfac.core.monitor.ExperimentIdentity; import org.apache.airavata.gfac.core.monitor.JobIdentity; import org.apache.airavata.gfac.core.monitor.MonitorID; import org.apache.airavata.gfac.core.monitor.TaskIdentity; -import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangeRequest; +import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangedEvent; import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest; import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest; import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest; +import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent; import org.apache.airavata.gfac.core.notification.MonitorPublisher; import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent; import org.apache.airavata.gfac.core.notification.listeners.LoggingListener; @@ -540,15 +541,15 @@ public class BetterGfacImpl implements GFac { try { // we make the experiment as failed due to exception scenario monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED)); - monitorPublisher.publish(new - ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()), - ExperimentState.FAILED)); +// monitorPublisher.publish(new +// ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), +// ExperimentState.FAILED)); // Updating the task status if there's any task associated - monitorPublisher.publish(new TaskStatusChangeRequest( - new TaskIdentity(jobExecutionContext.getExperimentID(), - jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), - jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED - )); +// monitorPublisher.publish(new TaskStatusChangedEvent( +// new TaskIdentity(jobExecutionContext.getExperimentID(), +// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), +// jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED +// )); monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext), new JobIdentity(jobExecutionContext.getExperimentID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), @@ -585,15 +586,15 @@ public class BetterGfacImpl implements GFac { try { // we make the experiment as failed due to exception scenario monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED)); - monitorPublisher.publish(new - ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()), - ExperimentState.FAILED)); +// monitorPublisher.publish(new +// ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), +// ExperimentState.FAILED)); // Updating the task status if there's any task associated - monitorPublisher.publish(new TaskStatusChangeRequest( - new TaskIdentity(jobExecutionContext.getExperimentID(), - jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), - jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED - )); +// monitorPublisher.publish(new TaskStatusChangeRequest( +// new TaskIdentity(jobExecutionContext.getExperimentID(), +// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), +// jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED +// )); monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext), new JobIdentity(jobExecutionContext.getExperimentID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), @@ -771,9 +772,9 @@ public class BetterGfacImpl implements GFac { // At this point all the execution is finished so we update the task and experiment statuses. // Handler authors does not have to worry about updating experiment or task statuses. - monitorPublisher.publish(new - ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()), - ExperimentState.COMPLETED)); +// monitorPublisher.publish(new +// ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), +// ExperimentState.COMPLETED)); // Updating the task status if there's any task associated monitorPublisher.publish(new TaskStatusChangeRequest( new TaskIdentity(jobExecutionContext.getExperimentID(), @@ -900,11 +901,12 @@ public class BetterGfacImpl implements GFac { // At this point all the execution is finished so we update the task and experiment statuses. // Handler authors does not have to worry about updating experiment or task statuses. - monitorPublisher.publish(new - ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()), - ExperimentState.COMPLETED)); +// monitorPublisher.publish(new +// ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), +// ExperimentState.COMPLETED)); // Updating the task status if there's any task associated - monitorPublisher.publish(new TaskStatusChangeRequest( + + monitorPublisher.publish(new TaskStatusChangedEvent( new TaskIdentity(jobExecutionContext.getExperimentID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java index 34800e9..2065cee 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java @@ -44,9 +44,10 @@ import org.apache.airavata.gfac.core.context.ApplicationContext; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.context.MessageContext; import org.apache.airavata.gfac.core.monitor.*; -import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangeRequest; +import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangedEvent; import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest; import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest; +import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent; import org.apache.airavata.gfac.core.notification.MonitorPublisher; import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent; import org.apache.airavata.gfac.core.notification.listeners.LoggingListener; @@ -322,15 +323,15 @@ public class GFacImpl implements GFac { } catch (Exception e) { try { // we make the experiment as failed due to exception scenario - monitorPublisher.publish(new - ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()), - ExperimentState.FAILED)); +// monitorPublisher.publish(new +// ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), +// ExperimentState.FAILED)); // Updating the task status if there's any task associated - monitorPublisher.publish(new TaskStatusChangeRequest( - new TaskIdentity(jobExecutionContext.getExperimentID(), - jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), - jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED - )); +// monitorPublisher.publish(new TaskStatusChangedEvent( +// new TaskIdentity(jobExecutionContext.getExperimentID(), +// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), +// jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED +// )); monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext), new JobIdentity(jobExecutionContext.getExperimentID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), @@ -449,9 +450,9 @@ public class GFacImpl implements GFac { monitorPublisher.publish(GfacExperimentState.COMPLETED); // At this point all the execution is finished so we update the task and experiment statuses. // Handler authors does not have to worry about updating experiment or task statuses. - monitorPublisher.publish(new - ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()), - ExperimentState.COMPLETED)); +// monitorPublisher.publish(new +// ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), +// ExperimentState.COMPLETED)); // Updating the task status if there's any task associated monitorPublisher.publish(new TaskStatusChangeRequest( new TaskIdentity(jobExecutionContext.getExperimentID(), http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataExperimentStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataExperimentStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataExperimentStatusUpdator.java index f5d82ff..664f237 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataExperimentStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataExperimentStatusUpdator.java @@ -21,7 +21,10 @@ package org.apache.airavata.gfac.core.monitor; import com.google.common.eventbus.Subscribe; -import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangeRequest; + +import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangedEvent; +import org.apache.airavata.gfac.core.monitor.state.WorkflowNodeStatusChangedEvent; +import org.apache.airavata.gfac.core.notification.MonitorPublisher; import org.apache.airavata.model.workspace.experiment.Experiment; import org.apache.airavata.model.workspace.experiment.ExperimentState; import org.apache.airavata.registry.cpi.RegistryModelType; @@ -35,6 +38,7 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener private final static Logger logger = LoggerFactory.getLogger(AiravataExperimentStatusUpdator.class); private Registry airavataRegistry; + private MonitorPublisher monitorPublisher; public Registry getAiravataRegistry() { return airavataRegistry; @@ -45,7 +49,7 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener } @Subscribe - public void updateRegistry(ExperimentStatusChangeRequest experimentStatus) { + public void updateRegistry(ExperimentStatusChangedEvent experimentStatus) { ExperimentState state = experimentStatus.getState(); if (state != null) { try { @@ -57,6 +61,41 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener } } + + @Subscribe + public void setupExperimentStatus(WorkflowNodeStatusChangedEvent nodeStatus) { + ExperimentState state = ExperimentState.UNKNOWN; + switch (nodeStatus.getState()) { + case CANCELED: + state = ExperimentState.CANCELED; + break; + case COMPLETED: + state = ExperimentState.COMPLETED; + break; + case INVOKED: + state = ExperimentState.LAUNCHED; + break; + case FAILED: + state = ExperimentState.FAILED; + break; + case EXECUTING: + state = ExperimentState.EXECUTING; + break; + case CANCELING: + state = ExperimentState.CANCELING; + break; + default: + break; + } + try { + updateExperimentStatus(nodeStatus.getIdentity().getExperimentID(), state); + logger.debug("Publishing experiment status for "+nodeStatus.getIdentity().getExperimentID()+":"+state.toString()); + monitorPublisher.publish(new ExperimentStatusChangedEvent(nodeStatus.getIdentity(), state)); + } catch (Exception e) { + logger.error("Error persisting data" + e.getLocalizedMessage(), e); + } + } + public void updateExperimentStatus(String experimentId, ExperimentState state) throws Exception { logger.info("Updating the experiment status of experiment: " + experimentId + " to " + state.toString()); Experiment details = (Experiment)airavataRegistry.get(RegistryModelType.EXPERIMENT, experimentId); @@ -76,6 +115,8 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener for (Object configuration : configurations) { if (configuration instanceof Registry){ this.airavataRegistry=(Registry)configuration; + } else if (configuration instanceof MonitorPublisher){ + this.monitorPublisher=(MonitorPublisher) configuration; } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java index 29ee82f..6ad55d2 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java @@ -20,21 +20,20 @@ */ package org.apache.airavata.gfac.core.monitor; -import com.google.common.eventbus.Subscribe; +import java.util.Calendar; + import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest; -import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest; +import org.apache.airavata.gfac.core.monitor.state.JobStatusChangedEvent; import org.apache.airavata.gfac.core.notification.MonitorPublisher; import org.apache.airavata.model.workspace.experiment.JobDetails; import org.apache.airavata.model.workspace.experiment.JobState; -import org.apache.airavata.model.workspace.experiment.TaskState; import org.apache.airavata.registry.cpi.CompositeIdentifier; -import org.apache.airavata.registry.cpi.RegistryModelType; import org.apache.airavata.registry.cpi.Registry; +import org.apache.airavata.registry.cpi.RegistryModelType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Calendar; -import java.util.concurrent.BlockingQueue; +import com.google.common.eventbus.Subscribe; public class AiravataJobStatusUpdator implements AbstractActivityListener { private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class); @@ -64,43 +63,16 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener { String taskID = jobStatus.getIdentity().getTaskId(); String jobID = jobStatus.getIdentity().getJobId(); updateJobStatus(taskID, jobID, state); + logger.debug("Publishing job status for "+jobStatus.getIdentity().getJobId()+":"+state.toString()); + monitorPublisher.publish(new JobStatusChangedEvent(jobStatus.getMonitorID(),jobStatus.getIdentity(),state)); } catch (Exception e) { logger.error("Error persisting data" + e.getLocalizedMessage(), e); } - logger.info("Job ID:" + jobStatus.getIdentity().getJobId() + " is "+state.toString()); } } - @Subscribe - public void setupTaskStatus(JobStatusChangeRequest jobStatus){ - TaskState state=TaskState.UNKNOWN; - switch(jobStatus.getState()){ - case ACTIVE: - state=TaskState.EXECUTING; break; - case CANCELED: - state=TaskState.CANCELED; break; - case COMPLETE: - state=TaskState.COMPLETED; break; - case FAILED: - state=TaskState.FAILED; break; - case HELD: case SUSPENDED: case QUEUED: - state=TaskState.WAITING; break; - case SETUP: - state=TaskState.PRE_PROCESSING; break; - case SUBMITTED: - state=TaskState.STARTED; break; - case UN_SUBMITTED: - state=TaskState.CANCELED; break; - case CANCELING: - state=TaskState.CANCELING; break; - default: - break; - } - logger.debug("Publishing Task Status "+state.toString()); - monitorPublisher.publish(new TaskStatusChangeRequest(jobStatus.getIdentity(),state)); - } - public void updateJobStatus(String taskId, String jobID, JobState state) throws Exception { + logger.debug("Updating job status for "+jobID+":"+state.toString()); CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID); JobDetails details = (JobDetails)airavataRegistry.get(RegistryModelType.JOB_DETAIL, ids); if(details == null) { http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java index a6ab613..26d49c0 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java @@ -20,19 +20,20 @@ */ package org.apache.airavata.gfac.core.monitor; -import com.google.common.eventbus.Subscribe; +import java.util.Calendar; + +import org.apache.airavata.gfac.core.monitor.state.JobStatusChangedEvent; import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest; -import org.apache.airavata.gfac.core.monitor.state.WorkflowNodeStatusChangeRequest; +import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent; import org.apache.airavata.gfac.core.notification.MonitorPublisher; import org.apache.airavata.model.workspace.experiment.TaskDetails; import org.apache.airavata.model.workspace.experiment.TaskState; -import org.apache.airavata.model.workspace.experiment.WorkflowNodeState; -import org.apache.airavata.registry.cpi.RegistryModelType; import org.apache.airavata.registry.cpi.Registry; +import org.apache.airavata.registry.cpi.RegistryModelType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Calendar; +import com.google.common.eventbus.Subscribe; public class AiravataTaskStatusUpdator implements AbstractActivityListener { private final static Logger logger = LoggerFactory.getLogger(AiravataTaskStatusUpdator.class); @@ -48,46 +49,54 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener { public void setAiravataRegistry(Registry airavataRegistry) { this.airavataRegistry = airavataRegistry; } - + @Subscribe - public void updateRegistry(TaskStatusChangeRequest taskStatus) { - TaskState state = taskStatus.getState(); - if (state != null) { - try { - String taskID = taskStatus.getIdentity().getTaskId(); - updateTaskStatus(taskID, state); - } catch (Exception e) { - logger.error("Error persisting data" + e.getLocalizedMessage(), e); - } - } + public void setupTaskStatus(TaskStatusChangeRequest taskStatus){ + try { + updateTaskStatus(taskStatus.getIdentity().getTaskId(), taskStatus.getState()); + logger.debug("Publishing task status for "+taskStatus.getIdentity().getTaskId()+":"+taskStatus.getState().toString()); + monitorPublisher.publish(new TaskStatusChangedEvent(taskStatus.getIdentity(),taskStatus.getState())); + } catch (Exception e) { + logger.error("Error persisting data" + e.getLocalizedMessage(), e); + } } - + @Subscribe - public void setupWorkflowNodeStatus(TaskStatusChangeRequest taskStatus){ - WorkflowNodeState state=WorkflowNodeState.UNKNOWN; - switch(taskStatus.getState()){ + public void setupTaskStatus(JobStatusChangedEvent jobStatus){ + TaskState state=TaskState.UNKNOWN; + switch(jobStatus.getState()){ + case ACTIVE: + state=TaskState.EXECUTING; break; case CANCELED: - state=WorkflowNodeState.CANCELED; break; - case COMPLETED: - state=WorkflowNodeState.COMPLETED; break; - case CONFIGURING_WORKSPACE: - state=WorkflowNodeState.INVOKED; break; + state=TaskState.CANCELED; break; + case COMPLETE: + state=TaskState.COMPLETED; break; case FAILED: - state=WorkflowNodeState.FAILED; break; - case EXECUTING: case WAITING: case PRE_PROCESSING: case POST_PROCESSING: case OUTPUT_DATA_STAGING: case INPUT_DATA_STAGING: - state=WorkflowNodeState.EXECUTING; break; - case STARTED: - state=WorkflowNodeState.INVOKED; break; + state=TaskState.FAILED; break; + case HELD: case SUSPENDED: case QUEUED: + state=TaskState.WAITING; break; + case SETUP: + state=TaskState.PRE_PROCESSING; break; + case SUBMITTED: + state=TaskState.STARTED; break; + case UN_SUBMITTED: + state=TaskState.CANCELED; break; case CANCELING: - state=WorkflowNodeState.CANCELING; break; + state=TaskState.CANCELING; break; default: break; } - logger.debug("Publishing Experiment Status "+state.toString()); - monitorPublisher.publish(new WorkflowNodeStatusChangeRequest(taskStatus.getIdentity(),state)); + try { + updateTaskStatus(jobStatus.getIdentity().getTaskId(), state); + logger.debug("Publishing task status for "+jobStatus.getIdentity().getTaskId()+":"+state.toString()); + monitorPublisher.publish(new TaskStatusChangedEvent(jobStatus.getIdentity(),state)); + } catch (Exception e) { + logger.error("Error persisting data" + e.getLocalizedMessage(), e); + } } public void updateTaskStatus(String taskId, TaskState state) throws Exception { + logger.debug("Updating task status for "+taskId+":"+state.toString()); TaskDetails details = (TaskDetails)airavataRegistry.get(RegistryModelType.TASK_DETAIL, taskId); if(details == null) { details = new TaskDetails(); http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java index 8e92e87..5f6629c 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java @@ -20,20 +20,20 @@ */ package org.apache.airavata.gfac.core.monitor; -import com.google.common.eventbus.Subscribe; -import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangeRequest; -import org.apache.airavata.gfac.core.monitor.state.WorkflowNodeStatusChangeRequest; +import java.util.Calendar; + +import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent; +import org.apache.airavata.gfac.core.monitor.state.WorkflowNodeStatusChangedEvent; import org.apache.airavata.gfac.core.notification.MonitorPublisher; -import org.apache.airavata.model.workspace.experiment.ExperimentState; import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; import org.apache.airavata.model.workspace.experiment.WorkflowNodeState; import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus; -import org.apache.airavata.registry.cpi.RegistryModelType; import org.apache.airavata.registry.cpi.Registry; +import org.apache.airavata.registry.cpi.RegistryModelType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Calendar; +import com.google.common.eventbus.Subscribe; public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListener { private final static Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class); @@ -51,49 +51,37 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen } @Subscribe - public void updateRegistry(WorkflowNodeStatusChangeRequest workflowNodeStatus) { - WorkflowNodeState state = workflowNodeStatus.getState(); - if (state != null) { - try { - String workflowNodeID = workflowNodeStatus.getIdentity().getWorkflowNodeID(); - updateWorkflowNodeStatus(workflowNodeID, state); - } catch (Exception e) { - logger.error("Error persisting data" + e.getLocalizedMessage(), e); - } - } - } - - @Subscribe - public void setupExperimentStatus(WorkflowNodeStatusChangeRequest nodeStatus) { - ExperimentState state = ExperimentState.UNKNOWN; - switch (nodeStatus.getState()) { - case CANCELED: - state = ExperimentState.CANCELED; - break; - case COMPLETED: - state = ExperimentState.COMPLETED; - break; - case INVOKED: - state = ExperimentState.LAUNCHED; - break; - case FAILED: - state = ExperimentState.FAILED; - break; - case EXECUTING: - state = ExperimentState.EXECUTING; - break; - case CANCELING: - state = ExperimentState.CANCELING; - break; - default: - break; - } - logger.debug("Publishing Experiment Status " + state.toString()); - monitorPublisher.publish(new ExperimentStatusChangeRequest(nodeStatus.getIdentity(), state)); + public void setupWorkflowNodeStatus(TaskStatusChangedEvent taskStatus){ + WorkflowNodeState state=WorkflowNodeState.UNKNOWN; + switch(taskStatus.getState()){ + case CANCELED: + state=WorkflowNodeState.CANCELED; break; + case COMPLETED: + state=WorkflowNodeState.COMPLETED; break; + case CONFIGURING_WORKSPACE: + state=WorkflowNodeState.INVOKED; break; + case FAILED: + state=WorkflowNodeState.FAILED; break; + case EXECUTING: case WAITING: case PRE_PROCESSING: case POST_PROCESSING: case OUTPUT_DATA_STAGING: case INPUT_DATA_STAGING: + state=WorkflowNodeState.EXECUTING; break; + case STARTED: + state=WorkflowNodeState.INVOKED; break; + case CANCELING: + state=WorkflowNodeState.CANCELING; break; + default: + break; + } + try { + updateWorkflowNodeStatus(taskStatus.getIdentity().getWorkflowNodeID(), state); + logger.debug("Publishing workflow node status for "+taskStatus.getIdentity().getWorkflowNodeID()+":"+state.toString()); + monitorPublisher.publish(new WorkflowNodeStatusChangedEvent(taskStatus.getIdentity(),state)); + } catch (Exception e) { + logger.error("Error persisting data" + e.getLocalizedMessage(), e); + } } - public void updateWorkflowNodeStatus(String workflowNodeId, WorkflowNodeState state) throws Exception { + logger.debug("Updating workflow node status for "+workflowNodeId+":"+state.toString()); WorkflowNodeDetails details = (WorkflowNodeDetails)airavataRegistry.get(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeId); if(details == null) { details = new WorkflowNodeDetails(); http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/ExperimentStatusChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/ExperimentStatusChangeRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/ExperimentStatusChangeRequest.java deleted file mode 100644 index a8bc6b4..0000000 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/ExperimentStatusChangeRequest.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.gfac.core.monitor.state; - -import org.apache.airavata.gfac.core.monitor.ExperimentIdentity; -import org.apache.airavata.model.workspace.experiment.ExperimentState; - -/** - * This is the primary job state object used in - * through out the monitor module. This use airavata-data-model JobState enum - * Ideally after processing each event or monitoring message from remote system - * Each monitoring implementation has to return this object with a state and - * the monitoring ID - */ -public class ExperimentStatusChangeRequest extends AbstractStateChangeRequest { - private ExperimentState state; - private ExperimentIdentity identity; - - // this constructor can be used in Qstat monitor to handle errors - public ExperimentStatusChangeRequest() { - } - - public ExperimentStatusChangeRequest(ExperimentIdentity experimentIdentity, ExperimentState state) { - this.state = state; - setIdentity(experimentIdentity); - } - - public ExperimentState getState() { - return state; - } - - public void setState(ExperimentState state) { - this.state = state; - } - - public ExperimentIdentity getIdentity() { - return identity; - } - - public void setIdentity(ExperimentIdentity identity) { - this.identity = identity; - } - - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/ExperimentStatusChangedEvent.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/ExperimentStatusChangedEvent.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/ExperimentStatusChangedEvent.java new file mode 100644 index 0000000..a95d46c --- /dev/null +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/ExperimentStatusChangedEvent.java @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.core.monitor.state; + +import org.apache.airavata.gfac.core.monitor.ExperimentIdentity; +import org.apache.airavata.model.workspace.experiment.ExperimentState; + +/** + * This is the primary job state object used in + * through out the monitor module. This use airavata-data-model JobState enum + * Ideally after processing each event or monitoring message from remote system + * Each monitoring implementation has to return this object with a state and + * the monitoring ID + */ +public class ExperimentStatusChangedEvent extends AbstractStateChangeRequest { + private ExperimentState state; + private ExperimentIdentity identity; + + // this constructor can be used in Qstat monitor to handle errors + public ExperimentStatusChangedEvent() { + } + + public ExperimentStatusChangedEvent(ExperimentIdentity experimentIdentity, ExperimentState state) { + this.state = state; + setIdentity(experimentIdentity); + } + + public ExperimentState getState() { + return state; + } + + public void setState(ExperimentState state) { + this.state = state; + } + + public ExperimentIdentity getIdentity() { + return identity; + } + + public void setIdentity(ExperimentIdentity identity) { + this.identity = identity; + } + + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/JobStatusChangedEvent.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/JobStatusChangedEvent.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/JobStatusChangedEvent.java new file mode 100644 index 0000000..d995f03 --- /dev/null +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/JobStatusChangedEvent.java @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.core.monitor.state; + +import org.apache.airavata.gfac.core.monitor.JobIdentity; +import org.apache.airavata.gfac.core.monitor.MonitorID; +import org.apache.airavata.model.workspace.experiment.JobState; + +/** + * This is the primary job state object used in + * through out the monitor module. This use airavata-data-model JobState enum + * Ideally after processing each event or monitoring message from remote system + * Each monitoring implementation has to return this object with a state and + * the monitoring ID + */ +public class JobStatusChangedEvent extends AbstractStateChangeRequest { + private JobState state; + private JobIdentity identity; + + private MonitorID monitorID; + + // this constructor can be used in Qstat monitor to handle errors + public JobStatusChangedEvent() { + } + + public JobStatusChangedEvent(MonitorID monitorID) { + setIdentity(new JobIdentity(monitorID.getExperimentID(),monitorID.getWorkflowNodeID(), + monitorID.getTaskID(),monitorID.getJobID())); + setMonitorID(monitorID); + this.state = monitorID.getStatus(); + } + public JobStatusChangedEvent(MonitorID monitorID, JobIdentity jobId, JobState state) { + setIdentity(jobId); + setMonitorID(monitorID); + this.state = state; + } + + public JobState getState() { + return state; + } + + public void setState(JobState state) { + this.state = state; + } + + public JobIdentity getIdentity() { + return identity; + } + + public void setIdentity(JobIdentity identity) { + this.identity = identity; + } + + public MonitorID getMonitorID() { + return monitorID; + } + + public void setMonitorID(MonitorID monitorID) { + this.monitorID = monitorID; + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/TaskStatusChangedEvent.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/TaskStatusChangedEvent.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/TaskStatusChangedEvent.java new file mode 100644 index 0000000..ec217bc --- /dev/null +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/TaskStatusChangedEvent.java @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.core.monitor.state; + +import org.apache.airavata.gfac.core.monitor.TaskIdentity; +import org.apache.airavata.model.workspace.experiment.TaskState; + +/** + * This is the primary job state object used in + * through out the monitor module. This use airavata-data-model JobState enum + * Ideally after processing each event or monitoring message from remote system + * Each monitoring implementation has to return this object with a state and + * the monitoring ID + */ +public class TaskStatusChangedEvent extends AbstractStateChangeRequest { + private TaskState state; + private TaskIdentity identity; + // this constructor can be used in Qstat monitor to handle errors + public TaskStatusChangedEvent() { + } + + public TaskStatusChangedEvent(TaskIdentity taskIdentity, TaskState state) { + this.state = state; + setIdentity(taskIdentity); + } + + public TaskState getState() { + return state; + } + + public void setState(TaskState state) { + this.state = state; + } + + public TaskIdentity getIdentity() { + return identity; + } + + public void setIdentity(TaskIdentity identity) { + this.identity = identity; + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/WorkflowNodeStatusChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/WorkflowNodeStatusChangeRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/WorkflowNodeStatusChangeRequest.java deleted file mode 100644 index 9e52dd4..0000000 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/WorkflowNodeStatusChangeRequest.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.gfac.core.monitor.state; - -import org.apache.airavata.gfac.core.monitor.WorkflowNodeIdentity; -import org.apache.airavata.model.workspace.experiment.WorkflowNodeState; - -/** - * This is the primary job state object used in - * through out the monitor module. This use airavata-data-model JobState enum - * Ideally after processing each event or monitoring message from remote system - * Each monitoring implementation has to return this object with a state and - * the monitoring ID - */ -public class WorkflowNodeStatusChangeRequest extends AbstractStateChangeRequest { - private WorkflowNodeState state; - private WorkflowNodeIdentity identity; - - // this constructor can be used in Qstat monitor to handle errors - public WorkflowNodeStatusChangeRequest() { - } - - public WorkflowNodeStatusChangeRequest(WorkflowNodeIdentity identity, WorkflowNodeState state) { - this.state = state; - setIdentity(identity); - } - - public WorkflowNodeState getState() { - return state; - } - - public void setState(WorkflowNodeState state) { - this.state = state; - } - - public WorkflowNodeIdentity getIdentity() { - return identity; - } - - public void setIdentity(WorkflowNodeIdentity identity) { - this.identity = identity; - } - - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/WorkflowNodeStatusChangedEvent.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/WorkflowNodeStatusChangedEvent.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/WorkflowNodeStatusChangedEvent.java new file mode 100644 index 0000000..6671add --- /dev/null +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/WorkflowNodeStatusChangedEvent.java @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.core.monitor.state; + +import org.apache.airavata.gfac.core.monitor.WorkflowNodeIdentity; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeState; + +/** + * This is the primary job state object used in + * through out the monitor module. This use airavata-data-model JobState enum + * Ideally after processing each event or monitoring message from remote system + * Each monitoring implementation has to return this object with a state and + * the monitoring ID + */ +public class WorkflowNodeStatusChangedEvent extends AbstractStateChangeRequest { + private WorkflowNodeState state; + private WorkflowNodeIdentity identity; + + // this constructor can be used in Qstat monitor to handle errors + public WorkflowNodeStatusChangedEvent() { + } + + public WorkflowNodeStatusChangedEvent(WorkflowNodeIdentity identity, WorkflowNodeState state) { + this.state = state; + setIdentity(identity); + } + + public WorkflowNodeState getState() { + return state; + } + + public void setState(WorkflowNodeState state) { + this.state = state; + } + + public WorkflowNodeIdentity getIdentity() { + return identity; + } + + public void setIdentity(WorkflowNodeIdentity identity) { + this.identity = identity; + } + + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index a1f38fc..fa6714d 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -29,9 +29,10 @@ import org.apache.airavata.gfac.core.cpi.GFac; import org.apache.airavata.gfac.core.monitor.ExperimentIdentity; import org.apache.airavata.gfac.core.monitor.MonitorID; import org.apache.airavata.gfac.core.monitor.TaskIdentity; -import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangeRequest; +import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangedEvent; import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest; import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest; +import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent; import org.apache.airavata.gfac.core.notification.MonitorPublisher; import org.apache.airavata.gfac.monitor.HostMonitorData; import org.apache.airavata.gfac.monitor.UserMonitorData; @@ -178,8 +179,10 @@ public class HPCPullMonitor extends PullMonitor { } catch (GFacException e) { publisher.publish(new TaskStatusChangeRequest(new TaskIdentity(iMonitorID.getExperimentID(), iMonitorID.getWorkflowNodeID(), iMonitorID.getTaskID()), TaskState.FAILED)); - publisher.publish(new ExperimentStatusChangeRequest(new ExperimentIdentity(iMonitorID.getExperimentID()), - ExperimentState.FAILED)); + //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status + //should be done understanding whole workflow of job submission and data transfer +// publisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(iMonitorID.getExperimentID()), +// ExperimentState.FAILED)); logger.info(e.getLocalizedMessage(), e); } } else if (iMonitorID.getFailedCount() > 2 && iMonitorID.getStatus().equals(JobState.UNKNOWN)) {