Return-Path: X-Original-To: apmail-airavata-commits-archive@www.apache.org Delivered-To: apmail-airavata-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E81AB18B29 for ; Mon, 9 Nov 2015 20:25:57 +0000 (UTC) Received: (qmail 82399 invoked by uid 500); 9 Nov 2015 20:25:57 -0000 Delivered-To: apmail-airavata-commits-archive@airavata.apache.org Received: (qmail 82261 invoked by uid 500); 9 Nov 2015 20:25:57 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 82245 invoked by uid 99); 9 Nov 2015 20:25:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Nov 2015 20:25:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 93D11E0850; Mon, 9 Nov 2015 20:25:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: shameera@apache.org To: commits@airavata.apache.org Date: Mon, 09 Nov 2015 20:25:58 -0000 Message-Id: <18edfa56a9e94056840a122984650671@git.apache.org> In-Reply-To: <3fbae00eb2ed4bb2b05c623e422e9e9c@git.apache.org> References: <3fbae00eb2ed4bb2b05c623e422e9e9c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] airavata git commit: Added Local batch job submission support and renamed task implementtions to have generic names Added Local batch job submission support and renamed task implementtions to have generic names Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/dc1be312 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/dc1be312 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/dc1be312 Branch: refs/heads/master Commit: dc1be3126409992f6de94d76e0fc4834540e9e9c Parents: c62f74a Author: Shameera Rathnayaka Authored: Mon Nov 9 15:25:52 2015 -0500 Committer: Shameera Rathnayaka Committed: Mon Nov 9 15:25:52 2015 -0500 ---------------------------------------------------------------------- .../server/src/main/resources/gfac-config.yaml | 11 +- .../org/apache/airavata/gfac/impl/Factory.java | 16 +- .../airavata/gfac/impl/GFacEngineImpl.java | 4 +- .../airavata/gfac/impl/LocalCommandOutput.java | 2 +- .../impl/task/AdvancedSCPDataStageTask.java | 344 ----------------- .../airavata/gfac/impl/task/DataStageTask.java | 126 +++++++ .../impl/task/DefaultJobSubmissionTask.java | 286 +++++++++++++++ .../gfac/impl/task/EnvironmentSetupTask.java | 74 ++++ .../gfac/impl/task/ForkJobSubmissionTask.java | 184 ++++++++++ .../gfac/impl/task/SCPDataStageTask.java | 365 +++++++++++++++---- .../gfac/impl/task/SSHEnvironmentSetupTask.java | 74 ---- .../impl/task/SSHForkJobSubmissionTask.java | 184 ---------- .../gfac/impl/task/SSHJobSubmissionTask.java | 286 --------------- 13 files changed, 980 insertions(+), 976 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/configuration/server/src/main/resources/gfac-config.yaml ---------------------------------------------------------------------- diff --git a/modules/configuration/server/src/main/resources/gfac-config.yaml b/modules/configuration/server/src/main/resources/gfac-config.yaml index 8dafe09..dff4062 100644 --- a/modules/configuration/server/src/main/resources/gfac-config.yaml +++ b/modules/configuration/server/src/main/resources/gfac-config.yaml @@ -20,7 +20,7 @@ jobSubmitters: - submissionProtocol: SSH - taskClass: org.apache.airavata.gfac.impl.task.SSHJobSubmissionTask + taskClass: org.apache.airavata.gfac.impl.task.DefaultJobSubmissionTask # properties: # - userName: airavata # passPhrase: airavata @@ -29,10 +29,11 @@ jobSubmitters: # hostName: remote.client.hostName - submissionProtocol: SSH_FORK - taskClass: org.apache.airavata.gfac.impl.task.SSHForkJobSubmissionTask + taskClass: org.apache.airavata.gfac.impl.task.ForkJobSubmissionTask - submissionProtocol: LOCAL - taskClass: org.apache.airavata.gfac.impl.task.LocalJobSubmissionTask + taskClass: org.apache.airavata.gfac.impl.task.DefaultJobSubmissionTask + # Following job subbmitters are not yet implemented. @@ -47,14 +48,14 @@ commonTasks: fileTransferTasks: - transferProtocol: SCP - taskClass: org.apache.airavata.gfac.impl.task.SCPDataStageTask + taskClass: org.apache.airavata.gfac.impl.task.DataStageTask # If your client doen't run the same instance where airavata server is running then you need to comment above # SCPDataStageTask and uncomment AdvancedSCPDataStageTask. To work with AdvancedSCPDataStageTask, you either need to # provide ssh keys or password. # - transferProtocol: SCP -# taskClass: org.apache.airavata.gfac.impl.task.AdvancedSCPDataStageTask +# taskClass: org.apache.airavata.gfac.impl.task.SCPDataStageTask # properties: # - userName: airavata # passPhrase: airavata http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java index e2966c5..4dc63e6 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java @@ -207,11 +207,17 @@ public abstract class Factory { String computeResourceId = processContext.getComputeResourceId(); String key = processContext.getJobSubmissionProtocol().toString() + ":" + computeResourceId; RemoteCluster remoteCluster = remoteClusterMap.get(key); - if (remoteCluster == null) { - JobManagerConfiguration jobManagerConfiguration = getJobManagerConfiguration(processContext.getResourceJobManager()); - AuthenticationInfo authenticationInfo = getSSHKeyAuthentication(); - remoteCluster = new HPCRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration, authenticationInfo); - remoteClusterMap.put(key, remoteCluster); + JobSubmissionProtocol jobSubmissionProtocol = processContext.getJobSubmissionProtocol(); + if (remoteCluster == null) { + JobManagerConfiguration jobManagerConfiguration = getJobManagerConfiguration(processContext.getResourceJobManager()); + if (jobSubmissionProtocol == JobSubmissionProtocol.LOCAL) { + remoteCluster = new LocalRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration, null); + } else if (jobSubmissionProtocol == JobSubmissionProtocol.SSH || + jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK) { + AuthenticationInfo authenticationInfo = getSSHKeyAuthentication(); + remoteCluster = new HPCRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration, authenticationInfo); + } + remoteClusterMap.put(key, remoteCluster); } return remoteCluster; } http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java index 6444eb4..d386f66 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java @@ -35,7 +35,7 @@ import org.apache.airavata.gfac.core.monitor.JobMonitor; import org.apache.airavata.gfac.core.task.JobSubmissionTask; import org.apache.airavata.gfac.core.task.Task; import org.apache.airavata.gfac.core.task.TaskException; -import org.apache.airavata.gfac.impl.task.SSHEnvironmentSetupTask; +import org.apache.airavata.gfac.impl.task.EnvironmentSetupTask; import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; import org.apache.airavata.model.appcatalog.computeresource.*; import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile; @@ -356,7 +356,7 @@ public class GFacEngineImpl implements GFacEngine { EnvironmentSetupTaskModel subTaskModel = (EnvironmentSetupTaskModel) taskContext.getSubTaskModel(); Task envSetupTask = null; if (subTaskModel.getProtocol() == SecurityProtocol.SSH_KEYS) { - envSetupTask = new SSHEnvironmentSetupTask(); + envSetupTask = new EnvironmentSetupTask(); } else { throw new GFacException("Unsupported security protocol, Airavata doesn't support " + subTaskModel.getProtocol().name() + " protocol yet."); http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java index e9d683d..4d98423 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java @@ -55,6 +55,6 @@ public class LocalCommandOutput implements CommandOutput { @Override public int getExitCode() { - return 0; + return process.exitValue(); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java deleted file mode 100644 index 6c4e14e..0000000 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java +++ /dev/null @@ -1,344 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.airavata.gfac.impl.task; - -import com.jcraft.jsch.JSchException; -import com.jcraft.jsch.Session; -import org.apache.airavata.common.exception.AiravataException; -import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.common.utils.ThriftUtils; -import org.apache.airavata.credential.store.credential.Credential; -import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential; -import org.apache.airavata.credential.store.store.CredentialReader; -import org.apache.airavata.credential.store.store.CredentialStoreException; -import org.apache.airavata.gfac.core.GFacException; -import org.apache.airavata.gfac.core.GFacUtils; -import org.apache.airavata.gfac.core.SSHApiException; -import org.apache.airavata.gfac.core.authentication.AuthenticationInfo; -import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication; -import org.apache.airavata.gfac.core.authentication.SSHPasswordAuthentication; -import org.apache.airavata.gfac.core.cluster.CommandInfo; -import org.apache.airavata.gfac.core.cluster.RawCommandInfo; -import org.apache.airavata.gfac.core.cluster.RemoteCluster; -import org.apache.airavata.gfac.core.cluster.ServerInfo; -import org.apache.airavata.gfac.core.context.TaskContext; -import org.apache.airavata.gfac.core.task.Task; -import org.apache.airavata.gfac.core.task.TaskException; -import org.apache.airavata.gfac.impl.Factory; -import org.apache.airavata.gfac.impl.SSHUtils; -import org.apache.airavata.model.application.io.InputDataObjectType; -import org.apache.airavata.model.application.io.OutputDataObjectType; -import org.apache.airavata.model.commons.ErrorModel; -import org.apache.airavata.model.status.ProcessState; -import org.apache.airavata.model.status.TaskState; -import org.apache.airavata.model.status.TaskStatus; -import org.apache.airavata.model.task.DataStagingTaskModel; -import org.apache.airavata.model.task.TaskTypes; -import org.apache.airavata.registry.cpi.ExperimentCatalog; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.List; -import java.util.Map; - -/** - * This will be used for both Input file staging and output file staging, hence if you do any changes to a part of logic - * in this class please consider that will works with both input and output cases. - */ -public class AdvancedSCPDataStageTask implements Task { - private static final Logger log = LoggerFactory.getLogger(AdvancedSCPDataStageTask.class); - private static final int DEFAULT_SSH_PORT = 22; - private String password; - private String publicKeyPath; - private String passPhrase; - private String privateKeyPath; - private String userName; - private String hostName; - private String inputPath; - - @Override - public void init(Map propertyMap) throws TaskException { - inputPath = propertyMap.get("inputPath"); - hostName = propertyMap.get("hostName"); - userName = propertyMap.get("userName"); - } - - @Override - public TaskStatus execute(TaskContext taskContext) { - TaskStatus status = new TaskStatus(TaskState.EXECUTING); - AuthenticationInfo authenticationInfo = null; - DataStagingTaskModel subTaskModel = null; - String localDataDir = null; - ProcessState processState = taskContext.getParentProcessContext().getProcessState(); - try { - subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel - (taskContext.getTaskModel()); - if (processState == ProcessState.OUTPUT_DATA_STAGING) { - OutputDataObjectType processOutput = taskContext.getProcessOutput(); - if (processOutput != null && processOutput.getValue() == null) { - log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null", - taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(), - processOutput.getName()); - status = new TaskStatus(TaskState.FAILED); - if (processOutput.isIsRequired()) { - status.setReason("File name is null, but this output's isRequired bit is not set"); - } else { - status.setReason("File name is null"); - } - return status; - } - } else if (processState == ProcessState.INPUT_DATA_STAGING) { - InputDataObjectType processInput = taskContext.getProcessInput(); - if (processInput != null && processInput.getValue() == null) { - log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null", - taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(), - processInput.getName()); - status = new TaskStatus(TaskState.FAILED); - if (processInput.isIsRequired()) { - status.setReason("File name is null, but this input's isRequired bit is not set"); - } else { - status.setReason("File name is null"); - } - return status; - } - } else { - status.setState(TaskState.FAILED); - status.setReason("Invalid task invocation, Support " + ProcessState.INPUT_DATA_STAGING.name() + " and " + - "" + ProcessState.OUTPUT_DATA_STAGING.name() + " process phases. found " + processState.name()); - return status; - } - - // use rsync instead of scp if source and destination host and user name is same. - URI sourceURI = new URI(subTaskModel.getSource()); - String fileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1, - sourceURI.getPath().length()); - URI destinationURI = null; - if (subTaskModel.getDestination().startsWith("dummy")) { - destinationURI = getDestinationURI(taskContext, fileName); - subTaskModel.setDestination(destinationURI.toString()); - } else { - destinationURI = new URI(subTaskModel.getDestination()); - } - - if (sourceURI.getHost().equalsIgnoreCase(destinationURI.getHost()) - && sourceURI.getUserInfo().equalsIgnoreCase(destinationURI.getUserInfo())) { - localDataCopy(taskContext, sourceURI, destinationURI); - status.setState(TaskState.COMPLETED); - status.setReason("Locally copied file using 'cp' command "); - return status; - } - - - String tokenId = taskContext.getParentProcessContext().getTokenId(); - CredentialReader credentialReader = GFacUtils.getCredentialReader(); - Credential credential = credentialReader.getCredential(taskContext.getParentProcessContext().getGatewayId(), tokenId); - if (credential instanceof SSHCredential) { - SSHCredential sshCredential = (SSHCredential) credential; - byte[] publicKey = sshCredential.getPublicKey(); - publicKeyPath = writeFileToDisk(publicKey); - byte[] privateKey = sshCredential.getPrivateKey(); - privateKeyPath = writeFileToDisk(privateKey); - passPhrase = sshCredential.getPassphrase(); -// userName = sshCredential.getPortalUserName(); // this might not same as login user name - authenticationInfo = getSSHKeyAuthentication(); - } else { - String msg = "Provided credential store token is not valid. Please provide the correct credential store token"; - log.error(msg); - status.setState(TaskState.FAILED); - status.setReason(msg); - ErrorModel errorModel = new ErrorModel(); - errorModel.setActualErrorMessage(msg); - errorModel.setUserFriendlyMessage(msg); - taskContext.getTaskModel().setTaskError(errorModel); - return status; - } - status = new TaskStatus(TaskState.COMPLETED); - - ServerInfo serverInfo = new ServerInfo(userName, hostName, DEFAULT_SSH_PORT); - Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo); - if (processState == ProcessState.INPUT_DATA_STAGING) { - inputDataStaging(taskContext, sshSession, sourceURI, destinationURI); - status.setReason("Successfully staged input data"); - } else if (processState == ProcessState.OUTPUT_DATA_STAGING) { - String targetPath = destinationURI.getPath().substring(0, destinationURI.getPath().lastIndexOf('/')); - SSHUtils.makeDirectory(targetPath, sshSession); - // TODO - save updated subtask model with new destination - outputDataStaging(taskContext, sshSession, sourceURI, destinationURI); - status.setReason("Successfully staged output data"); - } - } catch (TException e) { - String msg = "Couldn't create subTask model thrift model"; - log.error(msg, e); - status.setState(TaskState.FAILED); - status.setReason(msg); - ErrorModel errorModel = new ErrorModel(); - errorModel.setActualErrorMessage(e.getMessage()); - errorModel.setUserFriendlyMessage(msg); - taskContext.getTaskModel().setTaskError(errorModel); - return status; - } catch (ApplicationSettingsException | FileNotFoundException | CredentialStoreException | IllegalAccessException | InstantiationException e) { - String msg = "Failed while reading credentials"; - log.error(msg, e); - status.setState(TaskState.FAILED); - status.setReason(msg); - ErrorModel errorModel = new ErrorModel(); - errorModel.setActualErrorMessage(e.getMessage()); - errorModel.setUserFriendlyMessage(msg); - taskContext.getTaskModel().setTaskError(errorModel); - } catch (URISyntaxException e) { - String msg = "Sorce or destination uri is not correct source : " + subTaskModel.getSource() + ", " + - "destination : " + subTaskModel.getDestination(); - log.error(msg, e); - status.setState(TaskState.FAILED); - status.setReason(msg); - ErrorModel errorModel = new ErrorModel(); - errorModel.setActualErrorMessage(e.getMessage()); - errorModel.setUserFriendlyMessage(msg); - taskContext.getTaskModel().setTaskError(errorModel); - } catch (SSHApiException e) { - String msg = "Failed to do scp with compute resource"; - log.error(msg, e); - status.setState(TaskState.FAILED); - status.setReason(msg); - ErrorModel errorModel = new ErrorModel(); - errorModel.setActualErrorMessage(e.getMessage()); - errorModel.setUserFriendlyMessage(msg); - taskContext.getTaskModel().setTaskError(errorModel); - } catch (AiravataException e) { - String msg = "Error while creating ssh session with client"; - log.error(msg, e); - status.setState(TaskState.FAILED); - status.setReason(msg); - ErrorModel errorModel = new ErrorModel(); - errorModel.setActualErrorMessage(e.getMessage()); - errorModel.setUserFriendlyMessage(msg); - taskContext.getTaskModel().setTaskError(errorModel); - } catch (JSchException | IOException e) { - String msg = "Failed to do scp with client"; - log.error(msg, e); - status.setState(TaskState.FAILED); - status.setReason(msg); - ErrorModel errorModel = new ErrorModel(); - errorModel.setActualErrorMessage(e.getMessage()); - errorModel.setUserFriendlyMessage(msg); - taskContext.getTaskModel().setTaskError(errorModel); - } catch (GFacException e) { - String msg = "Failed update experiment and process inputs and outputs"; - log.error(msg, e); - status.setState(TaskState.FAILED); - status.setReason(msg); - ErrorModel errorModel = new ErrorModel(); - errorModel.setActualErrorMessage(e.getMessage()); - errorModel.setUserFriendlyMessage(msg); - taskContext.getTaskModel().setTaskError(errorModel); - } - return status; - } - - private void localDataCopy(TaskContext taskContext, URI sourceURI, URI destinationURI) throws SSHApiException { - StringBuilder sb = new StringBuilder("rsync -cr "); - sb.append(sourceURI.getPath()).append(" ").append(destinationURI.getPath()); - CommandInfo commandInfo = new RawCommandInfo(sb.toString()); - taskContext.getParentProcessContext().getRemoteCluster().execute(commandInfo); - } - - private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI - destinationURI) throws SSHApiException, IOException, JSchException { - /** - * scp third party file transfer 'to' compute resource. - */ - taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(), - destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.TO); - } - - private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI) - throws SSHApiException, AiravataException, IOException, JSchException, GFacException { - - /** - * scp third party file transfer 'from' comute resource. - */ - taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(), - destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM); - // update output locations - GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath()); - GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath()); - - } - - @Override - public TaskStatus recover(TaskContext taskContext) { - TaskState state = taskContext.getTaskStatus().getState(); - if (state == TaskState.EXECUTING || state == TaskState.CREATED) { - return execute(taskContext); - } else { - // files already transferred or failed - return taskContext.getTaskStatus(); - } - } - - @Override - public TaskTypes getType() { - return TaskTypes.DATA_STAGING; - } - - private SSHPasswordAuthentication getSSHPasswordAuthentication() { - return new SSHPasswordAuthentication(userName, password); - } - - private SSHKeyAuthentication getSSHKeyAuthentication() { - SSHKeyAuthentication sshKA = new SSHKeyAuthentication(); - sshKA.setUserName(userName); - sshKA.setPassphrase(passPhrase); - sshKA.setPrivateKeyFilePath(privateKeyPath); - sshKA.setPublicKeyFilePath(publicKeyPath); - sshKA.setStrictHostKeyChecking("no"); - return sshKA; - } - - private String writeFileToDisk(byte[] data) { - File temp = null; - try { - temp = File.createTempFile("id_rsa", ""); - //write it - FileOutputStream bw = new FileOutputStream(temp); - bw.write(data); - bw.close(); - } catch (IOException e) { - log.error(e.getMessage(), e); - } - return temp.getAbsolutePath(); - } - - public URI getDestinationURI(TaskContext taskContext, String fileName) throws URISyntaxException { - String filePath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator) + - taskContext.getParentProcessContext().getProcessId() + File.separator + fileName; - return new URI("SCP", hostName, filePath, null); - - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java new file mode 100644 index 0000000..ab9d562 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java @@ -0,0 +1,126 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.airavata.gfac.impl.task; + +import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.gfac.core.SSHApiException; +import org.apache.airavata.gfac.core.context.TaskContext; +import org.apache.airavata.gfac.core.task.Task; +import org.apache.airavata.gfac.core.task.TaskException; +import org.apache.airavata.model.commons.ErrorModel; +import org.apache.airavata.model.status.ProcessState; +import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.status.TaskStatus; +import org.apache.airavata.model.task.DataStagingTaskModel; +import org.apache.airavata.model.task.TaskTypes; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; + +public class DataStageTask implements Task { + private static final Logger log = LoggerFactory.getLogger(DataStageTask.class); + + @Override + public void init(Map propertyMap) throws TaskException { + + } + + @Override + public TaskStatus execute(TaskContext taskContext) { + TaskStatus status = new TaskStatus(TaskState.COMPLETED); + if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) { + status.setState(TaskState.FAILED); + status.setReason("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found " + + taskContext.getTaskModel().getTaskType().toString()); + } else { + try { + DataStagingTaskModel subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext + .getTaskModel()); + URI sourceURI = new URI(subTaskModel.getSource()); + URI destinationURI = new URI(subTaskModel.getDestination()); + + ProcessState processState = taskContext.getParentProcessContext().getProcessState(); + if (processState == ProcessState.INPUT_DATA_STAGING) { + /** + * copy local file to compute resource. + */ + taskContext.getParentProcessContext().getRemoteCluster().copyTo(sourceURI.getPath(), destinationURI + .getPath()); + } else if (processState == ProcessState.OUTPUT_DATA_STAGING) { + /** + * copy remote file from compute resource. + */ + taskContext.getParentProcessContext().getRemoteCluster().copyFrom(sourceURI.getPath(), destinationURI + .getPath()); + } + status.setReason("Successfully staged data"); + } catch (SSHApiException e) { + String msg = "Scp attempt failed"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (TException e) { + String msg = "Invalid task invocation"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (URISyntaxException e) { + String msg = "source or destination is not a valid URI"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } + } + return status; + } + + @Override + public TaskStatus recover(TaskContext taskContext) { + TaskState state = taskContext.getTaskStatus().getState(); + if (state == TaskState.EXECUTING || state == TaskState.CREATED) { + return execute(taskContext); + } else { + // files already transferred or failed + return taskContext.getTaskStatus(); + } + } + + @Override + public TaskTypes getType() { + return TaskTypes.DATA_STAGING; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java new file mode 100644 index 0000000..020880d --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java @@ -0,0 +1,286 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ + +package org.apache.airavata.gfac.impl.task; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.gfac.core.*; +import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput; +import org.apache.airavata.gfac.core.cluster.RemoteCluster; +import org.apache.airavata.gfac.core.context.ProcessContext; +import org.apache.airavata.gfac.core.context.TaskContext; +import org.apache.airavata.gfac.core.task.JobSubmissionTask; +import org.apache.airavata.gfac.core.task.TaskException; +import org.apache.airavata.gfac.impl.Factory; +import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager; +import org.apache.airavata.model.commons.ErrorModel; +import org.apache.airavata.model.job.JobModel; +import org.apache.airavata.model.status.JobState; +import org.apache.airavata.model.status.JobStatus; +import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.status.TaskStatus; +import org.apache.airavata.model.task.TaskTypes; +import org.apache.airavata.registry.cpi.AppCatalogException; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +public class DefaultJobSubmissionTask implements JobSubmissionTask { + private static final Logger log = LoggerFactory.getLogger(DefaultJobSubmissionTask.class); + @Override + public void init(Map propertyMap) throws TaskException { + + } + + @Override + public TaskStatus execute(TaskContext taskContext){ + TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED); // set to completed. + try { + ProcessContext processContext = taskContext.getParentProcessContext(); + JobModel jobModel = processContext.getJobModel(); + jobModel.setTaskId(taskContext.getTaskId()); + RemoteCluster remoteCluster = processContext.getRemoteCluster(); + JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext,taskContext); + jobModel.setJobName(jobDescriptor.getJobName()); + ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext); + JobManagerConfiguration jConfig = null; + if (resourceJobManager != null) { + jConfig = Factory.getJobManagerConfiguration(resourceJobManager); + } + JobStatus jobStatus = new JobStatus(); + File jobFile = GFacUtils.createJobFile(taskContext, jobDescriptor, jConfig); + if (jobFile != null && jobFile.exists()) { + jobModel.setJobDescription(FileUtils.readFileToString(jobFile)); + JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(), + processContext.getWorkingDir()); + jobModel.setExitCode(jobSubmissionOutput.getExitCode()); + jobModel.setStdErr(jobSubmissionOutput.getStdErr()); + jobModel.setStdOut(jobSubmissionOutput.getStdOut()); + String jobId = jobSubmissionOutput.getJobId(); + if (jobId != null && !jobId.isEmpty()) { + jobModel.setJobId(jobId); + GFacUtils.saveJobModel(processContext, jobModel); + jobStatus.setJobState(JobState.SUBMITTED); + jobStatus.setReason("Successfully Submitted to " + taskContext.getParentProcessContext() + .getComputeResourceDescription().getHostName()); + jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + jobModel.setJobStatus(jobStatus); + GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel); + if (verifyJobSubmissionByJobId(remoteCluster, jobId)) { + jobStatus.setJobState(JobState.QUEUED); + jobStatus.setReason("Verification step succeeded"); + jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + jobModel.setJobStatus(jobStatus); + GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel); + } + taskStatus = new TaskStatus(TaskState.COMPLETED); + taskStatus.setReason("Submitted job to compute resource"); + taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + } else { + int verificationTryCount = 0; + while (verificationTryCount++ < 3) { + String verifyJobId = verifyJobSubmission(remoteCluster, jobModel); + if (verifyJobId != null && !verifyJobId.isEmpty()) { + // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED + jobId = verifyJobId; + jobModel.setJobId(jobId); + GFacUtils.saveJobModel(processContext,jobModel); + jobStatus.setJobState(JobState.QUEUED); + jobStatus.setReason("Verification step succeeded"); + jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + jobModel.setJobStatus(jobStatus); + GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel); + taskStatus.setState(TaskState.COMPLETED); + taskStatus.setReason("Submitted job to compute resource"); + taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + break; + } + log.info("Verify step return invalid jobId, retry verification step in {} secs", verificationTryCount * 10); + Thread.sleep(verificationTryCount * 10000); + } + } + + if (jobId == null || jobId.isEmpty()) { + String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find " + + "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " + + "doesn't return a valid JobId. " + "Hence changing experiment state to Failed"; + log.error(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setUserFriendlyMessage(msg); + errorModel.setActualErrorMessage(msg); + GFacUtils.saveExperimentError(processContext, errorModel); + GFacUtils.saveProcessError(processContext, errorModel); + GFacUtils.saveTaskError(taskContext, errorModel); + taskStatus.setState(TaskState.FAILED); + taskStatus.setReason("Couldn't find job id in both submitted and verified steps"); + taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + }else { + GFacUtils.saveJobModel(processContext, jobModel); + } + } else { + taskStatus.setState(TaskState.FAILED); + if (jobFile == null) { + taskStatus.setReason("JobFile is null"); + } else { + taskStatus.setReason("Job file doesn't exist"); + } + } + + } catch (AppCatalogException e) { + String msg = "Error while instantiating app catalog"; + log.error(msg, e); + taskStatus.setState(TaskState.FAILED); + taskStatus.setReason(msg); + taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (ApplicationSettingsException e) { + String msg = "Error occurred while creating job descriptor"; + log.error(msg, e); + taskStatus.setState(TaskState.FAILED); + taskStatus.setReason(msg); + taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (GFacException e) { + String msg = "Error occurred while creating job descriptor"; + log.error(msg, e); + taskStatus.setState(TaskState.FAILED); + taskStatus.setReason(msg); + taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (SSHApiException e) { + String msg = "Error occurred while submitting the job"; + log.error(msg, e); + taskStatus.setState(TaskState.FAILED); + taskStatus.setReason(msg); + taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (IOException e) { + String msg = "Error while reading the content of the job file"; + log.error(msg, e); + taskStatus.setState(TaskState.FAILED); + taskStatus.setReason(msg); + taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (InterruptedException e) { + String msg = "Error occurred while verifying the job submission"; + log.error(msg, e); + taskStatus.setState(TaskState.FAILED); + taskStatus.setReason(msg); + taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } + + taskContext.setTaskStatus(taskStatus); + try { + GFacUtils.saveAndPublishTaskStatus(taskContext); + } catch (GFacException e) { + log.error("Error while saving task status", e); + } + return taskStatus; + } + + private boolean verifyJobSubmissionByJobId(RemoteCluster remoteCluster, String jobID) throws SSHApiException { + JobStatus status = remoteCluster.getJobStatus(jobID); + return status != null && status.getJobState() != JobState.UNKNOWN; + } + + private String verifyJobSubmission(RemoteCluster remoteCluster, JobModel jobDetails) { + String jobName = jobDetails.getJobName(); + String jobId = null; + try { + jobId = remoteCluster.getJobIdByJobName(jobName, remoteCluster.getServerInfo().getUserName()); + } catch (SSHApiException e) { + log.error("Error while verifying JobId from JobName"); + } + return jobId; + } + + + @Override + public TaskStatus recover(TaskContext taskContext) { + ProcessContext processContext = taskContext.getParentProcessContext(); + JobModel jobModel = processContext.getJobModel(); + // original job failed before submitting + if (jobModel == null || jobModel.getJobId() == null ){ + return execute(taskContext); + }else { + // job is already submitted and monitor should handle the recovery + return new TaskStatus(TaskState.COMPLETED); + } + } + + @Override + public TaskTypes getType() { + return TaskTypes.JOB_SUBMISSION; + } + + @Override + public JobStatus cancel(TaskContext taskcontext) throws TaskException { + ProcessContext processContext = taskcontext.getParentProcessContext(); + RemoteCluster remoteCluster = processContext.getRemoteCluster(); + JobModel jobModel = processContext.getJobModel(); + int retryCount = 0; + if (jobModel != null) { + try { + JobStatus oldJobStatus = remoteCluster.getJobStatus(jobModel.getJobId()); + while (oldJobStatus == null && retryCount <= 5) { + retryCount++; + Thread.sleep(retryCount * 1000); + oldJobStatus = remoteCluster.getJobStatus(jobModel.getJobId()); + } + if (oldJobStatus != null) { + oldJobStatus = remoteCluster.cancelJob(jobModel.getJobId()); + return oldJobStatus; + } else { + throw new TaskException("Cancel operation failed, Job status couldn't find in resource, JobId " + + jobModel.getJobId()); + } + } catch (SSHApiException | InterruptedException e) { + throw new TaskException("Error while cancelling job " + jobModel.getJobId(), e); + } + } else { + throw new TaskException("Couldn't complete cancel operation, JobModel is null in ProcessContext."); + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java new file mode 100644 index 0000000..fff130c --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.airavata.gfac.impl.task; + +import org.apache.airavata.gfac.core.SSHApiException; +import org.apache.airavata.gfac.core.cluster.RemoteCluster; +import org.apache.airavata.gfac.core.context.TaskContext; +import org.apache.airavata.gfac.core.task.Task; +import org.apache.airavata.gfac.core.task.TaskException; +import org.apache.airavata.model.commons.ErrorModel; +import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.status.TaskStatus; +import org.apache.airavata.model.task.TaskTypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class EnvironmentSetupTask implements Task { + + private static final Logger log = LoggerFactory.getLogger(EnvironmentSetupTask.class); + @Override + public void init(Map propertyMap) throws TaskException { + + } + + @Override + public TaskStatus execute(TaskContext taskContext) { + TaskStatus status = new TaskStatus(TaskState.COMPLETED); + try { + RemoteCluster remoteCluster = taskContext.getParentProcessContext().getRemoteCluster(); + remoteCluster.makeDirectory(taskContext.getParentProcessContext().getWorkingDir()); + status.setReason("Successfully created environment"); + } catch (SSHApiException e) { + String msg = "Error while environment setup"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } + return status; + } + + @Override + public TaskStatus recover(TaskContext taskContext) { + return execute(taskContext); + } + + @Override + public TaskTypes getType() { + return TaskTypes.ENV_SETUP; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java new file mode 100644 index 0000000..ed75fef --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java @@ -0,0 +1,184 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ + +package org.apache.airavata.gfac.impl.task; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.gfac.core.*; +import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput; +import org.apache.airavata.gfac.core.cluster.RemoteCluster; +import org.apache.airavata.gfac.core.context.ProcessContext; +import org.apache.airavata.gfac.core.context.TaskContext; +import org.apache.airavata.gfac.core.task.JobSubmissionTask; +import org.apache.airavata.gfac.core.task.TaskException; +import org.apache.airavata.gfac.impl.Factory; +import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager; +import org.apache.airavata.model.commons.ErrorModel; +import org.apache.airavata.model.job.JobModel; +import org.apache.airavata.model.status.JobState; +import org.apache.airavata.model.status.JobStatus; +import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.status.TaskStatus; +import org.apache.airavata.model.task.TaskTypes; +import org.apache.airavata.registry.cpi.AppCatalogException; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +public class ForkJobSubmissionTask implements JobSubmissionTask { + private static final Logger log = LoggerFactory.getLogger(ForkJobSubmissionTask.class); + @Override + public void init(Map propertyMap) throws TaskException { + + } + + @Override + public TaskStatus execute(TaskContext taskContext) { + TaskStatus taskStatus = new TaskStatus(TaskState.CREATED); + try { + ProcessContext processContext = taskContext.getParentProcessContext(); + JobModel jobModel = processContext.getJobModel(); + jobModel.setTaskId(taskContext.getTaskId()); + RemoteCluster remoteCluster = processContext.getRemoteCluster(); + JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext, taskContext); + jobModel.setJobName(jobDescriptor.getJobName()); + ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext); + JobManagerConfiguration jConfig = null; + if (resourceJobManager != null) { + jConfig = Factory.getJobManagerConfiguration(resourceJobManager); + } + JobStatus jobStatus = new JobStatus(); + File jobFile = GFacUtils.createJobFile(taskContext, jobDescriptor, jConfig); + if (jobFile != null && jobFile.exists()) { + jobModel.setJobDescription(FileUtils.readFileToString(jobFile)); + JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(), + processContext.getWorkingDir()); + jobModel.setExitCode(jobSubmissionOutput.getExitCode()); + jobModel.setStdErr(jobSubmissionOutput.getStdErr()); + jobModel.setStdOut(jobSubmissionOutput.getStdOut()); + String jobId = jobSubmissionOutput.getJobId(); + if (jobId != null && !jobId.isEmpty()) { + jobModel.setJobId(jobId); + GFacUtils.saveJobModel(processContext, jobModel); + jobStatus.setJobState(JobState.SUBMITTED); + jobStatus.setReason("Successfully Submitted to " + taskContext.getParentProcessContext() + .getComputeResourceDescription().getHostName()); + jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + jobModel.setJobStatus(jobStatus); + GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel); + taskStatus = new TaskStatus(TaskState.COMPLETED); + taskStatus.setReason("Submitted job to compute resource"); + } + if (jobId == null || jobId.isEmpty()) { + String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find " + + "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " + + "doesn't return a valid JobId. " + "Hence changing experiment state to Failed"; + log.error(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(msg); + errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime()); + GFacUtils.saveExperimentError(processContext, errorModel); + GFacUtils.saveProcessError(processContext, errorModel); + GFacUtils.saveTaskError(taskContext, errorModel); + taskStatus.setState(TaskState.FAILED); + taskStatus.setReason("Couldn't find job id in both submitted and verified steps"); + }else { + GFacUtils.saveJobModel(processContext, jobModel); + } + } else { + taskStatus.setState(TaskState.FAILED); + if (jobFile == null) { + taskStatus.setReason("JobFile is null"); + } else { + taskStatus.setReason("Job file doesn't exist"); + } + } + } catch (ApplicationSettingsException e) { + String msg = "Error occurred while creating job descriptor"; + log.error(msg, e); + taskStatus.setState(TaskState.FAILED); + taskStatus.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (AppCatalogException e) { + String msg = "Error while instantiating app catalog"; + log.error(msg, e); + taskStatus.setState(TaskState.FAILED); + taskStatus.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (GFacException e) { + String msg = "Error occurred while creating job descriptor"; + log.error(msg, e); + taskStatus.setState(TaskState.FAILED); + taskStatus.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (SSHApiException e) { + String msg = "Error occurred while submitting the job"; + log.error(msg, e); + taskStatus.setState(TaskState.FAILED); + taskStatus.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (IOException e) { + String msg = "Error while reading the content of the job file"; + log.error(msg, e); + taskStatus.setState(TaskState.FAILED); + taskStatus.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } + return taskStatus; + } + + @Override + public TaskStatus recover(TaskContext taskContext) { + //TODO implement recovery scenario instead of calling execute. + return execute(taskContext); + } + + @Override + public TaskTypes getType() { + return TaskTypes.JOB_SUBMISSION; + } + + @Override + public JobStatus cancel(TaskContext taskcontext) { + // TODO - implement cancel with SSH Fork + return null; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java index 32ee31b..678ded1 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java @@ -20,11 +20,32 @@ */ package org.apache.airavata.gfac.impl.task; +import com.jcraft.jsch.JSchException; +import com.jcraft.jsch.Session; +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.credential.store.credential.Credential; +import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential; +import org.apache.airavata.credential.store.store.CredentialReader; +import org.apache.airavata.credential.store.store.CredentialStoreException; +import org.apache.airavata.gfac.core.GFacException; +import org.apache.airavata.gfac.core.GFacUtils; import org.apache.airavata.gfac.core.SSHApiException; +import org.apache.airavata.gfac.core.authentication.AuthenticationInfo; +import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication; +import org.apache.airavata.gfac.core.authentication.SSHPasswordAuthentication; +import org.apache.airavata.gfac.core.cluster.CommandInfo; +import org.apache.airavata.gfac.core.cluster.RawCommandInfo; +import org.apache.airavata.gfac.core.cluster.RemoteCluster; +import org.apache.airavata.gfac.core.cluster.ServerInfo; import org.apache.airavata.gfac.core.context.TaskContext; import org.apache.airavata.gfac.core.task.Task; import org.apache.airavata.gfac.core.task.TaskException; +import org.apache.airavata.gfac.impl.Factory; +import org.apache.airavata.gfac.impl.SSHUtils; +import org.apache.airavata.model.application.io.InputDataObjectType; +import org.apache.airavata.model.application.io.OutputDataObjectType; import org.apache.airavata.model.commons.ErrorModel; import org.apache.airavata.model.status.ProcessState; import org.apache.airavata.model.status.TaskState; @@ -35,81 +56,240 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Map; +/** + * This will be used for both Input file staging and output file staging, hence if you do any changes to a part of logic + * in this class please consider that will works with both input and output cases. + */ public class SCPDataStageTask implements Task { - private static final Logger log = LoggerFactory.getLogger(SCPDataStageTask.class); - - @Override - public void init(Map propertyMap) throws TaskException { - - } - - @Override - public TaskStatus execute(TaskContext taskContext) { - TaskStatus status = new TaskStatus(TaskState.COMPLETED); - if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) { - status.setState(TaskState.FAILED); - status.setReason("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found " - + taskContext.getTaskModel().getTaskType().toString()); - } else { - try { - DataStagingTaskModel subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext - .getTaskModel()); - URI sourceURI = new URI(subTaskModel.getSource()); - URI destinationURI = new URI(subTaskModel.getDestination()); - - ProcessState processState = taskContext.getParentProcessContext().getProcessState(); - if (processState == ProcessState.INPUT_DATA_STAGING) { - /** - * copy local file to compute resource. - */ - taskContext.getParentProcessContext().getRemoteCluster().copyTo(sourceURI.getPath(), destinationURI - .getPath()); - } else if (processState == ProcessState.OUTPUT_DATA_STAGING) { - /** - * copy remote file from compute resource. - */ - taskContext.getParentProcessContext().getRemoteCluster().copyFrom(sourceURI.getPath(), destinationURI - .getPath()); - } - status.setReason("Successfully staged data"); - } catch (SSHApiException e) { - String msg = "Scp attempt failed"; - log.error(msg, e); - status.setState(TaskState.FAILED); - status.setReason(msg); - ErrorModel errorModel = new ErrorModel(); - errorModel.setActualErrorMessage(e.getMessage()); - errorModel.setUserFriendlyMessage(msg); - taskContext.getTaskModel().setTaskError(errorModel); - } catch (TException e) { - String msg = "Invalid task invocation"; - log.error(msg, e); - status.setState(TaskState.FAILED); - status.setReason(msg); - ErrorModel errorModel = new ErrorModel(); - errorModel.setActualErrorMessage(e.getMessage()); - errorModel.setUserFriendlyMessage(msg); - taskContext.getTaskModel().setTaskError(errorModel); - } catch (URISyntaxException e) { - String msg = "source or destination is not a valid URI"; - log.error(msg, e); - status.setState(TaskState.FAILED); - status.setReason(msg); - ErrorModel errorModel = new ErrorModel(); - errorModel.setActualErrorMessage(e.getMessage()); - errorModel.setUserFriendlyMessage(msg); - taskContext.getTaskModel().setTaskError(errorModel); - } - } - return status; - } - - @Override - public TaskStatus recover(TaskContext taskContext) { + private static final Logger log = LoggerFactory.getLogger(SCPDataStageTask.class); + private static final int DEFAULT_SSH_PORT = 22; + private String password; + private String publicKeyPath; + private String passPhrase; + private String privateKeyPath; + private String userName; + private String hostName; + private String inputPath; + + @Override + public void init(Map propertyMap) throws TaskException { + inputPath = propertyMap.get("inputPath"); + hostName = propertyMap.get("hostName"); + userName = propertyMap.get("userName"); + } + + @Override + public TaskStatus execute(TaskContext taskContext) { + TaskStatus status = new TaskStatus(TaskState.EXECUTING); + AuthenticationInfo authenticationInfo = null; + DataStagingTaskModel subTaskModel = null; + String localDataDir = null; + ProcessState processState = taskContext.getParentProcessContext().getProcessState(); + try { + subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel + (taskContext.getTaskModel()); + if (processState == ProcessState.OUTPUT_DATA_STAGING) { + OutputDataObjectType processOutput = taskContext.getProcessOutput(); + if (processOutput != null && processOutput.getValue() == null) { + log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null", + taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(), + processOutput.getName()); + status = new TaskStatus(TaskState.FAILED); + if (processOutput.isIsRequired()) { + status.setReason("File name is null, but this output's isRequired bit is not set"); + } else { + status.setReason("File name is null"); + } + return status; + } + } else if (processState == ProcessState.INPUT_DATA_STAGING) { + InputDataObjectType processInput = taskContext.getProcessInput(); + if (processInput != null && processInput.getValue() == null) { + log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null", + taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(), + processInput.getName()); + status = new TaskStatus(TaskState.FAILED); + if (processInput.isIsRequired()) { + status.setReason("File name is null, but this input's isRequired bit is not set"); + } else { + status.setReason("File name is null"); + } + return status; + } + } else { + status.setState(TaskState.FAILED); + status.setReason("Invalid task invocation, Support " + ProcessState.INPUT_DATA_STAGING.name() + " and " + + "" + ProcessState.OUTPUT_DATA_STAGING.name() + " process phases. found " + processState.name()); + return status; + } + + // use rsync instead of scp if source and destination host and user name is same. + URI sourceURI = new URI(subTaskModel.getSource()); + String fileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1, + sourceURI.getPath().length()); + URI destinationURI = null; + if (subTaskModel.getDestination().startsWith("dummy")) { + destinationURI = getDestinationURI(taskContext, fileName); + subTaskModel.setDestination(destinationURI.toString()); + } else { + destinationURI = new URI(subTaskModel.getDestination()); + } + + if (sourceURI.getHost().equalsIgnoreCase(destinationURI.getHost()) + && sourceURI.getUserInfo().equalsIgnoreCase(destinationURI.getUserInfo())) { + localDataCopy(taskContext, sourceURI, destinationURI); + status.setState(TaskState.COMPLETED); + status.setReason("Locally copied file using 'cp' command "); + return status; + } + + + String tokenId = taskContext.getParentProcessContext().getTokenId(); + CredentialReader credentialReader = GFacUtils.getCredentialReader(); + Credential credential = credentialReader.getCredential(taskContext.getParentProcessContext().getGatewayId(), tokenId); + if (credential instanceof SSHCredential) { + SSHCredential sshCredential = (SSHCredential) credential; + byte[] publicKey = sshCredential.getPublicKey(); + publicKeyPath = writeFileToDisk(publicKey); + byte[] privateKey = sshCredential.getPrivateKey(); + privateKeyPath = writeFileToDisk(privateKey); + passPhrase = sshCredential.getPassphrase(); +// userName = sshCredential.getPortalUserName(); // this might not same as login user name + authenticationInfo = getSSHKeyAuthentication(); + } else { + String msg = "Provided credential store token is not valid. Please provide the correct credential store token"; + log.error(msg); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(msg); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + return status; + } + status = new TaskStatus(TaskState.COMPLETED); + + ServerInfo serverInfo = new ServerInfo(userName, hostName, DEFAULT_SSH_PORT); + Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo); + if (processState == ProcessState.INPUT_DATA_STAGING) { + inputDataStaging(taskContext, sshSession, sourceURI, destinationURI); + status.setReason("Successfully staged input data"); + } else if (processState == ProcessState.OUTPUT_DATA_STAGING) { + String targetPath = destinationURI.getPath().substring(0, destinationURI.getPath().lastIndexOf('/')); + SSHUtils.makeDirectory(targetPath, sshSession); + // TODO - save updated subtask model with new destination + outputDataStaging(taskContext, sshSession, sourceURI, destinationURI); + status.setReason("Successfully staged output data"); + } + } catch (TException e) { + String msg = "Couldn't create subTask model thrift model"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + return status; + } catch (ApplicationSettingsException | FileNotFoundException | CredentialStoreException | IllegalAccessException | InstantiationException e) { + String msg = "Failed while reading credentials"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (URISyntaxException e) { + String msg = "Sorce or destination uri is not correct source : " + subTaskModel.getSource() + ", " + + "destination : " + subTaskModel.getDestination(); + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (SSHApiException e) { + String msg = "Failed to do scp with compute resource"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (AiravataException e) { + String msg = "Error while creating ssh session with client"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (JSchException | IOException e) { + String msg = "Failed to do scp with client"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (GFacException e) { + String msg = "Failed update experiment and process inputs and outputs"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } + return status; + } + + private void localDataCopy(TaskContext taskContext, URI sourceURI, URI destinationURI) throws SSHApiException { + StringBuilder sb = new StringBuilder("rsync -cr "); + sb.append(sourceURI.getPath()).append(" ").append(destinationURI.getPath()); + CommandInfo commandInfo = new RawCommandInfo(sb.toString()); + taskContext.getParentProcessContext().getRemoteCluster().execute(commandInfo); + } + + private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI + destinationURI) throws SSHApiException, IOException, JSchException { + /** + * scp third party file transfer 'to' compute resource. + */ + taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(), + destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.TO); + } + + private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI) + throws SSHApiException, AiravataException, IOException, JSchException, GFacException { + + /** + * scp third party file transfer 'from' comute resource. + */ + taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(), + destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM); + // update output locations + GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath()); + GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath()); + + } + + @Override + public TaskStatus recover(TaskContext taskContext) { TaskState state = taskContext.getTaskStatus().getState(); if (state == TaskState.EXECUTING || state == TaskState.CREATED) { return execute(taskContext); @@ -117,10 +297,45 @@ public class SCPDataStageTask implements Task { // files already transferred or failed return taskContext.getTaskStatus(); } - } + } + + @Override + public TaskTypes getType() { + return TaskTypes.DATA_STAGING; + } + + private SSHPasswordAuthentication getSSHPasswordAuthentication() { + return new SSHPasswordAuthentication(userName, password); + } + + private SSHKeyAuthentication getSSHKeyAuthentication() { + SSHKeyAuthentication sshKA = new SSHKeyAuthentication(); + sshKA.setUserName(userName); + sshKA.setPassphrase(passPhrase); + sshKA.setPrivateKeyFilePath(privateKeyPath); + sshKA.setPublicKeyFilePath(publicKeyPath); + sshKA.setStrictHostKeyChecking("no"); + return sshKA; + } + + private String writeFileToDisk(byte[] data) { + File temp = null; + try { + temp = File.createTempFile("id_rsa", ""); + //write it + FileOutputStream bw = new FileOutputStream(temp); + bw.write(data); + bw.close(); + } catch (IOException e) { + log.error(e.getMessage(), e); + } + return temp.getAbsolutePath(); + } + + public URI getDestinationURI(TaskContext taskContext, String fileName) throws URISyntaxException { + String filePath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator) + + taskContext.getParentProcessContext().getProcessId() + File.separator + fileName; + return new URI("SCP", hostName, filePath, null); - @Override - public TaskTypes getType() { - return TaskTypes.DATA_STAGING; - } + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java deleted file mode 100644 index d28ae3f..0000000 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.airavata.gfac.impl.task; - -import org.apache.airavata.gfac.core.SSHApiException; -import org.apache.airavata.gfac.core.cluster.RemoteCluster; -import org.apache.airavata.gfac.core.context.TaskContext; -import org.apache.airavata.gfac.core.task.Task; -import org.apache.airavata.gfac.core.task.TaskException; -import org.apache.airavata.model.commons.ErrorModel; -import org.apache.airavata.model.status.TaskState; -import org.apache.airavata.model.status.TaskStatus; -import org.apache.airavata.model.task.TaskTypes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -public class SSHEnvironmentSetupTask implements Task { - - private static final Logger log = LoggerFactory.getLogger(SSHEnvironmentSetupTask.class); - @Override - public void init(Map propertyMap) throws TaskException { - - } - - @Override - public TaskStatus execute(TaskContext taskContext) { - TaskStatus status = new TaskStatus(TaskState.COMPLETED); - try { - RemoteCluster remoteCluster = taskContext.getParentProcessContext().getRemoteCluster(); - remoteCluster.makeDirectory(taskContext.getParentProcessContext().getWorkingDir()); - status.setReason("Successfully created environment"); - } catch (SSHApiException e) { - String msg = "Error while environment setup"; - log.error(msg, e); - status.setState(TaskState.FAILED); - status.setReason(msg); - ErrorModel errorModel = new ErrorModel(); - errorModel.setActualErrorMessage(e.getMessage()); - errorModel.setUserFriendlyMessage(msg); - taskContext.getTaskModel().setTaskError(errorModel); - } - return status; - } - - @Override - public TaskStatus recover(TaskContext taskContext) { - return execute(taskContext); - } - - @Override - public TaskTypes getType() { - return TaskTypes.ENV_SETUP; - } -}