airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chath...@apache.org
Subject [4/4] git commit: update airavata to use new events
Date Mon, 11 Aug 2014 19:32:44 GMT
update airavata to use new events


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/dcc647eb
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/dcc647eb
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/dcc647eb

Branch: refs/heads/messaging_framework
Commit: dcc647ebc683b9da454ba113765d5521acabd6a7
Parents: 785394d
Author: Chathuri Wimalasena <kamalasini@gmail.com>
Authored: Mon Aug 11 15:31:29 2014 -0400
Committer: Chathuri Wimalasena <kamalasini@gmail.com>
Committed: Mon Aug 11 15:31:29 2014 -0400

----------------------------------------------------------------------
 .../AiravataExperimentStatusUpdator.java        |   2 +-
 .../lib/airavata/messagingEvents_types.cpp      | 106 +++-
 .../lib/airavata/messagingEvents_types.h        |  44 ++
 .../Airavata/Model/Messaging/Event/Types.php    | 125 +++++
 .../messaging/event/TaskOutputChangeEvent.java  | 551 +++++++++++++++++++
 .../messagingEvents.thrift                      |   5 +
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 111 ++--
 .../apache/airavata/gfac/core/cpi/GFacImpl.java |  35 +-
 .../core/monitor/AiravataTaskStatusUpdator.java |  27 +-
 .../gfac/core/monitor/ExperimentIdentity.java   |  72 +--
 .../airavata/gfac/core/monitor/JobIdentity.java |  78 +--
 .../gfac/core/monitor/TaskIdentity.java         |  76 +--
 .../gfac/core/monitor/WorkflowNodeIdentity.java |  74 +--
 .../state/TaskOutputDataChangedEvent.java       |  48 +-
 .../gfac/local/provider/impl/LocalProvider.java |  44 +-
 .../monitor/impl/pull/qstat/HPCPullMonitor.java |  21 +-
 .../monitor/impl/push/amqp/AMQPMonitor.java     |   6 +-
 .../impl/push/amqp/UnRegisterWorker.java        |   5 +-
 .../apache/airavata/job/AMQPMonitorTest.java    |   4 +-
 .../job/QstatMonitorTestWithMyProxyAuth.java    |   6 +-
 .../engine/interpretor/WorkflowInterpreter.java |  12 +-
 21 files changed, 1129 insertions(+), 323 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
index 903e630..c78390a 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
@@ -79,7 +79,7 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
 	                break;
 	        }
 	        if (!updateExperimentStatus){
-				ExecutionType executionType = DataModelUtils.getExecutionType((Experiment) airavataRegistry.get(RegistryModelType.EXPERIMENT, nodeStatus.getWorkflowNodeIdentity().getExperimentID()));
+				ExecutionType executionType = DataModelUtils.getExecutionType((Experiment) airavataRegistry.get(RegistryModelType.EXPERIMENT, nodeStatus.getWorkflowNodeIdentity().getExperimentId()));
 				updateExperimentStatus=(executionType==ExecutionType.SINGLE_APP);
 	        }
 			updateExperimentStatus(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), state);

http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
index 956def7..a96736f 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
@@ -449,6 +449,106 @@ void swap(TaskStatusChangeEvent &a, TaskStatusChangeEvent &b) {
   swap(a.taskIdentity, b.taskIdentity);
 }
 
+const char* TaskOutputChangeEvent::ascii_fingerprint = "6488123A3A8B4CF758D069C9B693C7EB";
+const uint8_t TaskOutputChangeEvent::binary_fingerprint[16] = {0x64,0x88,0x12,0x3A,0x3A,0x8B,0x4C,0xF7,0x58,0xD0,0x69,0xC9,0xB6,0x93,0xC7,0xEB};
+
+uint32_t TaskOutputChangeEvent::read(::apache::thrift::protocol::TProtocol* iprot) {
+
+  uint32_t xfer = 0;
+  std::string fname;
+  ::apache::thrift::protocol::TType ftype;
+  int16_t fid;
+
+  xfer += iprot->readStructBegin(fname);
+
+  using ::apache::thrift::protocol::TProtocolException;
+
+  bool isset_output = false;
+  bool isset_taskIdentity = false;
+
+  while (true)
+  {
+    xfer += iprot->readFieldBegin(fname, ftype, fid);
+    if (ftype == ::apache::thrift::protocol::T_STOP) {
+      break;
+    }
+    switch (fid)
+    {
+      case 1:
+        if (ftype == ::apache::thrift::protocol::T_LIST) {
+          {
+            this->output.clear();
+            uint32_t _size3;
+            ::apache::thrift::protocol::TType _etype6;
+            xfer += iprot->readListBegin(_etype6, _size3);
+            this->output.resize(_size3);
+            uint32_t _i7;
+            for (_i7 = 0; _i7 < _size3; ++_i7)
+            {
+              xfer += this->output[_i7].read(iprot);
+            }
+            xfer += iprot->readListEnd();
+          }
+          isset_output = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
+      case 2:
+        if (ftype == ::apache::thrift::protocol::T_STRUCT) {
+          xfer += this->taskIdentity.read(iprot);
+          isset_taskIdentity = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
+      default:
+        xfer += iprot->skip(ftype);
+        break;
+    }
+    xfer += iprot->readFieldEnd();
+  }
+
+  xfer += iprot->readStructEnd();
+
+  if (!isset_output)
+    throw TProtocolException(TProtocolException::INVALID_DATA);
+  if (!isset_taskIdentity)
+    throw TProtocolException(TProtocolException::INVALID_DATA);
+  return xfer;
+}
+
+uint32_t TaskOutputChangeEvent::write(::apache::thrift::protocol::TProtocol* oprot) const {
+  uint32_t xfer = 0;
+  xfer += oprot->writeStructBegin("TaskOutputChangeEvent");
+
+  xfer += oprot->writeFieldBegin("output", ::apache::thrift::protocol::T_LIST, 1);
+  {
+    xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT, static_cast<uint32_t>(this->output.size()));
+    std::vector< ::apache::airavata::model::workspace::experiment::DataObjectType> ::const_iterator _iter8;
+    for (_iter8 = this->output.begin(); _iter8 != this->output.end(); ++_iter8)
+    {
+      xfer += (*_iter8).write(oprot);
+    }
+    xfer += oprot->writeListEnd();
+  }
+  xfer += oprot->writeFieldEnd();
+
+  xfer += oprot->writeFieldBegin("taskIdentity", ::apache::thrift::protocol::T_STRUCT, 2);
+  xfer += this->taskIdentity.write(oprot);
+  xfer += oprot->writeFieldEnd();
+
+  xfer += oprot->writeFieldStop();
+  xfer += oprot->writeStructEnd();
+  return xfer;
+}
+
+void swap(TaskOutputChangeEvent &a, TaskOutputChangeEvent &b) {
+  using ::std::swap;
+  swap(a.output, b.output);
+  swap(a.taskIdentity, b.taskIdentity);
+}
+
 const char* JobIdentity::ascii_fingerprint = "C93D890311F28844166CF6E571EB3AC2";
 const uint8_t JobIdentity::binary_fingerprint[16] = {0xC9,0x3D,0x89,0x03,0x11,0xF2,0x88,0x44,0x16,0x6C,0xF6,0xE5,0x71,0xEB,0x3A,0xC2};
 
@@ -588,9 +688,9 @@ uint32_t JobStatusChangeEvent::read(::apache::thrift::protocol::TProtocol* iprot
     {
       case 1:
         if (ftype == ::apache::thrift::protocol::T_I32) {
-          int32_t ecast3;
-          xfer += iprot->readI32(ecast3);
-          this->state = ( ::apache::airavata::model::workspace::experiment::JobState::type)ecast3;
+          int32_t ecast9;
+          xfer += iprot->readI32(ecast9);
+          this->state = ( ::apache::airavata::model::workspace::experiment::JobState::type)ecast9;
           isset_state = true;
         } else {
           xfer += iprot->skip(ftype);

http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
index c4c07f9..ad3d052 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
@@ -263,6 +263,50 @@ class TaskStatusChangeEvent {
 void swap(TaskStatusChangeEvent &a, TaskStatusChangeEvent &b);
 
 
+class TaskOutputChangeEvent {
+ public:
+
+  static const char* ascii_fingerprint; // = "6488123A3A8B4CF758D069C9B693C7EB";
+  static const uint8_t binary_fingerprint[16]; // = {0x64,0x88,0x12,0x3A,0x3A,0x8B,0x4C,0xF7,0x58,0xD0,0x69,0xC9,0xB6,0x93,0xC7,0xEB};
+
+  TaskOutputChangeEvent() {
+  }
+
+  virtual ~TaskOutputChangeEvent() throw() {}
+
+  std::vector< ::apache::airavata::model::workspace::experiment::DataObjectType>  output;
+  TaskIdentity taskIdentity;
+
+  void __set_output(const std::vector< ::apache::airavata::model::workspace::experiment::DataObjectType> & val) {
+    output = val;
+  }
+
+  void __set_taskIdentity(const TaskIdentity& val) {
+    taskIdentity = val;
+  }
+
+  bool operator == (const TaskOutputChangeEvent & rhs) const
+  {
+    if (!(output == rhs.output))
+      return false;
+    if (!(taskIdentity == rhs.taskIdentity))
+      return false;
+    return true;
+  }
+  bool operator != (const TaskOutputChangeEvent &rhs) const {
+    return !(*this == rhs);
+  }
+
+  bool operator < (const TaskOutputChangeEvent & ) const;
+
+  uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
+  uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
+
+};
+
+void swap(TaskOutputChangeEvent &a, TaskOutputChangeEvent &b);
+
+
 class JobIdentity {
  public:
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
index c72d900..7bcf528 100644
--- a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
+++ b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
@@ -507,6 +507,131 @@ class TaskStatusChangeEvent {
 
 }
 
+class TaskOutputChangeEvent {
+  static $_TSPEC;
+
+  public $output = null;
+  public $taskIdentity = null;
+
+  public function __construct($vals=null) {
+    if (!isset(self::$_TSPEC)) {
+      self::$_TSPEC = array(
+        1 => array(
+          'var' => 'output',
+          'type' => TType::LST,
+          'etype' => TType::STRUCT,
+          'elem' => array(
+            'type' => TType::STRUCT,
+            'class' => '\Airavata\Model\Workspace\Experiment\DataObjectType',
+            ),
+          ),
+        2 => array(
+          'var' => 'taskIdentity',
+          'type' => TType::STRUCT,
+          'class' => '\Airavata\Model\Messaging\Event\TaskIdentity',
+          ),
+        );
+    }
+    if (is_array($vals)) {
+      if (isset($vals['output'])) {
+        $this->output = $vals['output'];
+      }
+      if (isset($vals['taskIdentity'])) {
+        $this->taskIdentity = $vals['taskIdentity'];
+      }
+    }
+  }
+
+  public function getName() {
+    return 'TaskOutputChangeEvent';
+  }
+
+  public function read($input)
+  {
+    $xfer = 0;
+    $fname = null;
+    $ftype = 0;
+    $fid = 0;
+    $xfer += $input->readStructBegin($fname);
+    while (true)
+    {
+      $xfer += $input->readFieldBegin($fname, $ftype, $fid);
+      if ($ftype == TType::STOP) {
+        break;
+      }
+      switch ($fid)
+      {
+        case 1:
+          if ($ftype == TType::LST) {
+            $this->output = array();
+            $_size0 = 0;
+            $_etype3 = 0;
+            $xfer += $input->readListBegin($_etype3, $_size0);
+            for ($_i4 = 0; $_i4 < $_size0; ++$_i4)
+            {
+              $elem5 = null;
+              $elem5 = new \Airavata\Model\Workspace\Experiment\DataObjectType();
+              $xfer += $elem5->read($input);
+              $this->output []= $elem5;
+            }
+            $xfer += $input->readListEnd();
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
+        case 2:
+          if ($ftype == TType::STRUCT) {
+            $this->taskIdentity = new \Airavata\Model\Messaging\Event\TaskIdentity();
+            $xfer += $this->taskIdentity->read($input);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
+        default:
+          $xfer += $input->skip($ftype);
+          break;
+      }
+      $xfer += $input->readFieldEnd();
+    }
+    $xfer += $input->readStructEnd();
+    return $xfer;
+  }
+
+  public function write($output) {
+    $xfer = 0;
+    $xfer += $output->writeStructBegin('TaskOutputChangeEvent');
+    if ($this->output !== null) {
+      if (!is_array($this->output)) {
+        throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA);
+      }
+      $xfer += $output->writeFieldBegin('output', TType::LST, 1);
+      {
+        $output->writeListBegin(TType::STRUCT, count($this->output));
+        {
+          foreach ($this->output as $iter6)
+          {
+            $xfer += $iter6->write($output);
+          }
+        }
+        $output->writeListEnd();
+      }
+      $xfer += $output->writeFieldEnd();
+    }
+    if ($this->taskIdentity !== null) {
+      if (!is_object($this->taskIdentity)) {
+        throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA);
+      }
+      $xfer += $output->writeFieldBegin('taskIdentity', TType::STRUCT, 2);
+      $xfer += $this->taskIdentity->write($output);
+      $xfer += $output->writeFieldEnd();
+    }
+    $xfer += $output->writeFieldStop();
+    $xfer += $output->writeStructEnd();
+    return $xfer;
+  }
+
+}
+
 class JobIdentity {
   static $_TSPEC;
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskOutputChangeEvent.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskOutputChangeEvent.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskOutputChangeEvent.java
new file mode 100644
index 0000000..d73ab63
--- /dev/null
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskOutputChangeEvent.java
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.airavata.model.messaging.event;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("all") public class TaskOutputChangeEvent implements org.apache.thrift.TBase<TaskOutputChangeEvent, TaskOutputChangeEvent._Fields>, java.io.Serializable, Cloneable, Comparable<TaskOutputChangeEvent> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TaskOutputChangeEvent");
+
+  private static final org.apache.thrift.protocol.TField OUTPUT_FIELD_DESC = new org.apache.thrift.protocol.TField("output", org.apache.thrift.protocol.TType.LIST, (short)1);
+  private static final org.apache.thrift.protocol.TField TASK_IDENTITY_FIELD_DESC = new org.apache.thrift.protocol.TField("taskIdentity", org.apache.thrift.protocol.TType.STRUCT, (short)2);
+
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new TaskOutputChangeEventStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new TaskOutputChangeEventTupleSchemeFactory());
+  }
+
+  private List<org.apache.airavata.model.workspace.experiment.DataObjectType> output; // required
+  private TaskIdentity taskIdentity; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    OUTPUT((short)1, "output"),
+    TASK_IDENTITY((short)2, "taskIdentity");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // OUTPUT
+          return OUTPUT;
+        case 2: // TASK_IDENTITY
+          return TASK_IDENTITY;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.OUTPUT, new org.apache.thrift.meta_data.FieldMetaData("output", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, 
+            new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.airavata.model.workspace.experiment.DataObjectType.class))));
+    tmpMap.put(_Fields.TASK_IDENTITY, new org.apache.thrift.meta_data.FieldMetaData("taskIdentity", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TaskIdentity.class)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TaskOutputChangeEvent.class, metaDataMap);
+  }
+
+  public TaskOutputChangeEvent() {
+  }
+
+  public TaskOutputChangeEvent(
+    List<org.apache.airavata.model.workspace.experiment.DataObjectType> output,
+    TaskIdentity taskIdentity)
+  {
+    this();
+    this.output = output;
+    this.taskIdentity = taskIdentity;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public TaskOutputChangeEvent(TaskOutputChangeEvent other) {
+    if (other.isSetOutput()) {
+      List<org.apache.airavata.model.workspace.experiment.DataObjectType> __this__output = new ArrayList<org.apache.airavata.model.workspace.experiment.DataObjectType>(other.output.size());
+      for (org.apache.airavata.model.workspace.experiment.DataObjectType other_element : other.output) {
+        __this__output.add(new org.apache.airavata.model.workspace.experiment.DataObjectType(other_element));
+      }
+      this.output = __this__output;
+    }
+    if (other.isSetTaskIdentity()) {
+      this.taskIdentity = new TaskIdentity(other.taskIdentity);
+    }
+  }
+
+  public TaskOutputChangeEvent deepCopy() {
+    return new TaskOutputChangeEvent(this);
+  }
+
+  @Override
+  public void clear() {
+    this.output = null;
+    this.taskIdentity = null;
+  }
+
+  public int getOutputSize() {
+    return (this.output == null) ? 0 : this.output.size();
+  }
+
+  public java.util.Iterator<org.apache.airavata.model.workspace.experiment.DataObjectType> getOutputIterator() {
+    return (this.output == null) ? null : this.output.iterator();
+  }
+
+  public void addToOutput(org.apache.airavata.model.workspace.experiment.DataObjectType elem) {
+    if (this.output == null) {
+      this.output = new ArrayList<org.apache.airavata.model.workspace.experiment.DataObjectType>();
+    }
+    this.output.add(elem);
+  }
+
+  public List<org.apache.airavata.model.workspace.experiment.DataObjectType> getOutput() {
+    return this.output;
+  }
+
+  public void setOutput(List<org.apache.airavata.model.workspace.experiment.DataObjectType> output) {
+    this.output = output;
+  }
+
+  public void unsetOutput() {
+    this.output = null;
+  }
+
+  /** Returns true if field output is set (has been assigned a value) and false otherwise */
+  public boolean isSetOutput() {
+    return this.output != null;
+  }
+
+  public void setOutputIsSet(boolean value) {
+    if (!value) {
+      this.output = null;
+    }
+  }
+
+  public TaskIdentity getTaskIdentity() {
+    return this.taskIdentity;
+  }
+
+  public void setTaskIdentity(TaskIdentity taskIdentity) {
+    this.taskIdentity = taskIdentity;
+  }
+
+  public void unsetTaskIdentity() {
+    this.taskIdentity = null;
+  }
+
+  /** Returns true if field taskIdentity is set (has been assigned a value) and false otherwise */
+  public boolean isSetTaskIdentity() {
+    return this.taskIdentity != null;
+  }
+
+  public void setTaskIdentityIsSet(boolean value) {
+    if (!value) {
+      this.taskIdentity = null;
+    }
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case OUTPUT:
+      if (value == null) {
+        unsetOutput();
+      } else {
+        setOutput((List<org.apache.airavata.model.workspace.experiment.DataObjectType>)value);
+      }
+      break;
+
+    case TASK_IDENTITY:
+      if (value == null) {
+        unsetTaskIdentity();
+      } else {
+        setTaskIdentity((TaskIdentity)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case OUTPUT:
+      return getOutput();
+
+    case TASK_IDENTITY:
+      return getTaskIdentity();
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case OUTPUT:
+      return isSetOutput();
+    case TASK_IDENTITY:
+      return isSetTaskIdentity();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof TaskOutputChangeEvent)
+      return this.equals((TaskOutputChangeEvent)that);
+    return false;
+  }
+
+  public boolean equals(TaskOutputChangeEvent that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_output = true && this.isSetOutput();
+    boolean that_present_output = true && that.isSetOutput();
+    if (this_present_output || that_present_output) {
+      if (!(this_present_output && that_present_output))
+        return false;
+      if (!this.output.equals(that.output))
+        return false;
+    }
+
+    boolean this_present_taskIdentity = true && this.isSetTaskIdentity();
+    boolean that_present_taskIdentity = true && that.isSetTaskIdentity();
+    if (this_present_taskIdentity || that_present_taskIdentity) {
+      if (!(this_present_taskIdentity && that_present_taskIdentity))
+        return false;
+      if (!this.taskIdentity.equals(that.taskIdentity))
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+
+  @Override
+  public int compareTo(TaskOutputChangeEvent other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = Boolean.valueOf(isSetOutput()).compareTo(other.isSetOutput());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetOutput()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.output, other.output);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetTaskIdentity()).compareTo(other.isSetTaskIdentity());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetTaskIdentity()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.taskIdentity, other.taskIdentity);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("TaskOutputChangeEvent(");
+    boolean first = true;
+
+    sb.append("output:");
+    if (this.output == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.output);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("taskIdentity:");
+    if (this.taskIdentity == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.taskIdentity);
+    }
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    if (!isSetOutput()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'output' is unset! Struct:" + toString());
+    }
+
+    if (!isSetTaskIdentity()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'taskIdentity' is unset! Struct:" + toString());
+    }
+
+    // check for sub-struct validity
+    if (taskIdentity != null) {
+      taskIdentity.validate();
+    }
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class TaskOutputChangeEventStandardSchemeFactory implements SchemeFactory {
+    public TaskOutputChangeEventStandardScheme getScheme() {
+      return new TaskOutputChangeEventStandardScheme();
+    }
+  }
+
+  private static class TaskOutputChangeEventStandardScheme extends StandardScheme<TaskOutputChangeEvent> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, TaskOutputChangeEvent struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // OUTPUT
+            if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
+              {
+                org.apache.thrift.protocol.TList _list0 = iprot.readListBegin();
+                struct.output = new ArrayList<org.apache.airavata.model.workspace.experiment.DataObjectType>(_list0.size);
+                for (int _i1 = 0; _i1 < _list0.size; ++_i1)
+                {
+                  org.apache.airavata.model.workspace.experiment.DataObjectType _elem2;
+                  _elem2 = new org.apache.airavata.model.workspace.experiment.DataObjectType();
+                  _elem2.read(iprot);
+                  struct.output.add(_elem2);
+                }
+                iprot.readListEnd();
+              }
+              struct.setOutputIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 2: // TASK_IDENTITY
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+              struct.taskIdentity = new TaskIdentity();
+              struct.taskIdentity.read(iprot);
+              struct.setTaskIdentityIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, TaskOutputChangeEvent struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.output != null) {
+        oprot.writeFieldBegin(OUTPUT_FIELD_DESC);
+        {
+          oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.output.size()));
+          for (org.apache.airavata.model.workspace.experiment.DataObjectType _iter3 : struct.output)
+          {
+            _iter3.write(oprot);
+          }
+          oprot.writeListEnd();
+        }
+        oprot.writeFieldEnd();
+      }
+      if (struct.taskIdentity != null) {
+        oprot.writeFieldBegin(TASK_IDENTITY_FIELD_DESC);
+        struct.taskIdentity.write(oprot);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class TaskOutputChangeEventTupleSchemeFactory implements SchemeFactory {
+    public TaskOutputChangeEventTupleScheme getScheme() {
+      return new TaskOutputChangeEventTupleScheme();
+    }
+  }
+
+  private static class TaskOutputChangeEventTupleScheme extends TupleScheme<TaskOutputChangeEvent> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, TaskOutputChangeEvent struct) throws org.apache.thrift.TException {
+      TTupleProtocol oprot = (TTupleProtocol) prot;
+      {
+        oprot.writeI32(struct.output.size());
+        for (org.apache.airavata.model.workspace.experiment.DataObjectType _iter4 : struct.output)
+        {
+          _iter4.write(oprot);
+        }
+      }
+      struct.taskIdentity.write(oprot);
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, TaskOutputChangeEvent struct) throws org.apache.thrift.TException {
+      TTupleProtocol iprot = (TTupleProtocol) prot;
+      {
+        org.apache.thrift.protocol.TList _list5 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+        struct.output = new ArrayList<org.apache.airavata.model.workspace.experiment.DataObjectType>(_list5.size);
+        for (int _i6 = 0; _i6 < _list5.size; ++_i6)
+        {
+          org.apache.airavata.model.workspace.experiment.DataObjectType _elem7;
+          _elem7 = new org.apache.airavata.model.workspace.experiment.DataObjectType();
+          _elem7.read(iprot);
+          struct.output.add(_elem7);
+        }
+      }
+      struct.setOutputIsSet(true);
+      struct.taskIdentity = new TaskIdentity();
+      struct.taskIdentity.read(iprot);
+      struct.setTaskIdentityIsSet(true);
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
----------------------------------------------------------------------
diff --git a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
index ebe1c57..38c25a4 100644
--- a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
+++ b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
@@ -50,6 +50,11 @@ struct TaskStatusChangeEvent {
     2: required TaskIdentity taskIdentity;
 }
 
+struct TaskOutputChangeEvent {
+    1: required list<experimentModel.DataObjectType> output;
+    2: required TaskIdentity taskIdentity;
+}
+
 struct JobIdentity {
     1: required string jobId;
     2: required string taskId;

http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index a68a302..e843e4d 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -20,16 +20,6 @@
 */
 package org.apache.airavata.gfac.core.cpi;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.xpath.XPathExpressionException;
-
 import org.airavata.appcatalog.cpi.AppCatalog;
 import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
 import org.apache.airavata.client.api.AiravataAPI;
@@ -48,20 +38,9 @@ import org.apache.airavata.gfac.Scheduler;
 import org.apache.airavata.gfac.core.context.ApplicationContext;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.context.MessageContext;
-import org.apache.airavata.gfac.core.handler.GFacHandler;
-import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.handler.GFacRecoverableHandler;
-import org.apache.airavata.gfac.core.handler.ThreadedHandler;
-import org.apache.airavata.gfac.core.monitor.ExperimentIdentity;
-import org.apache.airavata.gfac.core.monitor.JobIdentity;
+import org.apache.airavata.gfac.core.handler.*;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.monitor.TaskIdentity;
-//import org.apache.airavata.api.server.listener.ExperimentStatusChangedEvent;
 import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
-import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
-import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
-import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent;
 import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent;
 import org.apache.airavata.gfac.core.notification.listeners.LoggingListener;
 import org.apache.airavata.gfac.core.notification.listeners.WorkflowTrackingListener;
@@ -75,36 +54,16 @@ import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentD
 import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
 import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
 import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
-import org.apache.airavata.model.appcatalog.computeresource.JobManagerCommand;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
-import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
-import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
-import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.*;
 import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
-import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
-import org.apache.airavata.model.workspace.experiment.DataObjectType;
-import org.apache.airavata.model.workspace.experiment.Experiment;
-import org.apache.airavata.model.workspace.experiment.ExperimentState;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.workspace.experiment.*;
 import org.apache.airavata.registry.api.AiravataRegistry2;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.registry.cpi.RegistryModelType;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.*;
 import org.apache.airavata.schemas.gfac.DataType;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.apache.airavata.schemas.gfac.HostDescriptionType;
-import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
-import org.apache.airavata.schemas.gfac.InputParameterType;
-import org.apache.airavata.schemas.gfac.JobTypeType;
-import org.apache.airavata.schemas.gfac.OutputParameterType;
-import org.apache.airavata.schemas.gfac.ParameterType;
-import org.apache.airavata.schemas.gfac.ProjectAccountType;
-import org.apache.airavata.schemas.gfac.QueueType;
-import org.apache.airavata.schemas.gfac.SSHHostType;
-import org.apache.airavata.schemas.gfac.ServiceDescriptionType;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZKUtil;
 import org.apache.zookeeper.ZooKeeper;
@@ -112,6 +71,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xml.sax.SAXException;
 
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.xpath.XPathExpressionException;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+//import org.apache.airavata.api.server.listener.ExperimentStatusChangedEvent;
+
 /**
  * This is the GFac CPI class for external usage, this simply have a single method to submit a job to
  * the resource, required data for the job has to be stored in registry prior to invoke this object.
@@ -582,17 +552,21 @@ public class BetterGfacImpl implements GFac {
 				// jobExecutionContext.getTaskData().getTaskID()),
 				// TaskState.FAILED
 				// ));
-				monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext), new JobIdentity(jobExecutionContext.getExperimentID(),
-						jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext
-								.getJobDetails().getJobID()), JobState.FAILED));
+                org.apache.airavata.model.messaging.event.JobIdentity jobIdentity = new org.apache.airavata.model.messaging.event.JobIdentity(
+                        jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(),
+                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                        jobExecutionContext.getExperimentID());
+				monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
 			} catch (NullPointerException e1) {
 				log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
 						+ "NullPointerException occurred because at this point there might not have Job Created", e1, e);
 //				monitorPublisher
 //						.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
 				// Updating the task status if there's any task associated
-				monitorPublisher.publish(new TaskStatusChangedEvent(new TaskIdentity(jobExecutionContext.getExperimentID(), jobExecutionContext
-						.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED));
+                org.apache.airavata.model.messaging.event.TaskIdentity taskIdentity = new org.apache.airavata.model.messaging.event.TaskIdentity(jobExecutionContext.getTaskData().getTaskID(),
+                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                        jobExecutionContext.getExperimentID());
+				monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
 
 			}
 			jobExecutionContext.setProperty(ERROR_SENT, "true");
@@ -640,16 +614,19 @@ public class BetterGfacImpl implements GFac {
 				// jobExecutionContext.getTaskData().getTaskID()),
 				// TaskState.FAILED
 				// ));
-				monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext), new JobIdentity(jobExecutionContext.getExperimentID(),
-						jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext
-								.getJobDetails().getJobID()), JobState.FAILED));
+                org.apache.airavata.model.messaging.event.JobIdentity jobIdentity = new org.apache.airavata.model.messaging.event.JobIdentity(
+                        jobExecutionContext.getJobDetails().getJobID(),jobExecutionContext.getTaskData().getTaskID(),jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                        jobExecutionContext.getExperimentID());
+				monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
 			} catch (NullPointerException e1) {
 				log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
 						+ "NullPointerException occurred because at this point there might not have Job Created", e1, e);
 				//monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
 				// Updating the task status if there's any task associated
-				monitorPublisher.publish(new TaskStatusChangeRequest(new TaskIdentity(jobExecutionContext.getExperimentID(), jobExecutionContext
-						.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED));
+                org.apache.airavata.model.messaging.event.TaskIdentity taskIdentity = new org.apache.airavata.model.messaging.event.TaskIdentity(jobExecutionContext.getTaskData().getTaskID(),
+                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                        jobExecutionContext.getExperimentID());
+                monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
 
 			}
 			jobExecutionContext.setProperty(ERROR_SENT, "true");
@@ -824,11 +801,10 @@ public class BetterGfacImpl implements GFac {
 //                ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
 //                ExperimentState.COMPLETED));
         // Updating the task status if there's any task associated
-        monitorPublisher.publish(new TaskStatusChangeRequest(
-                new TaskIdentity(jobExecutionContext.getExperimentID(),
-                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                        jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
-        ));
+        org.apache.airavata.model.messaging.event.TaskIdentity taskIdentity = new org.apache.airavata.model.messaging.event.TaskIdentity(jobExecutionContext.getTaskData().getTaskID(),
+                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                jobExecutionContext.getExperimentID());
+        monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity));
         monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
     }
 
@@ -953,12 +929,11 @@ public class BetterGfacImpl implements GFac {
 //                ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
 //                ExperimentState.COMPLETED));
         // Updating the task status if there's any task associated
-        
-        monitorPublisher.publish(new TaskStatusChangedEvent(
-                new TaskIdentity(jobExecutionContext.getExperimentID(),
-                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                        jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
-        ));
+
+        org.apache.airavata.model.messaging.event.TaskIdentity taskIdentity = new org.apache.airavata.model.messaging.event.TaskIdentity(jobExecutionContext.getTaskData().getTaskID(),
+                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                jobExecutionContext.getExperimentID());
+        monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity));
         monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
index d370924..4a58f68 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
@@ -50,14 +50,6 @@ import org.apache.airavata.gfac.core.handler.GFacHandler;
 import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
 import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.handler.ThreadedHandler;
-import org.apache.airavata.gfac.core.monitor.ExperimentIdentity;
-import org.apache.airavata.gfac.core.monitor.JobIdentity;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.monitor.TaskIdentity;
-//import org.apache.airavata.api.server.listener.ExperimentStatusChangedEvent;
-import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
-import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
-import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent;
 import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent;
 import org.apache.airavata.gfac.core.notification.listeners.LoggingListener;
 import org.apache.airavata.gfac.core.notification.listeners.WorkflowTrackingListener;
@@ -65,9 +57,12 @@ import org.apache.airavata.gfac.core.provider.GFacProvider;
 import org.apache.airavata.gfac.core.scheduler.HostScheduler;
 import org.apache.airavata.gfac.core.states.GfacExperimentState;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.model.messaging.event.JobIdentity;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskIdentity;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
 import org.apache.airavata.model.workspace.experiment.DataObjectType;
 import org.apache.airavata.model.workspace.experiment.Experiment;
-import org.apache.airavata.model.workspace.experiment.ExperimentState;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
 import org.apache.airavata.model.workspace.experiment.TaskState;
@@ -331,10 +326,11 @@ public class GFacImpl implements GFac {
             }
         } catch (Exception e) {
             try {
-                monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext),
-                        new JobIdentity(jobExecutionContext.getExperimentID(),
+                JobIdentity jobIdentity = new JobIdentity(jobExecutionContext.getJobDetails().getJobID(),
+                        jobExecutionContext.getTaskData().getTaskID(),
                         jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                        jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getJobDetails().getJobID()), JobState.FAILED));
+                        jobExecutionContext.getExperimentID());
+                monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED,jobIdentity));
             } catch (NullPointerException e1) {
                 log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, " +
                         "NullPointerException occurred because at this point there might not have Job Created", e1, e);
@@ -342,8 +338,10 @@ public class GFacImpl implements GFac {
 //				monitorPublisher
 //						.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
 				// Updating the task status if there's any task associated
-				monitorPublisher.publish(new TaskStatusChangedEvent(new TaskIdentity(jobExecutionContext.getExperimentID(), jobExecutionContext
-						.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED));
+                TaskIdentity taskIdentity = new TaskIdentity(jobExecutionContext.getTaskData().getTaskID(),
+                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                        jobExecutionContext.getExperimentID());
+                monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
 
             }
             jobExecutionContext.setProperty(ERROR_SENT, "true");
@@ -460,11 +458,10 @@ public class GFacImpl implements GFac {
 //                ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
 //                ExperimentState.COMPLETED));
         // Updating the task status if there's any task associated
-        monitorPublisher.publish(new TaskStatusChangeRequest(
-                new TaskIdentity(jobExecutionContext.getExperimentID(),
-                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                        jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
-        ));
+        TaskIdentity taskIdentity = new TaskIdentity(jobExecutionContext.getTaskData().getTaskID(),
+                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                jobExecutionContext.getExperimentID());
+        monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity));
     }
 
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
index 56fbb1c..e6ab5ef 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
@@ -24,8 +24,6 @@ import java.util.Calendar;
 
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
-import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
-import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent;
 import org.apache.airavata.model.messaging.event.*;
 import org.apache.airavata.model.messaging.event.TaskIdentity;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
@@ -51,17 +49,17 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener {
     public void setAiravataRegistry(Registry airavataRegistry) {
         this.airavataRegistry = airavataRegistry;
     }
-    
-    @Subscribe
-    public void setupTaskStatus(TaskStatusChangeRequest taskStatus){
-    	try {
-			updateTaskStatus(taskStatus.getIdentity().getTaskId(), taskStatus.getState());
-			logger.debug("Publishing task status for "+taskStatus.getIdentity().getTaskId()+":"+taskStatus.getState().toString());
-			monitorPublisher.publish(new TaskStatusChangedEvent(taskStatus.getIdentity(),taskStatus.getState()));
-		} catch (Exception e) {
-            logger.error("Error persisting data" + e.getLocalizedMessage(), e);
-		}
-    }
+//
+//    @Subscribe
+//    public void setupTaskStatus(TaskStatusChangeRequest taskStatus){
+//    	try {
+//			updateTaskStatus(taskStatus.getIdentity().getTaskId(), taskStatus.getState());
+//			logger.debug("Publishing task status for "+taskStatus.getIdentity().getTaskId()+":"+taskStatus.getState().toString());
+//			monitorPublisher.publish(new TaskStatusChangedEvent(taskStatus.getIdentity(),taskStatus.getState()));
+//		} catch (Exception e) {
+//            logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+//		}
+//    }
 
     @Subscribe
     public void setupTaskStatus(JobStatusChangeEvent jobStatus){
@@ -95,7 +93,8 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener {
                                                          jobStatus.getJobIdentity().getWorkflowNodeId(),
                                                          jobStatus.getJobIdentity().getExperimentId());
             monitorPublisher.publish(new TaskStatusChangeEvent(state, taskIdentity));
-		} catch (Exception e) {
+
+        } catch (Exception e) {
             logger.error("Error persisting data" + e.getLocalizedMessage(), e);
 		}
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/ExperimentIdentity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/ExperimentIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/ExperimentIdentity.java
index e8d22f7..d5f043f 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/ExperimentIdentity.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/ExperimentIdentity.java
@@ -1,36 +1,36 @@
-///*
-// *
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *   http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing,
-// * software distributed under the License is distributed on an
-// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// * KIND, either express or implied.  See the License for the
-// * specific language governing permissions and limitations
-// * under the License.
-// *
-// */
-//
-//package org.apache.airavata.gfac.core.monitor;
-//
-//public class ExperimentIdentity {
-//	private String experimentID;
-//	public ExperimentIdentity(String experimentId) {
-//		setExperimentID(experimentId);
-//	}
-//	public String getExperimentID() {
-//		return experimentID;
-//	}
-//
-//	public void setExperimentID(String experimentID) {
-//		this.experimentID = experimentID;
-//	}
-//}
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.airavata.gfac.core.monitor;
+
+public class ExperimentIdentity {
+	private String experimentID;
+	public ExperimentIdentity(String experimentId) {
+		setExperimentID(experimentId);
+	}
+	public String getExperimentID() {
+		return experimentID;
+	}
+
+	public void setExperimentID(String experimentID) {
+		this.experimentID = experimentID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobIdentity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobIdentity.java
index 1773ff1..304e3eb 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobIdentity.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobIdentity.java
@@ -1,39 +1,39 @@
-///*
-// *
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *   http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing,
-// * software distributed under the License is distributed on an
-// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// * KIND, either express or implied.  See the License for the
-// * specific language governing permissions and limitations
-// * under the License.
-// *
-// */
-//
-//package org.apache.airavata.gfac.core.monitor;
-//
-//public class JobIdentity extends TaskIdentity {
-//	private String jobId;
-//
-//	public JobIdentity(String experimentId, String workflowNodeId, String taskId, String jobId) {
-//		super(experimentId,workflowNodeId,taskId);
-//		setJobId(jobId);
-//	}
-//
-//	public String getJobId() {
-//		return jobId;
-//	}
-//
-//	public void setJobId(String jobId) {
-//		this.jobId = jobId;
-//	}
-//}
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.airavata.gfac.core.monitor;
+
+public class JobIdentity extends TaskIdentity {
+	private String jobId;
+
+	public JobIdentity(String experimentId, String workflowNodeId, String taskId, String jobId) {
+		super(experimentId,workflowNodeId,taskId);
+		setJobId(jobId);
+	}
+
+	public String getJobId() {
+		return jobId;
+	}
+
+	public void setJobId(String jobId) {
+		this.jobId = jobId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/TaskIdentity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/TaskIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/TaskIdentity.java
index 8448437..db03348 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/TaskIdentity.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/TaskIdentity.java
@@ -1,38 +1,38 @@
-///*
-// *
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *   http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing,
-// * software distributed under the License is distributed on an
-// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// * KIND, either express or implied.  See the License for the
-// * specific language governing permissions and limitations
-// * under the License.
-// *
-// */
-//
-//package org.apache.airavata.gfac.core.monitor;
-//
-//public class TaskIdentity extends WorkflowNodeIdentity {
-//	private String taskId;
-//
-//	public TaskIdentity(String experimentId, String workflowNodeId, String taskId) {
-//		super(experimentId,workflowNodeId);
-//		setTaskId(taskId);
-//	}
-//	public String getTaskId() {
-//		return taskId;
-//	}
-//
-//	public void setTaskId(String taskId) {
-//		this.taskId = taskId;
-//	}
-//}
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.airavata.gfac.core.monitor;
+
+public class TaskIdentity extends WorkflowNodeIdentity {
+	private String taskId;
+
+	public TaskIdentity(String experimentId, String workflowNodeId, String taskId) {
+		super(experimentId,workflowNodeId);
+		setTaskId(taskId);
+	}
+	public String getTaskId() {
+		return taskId;
+	}
+
+	public void setTaskId(String taskId) {
+		this.taskId = taskId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/WorkflowNodeIdentity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/WorkflowNodeIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/WorkflowNodeIdentity.java
index ebdc372..e15f733 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/WorkflowNodeIdentity.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/WorkflowNodeIdentity.java
@@ -1,37 +1,37 @@
-///*
-// *
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *   http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing,
-// * software distributed under the License is distributed on an
-// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// * KIND, either express or implied.  See the License for the
-// * specific language governing permissions and limitations
-// * under the License.
-// *
-// */
-//
-//package org.apache.airavata.gfac.core.monitor;
-//
-//public class WorkflowNodeIdentity extends ExperimentIdentity {
-//	private String workflowNodeID;
-//	public WorkflowNodeIdentity(String experimentId, String workflowNodeId) {
-//		super(experimentId);
-//		setWorkflowNodeID(workflowNodeId);
-//	}
-//	public String getWorkflowNodeID() {
-//		return workflowNodeID;
-//	}
-//
-//	public void setWorkflowNodeID(String workflowNodeID) {
-//		this.workflowNodeID = workflowNodeID;
-//	}
-//}
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.airavata.gfac.core.monitor;
+
+public class WorkflowNodeIdentity extends ExperimentIdentity {
+	private String workflowNodeID;
+	public WorkflowNodeIdentity(String experimentId, String workflowNodeId) {
+		super(experimentId);
+		setWorkflowNodeID(workflowNodeId);
+	}
+	public String getWorkflowNodeID() {
+		return workflowNodeID;
+	}
+
+	public void setWorkflowNodeID(String workflowNodeID) {
+		this.workflowNodeID = workflowNodeID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/TaskOutputDataChangedEvent.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/TaskOutputDataChangedEvent.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/TaskOutputDataChangedEvent.java
index cccca30..db7ee59 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/TaskOutputDataChangedEvent.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/TaskOutputDataChangedEvent.java
@@ -1,22 +1,22 @@
 ///*
-// *
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *   http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing,
-// * software distributed under the License is distributed on an
-// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// * KIND, either express or implied.  See the License for the
-// * specific language governing permissions and limitations
-// * under the License.
-// *
+//*
+//* Licensed to the Apache Software Foundation (ASF) under one
+//* or more contributor license agreements.  See the NOTICE file
+//* distributed with this work for additional information
+//* regarding copyright ownership.  The ASF licenses this file
+//* to you under the Apache License, Version 2.0 (the
+//* "License"); you may not use this file except in compliance
+//* with the License.  You may obtain a copy of the License at
+//*
+//*   http://www.apache.org/licenses/LICENSE-2.0
+//*
+//* Unless required by applicable law or agreed to in writing,
+//* software distributed under the License is distributed on an
+//* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//* KIND, either express or implied.  See the License for the
+//* specific language governing permissions and limitations
+//* under the License.
+//*
 //*/
 //package org.apache.airavata.gfac.core.monitor.state;
 //
@@ -27,12 +27,12 @@
 //import org.apache.airavata.model.workspace.experiment.DataObjectType;
 //
 ///**
-// * This is the primary job state object used in
-// * through out the monitor module. This use airavata-data-model JobState enum
-// * Ideally after processing each event or monitoring message from remote system
-// * Each monitoring implementation has to return this object with a state and
-// * the monitoring ID
-// */
+//* This is the primary job state object used in
+//* through out the monitor module. This use airavata-data-model JobState enum
+//* Ideally after processing each event or monitoring message from remote system
+//* Each monitoring implementation has to return this object with a state and
+//* the monitoring ID
+//*/
 //public class TaskOutputDataChangedEvent extends AbstractStateChangeRequest {
 //    private List<DataObjectType> output;
 //    private TaskIdentity identity;

http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
index 514f901..5c86560 100644
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -29,10 +29,6 @@ import java.util.Map;
 import org.apache.airavata.gfac.Constants;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.monitor.TaskIdentity;
-import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
-import org.apache.airavata.gfac.core.monitor.state.TaskOutputDataChangedEvent;
 import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
 import org.apache.airavata.gfac.core.provider.AbstractProvider;
 import org.apache.airavata.gfac.core.provider.GFacProviderException;
@@ -41,6 +37,10 @@ import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.airavata.gfac.core.utils.OutputUtils;
 import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter;
 import org.apache.airavata.gfac.local.utils.InputUtils;
+import org.apache.airavata.model.messaging.event.JobIdentity;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskIdentity;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
 import org.apache.airavata.model.workspace.experiment.DataObjectType;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.JobState;
@@ -173,10 +173,12 @@ public class LocalProvider extends AbstractProvider {
             log.info(buf.toString());
 
             // updating the job status to complete because there's nothing to monitor in local jobs
-            MonitorID monitorID = createMonitorID(jobExecutionContext);
-            JobStatusChangeRequest jobStatusChangeRequest = new JobStatusChangeRequest(monitorID);
-            jobStatusChangeRequest.setState(JobState.COMPLETE);
-            this.getMonitorPublisher().publish(jobStatusChangeRequest);
+//            MonitorID monitorID = createMonitorID(jobExecutionContext);
+            JobIdentity jobIdentity = new JobIdentity(jobExecutionContext.getJobDetails().getJobID(),
+                    jobExecutionContext.getTaskData().getTaskID(),
+                    jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                    jobExecutionContext.getExperimentID());
+            this.getMonitorPublisher().publish(new JobStatusChangeEvent(JobState.COMPLETE, jobIdentity));
         } catch (IOException io) {
             throw new GFacProviderException(io.getMessage(), io);
         } catch (InterruptedException e) {
@@ -186,13 +188,13 @@ public class LocalProvider extends AbstractProvider {
         }
     }
 
-	private MonitorID createMonitorID(JobExecutionContext jobExecutionContext) {
-		MonitorID monitorID = new MonitorID(jobExecutionContext.getApplicationContext().getHostDescription(), jobId,
-		        jobExecutionContext.getTaskData().getTaskID(),
-		        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(),
-		        jobExecutionContext.getExperiment().getUserName(),jobId);
-		return monitorID;
-	}
+//	private MonitorID createMonitorID(JobExecutionContext jobExecutionContext) {
+//		MonitorID monitorID = new MonitorID(jobExecutionContext.getApplicationContext().getHostDescription(), jobId,
+//		        jobExecutionContext.getTaskData().getTaskID(),
+//		        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(),
+//		        jobExecutionContext.getExperiment().getUserName(),jobId);
+//		return monitorID;
+//	}
 
 //	private void saveApplicationJob(JobExecutionContext jobExecutionContext)
 //			throws GFacProviderException {
@@ -225,11 +227,15 @@ public class LocalProvider extends AbstractProvider {
 			Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
             OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray);
             TaskDetails taskDetails = (TaskDetails)registry.get(RegistryModelType.TASK_DETAIL, jobExecutionContext.getTaskData().getTaskID());
-            taskDetails.setApplicationOutputs(outputArray);
-            registry.update(RegistryModelType.TASK_DETAIL, taskDetails, taskDetails.getTaskID());
+            if (taskDetails != null){
+                taskDetails.setApplicationOutputs(outputArray);
+                registry.update(RegistryModelType.TASK_DETAIL, taskDetails, taskDetails.getTaskID());
+            }
             registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
-            MonitorID monitorId = createMonitorID(jobExecutionContext);
-            getMonitorPublisher().publish(new TaskOutputDataChangedEvent(new TaskIdentity(monitorId.getExperimentID(), monitorId.getWorkflowNodeID(), monitorId.getTaskID()), outputArray));
+            TaskIdentity taskIdentity = new TaskIdentity(jobExecutionContext.getTaskData().getTaskID(),
+                    jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                    jobExecutionContext.getExperimentID());
+            getMonitorPublisher().publish(new TaskOutputChangeEvent(outputArray, taskIdentity));
         } catch (XmlException e) {
             throw new GFacProviderException("Cannot read output:" + e.getMessage(), e);
         } catch (IOException io) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 2fea154..591bca6 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -36,9 +36,6 @@ import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.cpi.GFac;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.monitor.TaskIdentity;
-import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
-import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
 import org.apache.airavata.gfac.monitor.HostMonitorData;
 import org.apache.airavata.gfac.monitor.UserMonitorData;
 import org.apache.airavata.gfac.monitor.core.PullMonitor;
@@ -46,6 +43,9 @@ import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.gfac.monitor.util.CommonUtils;
 import org.apache.airavata.gsi.ssh.api.SSHApiException;
 import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.model.messaging.event.JobIdentity;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.apache.airavata.model.workspace.experiment.TaskState;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
@@ -139,7 +139,7 @@ public class HPCPullMonitor extends PullMonitor {
         //todo this polling will not work with multiple usernames but with single user
         // and multiple hosts, currently monitoring will work
         UserMonitorData take = null;
-        JobStatusChangeRequest jobStatus = new JobStatusChangeRequest();
+        JobStatusChangeEvent jobStatus = new JobStatusChangeEvent();
         MonitorID currentMonitorID = null;
         HostDescription currentHostDescription = null;
         try {
@@ -164,7 +164,8 @@ public class HPCPullMonitor extends PullMonitor {
                     for (MonitorID iMonitorID : monitorID) {
                         currentMonitorID = iMonitorID;
                         iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()+","+iMonitorID.getJobName()));    //IMPORTANT this is not a simple setter we have a logic
-                        jobStatus = new JobStatusChangeRequest(iMonitorID);
+                        jobStatus.setJobIdentity(new JobIdentity(iMonitorID.getJobID(), iMonitorID.getTaskID(), iMonitorID.getWorkflowNodeID(), iMonitorID.getExperimentID()));
+                        jobStatus.setState(iMonitorID.getStatus());
                         // we have this JobStatus class to handle amqp monitoring
 
                         publisher.publish(jobStatus);
@@ -177,8 +178,9 @@ public class HPCPullMonitor extends PullMonitor {
                             try {
                                 gfac.invokeOutFlowHandlers(iMonitorID.getJobExecutionContext());
                             } catch (GFacException e) {
-                            	publisher.publish(new TaskStatusChangeRequest(new TaskIdentity(iMonitorID.getExperimentID(), iMonitorID.getWorkflowNodeID(),
-										iMonitorID.getTaskID()), TaskState.FAILED));
+                                org.apache.airavata.model.messaging.event.TaskIdentity taskIdentity = new org.apache.airavata.model.messaging.event.TaskIdentity(iMonitorID.getTaskID(), iMonitorID.getWorkflowNodeID(),
+                                        iMonitorID.getExperimentID());
+                            	publisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
                             	//FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status 
                             	//should be done understanding whole workflow of job submission and data transfer
 //                            	publisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(iMonitorID.getExperimentID()),
@@ -193,8 +195,9 @@ public class HPCPullMonitor extends PullMonitor {
                                 logger.error("Launching outflow handlers to check output are genereated or not");
                                 gfac.invokeOutFlowHandlers(iMonitorID.getJobExecutionContext());
                             } catch (GFacException e) {
-                                publisher.publish(new TaskStatusChangeRequest(new TaskIdentity(iMonitorID.getExperimentID(), iMonitorID.getWorkflowNodeID(),
-                                        iMonitorID.getTaskID()), TaskState.FAILED));
+                                org.apache.airavata.model.messaging.event.TaskIdentity taskIdentity = new org.apache.airavata.model.messaging.event.TaskIdentity(iMonitorID.getTaskID(), iMonitorID.getWorkflowNodeID(),
+                                        iMonitorID.getExperimentID());
+                                publisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
                                 logger.info(e.getLocalizedMessage(), e);
                             }
                         } else {

http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
index 010d3bc..72338fe 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
@@ -33,11 +33,11 @@ import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.gfac.core.monitor.JobIdentity;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
 import org.apache.airavata.gfac.monitor.core.PushMonitor;
 import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.gfac.monitor.util.AMQPConnectionUtil;
 import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -202,7 +202,9 @@ public class AMQPMonitor extends PushMonitor {
             }
         }
         next.setStatus(monitorID.getStatus());
-        publisher.publish(new JobStatusChangeRequest(next, new JobIdentity(next.getExperimentID(), next.getWorkflowNodeID(), next.getTaskID(), next.getJobID()),next.getStatus()));
+        org.apache.airavata.model.messaging.event.JobIdentity jobIdentity = new org.apache.airavata.model.messaging.event.JobIdentity(next.getJobID(),
+                                                                            next.getTaskID(), next.getWorkflowNodeID(), next.getExperimentID());
+        publisher.publish(new JobStatusChangeEvent(next.getStatus(),jobIdentity));
         return true;
     }
     @Override


Mime
View raw message