Return-Path: X-Original-To: apmail-airavata-commits-archive@www.apache.org Delivered-To: apmail-airavata-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 57C2117DF1 for ; Fri, 10 Apr 2015 15:50:26 +0000 (UTC) Received: (qmail 77601 invoked by uid 500); 10 Apr 2015 15:50:26 -0000 Delivered-To: apmail-airavata-commits-archive@airavata.apache.org Received: (qmail 77494 invoked by uid 500); 10 Apr 2015 15:50:26 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 77261 invoked by uid 99); 10 Apr 2015 15:50:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Apr 2015 15:50:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0B441E0426; Fri, 10 Apr 2015 15:50:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: shameera@apache.org To: commits@airavata.apache.org Date: Fri, 10 Apr 2015 15:50:29 -0000 Message-Id: <57b4d706f947477fb3705a408d4e6921@git.apache.org> In-Reply-To: <154e3dee7ef44e92abcd6005cbae32a8@git.apache.org> References: <154e3dee7ef44e92abcd6005cbae32a8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/5] airavata git commit: AIRAVATA-1666 - Added compute resource level email based monitoring check. AIRAVATA-1666 - Added compute resource level email based monitoring check. Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/93d4421b Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/93d4421b Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/93d4421b Branch: refs/heads/emailMonitoring Commit: 93d4421b7e5c0d32fc62ceae3d0d0c88cdaf7838 Parents: 042e25c Author: shamrath Authored: Fri Apr 10 11:50:20 2015 -0400 Committer: shamrath Committed: Fri Apr 10 11:50:20 2015 -0400 ---------------------------------------------------------------------- .../lib/airavata/computeResourceModel_types.cpp | 12 +- .../lib/airavata/computeResourceModel_types.h | 16 +-- .../Model/AppCatalog/ComputeResource/Types.php | 20 ++-- .../model/appcatalog/computeresource/ttypes.py | 18 +-- .../client/samples/CreateLaunchExperiment.java | 21 ++-- .../computeresource/SSHJobSubmission.java | 112 +++++++++---------- .../computeResourceModel.thrift | 2 +- .../gsissh/provider/impl/GSISSHProvider.java | 87 +++++++------- .../gfac/monitor/email/EmailBasedMonitor.java | 61 ++++++---- .../gfac/monitor/email/EmailMonitorFactory.java | 58 ++++++++++ .../gfac/ssh/provider/impl/SSHProvider.java | 58 ++++++---- 11 files changed, 279 insertions(+), 186 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.cpp ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.cpp index 8b83573..2a57b7d 100644 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.cpp +++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.cpp @@ -1142,8 +1142,8 @@ uint32_t SSHJobSubmission::read(::apache::thrift::protocol::TProtocol* iprot) { break; case 7: if (ftype == ::apache::thrift::protocol::T_STRUCT) { - xfer += this->emailMonitor.read(iprot); - this->__isset.emailMonitor = true; + xfer += this->emailMonitorProperty.read(iprot); + this->__isset.emailMonitorProperty = true; } else { xfer += iprot->skip(ftype); } @@ -1197,9 +1197,9 @@ uint32_t SSHJobSubmission::write(::apache::thrift::protocol::TProtocol* oprot) c xfer += oprot->writeI32((int32_t)this->monitorMode); xfer += oprot->writeFieldEnd(); } - if (this->__isset.emailMonitor) { - xfer += oprot->writeFieldBegin("emailMonitor", ::apache::thrift::protocol::T_STRUCT, 7); - xfer += this->emailMonitor.write(oprot); + if (this->__isset.emailMonitorProperty) { + xfer += oprot->writeFieldBegin("emailMonitorProperty", ::apache::thrift::protocol::T_STRUCT, 7); + xfer += this->emailMonitorProperty.write(oprot); xfer += oprot->writeFieldEnd(); } xfer += oprot->writeFieldStop(); @@ -1215,7 +1215,7 @@ void swap(SSHJobSubmission &a, SSHJobSubmission &b) { swap(a.alternativeSSHHostName, b.alternativeSSHHostName); swap(a.sshPort, b.sshPort); swap(a.monitorMode, b.monitorMode); - swap(a.emailMonitor, b.emailMonitor); + swap(a.emailMonitorProperty, b.emailMonitorProperty); swap(a.__isset, b.__isset); } http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.h ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.h index 05a2dfd..2b72a04 100644 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.h +++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/computeResourceModel_types.h @@ -647,11 +647,11 @@ class LOCALDataMovement { void swap(LOCALDataMovement &a, LOCALDataMovement &b); typedef struct _SSHJobSubmission__isset { - _SSHJobSubmission__isset() : alternativeSSHHostName(false), sshPort(true), monitorMode(false), emailMonitor(false) {} + _SSHJobSubmission__isset() : alternativeSSHHostName(false), sshPort(true), monitorMode(false), emailMonitorProperty(false) {} bool alternativeSSHHostName; bool sshPort; bool monitorMode; - bool emailMonitor; + bool emailMonitorProperty; } _SSHJobSubmission__isset; class SSHJobSubmission { @@ -671,7 +671,7 @@ class SSHJobSubmission { std::string alternativeSSHHostName; int32_t sshPort; MonitorMode::type monitorMode; - EmailMonitorProperty emailMonitor; + EmailMonitorProperty emailMonitorProperty; _SSHJobSubmission__isset __isset; @@ -702,9 +702,9 @@ class SSHJobSubmission { __isset.monitorMode = true; } - void __set_emailMonitor(const EmailMonitorProperty& val) { - emailMonitor = val; - __isset.emailMonitor = true; + void __set_emailMonitorProperty(const EmailMonitorProperty& val) { + emailMonitorProperty = val; + __isset.emailMonitorProperty = true; } bool operator == (const SSHJobSubmission & rhs) const @@ -727,9 +727,9 @@ class SSHJobSubmission { return false; else if (__isset.monitorMode && !(monitorMode == rhs.monitorMode)) return false; - if (__isset.emailMonitor != rhs.__isset.emailMonitor) + if (__isset.emailMonitorProperty != rhs.__isset.emailMonitorProperty) return false; - else if (__isset.emailMonitor && !(emailMonitor == rhs.emailMonitor)) + else if (__isset.emailMonitorProperty && !(emailMonitorProperty == rhs.emailMonitorProperty)) return false; return true; } http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/AppCatalog/ComputeResource/Types.php ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/AppCatalog/ComputeResource/Types.php b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/AppCatalog/ComputeResource/Types.php index e488087..22c46a9 100644 --- a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/AppCatalog/ComputeResource/Types.php +++ b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/AppCatalog/ComputeResource/Types.php @@ -1232,7 +1232,7 @@ class SSHJobSubmission { public $alternativeSSHHostName = null; public $sshPort = 22; public $monitorMode = null; - public $emailMonitor = null; + public $emailMonitorProperty = null; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -1263,7 +1263,7 @@ class SSHJobSubmission { 'type' => TType::I32, ), 7 => array( - 'var' => 'emailMonitor', + 'var' => 'emailMonitorProperty', 'type' => TType::STRUCT, 'class' => '\Airavata\Model\AppCatalog\ComputeResource\EmailMonitorProperty', ), @@ -1288,8 +1288,8 @@ class SSHJobSubmission { if (isset($vals['monitorMode'])) { $this->monitorMode = $vals['monitorMode']; } - if (isset($vals['emailMonitor'])) { - $this->emailMonitor = $vals['emailMonitor']; + if (isset($vals['emailMonitorProperty'])) { + $this->emailMonitorProperty = $vals['emailMonitorProperty']; } } } @@ -1358,8 +1358,8 @@ class SSHJobSubmission { break; case 7: if ($ftype == TType::STRUCT) { - $this->emailMonitor = new \Airavata\Model\AppCatalog\ComputeResource\EmailMonitorProperty(); - $xfer += $this->emailMonitor->read($input); + $this->emailMonitorProperty = new \Airavata\Model\AppCatalog\ComputeResource\EmailMonitorProperty(); + $xfer += $this->emailMonitorProperty->read($input); } else { $xfer += $input->skip($ftype); } @@ -1410,12 +1410,12 @@ class SSHJobSubmission { $xfer += $output->writeI32($this->monitorMode); $xfer += $output->writeFieldEnd(); } - if ($this->emailMonitor !== null) { - if (!is_object($this->emailMonitor)) { + if ($this->emailMonitorProperty !== null) { + if (!is_object($this->emailMonitorProperty)) { throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA); } - $xfer += $output->writeFieldBegin('emailMonitor', TType::STRUCT, 7); - $xfer += $this->emailMonitor->write($output); + $xfer += $output->writeFieldBegin('emailMonitorProperty', TType::STRUCT, 7); + $xfer += $this->emailMonitorProperty->write($output); $xfer += $output->writeFieldEnd(); } $xfer += $output->writeFieldStop(); http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/appcatalog/computeresource/ttypes.py ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/appcatalog/computeresource/ttypes.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/appcatalog/computeresource/ttypes.py index e635472..8b34be1 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/appcatalog/computeresource/ttypes.py +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/appcatalog/computeresource/ttypes.py @@ -1216,7 +1216,7 @@ class SSHJobSubmission: - alternativeSSHHostName - sshPort - monitorMode - - emailMonitor + - emailMonitorProperty """ thrift_spec = ( @@ -1227,17 +1227,17 @@ class SSHJobSubmission: (4, TType.STRING, 'alternativeSSHHostName', None, None, ), # 4 (5, TType.I32, 'sshPort', None, 22, ), # 5 (6, TType.I32, 'monitorMode', None, None, ), # 6 - (7, TType.STRUCT, 'emailMonitor', (EmailMonitorProperty, EmailMonitorProperty.thrift_spec), None, ), # 7 + (7, TType.STRUCT, 'emailMonitorProperty', (EmailMonitorProperty, EmailMonitorProperty.thrift_spec), None, ), # 7 ) - def __init__(self, jobSubmissionInterfaceId=thrift_spec[1][4], securityProtocol=None, resourceJobManager=None, alternativeSSHHostName=None, sshPort=thrift_spec[5][4], monitorMode=None, emailMonitor=None,): + def __init__(self, jobSubmissionInterfaceId=thrift_spec[1][4], securityProtocol=None, resourceJobManager=None, alternativeSSHHostName=None, sshPort=thrift_spec[5][4], monitorMode=None, emailMonitorProperty=None,): self.jobSubmissionInterfaceId = jobSubmissionInterfaceId self.securityProtocol = securityProtocol self.resourceJobManager = resourceJobManager self.alternativeSSHHostName = alternativeSSHHostName self.sshPort = sshPort self.monitorMode = monitorMode - self.emailMonitor = emailMonitor + self.emailMonitorProperty = emailMonitorProperty def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -1281,8 +1281,8 @@ class SSHJobSubmission: iprot.skip(ftype) elif fid == 7: if ftype == TType.STRUCT: - self.emailMonitor = EmailMonitorProperty() - self.emailMonitor.read(iprot) + self.emailMonitorProperty = EmailMonitorProperty() + self.emailMonitorProperty.read(iprot) else: iprot.skip(ftype) else: @@ -1319,9 +1319,9 @@ class SSHJobSubmission: oprot.writeFieldBegin('monitorMode', TType.I32, 6) oprot.writeI32(self.monitorMode) oprot.writeFieldEnd() - if self.emailMonitor is not None: - oprot.writeFieldBegin('emailMonitor', TType.STRUCT, 7) - self.emailMonitor.write(oprot) + if self.emailMonitorProperty is not None: + oprot.writeFieldBegin('emailMonitorProperty', TType.STRUCT, 7) + self.emailMonitorProperty.write(oprot) oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index d173a0b..9ad71f4 100644 --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@ -61,7 +61,7 @@ public class CreateLaunchExperiment { private static String echoAppId = "Echo_fcac7076-e350-4dfb-a6eb-73e2d648fc60"; private static String mpiAppId = "HelloMPI_bfd56d58-6085-4b7f-89fc-646576830518"; private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762"; - private static String amberAppId = "Amber_aa083c86-4680-4002-b3ef-fad93c181926"; + private static String amberAppId = "Amber_717cba99-1085-45de-861c-952001c5243c"; private static String gromacsAppId = "GROMACS_05622038-9edd-4cb1-824e-0b7cb993364b"; private static String espressoAppId = "ESPRESSO_10cc2820-5d0b-4c63-9546-8a8b595593c1"; private static String lammpsAppId = "LAMMPS_2472685b-8acf-497e-aafe-cc66fe5f4cb6"; @@ -168,13 +168,13 @@ public class CreateLaunchExperiment { // final String expId = createMPIExperimentForFSD(airavataClient); // final String expId = createEchoExperimentForStampede(airavataClient); // final String expId = createEchoExperimentForTrestles(airavataClient); - final String expId = createExperimentEchoForLocalHost(airavataClient); - experimentIds.add(expId); +// final String expId = createExperimentEchoForLocalHost(airavataClient); // final String expId = createExperimentWRFTrestles(airavataClient); // final String expId = createExperimentForBR2(airavataClient); // final String expId = createExperimentForBR2Amber(airavataClient); // final String expId = createExperimentWRFStampede(airavataClient); -// final String expId = createExperimentForStampedeAmber(airavataClient); + final String expId = createExperimentForStampedeAmber(airavataClient); +// String expId = createExperimentForTrestlesAmber(airavataClient); // final String expId = createExperimentGROMACSStampede(airavataClient); // final String expId = createExperimentESPRESSOStampede(airavataClient); // final String expId = createExperimentLAMMPSStampede(airavataClient); @@ -184,6 +184,7 @@ public class CreateLaunchExperiment { // final String expId = createExperimentForLSF(airavataClient); // final String expId = createExperimentLAMMPSForLSF(airavataClient); // final String expId = "Ultrascan_ln_eb029947-391a-4ccf-8ace-9bafebe07cc0"; + experimentIds.add(expId); System.out.println("Experiment ID : " + expId); // updateExperiment(airavata, expId); @@ -1312,11 +1313,11 @@ public class CreateLaunchExperiment { // } for (InputDataObjectType inputDataObjectType : exInputs) { if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) { - inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/02_Heat.rst"); + inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/02_Heat.rst"); } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) { - inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/03_Prod.in"); + inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/03_Prod.in"); } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) { - inputDataObjectType.setValue("file://root@test-drive.airavata.org:/var/www/experimentData/admin101a290e6330f15a91349159553ae8b6bb1/prmtop"); + inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/prmtop"); } } @@ -1377,11 +1378,11 @@ public class CreateLaunchExperiment { // } for (InputDataObjectType inputDataObjectType : exInputs) { if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) { - inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/02_Heat.rst"); + inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/02_Heat.rst"); } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) { - inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/03_Prod.in"); + inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/03_Prod.in"); } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) { - inputDataObjectType.setValue("/Users/chathuri/dev/airavata/source/php/inputs/AMBER_FILES/prmtop"); + inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/prmtop"); } } List exOut = client.getApplicationOutputs(amberAppId); http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/appcatalog/computeresource/SSHJobSubmission.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/appcatalog/computeresource/SSHJobSubmission.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/appcatalog/computeresource/SSHJobSubmission.java index a9bf275..66f33ab 100644 --- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/appcatalog/computeresource/SSHJobSubmission.java +++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/appcatalog/computeresource/SSHJobSubmission.java @@ -67,7 +67,7 @@ import org.slf4j.LoggerFactory; private static final org.apache.thrift.protocol.TField ALTERNATIVE_SSHHOST_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("alternativeSSHHostName", org.apache.thrift.protocol.TType.STRING, (short)4); private static final org.apache.thrift.protocol.TField SSH_PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("sshPort", org.apache.thrift.protocol.TType.I32, (short)5); private static final org.apache.thrift.protocol.TField MONITOR_MODE_FIELD_DESC = new org.apache.thrift.protocol.TField("monitorMode", org.apache.thrift.protocol.TType.I32, (short)6); - private static final org.apache.thrift.protocol.TField EMAIL_MONITOR_FIELD_DESC = new org.apache.thrift.protocol.TField("emailMonitor", org.apache.thrift.protocol.TType.STRUCT, (short)7); + private static final org.apache.thrift.protocol.TField EMAIL_MONITOR_PROPERTY_FIELD_DESC = new org.apache.thrift.protocol.TField("emailMonitorProperty", org.apache.thrift.protocol.TType.STRUCT, (short)7); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -81,7 +81,7 @@ import org.slf4j.LoggerFactory; private String alternativeSSHHostName; // optional private int sshPort; // optional private MonitorMode monitorMode; // optional - private EmailMonitorProperty emailMonitor; // optional + private EmailMonitorProperty emailMonitorProperty; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -99,7 +99,7 @@ import org.slf4j.LoggerFactory; * @see MonitorMode */ MONITOR_MODE((short)6, "monitorMode"), - EMAIL_MONITOR((short)7, "emailMonitor"); + EMAIL_MONITOR_PROPERTY((short)7, "emailMonitorProperty"); private static final Map byName = new HashMap(); @@ -126,8 +126,8 @@ import org.slf4j.LoggerFactory; return SSH_PORT; case 6: // MONITOR_MODE return MONITOR_MODE; - case 7: // EMAIL_MONITOR - return EMAIL_MONITOR; + case 7: // EMAIL_MONITOR_PROPERTY + return EMAIL_MONITOR_PROPERTY; default: return null; } @@ -170,7 +170,7 @@ import org.slf4j.LoggerFactory; // isset id assignments private static final int __SSHPORT_ISSET_ID = 0; private byte __isset_bitfield = 0; - private _Fields optionals[] = {_Fields.ALTERNATIVE_SSHHOST_NAME,_Fields.SSH_PORT,_Fields.MONITOR_MODE,_Fields.EMAIL_MONITOR}; + private _Fields optionals[] = {_Fields.ALTERNATIVE_SSHHOST_NAME,_Fields.SSH_PORT,_Fields.MONITOR_MODE,_Fields.EMAIL_MONITOR_PROPERTY}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -186,7 +186,7 @@ import org.slf4j.LoggerFactory; new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); tmpMap.put(_Fields.MONITOR_MODE, new org.apache.thrift.meta_data.FieldMetaData("monitorMode", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, MonitorMode.class))); - tmpMap.put(_Fields.EMAIL_MONITOR, new org.apache.thrift.meta_data.FieldMetaData("emailMonitor", org.apache.thrift.TFieldRequirementType.OPTIONAL, + tmpMap.put(_Fields.EMAIL_MONITOR_PROPERTY, new org.apache.thrift.meta_data.FieldMetaData("emailMonitorProperty", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, EmailMonitorProperty.class))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(SSHJobSubmission.class, metaDataMap); @@ -231,8 +231,8 @@ import org.slf4j.LoggerFactory; if (other.isSetMonitorMode()) { this.monitorMode = other.monitorMode; } - if (other.isSetEmailMonitor()) { - this.emailMonitor = new EmailMonitorProperty(other.emailMonitor); + if (other.isSetEmailMonitorProperty()) { + this.emailMonitorProperty = new EmailMonitorProperty(other.emailMonitorProperty); } } @@ -250,7 +250,7 @@ import org.slf4j.LoggerFactory; this.sshPort = 22; this.monitorMode = null; - this.emailMonitor = null; + this.emailMonitorProperty = null; } public String getJobSubmissionInterfaceId() { @@ -406,26 +406,26 @@ import org.slf4j.LoggerFactory; } } - public EmailMonitorProperty getEmailMonitor() { - return this.emailMonitor; + public EmailMonitorProperty getEmailMonitorProperty() { + return this.emailMonitorProperty; } - public void setEmailMonitor(EmailMonitorProperty emailMonitor) { - this.emailMonitor = emailMonitor; + public void setEmailMonitorProperty(EmailMonitorProperty emailMonitorProperty) { + this.emailMonitorProperty = emailMonitorProperty; } - public void unsetEmailMonitor() { - this.emailMonitor = null; + public void unsetEmailMonitorProperty() { + this.emailMonitorProperty = null; } - /** Returns true if field emailMonitor is set (has been assigned a value) and false otherwise */ - public boolean isSetEmailMonitor() { - return this.emailMonitor != null; + /** Returns true if field emailMonitorProperty is set (has been assigned a value) and false otherwise */ + public boolean isSetEmailMonitorProperty() { + return this.emailMonitorProperty != null; } - public void setEmailMonitorIsSet(boolean value) { + public void setEmailMonitorPropertyIsSet(boolean value) { if (!value) { - this.emailMonitor = null; + this.emailMonitorProperty = null; } } @@ -479,11 +479,11 @@ import org.slf4j.LoggerFactory; } break; - case EMAIL_MONITOR: + case EMAIL_MONITOR_PROPERTY: if (value == null) { - unsetEmailMonitor(); + unsetEmailMonitorProperty(); } else { - setEmailMonitor((EmailMonitorProperty)value); + setEmailMonitorProperty((EmailMonitorProperty)value); } break; @@ -510,8 +510,8 @@ import org.slf4j.LoggerFactory; case MONITOR_MODE: return getMonitorMode(); - case EMAIL_MONITOR: - return getEmailMonitor(); + case EMAIL_MONITOR_PROPERTY: + return getEmailMonitorProperty(); } throw new IllegalStateException(); @@ -536,8 +536,8 @@ import org.slf4j.LoggerFactory; return isSetSshPort(); case MONITOR_MODE: return isSetMonitorMode(); - case EMAIL_MONITOR: - return isSetEmailMonitor(); + case EMAIL_MONITOR_PROPERTY: + return isSetEmailMonitorProperty(); } throw new IllegalStateException(); } @@ -609,12 +609,12 @@ import org.slf4j.LoggerFactory; return false; } - boolean this_present_emailMonitor = true && this.isSetEmailMonitor(); - boolean that_present_emailMonitor = true && that.isSetEmailMonitor(); - if (this_present_emailMonitor || that_present_emailMonitor) { - if (!(this_present_emailMonitor && that_present_emailMonitor)) + boolean this_present_emailMonitorProperty = true && this.isSetEmailMonitorProperty(); + boolean that_present_emailMonitorProperty = true && that.isSetEmailMonitorProperty(); + if (this_present_emailMonitorProperty || that_present_emailMonitorProperty) { + if (!(this_present_emailMonitorProperty && that_present_emailMonitorProperty)) return false; - if (!this.emailMonitor.equals(that.emailMonitor)) + if (!this.emailMonitorProperty.equals(that.emailMonitorProperty)) return false; } @@ -694,12 +694,12 @@ import org.slf4j.LoggerFactory; return lastComparison; } } - lastComparison = Boolean.valueOf(isSetEmailMonitor()).compareTo(other.isSetEmailMonitor()); + lastComparison = Boolean.valueOf(isSetEmailMonitorProperty()).compareTo(other.isSetEmailMonitorProperty()); if (lastComparison != 0) { return lastComparison; } - if (isSetEmailMonitor()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.emailMonitor, other.emailMonitor); + if (isSetEmailMonitorProperty()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.emailMonitorProperty, other.emailMonitorProperty); if (lastComparison != 0) { return lastComparison; } @@ -773,13 +773,13 @@ import org.slf4j.LoggerFactory; } first = false; } - if (isSetEmailMonitor()) { + if (isSetEmailMonitorProperty()) { if (!first) sb.append(", "); - sb.append("emailMonitor:"); - if (this.emailMonitor == null) { + sb.append("emailMonitorProperty:"); + if (this.emailMonitorProperty == null) { sb.append("null"); } else { - sb.append(this.emailMonitor); + sb.append(this.emailMonitorProperty); } first = false; } @@ -805,8 +805,8 @@ import org.slf4j.LoggerFactory; if (resourceJobManager != null) { resourceJobManager.validate(); } - if (emailMonitor != null) { - emailMonitor.validate(); + if (emailMonitorProperty != null) { + emailMonitorProperty.validate(); } } @@ -895,11 +895,11 @@ import org.slf4j.LoggerFactory; org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 7: // EMAIL_MONITOR + case 7: // EMAIL_MONITOR_PROPERTY if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { - struct.emailMonitor = new EmailMonitorProperty(); - struct.emailMonitor.read(iprot); - struct.setEmailMonitorIsSet(true); + struct.emailMonitorProperty = new EmailMonitorProperty(); + struct.emailMonitorProperty.read(iprot); + struct.setEmailMonitorPropertyIsSet(true); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -951,10 +951,10 @@ import org.slf4j.LoggerFactory; oprot.writeFieldEnd(); } } - if (struct.emailMonitor != null) { - if (struct.isSetEmailMonitor()) { - oprot.writeFieldBegin(EMAIL_MONITOR_FIELD_DESC); - struct.emailMonitor.write(oprot); + if (struct.emailMonitorProperty != null) { + if (struct.isSetEmailMonitorProperty()) { + oprot.writeFieldBegin(EMAIL_MONITOR_PROPERTY_FIELD_DESC); + struct.emailMonitorProperty.write(oprot); oprot.writeFieldEnd(); } } @@ -988,7 +988,7 @@ import org.slf4j.LoggerFactory; if (struct.isSetMonitorMode()) { optionals.set(2); } - if (struct.isSetEmailMonitor()) { + if (struct.isSetEmailMonitorProperty()) { optionals.set(3); } oprot.writeBitSet(optionals, 4); @@ -1001,8 +1001,8 @@ import org.slf4j.LoggerFactory; if (struct.isSetMonitorMode()) { oprot.writeI32(struct.monitorMode.getValue()); } - if (struct.isSetEmailMonitor()) { - struct.emailMonitor.write(oprot); + if (struct.isSetEmailMonitorProperty()) { + struct.emailMonitorProperty.write(oprot); } } @@ -1030,9 +1030,9 @@ import org.slf4j.LoggerFactory; struct.setMonitorModeIsSet(true); } if (incoming.get(3)) { - struct.emailMonitor = new EmailMonitorProperty(); - struct.emailMonitor.read(iprot); - struct.setEmailMonitorIsSet(true); + struct.emailMonitorProperty = new EmailMonitorProperty(); + struct.emailMonitorProperty.read(iprot); + struct.setEmailMonitorPropertyIsSet(true); } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/airavata-api/thrift-interface-descriptions/computeResourceModel.thrift ---------------------------------------------------------------------- diff --git a/airavata-api/thrift-interface-descriptions/computeResourceModel.thrift b/airavata-api/thrift-interface-descriptions/computeResourceModel.thrift index cf9b06c..b13b2e5 100644 --- a/airavata-api/thrift-interface-descriptions/computeResourceModel.thrift +++ b/airavata-api/thrift-interface-descriptions/computeResourceModel.thrift @@ -348,7 +348,7 @@ struct SSHJobSubmission { 4: optional string alternativeSSHHostName, 5: optional i32 sshPort = 22, 6: optional MonitorMode monitorMode, - 7: optional EmailMonitorProperty emailMonitor + 7: optional EmailMonitorProperty emailMonitorProperty } struct GlobusJobSubmission { http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java index 2b23596..075f942 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java @@ -21,6 +21,7 @@ package org.apache.airavata.gfac.gsissh.provider.impl; import org.airavata.appcatalog.cpi.AppCatalog; +import org.airavata.appcatalog.cpi.AppCatalogException; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.gfac.ExecutionMode; @@ -36,11 +37,14 @@ import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils; import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor; +import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory; import org.apache.airavata.gsi.ssh.api.Cluster; import org.apache.airavata.gsi.ssh.api.SSHApiException; import org.apache.airavata.gsi.ssh.api.job.JobDescriptor; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; +import org.apache.airavata.model.appcatalog.computeresource.EmailMonitorProperty; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; import org.apache.airavata.model.appcatalog.computeresource.MonitorMode; import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; import org.apache.airavata.model.workspace.experiment.CorrectiveAction; @@ -138,48 +142,53 @@ public class GSISSHProvider extends AbstractRecoverableProvider { } - public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException { - if (ServerSettings.isEmailBasedNotificationEnable()) { - try { - EmailBasedMonitor emailBasedMonitor = EmailBasedMonitor.getInstant(BetterGfacImpl.getMonitorPublisher()); - emailBasedMonitor.addToJobMonitorMap(jobExecutionContext); - } catch (ApplicationSettingsException e) { - throw new GFacHandlerException("Error while delegating job execution context to email based monitor"); - } - } else { - List daemonHandlers = BetterGfacImpl.getDaemonHandlers(); - if (daemonHandlers == null) { - daemonHandlers = BetterGfacImpl.getDaemonHandlers(); - } - ThreadedHandler pullMonitorHandler = null; - ThreadedHandler pushMonitorHandler = null; - MonitorMode monitorMode = sshJobSubmission.getMonitorMode(); - for (ThreadedHandler threadedHandler : daemonHandlers) { - if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) { - pullMonitorHandler = threadedHandler; - if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) { - log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID); - pullMonitorHandler.invoke(jobExecutionContext); - } else { - log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" + - " to handle by the GridPullMonitorHandler"); - } - } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) { - pushMonitorHandler = threadedHandler; - if (monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) { - log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned: " + jobID); - pushMonitorHandler.invoke(jobExecutionContext); - } else { - log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" + - " to handle by the GridPushMonitorHandler"); - } + public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException, AppCatalogException { + if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) { + if (sshJobSubmission.getMonitorMode() == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) { + EmailMonitorProperty emailMonitorProp = sshJobSubmission.getEmailMonitorProperty(); + if (emailMonitorProp != null) { + EmailMonitorFactory emailMonitorFactory = new EmailMonitorFactory(); + EmailBasedMonitor emailBasedMonitor = emailMonitorFactory.getEmailBasedMonitor(emailMonitorProp); + emailBasedMonitor.addToJobMonitorMap(jobExecutionContext); + return; } - // have to handle the GridPushMonitorHandler logic } - if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) { - log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" + - ", execution is configured as asynchronous, so Outhandler will not be invoked"); + } + + // if email monitor is not activeated or not configure we use pull or push monitor + List daemonHandlers = BetterGfacImpl.getDaemonHandlers(); + if (daemonHandlers == null) { + daemonHandlers = BetterGfacImpl.getDaemonHandlers(); + } + ThreadedHandler pullMonitorHandler = null; + ThreadedHandler pushMonitorHandler = null; + MonitorMode monitorMode = sshJobSubmission.getMonitorMode(); + for (ThreadedHandler threadedHandler : daemonHandlers) { + if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) { + pullMonitorHandler = threadedHandler; + if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) { + log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID); + pullMonitorHandler.invoke(jobExecutionContext); + } else { + log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" + + " to handle by the GridPullMonitorHandler"); + } + } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) { + pushMonitorHandler = threadedHandler; + if (monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) { + log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned: " + jobID); + pushMonitorHandler.invoke(jobExecutionContext); + } else { + log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" + + " to handle by the GridPushMonitorHandler"); + } } + // have to handle the GridPushMonitorHandler logic + } + if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) { + log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" + + ", execution is configured as asynchronous, so Outhandler will not be invoked"); + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java index b6bfa6c..affe156 100644 --- a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java +++ b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java @@ -21,18 +21,19 @@ package org.apache.airavata.gfac.monitor.email; import org.apache.airavata.common.exception.AiravataException; -import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.logger.AiravataLogger; import org.apache.airavata.common.logger.AiravataLoggerFactory; -import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.cpi.BetterGfacImpl; import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor; import org.apache.airavata.gfac.core.utils.OutHandlerWorker; import org.apache.airavata.gfac.monitor.email.parser.EmailParser; import org.apache.airavata.gfac.monitor.email.parser.LonestarEmailParser; import org.apache.airavata.gfac.monitor.email.parser.PBSEmailParser; import org.apache.airavata.gfac.monitor.email.parser.SLURMEmailParser; +import org.apache.airavata.model.appcatalog.computeresource.EmailMonitorProperty; +import org.apache.airavata.model.appcatalog.computeresource.EmailProtocol; import org.apache.airavata.model.messaging.event.JobIdentifier; import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent; import org.apache.airavata.model.workspace.experiment.JobState; @@ -59,39 +60,34 @@ public class EmailBasedMonitor implements Runnable{ private static final String PBS_CONSULT_SDSC_EDU = "pbsconsult@sdsc.edu"; private static final String SLURM_BATCH_STAMPEDE = "slurm@batch1.stampede.tacc.utexas.edu"; private static final String LONESTAR_ADDRESS = "root@c312-206.ls4.tacc.utexas.edu"; - private static EmailBasedMonitor emailBasedMonitor; - private final MonitorPublisher monitorPublisher; + private final EmailMonitorProperty emailMonitorProperty; + private boolean stopMonitoring = false; private Session session ; private Store store; private Folder emailFolder; - private String host, emailAddress, password, folderName, mailStoreProtocol; +// private String host, emailAddress, password, folderName, mailStoreProtocol; private Properties properties; private Map jobMonitorMap = new ConcurrentHashMap(); - private EmailBasedMonitor(MonitorPublisher monitorPublisher) throws ApplicationSettingsException { - this.monitorPublisher = monitorPublisher; + public EmailBasedMonitor(EmailMonitorProperty emailMonitorProp) { + this.emailMonitorProperty = emailMonitorProp; init(); } - private void init() throws ApplicationSettingsException { - host = ServerSettings.getEmailBasedMonitorHost(); - emailAddress = ServerSettings.getEmailBasedMonitorAddress(); - password = ServerSettings.getEmailBasedMonitorPassword(); - folderName = ServerSettings.getEmailBasedMonitorFolderName(); - mailStoreProtocol = ServerSettings.getEmailBasedMonitorStoreProtocol(); - + private void init() { properties = new Properties(); - properties.put("mail.store.protocol", mailStoreProtocol); + properties.put("mail.store.protocol", emailMonitorProperty.getStoreProtocol()); } - public static EmailBasedMonitor getInstant(MonitorPublisher monitorPublisher) throws ApplicationSettingsException { +/* public static EmailBasedMonitor getInstant(EmailMonitorProperty emailMonitorProp, MonitorPublisher monitorPublisher) + throws ApplicationSettingsException { if (emailBasedMonitor == null) { synchronized (EmailBasedMonitor.class) { if (emailBasedMonitor == null) { - emailBasedMonitor = new EmailBasedMonitor(monitorPublisher); + emailBasedMonitor = new EmailBasedMonitor(emailMonitorProp); Thread thread = new Thread(emailBasedMonitor); thread.start(); } @@ -99,7 +95,7 @@ public class EmailBasedMonitor implements Runnable{ } return emailBasedMonitor; - } + }*/ public void addToJobMonitorMap(JobExecutionContext jobExecutionContext) { addToJobMonitorMap(jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext); @@ -134,14 +130,15 @@ public class EmailBasedMonitor implements Runnable{ public void run() { try { session = Session.getDefaultInstance(properties); - store = session.getStore(mailStoreProtocol); - store.connect(host, emailAddress, password); - while (!ServerSettings.isStopAllThreads()) { + store = session.getStore(getProtocol(emailMonitorProperty.getStoreProtocol())); + store.connect(emailMonitorProperty.getHost(), emailMonitorProperty.getEmailAddress(), + emailMonitorProperty.getPassword()); + while (!(stopMonitoring || ServerSettings.isStopAllThreads())) { if (!store.isConnected()) { store.connect(); } Thread.sleep(2000); - emailFolder = store.getFolder(folderName); + emailFolder = store.getFolder(emailMonitorProperty.getFolderName()); emailFolder.open(Folder.READ_WRITE); Message[] searchMessages = emailFolder.search(new FlagTerm(new Flags(Flags.Flag.SEEN), false)); List processedMessages = new ArrayList<>(); @@ -177,6 +174,8 @@ public class EmailBasedMonitor implements Runnable{ log.error("Couldn't connect to the store ", e); } catch (InterruptedException e) { log.error("Interrupt exception while sleep ", e); + } catch (AiravataException e) { + log.error("UnHandled arguments ", e); } finally { try { store.close(); @@ -186,11 +185,21 @@ public class EmailBasedMonitor implements Runnable{ } } + private String getProtocol(EmailProtocol storeProtocol) throws AiravataException { + switch (storeProtocol) { + case IMAPS: + return "imaps"; + case POP3: + return "pop3"; + default: + throw new AiravataException("Unhandled Email store protocol "); + } + } private void process(JobStatusResult jobStatusResult, JobExecutionContext jEC){ JobState resultState = jobStatusResult.getState(); jEC.getJobDetails().setJobStatus(new JobStatus(resultState)); if (resultState == JobState.COMPLETE) { - GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(jEC, monitorPublisher)); + GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(jEC, BetterGfacImpl.getMonitorPublisher())); }else if (resultState == JobState.QUEUED) { // TODO - publish queued rabbitmq message }else if (resultState == JobState.FAILED) { @@ -216,7 +225,7 @@ public class EmailBasedMonitor implements Runnable{ "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(), jobStatus.getJobIdentity().getTaskId()); - monitorPublisher.publish(jobStatus); + BetterGfacImpl.getMonitorPublisher().publish(jobStatus); } private void writeEnvelopeOnError(Message m) throws MessagingException { @@ -235,4 +244,8 @@ public class EmailBasedMonitor implements Runnable{ if (m.getSubject() != null) log.error("SUBJECT: " + m.getSubject()); } + + public void stopMonitoring() { + stopMonitoring = true; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java new file mode 100644 index 0000000..2c24973 --- /dev/null +++ b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.email; + +import org.apache.airavata.model.appcatalog.computeresource.EmailMonitorProperty; + +import java.util.HashMap; +import java.util.Map; + +public class EmailMonitorFactory { + + private Map emailMonitors = new HashMap(); + + + public synchronized EmailBasedMonitor getEmailBasedMonitor(EmailMonitorProperty emailMonitorProp) { + String key = getKey(emailMonitorProp); + EmailBasedMonitor monitor = emailMonitors.get(key); + if (monitor == null) { + monitor = new EmailBasedMonitor(emailMonitorProp); + emailMonitors.put(key, monitor); + new Thread(monitor).start(); + } + return monitor; + } + + public void stopAllMonitors() { + for (EmailBasedMonitor emailBasedMonitor : emailMonitors.values()) { + emailBasedMonitor.stopMonitoring(); + } + } + + private String getKey(EmailMonitorProperty emailMonitorProp) { + StringBuffer sb = new StringBuffer(emailMonitorProp.getHost().trim()); + sb.append("_").append(emailMonitorProp.getStoreProtocol().name()); + sb.append("_").append(emailMonitorProp.getEmailAddress().trim()); + sb.append("_").append(emailMonitorProp.getFolderName().trim()); + return sb.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/93d4421b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java index b201c79..7c29352 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java @@ -21,6 +21,7 @@ package org.apache.airavata.gfac.ssh.provider.impl; +import org.airavata.appcatalog.cpi.AppCatalogException; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.gfac.Constants; @@ -36,6 +37,7 @@ import org.apache.airavata.gfac.core.provider.AbstractProvider; import org.apache.airavata.gfac.core.provider.GFacProviderException; import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor; +import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory; import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; import org.apache.airavata.gfac.ssh.util.GFACSSHUtils; import org.apache.airavata.gsi.ssh.api.Cluster; @@ -47,9 +49,12 @@ import org.apache.airavata.gsi.ssh.impl.StandardOutReader; import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths; import org.apache.airavata.model.appcatalog.appinterface.DataType; import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.model.appcatalog.computeresource.EmailMonitorProperty; import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; +import org.apache.airavata.model.appcatalog.computeresource.MonitorMode; import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager; import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType; +import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; import org.apache.airavata.model.workspace.experiment.CorrectiveAction; import org.apache.airavata.model.workspace.experiment.ErrorCategory; import org.apache.airavata.model.workspace.experiment.JobDetails; @@ -369,33 +374,40 @@ public class SSHProvider extends AbstractProvider { return stdOutputString; } - public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - if (ServerSettings.isEmailBasedNotificationEnable()) { - try { - EmailBasedMonitor emailBasedMonitor = EmailBasedMonitor.getInstant(BetterGfacImpl.getMonitorPublisher()); - emailBasedMonitor.addToJobMonitorMap(jobExecutionContext); - } catch (ApplicationSettingsException e) { - throw new GFacHandlerException("Error while delegating job execution context to email based monitor"); - } - } else { - List daemonHandlers = BetterGfacImpl.getDaemonHandlers(); - if (daemonHandlers == null) { - daemonHandlers = BetterGfacImpl.getDaemonHandlers(); - } - ThreadedHandler pullMonitorHandler = null; - ThreadedHandler pushMonitorHandler = null; - for (ThreadedHandler threadedHandler : daemonHandlers) { - if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) { - pullMonitorHandler = threadedHandler; - pullMonitorHandler.invoke(jobExecutionContext); + public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext) throws GFacHandlerException, AppCatalogException { + if (jobExecutionContext.getPreferredJobSubmissionProtocol()== JobSubmissionProtocol.SSH) { + String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId(); + SSHJobSubmission sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId); + if (sshJobSubmission.getMonitorMode() == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) { + EmailMonitorProperty emailMonitorProp = sshJobSubmission.getEmailMonitorProperty(); + if (emailMonitorProp != null) { + EmailMonitorFactory emailMonitorFactory = new EmailMonitorFactory(); + EmailBasedMonitor emailBasedMonitor = emailMonitorFactory.getEmailBasedMonitor(emailMonitorProp); + emailBasedMonitor.addToJobMonitorMap(jobExecutionContext); + return; } - // have to handle the GridPushMonitorHandler logic } - if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) { - log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" + - ", execution is configured as asynchronous, so Outhandler will not be invoked"); + } + + // if email monitor is not activeated or not configure we use pull or push monitor + List daemonHandlers = BetterGfacImpl.getDaemonHandlers(); + if (daemonHandlers == null) { + daemonHandlers = BetterGfacImpl.getDaemonHandlers(); + } + ThreadedHandler pullMonitorHandler = null; + ThreadedHandler pushMonitorHandler = null; + for (ThreadedHandler threadedHandler : daemonHandlers) { + if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) { + pullMonitorHandler = threadedHandler; + pullMonitorHandler.invoke(jobExecutionContext); } + // have to handle the GridPushMonitorHandler logic } + if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) { + log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" + + ", execution is configured as asynchronous, so Outhandler will not be invoked"); + } + } }