Return-Path: X-Original-To: apmail-airavata-commits-archive@www.apache.org Delivered-To: apmail-airavata-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3C47C171E1 for ; Tue, 11 Nov 2014 20:12:06 +0000 (UTC) Received: (qmail 53957 invoked by uid 500); 11 Nov 2014 20:12:06 -0000 Delivered-To: apmail-airavata-commits-archive@airavata.apache.org Received: (qmail 53905 invoked by uid 500); 11 Nov 2014 20:12:06 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 53816 invoked by uid 99); 11 Nov 2014 20:12:06 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Nov 2014 20:12:06 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id C8F63A0DE5E; Tue, 11 Nov 2014 20:12:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chathuri@apache.org To: commits@airavata.apache.org Date: Tue, 11 Nov 2014 20:12:07 -0000 Message-Id: <8fe76bface354697a1230c47ff63f3b1@git.apache.org> In-Reply-To: <6c6ea258790d469fadd5d9608689051e@git.apache.org> References: <6c6ea258790d469fadd5d9608689051e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [03/50] [abbrv] airavata git commit: adding BES provider changes adding BES provider changes Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/04f09e7d Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/04f09e7d Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/04f09e7d Branch: refs/heads/master Commit: 04f09e7d424ee94d514c6423cd5f524eb7d53181 Parents: f29dfbe Author: chathuriw Authored: Fri Oct 31 14:40:50 2014 -0400 Committer: Chathuri Wimalasena Committed: Fri Oct 31 14:40:50 2014 -0400 ---------------------------------------------------------------------- .../gfac/bes/handlers/AbstractSMSHandler.java | 74 ++-- .../gfac/bes/provider/impl/BESProvider.java | 378 +++++++++---------- .../bes/security/UNICORESecurityContext.java | 4 +- .../gfac/bes/utils/ApplicationProcessor.java | 212 ++++------- .../airavata/gfac/core/utils/GFacUtils.java | 23 +- .../apache/airavata/gfac/ec2/EC2Provider.java | 15 +- 6 files changed, 306 insertions(+), 400 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/04f09e7d/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/handlers/AbstractSMSHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/handlers/AbstractSMSHandler.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/handlers/AbstractSMSHandler.java index 8f6fcf4..71ca0db 100644 --- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/handlers/AbstractSMSHandler.java +++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/handlers/AbstractSMSHandler.java @@ -2,6 +2,7 @@ package org.apache.airavata.gfac.bes.handlers; import java.util.Properties; +import org.airavata.appcatalog.cpi.AppCatalogException; import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.bes.security.UNICORESecurityContext; import org.apache.airavata.gfac.bes.security.X509SecurityContext; @@ -13,6 +14,7 @@ import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.handler.GFacHandler; import org.apache.airavata.gfac.core.handler.GFacHandlerException; import org.apache.airavata.gfac.core.utils.GFacUtils; +import org.apache.airavata.model.appcatalog.computeresource.*; import org.apache.airavata.model.workspace.experiment.CorrectiveAction; import org.apache.airavata.model.workspace.experiment.ErrorCategory; import org.apache.airavata.schemas.gfac.JobDirectoryModeDocument.JobDirectoryMode; @@ -43,42 +45,42 @@ public abstract class AbstractSMSHandler implements BESConstants, GFacHandler{ @Override public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - - // if not SMS then not to pass further -// if(!isSMSEnabled(jobExecutionContext)) return; - - initSecurityProperties(jobExecutionContext); - + try { + initSecurityProperties(jobExecutionContext); + JobSubmissionInterface preferredJobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface(); + JobSubmissionProtocol protocol = preferredJobSubmissionInterface.getJobSubmissionProtocol(); + String interfaceId = preferredJobSubmissionInterface.getJobSubmissionInterfaceId(); + String factoryUrl = null; + if (protocol.equals(JobSubmissionProtocol.UNICORE)) { + UnicoreJobSubmission unicoreJobSubmission = GFacUtils.getUnicoreJobSubmission(interfaceId); + factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL(); + } + storageClient = null; - - UnicoreHostType host = (UnicoreHostType) jobExecutionContext.getApplicationContext().getHostDescription() - .getType(); - String factoryUrl = host.getUnicoreBESEndPointArray()[0]; - - storageClient = null; - - if(!isSMSInstanceExisting(jobExecutionContext)) { - EndpointReferenceType eprt = EndpointReferenceType.Factory.newInstance(); - eprt.addNewAddress().setStringValue(factoryUrl); - StorageCreator storageCreator = new StorageCreator(secProperties, factoryUrl, 5, null); - try { - storageClient = storageCreator.createStorage(); - } catch (Exception e2) { - log.error("Cannot create storage.."); - throw new GFacHandlerException("Cannot create storage..", e2); + if (!isSMSInstanceExisting(jobExecutionContext)) { + EndpointReferenceType eprt = EndpointReferenceType.Factory.newInstance(); + eprt.addNewAddress().setStringValue(factoryUrl); + StorageCreator storageCreator = new StorageCreator(secProperties, factoryUrl, 5, null); + try { + storageClient = storageCreator.createStorage(); + } catch (Exception e2) { + log.error("Cannot create storage.."); + throw new GFacHandlerException("Cannot create storage..", e2); + } + jobExecutionContext.setProperty(PROP_SMS_EPR, storageClient.getEPR()); + } else { + EndpointReferenceType eprt = (EndpointReferenceType) jobExecutionContext.getProperty(PROP_SMS_EPR); + try { + storageClient = new StorageClient(eprt, secProperties); + } catch (Exception e) { + throw new GFacHandlerException("Cannot create storage..", e); + } } - jobExecutionContext.setProperty(PROP_SMS_EPR, storageClient.getEPR()); - } - else { - EndpointReferenceType eprt = (EndpointReferenceType)jobExecutionContext.getProperty(PROP_SMS_EPR); - try { - storageClient = new StorageClient(eprt, secProperties); - } catch (Exception e) { - throw new GFacHandlerException("Cannot create storage..", e); - } + dataTransferrer = new DataTransferrer(jobExecutionContext, storageClient); + } catch (AppCatalogException e) { + throw new GFacHandlerException("Error occurred while retrieving unicore job submission interface..", e); } - dataTransferrer = new DataTransferrer(jobExecutionContext, storageClient); - } + } protected void initSecurityProperties(JobExecutionContext jobExecutionContext) throws GFacHandlerException{ log.debug("Initializing SMSInHandler security properties .."); @@ -136,9 +138,9 @@ public abstract class AbstractSMSHandler implements BESConstants, GFacHandler{ * of the job execution context. * */ protected boolean isSMSEnabled(JobExecutionContext jobExecutionContext){ - if(((UnicoreHostType)jobExecutionContext.getApplicationContext().getHostDescription().getType()).getJobDirectoryMode() == JobDirectoryMode.SMS_BYTE_IO) { - return true; - } +// if(((UnicoreHostType)jobExecutionContext.getApplicationContext().getHostDescription().getType()).getJobDirectoryMode() == JobDirectoryMode.SMS_BYTE_IO) { +// return true; +// } return false; } http://git-wip-us.apache.org/repos/asf/airavata/blob/04f09e7d/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java index 7ed038a..398f05c 100644 --- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java +++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java @@ -23,6 +23,7 @@ package org.apache.airavata.gfac.bes.provider.impl; import java.util.Calendar; import java.util.Map; +import org.airavata.appcatalog.cpi.AppCatalogException; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.bes.security.UNICORESecurityContext; @@ -40,6 +41,9 @@ import org.apache.airavata.gfac.core.provider.AbstractProvider; import org.apache.airavata.gfac.core.provider.GFacProvider; import org.apache.airavata.gfac.core.provider.GFacProviderException; import org.apache.airavata.gfac.core.utils.GFacUtils; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; +import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission; import org.apache.airavata.model.workspace.experiment.JobDetails; import org.apache.airavata.model.workspace.experiment.JobState; import org.apache.airavata.schemas.gfac.UnicoreHostType; @@ -101,209 +105,165 @@ public class BESProvider extends AbstractProvider implements GFacProvider, public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { - UnicoreHostType host = (UnicoreHostType) jobExecutionContext - .getApplicationContext().getHostDescription().getType(); - - String factoryUrl = host.getUnicoreBESEndPointArray()[0]; - - EndpointReferenceType eprt = EndpointReferenceType.Factory - .newInstance(); - eprt.addNewAddress().setStringValue(factoryUrl); - - // WSUtilities.addServerIdentity(eprt, serverDN); - - String userDN = getUserName(jobExecutionContext); - - // TODO: to be removed - if (userDN == null || userDN.equalsIgnoreCase("admin")) { - userDN = "CN=zdv575, O=Ultrascan Gateway, C=DE"; - } - - StorageClient sc = null; - - try { - - CreateActivityDocument cad = CreateActivityDocument.Factory - .newInstance(); - JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory - .newInstance(); - -// String xlogin = getCNFromUserDN(userDN); - - // create storage - StorageCreator storageCreator = new StorageCreator(secProperties, - factoryUrl, 5, null); - - try { - sc = storageCreator.createStorage(); - } catch (Exception e2) { - log.error("Cannot create storage.."); - throw new GFacProviderException("Cannot create storage..", e2); - } - - JobDefinitionType jobDefinition = jobDefDoc.addNewJobDefinition(); - try { - jobDefinition = JSDLGenerator.buildJSDLInstance( - jobExecutionContext, sc.getUrl()).getJobDefinition(); - cad.addNewCreateActivity().addNewActivityDocument() - .setJobDefinition(jobDefinition); - log.info("JSDL" + jobDefDoc.toString()); - } catch (Exception e1) { - throw new GFacProviderException( - "Cannot generate JSDL instance from the JobExecutionContext.", - e1); - } - - // upload files if any - DataTransferrer dt = new DataTransferrer(jobExecutionContext, sc); - dt.uploadLocalFiles(); - - FactoryClient factory = null; - JobDetails jobDetails = new JobDetails(); - - try { - factory = new FactoryClient(eprt, secProperties); - } catch (Exception e) { - throw new GFacProviderException(e.getLocalizedMessage(), e); - } - CreateActivityResponseDocument response = null; - try { - log.info(String.format("Activity Submitting to %s ... \n", - factoryUrl)); - jobExecutionContext.getNotifier().publish(new StartExecutionEvent()); - response = factory.createActivity(cad); - log.info(String.format("Activity Submitted to %s \n", factoryUrl)); - } catch (Exception e) { - throw new GFacProviderException("Cannot create activity.", e); - } - EndpointReferenceType activityEpr = response.getCreateActivityResponse().getActivityIdentifier(); - - log.info("Activity : " + activityEpr.getAddress().getStringValue() + " Submitted."); - - // factory.waitWhileActivityIsDone(activityEpr, 1000); - jobId = WSUtilities.extractResourceID(activityEpr); - if (jobId == null) { - jobId = new Long(Calendar.getInstance().getTimeInMillis()) - .toString(); - } - log.info("JobID: " + jobId); - jobDetails.setJobID(activityEpr.toString()); - jobDetails.setJobDescription(activityEpr.toString()); - - jobExecutionContext.setJobDetails(jobDetails); - try { - log.info(formatStatusMessage(activityEpr.getAddress() - .getStringValue(), factory.getActivityStatus(activityEpr) - .toString())); - - jobExecutionContext.getNotifier().publish(new UnicoreJobIDEvent(jobId)); - GFacUtils.saveJobStatus(jobExecutionContext, details,JobState.SUBMITTED); - - factory.getActivityStatus(activityEpr); - log.info(formatStatusMessage(activityEpr.getAddress() - .getStringValue(), factory.getActivityStatus(activityEpr) - .toString())); - - // TODO publish the status messages to the message bus - while ((factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FINISHED) - && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FAILED) - && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.CANCELLED)) { - - ActivityStatusType activityStatus = null; - try { - activityStatus = getStatus(factory, activityEpr); - JobState applicationJobStatus = getApplicationJobStatus(activityStatus); - String jobStatusMessage = "Status of job " + jobId + "is " - + applicationJobStatus; - GFacUtils.updateJobStatus(jobExecutionContext, jobDetails, - applicationJobStatus); - - jobExecutionContext.getNotifier().publish( - new StatusChangeEvent(jobStatusMessage)); - - // GFacUtils.updateApplicationJobStatus(jobExecutionContext,jobId, - // applicationJobStatus); - } catch (UnknownActivityIdentifierFault e) { - throw new GFacProviderException(e.getMessage(), - e.getCause()); - } - - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - } - continue; - } - }catch(Exception e) { - throw new GFacProviderException(e.getMessage(), - e.getCause()); - - } - - ActivityStatusType activityStatus = null; - try { - activityStatus = getStatus(factory, activityEpr); - log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(), activityStatus.getState().toString())); - ActivityClient activityClient; - activityClient = new ActivityClient(activityEpr,secProperties); - dt.setStorageClient(activityClient.getUspaceClient()); - } catch (Exception e1) { - throw new GFacProviderException(e1.getMessage(), - e1.getCause()); - } - - - - if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) { - String error = activityStatus.getFault().getFaultcode() - .getLocalPart() - + "\n" - + activityStatus.getFault().getFaultstring() - + "\n EXITCODE: " + activityStatus.getExitCode(); - log.info(error); - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - } - dt.downloadStdOuts(); - } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) { - JobState applicationJobStatus = JobState.CANCELED; - String jobStatusMessage = "Status of job " + jobId + "is " - + applicationJobStatus; - jobExecutionContext.getNotifier().publish( - new StatusChangeEvent(jobStatusMessage)); - GFacUtils.updateJobStatus(jobExecutionContext, jobDetails, - applicationJobStatus); - throw new GFacProviderException( - jobExecutionContext.getExperimentID() + "Job Canceled"); - } - - else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) { - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - } - if (activityStatus.getExitCode() == 0) { - dt.downloadRemoteFiles(); - } else { - dt.downloadStdOuts(); - } - } - - } finally { - // destroy sms instance - try { - if (sc != null) { - sc.destroy(); - } - } catch (Exception e) { - log.warn( - "Cannot destroy temporary SMS instance:" + sc.getUrl(), - e); - } - } - - } + StorageClient sc = null; + try { + JobSubmissionInterface preferredJobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface(); + JobSubmissionProtocol protocol = preferredJobSubmissionInterface.getJobSubmissionProtocol(); + String interfaceId = preferredJobSubmissionInterface.getJobSubmissionInterfaceId(); + String factoryUrl = null; + if (protocol.equals(JobSubmissionProtocol.UNICORE)) { + UnicoreJobSubmission unicoreJobSubmission = GFacUtils.getUnicoreJobSubmission(interfaceId); + factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL(); + } + EndpointReferenceType eprt = EndpointReferenceType.Factory + .newInstance(); + eprt.addNewAddress().setStringValue(factoryUrl); + String userDN = getUserName(jobExecutionContext); + + // TODO: to be removed + if (userDN == null || userDN.equalsIgnoreCase("admin")) { + userDN = "CN=zdv575, O=Ultrascan Gateway, C=DE"; + } + CreateActivityDocument cad = CreateActivityDocument.Factory + .newInstance(); + JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory + .newInstance(); + + // create storage + StorageCreator storageCreator = new StorageCreator(secProperties, + factoryUrl, 5, null); + sc = storageCreator.createStorage(); + + JobDefinitionType jobDefinition = JSDLGenerator.buildJSDLInstance( + jobExecutionContext, sc.getUrl()).getJobDefinition(); + cad.addNewCreateActivity().addNewActivityDocument() + .setJobDefinition(jobDefinition); + log.info("JSDL" + jobDefDoc.toString()); + + // upload files if any + DataTransferrer dt = new DataTransferrer(jobExecutionContext, sc); + dt.uploadLocalFiles(); + + JobDetails jobDetails = new JobDetails(); + FactoryClient factory = new FactoryClient(eprt, secProperties); + + log.info(String.format("Activity Submitting to %s ... \n", + factoryUrl)); + jobExecutionContext.getNotifier().publish(new StartExecutionEvent()); + CreateActivityResponseDocument response = factory.createActivity(cad); + log.info(String.format("Activity Submitted to %s \n", factoryUrl)); + + EndpointReferenceType activityEpr = response.getCreateActivityResponse().getActivityIdentifier(); + + log.info("Activity : " + activityEpr.getAddress().getStringValue() + " Submitted."); + + // factory.waitWhileActivityIsDone(activityEpr, 1000); + jobId = WSUtilities.extractResourceID(activityEpr); + if (jobId == null) { + jobId = new Long(Calendar.getInstance().getTimeInMillis()) + .toString(); + } + log.info("JobID: " + jobId); + jobDetails.setJobID(activityEpr.toString()); + jobDetails.setJobDescription(activityEpr.toString()); + + jobExecutionContext.setJobDetails(jobDetails); + log.info(formatStatusMessage(activityEpr.getAddress() + .getStringValue(), factory.getActivityStatus(activityEpr) + .toString())); + + jobExecutionContext.getNotifier().publish(new UnicoreJobIDEvent(jobId)); + GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SUBMITTED); + + factory.getActivityStatus(activityEpr); + log.info(formatStatusMessage(activityEpr.getAddress() + .getStringValue(), factory.getActivityStatus(activityEpr) + .toString())); + + // TODO publish the status messages to the message bus + while ((factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FINISHED) + && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FAILED) + && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.CANCELLED)) { + + ActivityStatusType activityStatus = getStatus(factory, activityEpr); + JobState applicationJobStatus = getApplicationJobStatus(activityStatus); + String jobStatusMessage = "Status of job " + jobId + "is " + + applicationJobStatus; + GFacUtils.updateJobStatus(jobExecutionContext, jobDetails, + applicationJobStatus); + + jobExecutionContext.getNotifier().publish( + new StatusChangeEvent(jobStatusMessage)); + + // GFacUtils.updateApplicationJobStatus(jobExecutionContext,jobId, + // applicationJobStatus); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + } + continue; + } + + ActivityStatusType activityStatus = null; + activityStatus = getStatus(factory, activityEpr); + log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(), activityStatus.getState().toString())); + ActivityClient activityClient; + activityClient = new ActivityClient(activityEpr, secProperties); + dt.setStorageClient(activityClient.getUspaceClient()); + + if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) { + String error = activityStatus.getFault().getFaultcode() + .getLocalPart() + + "\n" + + activityStatus.getFault().getFaultstring() + + "\n EXITCODE: " + activityStatus.getExitCode(); + log.info(error); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + } + dt.downloadStdOuts(); + } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) { + JobState applicationJobStatus = JobState.CANCELED; + String jobStatusMessage = "Status of job " + jobId + "is " + + applicationJobStatus; + jobExecutionContext.getNotifier().publish( + new StatusChangeEvent(jobStatusMessage)); + GFacUtils.updateJobStatus(jobExecutionContext, jobDetails, + applicationJobStatus); + throw new GFacProviderException( + jobExecutionContext.getExperimentID() + "Job Canceled"); + } else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + } + if (activityStatus.getExitCode() == 0) { + dt.downloadRemoteFiles(); + } else { + dt.downloadStdOuts(); + } + } + } catch (AppCatalogException e) { + log.error("Error while retrieving UNICORE job submission.."); + throw new GFacProviderException("Error while retrieving UNICORE job submission..", e); + } catch (Exception e) { + log.error("Cannot create storage.."); + throw new GFacProviderException("Cannot create storage..", e); + } finally { + // destroy sms instance + try { + if (sc != null) { + sc.destroy(); + } + } catch (Exception e) { + log.warn( + "Cannot destroy temporary SMS instance:" + sc.getUrl(), + e); + } + } + + } private JobState getApplicationJobStatus(ActivityStatusType activityStatus) { if (activityStatus == null) { @@ -368,10 +328,14 @@ public class BESProvider extends AbstractProvider implements GFacProvider, // initSecurityProperties(jobExecutionContext); EndpointReferenceType eprt = EndpointReferenceType.Factory .parse(activityEpr); - UnicoreHostType host = (UnicoreHostType) jobExecutionContext - .getApplicationContext().getHostDescription().getType(); - - String factoryUrl = host.getUnicoreBESEndPointArray()[0]; + JobSubmissionInterface preferredJobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface(); + JobSubmissionProtocol protocol = preferredJobSubmissionInterface.getJobSubmissionProtocol(); + String interfaceId = preferredJobSubmissionInterface.getJobSubmissionInterfaceId(); + String factoryUrl = null; + if (protocol.equals(JobSubmissionProtocol.UNICORE)) { + UnicoreJobSubmission unicoreJobSubmission = GFacUtils.getUnicoreJobSubmission(interfaceId); + factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL(); + } EndpointReferenceType epr = EndpointReferenceType.Factory .newInstance(); epr.addNewAddress().setStringValue(factoryUrl); http://git-wip-us.apache.org/repos/asf/airavata/blob/04f09e7d/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java index 7285c2c..855335f 100644 --- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java +++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/UNICORESecurityContext.java @@ -38,7 +38,7 @@ public class UNICORESecurityContext extends X509SecurityContext { * @return an instance of the default client configuration * @throws GFacException * @throws ApplicationSettingsException - * @throws GFacProviderException + * @throws GFacException, ApplicationSettingsException */ public DefaultClientConfiguration getDefaultConfiguration() throws GFacException, ApplicationSettingsException { try{ @@ -69,7 +69,7 @@ public class UNICORESecurityContext extends X509SecurityContext { * @param caKeyPath * @param caKeyPwd * @return - * @throws GFacProviderException + * @throws GFacException */ public DefaultClientConfiguration getServerSignedConfiguration(String userID, String userDN, String caCertPath, String caKeyPath, String caKeyPwd) throws GFacException { try { http://git-wip-us.apache.org/repos/asf/airavata/blob/04f09e7d/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/ApplicationProcessor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/ApplicationProcessor.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/ApplicationProcessor.java index d624340..ee58565 100644 --- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/ApplicationProcessor.java +++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/ApplicationProcessor.java @@ -22,21 +22,18 @@ package org.apache.airavata.gfac.bes.utils; import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; +import org.apache.airavata.model.appcatalog.appdeployment.ApplicationParallelismType; import org.apache.airavata.schemas.gfac.ExtendedKeyValueType; import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType; -import org.apache.airavata.schemas.gfac.JobTypeType; -import org.apache.airavata.schemas.gfac.NameValuePairType; import org.ggf.schemas.jsdl.x2005.x11.jsdl.ApplicationType; import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType; -import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.EnvironmentType; import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.FileNameType; import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.UserNameType; import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.NumberOfProcessesType; import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.ProcessesPerHostType; import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.ThreadsPerProcessType; -import java.io.File; - public class ApplicationProcessor { @@ -47,40 +44,50 @@ public class ApplicationProcessor { userName = "CN=zdv575, O=Ultrascan Gateway, C=DE"; } - HpcApplicationDeploymentType appDepType = (HpcApplicationDeploymentType) context - .getApplicationContext().getApplicationDeploymentDescription() - .getType(); - - createGenericApplication(value, appDepType); - - if (appDepType.getApplicationEnvironmentArray().length > 0) { - createApplicationEnvironment(value, - appDepType.getApplicationEnvironmentArray(), appDepType); - } + ApplicationDeploymentDescription appDep= context.getApplicationContext().getApplicationDeploymentDescription(); + String appname = context.getApplicationContext().getApplicationInterfaceDescription().getApplicationName(); + ApplicationParallelismType parallelism = appDep.getParallelism(); - - if (appDepType.getExecutableLocation() != null) { + ApplicationType appType = JSDLUtils.getOrCreateApplication(value); + appType.setApplicationName(appname); + JSDLUtils.getOrCreateJobIdentification(value).setJobName(appname); + +// if (appDep.getSetEnvironment().size() > 0) { +// createApplicationEnvironment(value, appDep.getSetEnvironment(), parallelism); +// } +// + String stdout = context.getStandardOutput(); + String stderr = context.getStandardError(); + if (appDep.getExecutablePath() != null) { FileNameType fNameType = FileNameType.Factory.newInstance(); - fNameType.setStringValue(appDepType.getExecutableLocation()); - if(isParallelJob(appDepType)) { + fNameType.setStringValue(appDep.getExecutablePath()); + if(parallelism.equals(ApplicationParallelismType.MPI) || parallelism.equals(ApplicationParallelismType.OPENMP_MPI)) { JSDLUtils.getOrCreateSPMDApplication(value).setExecutable(fNameType); - JSDLUtils.getSPMDApplication(value).setSPMDVariation(getSPMDVariation(appDepType)); - - if(getValueFromMap(appDepType, JSDLUtils.NUMBEROFPROCESSES)!=null){ + if (parallelism.equals(ApplicationParallelismType.OPENMP_MPI)){ + JSDLUtils.getSPMDApplication(value).setSPMDVariation(SPMDVariations.OpenMPI.value()); + }else if (parallelism.equals(ApplicationParallelismType.MPI)){ + JSDLUtils.getSPMDApplication(value).setSPMDVariation(SPMDVariations.MPI.value()); + } + + int totalCPUCount = context.getTaskData().getTaskScheduling().getTotalCPUCount(); + if(totalCPUCount > 0){ NumberOfProcessesType num = NumberOfProcessesType.Factory.newInstance(); - num.setStringValue(getValueFromMap(appDepType, JSDLUtils.NUMBEROFPROCESSES)); + num.setStringValue(String.valueOf(totalCPUCount)); JSDLUtils.getSPMDApplication(value).setNumberOfProcesses(num); } - - if(getValueFromMap(appDepType, JSDLUtils.PROCESSESPERHOST)!=null){ - ProcessesPerHostType pph = ProcessesPerHostType.Factory.newInstance(); - pph.setStringValue(getValueFromMap(appDepType, JSDLUtils.PROCESSESPERHOST)); - JSDLUtils.getSPMDApplication(value).setProcessesPerHost(pph); - } - - if(getValueFromMap(appDepType, JSDLUtils.THREADSPERHOST)!=null){ + + int totalNodeCount = context.getTaskData().getTaskScheduling().getNodeCount(); + if(totalNodeCount > 0){ + int ppn = totalCPUCount / totalNodeCount; + ProcessesPerHostType pph = ProcessesPerHostType.Factory.newInstance(); + pph.setStringValue(String.valueOf(ppn)); + JSDLUtils.getSPMDApplication(value).setProcessesPerHost(pph); + } + + int totalThreadCount = context.getTaskData().getTaskScheduling().getNumberOfThreads(); + if(totalThreadCount > 0){ ThreadsPerProcessType tpp = ThreadsPerProcessType.Factory.newInstance(); - tpp.setStringValue(getValueFromMap(appDepType, JSDLUtils.THREADSPERHOST)); + tpp.setStringValue(String.valueOf(totalThreadCount)); JSDLUtils.getSPMDApplication(value).setThreadsPerProcess(tpp); } @@ -90,6 +97,18 @@ public class ApplicationProcessor { userNameType.setStringValue(userName); JSDLUtils.getSPMDApplication(value).setUserName(userNameType); } + if (stdout != null){ + FileNameType fName = FileNameType.Factory.newInstance(); + fName.setStringValue(stdout); + JSDLUtils.getOrCreateSPMDApplication(value).setOutput(fName); + } + if (stderr != null){ + FileNameType fName = FileNameType.Factory.newInstance(); + fName.setStringValue(stderr); + JSDLUtils.getOrCreateSPMDApplication(value).setError(fName); + } + + } else { JSDLUtils.getOrCreatePOSIXApplication(value).setExecutable(fNameType); @@ -98,17 +117,18 @@ public class ApplicationProcessor { userNameType.setStringValue(userName); JSDLUtils.getOrCreatePOSIXApplication(value).setUserName(userNameType); } + if (stdout != null){ + FileNameType fName = FileNameType.Factory.newInstance(); + fName.setStringValue(stdout); + JSDLUtils.getOrCreatePOSIXApplication(value).setOutput(fName); + } + if (stderr != null){ + FileNameType fName = FileNameType.Factory.newInstance(); + fName.setStringValue(stderr); + JSDLUtils.getOrCreatePOSIXApplication(value).setError(fName); + } } } - - - String stdout = (appDepType.getStandardOutput() != null) ? new File(appDepType.getStandardOutput()).getName(): "stdout"; - ApplicationProcessor.setApplicationStdOut(value, appDepType, stdout); - - - String stderr = (appDepType.getStandardError() != null) ? new File(appDepType.getStandardError()).getName() : "stderr"; - ApplicationProcessor.setApplicationStdErr(value, appDepType, stderr); - } public static String getUserNameFromContext(JobExecutionContext jobContext) { @@ -117,79 +137,7 @@ public class ApplicationProcessor { //FIXME: Discuss to get user and change this return "admin"; } - public static boolean isParallelJob(HpcApplicationDeploymentType appDepType) { - - boolean isParallel = false; - - if (appDepType.getJobType() != null) { - // TODO set data output directory - int status = appDepType.getJobType().intValue(); - - switch (status) { - // TODO: this check should be done outside this class - case JobTypeType.INT_MPI: - case JobTypeType.INT_OPEN_MP: - isParallel = true; - break; - - case JobTypeType.INT_SERIAL: - case JobTypeType.INT_SINGLE: - isParallel = false; - break; - default: - isParallel = false; - break; - } - } - return isParallel; - } - - - public static void createApplicationEnvironment(JobDefinitionType value, NameValuePairType[] nameValuePairs, HpcApplicationDeploymentType appDepType) { - - if(isParallelJob(appDepType)) { - for (NameValuePairType nv : nameValuePairs) { - EnvironmentType envType = JSDLUtils.getOrCreateSPMDApplication(value).addNewEnvironment(); - envType.setName(nv.getName()); - envType.setStringValue(nv.getValue()); - } - } - else { - for (NameValuePairType nv : nameValuePairs) { - EnvironmentType envType = JSDLUtils.getOrCreatePOSIXApplication(value).addNewEnvironment(); - envType.setName(nv.getName()); - envType.setStringValue(nv.getValue()); - } - } - - } - - - public static String getSPMDVariation (HpcApplicationDeploymentType appDepType) { - - String variation = null; - - if (appDepType.getJobType() != null) { - // TODO set data output directory - int status = appDepType.getJobType().intValue(); - - switch (status) { - // TODO: this check should be done outside this class - case JobTypeType.INT_MPI: - variation = SPMDVariations.MPI.value(); - break; - - case JobTypeType.INT_OPEN_MP: - variation = SPMDVariations.OpenMPI.value(); - break; - - } - } - return variation; - } - - public static void addApplicationArgument(JobDefinitionType value, HpcApplicationDeploymentType appDepType, String stringPrm) { if(isParallelJob(appDepType)) JSDLUtils.getOrCreateSPMDApplication(value) @@ -200,24 +148,6 @@ public class ApplicationProcessor { } - public static void setApplicationStdErr(JobDefinitionType value, HpcApplicationDeploymentType appDepType, String stderr) { - FileNameType fName = FileNameType.Factory.newInstance(); - fName.setStringValue(stderr); - if (isParallelJob(appDepType)) - JSDLUtils.getOrCreateSPMDApplication(value).setError(fName); - else - JSDLUtils.getOrCreatePOSIXApplication(value).setError(fName); - } - - public static void setApplicationStdOut(JobDefinitionType value, HpcApplicationDeploymentType appDepType, String stderr) { - FileNameType fName = FileNameType.Factory.newInstance(); - fName.setStringValue(stderr); - if (isParallelJob(appDepType)) - JSDLUtils.getOrCreateSPMDApplication(value).setOutput(fName); - else - JSDLUtils.getOrCreatePOSIXApplication(value).setOutput(fName); - } - public static String getApplicationStdOut(JobDefinitionType value, HpcApplicationDeploymentType appDepType) throws RuntimeException { if (isParallelJob(appDepType)) return JSDLUtils.getOrCreateSPMDApplication(value).getOutput().getStringValue(); else return JSDLUtils.getOrCreatePOSIXApplication(value).getOutput().getStringValue(); @@ -228,18 +158,14 @@ public class ApplicationProcessor { else return JSDLUtils.getOrCreatePOSIXApplication(value).getError().getStringValue(); } - public static void createGenericApplication(JobDefinitionType value, HpcApplicationDeploymentType appDepType) { - if (appDepType.getApplicationName() != null) { - ApplicationType appType = JSDLUtils.getOrCreateApplication(value); - String appName = appDepType.getApplicationName() - .getStringValue(); - appType.setApplicationName(appName); - JSDLUtils.getOrCreateJobIdentification(value).setJobName(appName); - } - } - - - public static String getValueFromMap(HpcApplicationDeploymentType appDepType, String name) { + public static void createGenericApplication(JobDefinitionType value, String appName) { + ApplicationType appType = JSDLUtils.getOrCreateApplication(value); + appType.setApplicationName(appName); + JSDLUtils.getOrCreateJobIdentification(value).setJobName(appName); + } + + + public static String getValueFromMap(HpcApplicationDeploymentType appDepType, String name) { ExtendedKeyValueType[] extended = appDepType.getKeyValuePairsArray(); for(ExtendedKeyValueType e: extended) { if(e.getName().equalsIgnoreCase(name)) { http://git-wip-us.apache.org/repos/asf/airavata/blob/04f09e7d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java index 695c943..1cb1250 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java @@ -39,7 +39,9 @@ import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.handler.GFacHandlerException; import org.apache.airavata.gfac.core.states.GfacExperimentState; import org.apache.airavata.gfac.core.states.GfacPluginState; +import org.apache.airavata.model.appcatalog.computeresource.GlobusJobSubmission; import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission; +import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission; import org.apache.airavata.model.workspace.experiment.*; import org.apache.airavata.model.workspace.experiment.DataType; @@ -1257,21 +1259,34 @@ public class GFacUtils { AppCatalog appCatalog = AppCatalogFactory.getAppCatalog(); return appCatalog.getComputeResource().getUNICOREJobSubmission(submissionId); }catch (Exception e){ - String errorMsg = "Error while retrieving local job submission with submission id : " + submissionId; + String errorMsg = "Error while retrieving UNICORE job submission with submission id : " + submissionId; log.error(errorMsg, e); throw new AppCatalogException(errorMsg, e); } } - public static UnicoreJobSubmission getJobSubmission (String submissionId) throws AppCatalogException{ + public static GlobusJobSubmission getGlobusJobSubmission (String submissionId) throws AppCatalogException{ + return null; +// try { +// AppCatalog appCatalog = AppCatalogFactory.getAppCatalog(); +// return appCatalog.getComputeResource().getGlobus(submissionId); +// }catch (Exception e){ +// String errorMsg = "Error while retrieving local job submission with submission id : " + submissionId; +// log.error(errorMsg, e); +// throw new AppCatalogException(errorMsg, e); +// } + } + + public static SSHJobSubmission getSSHJobSubmission (String submissionId) throws AppCatalogException{ try { AppCatalog appCatalog = AppCatalogFactory.getAppCatalog(); - return appCatalog.getComputeResource().getUNICOREJobSubmission(submissionId); + return appCatalog.getComputeResource().getSSHJobSubmission(submissionId); }catch (Exception e){ - String errorMsg = "Error while retrieving local job submission with submission id : " + submissionId; + String errorMsg = "Error while retrieving SSH job submission with submission id : " + submissionId; log.error(errorMsg, e); throw new AppCatalogException(errorMsg, e); } } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/04f09e7d/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java b/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java index 940fff3..5c5af53 100644 --- a/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java +++ b/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java @@ -38,6 +38,7 @@ import org.apache.airavata.gfac.core.provider.utils.ProviderUtils; import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.gfac.ec2.util.AmazonEC2Util; import org.apache.airavata.gfac.ec2.util.EC2ProviderUtil; +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; import org.apache.airavata.model.workspace.experiment.JobState; import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType; import org.apache.airavata.schemas.gfac.Ec2ApplicationDeploymentType; @@ -90,7 +91,7 @@ public class EC2Provider extends AbstractProvider { public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException{ if (jobExecutionContext != null) { - jobId="EC2_"+jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress()+"_"+Calendar.getInstance().getTimeInMillis(); + jobId="EC2_"+jobExecutionContext.getHostName()+"_"+Calendar.getInstance().getTimeInMillis(); if (jobExecutionContext.getSecurityContext(AmazonSecurityContext.AMAZON_SECURITY_CONTEXT) instanceof AmazonSecurityContext) { this.amazonSecurityContext = (AmazonSecurityContext) jobExecutionContext. @@ -156,10 +157,9 @@ public class EC2Provider extends AbstractProvider { try { String outParamName; - OutputParameterType[] outputParametersArray = jobExecutionContext.getApplicationContext(). - getServiceDescription().getType().getOutputParametersArray(); - if(outputParametersArray != null) { - outParamName = outputParametersArray[0].getParameterName(); + List outputs = jobExecutionContext.getApplicationContext().getApplicationInterfaceDescription().getApplicationOutputs(); + if(outputs != null && !outputs.isEmpty()) { + outParamName = outputs.get(0).getName(); } else { throw new GFacProviderException("Output parameter name is not set. Therefore, not being able " + "to filter the job result from standard out "); @@ -217,11 +217,10 @@ public class EC2Provider extends AbstractProvider { executionResult = executionResult.replace("\r","").replace("\n",""); log.info("Result of the job : " + executionResult); - for(OutputParameterType outparamType : outputParametersArray){ + for(OutputDataObjectType outparamType : outputs){ /* Assuming that there is just a single result. If you want to add more results, update the necessary logic below */ - String paramName = outparamType.getParameterName(); - ActualParameter outParam = new ActualParameter(); + String paramName = outparamType.getName(); outParam.getType().changeType(StringParameterType.type); ((StringParameterType) outParam.getType()).setValue(executionResult); jobExecutionContext.getOutMessageContext().addParameter(paramName, outParam);