airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject airavata git commit: Added AdvancedSCPDataStageTask for both input and output data staging with remote client
Date Fri, 31 Jul 2015 17:20:17 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 67839c0f5 -> 86eac5d92


Added AdvancedSCPDataStageTask for both input and output data staging with remote client


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/86eac5d9
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/86eac5d9
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/86eac5d9

Branch: refs/heads/master
Commit: 86eac5d929eeb9dbbeb09be02e46054f69083c3c
Parents: 67839c0
Author: Shameera Rathanyaka <shameerainfo@gmail.com>
Authored: Fri Jul 31 13:20:10 2015 -0400
Committer: Shameera Rathanyaka <shameerainfo@gmail.com>
Committed: Fri Jul 31 13:20:10 2015 -0400

----------------------------------------------------------------------
 .../org/apache/airavata/gfac/impl/Factory.java  |  78 +++++++
 .../impl/task/AdvancedSCPDataStageTask.java     | 231 +++++++++++++++++++
 .../gfac/impl/task/SCPDataStageTask.java        |  12 +-
 3 files changed, 319 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/86eac5d9/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index a5fa5ed..4bfb6cf 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -21,6 +21,10 @@
 package org.apache.airavata.gfac.impl;
 
 import com.google.common.eventbus.EventBus;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import com.jcraft.jsch.UserInfo;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.LocalEventPublisher;
@@ -103,6 +107,7 @@ public abstract class Factory {
 	private static Map<ResourceJobManagerType, ResourceConfig> resources = new HashMap<>();
 	private static Map<MonitorMode, JobMonitor> jobMonitorServices = new HashMap<>();
 	private static RabbitMQProcessLaunchConsumer processLaunchConsumer;
+	private static Map<String, Session> sessionMap = new HashMap<>();
 
 	public static GFacEngine getGFacEngine() throws GFacException {
 		if (engine == null) {
@@ -299,4 +304,77 @@ public abstract class Factory {
 		return jobMonitor;
 	}
 
+	public static Session getSSHSession(AuthenticationInfo authenticationInfo, ServerInfo serverInfo) throws AiravataException {
+		SSHKeyAuthentication authentication = null;
+		String key = serverInfo.getUserName() + "_" + serverInfo.getHost() + "_" + serverInfo.getPort();
+		if (sessionMap.get(key) == null) {
+			try {
+				if (authenticationInfo instanceof SSHKeyAuthentication) {
+					authentication = (SSHKeyAuthentication) authenticationInfo;
+				} else {
+					throw new AiravataException("Support ssh key authentication only");
+				}
+				JSch jSch = new JSch();
+				jSch.addIdentity(authentication.getPrivateKeyFilePath(), authentication.getPublicKeyFilePath(),
+						authentication.getPassphrase().getBytes());
+				Session session = jSch.getSession(serverInfo.getUserName(), serverInfo.getHost(),
+						serverInfo.getPort());
+				session.setUserInfo(new DefaultUserInfo(serverInfo.getUserName(), null, authentication.getPassphrase()));
+				if (authentication.getStrictHostKeyChecking().equals("yes")) {
+					jSch.setKnownHosts(authentication.getKnownHostsFilePath());
+				} else {
+					session.setConfig("StrictHostKeyChecking","no");
+				}
+				session.connect(); // 0 connection timeout
+				sessionMap.put(key, session);
+			} catch (JSchException e) {
+				throw new AiravataException("JSch initialization error ", e);
+			}
+		}
+		return sessionMap.get(key);
+
+	}
+
+	private static class DefaultUserInfo implements UserInfo {
+
+		private String userName;
+		private String password;
+		private String passphrase;
+
+		public DefaultUserInfo(String userName, String password, String passphrase) {
+			this.userName = userName;
+			this.password = password;
+			this.passphrase = passphrase;
+		}
+
+		@Override
+		public String getPassphrase() {
+			return null;
+		}
+
+		@Override
+		public String getPassword() {
+			return null;
+		}
+
+		@Override
+		public boolean promptPassword(String s) {
+			return false;
+		}
+
+		@Override
+		public boolean promptPassphrase(String s) {
+			return false;
+		}
+
+		@Override
+		public boolean promptYesNo(String s) {
+			return false;
+		}
+
+		@Override
+		public void showMessage(String s) {
+
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/86eac5d9/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
new file mode 100644
index 0000000..45f5ff6
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
@@ -0,0 +1,231 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
+import org.apache.airavata.gfac.core.authentication.SSHPasswordAuthentication;
+import org.apache.airavata.gfac.core.cluster.ServerInfo;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.gfac.impl.SSHUtils;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+public class AdvancedSCPDataStageTask implements Task{
+	private static final Logger log = LoggerFactory.getLogger(AdvancedSCPDataStageTask.class);
+	private static final int DEFAULT_SSH_PORT = 22;
+	private String password;
+	private String publicKeyPath;
+	private String passPhrase;
+	private String privateKeyPath;
+	private String userName;
+	private String hostName;
+	private String inputPath;
+
+	@Override
+	public void init(Map<String, String> propertyMap) throws TaskException {
+		password = propertyMap.get("password");
+		passPhrase = propertyMap.get("passPhrase");
+		privateKeyPath = propertyMap.get("privateKeyPath");
+		publicKeyPath = propertyMap.get("publicKeyPath");
+		userName = propertyMap.get("userName");
+		hostName = propertyMap.get("hostName");
+		inputPath = propertyMap.get("inputPath");
+	}
+
+	@Override
+	public TaskStatus execute(TaskContext taskContext) {
+		AuthenticationInfo authenticationInfo = null;
+		if (password != null) {
+			authenticationInfo = getSSHPasswordAuthentication();
+		} else {
+			authenticationInfo = getSSHKeyAuthentication();
+		}
+		TaskStatus status = new TaskStatus(TaskState.COMPLETED);
+		DataStagingTaskModel subTaskModel = null;
+		try {
+			subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel
+					(taskContext.getTaskModel());
+		}  catch (TException e) {
+			String msg = "Couldn't create subTask model thrift model";
+			log.error(msg, e);
+			status.setState(TaskState.FAILED);
+			status.setReason(msg);
+			ErrorModel errorModel = new ErrorModel();
+			errorModel.setActualErrorMessage(e.getMessage());
+			errorModel.setUserFriendlyMessage(msg);
+			taskContext.getTaskModel().setTaskError(errorModel);
+			return status;
+		}
+
+		try {
+			URI sourceURI = new URI(subTaskModel.getSource());
+			URI destinationURI = new URI(subTaskModel.getDestination());
+
+			File tempOutputDir = getLocalDir(taskContext);
+			if (!tempOutputDir.exists()) {
+				if (!tempOutputDir.mkdirs()) {
+					// failed to create temp output location
+				}
+			}
+
+			String fileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
+					sourceURI.getPath().length());
+			String filePath = tempOutputDir + File.separator + fileName;
+
+			ServerInfo serverInfo = new ServerInfo(userName, hostName, DEFAULT_SSH_PORT);
+			Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
+			ProcessState processState = taskContext.getParentProcessContext().getProcessState();
+			if (processState == ProcessState.INPUT_DATA_STAGING) {
+				inputDataStaging(taskContext, sshSession, sourceURI, destinationURI, filePath);
+				status.setReason("Successfully staged input data");
+			}else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+				outputDataStaging(taskContext, sshSession, sourceURI, destinationURI, filePath);
+				status.setReason("Successfully staged output data");
+			} else {
+				status.setState(TaskState.FAILED);
+				status.setReason("Invalid task invocation, Support " + ProcessState.INPUT_DATA_STAGING.name() + " and " +
+						"" + ProcessState.OUTPUT_DATA_STAGING.name() + " process phases. found " + processState.name());
+			}
+
+		}catch (URISyntaxException e) {
+			String msg = "Sorce or destination uri is not correct source : " + subTaskModel.getSource() + ", " +
+					"destination : " + subTaskModel.getDestination();
+			log.error(msg, e);
+			status.setState(TaskState.FAILED);
+			status.setReason(msg);
+			ErrorModel errorModel = new ErrorModel();
+			errorModel.setActualErrorMessage(e.getMessage());
+			errorModel.setUserFriendlyMessage(msg);
+			taskContext.getTaskModel().setTaskError(errorModel);
+		} catch (SSHApiException e) {
+			String msg = "Failed to do scp with compute resource";
+			log.error(msg, e);
+			status.setState(TaskState.FAILED);
+			status.setReason(msg);
+			ErrorModel errorModel = new ErrorModel();
+			errorModel.setActualErrorMessage(e.getMessage());
+			errorModel.setUserFriendlyMessage(msg);
+			taskContext.getTaskModel().setTaskError(errorModel);
+		} catch (AiravataException e) {
+			String msg = "Error while creating ssh session with client";
+			log.error(msg, e);
+			status.setState(TaskState.FAILED);
+			status.setReason(msg);
+			ErrorModel errorModel = new ErrorModel();
+			errorModel.setActualErrorMessage(e.getMessage());
+			errorModel.setUserFriendlyMessage(msg);
+			taskContext.getTaskModel().setTaskError(errorModel);
+		} catch (JSchException | IOException e ) {
+			String msg = "Failed to do scp with client";
+			log.error(msg, e);
+			status.setState(TaskState.FAILED);
+			status.setReason(msg);
+			ErrorModel errorModel = new ErrorModel();
+			errorModel.setActualErrorMessage(e.getMessage());
+			errorModel.setUserFriendlyMessage(msg);
+			taskContext.getTaskModel().setTaskError(errorModel);
+		}
+
+		return status;
+	}
+
+	private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI
+			destinationURI, String filePath) throws SSHApiException, IOException, JSchException {
+		/**
+		 * scp remote client file to airavata local dir.
+		 */
+		SSHUtils.scpFrom(sourceURI.getPath(), filePath, sshSession);
+
+		/**
+		 * scp local file to compute resource.
+		 */
+		taskContext.getParentProcessContext().getRemoteCluster().scpTo(destinationURI.getPath(), filePath);
+	}
+
+	private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI,
+	                               String filePath) throws SSHApiException, AiravataException, IOException, JSchException {
+		/**
+		 * scp remote file from comute resource to airavata local
+		 */
+		taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(), filePath);
+
+		/**
+		 * scp local file to remote client
+		 */
+		SSHUtils.scpTo(filePath, destinationURI.getPath(), sshSession);
+	}
+
+	private File getLocalDir(TaskContext taskContext) {
+		if (inputPath == null) {
+			return new File(ServerSettings.getOutputLocation() + taskContext.getParentProcessContext()
+					.getProcessId());
+		} else {
+			return new File(inputPath);
+		}
+	}
+
+	@Override
+	public TaskStatus recover(TaskContext taskContext) {
+		return null;
+	}
+
+	@Override
+	public TaskTypes getType() {
+		return null;
+	}
+
+	private SSHPasswordAuthentication getSSHPasswordAuthentication() {
+		return new SSHPasswordAuthentication(userName, password);
+	}
+
+	private  SSHKeyAuthentication getSSHKeyAuthentication(){
+		SSHKeyAuthentication sshKA = new SSHKeyAuthentication();
+		sshKA.setUserName(userName);
+		sshKA.setPassphrase(passPhrase);
+		sshKA.setPrivateKeyFilePath(privateKeyPath);
+		sshKA.setPublicKeyFilePath(publicKeyPath);
+		sshKA.setStrictHostKeyChecking("no");
+		return sshKA;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/86eac5d9/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
index b2a83ed..eecf57d 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
@@ -26,6 +26,7 @@ import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.task.TaskException;
 import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.TaskState;
 import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.DataStagingTaskModel;
@@ -60,10 +61,17 @@ public class SCPDataStageTask implements Task {
 				URI sourceURI = new URI(subTaskModel.getSource());
 				URI destinationURI = new URI(subTaskModel.getDestination());
 
-				if (sourceURI.getScheme().equalsIgnoreCase("file")) {  //  Airavata --> RemoteCluster
+				ProcessState processState = taskContext.getParentProcessContext().getProcessState();
+				if (processState == ProcessState.INPUT_DATA_STAGING) {
+					/**
+					 * copy local file to compute resource.
+					 */
 					taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURI.getPath(), destinationURI
 							.getPath());
-				} else { // RemoteCluster --> Airavata
+				} else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+					/**
+					 * copy remote file from compute resource.
+					 */
 					taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(), destinationURI
 							.getPath());
 				}


Mime
View raw message