airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From samin...@apache.org
Subject [1/2] git commit: introducing workflow node status updater + refactor
Date Wed, 23 Apr 2014 02:08:24 GMT
Repository: airavata
Updated Branches:
  refs/heads/master f2b5df444 -> 49eea6f15


introducing workflow node status updater + refactor


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/67b44a15
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/67b44a15
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/67b44a15

Branch: refs/heads/master
Commit: 67b44a15c5957baa18ad3cb883a9e31f3fc0b146
Parents: aed31d3
Author: Saminda Wijeratne <samindaw@gmail.com>
Authored: Tue Apr 22 19:07:28 2014 -0700
Committer: Saminda Wijeratne <samindaw@gmail.com>
Committed: Tue Apr 22 19:07:28 2014 -0700

----------------------------------------------------------------------
 .../main/resources/airavata-server.properties   |   2 +-
 .../gfac/provider/impl/AbstractProvider.java    |   3 +-
 .../AiravataExperimentStatusUpdator.java        |   2 +-
 .../job/monitor/AiravataJobStatusUpdator.java   |  41 +------
 .../job/monitor/AiravataTaskStatusUpdator.java  |  26 ++--
 .../AiravataWorkflowNodeStatusUpdator.java      |  71 +++++++----
 .../airavata/job/monitor/AMQPMonitorTest.java   |   2 +-
 .../QstatMonitorTestWithMyProxyAuth.java        |   2 +-
 .../server/OrchestratorServerHandler.java       | 123 ++++++++++---------
 .../jpa/utils/ThriftDataModelConversion.java    |   6 +-
 .../org/apache/airavata/server/ServerMain.java  |  14 +++
 .../job/monitor/ExperimentIdentity.java         |  36 ++++++
 .../airavata/job/monitor/JobIdentity.java       |  39 ++++++
 .../apache/airavata/job/monitor/MonitorID.java  |  14 ++-
 .../airavata/job/monitor/TaskIdentity.java      |  38 ++++++
 .../job/monitor/WorkflowNodeIdentity.java       |  37 ++++++
 .../job/monitor/impl/LocalJobMonitor.java       |   3 +-
 .../job/monitor/impl/push/amqp/AMQPMonitor.java |   3 +-
 .../monitor/impl/push/amqp/BasicConsumer.java   |   2 +-
 .../state/AbstractStateChangeRequest.java       |  10 --
 .../state/ExperimentStatusChangeRequest.java    |  16 ++-
 .../monitor/state/JobStatusChangeRequest.java   |  30 ++++-
 .../monitor/state/TaskStatusChangeRequest.java  |  16 ++-
 .../state/WorkflowNodeStatusChangeRequest.java  |  63 ++++++++++
 .../apache/airavata/job/AMQPMonitorTest.java    |   2 +-
 .../job/QstatMonitorTestWithMyProxyAuth.java    |   2 +-
 26 files changed, 433 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 13f78d5..4a65a71 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -262,7 +262,7 @@ monitors=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor,org.apache
 amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
 proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
 connection.name=xsede
-activity.listeners=org.apache.airavata.job.monitor.AiravataJobStatusUpdator,org.apache.airavata.job.monitor.AiravataTaskStatusUpdator,org.apache.airavata.job.monitor.AiravataExperimentStatusUpdator
+activity.listeners=org.apache.airavata.job.monitor.AiravataJobStatusUpdator,org.apache.airavata.job.monitor.AiravataTaskStatusUpdator,org.apache.airavata.job.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.job.monitor.AiravataExperimentStatusUpdator
 
 ###---------------------------Orchestrator module Configurations---------------------------###
 job.submitter=org.apache.airavata.orchestrator.core.impl.EmbeddedGFACJobSubmitter

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
index 3ba02b9..5966233 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
@@ -27,6 +27,7 @@ import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.context.JobExecutionContext;
 import org.apache.airavata.gfac.provider.GFacProvider;
 import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.job.monitor.JobIdentity;
 import org.apache.airavata.job.monitor.MonitorID;
 import org.apache.airavata.job.monitor.command.TaskCancelRequest;
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
@@ -68,7 +69,7 @@ public abstract class AbstractProvider implements GFacProvider{
 				JobState jobState = jd.getJobStatus().getJobState();
 				if (jobState!=JobState.CANCELED || jobState!=JobState.CANCELING || jobState!=JobState.COMPLETE || jobState!=JobState.FAILED){
 					MonitorID monitorId = new MonitorID(null, jd.getJobID(), request.getTaskId(), request.getExperimentId(), null, null);
-					monitorPublisher.publish(new JobStatusChangeRequest(monitorId, JobState.CANCELING));
+					monitorPublisher.publish(new JobStatusChangeRequest(monitorId, new JobIdentity(monitorId.getExperimentID(), monitorId.getWorkflowNodeID(), monitorId.getTaskID(), monitorId.getJobID()), JobState.CANCELING));
 					log.debug("Canceling job "+jd.getJobID());
 					cancelJob(jd.getJobID(), jobExecutionContext);
 				}

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java
index 5455f1b..f172e6c 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java
@@ -50,7 +50,7 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
         ExperimentState state = experimentStatus.getState();
         if (state != null) {
             try {
-                String experimentID = experimentStatus.getMonitorID().getExperimentID();
+                String experimentID = experimentStatus.getIdentity().getExperimentID();
                 updateExperimentStatus(experimentID, state);
             } catch (Exception e) {
                 logger.error("Error persisting data" + e.getLocalizedMessage(), e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
index 74fdf43..0a56543 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
@@ -70,48 +70,17 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener{
         JobState state = jobStatus.getState();
         if (state != null) {
             try {
-                String taskID = jobStatus.getMonitorID().getTaskID();
-                String jobID = jobStatus.getMonitorID().getJobID();
+                String taskID = jobStatus.getIdentity().getTaskId();
+                String jobID = jobStatus.getIdentity().getJobId();
                 updateJobStatus(taskID, jobID, state);
             } catch (Exception e) {
                 logger.error("Error persisting data" + e.getLocalizedMessage(), e);
             }
+            logger.info("Job ID:" + jobStatus.getIdentity().getJobId() + " is "+state.toString());
             switch (state) {
-                case COMPLETE:
-                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is DONE");
+                case COMPLETE: case UNKNOWN: case CANCELED:case FAILED:case SUSPENDED:
                     jobsToMonitor.remove(jobStatus.getMonitorID());
                     break;
-                case UNKNOWN:
-                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is UNKNOWN");
-                    jobsToMonitor.remove(jobStatus.getMonitorID());
-                    //todo implement this logic
-                    break;
-                case QUEUED:
-                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is QUEUED");
-                    break;
-                case SUBMITTED:
-                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is SUBMITTED");
-                    break;
-                case ACTIVE:
-                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is ACTIVE");
-                    break;
-                case CANCELED:
-                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is CANCELED");
-                    jobsToMonitor.remove(jobStatus.getMonitorID());
-                    break;
-                case FAILED:
-                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is FAILED");
-                    jobsToMonitor.remove(jobStatus.getMonitorID());
-                    break;
-                case HELD:
-                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is HELD");
-                    break;
-                case SUSPENDED:
-                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is SUSPENDED");
-                    jobsToMonitor.remove(jobStatus.getMonitorID());
-                    break;
-                case CANCELING:
-                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is CENCELING");
 			default:
 				break;
             }
@@ -144,7 +113,7 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener{
 			break;
     	}
     	logger.debug("Publishing Task Status "+state.toString());
-    	monitorPublisher.publish(new TaskStatusChangeRequest(jobStatus.getMonitorID(),state));
+    	monitorPublisher.publish(new TaskStatusChangeRequest(jobStatus.getIdentity(),state));
     }
     
     public  void updateJobStatus(String taskId, String jobID, JobState state) throws Exception {

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java
index 40b095a..86ae26a 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java
@@ -23,11 +23,11 @@ package org.apache.airavata.job.monitor;
 import java.util.Calendar;
 
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
-import org.apache.airavata.job.monitor.state.ExperimentStatusChangeRequest;
 import org.apache.airavata.job.monitor.state.TaskStatusChangeRequest;
-import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.job.monitor.state.WorkflowNodeStatusChangeRequest;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
 import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
 import org.apache.airavata.registry.cpi.DataType;
 import org.apache.airavata.registry.cpi.Registry;
 import org.slf4j.Logger;
@@ -55,7 +55,7 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener{
         TaskState state = taskStatus.getState();
         if (state != null) {
             try {
-                String taskID = taskStatus.getMonitorID().getTaskID();
+                String taskID = taskStatus.getIdentity().getTaskId();
                 updateTaskStatus(taskID, state);
             } catch (Exception e) {
                 logger.error("Error persisting data" + e.getLocalizedMessage(), e);
@@ -64,28 +64,28 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener{
     }
     
     @Subscribe
-    public void setupExperimentStatus(TaskStatusChangeRequest taskStatus){
-    	ExperimentState state=ExperimentState.UNKNOWN;
+    public void setupWorkflowNodeStatus(TaskStatusChangeRequest taskStatus){
+    	WorkflowNodeState state=WorkflowNodeState.UNKNOWN;
     	switch(taskStatus.getState()){
     	case CANCELED:
-    		state=ExperimentState.CANCELED; break;
+    		state=WorkflowNodeState.CANCELED; break;
     	case COMPLETED:
-    		state=ExperimentState.COMPLETED; break;
+    		state=WorkflowNodeState.COMPLETED; break;
     	case CONFIGURING_WORKSPACE:
-    		state=ExperimentState.LAUNCHED; break;
+    		state=WorkflowNodeState.INVOKED; break;
     	case FAILED:
-    		state=ExperimentState.FAILED; break;
+    		state=WorkflowNodeState.FAILED; break;
     	case EXECUTING: case WAITING: case PRE_PROCESSING: case POST_PROCESSING: case OUTPUT_DATA_STAGING: case INPUT_DATA_STAGING:
-    		state=ExperimentState.EXECUTING; break;
+    		state=WorkflowNodeState.EXECUTING; break;
     	case STARTED:
-    		state=ExperimentState.LAUNCHED; break;
+    		state=WorkflowNodeState.INVOKED; break;
     	case CANCELING:
-    		state=ExperimentState.CANCELING; break;
+    		state=WorkflowNodeState.CANCELING; break;
 		default:
 			break;
     	}
     	logger.debug("Publishing Experiment Status "+state.toString());
-    	monitorPublisher.publish(new ExperimentStatusChangeRequest(taskStatus.getMonitorID(),state));
+    	monitorPublisher.publish(new WorkflowNodeStatusChangeRequest(taskStatus.getIdentity(),state));
     }
     
     public  void updateTaskStatus(String taskId, TaskState state) throws Exception {

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java
index f6dc360..cd07a1d 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java
@@ -22,9 +22,13 @@ package org.apache.airavata.job.monitor;
 
 import java.util.Calendar;
 
+import org.apache.airavata.job.monitor.event.MonitorPublisher;
 import org.apache.airavata.job.monitor.state.ExperimentStatusChangeRequest;
-import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.job.monitor.state.WorkflowNodeStatusChangeRequest;
 import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
 import org.apache.airavata.registry.cpi.DataType;
 import org.apache.airavata.registry.cpi.Registry;
 import org.slf4j.Logger;
@@ -37,7 +41,7 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
 
     private Registry airavataRegistry;
     
-//    private MonitorPublisher monitorPublisher;
+    private MonitorPublisher monitorPublisher;
 
     public Registry getAiravataRegistry() {
         return airavataRegistry;
@@ -48,29 +52,52 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
     }
 
     @Subscribe
-    public void updateRegistry(ExperimentStatusChangeRequest experimentStatus) {
-//        ExperimentState state = experimentStatus.getState();
-//        if (state != null) {
-//            try {
-//                String experimentID = experimentStatus.getMonitorID().getExperimentID();
-//                updateWorkflowNodeStatus(experimentID, state);
-//            } catch (Exception e) {
-//                logger.error("Error persisting data" + e.getLocalizedMessage(), e);
-//            }
-//        }
+    public void updateRegistry(WorkflowNodeStatusChangeRequest workflowNodeStatus) {
+        WorkflowNodeState state = workflowNodeStatus.getState();
+        if (state != null) {
+            try {
+                String workflowNodeID = workflowNodeStatus.getIdentity().getWorkflowNodeID();
+                updateWorkflowNodeStatus(workflowNodeID, state);
+            } catch (Exception e) {
+                logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+            }
+        }
+    }
+    
+    @Subscribe
+    public void setupExperimentStatus(WorkflowNodeStatusChangeRequest nodeStatus){
+    	ExperimentState state=ExperimentState.UNKNOWN;
+    	switch(nodeStatus.getState()){
+    	case CANCELED:
+    		state=ExperimentState.CANCELED; break;
+    	case COMPLETED:
+    		state=ExperimentState.COMPLETED; break;
+    	case INVOKED:
+    		state=ExperimentState.LAUNCHED; break;
+    	case FAILED:
+    		state=ExperimentState.FAILED; break;
+    	case EXECUTING:
+    		state=ExperimentState.EXECUTING; break;
+    	case CANCELING:
+    		state=ExperimentState.CANCELING; break;
+		default:
+			break;
+    	}
+    	logger.debug("Publishing Experiment Status "+state.toString());
+    	monitorPublisher.publish(new ExperimentStatusChangeRequest(nodeStatus.getIdentity(),state));
     }
     
-    public  void updateWorkflowNodeStatus(String experimentId, ExperimentState state) throws Exception {
-    	Experiment details = (Experiment)airavataRegistry.get(DataType.EXPERIMENT, experimentId);
+    public  void updateWorkflowNodeStatus(String workflowNodeId, WorkflowNodeState state) throws Exception {
+    	WorkflowNodeDetails details = (WorkflowNodeDetails)airavataRegistry.get(DataType.WORKFLOW_NODE_DETAIL, workflowNodeId);
         if(details == null) {
-            details = new Experiment();
-            details.setExperimentID(experimentId);
+            details = new WorkflowNodeDetails();
+            details.setNodeInstanceId(workflowNodeId);
         }
-        org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus();
-        status.setExperimentState(state);
+        WorkflowNodeStatus status = new WorkflowNodeStatus();
+        status.setWorkflowNodeState(state);
         status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
-        details.setExperimentStatus(status);
-        airavataRegistry.update(org.apache.airavata.registry.cpi.DataType.EXPERIMENT, details, experimentId);
+        details.setWorkflowNodeStatus(status);
+        airavataRegistry.update(org.apache.airavata.registry.cpi.DataType.WORKFLOW_NODE_DETAIL, details, workflowNodeId);
     }
 
 	@Override
@@ -78,8 +105,8 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
 		for (Object configuration : configurations) {
 			if (configuration instanceof Registry){
 				this.airavataRegistry=(Registry)configuration;
-//			} else if (configuration instanceof MonitorPublisher){
-//				this.monitorPublisher=(MonitorPublisher) configuration;
+			} else if (configuration instanceof MonitorPublisher){
+				this.monitorPublisher=(MonitorPublisher) configuration;
 			} 
 		}
 	}

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
index 65ab8d0..cb16540 100644
--- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
@@ -134,7 +134,7 @@ public class AMQPMonitorTest {
         String jobID = pbsCluster.submitBatchJob(jobDescriptor);
         System.out.println(jobID);
         try {
-            monitorManager.addAJobToMonitor(new MonitorID(hostDescription, jobID,null,null, "ogce"));
+            monitorManager.addAJobToMonitor(new MonitorID(hostDescription, jobID,null,null, null, "ogce"));
         } catch (AiravataMonitorException e) {
             e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
         } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java
index 33ffa09..5d7314a 100644
--- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java
@@ -131,7 +131,7 @@ public class QstatMonitorTestWithMyProxyAuth {
         for (int i = 0; i < 1; i++) {
             String jobID = pbsCluster.submitBatchJob(jobDescriptor);
             System.out.println("Job submitted successfully, Job ID: " +  jobID);
-            MonitorID monitorID = new MonitorID(hostDescription, jobID,null,null, "ogce");
+            MonitorID monitorID = new MonitorID(hostDescription, jobID,null,null,null, "ogce");
             monitorID.setAuthenticationInfo(authenticationInfo);
             try {
                 monitorManager.addAJobToMonitor(monitorID);

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index bb9865d..26f447e 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -49,6 +49,8 @@ import org.apache.airavata.orchestrator.cpi.impl.SimpleOrchestratorImpl;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.DataType;
 import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.TaskDetailConstants;
+import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowNodeConstants;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -101,29 +103,25 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
             String proxyPath = ServerSettings.getSetting("proxy.file.path");
             String connectionName = ServerSettings.getSetting("connection.name");
 
-            if (monitors == null) {
-                log.error("Error loading primaryMonitor and there has to be a primary monitor");
-            } else {
-                for (String monitorClass : monitorList) {
-                    Class<? extends Monitor> aClass = Class.forName(monitorClass).asSubclass(Monitor.class);
-                    Monitor monitor = aClass.newInstance();
-                    if (monitor instanceof PullMonitor) {
-                        if (monitor instanceof QstatMonitor) {
-                            monitorManager.addQstatMonitor((QstatMonitor) monitor);
-                        }
-                    } else if (monitor instanceof PushMonitor) {
-                        if (monitor instanceof AMQPMonitor) {
-                            ((AMQPMonitor) monitor).initialize(proxyPath, connectionName, list);
-                            monitorManager.addAMQPMonitor((AMQPMonitor) monitor);
-                        }
-                    } else if(monitor instanceof LocalJobMonitor){
-                        monitorManager.addLocalMonitor((LocalJobMonitor)monitor);
-                    } else {
-                        log.error("Wrong class is given to primary Monitor");
+            for (String monitorClass : monitorList) {
+                Class<? extends Monitor> aClass = Class.forName(monitorClass).asSubclass(Monitor.class);
+                Monitor monitor = aClass.newInstance();
+                if (monitor instanceof PullMonitor) {
+                    if (monitor instanceof QstatMonitor) {
+                        monitorManager.addQstatMonitor((QstatMonitor) monitor);
                     }
+                } else if (monitor instanceof PushMonitor) {
+                    if (monitor instanceof AMQPMonitor) {
+                        ((AMQPMonitor) monitor).initialize(proxyPath, connectionName, list);
+                        monitorManager.addAMQPMonitor((AMQPMonitor) monitor);
+                    }
+                } else if(monitor instanceof LocalJobMonitor){
+                    monitorManager.addLocalMonitor((LocalJobMonitor)monitor);
+                } else {
+                    log.error("Wrong class is given to primary Monitor");
                 }
-
             }
+
             monitorManager.registerListener(orchestrator);
             // Now Monitor Manager is properly configured, now we have to start the monitoring system.
             // This will initialize all the required threads and required queues
@@ -163,46 +161,51 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
             if (tasks.size() > 1) {
                 log.info("There are multiple tasks for this experiment, So Orchestrator will launch multiple Jobs");
             }
-            for (TaskDetails taskID : tasks) {
-                //iterate through all the generated tasks and performs the job submisssion+monitoring
-                String jobID = null;
-                Experiment experiment = (Experiment) registry.get(DataType.EXPERIMENT, experimentId);
-                if (experiment == null) {
-                    log.error("Error retrieving the Experiment by the given experimentID: " + experimentId);
-                    return false;
-                }
-                String userName = experiment.getUserName();
-
-                HostDescription hostDescription = OrchestratorUtils.getHostDescription(orchestrator, taskID);
-
-                // creating monitorID to register with monitoring queue
-                // this is a special case because amqp has to be in place before submitting the job
-                if ((hostDescription instanceof GsisshHostType) &&
-                        Constants.PUSH.equals(((GsisshHostType) hostDescription).getMonitorMode())) {
-                    monitorID = new MonitorID(hostDescription, null, taskID.getTaskID(), experimentId, userName);
-                    monitorManager.addAJobToMonitor(monitorID);
-                    jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID());
-                    if("none".equals(jobID)) {
-                        log.error("Job submission Failed, so we remove the job from monitoring");
-
-                    }else{
-                        log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID);
-                    }
-                } else {
-                    // Launching job for each task
-                    // if the monitoring is pull mode then we add the monitorID for each task after submitting
-                    // the job with the jobID, otherwise we don't need the jobID
-                    jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID());
-                    log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID);
-                    monitorID = new MonitorID(hostDescription, jobID, taskID.getTaskID(), experimentId, userName, authenticationInfo);
-                    if("none".equals(jobID)) {
-                        log.error("Job submission Failed, so we remove the job from monitoring");
-
-                    }else{
-                            monitorManager.addAJobToMonitor(monitorID);
-                    }
-                }
-            }
+            List<String> ids = registry.getIds(DataType.WORKFLOW_NODE_DETAIL,WorkflowNodeConstants.EXPERIMENT_ID,experimentId);
+            for (String workflowNodeId : ids) {
+				List<Object> taskDetailList = registry.get(DataType.TASK_DETAIL,TaskDetailConstants.NODE_ID,workflowNodeId);
+				for (Object o : taskDetailList) {
+					TaskDetails taskID=(TaskDetails)o;
+					//iterate through all the generated tasks and performs the job submisssion+monitoring
+	                String jobID = null;
+	                Experiment experiment = (Experiment) registry.get(DataType.EXPERIMENT, experimentId);
+	                if (experiment == null) {
+	                    log.error("Error retrieving the Experiment by the given experimentID: " + experimentId);
+	                    return false;
+	                }
+	                String userName = experiment.getUserName();
+
+	                HostDescription hostDescription = OrchestratorUtils.getHostDescription(orchestrator, taskID);
+
+	                // creating monitorID to register with monitoring queue
+	                // this is a special case because amqp has to be in place before submitting the job
+	                if ((hostDescription instanceof GsisshHostType) &&
+	                        Constants.PUSH.equals(((GsisshHostType) hostDescription).getMonitorMode())) {
+	                    monitorID = new MonitorID(hostDescription, null, taskID.getTaskID(), workflowNodeId, experimentId, userName);
+	                    monitorManager.addAJobToMonitor(monitorID);
+	                    jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID());
+	                    if("none".equals(jobID)) {
+	                        log.error("Job submission Failed, so we remove the job from monitoring");
+
+	                    }else{
+	                        log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID);
+	                    }
+	                } else {
+	                    // Launching job for each task
+	                    // if the monitoring is pull mode then we add the monitorID for each task after submitting
+	                    // the job with the jobID, otherwise we don't need the jobID
+	                    jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID());
+	                    log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID);
+	                    monitorID = new MonitorID(hostDescription, jobID, taskID.getTaskID(), workflowNodeId, experimentId, userName, authenticationInfo);
+	                    if("none".equals(jobID)) {
+	                        log.error("Job submission Failed, so we remove the job from monitoring");
+
+	                    }else{
+	                            monitorManager.addAJobToMonitor(monitorID);
+	                    }
+	                }
+				}
+			}
         } catch (Exception e) {
             throw new TException(e);
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/utils/ThriftDataModelConversion.java
----------------------------------------------------------------------
diff --git a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/utils/ThriftDataModelConversion.java b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/utils/ThriftDataModelConversion.java
index 5ed0bce..409f8a6 100644
--- a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/utils/ThriftDataModelConversion.java
+++ b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/utils/ThriftDataModelConversion.java
@@ -39,8 +39,10 @@ public class ThriftDataModelConversion {
         if (pr != null) {
             project.setProjectID(pr.getName());
             project.setName(pr.getName());
-            project.setCreationTime(pr.getCreationTime().getTime());
-            project.setDescription(pr.getDescription());
+            if (pr.getCreationTime()!=null) {
+				project.setCreationTime(pr.getCreationTime().getTime());
+			}
+			project.setDescription(pr.getDescription());
             project.setOwner(pr.getWorker().getUser());
             List<ProjectUserResource> projectUserList = pr.getProjectUserList();
             List<String> sharedUsers = new ArrayList<String>();

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
----------------------------------------------------------------------
diff --git a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
index 69964ef..80e0c99 100644
--- a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
+++ b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
@@ -49,6 +49,7 @@ public class ServerMain {
 	private static int serverPID=-1;
 	private static final String serverStartedFileNamePrefix = "server-start";
 	private static boolean systemShutDown=false;
+	private static boolean shutdownHookCalledBefore=false;
     static{
 		servers = new ArrayList<IServer>();
     }
@@ -89,6 +90,19 @@ public class ServerMain {
 		});
 	}
 
+//	private static void addSecondaryShutdownHook(){
+//		Runtime.getRuntime().addShutdownHook(new Thread(){
+//			@Override
+//			public void run() {
+//				System.out.print("Graceful shutdown attempt is still active. Do you want to exit instead? (y/n)");
+//				String command=System.console().readLine().trim().toLowerCase();
+//				if (command.equals("yes") || command.equals("y")){
+//					System.exit(1);
+//				}
+//			}
+//		});
+//	}
+	
 	public static void main(String args[]) throws ParseException, IOException {
 		AiravataUtils.setExecutionAsServer();
 		CommandLineParameters commandLineParameters = StringUtil.getCommandLineParser(args);

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/ExperimentIdentity.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/ExperimentIdentity.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/ExperimentIdentity.java
new file mode 100644
index 0000000..652ad1d
--- /dev/null
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/ExperimentIdentity.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.job.monitor;
+
+public class ExperimentIdentity {
+	private String experimentID;
+	public ExperimentIdentity(String experimentId) {
+		setExperimentID(experimentId);
+	}
+	public String getExperimentID() {
+		return experimentID;
+	}
+
+	public void setExperimentID(String experimentID) {
+		this.experimentID = experimentID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/JobIdentity.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/JobIdentity.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/JobIdentity.java
new file mode 100644
index 0000000..5753d9d
--- /dev/null
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/JobIdentity.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.job.monitor;
+
+public class JobIdentity extends TaskIdentity {
+	private String jobId;
+	
+	public JobIdentity(String experimentId, String workflowNodeId, String taskId, String jobId) {
+		super(experimentId,workflowNodeId,taskId);
+		setJobId(jobId);
+	}
+
+	public String getJobId() {
+		return jobId;
+	}
+
+	public void setJobId(String jobId) {
+		this.jobId = jobId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
index bd6bfcb..241e3b0 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
@@ -52,7 +52,7 @@ public class MonitorID {
 
     private String experimentID;
 
-//    private String workflowNodeID;
+    private String workflowNodeID;
 
     private String taskID;
 
@@ -62,7 +62,7 @@ public class MonitorID {
 
     private JobState state;
 
-    public MonitorID(HostDescription host, String jobID,String taskID, String experimentID, String userName) {
+    public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName) {
         this.host = host;
         this.jobStartedTime = new Timestamp((new Date()).getTime());
         this.userName = userName;
@@ -71,7 +71,7 @@ public class MonitorID {
         this.experimentID = experimentID;
     }
 
-    public MonitorID(HostDescription host, String jobID,String taskID,String experimentID, String userName,AuthenticationInfo authenticationInfo) {
+    public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName,AuthenticationInfo authenticationInfo) {
         this.host = host;
         this.jobStartedTime = new Timestamp((new Date()).getTime());
         this.authenticationInfo = authenticationInfo;
@@ -207,6 +207,14 @@ public class MonitorID {
             }
     }
 
+	public String getWorkflowNodeID() {
+		return workflowNodeID;
+	}
+
+	public void setWorkflowNodeID(String workflowNodeID) {
+		this.workflowNodeID = workflowNodeID;
+	}
+
 //	public String getWorkflowNodeID() {
 //		return workflowNodeID;
 //	}

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/TaskIdentity.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/TaskIdentity.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/TaskIdentity.java
new file mode 100644
index 0000000..f7bc785
--- /dev/null
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/TaskIdentity.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.job.monitor;
+
+public class TaskIdentity extends WorkflowNodeIdentity {
+	private String taskId;
+
+	public TaskIdentity(String experimentId, String workflowNodeId, String taskId) {
+		super(experimentId,workflowNodeId);
+		setTaskId(taskId);
+	}
+	public String getTaskId() {
+		return taskId;
+	}
+
+	public void setTaskId(String taskId) {
+		this.taskId = taskId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/WorkflowNodeIdentity.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/WorkflowNodeIdentity.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/WorkflowNodeIdentity.java
new file mode 100644
index 0000000..a8fe09f
--- /dev/null
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/WorkflowNodeIdentity.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.job.monitor;
+
+public class WorkflowNodeIdentity extends ExperimentIdentity {
+	private String workflowNodeID;
+	public WorkflowNodeIdentity(String experimentId, String workflowNodeId) {
+		super(experimentId);
+		setWorkflowNodeID(workflowNodeId);
+	}
+	public String getWorkflowNodeID() {
+		return workflowNodeID;
+	}
+
+	public void setWorkflowNodeID(String workflowNodeID) {
+		this.workflowNodeID = workflowNodeID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
index de7cf90..ec4cb40 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
@@ -21,6 +21,7 @@
 package org.apache.airavata.job.monitor.impl;
 
 import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.job.monitor.JobIdentity;
 import org.apache.airavata.job.monitor.MonitorID;
 import org.apache.airavata.job.monitor.core.AiravataAbstractMonitor;
 import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
@@ -41,7 +42,7 @@ public class LocalJobMonitor extends AiravataAbstractMonitor {
         do {
             try {
                 MonitorID take = jobQueue.take();
-                getPublisher().publish(new JobStatusChangeRequest(take, JobState.COMPLETE));
+                getPublisher().publish(new JobStatusChangeRequest(take, new JobIdentity(take.getExperimentID(), take.getWorkflowNodeID(), take.getTaskID(), take.getJobID()), JobState.COMPLETE));
             } catch (Exception e) {
                 e.printStackTrace();
             }

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
index 88a5198..dc6d193 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
@@ -30,6 +30,7 @@ import java.util.concurrent.BlockingQueue;
 
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.job.monitor.JobIdentity;
 import org.apache.airavata.job.monitor.MonitorID;
 import org.apache.airavata.job.monitor.core.PushMonitor;
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
@@ -201,7 +202,7 @@ public class AMQPMonitor extends PushMonitor {
             }
         }
         next.setStatus(monitorID.getStatus());
-        publisher.publish(new JobStatusChangeRequest(next,next.getStatus()));
+        publisher.publish(new JobStatusChangeRequest(next, new JobIdentity(next.getExperimentID(), next.getWorkflowNodeID(), next.getTaskID(), next.getJobID()),next.getStatus()));
         return true;
     }
     @Override

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
index 53bcc8b..5a2d40d 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
@@ -76,7 +76,7 @@ public class BasicConsumer implements Consumer {
         logger.debug("************************************************************");
         try {
             String jobID = envelope.getRoutingKey().split("\\.")[0];
-            MonitorID monitorID = new MonitorID(null, jobID, null, null, null);
+            MonitorID monitorID = new MonitorID(null, jobID, null, null, null, null);
             monitorID.setStatus(parser.parseMessage(message));
             publisher.publish(monitorID);
         } catch (AiravataMonitorException e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java
index 909f10e..bacd8df 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java
@@ -21,17 +21,7 @@
 
 package org.apache.airavata.job.monitor.state;
 
-import org.apache.airavata.job.monitor.MonitorID;
 
 public abstract class AbstractStateChangeRequest implements PublisherMessage{
-    private MonitorID monitorID;
-	
-	public MonitorID getMonitorID() {
-	    return monitorID;
-	}
-	
-	public void setMonitorID(MonitorID monitorID) {
-	    this.monitorID = monitorID;
-	}
 
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/ExperimentStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/ExperimentStatusChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/ExperimentStatusChangeRequest.java
index d664161..9bee5ca 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/ExperimentStatusChangeRequest.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/ExperimentStatusChangeRequest.java
@@ -20,7 +20,7 @@
 */
 package org.apache.airavata.job.monitor.state;
 
-import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.ExperimentIdentity;
 import org.apache.airavata.model.workspace.experiment.ExperimentState;
 
 /**
@@ -32,15 +32,15 @@ import org.apache.airavata.model.workspace.experiment.ExperimentState;
  */
 public class ExperimentStatusChangeRequest extends AbstractStateChangeRequest{
     private ExperimentState state;
-
+    private ExperimentIdentity identity;
 
     // this constructor can be used in Qstat monitor to handle errors
     public ExperimentStatusChangeRequest() {
     }
 
-    public ExperimentStatusChangeRequest(MonitorID monitorID, ExperimentState state) {
-        setMonitorID(monitorID);
+    public ExperimentStatusChangeRequest(ExperimentIdentity experimentIdentity, ExperimentState state) {
         this.state = state;
+        setIdentity(experimentIdentity);
     }
 
     public ExperimentState getState() {
@@ -51,5 +51,13 @@ public class ExperimentStatusChangeRequest extends AbstractStateChangeRequest{
        this.state = state;
     }
 
+	public ExperimentIdentity getIdentity() {
+		return identity;
+	}
+
+	public void setIdentity(ExperimentIdentity identity) {
+		this.identity = identity;
+	}
+
 
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusChangeRequest.java
index 9669b75..0db9da6 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusChangeRequest.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusChangeRequest.java
@@ -20,11 +20,10 @@
 */
 package org.apache.airavata.job.monitor.state;
 
+import org.apache.airavata.job.monitor.JobIdentity;
 import org.apache.airavata.job.monitor.MonitorID;
 import org.apache.airavata.model.workspace.experiment.JobState;
 
-import java.util.Properties;
-
 /**
  * This is the primary job state object used in
  * through out the monitor module. This use airavata-data-model JobState enum
@@ -34,15 +33,18 @@ import java.util.Properties;
  */
 public class JobStatusChangeRequest  extends AbstractStateChangeRequest{
     private JobState state;
+    private JobIdentity identity;
 
-
+    private MonitorID monitorID;
+    
     // this constructor can be used in Qstat monitor to handle errors
     public JobStatusChangeRequest() {
     }
 
-    public JobStatusChangeRequest(MonitorID monitorID, JobState state) {
-        setMonitorID(monitorID);
-        this.state = state;
+    public JobStatusChangeRequest(MonitorID monitorID, JobIdentity jobId, JobState state) {
+    	setIdentity(jobId);
+    	setMonitorID(monitorID);
+    	this.state = state;
     }
 
     public JobState getState() {
@@ -53,4 +55,20 @@ public class JobStatusChangeRequest  extends AbstractStateChangeRequest{
        this.state = state;
     }
 
+	public JobIdentity getIdentity() {
+		return identity;
+	}
+
+	public void setIdentity(JobIdentity identity) {
+		this.identity = identity;
+	}
+
+	public MonitorID getMonitorID() {
+		return monitorID;
+	}
+
+	public void setMonitorID(MonitorID monitorID) {
+		this.monitorID = monitorID;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/TaskStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/TaskStatusChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/TaskStatusChangeRequest.java
index f35310b..e8e58db 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/TaskStatusChangeRequest.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/TaskStatusChangeRequest.java
@@ -20,7 +20,7 @@
 */
 package org.apache.airavata.job.monitor.state;
 
-import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.TaskIdentity;
 import org.apache.airavata.model.workspace.experiment.TaskState;
 
 /**
@@ -32,14 +32,14 @@ import org.apache.airavata.model.workspace.experiment.TaskState;
  */
 public class TaskStatusChangeRequest extends AbstractStateChangeRequest{
     private TaskState state;
-
+    private TaskIdentity identity; 
     // this constructor can be used in Qstat monitor to handle errors
     public TaskStatusChangeRequest() {
     }
 
-    public TaskStatusChangeRequest(MonitorID monitorID, TaskState state) {
-        setMonitorID(monitorID);
+    public TaskStatusChangeRequest(TaskIdentity taskIdentity, TaskState state) {
         this.state = state;
+        setIdentity(taskIdentity);
     }
 
     public TaskState getState() {
@@ -50,4 +50,12 @@ public class TaskStatusChangeRequest extends AbstractStateChangeRequest{
        this.state = state;
     }
 
+	public TaskIdentity getIdentity() {
+		return identity;
+	}
+
+	public void setIdentity(TaskIdentity identity) {
+		this.identity = identity;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/WorkflowNodeStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/WorkflowNodeStatusChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/WorkflowNodeStatusChangeRequest.java
new file mode 100644
index 0000000..7e58e35
--- /dev/null
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/WorkflowNodeStatusChangeRequest.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor.state;
+
+import org.apache.airavata.job.monitor.WorkflowNodeIdentity;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+
+/**
+ * This is the primary job state object used in
+ * through out the monitor module. This use airavata-data-model JobState enum
+ * Ideally after processing each event or monitoring message from remote system
+ * Each monitoring implementation has to return this object with a state and
+ * the monitoring ID
+ */
+public class WorkflowNodeStatusChangeRequest extends AbstractStateChangeRequest{
+    private WorkflowNodeState state;
+    private WorkflowNodeIdentity identity;
+
+    // this constructor can be used in Qstat monitor to handle errors
+    public WorkflowNodeStatusChangeRequest() {
+    }
+
+    public WorkflowNodeStatusChangeRequest(WorkflowNodeIdentity identity, WorkflowNodeState state) {
+        this.state = state;
+        setIdentity(identity);
+    }
+
+    public WorkflowNodeState getState() {
+        return state;
+    }
+
+    public void setState(WorkflowNodeState state) {
+       this.state = state;
+    }
+
+	public WorkflowNodeIdentity getIdentity() {
+		return identity;
+	}
+
+	public void setIdentity(WorkflowNodeIdentity identity) {
+		this.identity = identity;
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java b/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
index 980c2fa..c0e579e 100644
--- a/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
+++ b/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
@@ -150,7 +150,7 @@ public class AMQPMonitorTest {
         String jobID = pbsCluster.submitBatchJob(jobDescriptor);
         System.out.println(jobID);
         try {
-            pushQueue.add(new MonitorID(hostDescription, jobID,null,null, "ogce"));
+            pushQueue.add(new MonitorID(hostDescription, jobID,null,null,null, "ogce"));
         } catch (Exception e) {
             e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java b/tools/job-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
index 735d1d2..d85f465 100644
--- a/tools/job-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
+++ b/tools/job-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
@@ -144,7 +144,7 @@ public class QstatMonitorTestWithMyProxyAuth {
         for (int i = 0; i < 1; i++) {
             String jobID = pbsCluster.submitBatchJob(jobDescriptor);
             System.out.println("Job submitted successfully, Job ID: " +  jobID);
-            MonitorID monitorID = new MonitorID(hostDescription, jobID,null,null, "ogce");
+            MonitorID monitorID = new MonitorID(hostDescription, jobID,null,null,null, "ogce");
             monitorID.setAuthenticationInfo(authenticationInfo);
             try {
                 org.apache.airavata.job.monitor.util.CommonUtils.addMonitortoQueue(pullQueue, monitorID);


Mime
View raw message