airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject airavata git commit: Added HPCRemoteCluster implementation logic
Date Tue, 16 Jun 2015 18:30:06 GMT
Repository: airavata
Updated Branches:
  refs/heads/master e6622078b -> cb4b40ccb


Added HPCRemoteCluster implementation logic


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/cb4b40cc
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/cb4b40cc
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/cb4b40cc

Branch: refs/heads/master
Commit: cb4b40ccb8f4bfb563d2b51d578ead64ecd65ba6
Parents: e662207
Author: Shameera Rathanyaka <shameerainfo@gmail.com>
Authored: Tue Jun 16 14:30:00 2015 -0400
Committer: Shameera Rathanyaka <shameerainfo@gmail.com>
Committed: Tue Jun 16 14:30:00 2015 -0400

----------------------------------------------------------------------
 .../airavata/gfac/core/cluster/JobStatus.java   |  23 ---
 .../gfac/core/cluster/RemoteCluster.java        |  12 +-
 .../gfac/gsi/ssh/impl/HPCRemoteCluster.java     | 171 ++++++++++++++++---
 .../org/apache/airavata/gfac/impl/SSHUtils.java |  23 ++-
 4 files changed, 164 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/cb4b40cc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobStatus.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobStatus.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobStatus.java
deleted file mode 100644
index f784aa6..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobStatus.java
+++ /dev/null
@@ -1,23 +0,0 @@
- /*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
- package org.apache.airavata.gfac.core.cluster;
-
-

http://git-wip-us.apache.org/repos/asf/airavata/blob/cb4b40cc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
index e438a37..b3a8ffb 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
@@ -47,19 +47,19 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with
suitable
 	/**
 	 * This will copy the localFile to remoteFile location in configured cluster
 	 *
-	 * @param sourceFile      remote file location, this can be a directory too
-	 * @param destinationFile local file path of the file which needs to copy to remote location
+	 * @param remoteFile      remote file location, this can be a directory too
+	 * @param localFile local file path of the file which needs to copy to remote location
 	 * @throws SSHApiException throws exception during error
 	 */
-	public void scpTo(String sourceFile, String destinationFile) throws SSHApiException;
+	public void scpTo(String remoteFile, String localFile) throws SSHApiException;
 
 	/**
 	 * This will copy a remote file in path rFile to local file lFile
 	 *
-	 * @param sourceFile      remote file path, this has to be a full qualified path
-	 * @param destinationFile This is the local file to copy, this can be a directory too
+	 * @param remoteFile      remote file path, this has to be a full qualified path
+	 * @param localFile This is the local file to copy, this can be a directory too
 	 */
-	public void scpFrom(String sourceFile, String destinationFile) throws SSHApiException;
+	public void scpFrom(String remoteFile, String localFile) throws SSHApiException;
 
 	/**
 	 * This wil copy source remote file to target remote file.

http://git-wip-us.apache.org/repos/asf/airavata/blob/cb4b40cc/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/HPCRemoteCluster.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/HPCRemoteCluster.java
index e1d9c27..55a0ab6 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/HPCRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/HPCRemoteCluster.java
@@ -20,7 +20,6 @@
 */
 package org.apache.airavata.gfac.gsi.ssh.impl;
 
-import com.jcraft.jsch.Channel;
 import com.jcraft.jsch.ChannelExec;
 import com.jcraft.jsch.JSch;
 import com.jcraft.jsch.JSchException;
@@ -32,21 +31,24 @@ import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
 import org.apache.airavata.gfac.core.cluster.CommandInfo;
 import org.apache.airavata.gfac.core.cluster.CommandOutput;
+import org.apache.airavata.gfac.core.cluster.OutputParser;
+import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
 import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.gfac.core.cluster.ServerInfo;
 import org.apache.airavata.gfac.core.JobManagerConfiguration;
+import org.apache.airavata.gfac.impl.SSHUtils;
 import org.apache.airavata.model.status.JobStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
 
 /**
- * This is the default implementation of a cluster.
- * this has most of the methods to be used by the end user of the
- * library.
+ * One Remote cluster instance for each compute resource.
  */
 public class HPCRemoteCluster implements RemoteCluster{
     private static final Logger log = LoggerFactory.getLogger(HPCRemoteCluster.class);
@@ -55,9 +57,10 @@ public class HPCRemoteCluster implements RemoteCluster{
 	private final JobManagerConfiguration jobManagerConfiguration;
 	private final JSch jSch;
 	private Session session;
+	private OutputParser outputParser;
 
 	public HPCRemoteCluster(ServerInfo serverInfo, JobManagerConfiguration jobManagerConfiguration,
AuthenticationInfo
-			authenticationInfo) throws AiravataException {
+			authenticationInfo, OutputParser outputParser) throws AiravataException {
 		try {
 			this.serverInfo = serverInfo;
 			this.jobManagerConfiguration = jobManagerConfiguration;
@@ -66,7 +69,7 @@ public class HPCRemoteCluster implements RemoteCluster{
 			} else {
 				throw new AiravataException("Support ssh key authentication only");
 			}
-
+			this.outputParser = outputParser;
 			jSch = new JSch();
 			jSch.addIdentity(authentication.getPrivateKeyFilePath(), authentication.getPublicKeyFilePath(),
authentication
 					.getPassphrase().getBytes());
@@ -80,63 +83,180 @@ public class HPCRemoteCluster implements RemoteCluster{
 
 	@Override
 	public String submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException
{
-		
-		return null;
+		scpTo(jobScriptFilePath, workingDirectory); // scp script file to working directory
+		RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory,
jobScriptFilePath);
+
+		StandardOutReader reader = new StandardOutReader();
+		executeCommand(submitCommand, reader);
+		throwExceptionOnError(reader, submitCommand);
+		return outputParser.parseJobSubmission(reader.getStdOutputString());
 	}
 
 	@Override
-	public void scpTo(String sourceFile, String destinationFile) throws SSHApiException {
-
+	public void scpTo(String localFile, String remoteFile) throws SSHApiException {
+		int retry = 3;
+		while (retry > 0) {
+			try {
+				if (!session.isConnected()) {
+					session.connect();
+				}
+				log.info("Transferring localhost:" + localFile  + " to " + serverInfo.getHost() + ":"
+ remoteFile);
+				SSHUtils.scpTo(localFile, remoteFile, session);
+				retry = 0;
+			} catch (Exception e) {
+				retry--;
+				if (!session.isConnected()) {
+					try {
+						session.connect();
+					} catch (JSchException e1) {
+						throw new SSHApiException("JSch Session connection failed");
+					}
+				}
+				if (retry == 0) {
+					throw new SSHApiException("Failed to scp localhost:" + localFile + " to " + serverInfo.getHost()
+
+							":" + remoteFile, e);
+				} else {
+					log.info("Retry transfer localhost:" + localFile + " to " + serverInfo.getHost() + ":"
+
+							remoteFile);
+				}
+			}
+		}
 	}
 
 	@Override
-	public void scpFrom(String sourceFile, String destinationFile) throws SSHApiException {
-
+	public void scpFrom(String remoteFile, String localFile) throws SSHApiException {
+		int retry = 3;
+		while(retry>0) {
+			try {
+				if (!session.isConnected()) {
+					session.connect();
+				}
+				log.info("Transferring " + serverInfo.getHost() + ":" + remoteFile + " To localhost:"
+ localFile);
+				SSHUtils.scpFrom(remoteFile, localFile, session);
+				retry=0;
+			} catch (Exception e) {
+				retry--;
+				if (!session.isConnected()) {
+					try {
+						session.connect();
+					} catch (JSchException e1) {
+						throw new SSHApiException("JSch Session connection failed");
+					}
+				}
+				if (retry == 0) {
+					throw new SSHApiException("Failed to scp " + serverInfo.getHost() + ":" + remoteFile
+ " to " +
+							"localhost:" + localFile, e);
+				} else {
+					log.info("Retry transfer " + serverInfo.getHost() + ":" + remoteFile + "  to localhost:"
+ localFile);
+				}
+			}
+		}
 	}
 
 	@Override
 	public void scpThirdParty(String remoteFileSource, String remoteFileTarget) throws SSHApiException
{
-
+		try {
+			if(!session.isConnected()){
+				session.connect();
+			}
+			log.info("Transferring from:" + remoteFileSource + " To: " + remoteFileTarget);
+			SSHUtils.scpThirdParty(remoteFileSource, remoteFileTarget, session);
+		} catch (IOException | JSchException e) {
+			throw new SSHApiException("Failed scp file:" + remoteFileSource + " to remote file "
+					+remoteFileTarget , e);
+		}
 	}
 
 	@Override
 	public void makeDirectory(String directoryPath) throws SSHApiException {
-
+		try {
+			if (!session.isConnected()) {
+				session.connect();
+			}
+			log.info("Creating directory: " + serverInfo.getHost() + ":" + directoryPath);
+			SSHUtils.makeDirectory(directoryPath, session);
+		} catch (JSchException | IOException e) {
+			throw new SSHApiException("Failed to create directory " + serverInfo.getHost() + ":" +
directoryPath);
+		}
 	}
 
 	@Override
-	public boolean cancelJob(String jobID) throws SSHApiException {
-		return false;
+	public boolean cancelJob(String jobId) throws SSHApiException {
+		RawCommandInfo cancelCommand = jobManagerConfiguration.getCancelCommand(jobId);
+		StandardOutReader reader = new StandardOutReader();
+		executeCommand(cancelCommand, reader);
+		throwExceptionOnError(reader, cancelCommand);
+		return true;
 	}
 
 	@Override
-	public JobStatus getJobStatus(String jobID) throws SSHApiException {
-		return null;
+	public JobStatus getJobStatus(String jobId) throws SSHApiException {
+		RawCommandInfo monitorCommand = jobManagerConfiguration.getMonitorCommand(jobId);
+		StandardOutReader reader = new StandardOutReader();
+		executeCommand(monitorCommand, reader);
+		throwExceptionOnError(reader, monitorCommand);
+		return outputParser.parseJobStatus(jobId, reader.getStdOutputString());
 	}
 
 	@Override
 	public String getJobIdByJobName(String jobName, String userName) throws SSHApiException
{
-		return null;
+		RawCommandInfo jobIdMonitorCommand = jobManagerConfiguration.getJobIdMonitorCommand(jobName,
userName);
+		StandardOutReader reader = new StandardOutReader();
+		executeCommand(jobIdMonitorCommand, reader);
+		throwExceptionOnError(reader, jobIdMonitorCommand);
+		return outputParser.parseJobId(jobName, reader.getStdOutputString());
 	}
 
 	@Override
-	public void getJobStatuses(String userName, Map<String, JobStatus> jobIDs) throws
SSHApiException {
-
+	public void getJobStatuses(String userName, Map<String, JobStatus> jobStatusMap) throws
SSHApiException {
+		RawCommandInfo userBasedMonitorCommand = jobManagerConfiguration.getUserBasedMonitorCommand(userName);
+		StandardOutReader reader = new StandardOutReader();
+		executeCommand(userBasedMonitorCommand, reader);
+		throwExceptionOnError(reader, userBasedMonitorCommand);
+		outputParser.parseJobStatuses(userName, jobStatusMap, reader.getStdOutputString());
 	}
 
 	@Override
 	public List<String> listDirectory(String directoryPath) throws SSHApiException {
-		return null;
+		try {
+			if (!session.isConnected()) {
+				session.connect();
+			}
+			log.info("Creating directory: " + serverInfo.getHost() + ":" + directoryPath);
+			return SSHUtils.listDirectory(directoryPath, session);
+		} catch (JSchException | IOException e) {
+			throw new SSHApiException("Failed to list directory " + serverInfo.getHost() + ":" + directoryPath);
+		}
 	}
 
 	@Override
 	public Session getSession() throws SSHApiException {
-		return null;
+		return session;
 	}
 
 	@Override
 	public void disconnect() throws SSHApiException {
+		session.disconnect();
+	}
 
+	/**
+	 * Throws an {@link SSHApiException} if the standard error output of the command indicates a failure;
+	 * otherwise returns normally.
+	 * @param reader - command output reader
+	 * @param submitCommand - command which was executed on the remote machine.
+	 * @throws SSHApiException if standard error contains "error", or mentions the command without "Warning".
+	 */
+	private void throwExceptionOnError(StandardOutReader reader, RawCommandInfo submitCommand)
throws SSHApiException{
+		String stdErrorString = reader.getStdErrorString();
+		String command = submitCommand.getCommand().substring(submitCommand.getCommand().lastIndexOf(File.separator)
+				+ 1);
+		if (stdErrorString == null) {
+			// nothing to do
+		}else if ((stdErrorString.contains(command.trim()) && !stdErrorString.contains("Warning"))
|| stdErrorString
+				.contains("error")) {
+			log.error("Command {} , Standard Error output {}", command, stdErrorString);
+			throw new SSHApiException("Error running command " + command + "  on remote cluster. StandardError:
" +
+					stdErrorString);
+		}
 	}
 
 	private void executeCommand(CommandInfo commandInfo, CommandOutput commandOutput) throws
SSHApiException {
@@ -150,13 +270,16 @@ public class HPCRemoteCluster implements RemoteCluster{
 			channelExec.setCommand(command);
 		    channelExec.setInputStream(null);
 			channelExec.setErrStream(commandOutput.getStandardError());
+			log.info("Executing command {}", commandInfo.getCommand());
 			channelExec.connect();
 			commandOutput.onOutput(channelExec);
 		} catch (JSchException e) {
 			throw new SSHApiException("Unable to execute command - ", e);
 		}finally {
 			//Only disconnecting the channel, session can be reused
-			channelExec.disconnect();
+			if (channelExec != null) {
+				channelExec.disconnect();
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/cb4b40cc/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
index c0b458c..eef6cf3 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
@@ -52,13 +52,10 @@ public class SSHUtils {
 	 *                   a dirctory we do copy it to that directory but we simply return the
directory name
 	 *                   todo handle the directory name as input and return the proper final
output file name
 	 * @param localFile  Local file to transfer, this can be a directory
-	 * @param session
 	 * @return returns the final remote file path, so that users can use the new file location
-	 * @throws IOException
-	 * @throws JSchException
-	 * @throws SSHApiException
 	 */
-	public static String scpTo(String remoteFile, String localFile, Session session) throws
IOException, JSchException, SSHApiException {
+	public static String scpTo(String localFile, String remoteFile, Session session) throws
IOException,
+			JSchException, SSHApiException {
 		FileInputStream fis = null;
 		String prefix = null;
 		if (new File(localFile).isDirectory()) {
@@ -119,7 +116,7 @@ public class SSHUtils {
 			throw new SSHApiException(error);
 		}
 
-		// send a content of lFile
+		// send a content of localFile
 		fis = new FileInputStream(localFile);
 		byte[] buf = new byte[1024];
 		while (true) {
@@ -155,10 +152,10 @@ public class SSHUtils {
 	 *
 	 * @param remoteFile remote file path, this has to be a full qualified path
 	 * @param localFile  This is the local file to copy, this can be a directory too
-	 * @param session
 	 * @return returns the final local file path of the new file came from the remote resource
 	 */
-	public static void scpFrom(String remoteFile, String localFile, Session session) throws
IOException, JSchException, SSHApiException {
+	public static void scpFrom(String remoteFile, String localFile, Session session) throws
IOException,
+			JSchException, SSHApiException {
 		FileOutputStream fos = null;
 		try {
 			String prefix = null;
@@ -269,11 +266,12 @@ public class SSHUtils {
 	 * This method will copy a remote file to a local directory
 	 *
 	 * @param remoteFileSource remote file path, this has to be a full qualified path
-	 * @param remoteFileTarget  This is the local file to copy, this can be a directory too
-	 * @param session JSch Session object
+	 * @param remoteFileTarget This is the local file to copy, this can be a directory too
+	 * @param session          JSch Session object
 	 * @return returns the final local file path of the new file came from the remote resource
 	 */
-	public static void scpThirdParty(String remoteFileSource, String remoteFileTarget, Session
session) throws IOException, JSchException, SSHApiException {
+	public static void scpThirdParty(String remoteFileSource, String remoteFileTarget, Session
session) throws
+			IOException, JSchException, SSHApiException {
 		FileOutputStream fos = null;
 		try {
 			String prefix = null;
@@ -384,7 +382,8 @@ public class SSHUtils {
 		channel.disconnect();
 	}
 
-	public static List<String> listDirectory(String path, Session session) throws IOException,
JSchException, SSHApiException {
+	public static List<String> listDirectory(String path, Session session) throws IOException,
JSchException,
+			SSHApiException {
 
 		// exec 'scp -t rfile' remotely
 		String command = "ls " + path;


Mime
View raw message