airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject [1/3] airavata git commit: Reverted scp refactoring
Date Wed, 02 Nov 2016 21:30:01 GMT
Repository: airavata
Updated Branches:
  refs/heads/develop db027a53e -> fe6ebe9c0


Reverted scp refactoring


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/dbcfd77b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/dbcfd77b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/dbcfd77b

Branch: refs/heads/develop
Commit: dbcfd77be3abd26f97a185f65649cd3ea1ac46c0
Parents: 4a92da4
Author: Shameera Rathnayaka <shameerainfo@gmail.com>
Authored: Wed Nov 2 17:24:27 2016 -0400
Committer: Shameera Rathnayaka <shameerainfo@gmail.com>
Committed: Wed Nov 2 17:24:27 2016 -0400

----------------------------------------------------------------------
 .../gfac/core/cluster/RemoteCluster.java        |   6 +-
 .../airavata/gfac/impl/BESRemoteCluster.java    |   6 +-
 .../airavata/gfac/impl/HPCRemoteCluster.java    |  19 +-
 .../airavata/gfac/impl/LocalRemoteCluster.java  |   9 +-
 .../org/apache/airavata/gfac/impl/SSHUtils.java | 198 +++++++++----------
 .../airavata/gfac/impl/task/ArchiveTask.java    |   9 +-
 .../gfac/impl/task/BESJobSubmissionTask.java    |   5 +-
 .../airavata/gfac/impl/task/DataStageTask.java  |  13 +-
 .../gfac/impl/task/EnvironmentSetupTask.java    |   4 +-
 .../gfac/impl/task/SCPDataStageTask.java        |  44 +++--
 .../gfac/impl/task/utils/StreamData.java        |  43 ++--
 11 files changed, 178 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
index f59a9e3..5f8d0ec 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
@@ -52,7 +52,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
 	 * @param remoteFile remote file location, this can be a directory too
 	 * @throws SSHApiException throws exception during error
 	 */
-	void copyTo(String localFile, String remoteFile, SessionConsumer<Session> sessionSessionConsumer) throws SSHApiException;
+	void copyTo(String localFile, String remoteFile) throws SSHApiException;
 
 	/**
 	 * This will copy a remote file in path rFile to local file lFile
@@ -60,7 +60,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
 	 * @param remoteFile      remote file path, this has to be a full qualified path
 	 * @param localFile This is the local file to copy, this can be a directory too
 	 */
-	void copyFrom(String remoteFile, String localFile, SessionConsumer<Session> sessionSessionConsumer) throws SSHApiException;
+	void copyFrom(String remoteFile, String localFile) throws SSHApiException;
 
 	/**
 	 * This wil copy source remote file to target remote file.
@@ -77,7 +77,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
 	 * @param directoryPath the full qualified path for the directory user wants to create
 	 * @throws SSHApiException throws during error
 	 */
-	void makeDirectory(String directoryPath, SessionConsumer<Session> sessionConsumer) throws SSHApiException;
+	void makeDirectory(String directoryPath) throws SSHApiException;
 
 	/**
 	 * This will delete the given job from the queue

http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
index 828a34e..0f517b5 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
@@ -43,12 +43,12 @@ public class BESRemoteCluster extends AbstractRemoteCluster{
     }
 
     @Override
-    public void copyTo(String localFile, String remoteFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+    public void copyTo(String localFile, String remoteFile) throws SSHApiException {
 
     }
 
     @Override
-    public void copyFrom(String remoteFile, String localFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+    public void copyFrom(String remoteFile, String localFile) throws SSHApiException {
 
     }
 
@@ -57,7 +57,7 @@ public class BESRemoteCluster extends AbstractRemoteCluster{
     }
 
     @Override
-    public void makeDirectory(String directoryPath, SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+    public void makeDirectory(String directoryPath) throws SSHApiException {
 
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
index b322cef..c3566b8 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -86,8 +86,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 	@Override
 	public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException {
 		JobSubmissionOutput jsoutput = new JobSubmissionOutput();
-		copyTo(jobScriptFilePath, workingDirectory,
-				session -> SSHUtils.scpTo(jobScriptFilePath, workingDirectory, session)); // scp script file to working directory
+		copyTo(jobScriptFilePath, workingDirectory); // scp script file to working directory
 		RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory,
jobScriptFilePath);
 		submitCommand.setRawCommand("cd " + workingDirectory + "; " + submitCommand.getRawCommand());
 		StandardOutReader reader = new StandardOutReader();
@@ -113,13 +112,13 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 	}
 
 	@Override
-	public void copyTo(String localFile, String remoteFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+	public void copyTo(String localFile, String remoteFile) throws SSHApiException {
 		int retry = 3;
 		while (retry > 0) {
 			try {
 				session = Factory.getSSHSession(authenticationInfo, serverInfo);
 				log.info("Transferring localhost:" + localFile  + " to " + serverInfo.getHost() + ":"
+ remoteFile);
-                sessionConsumer.consume(session);
+				SSHUtils.scpTo(localFile, remoteFile, session);
 				retry = 0;
 			} catch (Exception e) {
 				retry--;
@@ -140,13 +139,13 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 	}
 
 	@Override
-	public void copyFrom(String remoteFile, String localFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+	public void copyFrom(String remoteFile, String localFile) throws SSHApiException {
 		int retry = 3;
 		while(retry>0) {
 			try {
 				session = Factory.getSSHSession(authenticationInfo, serverInfo);
 				log.info("Transferring " + serverInfo.getHost() + ":" + remoteFile + " To localhost:"
+ localFile);
-				sessionConsumer.consume(session);
+				SSHUtils.scpFrom(remoteFile, localFile, session);
 				retry=0;
 			} catch (Exception e) {
 				retry--;
@@ -191,7 +190,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 	}
 
 	@Override
-	public void makeDirectory(String directoryPath, SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+	public void makeDirectory(String directoryPath) throws SSHApiException {
 		int retryCount = 0;
 		try {
 			while (retryCount < MAX_RETRY_COUNT) {
@@ -199,9 +198,9 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 				session = Factory.getSSHSession(authenticationInfo, serverInfo);
 				log.info("Creating directory: " + serverInfo.getHost() + ":" + directoryPath);
 				try {
-					sessionConsumer.consume(session);
+					SSHUtils.makeDirectory(directoryPath, session);
 					break;  // Exit while loop
-				} catch (DataStagingException e) {
+				} catch (JSchException e) {
 					if (retryCount == MAX_RETRY_COUNT) {
 						log.error("Retry count " + MAX_RETRY_COUNT + " exceeded for creating directory: "
 								+ serverInfo.getHost() + ":" + directoryPath, e);
@@ -211,7 +210,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 					log.error("Issue with jsch, Retry creating directory: " + serverInfo.getHost() + ":"
+ directoryPath);
 				}
 			}
-		} catch (AiravataException | DataStagingException e) {
+		} catch (JSchException | AiravataException | IOException e) {
 			throw new SSHApiException("Failed to create directory " + serverInfo.getHost() + ":" +
directoryPath, e);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
index aa79a0c..d5422d2 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
@@ -60,8 +60,7 @@ public class LocalRemoteCluster extends AbstractRemoteCluster {
     public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory)
throws SSHApiException {
         try {
             JobSubmissionOutput jsoutput = new JobSubmissionOutput();
-            String remoteFile = workingDirectory + File.separator + new File(jobScriptFilePath).getName();
-            copyTo(jobScriptFilePath, remoteFile, session -> SSHUtils.scpTo(jobScriptFilePath,
remoteFile, session)); // scp script file to working directory
+            copyTo(jobScriptFilePath, workingDirectory + File.separator + new File(jobScriptFilePath).getName());
// scp script file to working directory
             RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory,
jobScriptFilePath);
             submitCommand.setRawCommand(submitCommand.getRawCommand());
             LocalCommandOutput localCommandOutput = new LocalCommandOutput();
@@ -77,7 +76,7 @@ public class LocalRemoteCluster extends AbstractRemoteCluster {
     }
 
     @Override
-    public void copyTo(String localFile, String remoteFile, SessionConsumer<Session>
sessionConsumer) throws SSHApiException {
+    public void copyTo(String localFile, String remoteFile) throws SSHApiException {
         Path sourcePath = Paths.get(localFile);
         Path targetPath = Paths.get(remoteFile);
         try {
@@ -89,7 +88,7 @@ public class LocalRemoteCluster extends AbstractRemoteCluster {
     }
 
     @Override
-    public void copyFrom(String remoteFile, String localFile, SessionConsumer<Session>
sessionConsumer) throws SSHApiException {
+    public void copyFrom(String remoteFile, String localFile) throws SSHApiException {
         Path sourcePath = Paths.get(remoteFile);
         Path targetPath = Paths.get(localFile);
         try {
@@ -126,7 +125,7 @@ public class LocalRemoteCluster extends AbstractRemoteCluster {
     }
 
     @Override
-    public void makeDirectory(String directoryPath, SessionConsumer<Session> sessionConsumer)
throws SSHApiException {
+    public void makeDirectory(String directoryPath) throws SSHApiException {
         Path dirPath = Paths.get(directoryPath);
         Set<PosixFilePermission> perms = new HashSet<>();
         // add permission as rwxr--r-- 744

http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
index 07e1799..cd5651e 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
@@ -24,7 +24,6 @@ import com.jcraft.jsch.Channel;
 import com.jcraft.jsch.ChannelExec;
 import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
-import org.apache.airavata.gfac.core.DataStagingException;
 import org.apache.airavata.gfac.core.SSHApiException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,100 +53,97 @@ public class SSHUtils {
 	 * @param localFile  Local file to transfer, this can be a directory
 	 * @return returns the final remote file path, so that users can use the new file location
 	 */
-	public static String scpTo(String localFile, String remoteFile, Session session) throws DataStagingException {
+	public static String scpTo(String localFile, String remoteFile, Session session) throws
IOException,
+			JSchException, SSHApiException {
 		FileInputStream fis = null;
 		String prefix = null;
 		if (new File(localFile).isDirectory()) {
 			prefix = localFile + File.separator;
 		}
 		boolean ptimestamp = true;
-		Channel channel = null;
+
 		// exec 'scp -t rfile' remotely
 		String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + remoteFile;
-		try {
-			channel = session.openChannel("exec");
+		Channel channel = session.openChannel("exec");
 
-			StandardOutReader stdOutReader = new StandardOutReader();
-			((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
-			((ChannelExec) channel).setCommand(command);
+		StandardOutReader stdOutReader = new StandardOutReader();
+		((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+		((ChannelExec) channel).setCommand(command);
 
-			// get I/O streams for remote scp
-			try (OutputStream out = channel.getOutputStream();
-				 InputStream in = channel.getInputStream()) {
+		// get I/O streams for remote scp
+		OutputStream out = channel.getOutputStream();
+		InputStream in = channel.getInputStream();
 
-				channel.connect();
-				if (checkAck(in) != 0) {
-					String error = "Error Reading input Stream";
-					log.error(error);
-					throw new DataStagingException(error);
-				}
+		channel.connect();
 
-				File _lfile = new File(localFile);
-
-				if (ptimestamp) {
-					command = "T" + (_lfile.lastModified() / 1000) + " 0";
-					// The access time should be sent here,
-					// but it is not accessible with JavaAPI ;-<
-					command += (" " + (_lfile.lastModified() / 1000) + " 0\n");
-					out.write(command.getBytes());
-					out.flush();
-					if (checkAck(in) != 0) {
-						String error = "Error Reading input Stream";
-						log.error(error);
-						throw new DataStagingException(error);
-					}
-				}
+		if (checkAck(in) != 0) {
+			String error = "Error Reading input Stream";
+			log.error(error);
+			throw new SSHApiException(error);
+		}
 
-				// send "C0644 filesize filename", where filename should not include '/'
-				long filesize = _lfile.length();
-				command = "C0644 " + filesize + " ";
-				if (localFile.lastIndexOf('/') > 0) {
-					command += localFile.substring(localFile.lastIndexOf('/') + 1);
-				} else {
-					command += localFile;
-				}
-				command += "\n";
-				out.write(command.getBytes());
-				out.flush();
-				if (checkAck(in) != 0) {
-					String error = "Error Reading input Stream";
-					log.error(error);
-					throw new DataStagingException(error);
-				}
+		File _lfile = new File(localFile);
 
-				// send a content of localFile
-				fis = new FileInputStream(localFile);
-				byte[] buf = new byte[1024];
-				while (true) {
-					int len = fis.read(buf, 0, buf.length);
-					if (len <= 0) break;
-					out.write(buf, 0, len); //out.flush();
-				}
-				fis.close();
-				fis = null;
-				// send '\0'
-				buf[0] = 0;
-				out.write(buf, 0, 1);
-				out.flush();
-				if (checkAck(in) != 0) {
-					String error = "Error Reading input Stream";
-					log.error(error);
-					throw new DataStagingException(error);
-				}
-			}
-			stdOutReader.onOutput(channel);
-			if (stdOutReader.getStdErrorString().contains("scp:")) {
-				throw new DataStagingException(stdOutReader.getStdErrorString());
-			}
-			//since remote file is always a file  we just return the file
-			return remoteFile;
-		} catch (IOException | JSchException e) {
-			throw new DataStagingException(e);
-		} finally {
-			if (channel != null && channel.isConnected()) {
-				channel.disconnect();
+		if (ptimestamp) {
+			command = "T" + (_lfile.lastModified() / 1000) + " 0";
+			// The access time should be sent here,
+			// but it is not accessible with JavaAPI ;-<
+			command += (" " + (_lfile.lastModified() / 1000) + " 0\n");
+			out.write(command.getBytes());
+			out.flush();
+			if (checkAck(in) != 0) {
+				String error = "Error Reading input Stream";
+				log.error(error);
+				throw new SSHApiException(error);
 			}
 		}
+
+		// send "C0644 filesize filename", where filename should not include '/'
+		long filesize = _lfile.length();
+		command = "C0644 " + filesize + " ";
+		if (localFile.lastIndexOf('/') > 0) {
+			command += localFile.substring(localFile.lastIndexOf('/') + 1);
+		} else {
+			command += localFile;
+		}
+		command += "\n";
+		out.write(command.getBytes());
+		out.flush();
+		if (checkAck(in) != 0) {
+			String error = "Error Reading input Stream";
+			log.error(error);
+			throw new SSHApiException(error);
+		}
+
+		// send a content of localFile
+		fis = new FileInputStream(localFile);
+		byte[] buf = new byte[1024];
+		while (true) {
+			int len = fis.read(buf, 0, buf.length);
+			if (len <= 0) break;
+			out.write(buf, 0, len); //out.flush();
+		}
+		fis.close();
+		fis = null;
+		// send '\0'
+		buf[0] = 0;
+		out.write(buf, 0, 1);
+		out.flush();
+		if (checkAck(in) != 0) {
+			String error = "Error Reading input Stream";
+			log.error(error);
+			throw new SSHApiException(error);
+		}
+		out.close();
+		stdOutReader.onOutput(channel);
+
+
+		channel.disconnect();
+		if (stdOutReader.getStdErrorString().contains("scp:")) {
+			throw new SSHApiException(stdOutReader.getStdErrorString());
+		}
+		//since remote file is always a file  we just return the file
+		return remoteFile;
 	}
 
 	/**
@@ -157,7 +153,8 @@ public class SSHUtils {
 	 * @param localFile  This is the local file to copy, this can be a directory too
 	 * @return returns the final local file path of the new file came from the remote resource
 	 */
-	public static void scpFrom(String remoteFile, String localFile, Session session) throws
DataStagingException {
+	public static void scpFrom(String remoteFile, String localFile, Session session) throws
IOException,
+			JSchException, SSHApiException {
 		FileOutputStream fos = null;
 		try {
 			String prefix = null;
@@ -258,7 +255,6 @@ public class SSHUtils {
 
 		} catch (Exception e) {
 			log.error(e.getMessage(), e);
-			throw new DataStagingException(e);
 		} finally {
 			try {
 				if (fos != null) fos.close();
@@ -276,11 +272,8 @@ public class SSHUtils {
      * @param destinationSession JSch Session for target
      * @return returns the final local file path of the new file came from the remote resource
      */
-    public static void scpThirdParty(String sourceFile,
-									 Session sourceSession,
-									 String destinationFile,
-									 Session destinationSession,
-									 boolean ignoreEmptyFile) throws DataStagingException {
+    public static void scpThirdParty(String sourceFile, Session sourceSession, String destinationFile,
Session destinationSession, boolean ignoreEmptyFile) throws
+            IOException, JSchException {
         OutputStream sout = null;
         InputStream sin = null;
         OutputStream dout = null;
@@ -408,7 +401,7 @@ public class SSHUtils {
 
         } catch (Exception e) {
             log.error(e.getMessage(), e);
-            throw new DataStagingException(e.getMessage());
+            throw new JSchException(e.getMessage());
         } finally {
             try {
                 if (dout != null) dout.close();
@@ -433,32 +426,35 @@ public class SSHUtils {
         }
     }
 
-	public static void makeDirectory(String path, Session session) throws DataStagingException {
+	public static void makeDirectory(String path, Session session) throws IOException, JSchException,
SSHApiException {
 
-		Channel channel = null;
+		// exec 'scp -t rfile' remotely
 		String command = "mkdir -p " + path;
+		Channel channel = session.openChannel("exec");
+		StandardOutReader stdOutReader = new StandardOutReader();
+
+		((ChannelExec) channel).setCommand(command);
+
+
+		((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
 		try {
-			// exec 'scp -t rfile' remotely
-			channel = session.openChannel("exec");
-			StandardOutReader stdOutReader = new StandardOutReader();
 			channel.connect();
-			stdOutReader.onOutput(channel);
-			if (stdOutReader.getStdErrorString().contains("mkdir:")) {
-				throw new DataStagingException(stdOutReader.getStdErrorString());
-			}
-
 		} catch (JSchException e) {
 
+			channel.disconnect();
 //            session.disconnect();
 			log.error("Unable to retrieve command output. Command - " + command +
 					" on server - " + session.getHost() + ":" + session.getPort() +
 					" connecting user name - "
 					+ session.getUserName());
-			throw new DataStagingException(e);
-		}finally {
-			if(channel != null && channel.isConnected())
-				channel.disconnect();
+			throw e;
+		}
+		stdOutReader.onOutput(channel);
+		if (stdOutReader.getStdErrorString().contains("mkdir:")) {
+			throw new SSHApiException(stdOutReader.getStdErrorString());
 		}
+
+		channel.disconnect();
 	}
 
 	public static List<String> listDirectory(String path, Session session) throws IOException,
JSchException,

http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
index 85d3fd2..88661f8 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
@@ -138,8 +138,13 @@ public class ArchiveTask implements Task {
             // move tar to storage resource
             remoteCluster.execute(commandInfo);
             URI destinationURI = TaskUtils.getDestinationURI(taskContext, hostName, inputPath,
archiveTar);
-            remoteCluster.thirdPartyTransfer(resourceAbsTarFilePath, destinationURI.getPath(),
-                    session -> SSHUtils.scpThirdParty(sourceURI.getPath(), session, destinationURI.getPath(),
sshSession, true));
+            remoteCluster.thirdPartyTransfer(resourceAbsTarFilePath ,destinationURI.getPath(),
session -> {
+                try {
+                    SSHUtils.scpThirdParty(sourceURI.getPath(),session, destinationURI.getPath(),
sshSession, true);
+                } catch (Exception e) {
+                    throw new DataStagingException("Error while transferring " + sourceURI.getPath()
+ " to " + destinationURI.getPath());
+                }
+            });
 
             // delete tar in remote computer resource
             commandInfo = new RawCommandInfo("rm " + resourceAbsTarFilePath);

http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
index 1811286..990b9ea 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
@@ -31,7 +31,6 @@ import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
 import eu.unicore.util.httpclient.DefaultClientConfiguration;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.gfac.core.DataStagingException;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.SSHApiException;
@@ -274,7 +273,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
                         break;
                 }
             }
-        } catch (DataStagingException | AiravataException | URISyntaxException e) {
+        } catch (IOException | JSchException | AiravataException | SSHApiException | URISyntaxException
e) {
             log.error("Error while coping local file " + localFilePath + " to remote " +
remoteFilePath, e);
             throw new GFacException("Error while scp output files to remote storage file
location", e);
         }
@@ -318,7 +317,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
                     input.setValue("file:/" + localFilePath);
                 }
             }
-        } catch ( AiravataException | DataStagingException| URISyntaxException e) {
+        } catch (IOException | JSchException | AiravataException | SSHApiException | URISyntaxException
e) {
             log.error("Error while coping remote file " + remoteFilePath + " to local " +
localFilePath, e);
             throw new GFacException("Error while scp input files to local file location",
e);
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
index a171f7f..8c6a125 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
@@ -24,7 +24,6 @@ import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.SSHUtils;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.TaskState;
@@ -66,18 +65,14 @@ public class DataStageTask implements Task {
 					/**
 					 * copy local file to compute resource.
 					 */
-					taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyTo(
-							sourceURI.getPath(),
-							destinationURI.getPath(),
-							session -> SSHUtils.scpTo(sourceURI.getPath(), destinationURI.getPath(), session));
+					taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyTo(sourceURI.getPath(),
destinationURI
+							.getPath());
 				} else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
 					/**
 					 * copy remote file from compute resource.
 					 */
-					taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyFrom(
-							sourceURI.getPath(),
-							destinationURI.getPath(),
-							session -> SSHUtils.scpFrom(sourceURI.getPath(), destinationURI.getPath(), session));
+					taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyFrom(sourceURI.getPath(),
destinationURI
+							.getPath());
 				}
 				status.setReason("Successfully staged data");
 			} catch (SSHApiException e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
index bbe77d8..7de0282 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
@@ -25,7 +25,6 @@ import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.SSHUtils;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.status.TaskState;
 import org.apache.airavata.model.status.TaskStatus;
@@ -49,8 +48,7 @@ public class EnvironmentSetupTask implements Task {
 		TaskStatus status = new TaskStatus(TaskState.COMPLETED);
 		try {
 			RemoteCluster remoteCluster = taskContext.getParentProcessContext().getJobSubmissionRemoteCluster();
-			String workingDir = taskContext.getParentProcessContext().getWorkingDir();
-			remoteCluster.makeDirectory(workingDir, session -> SSHUtils.makeDirectory(workingDir,
session));
+			remoteCluster.makeDirectory(taskContext.getParentProcessContext().getWorkingDir());
 			status.setReason("Successfully created environment");
 		} catch (SSHApiException e) {
 			String msg = "Error while environment setup";

http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
index 4189f81..2788535 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
@@ -182,7 +182,7 @@ public class SCPDataStageTask implements Task {
             errorModel.setUserFriendlyMessage(msg);
             taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
             return status;
-        } catch (ApplicationSettingsException e) {
+        } catch (ApplicationSettingsException | FileNotFoundException e) {
             String msg = "Failed while reading credentials";
             log.error(msg, e);
             status.setState(TaskState.FAILED);
@@ -219,7 +219,7 @@ public class SCPDataStageTask implements Task {
             errorModel.setActualErrorMessage(e.getMessage());
             errorModel.setUserFriendlyMessage(msg);
             taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
-        } catch (DataStagingException e) {
+        } catch (JSchException | IOException e) {
             String msg = "Failed to do scp with client";
             log.error(msg, e);
             status.setState(TaskState.FAILED);
@@ -249,35 +249,39 @@ public class SCPDataStageTask implements Task {
     }
 
     private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI,
URI
-            destinationURI) throws SSHApiException {
+            destinationURI) throws SSHApiException, IOException, JSchException {
         /**
          * scp third party file transfer 'to' compute resource.
          */
-        taskContext.getParentProcessContext().getDataMovementRemoteCluster().thirdPartyTransfer(
-                sourceURI.getPath(),
-                destinationURI.getPath(),
-                session -> SSHUtils.scpThirdParty(sourceURI.getPath(), sshSession, destinationURI.getPath(),
session, false));
+        taskContext.getParentProcessContext().getDataMovementRemoteCluster()
+                .thirdPartyTransfer(sourceURI.getPath(), destinationURI.getPath(), session -> {
+                    try {
+                        SSHUtils.scpThirdParty(sourceURI.getPath(), sshSession, destinationURI.getPath(),
session, false);
+                    } catch (Exception e) {
+                        throw new DataStagingException("Error while file staging, from " + sourceURI.getPath()
+                                + " to " + destinationURI.getPath());
+                    }
+                });
     }
 
     private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI,
URI destinationURI)
-            throws SSHApiException, AiravataException, GFacException {
+            throws SSHApiException, AiravataException, IOException, JSchException, GFacException
{
 
         /**
          * scp third party file transfer 'from' comute resource.
          */
-        taskContext.getParentProcessContext().getDataMovementRemoteCluster().thirdPartyTransfer(
-                sourceURI.getPath(),
-                destinationURI.getPath(),
-                session -> SSHUtils.scpThirdParty(sourceURI.getPath(), session, destinationURI.getPath(),
sshSession, true));
+        taskContext.getParentProcessContext().getDataMovementRemoteCluster()
+                .thirdPartyTransfer(sourceURI.getPath(), destinationURI.getPath(), session -> {
+                    try {
+                        SSHUtils.scpThirdParty(sourceURI.getPath(), session, destinationURI.getPath(),
sshSession, true);
+                    } catch (Exception e) {
+                        throw new DataStagingException("Error while file staging, from " + sourceURI.getPath()
+                                + " to " + destinationURI.getPath());
+                    }
+                });
         // update output locations
-        GFacUtils.saveExperimentOutput(
-                taskContext.getParentProcessContext(),
-                taskContext.getProcessOutput().getName(),
-                destinationURI.toString());
-        GFacUtils.saveProcessOutput(
-                taskContext.getParentProcessContext(),
-                taskContext.getProcessOutput().getName(),
-                destinationURI.toString());
+        GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(),
destinationURI.toString());
+        GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(),
destinationURI.toString());
 
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
index f5fd14d..fccce0d 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
@@ -21,8 +21,10 @@
 
 package org.apache.airavata.gfac.impl.task.utils;
 
+import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
 import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
 import org.apache.airavata.gfac.core.DataStagingException;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
@@ -30,6 +32,7 @@ import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
 import org.apache.airavata.gfac.core.cluster.CommandInfo;
 import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.gfac.core.cluster.ServerInfo;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.impl.Factory;
@@ -41,6 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.TimerTask;
@@ -75,23 +79,25 @@ public class StreamData extends TimerTask  {
             }
 
             // output staging should end when the job is complete
-            if (jobStatus != null && jobStatus.getJobState().equals(JobState.COMPLETE)
-                    || jobStatus.getJobState().equals(JobState.CANCELED)
-                    || jobStatus.getJobState().equals(JobState.FAILED)){
+            if (jobStatus != null && jobStatus.getJobState().equals(JobState.COMPLETE) || jobStatus.getJobState().equals(JobState.CANCELED) || jobStatus.getJobState().equals(JobState.FAILED)){
                 this.cancel();
             }
         } catch (URISyntaxException e) {
             log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , Erroneous
path specified",
                     taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
                     taskContext.getProcessOutput().getName());
-        } catch (AiravataException | SSHApiException e) {
+        } catch (IllegalAccessException | InstantiationException | AiravataException | IOException | JSchException | SSHApiException e) {
             log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , Error
occurred while streaming data",
                     taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
                     taskContext.getProcessOutput().getName());
+        } catch (CredentialStoreException e) {
+            log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , Error
occurred while connecting with credential store",
+                    taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+                    taskContext.getProcessOutput().getName());
         }
     }
 
-    public void runOutputStaging() throws URISyntaxException, AiravataException,  SSHApiException
{
+    public void runOutputStaging() throws URISyntaxException, IllegalAccessException, InstantiationException,
CredentialStoreException, AiravataException, IOException, JSchException, SSHApiException {
         try {
 
             AuthenticationInfo authenticationInfo = null;
@@ -117,7 +123,7 @@ public class StreamData extends TimerTask  {
             String targetPath = destinationURI.getPath().substring(0, destinationURI.getPath().lastIndexOf('/'));
             SSHUtils.makeDirectory(targetPath, sshSession);
             outputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
-        } catch (DataStagingException | GFacException e) {
+        } catch (GFacException e) {
             log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , Error
while output staging",
                     taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
                     taskContext.getProcessOutput().getName());
@@ -141,24 +147,23 @@ public class StreamData extends TimerTask  {
     }
 
     private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI,
URI destinationURI)
-            throws SSHApiException, GFacException {
+            throws SSHApiException, AiravataException, IOException, JSchException, GFacException
{
 
         /**
          * scp third party file transfer 'from' comute resource.
          */
-        taskContext.getParentProcessContext().getDataMovementRemoteCluster().thirdPartyTransfer(
-                sourceURI.getPath(),
-                destinationURI.getPath(),
-                session -> SSHUtils.scpThirdParty(sourceURI.getPath(), session, destinationURI.getPath(),
sshSession, true));
+        taskContext.getParentProcessContext().getDataMovementRemoteCluster()
+                .thirdPartyTransfer(sourceURI.getPath(), destinationURI.getPath(), session -> {
+                    try {
+                        SSHUtils.scpThirdParty(sourceURI.getPath(), session, destinationURI.getPath(),
sshSession, true);
+                    } catch (Exception e) {
+                        throw new DataStagingException("Error while file staging, from " + sourceURI.getPath()
+                                + " to " + destinationURI.getPath());
+                    }
+                });
         // update output locations
-        GFacUtils.saveExperimentOutput(
-                taskContext.getParentProcessContext(),
-                taskContext.getProcessOutput().getName(),
-                destinationURI.getPath());
-        GFacUtils.saveProcessOutput(
-                taskContext.getParentProcessContext(),
-                taskContext.getProcessOutput().getName(),
-                destinationURI.getPath());
+        GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(),
destinationURI.getPath());
+        GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(),
destinationURI.getPath());
 
     }
 


Mime
View raw message