airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject [4/4] airavata git commit: Fixed AIRAVATA-1591 , AIRAVATA-1592 , AIRAVATA-1593
Date Wed, 25 Feb 2015 22:13:33 GMT
Fixed AIRAVATA-1591 ,AIRAVATA-1592 , AIRAVATA-1593


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d25441a0
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d25441a0
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d25441a0

Branch: refs/heads/new-workflow-design-rabbitmq
Commit: d25441a017f2e443165a22c9c5966a6d9e6aad9e
Parents: 97ff3b7
Author: shamrath <shameerainfo@gmail.com>
Authored: Wed Feb 25 17:12:18 2015 -0500
Committer: shamrath <shameerainfo@gmail.com>
Committed: Wed Feb 25 17:12:18 2015 -0500

----------------------------------------------------------------------
 .../lib/airavata/messagingEvents_types.cpp      |  88 +++-
 .../lib/airavata/messagingEvents_types.h        |  47 +-
 .../Airavata/Model/Messaging/Event/Types.php    |  94 ++++
 .../model/messaging/event/MessageType.java      |   5 +-
 .../messaging/event/ProcessSubmitEvent.java     | 492 +++++++++++++++++++
 .../messagingEvents.thrift                      |   8 +-
 .../core/monitor/AiravataTaskStatusUpdator.java |  13 +
 .../core/impl/RabbitMQProcessConsumer.java      | 158 ++++++
 .../core/impl/RabbitMQProcessPublisher.java     |  84 ++++
 .../core/impl/RabbitMQStatusConsumer.java       |  10 +-
 .../core/impl/RabbitMQStatusPublisher.java      |  20 +-
 .../server/OrchestratorServerHandler.java       |  76 ++-
 modules/simple-workflow/pom.xml                 |   5 +
 .../simple/workflow/engine/ProcessContext.java  |  62 +++
 .../simple/workflow/engine/ProcessPack.java     |  62 ---
 .../engine/SimpleWorkflowInterpreter.java       | 378 +++++++-------
 .../workflow/engine/WorkflowFactoryImpl.java    |   4 +-
 .../engine/parser/AiravataDefaultParser.java    | 293 -----------
 .../engine/parser/AiravataWorkflowParser.java   | 291 +++++++++++
 .../parser/AiravataDefaultParserTest.java       | 119 -----
 .../parser/AiravataWorkflowParserTest.java      | 119 +++++
 21 files changed, 1724 insertions(+), 704 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
index 71f45be..92f29c6 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
@@ -47,7 +47,8 @@ int _kMessageTypeValues[] = {
   MessageType::WORKFLOWNODE,
   MessageType::JOB,
   MessageType::LAUNCHTASK,
-  MessageType::TERMINATETASK
+  MessageType::TERMINATETASK,
+  MessageType::TASKOUTPUT
 };
 const char* _kMessageTypeNames[] = {
   "EXPERIMENT",
@@ -55,9 +56,10 @@ const char* _kMessageTypeNames[] = {
   "WORKFLOWNODE",
   "JOB",
   "LAUNCHTASK",
-  "TERMINATETASK"
+  "TERMINATETASK",
+  "TASKOUTPUT"
 };
-const std::map<int, const char*> _MessageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(6, _kMessageTypeValues, _kMessageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _MessageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(7, _kMessageTypeValues, _kMessageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
 
 const char* ExperimentStatusChangeEvent::ascii_fingerprint = "38C252E94E93B69D04EB3A6EE2F9EDFB";
 const uint8_t ExperimentStatusChangeEvent::binary_fingerprint[16] = {0x38,0xC2,0x52,0xE9,0x4E,0x93,0xB6,0x9D,0x04,0xEB,0x3A,0x6E,0xE2,0xF9,0xED,0xFB};
@@ -839,6 +841,86 @@ void swap(JobIdentifier &a, JobIdentifier &b) {
   swap(a.gatewayId, b.gatewayId);
 }
 
+const char* ProcessSubmitEvent::ascii_fingerprint = "07A9615F837F7D0A952B595DD3020972";
+const uint8_t ProcessSubmitEvent::binary_fingerprint[16] = {0x07,0xA9,0x61,0x5F,0x83,0x7F,0x7D,0x0A,0x95,0x2B,0x59,0x5D,0xD3,0x02,0x09,0x72};
+
+uint32_t ProcessSubmitEvent::read(::apache::thrift::protocol::TProtocol* iprot) {
+
+  uint32_t xfer = 0;
+  std::string fname;
+  ::apache::thrift::protocol::TType ftype;
+  int16_t fid;
+
+  xfer += iprot->readStructBegin(fname);
+
+  using ::apache::thrift::protocol::TProtocolException;
+
+  bool isset_taskId = false;
+  bool isset_credentialToken = false;
+
+  while (true)
+  {
+    xfer += iprot->readFieldBegin(fname, ftype, fid);
+    if (ftype == ::apache::thrift::protocol::T_STOP) {
+      break;
+    }
+    switch (fid)
+    {
+      case 1:
+        if (ftype == ::apache::thrift::protocol::T_STRING) {
+          xfer += iprot->readString(this->taskId);
+          isset_taskId = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
+      case 2:
+        if (ftype == ::apache::thrift::protocol::T_STRING) {
+          xfer += iprot->readString(this->credentialToken);
+          isset_credentialToken = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
+      default:
+        xfer += iprot->skip(ftype);
+        break;
+    }
+    xfer += iprot->readFieldEnd();
+  }
+
+  xfer += iprot->readStructEnd();
+
+  if (!isset_taskId)
+    throw TProtocolException(TProtocolException::INVALID_DATA);
+  if (!isset_credentialToken)
+    throw TProtocolException(TProtocolException::INVALID_DATA);
+  return xfer;
+}
+
+uint32_t ProcessSubmitEvent::write(::apache::thrift::protocol::TProtocol* oprot) const {
+  uint32_t xfer = 0;
+  xfer += oprot->writeStructBegin("ProcessSubmitEvent");
+
+  xfer += oprot->writeFieldBegin("taskId", ::apache::thrift::protocol::T_STRING, 1);
+  xfer += oprot->writeString(this->taskId);
+  xfer += oprot->writeFieldEnd();
+
+  xfer += oprot->writeFieldBegin("credentialToken", ::apache::thrift::protocol::T_STRING, 2);
+  xfer += oprot->writeString(this->credentialToken);
+  xfer += oprot->writeFieldEnd();
+
+  xfer += oprot->writeFieldStop();
+  xfer += oprot->writeStructEnd();
+  return xfer;
+}
+
+void swap(ProcessSubmitEvent &a, ProcessSubmitEvent &b) {
+  using ::std::swap;
+  swap(a.taskId, b.taskId);
+  swap(a.credentialToken, b.credentialToken);
+}
+
 const char* TaskSubmitEvent::ascii_fingerprint = "C93D890311F28844166CF6E571EB3AC2";
 const uint8_t TaskSubmitEvent::binary_fingerprint[16] = {0xC9,0x3D,0x89,0x03,0x11,0xF2,0x88,0x44,0x16,0x6C,0xF6,0xE5,0x71,0xEB,0x3A,0xC2};
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
index c7e2bb5..aafcda1 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
@@ -54,7 +54,8 @@ struct MessageType {
     WORKFLOWNODE = 2,
     JOB = 3,
     LAUNCHTASK = 4,
-    TERMINATETASK = 5
+    TERMINATETASK = 5,
+    TASKOUTPUT = 6
   };
 };
 
@@ -462,6 +463,50 @@ class JobIdentifier {
 void swap(JobIdentifier &a, JobIdentifier &b);
 
 
+class ProcessSubmitEvent {
+ public:
+
+  static const char* ascii_fingerprint; // = "07A9615F837F7D0A952B595DD3020972";
+  static const uint8_t binary_fingerprint[16]; // = {0x07,0xA9,0x61,0x5F,0x83,0x7F,0x7D,0x0A,0x95,0x2B,0x59,0x5D,0xD3,0x02,0x09,0x72};
+
+  ProcessSubmitEvent() : taskId(), credentialToken() {
+  }
+
+  virtual ~ProcessSubmitEvent() throw() {}
+
+  std::string taskId;
+  std::string credentialToken;
+
+  void __set_taskId(const std::string& val) {
+    taskId = val;
+  }
+
+  void __set_credentialToken(const std::string& val) {
+    credentialToken = val;
+  }
+
+  bool operator == (const ProcessSubmitEvent & rhs) const
+  {
+    if (!(taskId == rhs.taskId))
+      return false;
+    if (!(credentialToken == rhs.credentialToken))
+      return false;
+    return true;
+  }
+  bool operator != (const ProcessSubmitEvent &rhs) const {
+    return !(*this == rhs);
+  }
+
+  bool operator < (const ProcessSubmitEvent & ) const;
+
+  uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
+  uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
+
+};
+
+void swap(ProcessSubmitEvent &a, ProcessSubmitEvent &b);
+
+
 class TaskSubmitEvent {
  public:
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
index b0d7676..4c4ec93 100644
--- a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
+++ b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
@@ -37,6 +37,7 @@ final class MessageType {
   const JOB = 3;
   const LAUNCHTASK = 4;
   const TERMINATETASK = 5;
+  const TASKOUTPUT = 6;
   static public $__names = array(
     0 => 'EXPERIMENT',
     1 => 'TASK',
@@ -44,6 +45,7 @@ final class MessageType {
     3 => 'JOB',
     4 => 'LAUNCHTASK',
     5 => 'TERMINATETASK',
+    6 => 'TASKOUTPUT',
   );
 }
 
@@ -971,6 +973,98 @@ class JobIdentifier {
 
 }
 
+class ProcessSubmitEvent {
+  static $_TSPEC;
+
+  public $taskId = null;
+  public $credentialToken = null;
+
+  public function __construct($vals=null) {
+    if (!isset(self::$_TSPEC)) {
+      self::$_TSPEC = array(
+        1 => array(
+          'var' => 'taskId',
+          'type' => TType::STRING,
+          ),
+        2 => array(
+          'var' => 'credentialToken',
+          'type' => TType::STRING,
+          ),
+        );
+    }
+    if (is_array($vals)) {
+      if (isset($vals['taskId'])) {
+        $this->taskId = $vals['taskId'];
+      }
+      if (isset($vals['credentialToken'])) {
+        $this->credentialToken = $vals['credentialToken'];
+      }
+    }
+  }
+
+  public function getName() {
+    return 'ProcessSubmitEvent';
+  }
+
+  public function read($input)
+  {
+    $xfer = 0;
+    $fname = null;
+    $ftype = 0;
+    $fid = 0;
+    $xfer += $input->readStructBegin($fname);
+    while (true)
+    {
+      $xfer += $input->readFieldBegin($fname, $ftype, $fid);
+      if ($ftype == TType::STOP) {
+        break;
+      }
+      switch ($fid)
+      {
+        case 1:
+          if ($ftype == TType::STRING) {
+            $xfer += $input->readString($this->taskId);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
+        case 2:
+          if ($ftype == TType::STRING) {
+            $xfer += $input->readString($this->credentialToken);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
+        default:
+          $xfer += $input->skip($ftype);
+          break;
+      }
+      $xfer += $input->readFieldEnd();
+    }
+    $xfer += $input->readStructEnd();
+    return $xfer;
+  }
+
+  public function write($output) {
+    $xfer = 0;
+    $xfer += $output->writeStructBegin('ProcessSubmitEvent');
+    if ($this->taskId !== null) {
+      $xfer += $output->writeFieldBegin('taskId', TType::STRING, 1);
+      $xfer += $output->writeString($this->taskId);
+      $xfer += $output->writeFieldEnd();
+    }
+    if ($this->credentialToken !== null) {
+      $xfer += $output->writeFieldBegin('credentialToken', TType::STRING, 2);
+      $xfer += $output->writeString($this->credentialToken);
+      $xfer += $output->writeFieldEnd();
+    }
+    $xfer += $output->writeFieldStop();
+    $xfer += $output->writeStructEnd();
+    return $xfer;
+  }
+
+}
+
 class TaskSubmitEvent {
   static $_TSPEC;
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
index 230b87b..761626f 100644
--- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
@@ -34,7 +34,8 @@ import org.apache.thrift.TEnum;
   WORKFLOWNODE(2),
   JOB(3),
   LAUNCHTASK(4),
-  TERMINATETASK(5);
+  TERMINATETASK(5),
+  TASKOUTPUT(6);
 
   private final int value;
 
@@ -67,6 +68,8 @@ import org.apache.thrift.TEnum;
         return LAUNCHTASK;
       case 5:
         return TERMINATETASK;
+      case 6:
+        return TASKOUTPUT;
       default:
         return null;
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ProcessSubmitEvent.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ProcessSubmitEvent.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ProcessSubmitEvent.java
new file mode 100644
index 0000000..e1d001a
--- /dev/null
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ProcessSubmitEvent.java
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.airavata.model.messaging.event;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("all") public class ProcessSubmitEvent implements org.apache.thrift.TBase<ProcessSubmitEvent, ProcessSubmitEvent._Fields>, java.io.Serializable, Cloneable, Comparable<ProcessSubmitEvent> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ProcessSubmitEvent");
+
+  private static final org.apache.thrift.protocol.TField TASK_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("taskId", org.apache.thrift.protocol.TType.STRING, (short)1);
+  private static final org.apache.thrift.protocol.TField CREDENTIAL_TOKEN_FIELD_DESC = new org.apache.thrift.protocol.TField("credentialToken", org.apache.thrift.protocol.TType.STRING, (short)2);
+
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new ProcessSubmitEventStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new ProcessSubmitEventTupleSchemeFactory());
+  }
+
+  private String taskId; // required
+  private String credentialToken; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    TASK_ID((short)1, "taskId"),
+    CREDENTIAL_TOKEN((short)2, "credentialToken");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // TASK_ID
+          return TASK_ID;
+        case 2: // CREDENTIAL_TOKEN
+          return CREDENTIAL_TOKEN;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.TASK_ID, new org.apache.thrift.meta_data.FieldMetaData("taskId", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.CREDENTIAL_TOKEN, new org.apache.thrift.meta_data.FieldMetaData("credentialToken", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ProcessSubmitEvent.class, metaDataMap);
+  }
+
+  public ProcessSubmitEvent() {
+  }
+
+  public ProcessSubmitEvent(
+    String taskId,
+    String credentialToken)
+  {
+    this();
+    this.taskId = taskId;
+    this.credentialToken = credentialToken;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public ProcessSubmitEvent(ProcessSubmitEvent other) {
+    if (other.isSetTaskId()) {
+      this.taskId = other.taskId;
+    }
+    if (other.isSetCredentialToken()) {
+      this.credentialToken = other.credentialToken;
+    }
+  }
+
+  public ProcessSubmitEvent deepCopy() {
+    return new ProcessSubmitEvent(this);
+  }
+
+  @Override
+  public void clear() {
+    this.taskId = null;
+    this.credentialToken = null;
+  }
+
+  public String getTaskId() {
+    return this.taskId;
+  }
+
+  public void setTaskId(String taskId) {
+    this.taskId = taskId;
+  }
+
+  public void unsetTaskId() {
+    this.taskId = null;
+  }
+
+  /** Returns true if field taskId is set (has been assigned a value) and false otherwise */
+  public boolean isSetTaskId() {
+    return this.taskId != null;
+  }
+
+  public void setTaskIdIsSet(boolean value) {
+    if (!value) {
+      this.taskId = null;
+    }
+  }
+
+  public String getCredentialToken() {
+    return this.credentialToken;
+  }
+
+  public void setCredentialToken(String credentialToken) {
+    this.credentialToken = credentialToken;
+  }
+
+  public void unsetCredentialToken() {
+    this.credentialToken = null;
+  }
+
+  /** Returns true if field credentialToken is set (has been assigned a value) and false otherwise */
+  public boolean isSetCredentialToken() {
+    return this.credentialToken != null;
+  }
+
+  public void setCredentialTokenIsSet(boolean value) {
+    if (!value) {
+      this.credentialToken = null;
+    }
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case TASK_ID:
+      if (value == null) {
+        unsetTaskId();
+      } else {
+        setTaskId((String)value);
+      }
+      break;
+
+    case CREDENTIAL_TOKEN:
+      if (value == null) {
+        unsetCredentialToken();
+      } else {
+        setCredentialToken((String)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case TASK_ID:
+      return getTaskId();
+
+    case CREDENTIAL_TOKEN:
+      return getCredentialToken();
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case TASK_ID:
+      return isSetTaskId();
+    case CREDENTIAL_TOKEN:
+      return isSetCredentialToken();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof ProcessSubmitEvent)
+      return this.equals((ProcessSubmitEvent)that);
+    return false;
+  }
+
+  public boolean equals(ProcessSubmitEvent that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_taskId = true && this.isSetTaskId();
+    boolean that_present_taskId = true && that.isSetTaskId();
+    if (this_present_taskId || that_present_taskId) {
+      if (!(this_present_taskId && that_present_taskId))
+        return false;
+      if (!this.taskId.equals(that.taskId))
+        return false;
+    }
+
+    boolean this_present_credentialToken = true && this.isSetCredentialToken();
+    boolean that_present_credentialToken = true && that.isSetCredentialToken();
+    if (this_present_credentialToken || that_present_credentialToken) {
+      if (!(this_present_credentialToken && that_present_credentialToken))
+        return false;
+      if (!this.credentialToken.equals(that.credentialToken))
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+
+  @Override
+  public int compareTo(ProcessSubmitEvent other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = Boolean.valueOf(isSetTaskId()).compareTo(other.isSetTaskId());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetTaskId()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.taskId, other.taskId);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetCredentialToken()).compareTo(other.isSetCredentialToken());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetCredentialToken()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.credentialToken, other.credentialToken);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("ProcessSubmitEvent(");
+    boolean first = true;
+
+    sb.append("taskId:");
+    if (this.taskId == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.taskId);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("credentialToken:");
+    if (this.credentialToken == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.credentialToken);
+    }
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    if (!isSetTaskId()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'taskId' is unset! Struct:" + toString());
+    }
+
+    if (!isSetCredentialToken()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'credentialToken' is unset! Struct:" + toString());
+    }
+
+    // check for sub-struct validity
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class ProcessSubmitEventStandardSchemeFactory implements SchemeFactory {
+    public ProcessSubmitEventStandardScheme getScheme() {
+      return new ProcessSubmitEventStandardScheme();
+    }
+  }
+
+  private static class ProcessSubmitEventStandardScheme extends StandardScheme<ProcessSubmitEvent> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, ProcessSubmitEvent struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // TASK_ID
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.taskId = iprot.readString();
+              struct.setTaskIdIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 2: // CREDENTIAL_TOKEN
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.credentialToken = iprot.readString();
+              struct.setCredentialTokenIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, ProcessSubmitEvent struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.taskId != null) {
+        oprot.writeFieldBegin(TASK_ID_FIELD_DESC);
+        oprot.writeString(struct.taskId);
+        oprot.writeFieldEnd();
+      }
+      if (struct.credentialToken != null) {
+        oprot.writeFieldBegin(CREDENTIAL_TOKEN_FIELD_DESC);
+        oprot.writeString(struct.credentialToken);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class ProcessSubmitEventTupleSchemeFactory implements SchemeFactory {
+    public ProcessSubmitEventTupleScheme getScheme() {
+      return new ProcessSubmitEventTupleScheme();
+    }
+  }
+
+  private static class ProcessSubmitEventTupleScheme extends TupleScheme<ProcessSubmitEvent> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, ProcessSubmitEvent struct) throws org.apache.thrift.TException {
+      TTupleProtocol oprot = (TTupleProtocol) prot;
+      oprot.writeString(struct.taskId);
+      oprot.writeString(struct.credentialToken);
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, ProcessSubmitEvent struct) throws org.apache.thrift.TException {
+      TTupleProtocol iprot = (TTupleProtocol) prot;
+      struct.taskId = iprot.readString();
+      struct.setTaskIdIsSet(true);
+      struct.credentialToken = iprot.readString();
+      struct.setCredentialTokenIsSet(true);
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
----------------------------------------------------------------------
diff --git a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
index d9e85d4..b13b5ed 100644
--- a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
+++ b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
@@ -40,7 +40,8 @@ enum MessageType {
     WORKFLOWNODE,
     JOB,
     LAUNCHTASK,
-    TERMINATETASK
+    TERMINATETASK,
+    TASKOUTPUT
 }
 
 struct ExperimentStatusChangeEvent {
@@ -102,6 +103,11 @@ struct JobIdentifier {
 //    //8:
 // }
 
+struct ProcessSubmitEvent{
+    1: required string taskId;
+    2: required string credentialToken;
+}
+
 struct TaskSubmitEvent{
     1: required string experimentId,
     2: required string taskId,

http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
index 837b728..5490d50 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
@@ -21,6 +21,7 @@
 package org.apache.airavata.gfac.core.monitor;
 
 import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.ServerSettings;
@@ -150,4 +151,16 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener {
             }
         }
 	}
+
+
+    @Subscribe
+    public void taskOutputChanged(TaskOutputChangeEvent taskOutputEvent) throws AiravataException {
+        String taskId = taskOutputEvent.getTaskIdentity().getTaskId();
+        logger.debug("Task Output changed event received for workflow node : " +
+                taskOutputEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
+        // TODO - do we need to update the output to the registry? , we do it in the workflowInterpreter too.
+        MessageContext messageContext = new MessageContext(taskOutputEvent, MessageType.TASKOUTPUT, taskOutputEvent.getTaskIdentity().getTaskId(), taskOutputEvent.getTaskIdentity().getGatewayId());
+        messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+        publisher.publish(messageContext);
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
new file mode 100644
index 0000000..3352893
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
@@ -0,0 +1,158 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core.impl;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.QueueingConsumer;
+import com.rabbitmq.client.ShutdownListener;
+import com.rabbitmq.client.ShutdownSignalException;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.model.messaging.event.Message;
+import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class RabbitMQProcessConsumer {
+
+    private static final Logger log = LoggerFactory.getLogger(RabbitMQProcessConsumer.class);
+
+    private String url;
+    private Connection connection;
+    private Channel channel;
+
+    public RabbitMQProcessConsumer() throws AiravataException {
+        try {
+            url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+            createConnection();
+        } catch (ApplicationSettingsException e) {
+            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+            log.error(message, e);
+            throw new AiravataException(message, e);
+        }
+    }
+
+    private void createConnection() throws AiravataException {
+        try {
+            ConnectionFactory connectionFactory = new ConnectionFactory();
+            connectionFactory.setUri(url);
+            connection = connectionFactory.newConnection();
+            connection.addShutdownListener(new ShutdownListener() {
+                public void shutdownCompleted(ShutdownSignalException cause) {
+                }
+            });
+            log.info("connected to rabbitmq: " + connection + " for default");
+
+            channel = connection.createChannel();
+//            channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
+
+        } catch (Exception e) {
+            String msg = "could not open channel for exchange default";
+            log.error(msg);
+            throw new AiravataException(msg, e);
+        }
+    }
+
+
+    public String listen(final MessageHandler handler) throws AiravataException {
+        try {
+            Map<String, Object> props = handler.getProperties();
+            final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
+            if (routing == null) {
+                throw new IllegalArgumentException("The routing key must be present");
+            }
+            String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
+            String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
+            if (queueName == null) {
+                if (!channel.isOpen()) {
+                    channel = connection.createChannel();
+//                    channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
+                }
+                queueName = channel.queueDeclare().getQueue();
+            } else {
+                channel.queueDeclare(queueName, true, false, false, null);
+            }
+
+            if (consumerTag == null) {
+                consumerTag = "default";
+            }
+            // autoAck=false, we will ack after task is done
+            final String finalQueueName = queueName;
+            channel.basicConsume(queueName, true, new QueueingConsumer(channel) {
+                @Override
+                public void handleDelivery(String consumerTag,
+                                           Envelope envelope,
+                                           AMQP.BasicProperties properties,
+                                           byte[] body) {
+                    Message message = new Message();
+
+                    try {
+                        ThriftUtils.createThriftFromBytes(body, message);
+                        TBase event = null;
+                        String gatewayId = null;
+                        ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
+                        ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent);
+                        log.debug("Message received with message id : " + message.getMessageId()
+                                + " with task id : " + processSubmitEvent.getTaskId());
+                        event = processSubmitEvent;
+                        MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), null);
+                        messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+                        handler.onMessage(messageContext);
+                    } catch (TException e) {
+                        String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + finalQueueName;
+                        log.warn(msg, e);
+                    }
+                }
+            });
+            return "";
+        } catch (Exception e) {
+            String msg = "could not open channel for exchange default";
+            log.error(msg);
+            throw new AiravataException(msg, e);
+        }
+    }
+
+    public void stopListen(final String queueName , final String exchangeName) throws AiravataException {
+        try {
+            channel.queueUnbind(queueName, exchangeName, null);
+        } catch (IOException e) {
+            String msg = "could not un-bind queue: " + queueName + " for exchange " + exchangeName;
+            log.debug(msg);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessPublisher.java
new file mode 100644
index 0000000..3684198
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessPublisher.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core.impl;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.Message;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RabbitMQProcessPublisher implements Publisher {
+
+    private static final Logger log = LoggerFactory.getLogger(RabbitMQProcessPublisher.class);
+    public static final String PROCESS = "process.queue" ;
+
+    private RabbitMQProducer rabbitMQProducer;
+
+    public RabbitMQProcessPublisher() throws Exception {
+        String brokerUrl;
+        String exchangeName;
+        try {
+            brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+//            exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
+        } catch (ApplicationSettingsException e) {
+            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+            log.error(message, e);
+            throw new AiravataException(message, e);
+        }
+        rabbitMQProducer = new RabbitMQProducer(brokerUrl, null, null);
+        rabbitMQProducer.open();
+    }
+
+    @Override
+    public void publish(MessageContext msgCtx) throws AiravataException {
+        try {
+            log.info("Publishing to process queue ...");
+            byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
+            Message message = new Message();
+            message.setEvent(body);
+            message.setMessageId(msgCtx.getMessageId());
+            message.setMessageType(msgCtx.getType());
+            message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
+            String queueName = PROCESS;
+            message.setMessageType(MessageType.TASK);
+            byte[] messageBody = ThriftUtils.serializeThriftObject(message);
+            rabbitMQProducer.sendToWorkerQueue(messageBody, queueName);
+        } catch (TException e) {
+            String msg = "Error while serializing the thrift object";
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        } catch (Exception e) {
+            String msg = "Error while sending to rabbitmq";
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
index d5e8c72..6efa77a 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
@@ -174,6 +174,12 @@ public class RabbitMQStatusConsumer implements Consumer {
                                     taskStatusChangeEvent.getState());
                             event = taskStatusChangeEvent;
                             gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId();
+                        }else if (message.getMessageType() == MessageType.TASKOUTPUT) {
+                            TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent();
+                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskOutputChangeEvent);
+                            log.debug(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getMessageType());
+                            event = taskOutputChangeEvent;
+                            gatewayId = taskOutputChangeEvent.getTaskIdentity().getGatewayId();
                         } else if (message.getMessageType().equals(MessageType.JOB)) {
                             JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
                             ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent);
@@ -182,7 +188,7 @@ public class RabbitMQStatusConsumer implements Consumer {
                                     jobStatusChangeEvent.getState());
                             event = jobStatusChangeEvent;
                             gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId();
-                        }else if(message.getMessageType().equals(MessageType.LAUNCHTASK)) {
+                        } else if (message.getMessageType().equals(MessageType.LAUNCHTASK)) {
                             TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
                             ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
                             log.debug(" Message Received with message id '" + message.getMessageId()
@@ -190,7 +196,7 @@ public class RabbitMQStatusConsumer implements Consumer {
                                     taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId());
                             event = taskSubmitEvent;
                             gatewayId = taskSubmitEvent.getGatewayId();
-                        }else if(message.getMessageType().equals(MessageType.TERMINATETASK)) {
+                        } else if (message.getMessageType().equals(MessageType.TERMINATETASK)) {
                             TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent();
                             ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent);
                             log.debug(" Message Received with message id '" + message.getMessageId()

http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
index 966d44d..a149037 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
@@ -68,21 +68,25 @@ public class RabbitMQStatusPublisher implements Publisher {
             message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
             String gatewayId = msgCtx.getGatewayId();
             String routingKey = null;
-            if (msgCtx.getType().equals(MessageType.EXPERIMENT)){
+            if (msgCtx.getType() == MessageType.EXPERIMENT) {
                 ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent();
                 routingKey = gatewayId + "." + event.getExperimentId();
-            } else if (msgCtx.getType().equals(MessageType.TASK)) {
+            } else if (msgCtx.getType() == MessageType.TASK) {
                 TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
-                routingKey =  gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
+                routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
                         event.getTaskIdentity().getWorkflowNodeId() + "." + event.getTaskIdentity().getTaskId();
-            }else if (msgCtx.getType().equals(MessageType.WORKFLOWNODE)){
+            } else if (msgCtx.getType() == MessageType.TASKOUTPUT) {
+                TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent();
+                routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
+                        event.getTaskIdentity().getWorkflowNodeId() + "." + event.getTaskIdentity().getTaskId();
+            } else if (msgCtx.getType() == MessageType.WORKFLOWNODE) {
                 WorkflowNodeStatusChangeEvent event = (WorkflowNodeStatusChangeEvent) msgCtx.getEvent();
                 WorkflowIdentifier workflowNodeIdentity = event.getWorkflowNodeIdentity();
-                routingKey =  gatewayId + "." + workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId();
-            }else if (msgCtx.getType().equals(MessageType.JOB)){
-                JobStatusChangeEvent event = (JobStatusChangeEvent)msgCtx.getEvent();
+                routingKey = gatewayId + "." + workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId();
+            } else if (msgCtx.getType() == MessageType.JOB) {
+                JobStatusChangeEvent event = (JobStatusChangeEvent) msgCtx.getEvent();
                 JobIdentifier identity = event.getJobIdentity();
-                routingKey =  gatewayId + "." + identity.getExperimentId() + "." +
+                routingKey = gatewayId + "." + identity.getExperimentId() + "." +
                         identity.getWorkflowNodeId() + "." +
                         identity.getTaskId() + "." +
                         identity.getJobId();

http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index e873e05..a4e105e 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -38,14 +38,19 @@ import org.apache.airavata.credential.store.store.CredentialReader;
 import org.apache.airavata.gfac.core.scheduler.HostScheduler;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
 import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.messaging.core.PublisherFactory;
+import org.apache.airavata.messaging.core.impl.RabbitMQProcessConsumer;
+import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher;
 import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
 import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
 import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
 import org.apache.airavata.model.error.LaunchValidationException;
 import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
 import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
 import org.apache.airavata.model.util.ExecutionType;
 import org.apache.airavata.model.workspace.experiment.*;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
@@ -61,6 +66,7 @@ import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.TaskDetai
 import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowNodeConstants;
 import org.apache.airavata.orchestrator.util.DataModelUtils;
 import org.apache.airavata.simple.workflow.engine.SimpleWorkflowInterpreter;
+import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.*;
 import org.apache.zookeeper.data.Stat;
@@ -86,7 +92,10 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 	private String gatewayName;
 	private Publisher publisher;
 
-	/**
+    private RabbitMQProcessConsumer rabbitMQProcessConsumer;
+    private RabbitMQProcessPublisher rabbitMQProcessPublisher;
+
+    /**
 	 * Query orchestrator server to fetch the CPI version
 	 */
 	public String getOrchestratorCPIVersion() throws TException {
@@ -152,7 +161,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 			orchestrator.initialize();
 			orchestrator.getOrchestratorContext().setZk(this.zk);
 			orchestrator.getOrchestratorContext().setPublisher(this.publisher);
-		} catch (OrchestratorException e) {
+            startProcessConsumer();
+        } catch (OrchestratorException e) {
             log.error(e.getMessage(), e);
             throw new OrchestratorException("Error while initializing orchestrator service", e);
 		} catch (RegistryException e) {
@@ -161,6 +171,19 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 		}
 	}
 
+    private void startProcessConsumer() throws OrchestratorException {
+        try {
+            rabbitMQProcessConsumer = new RabbitMQProcessConsumer();
+            ProcessConsumer processConsumer = new ProcessConsumer();
+            Thread thread = new Thread(processConsumer);
+            thread.start();
+
+        } catch (AiravataException e) {
+            throw new OrchestratorException("Error while starting process consumer", e);
+        }
+
+    }
+
     private void registerOrchestratorService(String airavataServerHostPort, String orchServer) throws KeeperException, InterruptedException {
         Stat zkStat = zk.exists(orchServer, false);
         if (zkStat == null) {
@@ -627,6 +650,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
         }
         return true;
     }
+
     private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken) throws TException {
 //    	try {
 //			WorkflowEngine workflowEngine = WorkflowEngineFactory.getWorkflowEngine();
@@ -634,15 +658,25 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 //		} catch (WorkflowEngineException e) {
 //            log.errorId(experimentId, "Error while launching experiment.", e);
 //        }
-
         try {
-            SimpleWorkflowInterpreter simpleWorkflowInterpreter = new SimpleWorkflowInterpreter(experimentId, airavataCredStoreToken);
+            SimpleWorkflowInterpreter simpleWorkflowInterpreter = new SimpleWorkflowInterpreter(
+                    experimentId, airavataCredStoreToken,getGatewayName(), getRabbitMQProcessPublisher());
+
             Thread thread = new Thread(simpleWorkflowInterpreter);
             thread.start();
 //            simpleWorkflowInterpreter.run();
         } catch (RegistryException e) {
             log.error("Error while launching workflow", e);
+        } catch (Exception e) {
+            log.error("Error while initializing rabbit mq process publisher");
+        }
+    }
+
+    public synchronized RabbitMQProcessPublisher getRabbitMQProcessPublisher() throws Exception {
+        if (rabbitMQProcessPublisher == null) {
+            rabbitMQProcessPublisher = new RabbitMQProcessPublisher();
         }
+        return rabbitMQProcessPublisher;
     }
 
 
@@ -732,4 +766,38 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
         }
     }
 
+    private class ProcessConsumer implements Runnable, MessageHandler{
+
+
+        @Override
+        public void run() {
+            try {
+                rabbitMQProcessConsumer.listen(this);
+            } catch (AiravataException e) {
+                log.error("Error while listen to the RabbitMQProcessConsumer");
+            }
+        }
+
+        @Override
+        public Map<String, Object> getProperties() {
+            Map<String, Object> props = new HashMap<String, Object>();
+            props.put(MessagingConstants.RABBIT_QUEUE, RabbitMQProcessPublisher.PROCESS);
+            props.put(MessagingConstants.RABBIT_ROUTING_KEY, RabbitMQProcessPublisher.PROCESS);
+            return props;
+        }
+
+        @Override
+        public void onMessage(MessageContext msgCtx) {
+            TBase event = msgCtx.getEvent();
+            if (event instanceof ProcessSubmitEvent) {
+                ProcessSubmitEvent processSubmitEvent = (ProcessSubmitEvent) event;
+                try {
+                    launchTask(processSubmitEvent.getTaskId(), processSubmitEvent.getCredentialToken());
+                } catch (TException e) {
+                    log.error("Error while launching task : " + processSubmitEvent.getTaskId());
+                }
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/pom.xml
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/pom.xml b/modules/simple-workflow/pom.xml
index 6b36335..5cb9dfb 100644
--- a/modules/simple-workflow/pom.xml
+++ b/modules/simple-workflow/pom.xml
@@ -48,6 +48,11 @@
         </dependency>
         <!-- Messaging dependency -->
         <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-messaging-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
             <version>18.0</version>

http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessContext.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessContext.java
new file mode 100644
index 0000000..849af85
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessContext.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine;
+
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+
+public class ProcessContext {
+    private WorkflowNode workflowNode;
+    private WorkflowNodeDetails wfNodeDetails;
+    private TaskDetails taskDetails;
+
+    public ProcessContext(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails, TaskDetails taskDetails) {
+        this.workflowNode = workflowNode;
+        this.wfNodeDetails = wfNodeDetails;
+        this.taskDetails = taskDetails;
+    }
+
+    public WorkflowNode getWorkflowNode() {
+        return workflowNode;
+    }
+
+    public void setWorkflowNode(WorkflowNode workflowNode) {
+        this.workflowNode = workflowNode;
+    }
+
+    public WorkflowNodeDetails getWfNodeDetails() {
+        return wfNodeDetails;
+    }
+
+    public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) {
+        this.wfNodeDetails = wfNodeDetails;
+    }
+
+    public TaskDetails getTaskDetails() {
+        return taskDetails;
+    }
+
+    public void setTaskDetails(TaskDetails taskDetails) {
+        this.taskDetails = taskDetails;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java
deleted file mode 100644
index b58b947..0000000
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.simple.workflow.engine;
-
-import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
-
-public class ProcessPack {
-    private WorkflowNode workflowNode;
-    private WorkflowNodeDetails wfNodeDetails;
-    private TaskDetails taskDetails;
-
-    public ProcessPack(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails, TaskDetails taskDetails) {
-        this.workflowNode = workflowNode;
-        this.wfNodeDetails = wfNodeDetails;
-        this.taskDetails = taskDetails;
-    }
-
-    public WorkflowNode getWorkflowNode() {
-        return workflowNode;
-    }
-
-    public void setWorkflowNode(WorkflowNode workflowNode) {
-        this.workflowNode = workflowNode;
-    }
-
-    public WorkflowNodeDetails getWfNodeDetails() {
-        return wfNodeDetails;
-    }
-
-    public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) {
-        this.wfNodeDetails = wfNodeDetails;
-    }
-
-    public TaskDetails getTaskDetails() {
-        return taskDetails;
-    }
-
-    public void setTaskDetails(TaskDetails taskDetails) {
-        this.taskDetails = taskDetails;
-    }
-}


Mime
View raw message