airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chath...@apache.org
Subject [1/2] adding gateway id to messaging events
Date Tue, 21 Oct 2014 21:29:57 GMT
Repository: airavata
Updated Branches:
  refs/heads/master cbd947cfd -> adb9b956b


http://git-wip-us.apache.org/repos/asf/airavata/blob/adb9b956/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
index 06a8d37..837b728 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
@@ -59,7 +59,7 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener {
             TaskStatusChangeEvent event = new TaskStatusChangeEvent(taskStatus.getState(), taskStatus.getTaskIdentity());
             monitorPublisher.publish(event);
             String messageId = AiravataUtils.getId("TASK");
-            MessageContext msgCntxt = new MessageContext(event, MessageType.TASK, messageId);
+            MessageContext msgCntxt = new MessageContext(event, MessageType.TASK, messageId, taskStatus.getTaskIdentity().getGatewayId());
             msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
             if ( ServerSettings.isRabbitMqPublishEnabled()){
                 publisher.publish(msgCntxt);
@@ -101,11 +101,12 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener {
 			logger.debug("Publishing task status for "+jobStatus.getJobIdentity().getTaskId()+":"+state.toString());
             TaskIdentifier taskIdentity = new TaskIdentifier(jobStatus.getJobIdentity().getTaskId(),
                                                          jobStatus.getJobIdentity().getWorkflowNodeId(),
-                                                         jobStatus.getJobIdentity().getExperimentId());
+                                                         jobStatus.getJobIdentity().getExperimentId(),
+                                                         jobStatus.getJobIdentity().getGatewayId());
             TaskStatusChangeEvent event = new TaskStatusChangeEvent(state, taskIdentity);
             monitorPublisher.publish(event);
             String messageId = AiravataUtils.getId("TASK");
-            MessageContext msgCntxt = new MessageContext(event, MessageType.TASK, messageId);
+            MessageContext msgCntxt = new MessageContext(event, MessageType.TASK, messageId,jobStatus.getJobIdentity().getGatewayId());
             msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
             if ( ServerSettings.isRabbitMqPublishEnabled()){
                 publisher.publish(msgCntxt);

http://git-wip-us.apache.org/repos/asf/airavata/blob/adb9b956/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
index 268677e..8a93c40 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
@@ -81,11 +81,13 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
     	try {
 			updateWorkflowNodeStatus(taskStatus.getTaskIdentity().getWorkflowNodeId(), state);
 			logger.debug("Publishing workflow node status for "+taskStatus.getTaskIdentity().getWorkflowNodeId()+":"+state.toString());
-            WorkflowIdentifier workflowIdentity = new WorkflowIdentifier(taskStatus.getTaskIdentity().getWorkflowNodeId(), taskStatus.getTaskIdentity().getExperimentId());
+            WorkflowIdentifier workflowIdentity = new WorkflowIdentifier(taskStatus.getTaskIdentity().getWorkflowNodeId(),
+                                                                         taskStatus.getTaskIdentity().getExperimentId(),
+                                                                         taskStatus.getTaskIdentity().getGatewayId());
             WorkflowNodeStatusChangeEvent event = new WorkflowNodeStatusChangeEvent(state, workflowIdentity);
             monitorPublisher.publish(event);
             String messageId = AiravataUtils.getId("WFNODE");
-            MessageContext msgCntxt = new MessageContext(event, MessageType.WORKFLOWNODE, messageId);
+            MessageContext msgCntxt = new MessageContext(event, MessageType.WORKFLOWNODE, messageId, taskStatus.getTaskIdentity().getGatewayId());
             msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
 
             if ( ServerSettings.isRabbitMqPublishEnabled()){

http://git-wip-us.apache.org/repos/asf/airavata/blob/adb9b956/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
index e7a1297..545bcd3 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
@@ -35,7 +35,8 @@ public class GfacExperimentStateChangeRequest {
         setIdentity(new JobIdentifier(monitorID.getJobID(),
                 monitorID.getTaskID(),
                 monitorID.getWorkflowNodeID(),
-                monitorID.getExperimentID()));
+                monitorID.getExperimentID(),
+                monitorID.getJobExecutionContext().getGatewayID()));
         setMonitorID(monitorID);
         this.state = state;
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/adb9b956/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
index 0e56fc7..d279bbe 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
@@ -50,7 +50,7 @@ public class OutHandlerWorker implements Runnable {
         try {
             gfac.invokeOutFlowHandlers(monitorID.getJobExecutionContext());
         } catch (GFacException e) {
-            TaskIdentifier taskIdentifier = new TaskIdentifier(monitorID.getTaskID(), monitorID.getWorkflowNodeID(),monitorID.getExperimentID());
+            TaskIdentifier taskIdentifier = new TaskIdentifier(monitorID.getTaskID(), monitorID.getWorkflowNodeID(),monitorID.getExperimentID(), monitorID.getJobExecutionContext().getGatewayID());
             monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentifier));
             //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status
             logger.info(e.getLocalizedMessage(), e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/adb9b956/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
index be44991..51da68a 100644
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -177,7 +177,8 @@ public class LocalProvider extends AbstractProvider {
             JobIdentifier jobIdentity = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(),
                     jobExecutionContext.getTaskData().getTaskID(),
                     jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                    jobExecutionContext.getExperimentID());
+                    jobExecutionContext.getExperimentID(),
+                    jobExecutionContext.getGatewayID());
             this.getMonitorPublisher().publish(new JobStatusChangeEvent(JobState.COMPLETE, jobIdentity));
         } catch (IOException io) {
             throw new GFacProviderException(io.getMessage(), io);
@@ -234,7 +235,8 @@ public class LocalProvider extends AbstractProvider {
             registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
             TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
                     jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                    jobExecutionContext.getExperimentID());
+                    jobExecutionContext.getExperimentID(),
+                    jobExecutionContext.getGatewayID());
             getMonitorPublisher().publish(new TaskOutputChangeEvent(outputArray, taskIdentity));
         } catch (XmlException e) {
             throw new GFacProviderException("Cannot read output:" + e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/adb9b956/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 91d5369..26ae92e 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -232,7 +232,11 @@ public class HPCPullMonitor extends PullMonitor {
                         }
                         jobStatus = new JobStatusChangeRequestEvent();
                         iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()+","+iMonitorID.getJobName()));
   //IMPORTANT this is not a simple setter we have a logic
-                        JobIdentifier jobIdentity = new JobIdentifier(iMonitorID.getJobID(), iMonitorID.getTaskID(), iMonitorID.getWorkflowNodeID(), iMonitorID.getExperimentID());
+                        JobIdentifier jobIdentity = new JobIdentifier(iMonitorID.getJobID(),
+                                                                      iMonitorID.getTaskID(),
+                                                                      iMonitorID.getWorkflowNodeID(),
+                                                                      iMonitorID.getExperimentID(),
+                                                                      iMonitorID.getJobExecutionContext().getGatewayID());
                         jobStatus.setJobIdentity(jobIdentity);
                         jobStatus.setState(iMonitorID.getStatus());
                         // we have this JobStatus class to handle amqp monitoring
@@ -321,12 +325,13 @@ public class HPCPullMonitor extends PullMonitor {
             if (e.getMessage().contains("Unknown Job Id Error")) {
                 // in this case job is finished or may be the given job ID is wrong
                 jobStatus.setState(JobState.UNKNOWN);
-                JobIdentifier jobIdentifier = new JobIdentifier("UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN");
+                JobIdentifier jobIdentifier = new JobIdentifier("UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN");
                 if (currentMonitorID != null){
                     jobIdentifier.setExperimentId(currentMonitorID.getExperimentID());
                     jobIdentifier.setTaskId(currentMonitorID.getTaskID());
                     jobIdentifier.setWorkflowNodeId(currentMonitorID.getWorkflowNodeID());
                     jobIdentifier.setJobId(currentMonitorID.getJobID());
+                    jobIdentifier.setGatewayId(currentMonitorID.getJobExecutionContext().getGatewayID());
                 }
                 jobStatus.setJobIdentity(jobIdentifier);
                 publisher.publish(jobStatus);

http://git-wip-us.apache.org/repos/asf/airavata/blob/adb9b956/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
index 9422d05..baab7b4 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
@@ -203,7 +203,10 @@ public class AMQPMonitor extends PushMonitor {
         }
         next.setStatus(monitorID.getStatus());
         JobIdentifier jobIdentity = new JobIdentifier(next.getJobID(),
-                                                                            next.getTaskID(), next.getWorkflowNodeID(), next.getExperimentID());
+                                                     next.getTaskID(),
+                                                     next.getWorkflowNodeID(),
+                                                     next.getExperimentID(),
+                                                     next.getJobExecutionContext().getGatewayID());
         publisher.publish(new JobStatusChangeEvent(next.getStatus(),jobIdentity));
         return true;
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/adb9b956/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
index 00a22bb..0a39d92 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
@@ -30,13 +30,15 @@ public class MessageContext {
     private final TBase event;
     private final MessageType type;
     private final String messageId;
+    private final String gatewayId;
     private Timestamp updatedTime;
 
 
-    public MessageContext(TBase message, MessageType type, String messageId) {
+    public MessageContext(TBase message, MessageType type, String messageId, String gatewayId) {
         this.event = message;
         this.type = type;
         this.messageId = messageId;
+        this.gatewayId = gatewayId;
     }
 
     public TBase getEvent() {
@@ -58,4 +60,8 @@ public class MessageContext {
     public void setUpdatedTime(Timestamp updatedTime) {
         this.updatedTime = updatedTime;
     }
+
+    public String getGatewayId() {
+        return gatewayId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/adb9b956/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
index 84d4b43..e9c5b82 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
@@ -149,6 +149,7 @@ public class RabbitMQConsumer implements Consumer {
                     try {
                         ThriftUtils.createThriftFromBytes(body, message);
                         TBase event = null;
+                        String gatewayId = null;
                         if (message.getMessageType().equals(MessageType.EXPERIMENT)) {
                             ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
                             ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent);
@@ -156,6 +157,7 @@ public class RabbitMQConsumer implements Consumer {
                                     + "' and with message type '" + message.getMessageType() + "'  with status " +
                                     experimentStatusChangeEvent.getState());
                             event = experimentStatusChangeEvent;
+                            gatewayId = experimentStatusChangeEvent.getGatewayId();
                         } else if (message.getMessageType().equals(MessageType.WORKFLOWNODE)) {
                             WorkflowNodeStatusChangeEvent wfnStatusChangeEvent = new WorkflowNodeStatusChangeEvent();
                             ThriftUtils.createThriftFromBytes(message.getEvent(), wfnStatusChangeEvent);
@@ -163,6 +165,7 @@ public class RabbitMQConsumer implements Consumer {
                                     + "' and with message type '" + message.getMessageType() + "'  with status " +
                                     wfnStatusChangeEvent.getState());
                             event = wfnStatusChangeEvent;
+                            gatewayId = wfnStatusChangeEvent.getWorkflowNodeIdentity().getGatewayId();
                         } else if (message.getMessageType().equals(MessageType.TASK)) {
                             TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
                             ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent);
@@ -170,6 +173,7 @@ public class RabbitMQConsumer implements Consumer {
                                     + "' and with message type '" + message.getMessageType() + "'  with status " +
                                     taskStatusChangeEvent.getState());
                             event = taskStatusChangeEvent;
+                            gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId();
                         } else if (message.getMessageType().equals(MessageType.JOB)) {
                             JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
                             ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent);
@@ -177,9 +181,9 @@ public class RabbitMQConsumer implements Consumer {
                                     + "' and with message type '" + message.getMessageType() + "'  with status " +
                                     jobStatusChangeEvent.getState());
                             event = jobStatusChangeEvent;
+                            gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId();
                         }
-
-                        MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId());
+                        MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId);
                         messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
                         handler.onMessage(messageContext);
                     } catch (TException e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/adb9b956/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/interpretor/WorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/interpretor/WorkflowInterpreter.java
b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/interpretor/WorkflowInterpreter.java
index ce332db..906b954 100644
--- a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/interpretor/WorkflowInterpreter.java
+++ b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/interpretor/WorkflowInterpreter.java
@@ -44,7 +44,9 @@ import javax.xml.xpath.XPathFactory;
 
 import org.apache.airavata.api.Airavata;
 import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.StringUtil;
 import org.apache.airavata.common.utils.XMLUtil;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
@@ -148,6 +150,16 @@ public class WorkflowInterpreter implements AbstractActivityListener{
 	private Experiment experiment;
 	private Registry registry;
 
+    public void setGatewayId(String gatewayId) {
+        this.gatewayId = gatewayId;
+    }
+
+    public String getGatewayId() {
+        return gatewayId;
+    }
+
+    private String gatewayId;
+
 	private OrchestratorService.Client orchestratorClient;
 	
 	private Map<String, Node> awaitingTasks;
@@ -171,6 +183,14 @@ public class WorkflowInterpreter implements AbstractActivityListener{
 		this.interactor = new SSWorkflowInterpreterInteractorImpl();
 		this.orchestratorClient = orchestratorClient;
         this.publisher = publisher;
+        // if gateway id is not set, we will get it from airavata server properties
+        if (gatewayId == null){
+            try {
+                gatewayId = ServerSettings.getDefaultUserGateway();
+            } catch (ApplicationSettingsException e) {
+                log.error("error while reading airavata-server properties..", e);
+            }
+        }
 		//TODO set act of provenance
 		nodeInstanceList=new HashMap<Node, WorkflowNodeDetails>();
         setWorkflowInterpreterConfigurationThreadLocal(config);
@@ -387,8 +407,8 @@ public class WorkflowInterpreter implements AbstractActivityListener{
 		} finally{
         	cleanup();
 			this.getWorkflow().setExecutionState(WorkflowExecutionState.NONE);
-            ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(ExperimentState.LAUNCHED, experiment.getExperimentID());
-            MessageContext msgCtx = new MessageContext(event, MessageType.EXPERIMENT, AiravataUtils.getId("EXPERIMENT"));
+            ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(ExperimentState.LAUNCHED, experiment.getExperimentID(), gatewayId);
+            MessageContext msgCtx = new MessageContext(event, MessageType.EXPERIMENT, AiravataUtils.getId("EXPERIMENT"), gatewayId);
             msgCtx.setUpdatedTime(new Timestamp(Calendar.getInstance().getTimeInMillis()));
             publisher.publish(msgCtx);
         }
@@ -398,7 +418,7 @@ public class WorkflowInterpreter implements AbstractActivityListener{
             throws AiravataException {
         if (publisher != null) {
             MessageContext msgCtx = new MessageContext(new WorkflowNodeStatusChangeEvent(state, new WorkflowIdentifier(nodeId,
-                    expId)), MessageType.WORKFLOWNODE, AiravataUtils.getId("NODE"));
+                    expId, gatewayId)), MessageType.WORKFLOWNODE, AiravataUtils.getId("NODE"), gatewayId);
             msgCtx.setUpdatedTime(new Timestamp(Calendar.getInstance().getTimeInMillis()));
             publisher.publish(msgCtx);
         } else {


Mime
View raw message