Return-Path: X-Original-To: apmail-airavata-commits-archive@www.apache.org Delivered-To: apmail-airavata-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AB856173E7 for ; Wed, 25 Feb 2015 22:13:30 +0000 (UTC) Received: (qmail 77899 invoked by uid 500); 25 Feb 2015 22:13:30 -0000 Delivered-To: apmail-airavata-commits-archive@airavata.apache.org Received: (qmail 77788 invoked by uid 500); 25 Feb 2015 22:13:30 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 77749 invoked by uid 99); 25 Feb 2015 22:13:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Feb 2015 22:13:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 56A19E0B33; Wed, 25 Feb 2015 22:13:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: shameera@apache.org To: commits@airavata.apache.org Date: Wed, 25 Feb 2015 22:13:33 -0000 Message-Id: <1f067266a9ab48e29c1cff82bc8d517b@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [4/4] airavata git commit: Fixed AIRAVATA-1591 , AIRAVATA-1592 , AIRAVATA-1593 Fixed AIRAVATA-1591 ,AIRAVATA-1592 , AIRAVATA-1593 Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d25441a0 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d25441a0 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d25441a0 Branch: refs/heads/new-workflow-design-rabbitmq Commit: d25441a017f2e443165a22c9c5966a6d9e6aad9e Parents: 97ff3b7 Author: shamrath Authored: Wed Feb 25 17:12:18 2015 -0500 Committer: shamrath Committed: Wed Feb 25 17:12:18 2015 -0500 ---------------------------------------------------------------------- .../lib/airavata/messagingEvents_types.cpp | 88 +++- .../lib/airavata/messagingEvents_types.h | 47 +- .../Airavata/Model/Messaging/Event/Types.php | 94 ++++ .../model/messaging/event/MessageType.java | 5 +- .../messaging/event/ProcessSubmitEvent.java | 492 +++++++++++++++++++ .../messagingEvents.thrift | 8 +- .../core/monitor/AiravataTaskStatusUpdator.java | 13 + .../core/impl/RabbitMQProcessConsumer.java | 158 ++++++ .../core/impl/RabbitMQProcessPublisher.java | 84 ++++ .../core/impl/RabbitMQStatusConsumer.java | 10 +- .../core/impl/RabbitMQStatusPublisher.java | 20 +- .../server/OrchestratorServerHandler.java | 76 ++- modules/simple-workflow/pom.xml | 5 + .../simple/workflow/engine/ProcessContext.java | 62 +++ .../simple/workflow/engine/ProcessPack.java | 62 --- .../engine/SimpleWorkflowInterpreter.java | 378 +++++++------- .../workflow/engine/WorkflowFactoryImpl.java | 4 +- .../engine/parser/AiravataDefaultParser.java | 293 ----------- .../engine/parser/AiravataWorkflowParser.java | 291 +++++++++++ .../parser/AiravataDefaultParserTest.java | 119 ----- .../parser/AiravataWorkflowParserTest.java | 119 +++++ 21 files changed, 1724 insertions(+), 704 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp index 71f45be..92f29c6 100644 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp +++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp @@ -47,7 +47,8 @@ int _kMessageTypeValues[] = { MessageType::WORKFLOWNODE, MessageType::JOB, MessageType::LAUNCHTASK, - MessageType::TERMINATETASK + MessageType::TERMINATETASK, + MessageType::TASKOUTPUT }; const char* _kMessageTypeNames[] = { "EXPERIMENT", @@ -55,9 +56,10 @@ const char* _kMessageTypeNames[] = { "WORKFLOWNODE", "JOB", "LAUNCHTASK", - "TERMINATETASK" + "TERMINATETASK", + "TASKOUTPUT" }; -const std::map _MessageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(6, _kMessageTypeValues, _kMessageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map _MessageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(7, _kMessageTypeValues, _kMessageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); const char* ExperimentStatusChangeEvent::ascii_fingerprint = "38C252E94E93B69D04EB3A6EE2F9EDFB"; const uint8_t ExperimentStatusChangeEvent::binary_fingerprint[16] = {0x38,0xC2,0x52,0xE9,0x4E,0x93,0xB6,0x9D,0x04,0xEB,0x3A,0x6E,0xE2,0xF9,0xED,0xFB}; @@ -839,6 +841,86 @@ void swap(JobIdentifier &a, JobIdentifier &b) { swap(a.gatewayId, b.gatewayId); } +const char* ProcessSubmitEvent::ascii_fingerprint = "07A9615F837F7D0A952B595DD3020972"; +const uint8_t ProcessSubmitEvent::binary_fingerprint[16] = {0x07,0xA9,0x61,0x5F,0x83,0x7F,0x7D,0x0A,0x95,0x2B,0x59,0x5D,0xD3,0x02,0x09,0x72}; + +uint32_t ProcessSubmitEvent::read(::apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + bool isset_taskId = false; + bool isset_credentialToken = false; + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->taskId); + isset_taskId = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->credentialToken); + isset_credentialToken = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + if (!isset_taskId) + throw TProtocolException(TProtocolException::INVALID_DATA); + if (!isset_credentialToken) + throw TProtocolException(TProtocolException::INVALID_DATA); + return xfer; +} + +uint32_t ProcessSubmitEvent::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + xfer += oprot->writeStructBegin("ProcessSubmitEvent"); + + xfer += oprot->writeFieldBegin("taskId", ::apache::thrift::protocol::T_STRING, 1); + xfer += oprot->writeString(this->taskId); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("credentialToken", ::apache::thrift::protocol::T_STRING, 2); + xfer += oprot->writeString(this->credentialToken); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(ProcessSubmitEvent &a, ProcessSubmitEvent &b) { + using ::std::swap; + swap(a.taskId, b.taskId); + swap(a.credentialToken, b.credentialToken); +} + const char* TaskSubmitEvent::ascii_fingerprint = "C93D890311F28844166CF6E571EB3AC2"; const uint8_t TaskSubmitEvent::binary_fingerprint[16] = {0xC9,0x3D,0x89,0x03,0x11,0xF2,0x88,0x44,0x16,0x6C,0xF6,0xE5,0x71,0xEB,0x3A,0xC2}; http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h index c7e2bb5..aafcda1 100644 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h +++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h @@ -54,7 +54,8 @@ struct MessageType { WORKFLOWNODE = 2, JOB = 3, LAUNCHTASK = 4, - TERMINATETASK = 5 + TERMINATETASK = 5, + TASKOUTPUT = 6 }; }; @@ -462,6 +463,50 @@ class JobIdentifier { void swap(JobIdentifier &a, JobIdentifier &b); +class ProcessSubmitEvent { + public: + + static const char* ascii_fingerprint; // = "07A9615F837F7D0A952B595DD3020972"; + static const uint8_t binary_fingerprint[16]; // = {0x07,0xA9,0x61,0x5F,0x83,0x7F,0x7D,0x0A,0x95,0x2B,0x59,0x5D,0xD3,0x02,0x09,0x72}; + + ProcessSubmitEvent() : taskId(), credentialToken() { + } + + virtual ~ProcessSubmitEvent() throw() {} + + std::string taskId; + std::string credentialToken; + + void __set_taskId(const std::string& val) { + taskId = val; + } + + void __set_credentialToken(const std::string& val) { + credentialToken = val; + } + + bool operator == (const ProcessSubmitEvent & rhs) const + { + if (!(taskId == rhs.taskId)) + return false; + if (!(credentialToken == rhs.credentialToken)) + return false; + return true; + } + bool operator != (const ProcessSubmitEvent &rhs) const { + return !(*this == rhs); + } + + bool operator < (const ProcessSubmitEvent & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + +void swap(ProcessSubmitEvent &a, ProcessSubmitEvent &b); + + class TaskSubmitEvent { public: http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php index b0d7676..4c4ec93 100644 --- a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php +++ b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php @@ -37,6 +37,7 @@ final class MessageType { const JOB = 3; const LAUNCHTASK = 4; const TERMINATETASK = 5; + const TASKOUTPUT = 6; static public $__names = array( 0 => 'EXPERIMENT', 1 => 'TASK', @@ -44,6 +45,7 @@ final class MessageType { 3 => 'JOB', 4 => 'LAUNCHTASK', 5 => 'TERMINATETASK', + 6 => 'TASKOUTPUT', ); } @@ -971,6 +973,98 @@ class JobIdentifier { } +class ProcessSubmitEvent { + static $_TSPEC; + + public $taskId = null; + public $credentialToken = null; + + public function __construct($vals=null) { + if (!isset(self::$_TSPEC)) { + self::$_TSPEC = array( + 1 => array( + 'var' => 'taskId', + 'type' => TType::STRING, + ), + 2 => array( + 'var' => 'credentialToken', + 'type' => TType::STRING, + ), + ); + } + if (is_array($vals)) { + if (isset($vals['taskId'])) { + $this->taskId = $vals['taskId']; + } + if (isset($vals['credentialToken'])) { + $this->credentialToken = $vals['credentialToken']; + } + } + } + + public function getName() { + return 'ProcessSubmitEvent'; + } + + public function read($input) + { + $xfer = 0; + $fname = null; + $ftype = 0; + $fid = 0; + $xfer += $input->readStructBegin($fname); + while (true) + { + $xfer += $input->readFieldBegin($fname, $ftype, $fid); + if ($ftype == TType::STOP) { + break; + } + switch ($fid) + { + case 1: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->taskId); + } else { + $xfer += $input->skip($ftype); + } + break; + case 2: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->credentialToken); + } else { + $xfer += $input->skip($ftype); + } + break; + default: + $xfer += $input->skip($ftype); + break; + } + $xfer += $input->readFieldEnd(); + } + $xfer += $input->readStructEnd(); + return $xfer; + } + + public function write($output) { + $xfer = 0; + $xfer += $output->writeStructBegin('ProcessSubmitEvent'); + if ($this->taskId !== null) { + $xfer += $output->writeFieldBegin('taskId', TType::STRING, 1); + $xfer += $output->writeString($this->taskId); + $xfer += $output->writeFieldEnd(); + } + if ($this->credentialToken !== null) { + $xfer += $output->writeFieldBegin('credentialToken', TType::STRING, 2); + $xfer += $output->writeString($this->credentialToken); + $xfer += $output->writeFieldEnd(); + } + $xfer += $output->writeFieldStop(); + $xfer += $output->writeStructEnd(); + return $xfer; + } + +} + class TaskSubmitEvent { static $_TSPEC; http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java index 230b87b..761626f 100644 --- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java +++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java @@ -34,7 +34,8 @@ import org.apache.thrift.TEnum; WORKFLOWNODE(2), JOB(3), LAUNCHTASK(4), - TERMINATETASK(5); + TERMINATETASK(5), + TASKOUTPUT(6); private final int value; @@ -67,6 +68,8 @@ import org.apache.thrift.TEnum; return LAUNCHTASK; case 5: return TERMINATETASK; + case 6: + return TASKOUTPUT; default: return null; } http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ProcessSubmitEvent.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ProcessSubmitEvent.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ProcessSubmitEvent.java new file mode 100644 index 0000000..e1d001a --- /dev/null +++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ProcessSubmitEvent.java @@ -0,0 +1,492 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Autogenerated by Thrift Compiler (0.9.1) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package org.apache.airavata.model.messaging.event; + +import org.apache.thrift.scheme.IScheme; +import org.apache.thrift.scheme.SchemeFactory; +import org.apache.thrift.scheme.StandardScheme; + +import org.apache.thrift.scheme.TupleScheme; +import org.apache.thrift.protocol.TTupleProtocol; +import org.apache.thrift.protocol.TProtocolException; +import org.apache.thrift.EncodingUtils; +import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.server.AbstractNonblockingServer.*; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; +import java.util.EnumSet; +import java.util.Collections; +import java.util.BitSet; +import java.nio.ByteBuffer; +import java.util.Arrays; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings("all") public class ProcessSubmitEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ProcessSubmitEvent"); + + private static final org.apache.thrift.protocol.TField TASK_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("taskId", org.apache.thrift.protocol.TType.STRING, (short)1); + private static final org.apache.thrift.protocol.TField CREDENTIAL_TOKEN_FIELD_DESC = new org.apache.thrift.protocol.TField("credentialToken", org.apache.thrift.protocol.TType.STRING, (short)2); + + private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new ProcessSubmitEventStandardSchemeFactory()); + schemes.put(TupleScheme.class, new ProcessSubmitEventTupleSchemeFactory()); + } + + private String taskId; // required + private String credentialToken; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum { + TASK_ID((short)1, "taskId"), + CREDENTIAL_TOKEN((short)2, "credentialToken"); + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // TASK_ID + return TASK_ID; + case 2: // CREDENTIAL_TOKEN + return CREDENTIAL_TOKEN; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.TASK_ID, new org.apache.thrift.meta_data.FieldMetaData("taskId", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.CREDENTIAL_TOKEN, new org.apache.thrift.meta_data.FieldMetaData("credentialToken", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ProcessSubmitEvent.class, metaDataMap); + } + + public ProcessSubmitEvent() { + } + + public ProcessSubmitEvent( + String taskId, + String credentialToken) + { + this(); + this.taskId = taskId; + this.credentialToken = credentialToken; + } + + /** + * Performs a deep copy on other. + */ + public ProcessSubmitEvent(ProcessSubmitEvent other) { + if (other.isSetTaskId()) { + this.taskId = other.taskId; + } + if (other.isSetCredentialToken()) { + this.credentialToken = other.credentialToken; + } + } + + public ProcessSubmitEvent deepCopy() { + return new ProcessSubmitEvent(this); + } + + @Override + public void clear() { + this.taskId = null; + this.credentialToken = null; + } + + public String getTaskId() { + return this.taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + public void unsetTaskId() { + this.taskId = null; + } + + /** Returns true if field taskId is set (has been assigned a value) and false otherwise */ + public boolean isSetTaskId() { + return this.taskId != null; + } + + public void setTaskIdIsSet(boolean value) { + if (!value) { + this.taskId = null; + } + } + + public String getCredentialToken() { + return this.credentialToken; + } + + public void setCredentialToken(String credentialToken) { + this.credentialToken = credentialToken; + } + + public void unsetCredentialToken() { + this.credentialToken = null; + } + + /** Returns true if field credentialToken is set (has been assigned a value) and false otherwise */ + public boolean isSetCredentialToken() { + return this.credentialToken != null; + } + + public void setCredentialTokenIsSet(boolean value) { + if (!value) { + this.credentialToken = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case TASK_ID: + if (value == null) { + unsetTaskId(); + } else { + setTaskId((String)value); + } + break; + + case CREDENTIAL_TOKEN: + if (value == null) { + unsetCredentialToken(); + } else { + setCredentialToken((String)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case TASK_ID: + return getTaskId(); + + case CREDENTIAL_TOKEN: + return getCredentialToken(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case TASK_ID: + return isSetTaskId(); + case CREDENTIAL_TOKEN: + return isSetCredentialToken(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof ProcessSubmitEvent) + return this.equals((ProcessSubmitEvent)that); + return false; + } + + public boolean equals(ProcessSubmitEvent that) { + if (that == null) + return false; + + boolean this_present_taskId = true && this.isSetTaskId(); + boolean that_present_taskId = true && that.isSetTaskId(); + if (this_present_taskId || that_present_taskId) { + if (!(this_present_taskId && that_present_taskId)) + return false; + if (!this.taskId.equals(that.taskId)) + return false; + } + + boolean this_present_credentialToken = true && this.isSetCredentialToken(); + boolean that_present_credentialToken = true && that.isSetCredentialToken(); + if (this_present_credentialToken || that_present_credentialToken) { + if (!(this_present_credentialToken && that_present_credentialToken)) + return false; + if (!this.credentialToken.equals(that.credentialToken)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public int compareTo(ProcessSubmitEvent other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = Boolean.valueOf(isSetTaskId()).compareTo(other.isSetTaskId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetTaskId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.taskId, other.taskId); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetCredentialToken()).compareTo(other.isSetCredentialToken()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetCredentialToken()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.credentialToken, other.credentialToken); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("ProcessSubmitEvent("); + boolean first = true; + + sb.append("taskId:"); + if (this.taskId == null) { + sb.append("null"); + } else { + sb.append(this.taskId); + } + first = false; + if (!first) sb.append(", "); + sb.append("credentialToken:"); + if (this.credentialToken == null) { + sb.append("null"); + } else { + sb.append(this.credentialToken); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + if (!isSetTaskId()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'taskId' is unset! Struct:" + toString()); + } + + if (!isSetCredentialToken()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'credentialToken' is unset! Struct:" + toString()); + } + + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class ProcessSubmitEventStandardSchemeFactory implements SchemeFactory { + public ProcessSubmitEventStandardScheme getScheme() { + return new ProcessSubmitEventStandardScheme(); + } + } + + private static class ProcessSubmitEventStandardScheme extends StandardScheme { + + public void read(org.apache.thrift.protocol.TProtocol iprot, ProcessSubmitEvent struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // TASK_ID + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.taskId = iprot.readString(); + struct.setTaskIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // CREDENTIAL_TOKEN + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.credentialToken = iprot.readString(); + struct.setCredentialTokenIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, ProcessSubmitEvent struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.taskId != null) { + oprot.writeFieldBegin(TASK_ID_FIELD_DESC); + oprot.writeString(struct.taskId); + oprot.writeFieldEnd(); + } + if (struct.credentialToken != null) { + oprot.writeFieldBegin(CREDENTIAL_TOKEN_FIELD_DESC); + oprot.writeString(struct.credentialToken); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class ProcessSubmitEventTupleSchemeFactory implements SchemeFactory { + public ProcessSubmitEventTupleScheme getScheme() { + return new ProcessSubmitEventTupleScheme(); + } + } + + private static class ProcessSubmitEventTupleScheme extends TupleScheme { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, ProcessSubmitEvent struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + oprot.writeString(struct.taskId); + oprot.writeString(struct.credentialToken); + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, ProcessSubmitEvent struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + struct.taskId = iprot.readString(); + struct.setTaskIdIsSet(true); + struct.credentialToken = iprot.readString(); + struct.setCredentialTokenIsSet(true); + } + } + +} + http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/airavata-api/thrift-interface-descriptions/messagingEvents.thrift ---------------------------------------------------------------------- diff --git a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift index d9e85d4..b13b5ed 100644 --- a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift +++ b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift @@ -40,7 +40,8 @@ enum MessageType { WORKFLOWNODE, JOB, LAUNCHTASK, - TERMINATETASK + TERMINATETASK, + TASKOUTPUT } struct ExperimentStatusChangeEvent { @@ -102,6 +103,11 @@ struct JobIdentifier { // //8: // } +struct ProcessSubmitEvent{ + 1: required string taskId; + 2: required string credentialToken; +} + struct TaskSubmitEvent{ 1: required string experimentId, 2: required string taskId, http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java index 837b728..5490d50 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java @@ -21,6 +21,7 @@ package org.apache.airavata.gfac.core.monitor; import com.google.common.eventbus.Subscribe; +import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.common.utils.ServerSettings; @@ -150,4 +151,16 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener { } } } + + + @Subscribe + public void taskOutputChanged(TaskOutputChangeEvent taskOutputEvent) throws AiravataException { + String taskId = taskOutputEvent.getTaskIdentity().getTaskId(); + logger.debug("Task Output changed event received for workflow node : " + + taskOutputEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId); + // TODO - do we need to update the output to the registry? , we do it in the workflowInterpreter too. + MessageContext messageContext = new MessageContext(taskOutputEvent, MessageType.TASKOUTPUT, taskOutputEvent.getTaskIdentity().getTaskId(), taskOutputEvent.getTaskIdentity().getGatewayId()); + messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + publisher.publish(messageContext); + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java new file mode 100644 index 0000000..3352893 --- /dev/null +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java @@ -0,0 +1,158 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.messaging.core.impl; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.QueueingConsumer; +import com.rabbitmq.client.ShutdownListener; +import com.rabbitmq.client.ShutdownSignalException; +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessageHandler; +import org.apache.airavata.messaging.core.MessagingConstants; +import org.apache.airavata.model.messaging.event.Message; +import org.apache.airavata.model.messaging.event.ProcessSubmitEvent; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; + +public class RabbitMQProcessConsumer { + + private static final Logger log = LoggerFactory.getLogger(RabbitMQProcessConsumer.class); + + private String url; + private Connection connection; + private Channel channel; + + public RabbitMQProcessConsumer() throws AiravataException { + try { + url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); + createConnection(); + } catch (ApplicationSettingsException e) { + String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; + log.error(message, e); + throw new AiravataException(message, e); + } + } + + private void createConnection() throws AiravataException { + try { + ConnectionFactory connectionFactory = new ConnectionFactory(); + connectionFactory.setUri(url); + connection = connectionFactory.newConnection(); + connection.addShutdownListener(new ShutdownListener() { + public void shutdownCompleted(ShutdownSignalException cause) { + } + }); + log.info("connected to rabbitmq: " + connection + " for default"); + + channel = connection.createChannel(); +// channel.exchangeDeclare(taskLaunchExchangeName, "fanout"); + + } catch (Exception e) { + String msg = "could not open channel for exchange default"; + log.error(msg); + throw new AiravataException(msg, e); + } + } + + + public String listen(final MessageHandler handler) throws AiravataException { + try { + Map props = handler.getProperties(); + final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY); + if (routing == null) { + throw new IllegalArgumentException("The routing key must be present"); + } + String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE); + String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG); + if (queueName == null) { + if (!channel.isOpen()) { + channel = connection.createChannel(); +// channel.exchangeDeclare(taskLaunchExchangeName, "fanout"); + } + queueName = channel.queueDeclare().getQueue(); + } else { + channel.queueDeclare(queueName, true, false, false, null); + } + + if (consumerTag == null) { + consumerTag = "default"; + } + // autoAck=false, we will ack after task is done + final String finalQueueName = queueName; + channel.basicConsume(queueName, true, new QueueingConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, + Envelope envelope, + AMQP.BasicProperties properties, + byte[] body) { + Message message = new Message(); + + try { + ThriftUtils.createThriftFromBytes(body, message); + TBase event = null; + String gatewayId = null; + ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent(); + ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent); + log.debug("Message received with message id : " + message.getMessageId() + + " with task id : " + processSubmitEvent.getTaskId()); + event = processSubmitEvent; + MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), null); + messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime())); + handler.onMessage(messageContext); + } catch (TException e) { + String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + finalQueueName; + log.warn(msg, e); + } + } + }); + return ""; + } catch (Exception e) { + String msg = "could not open channel for exchange default"; + log.error(msg); + throw new AiravataException(msg, e); + } + } + + public void stopListen(final String queueName , final String exchangeName) throws AiravataException { + try { + channel.queueUnbind(queueName, exchangeName, null); + } catch (IOException e) { + String msg = "could not un-bind queue: " + queueName + " for exchange " + exchangeName; + log.debug(msg); + } + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessPublisher.java new file mode 100644 index 0000000..3684198 --- /dev/null +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessPublisher.java @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.messaging.core.impl; + +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessagingConstants; +import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.model.messaging.event.Message; +import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class RabbitMQProcessPublisher implements Publisher { + + private static final Logger log = LoggerFactory.getLogger(RabbitMQProcessPublisher.class); + public static final String PROCESS = "process.queue" ; + + private RabbitMQProducer rabbitMQProducer; + + public RabbitMQProcessPublisher() throws Exception { + String brokerUrl; + String exchangeName; + try { + brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); +// exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME); + } catch (ApplicationSettingsException e) { + String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; + log.error(message, e); + throw new AiravataException(message, e); + } + rabbitMQProducer = new RabbitMQProducer(brokerUrl, null, null); + rabbitMQProducer.open(); + } + + @Override + public void publish(MessageContext msgCtx) throws AiravataException { + try { + log.info("Publishing to process queue ..."); + byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent()); + Message message = new Message(); + message.setEvent(body); + message.setMessageId(msgCtx.getMessageId()); + message.setMessageType(msgCtx.getType()); + message.setUpdatedTime(msgCtx.getUpdatedTime().getTime()); + String queueName = PROCESS; + message.setMessageType(MessageType.TASK); + byte[] messageBody = ThriftUtils.serializeThriftObject(message); + rabbitMQProducer.sendToWorkerQueue(messageBody, queueName); + } catch (TException e) { + String msg = "Error while serializing the thrift object"; + log.error(msg, e); + throw new AiravataException(msg, e); + } catch (Exception e) { + String msg = "Error while sending to rabbitmq"; + log.error(msg, e); + throw new AiravataException(msg, e); + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java index d5e8c72..6efa77a 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java @@ -174,6 +174,12 @@ public class RabbitMQStatusConsumer implements Consumer { taskStatusChangeEvent.getState()); event = taskStatusChangeEvent; gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId(); + }else if (message.getMessageType() == MessageType.TASKOUTPUT) { + TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent(); + ThriftUtils.createThriftFromBytes(message.getEvent(), taskOutputChangeEvent); + log.debug(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getMessageType()); + event = taskOutputChangeEvent; + gatewayId = taskOutputChangeEvent.getTaskIdentity().getGatewayId(); } else if (message.getMessageType().equals(MessageType.JOB)) { JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(); ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent); @@ -182,7 +188,7 @@ public class RabbitMQStatusConsumer implements Consumer { jobStatusChangeEvent.getState()); event = jobStatusChangeEvent; gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId(); - }else if(message.getMessageType().equals(MessageType.LAUNCHTASK)) { + } else if (message.getMessageType().equals(MessageType.LAUNCHTASK)) { TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(); ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent); log.debug(" Message Received with message id '" + message.getMessageId() @@ -190,7 +196,7 @@ public class RabbitMQStatusConsumer implements Consumer { taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId()); event = taskSubmitEvent; gatewayId = taskSubmitEvent.getGatewayId(); - }else if(message.getMessageType().equals(MessageType.TERMINATETASK)) { + } else if (message.getMessageType().equals(MessageType.TERMINATETASK)) { TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent(); ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent); log.debug(" Message Received with message id '" + message.getMessageId() http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java index 966d44d..a149037 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java @@ -68,21 +68,25 @@ public class RabbitMQStatusPublisher implements Publisher { message.setUpdatedTime(msgCtx.getUpdatedTime().getTime()); String gatewayId = msgCtx.getGatewayId(); String routingKey = null; - if (msgCtx.getType().equals(MessageType.EXPERIMENT)){ + if (msgCtx.getType() == MessageType.EXPERIMENT) { ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent(); routingKey = gatewayId + "." + event.getExperimentId(); - } else if (msgCtx.getType().equals(MessageType.TASK)) { + } else if (msgCtx.getType() == MessageType.TASK) { TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent(); - routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." + + routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." + event.getTaskIdentity().getWorkflowNodeId() + "." + event.getTaskIdentity().getTaskId(); - }else if (msgCtx.getType().equals(MessageType.WORKFLOWNODE)){ + } else if (msgCtx.getType() == MessageType.TASKOUTPUT) { + TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent(); + routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." + + event.getTaskIdentity().getWorkflowNodeId() + "." + event.getTaskIdentity().getTaskId(); + } else if (msgCtx.getType() == MessageType.WORKFLOWNODE) { WorkflowNodeStatusChangeEvent event = (WorkflowNodeStatusChangeEvent) msgCtx.getEvent(); WorkflowIdentifier workflowNodeIdentity = event.getWorkflowNodeIdentity(); - routingKey = gatewayId + "." + workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId(); - }else if (msgCtx.getType().equals(MessageType.JOB)){ - JobStatusChangeEvent event = (JobStatusChangeEvent)msgCtx.getEvent(); + routingKey = gatewayId + "." + workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId(); + } else if (msgCtx.getType() == MessageType.JOB) { + JobStatusChangeEvent event = (JobStatusChangeEvent) msgCtx.getEvent(); JobIdentifier identity = event.getJobIdentity(); - routingKey = gatewayId + "." + identity.getExperimentId() + "." + + routingKey = gatewayId + "." + identity.getExperimentId() + "." + identity.getWorkflowNodeId() + "." + identity.getTaskId() + "." + identity.getJobId(); http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index e873e05..a4e105e 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -38,14 +38,19 @@ import org.apache.airavata.credential.store.store.CredentialReader; import org.apache.airavata.gfac.core.scheduler.HostScheduler; import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessageHandler; +import org.apache.airavata.messaging.core.MessagingConstants; import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.messaging.core.PublisherFactory; +import org.apache.airavata.messaging.core.impl.RabbitMQProcessConsumer; +import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; import org.apache.airavata.model.error.LaunchValidationException; import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.airavata.model.messaging.event.ProcessSubmitEvent; import org.apache.airavata.model.util.ExecutionType; import org.apache.airavata.model.workspace.experiment.*; import org.apache.airavata.orchestrator.core.exception.OrchestratorException; @@ -61,6 +66,7 @@ import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.TaskDetai import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowNodeConstants; import org.apache.airavata.orchestrator.util.DataModelUtils; import org.apache.airavata.simple.workflow.engine.SimpleWorkflowInterpreter; +import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; @@ -86,7 +92,10 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, private String gatewayName; private Publisher publisher; - /** + private RabbitMQProcessConsumer rabbitMQProcessConsumer; + private RabbitMQProcessPublisher rabbitMQProcessPublisher; + + /** * Query orchestrator server to fetch the CPI version */ public String getOrchestratorCPIVersion() throws TException { @@ -152,7 +161,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, orchestrator.initialize(); orchestrator.getOrchestratorContext().setZk(this.zk); orchestrator.getOrchestratorContext().setPublisher(this.publisher); - } catch (OrchestratorException e) { + startProcessConsumer(); + } catch (OrchestratorException e) { log.error(e.getMessage(), e); throw new OrchestratorException("Error while initializing orchestrator service", e); } catch (RegistryException e) { @@ -161,6 +171,19 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, } } + private void startProcessConsumer() throws OrchestratorException { + try { + rabbitMQProcessConsumer = new RabbitMQProcessConsumer(); + ProcessConsumer processConsumer = new ProcessConsumer(); + Thread thread = new Thread(processConsumer); + thread.start(); + + } catch (AiravataException e) { + throw new OrchestratorException("Error while starting process consumer", e); + } + + } + private void registerOrchestratorService(String airavataServerHostPort, String orchServer) throws KeeperException, InterruptedException { Stat zkStat = zk.exists(orchServer, false); if (zkStat == null) { @@ -627,6 +650,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, } return true; } + private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken) throws TException { // try { // WorkflowEngine workflowEngine = WorkflowEngineFactory.getWorkflowEngine(); @@ -634,15 +658,25 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, // } catch (WorkflowEngineException e) { // log.errorId(experimentId, "Error while launching experiment.", e); // } - try { - SimpleWorkflowInterpreter simpleWorkflowInterpreter = new SimpleWorkflowInterpreter(experimentId, airavataCredStoreToken); + SimpleWorkflowInterpreter simpleWorkflowInterpreter = new SimpleWorkflowInterpreter( + experimentId, airavataCredStoreToken,getGatewayName(), getRabbitMQProcessPublisher()); + Thread thread = new Thread(simpleWorkflowInterpreter); thread.start(); // simpleWorkflowInterpreter.run(); } catch (RegistryException e) { log.error("Error while launching workflow", e); + } catch (Exception e) { + log.error("Error while initializing rabbit mq process publisher"); + } + } + + public synchronized RabbitMQProcessPublisher getRabbitMQProcessPublisher() throws Exception { + if (rabbitMQProcessPublisher == null) { + rabbitMQProcessPublisher = new RabbitMQProcessPublisher(); } + return rabbitMQProcessPublisher; } @@ -732,4 +766,38 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, } } + private class ProcessConsumer implements Runnable, MessageHandler{ + + + @Override + public void run() { + try { + rabbitMQProcessConsumer.listen(this); + } catch (AiravataException e) { + log.error("Error while listen to the RabbitMQProcessConsumer"); + } + } + + @Override + public Map getProperties() { + Map props = new HashMap(); + props.put(MessagingConstants.RABBIT_QUEUE, RabbitMQProcessPublisher.PROCESS); + props.put(MessagingConstants.RABBIT_ROUTING_KEY, RabbitMQProcessPublisher.PROCESS); + return props; + } + + @Override + public void onMessage(MessageContext msgCtx) { + TBase event = msgCtx.getEvent(); + if (event instanceof ProcessSubmitEvent) { + ProcessSubmitEvent processSubmitEvent = (ProcessSubmitEvent) event; + try { + launchTask(processSubmitEvent.getTaskId(), processSubmitEvent.getCredentialToken()); + } catch (TException e) { + log.error("Error while launching task : " + processSubmitEvent.getTaskId()); + } + } + } + } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/pom.xml ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/pom.xml b/modules/simple-workflow/pom.xml index 6b36335..5cb9dfb 100644 --- a/modules/simple-workflow/pom.xml +++ b/modules/simple-workflow/pom.xml @@ -48,6 +48,11 @@ + org.apache.airavata + airavata-messaging-core + ${project.version} + + com.google.guava guava 18.0 http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessContext.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessContext.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessContext.java new file mode 100644 index 0000000..849af85 --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessContext.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine; + +import org.apache.airavata.model.workspace.experiment.TaskDetails; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode; + +public class ProcessContext { + private WorkflowNode workflowNode; + private WorkflowNodeDetails wfNodeDetails; + private TaskDetails taskDetails; + + public ProcessContext(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails, TaskDetails taskDetails) { + this.workflowNode = workflowNode; + this.wfNodeDetails = wfNodeDetails; + this.taskDetails = taskDetails; + } + + public WorkflowNode getWorkflowNode() { + return workflowNode; + } + + public void setWorkflowNode(WorkflowNode workflowNode) { + this.workflowNode = workflowNode; + } + + public WorkflowNodeDetails getWfNodeDetails() { + return wfNodeDetails; + } + + public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) { + this.wfNodeDetails = wfNodeDetails; + } + + public TaskDetails getTaskDetails() { + return taskDetails; + } + + public void setTaskDetails(TaskDetails taskDetails) { + this.taskDetails = taskDetails; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java deleted file mode 100644 index b58b947..0000000 --- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.simple.workflow.engine; - -import org.apache.airavata.model.workspace.experiment.TaskDetails; -import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; -import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode; - -public class ProcessPack { - private WorkflowNode workflowNode; - private WorkflowNodeDetails wfNodeDetails; - private TaskDetails taskDetails; - - public ProcessPack(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails, TaskDetails taskDetails) { - this.workflowNode = workflowNode; - this.wfNodeDetails = wfNodeDetails; - this.taskDetails = taskDetails; - } - - public WorkflowNode getWorkflowNode() { - return workflowNode; - } - - public void setWorkflowNode(WorkflowNode workflowNode) { - this.workflowNode = workflowNode; - } - - public WorkflowNodeDetails getWfNodeDetails() { - return wfNodeDetails; - } - - public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) { - this.wfNodeDetails = wfNodeDetails; - } - - public TaskDetails getTaskDetails() { - return taskDetails; - } - - public void setTaskDetails(TaskDetails taskDetails) { - this.taskDetails = taskDetails; - } -}