Return-Path: X-Original-To: apmail-airavata-commits-archive@www.apache.org Delivered-To: apmail-airavata-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 163D8100B6 for ; Wed, 23 Apr 2014 02:08:28 +0000 (UTC) Received: (qmail 68629 invoked by uid 500); 23 Apr 2014 02:08:25 -0000 Delivered-To: apmail-airavata-commits-archive@airavata.apache.org Received: (qmail 68550 invoked by uid 500); 23 Apr 2014 02:08:25 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 68535 invoked by uid 99); 23 Apr 2014 02:08:24 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Apr 2014 02:08:24 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 569029446BF; Wed, 23 Apr 2014 02:08:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: samindaw@apache.org To: commits@airavata.apache.org Date: Wed, 23 Apr 2014 02:08:24 -0000 Message-Id: <23266d9783564433a311ad0606875b94@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] git commit: introducing workflow node status updater + refactor Repository: airavata Updated Branches: refs/heads/master f2b5df444 -> 49eea6f15 introducing workflow node status updater + refactor Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/67b44a15 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/67b44a15 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/67b44a15 Branch: refs/heads/master Commit: 67b44a15c5957baa18ad3cb883a9e31f3fc0b146 Parents: aed31d3 Author: Saminda Wijeratne Authored: Tue Apr 22 19:07:28 2014 -0700 Committer: Saminda Wijeratne Committed: Tue Apr 22 19:07:28 2014 -0700 ---------------------------------------------------------------------- .../main/resources/airavata-server.properties | 2 +- .../gfac/provider/impl/AbstractProvider.java | 3 +- .../AiravataExperimentStatusUpdator.java | 2 +- .../job/monitor/AiravataJobStatusUpdator.java | 41 +------ .../job/monitor/AiravataTaskStatusUpdator.java | 26 ++-- .../AiravataWorkflowNodeStatusUpdator.java | 71 +++++++---- .../airavata/job/monitor/AMQPMonitorTest.java | 2 +- .../QstatMonitorTestWithMyProxyAuth.java | 2 +- .../server/OrchestratorServerHandler.java | 123 ++++++++++--------- .../jpa/utils/ThriftDataModelConversion.java | 6 +- .../org/apache/airavata/server/ServerMain.java | 14 +++ .../job/monitor/ExperimentIdentity.java | 36 ++++++ .../airavata/job/monitor/JobIdentity.java | 39 ++++++ .../apache/airavata/job/monitor/MonitorID.java | 14 ++- .../airavata/job/monitor/TaskIdentity.java | 38 ++++++ .../job/monitor/WorkflowNodeIdentity.java | 37 ++++++ .../job/monitor/impl/LocalJobMonitor.java | 3 +- .../job/monitor/impl/push/amqp/AMQPMonitor.java | 3 +- .../monitor/impl/push/amqp/BasicConsumer.java | 2 +- .../state/AbstractStateChangeRequest.java | 10 -- .../state/ExperimentStatusChangeRequest.java | 16 ++- .../monitor/state/JobStatusChangeRequest.java | 30 ++++- .../monitor/state/TaskStatusChangeRequest.java | 16 ++- .../state/WorkflowNodeStatusChangeRequest.java | 63 ++++++++++ .../apache/airavata/job/AMQPMonitorTest.java | 2 +- .../job/QstatMonitorTestWithMyProxyAuth.java | 2 +- 26 files changed, 433 insertions(+), 170 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/configuration/server/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties index 13f78d5..4a65a71 100644 --- a/modules/configuration/server/src/main/resources/airavata-server.properties +++ b/modules/configuration/server/src/main/resources/airavata-server.properties @@ -262,7 +262,7 @@ monitors=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor,org.apache amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876 connection.name=xsede -activity.listeners=org.apache.airavata.job.monitor.AiravataJobStatusUpdator,org.apache.airavata.job.monitor.AiravataTaskStatusUpdator,org.apache.airavata.job.monitor.AiravataExperimentStatusUpdator +activity.listeners=org.apache.airavata.job.monitor.AiravataJobStatusUpdator,org.apache.airavata.job.monitor.AiravataTaskStatusUpdator,org.apache.airavata.job.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.job.monitor.AiravataExperimentStatusUpdator ###---------------------------Orchestrator module Configurations---------------------------### job.submitter=org.apache.airavata.orchestrator.core.impl.EmbeddedGFACJobSubmitter http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java index 3ba02b9..5966233 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java @@ -27,6 +27,7 @@ import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.context.JobExecutionContext; import org.apache.airavata.gfac.provider.GFacProvider; import org.apache.airavata.gfac.provider.GFacProviderException; +import org.apache.airavata.job.monitor.JobIdentity; import org.apache.airavata.job.monitor.MonitorID; import org.apache.airavata.job.monitor.command.TaskCancelRequest; import org.apache.airavata.job.monitor.event.MonitorPublisher; @@ -68,7 +69,7 @@ public abstract class AbstractProvider implements GFacProvider{ JobState jobState = jd.getJobStatus().getJobState(); if (jobState!=JobState.CANCELED || jobState!=JobState.CANCELING || jobState!=JobState.COMPLETE || jobState!=JobState.FAILED){ MonitorID monitorId = new MonitorID(null, jd.getJobID(), request.getTaskId(), request.getExperimentId(), null, null); - monitorPublisher.publish(new JobStatusChangeRequest(monitorId, JobState.CANCELING)); + monitorPublisher.publish(new JobStatusChangeRequest(monitorId, new JobIdentity(monitorId.getExperimentID(), monitorId.getWorkflowNodeID(), monitorId.getTaskID(), monitorId.getJobID()), JobState.CANCELING)); log.debug("Canceling job "+jd.getJobID()); cancelJob(jd.getJobID(), jobExecutionContext); } http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java index 5455f1b..f172e6c 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java @@ -50,7 +50,7 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener ExperimentState state = experimentStatus.getState(); if (state != null) { try { - String experimentID = experimentStatus.getMonitorID().getExperimentID(); + String experimentID = experimentStatus.getIdentity().getExperimentID(); updateExperimentStatus(experimentID, state); } catch (Exception e) { logger.error("Error persisting data" + e.getLocalizedMessage(), e); http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java index 74fdf43..0a56543 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java @@ -70,48 +70,17 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener{ JobState state = jobStatus.getState(); if (state != null) { try { - String taskID = jobStatus.getMonitorID().getTaskID(); - String jobID = jobStatus.getMonitorID().getJobID(); + String taskID = jobStatus.getIdentity().getTaskId(); + String jobID = jobStatus.getIdentity().getJobId(); updateJobStatus(taskID, jobID, state); } catch (Exception e) { logger.error("Error persisting data" + e.getLocalizedMessage(), e); } + logger.info("Job ID:" + jobStatus.getIdentity().getJobId() + " is "+state.toString()); switch (state) { - case COMPLETE: - logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is DONE"); + case COMPLETE: case UNKNOWN: case CANCELED:case FAILED:case SUSPENDED: jobsToMonitor.remove(jobStatus.getMonitorID()); break; - case UNKNOWN: - logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is UNKNOWN"); - jobsToMonitor.remove(jobStatus.getMonitorID()); - //todo implement this logic - break; - case QUEUED: - logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is QUEUED"); - break; - case SUBMITTED: - logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is SUBMITTED"); - break; - case ACTIVE: - logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is ACTIVE"); - break; - case CANCELED: - logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is CANCELED"); - jobsToMonitor.remove(jobStatus.getMonitorID()); - break; - case FAILED: - logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is FAILED"); - jobsToMonitor.remove(jobStatus.getMonitorID()); - break; - case HELD: - logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is HELD"); - break; - case SUSPENDED: - logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is SUSPENDED"); - jobsToMonitor.remove(jobStatus.getMonitorID()); - break; - case CANCELING: - logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is CENCELING"); default: break; } @@ -144,7 +113,7 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener{ break; } logger.debug("Publishing Task Status "+state.toString()); - monitorPublisher.publish(new TaskStatusChangeRequest(jobStatus.getMonitorID(),state)); + monitorPublisher.publish(new TaskStatusChangeRequest(jobStatus.getIdentity(),state)); } public void updateJobStatus(String taskId, String jobID, JobState state) throws Exception { http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java index 40b095a..86ae26a 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java @@ -23,11 +23,11 @@ package org.apache.airavata.job.monitor; import java.util.Calendar; import org.apache.airavata.job.monitor.event.MonitorPublisher; -import org.apache.airavata.job.monitor.state.ExperimentStatusChangeRequest; import org.apache.airavata.job.monitor.state.TaskStatusChangeRequest; -import org.apache.airavata.model.workspace.experiment.ExperimentState; +import org.apache.airavata.job.monitor.state.WorkflowNodeStatusChangeRequest; import org.apache.airavata.model.workspace.experiment.TaskDetails; import org.apache.airavata.model.workspace.experiment.TaskState; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeState; import org.apache.airavata.registry.cpi.DataType; import org.apache.airavata.registry.cpi.Registry; import org.slf4j.Logger; @@ -55,7 +55,7 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener{ TaskState state = taskStatus.getState(); if (state != null) { try { - String taskID = taskStatus.getMonitorID().getTaskID(); + String taskID = taskStatus.getIdentity().getTaskId(); updateTaskStatus(taskID, state); } catch (Exception e) { logger.error("Error persisting data" + e.getLocalizedMessage(), e); @@ -64,28 +64,28 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener{ } @Subscribe - public void setupExperimentStatus(TaskStatusChangeRequest taskStatus){ - ExperimentState state=ExperimentState.UNKNOWN; + public void setupWorkflowNodeStatus(TaskStatusChangeRequest taskStatus){ + WorkflowNodeState state=WorkflowNodeState.UNKNOWN; switch(taskStatus.getState()){ case CANCELED: - state=ExperimentState.CANCELED; break; + state=WorkflowNodeState.CANCELED; break; case COMPLETED: - state=ExperimentState.COMPLETED; break; + state=WorkflowNodeState.COMPLETED; break; case CONFIGURING_WORKSPACE: - state=ExperimentState.LAUNCHED; break; + state=WorkflowNodeState.INVOKED; break; case FAILED: - state=ExperimentState.FAILED; break; + state=WorkflowNodeState.FAILED; break; case EXECUTING: case WAITING: case PRE_PROCESSING: case POST_PROCESSING: case OUTPUT_DATA_STAGING: case INPUT_DATA_STAGING: - state=ExperimentState.EXECUTING; break; + state=WorkflowNodeState.EXECUTING; break; case STARTED: - state=ExperimentState.LAUNCHED; break; + state=WorkflowNodeState.INVOKED; break; case CANCELING: - state=ExperimentState.CANCELING; break; + state=WorkflowNodeState.CANCELING; break; default: break; } logger.debug("Publishing Experiment Status "+state.toString()); - monitorPublisher.publish(new ExperimentStatusChangeRequest(taskStatus.getMonitorID(),state)); + monitorPublisher.publish(new WorkflowNodeStatusChangeRequest(taskStatus.getIdentity(),state)); } public void updateTaskStatus(String taskId, TaskState state) throws Exception { http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java index f6dc360..cd07a1d 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java @@ -22,9 +22,13 @@ package org.apache.airavata.job.monitor; import java.util.Calendar; +import org.apache.airavata.job.monitor.event.MonitorPublisher; import org.apache.airavata.job.monitor.state.ExperimentStatusChangeRequest; -import org.apache.airavata.model.workspace.experiment.Experiment; +import org.apache.airavata.job.monitor.state.WorkflowNodeStatusChangeRequest; import org.apache.airavata.model.workspace.experiment.ExperimentState; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeState; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus; import org.apache.airavata.registry.cpi.DataType; import org.apache.airavata.registry.cpi.Registry; import org.slf4j.Logger; @@ -37,7 +41,7 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen private Registry airavataRegistry; -// private MonitorPublisher monitorPublisher; + private MonitorPublisher monitorPublisher; public Registry getAiravataRegistry() { return airavataRegistry; @@ -48,29 +52,52 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen } @Subscribe - public void updateRegistry(ExperimentStatusChangeRequest experimentStatus) { -// ExperimentState state = experimentStatus.getState(); -// if (state != null) { -// try { -// String experimentID = experimentStatus.getMonitorID().getExperimentID(); -// updateWorkflowNodeStatus(experimentID, state); -// } catch (Exception e) { -// logger.error("Error persisting data" + e.getLocalizedMessage(), e); -// } -// } + public void updateRegistry(WorkflowNodeStatusChangeRequest workflowNodeStatus) { + WorkflowNodeState state = workflowNodeStatus.getState(); + if (state != null) { + try { + String workflowNodeID = workflowNodeStatus.getIdentity().getWorkflowNodeID(); + updateWorkflowNodeStatus(workflowNodeID, state); + } catch (Exception e) { + logger.error("Error persisting data" + e.getLocalizedMessage(), e); + } + } + } + + @Subscribe + public void setupExperimentStatus(WorkflowNodeStatusChangeRequest nodeStatus){ + ExperimentState state=ExperimentState.UNKNOWN; + switch(nodeStatus.getState()){ + case CANCELED: + state=ExperimentState.CANCELED; break; + case COMPLETED: + state=ExperimentState.COMPLETED; break; + case INVOKED: + state=ExperimentState.LAUNCHED; break; + case FAILED: + state=ExperimentState.FAILED; break; + case EXECUTING: + state=ExperimentState.EXECUTING; break; + case CANCELING: + state=ExperimentState.CANCELING; break; + default: + break; + } + logger.debug("Publishing Experiment Status "+state.toString()); + monitorPublisher.publish(new ExperimentStatusChangeRequest(nodeStatus.getIdentity(),state)); } - public void updateWorkflowNodeStatus(String experimentId, ExperimentState state) throws Exception { - Experiment details = (Experiment)airavataRegistry.get(DataType.EXPERIMENT, experimentId); + public void updateWorkflowNodeStatus(String workflowNodeId, WorkflowNodeState state) throws Exception { + WorkflowNodeDetails details = (WorkflowNodeDetails)airavataRegistry.get(DataType.WORKFLOW_NODE_DETAIL, workflowNodeId); if(details == null) { - details = new Experiment(); - details.setExperimentID(experimentId); + details = new WorkflowNodeDetails(); + details.setNodeInstanceId(workflowNodeId); } - org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus(); - status.setExperimentState(state); + WorkflowNodeStatus status = new WorkflowNodeStatus(); + status.setWorkflowNodeState(state); status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); - details.setExperimentStatus(status); - airavataRegistry.update(org.apache.airavata.registry.cpi.DataType.EXPERIMENT, details, experimentId); + details.setWorkflowNodeStatus(status); + airavataRegistry.update(org.apache.airavata.registry.cpi.DataType.WORKFLOW_NODE_DETAIL, details, workflowNodeId); } @Override @@ -78,8 +105,8 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen for (Object configuration : configurations) { if (configuration instanceof Registry){ this.airavataRegistry=(Registry)configuration; -// } else if (configuration instanceof MonitorPublisher){ -// this.monitorPublisher=(MonitorPublisher) configuration; + } else if (configuration instanceof MonitorPublisher){ + this.monitorPublisher=(MonitorPublisher) configuration; } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java index 65ab8d0..cb16540 100644 --- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java +++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java @@ -134,7 +134,7 @@ public class AMQPMonitorTest { String jobID = pbsCluster.submitBatchJob(jobDescriptor); System.out.println(jobID); try { - monitorManager.addAJobToMonitor(new MonitorID(hostDescription, jobID,null,null, "ogce")); + monitorManager.addAJobToMonitor(new MonitorID(hostDescription, jobID,null,null, null, "ogce")); } catch (AiravataMonitorException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java index 33ffa09..5d7314a 100644 --- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java +++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java @@ -131,7 +131,7 @@ public class QstatMonitorTestWithMyProxyAuth { for (int i = 0; i < 1; i++) { String jobID = pbsCluster.submitBatchJob(jobDescriptor); System.out.println("Job submitted successfully, Job ID: " + jobID); - MonitorID monitorID = new MonitorID(hostDescription, jobID,null,null, "ogce"); + MonitorID monitorID = new MonitorID(hostDescription, jobID,null,null,null, "ogce"); monitorID.setAuthenticationInfo(authenticationInfo); try { monitorManager.addAJobToMonitor(monitorID); http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index bb9865d..26f447e 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -49,6 +49,8 @@ import org.apache.airavata.orchestrator.cpi.impl.SimpleOrchestratorImpl; import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; import org.apache.airavata.registry.cpi.DataType; import org.apache.airavata.registry.cpi.Registry; +import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.TaskDetailConstants; +import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowNodeConstants; import org.apache.airavata.schemas.gfac.GsisshHostType; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -101,29 +103,25 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { String proxyPath = ServerSettings.getSetting("proxy.file.path"); String connectionName = ServerSettings.getSetting("connection.name"); - if (monitors == null) { - log.error("Error loading primaryMonitor and there has to be a primary monitor"); - } else { - for (String monitorClass : monitorList) { - Class aClass = Class.forName(monitorClass).asSubclass(Monitor.class); - Monitor monitor = aClass.newInstance(); - if (monitor instanceof PullMonitor) { - if (monitor instanceof QstatMonitor) { - monitorManager.addQstatMonitor((QstatMonitor) monitor); - } - } else if (monitor instanceof PushMonitor) { - if (monitor instanceof AMQPMonitor) { - ((AMQPMonitor) monitor).initialize(proxyPath, connectionName, list); - monitorManager.addAMQPMonitor((AMQPMonitor) monitor); - } - } else if(monitor instanceof LocalJobMonitor){ - monitorManager.addLocalMonitor((LocalJobMonitor)monitor); - } else { - log.error("Wrong class is given to primary Monitor"); + for (String monitorClass : monitorList) { + Class aClass = Class.forName(monitorClass).asSubclass(Monitor.class); + Monitor monitor = aClass.newInstance(); + if (monitor instanceof PullMonitor) { + if (monitor instanceof QstatMonitor) { + monitorManager.addQstatMonitor((QstatMonitor) monitor); } + } else if (monitor instanceof PushMonitor) { + if (monitor instanceof AMQPMonitor) { + ((AMQPMonitor) monitor).initialize(proxyPath, connectionName, list); + monitorManager.addAMQPMonitor((AMQPMonitor) monitor); + } + } else if(monitor instanceof LocalJobMonitor){ + monitorManager.addLocalMonitor((LocalJobMonitor)monitor); + } else { + log.error("Wrong class is given to primary Monitor"); } - } + monitorManager.registerListener(orchestrator); // Now Monitor Manager is properly configured, now we have to start the monitoring system. // This will initialize all the required threads and required queues @@ -163,46 +161,51 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { if (tasks.size() > 1) { log.info("There are multiple tasks for this experiment, So Orchestrator will launch multiple Jobs"); } - for (TaskDetails taskID : tasks) { - //iterate through all the generated tasks and performs the job submisssion+monitoring - String jobID = null; - Experiment experiment = (Experiment) registry.get(DataType.EXPERIMENT, experimentId); - if (experiment == null) { - log.error("Error retrieving the Experiment by the given experimentID: " + experimentId); - return false; - } - String userName = experiment.getUserName(); - - HostDescription hostDescription = OrchestratorUtils.getHostDescription(orchestrator, taskID); - - // creating monitorID to register with monitoring queue - // this is a special case because amqp has to be in place before submitting the job - if ((hostDescription instanceof GsisshHostType) && - Constants.PUSH.equals(((GsisshHostType) hostDescription).getMonitorMode())) { - monitorID = new MonitorID(hostDescription, null, taskID.getTaskID(), experimentId, userName); - monitorManager.addAJobToMonitor(monitorID); - jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID()); - if("none".equals(jobID)) { - log.error("Job submission Failed, so we remove the job from monitoring"); - - }else{ - log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID); - } - } else { - // Launching job for each task - // if the monitoring is pull mode then we add the monitorID for each task after submitting - // the job with the jobID, otherwise we don't need the jobID - jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID()); - log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID); - monitorID = new MonitorID(hostDescription, jobID, taskID.getTaskID(), experimentId, userName, authenticationInfo); - if("none".equals(jobID)) { - log.error("Job submission Failed, so we remove the job from monitoring"); - - }else{ - monitorManager.addAJobToMonitor(monitorID); - } - } - } + List ids = registry.getIds(DataType.WORKFLOW_NODE_DETAIL,WorkflowNodeConstants.EXPERIMENT_ID,experimentId); + for (String workflowNodeId : ids) { + List taskDetailList = registry.get(DataType.TASK_DETAIL,TaskDetailConstants.NODE_ID,workflowNodeId); + for (Object o : taskDetailList) { + TaskDetails taskID=(TaskDetails)o; + //iterate through all the generated tasks and performs the job submisssion+monitoring + String jobID = null; + Experiment experiment = (Experiment) registry.get(DataType.EXPERIMENT, experimentId); + if (experiment == null) { + log.error("Error retrieving the Experiment by the given experimentID: " + experimentId); + return false; + } + String userName = experiment.getUserName(); + + HostDescription hostDescription = OrchestratorUtils.getHostDescription(orchestrator, taskID); + + // creating monitorID to register with monitoring queue + // this is a special case because amqp has to be in place before submitting the job + if ((hostDescription instanceof GsisshHostType) && + Constants.PUSH.equals(((GsisshHostType) hostDescription).getMonitorMode())) { + monitorID = new MonitorID(hostDescription, null, taskID.getTaskID(), workflowNodeId, experimentId, userName); + monitorManager.addAJobToMonitor(monitorID); + jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID()); + if("none".equals(jobID)) { + log.error("Job submission Failed, so we remove the job from monitoring"); + + }else{ + log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID); + } + } else { + // Launching job for each task + // if the monitoring is pull mode then we add the monitorID for each task after submitting + // the job with the jobID, otherwise we don't need the jobID + jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID()); + log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID); + monitorID = new MonitorID(hostDescription, jobID, taskID.getTaskID(), workflowNodeId, experimentId, userName, authenticationInfo); + if("none".equals(jobID)) { + log.error("Job submission Failed, so we remove the job from monitoring"); + + }else{ + monitorManager.addAJobToMonitor(monitorID); + } + } + } + } } catch (Exception e) { throw new TException(e); } http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/utils/ThriftDataModelConversion.java ---------------------------------------------------------------------- diff --git a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/utils/ThriftDataModelConversion.java b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/utils/ThriftDataModelConversion.java index 5ed0bce..409f8a6 100644 --- a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/utils/ThriftDataModelConversion.java +++ b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/utils/ThriftDataModelConversion.java @@ -39,8 +39,10 @@ public class ThriftDataModelConversion { if (pr != null) { project.setProjectID(pr.getName()); project.setName(pr.getName()); - project.setCreationTime(pr.getCreationTime().getTime()); - project.setDescription(pr.getDescription()); + if (pr.getCreationTime()!=null) { + project.setCreationTime(pr.getCreationTime().getTime()); + } + project.setDescription(pr.getDescription()); project.setOwner(pr.getWorker().getUser()); List projectUserList = pr.getProjectUserList(); List sharedUsers = new ArrayList(); http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java ---------------------------------------------------------------------- diff --git a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java index 69964ef..80e0c99 100644 --- a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java +++ b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java @@ -49,6 +49,7 @@ public class ServerMain { private static int serverPID=-1; private static final String serverStartedFileNamePrefix = "server-start"; private static boolean systemShutDown=false; + private static boolean shutdownHookCalledBefore=false; static{ servers = new ArrayList(); } @@ -89,6 +90,19 @@ public class ServerMain { }); } +// private static void addSecondaryShutdownHook(){ +// Runtime.getRuntime().addShutdownHook(new Thread(){ +// @Override +// public void run() { +// System.out.print("Graceful shutdown attempt is still active. Do you want to exit instead? (y/n)"); +// String command=System.console().readLine().trim().toLowerCase(); +// if (command.equals("yes") || command.equals("y")){ +// System.exit(1); +// } +// } +// }); +// } + public static void main(String args[]) throws ParseException, IOException { AiravataUtils.setExecutionAsServer(); CommandLineParameters commandLineParameters = StringUtil.getCommandLineParser(args); http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/ExperimentIdentity.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/ExperimentIdentity.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/ExperimentIdentity.java new file mode 100644 index 0000000..652ad1d --- /dev/null +++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/ExperimentIdentity.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.job.monitor; + +public class ExperimentIdentity { + private String experimentID; + public ExperimentIdentity(String experimentId) { + setExperimentID(experimentId); + } + public String getExperimentID() { + return experimentID; + } + + public void setExperimentID(String experimentID) { + this.experimentID = experimentID; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/JobIdentity.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/JobIdentity.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/JobIdentity.java new file mode 100644 index 0000000..5753d9d --- /dev/null +++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/JobIdentity.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.job.monitor; + +public class JobIdentity extends TaskIdentity { + private String jobId; + + public JobIdentity(String experimentId, String workflowNodeId, String taskId, String jobId) { + super(experimentId,workflowNodeId,taskId); + setJobId(jobId); + } + + public String getJobId() { + return jobId; + } + + public void setJobId(String jobId) { + this.jobId = jobId; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java index bd6bfcb..241e3b0 100644 --- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java +++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java @@ -52,7 +52,7 @@ public class MonitorID { private String experimentID; -// private String workflowNodeID; + private String workflowNodeID; private String taskID; @@ -62,7 +62,7 @@ public class MonitorID { private JobState state; - public MonitorID(HostDescription host, String jobID,String taskID, String experimentID, String userName) { + public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName) { this.host = host; this.jobStartedTime = new Timestamp((new Date()).getTime()); this.userName = userName; @@ -71,7 +71,7 @@ public class MonitorID { this.experimentID = experimentID; } - public MonitorID(HostDescription host, String jobID,String taskID,String experimentID, String userName,AuthenticationInfo authenticationInfo) { + public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName,AuthenticationInfo authenticationInfo) { this.host = host; this.jobStartedTime = new Timestamp((new Date()).getTime()); this.authenticationInfo = authenticationInfo; @@ -207,6 +207,14 @@ public class MonitorID { } } + public String getWorkflowNodeID() { + return workflowNodeID; + } + + public void setWorkflowNodeID(String workflowNodeID) { + this.workflowNodeID = workflowNodeID; + } + // public String getWorkflowNodeID() { // return workflowNodeID; // } http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/TaskIdentity.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/TaskIdentity.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/TaskIdentity.java new file mode 100644 index 0000000..f7bc785 --- /dev/null +++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/TaskIdentity.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.job.monitor; + +public class TaskIdentity extends WorkflowNodeIdentity { + private String taskId; + + public TaskIdentity(String experimentId, String workflowNodeId, String taskId) { + super(experimentId,workflowNodeId); + setTaskId(taskId); + } + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/WorkflowNodeIdentity.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/WorkflowNodeIdentity.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/WorkflowNodeIdentity.java new file mode 100644 index 0000000..a8fe09f --- /dev/null +++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/WorkflowNodeIdentity.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.job.monitor; + +public class WorkflowNodeIdentity extends ExperimentIdentity { + private String workflowNodeID; + public WorkflowNodeIdentity(String experimentId, String workflowNodeId) { + super(experimentId); + setWorkflowNodeID(workflowNodeId); + } + public String getWorkflowNodeID() { + return workflowNodeID; + } + + public void setWorkflowNodeID(String workflowNodeID) { + this.workflowNodeID = workflowNodeID; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java index de7cf90..ec4cb40 100644 --- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java +++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java @@ -21,6 +21,7 @@ package org.apache.airavata.job.monitor.impl; import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.job.monitor.JobIdentity; import org.apache.airavata.job.monitor.MonitorID; import org.apache.airavata.job.monitor.core.AiravataAbstractMonitor; import org.apache.airavata.job.monitor.state.JobStatusChangeRequest; @@ -41,7 +42,7 @@ public class LocalJobMonitor extends AiravataAbstractMonitor { do { try { MonitorID take = jobQueue.take(); - getPublisher().publish(new JobStatusChangeRequest(take, JobState.COMPLETE)); + getPublisher().publish(new JobStatusChangeRequest(take, new JobIdentity(take.getExperimentID(), take.getWorkflowNodeID(), take.getTaskID(), take.getJobID()), JobState.COMPLETE)); } catch (Exception e) { e.printStackTrace(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java index 88a5198..dc6d193 100644 --- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java +++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java @@ -30,6 +30,7 @@ import java.util.concurrent.BlockingQueue; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.commons.gfac.type.HostDescription; +import org.apache.airavata.job.monitor.JobIdentity; import org.apache.airavata.job.monitor.MonitorID; import org.apache.airavata.job.monitor.core.PushMonitor; import org.apache.airavata.job.monitor.event.MonitorPublisher; @@ -201,7 +202,7 @@ public class AMQPMonitor extends PushMonitor { } } next.setStatus(monitorID.getStatus()); - publisher.publish(new JobStatusChangeRequest(next,next.getStatus())); + publisher.publish(new JobStatusChangeRequest(next, new JobIdentity(next.getExperimentID(), next.getWorkflowNodeID(), next.getTaskID(), next.getJobID()),next.getStatus())); return true; } @Override http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java index 53bcc8b..5a2d40d 100644 --- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java +++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java @@ -76,7 +76,7 @@ public class BasicConsumer implements Consumer { logger.debug("************************************************************"); try { String jobID = envelope.getRoutingKey().split("\\.")[0]; - MonitorID monitorID = new MonitorID(null, jobID, null, null, null); + MonitorID monitorID = new MonitorID(null, jobID, null, null, null, null); monitorID.setStatus(parser.parseMessage(message)); publisher.publish(monitorID); } catch (AiravataMonitorException e) { http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java index 909f10e..bacd8df 100644 --- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java +++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java @@ -21,17 +21,7 @@ package org.apache.airavata.job.monitor.state; -import org.apache.airavata.job.monitor.MonitorID; public abstract class AbstractStateChangeRequest implements PublisherMessage{ - private MonitorID monitorID; - - public MonitorID getMonitorID() { - return monitorID; - } - - public void setMonitorID(MonitorID monitorID) { - this.monitorID = monitorID; - } } http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/ExperimentStatusChangeRequest.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/ExperimentStatusChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/ExperimentStatusChangeRequest.java index d664161..9bee5ca 100644 --- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/ExperimentStatusChangeRequest.java +++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/ExperimentStatusChangeRequest.java @@ -20,7 +20,7 @@ */ package org.apache.airavata.job.monitor.state; -import org.apache.airavata.job.monitor.MonitorID; +import org.apache.airavata.job.monitor.ExperimentIdentity; import org.apache.airavata.model.workspace.experiment.ExperimentState; /** @@ -32,15 +32,15 @@ import org.apache.airavata.model.workspace.experiment.ExperimentState; */ public class ExperimentStatusChangeRequest extends AbstractStateChangeRequest{ private ExperimentState state; - + private ExperimentIdentity identity; // this constructor can be used in Qstat monitor to handle errors public ExperimentStatusChangeRequest() { } - public ExperimentStatusChangeRequest(MonitorID monitorID, ExperimentState state) { - setMonitorID(monitorID); + public ExperimentStatusChangeRequest(ExperimentIdentity experimentIdentity, ExperimentState state) { this.state = state; + setIdentity(experimentIdentity); } public ExperimentState getState() { @@ -51,5 +51,13 @@ public class ExperimentStatusChangeRequest extends AbstractStateChangeRequest{ this.state = state; } + public ExperimentIdentity getIdentity() { + return identity; + } + + public void setIdentity(ExperimentIdentity identity) { + this.identity = identity; + } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusChangeRequest.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusChangeRequest.java index 9669b75..0db9da6 100644 --- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusChangeRequest.java +++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusChangeRequest.java @@ -20,11 +20,10 @@ */ package org.apache.airavata.job.monitor.state; +import org.apache.airavata.job.monitor.JobIdentity; import org.apache.airavata.job.monitor.MonitorID; import org.apache.airavata.model.workspace.experiment.JobState; -import java.util.Properties; - /** * This is the primary job state object used in * through out the monitor module. This use airavata-data-model JobState enum @@ -34,15 +33,18 @@ import java.util.Properties; */ public class JobStatusChangeRequest extends AbstractStateChangeRequest{ private JobState state; + private JobIdentity identity; - + private MonitorID monitorID; + // this constructor can be used in Qstat monitor to handle errors public JobStatusChangeRequest() { } - public JobStatusChangeRequest(MonitorID monitorID, JobState state) { - setMonitorID(monitorID); - this.state = state; + public JobStatusChangeRequest(MonitorID monitorID, JobIdentity jobId, JobState state) { + setIdentity(jobId); + setMonitorID(monitorID); + this.state = state; } public JobState getState() { @@ -53,4 +55,20 @@ public class JobStatusChangeRequest extends AbstractStateChangeRequest{ this.state = state; } + public JobIdentity getIdentity() { + return identity; + } + + public void setIdentity(JobIdentity identity) { + this.identity = identity; + } + + public MonitorID getMonitorID() { + return monitorID; + } + + public void setMonitorID(MonitorID monitorID) { + this.monitorID = monitorID; + } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/TaskStatusChangeRequest.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/TaskStatusChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/TaskStatusChangeRequest.java index f35310b..e8e58db 100644 --- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/TaskStatusChangeRequest.java +++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/TaskStatusChangeRequest.java @@ -20,7 +20,7 @@ */ package org.apache.airavata.job.monitor.state; -import org.apache.airavata.job.monitor.MonitorID; +import org.apache.airavata.job.monitor.TaskIdentity; import org.apache.airavata.model.workspace.experiment.TaskState; /** @@ -32,14 +32,14 @@ import org.apache.airavata.model.workspace.experiment.TaskState; */ public class TaskStatusChangeRequest extends AbstractStateChangeRequest{ private TaskState state; - + private TaskIdentity identity; // this constructor can be used in Qstat monitor to handle errors public TaskStatusChangeRequest() { } - public TaskStatusChangeRequest(MonitorID monitorID, TaskState state) { - setMonitorID(monitorID); + public TaskStatusChangeRequest(TaskIdentity taskIdentity, TaskState state) { this.state = state; + setIdentity(taskIdentity); } public TaskState getState() { @@ -50,4 +50,12 @@ public class TaskStatusChangeRequest extends AbstractStateChangeRequest{ this.state = state; } + public TaskIdentity getIdentity() { + return identity; + } + + public void setIdentity(TaskIdentity identity) { + this.identity = identity; + } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/WorkflowNodeStatusChangeRequest.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/WorkflowNodeStatusChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/WorkflowNodeStatusChangeRequest.java new file mode 100644 index 0000000..7e58e35 --- /dev/null +++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/WorkflowNodeStatusChangeRequest.java @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.job.monitor.state; + +import org.apache.airavata.job.monitor.WorkflowNodeIdentity; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeState; + +/** + * This is the primary job state object used in + * through out the monitor module. This use airavata-data-model JobState enum + * Ideally after processing each event or monitoring message from remote system + * Each monitoring implementation has to return this object with a state and + * the monitoring ID + */ +public class WorkflowNodeStatusChangeRequest extends AbstractStateChangeRequest{ + private WorkflowNodeState state; + private WorkflowNodeIdentity identity; + + // this constructor can be used in Qstat monitor to handle errors + public WorkflowNodeStatusChangeRequest() { + } + + public WorkflowNodeStatusChangeRequest(WorkflowNodeIdentity identity, WorkflowNodeState state) { + this.state = state; + setIdentity(identity); + } + + public WorkflowNodeState getState() { + return state; + } + + public void setState(WorkflowNodeState state) { + this.state = state; + } + + public WorkflowNodeIdentity getIdentity() { + return identity; + } + + public void setIdentity(WorkflowNodeIdentity identity) { + this.identity = identity; + } + + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java b/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java index 980c2fa..c0e579e 100644 --- a/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java +++ b/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java @@ -150,7 +150,7 @@ public class AMQPMonitorTest { String jobID = pbsCluster.submitBatchJob(jobDescriptor); System.out.println(jobID); try { - pushQueue.add(new MonitorID(hostDescription, jobID,null,null, "ogce")); + pushQueue.add(new MonitorID(hostDescription, jobID,null,null,null, "ogce")); } catch (Exception e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java b/tools/job-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java index 735d1d2..d85f465 100644 --- a/tools/job-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java +++ b/tools/job-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java @@ -144,7 +144,7 @@ public class QstatMonitorTestWithMyProxyAuth { for (int i = 0; i < 1; i++) { String jobID = pbsCluster.submitBatchJob(jobDescriptor); System.out.println("Job submitted successfully, Job ID: " + jobID); - MonitorID monitorID = new MonitorID(hostDescription, jobID,null,null, "ogce"); + MonitorID monitorID = new MonitorID(hostDescription, jobID,null,null,null, "ogce"); monitorID.setAuthenticationInfo(authenticationInfo); try { org.apache.airavata.job.monitor.util.CommonUtils.addMonitortoQueue(pullQueue, monitorID);