airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject airavata git commit: Fixed 3rd party file transfer , AIRAVATA-1851
Date Wed, 07 Oct 2015 15:51:16 GMT
Repository: airavata
Updated Branches:
  refs/heads/master bd6571428 -> 638d7f767


Fixed 3rd party file transfer , AIRAVATA-1851


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/638d7f76
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/638d7f76
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/638d7f76

Branch: refs/heads/master
Commit: 638d7f767c7648c581f2196bf045b32fa576520c
Parents: bd65714
Author: Shameera Rathnayaka <shameerainfo@gmail.com>
Authored: Wed Oct 7 11:51:05 2015 -0400
Committer: Shameera Rathnayaka <shameerainfo@gmail.com>
Committed: Wed Oct 7 11:51:05 2015 -0400

----------------------------------------------------------------------
 .../gfac/core/cluster/RemoteCluster.java        |  14 +-
 .../airavata/gfac/impl/GFacEngineImpl.java      |   6 +-
 .../airavata/gfac/impl/HPCRemoteCluster.java    |  23 +-
 .../org/apache/airavata/gfac/impl/SSHUtils.java | 249 ++++++++++++-------
 .../impl/task/AdvancedSCPDataStageTask.java     |  61 ++---
 5 files changed, 211 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/638d7f76/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
index ad407f9..59e7ff5 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
@@ -64,10 +64,13 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with
suitable
 	/**
 	 * This wil copy source remote file to target remote file.
 	 *
-	 * @param remoteFileSource remote file path, this has to be a full qualified path
-	 * @param remoteFileTarget This is the local file to copy, this can be a directory too
+	 * @param sourceFile remote file path, this has to be a full qualified path
+	 * @param destinationFile remote destination path, this can be a directory too
+     * @param session JSch session for the other end of the third-party file transfer.
+     * @param inOrOut direction of the file transfer: to the remote cluster (DIRECTION.TO) or
from the remote cluster (DIRECTION.FROM)
+	 *
 	 */
-	public void scpThirdParty(String remoteFileSource, String remoteFileTarget) throws SSHApiException;
+	public void scpThirdParty(String sourceFile, String destinationFile ,Session session , DIRECTION
inOrOut) throws SSHApiException;
 
 	/**
 	 * This will create directories in computing resources
@@ -148,4 +151,9 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with
suitable
 	 */
 	public ServerInfo getServerInfo();
 
+    enum DIRECTION {
+        TO,
+        FROM
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/638d7f76/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index b029180..834399e 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -621,9 +621,9 @@ public class GFacEngineImpl implements GFacEngine {
             throw new TaskException("Error while constructing source file URI");
         }
         submodel.setSource(source.toString());
-        // TODO after thridpary scp implemented we can fix following destination location
correct one.
-		String localWorkingDir = processContext.getLocalWorkingDir();
-		submodel.setDestination("file://" + localWorkingDir);
+        // We don't know destination location at this time, data staging task will set this.
+        // because destination is required field we set dummy destination
+		submodel.setDestination("dummy://temp/file/location");
 		taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
 		taskCtx.setTaskModel(taskModel);
         taskCtx.setProcessOutput(processOutput);

http://git-wip-us.apache.org/repos/asf/airavata/blob/638d7f76/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
index c6274cb..288c98c 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -30,12 +30,7 @@ import org.apache.airavata.gfac.core.JobManagerConfiguration;
 import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
 import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
-import org.apache.airavata.gfac.core.cluster.AbstractRemoteCluster;
-import org.apache.airavata.gfac.core.cluster.CommandInfo;
-import org.apache.airavata.gfac.core.cluster.CommandOutput;
-import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput;
-import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
-import org.apache.airavata.gfac.core.cluster.ServerInfo;
+import org.apache.airavata.gfac.core.cluster.*;
 import org.apache.airavata.model.status.JobStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -163,16 +158,20 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 	}
 
 	@Override
-	public void scpThirdParty(String remoteFileSource, String remoteFileTarget) throws SSHApiException
{
+	public void scpThirdParty(String sourceFile, String destinationFile, Session clientSession,
DIRECTION direction) throws SSHApiException {
 		try {
 			if(!session.isConnected()){
 				session = getOpenSession();
 			}
-			log.info("Transferring from:" + remoteFileSource + " To: " + remoteFileTarget);
-			SSHUtils.scpThirdParty(remoteFileSource, remoteFileTarget, session);
-		} catch (IOException | JSchException e) {
-			throw new SSHApiException("Failed scp file:" + remoteFileSource + " to remote file "
-					+remoteFileTarget , e);
+			log.info("Transferring from:" + sourceFile + " To: " + destinationFile);
+            if (direction == DIRECTION.TO) {
+                SSHUtils.scpThirdParty(sourceFile, clientSession, destinationFile, session);
+            } else {
+                SSHUtils.scpThirdParty(sourceFile, session, destinationFile, clientSession);
+            }
+        } catch (IOException | JSchException e) {
+			throw new SSHApiException("Failed scp file:" + sourceFile + " to remote file "
+					+destinationFile , e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/638d7f76/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
index 8a590e0..5a4ed39 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
@@ -263,94 +263,171 @@ public class SSHUtils {
 		}
 	}
 
-	/**
-	 * This method will copy a remote file to a local directory
-	 *
-	 * @param remoteFileSource remote file path, this has to be a full qualified path
-	 * @param remoteFileTarget This is the local file to copy, this can be a directory too
-	 * @param session          JSch Session object
-	 * @return returns the final local file path of the new file came from the remote resource
-	 */
-	public static void scpThirdParty(String remoteFileSource, String remoteFileTarget, Session
session) throws
-			IOException, JSchException, SSHApiException {
-		FileOutputStream fos = null;
-		try {
-			String prefix = null;
-
-			// exec 'scp -f remotefile' remotely
-			String command = "scp -3 " + remoteFileSource + " " + remoteFileTarget;
-			Channel channel = session.openChannel("exec");
-			((ChannelExec) channel).setCommand(command);
-
-			StandardOutReader stdOutReader = new StandardOutReader();
-			((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
-			// get I/O streams for remote scp
-			OutputStream out = channel.getOutputStream();
-			InputStream in = channel.getInputStream();
-
-			channel.connect();
-
-			byte[] buf = new byte[1024];
-
-			// send '\0'
-			buf[0] = 0;
-			out.write(buf, 0, 1);
-			out.flush();
-
-			while (true) {
-				int c = checkAck(in);
-				if (c != 'C') {
-					break;
-				}
-
-				// read '0644 '
-				in.read(buf, 0, 5);
-
-				long filesize = 0L;
-				while (true) {
-					if (in.read(buf, 0, 1) < 0) {
-						// error
-						break;
-					}
-					if (buf[0] == ' ') break;
-					filesize = filesize * 10L + (long) (buf[0] - '0');
-				}
-				int foo;
-				while (true) {
-					if (buf.length < filesize) foo = buf.length;
-					else foo = (int) filesize;
-
-					int len = in.read(buf, 0, foo);
-					if (len <= 0) break;
-					out.write(buf, 0, len);
-				}
-				// send '\0'
-				buf[0] = 0;
-				out.write(buf, 0, 1);
-				out.flush();
-				if (checkAck(in) != 0) {
-					String error = "Error transfering the file content";
-					log.error(error);
-					throw new SSHApiException(error);
-				}
-
-			}
-			out.close();
+    /**
+     * This method will copy a file from one remote host to another by streaming it between two SSH sessions
+     *
+     * @param sourceRemoteFile remote file path, this has to be a full qualified path
+     * @param sourceSession JSch session for source
+     * @param targetRemoteFile destination path on the target host, this can be a directory too
+     * @param targetSession JSch Session for target
+     * @return returns the final local file path of the new file came from the remote resource
+     */
+    public static void scpThirdParty(String sourceRemoteFile, Session sourceSession, String
targetRemoteFile, Session targetSession) throws
+            IOException, JSchException {
+        OutputStream sout = null;
+        InputStream sin = null;
+        OutputStream tout = null;
+        InputStream tin = null;
+        try {
+            String prefix = null;
+
+            // exec 'scp -f sourceFile'
+            String sourceCommand = "scp -f " + sourceRemoteFile;
+            Channel sourceChannel = sourceSession.openChannel("exec");
+            ((ChannelExec) sourceChannel).setCommand(sourceCommand);
+            StandardOutReader sourceStdOutReader = new StandardOutReader();
+            ((ChannelExec) sourceChannel).setErrStream(sourceStdOutReader.getStandardError());
+            // get I/O streams for remote scp
+            sout = sourceChannel.getOutputStream();
+            sin = sourceChannel.getInputStream();
+            sourceChannel.connect();
+
+
+            boolean ptimestamp = true;
+            // exec 'scp -t rfile' remotely
+            String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + targetRemoteFile;
+            Channel targetChannel = targetSession.openChannel("exec");
+            StandardOutReader targetStdOutReader = new StandardOutReader();
+            ((ChannelExec) targetChannel).setErrStream(targetStdOutReader.getStandardError());
+            ((ChannelExec) targetChannel).setCommand(command);
+            // get I/O streams for remote scp
+            tout = targetChannel.getOutputStream();
+            tin = targetChannel.getInputStream();
+            targetChannel.connect();
+
+            if (checkAck(tin) != 0) {
+                String error = "Error Reading input Stream";
+                log.error(error);
+                throw new Exception(error);
+            }
 
-			stdOutReader.onOutput(channel);
-			if (stdOutReader.getStdErrorString().contains("scp:")) {
-				throw new SSHApiException(stdOutReader.getStdErrorString());
-			}
 
-		} catch (Exception e) {
-			log.error(e.getMessage(), e);
-		} finally {
-			try {
-				if (fos != null) fos.close();
-			} catch (Exception ee) {
-			}
-		}
-	}
+            byte[] buf = new byte[1024];
+
+            // send '\0'
+            buf[0] = 0;
+            sout.write(buf, 0, 1);
+            sout.flush();
+
+            while (true) {
+                int c = checkAck(sin);
+                if (c != 'C') {
+                    break;
+                }
+
+                // read '0644 '
+                sin.read(buf, 0, 5);
+
+                long filesize = 0L;
+                while (true) {
+                    if (sin.read(buf, 0, 1) < 0) {
+                        // error
+                        break;
+                    }
+                    if (buf[0] == ' ') break;
+                    filesize = filesize * 10L + (long) (buf[0] - '0');
+                }
+                String initData = "C0644 " + filesize + " " +
+                        sourceRemoteFile.substring(sourceRemoteFile.lastIndexOf('/') + 1)
+ "\n";
+                tout.write(initData.getBytes());
+                tout.flush();
+
+                String file = null;
+                for (int i = 0; ; i++) {
+                    sin.read(buf, i, 1);
+                    if (buf[i] == (byte) 0x0a) {
+                        file = new String(buf, 0, i);
+                        break;
+                    }
+                }
+
+                //System.out.println("filesize="+filesize+", file="+file);
+
+                // send '\0'
+                buf[0] = 0;
+                sout.write(buf, 0, 1);
+                sout.flush();
+
+                // read a content of lfile
+//                fos = new FileOutputStream(prefix == null ? localFile : prefix + file);
+                int foo;
+                while (true) {
+                    if (buf.length < filesize) foo = buf.length;
+                    else foo = (int) filesize;
+                    foo = sin.read(buf, 0, foo);
+                    if (foo < 0) {
+                        // error
+                        break;
+                    }
+                    tout.write(buf, 0, foo);
+                    filesize -= foo;
+                    if (filesize == 0L) break;
+                }
+
+                // send '\0' to target
+                buf[0] = 0;
+                tout.write(buf, 0, 1);
+                tout.flush();
+                if (checkAck(tin) != 0) {
+                    String error = "Error Reading input Stream";
+                    log.error(error);
+                    throw new Exception(error);
+                }
+                tout.close();
+                tout = null;
+
+                if (checkAck(sin) != 0) {
+                    String error = "Error transfering the file content";
+                    log.error(error);
+                    throw new Exception(error);
+                }
+
+                // send '\0'
+                buf[0] = 0;
+                sout.write(buf, 0, 1);
+                sout.flush();
+            }
+//            stdOutReader.onOutput(channel);
+//            if (stdOutReader.getStdErrorString().contains("scp:")) {
+//                throw new SSHApiException(stdOutReader.getStdErrorString());
+//            }
+
+        } catch (Exception e) {
+//            log.error(e.getMessage(), e);
+            System.out.println(e.getMessage());
+        } finally {
+            try {
+                if (tout != null) tout.close();
+            } catch (Exception ee) {
+                log.error("", ee);
+            }
+            try {
+                if (tin != null) tin.close();
+            } catch (Exception ee) {
+                log.error("", ee);
+            }
+            try {
+                if (sout != null) sout.close();
+            } catch (Exception ee) {
+                log.error("", ee);
+            }
+            try {
+                if (tin != null) tin.close();
+            } catch (Exception ee) {
+                log.error("", ee);
+            }
+        }
+    }
 
 	public static void makeDirectory(String path, Session session) throws IOException, JSchException,
SSHApiException {
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/638d7f76/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
index 269ce10..e200546 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
@@ -38,6 +38,7 @@ import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
 import org.apache.airavata.gfac.core.authentication.SSHPasswordAuthentication;
 import org.apache.airavata.gfac.core.cluster.CommandInfo;
 import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.gfac.core.cluster.ServerInfo;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.Task;
@@ -110,7 +111,6 @@ public class AdvancedSCPDataStageTask implements Task {
                     } else {
                         status.setReason("File name is null");
                     }
-                    localDataDir = subTaskModel.getDestination();
                     return status;
                 }
             } else if (processState == ProcessState.INPUT_DATA_STAGING) {
@@ -133,20 +133,14 @@ public class AdvancedSCPDataStageTask implements Task {
                         "" + ProcessState.OUTPUT_DATA_STAGING.name() + " process phases.
found " + processState.name());
             }
 
-            // use cp instead of scp if source and destination host and user name is same.
+            // use rsync instead of scp if source and destination host and user name is same.
             URI sourceURI = new URI(subTaskModel.getSource());
             String fileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator)
+ 1,
                     sourceURI.getPath().length());
-            String targetPath = null;
-            String targetFilePath = null;
             URI destinationURI = null;
-            if (localDataDir != null) {
-                targetPath = (inputPath.endsWith(File.separator) ? inputPath : inputPath
+ File.separator) +
-                        taskContext.getParentProcessContext().getProcessId();
-                targetFilePath = targetPath + File.separator + fileName;
-                destinationURI = new URI("SCP", hostName, targetFilePath, null);
+            if (subTaskModel.getDestination().startsWith("dummy")) {
+                destinationURI = getDestinationURI(taskContext, fileName);
                 subTaskModel.setDestination(destinationURI.toString());
-
             } else {
                 destinationURI = new URI(subTaskModel.getDestination());
             }
@@ -185,25 +179,16 @@ public class AdvancedSCPDataStageTask implements Task {
             }
             status = new TaskStatus(TaskState.COMPLETED);
 
-
-            File templocalDataDir = GFacUtils.getLocalDataDir(taskContext);
-            if (!templocalDataDir.exists()) {
-                if (!templocalDataDir.mkdirs()) {
-                    // failed to create temp output location
-                }
-            }
-
-            String filePath = templocalDataDir + File.separator + fileName;
-
             ServerInfo serverInfo = new ServerInfo(userName, hostName, DEFAULT_SSH_PORT);
             Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
             if (processState == ProcessState.INPUT_DATA_STAGING) {
-                inputDataStaging(taskContext, sshSession, sourceURI, destinationURI, filePath);
+                inputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
                 status.setReason("Successfully staged input data");
             } else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+                String targetPath = destinationURI.getPath().substring(0, destinationURI.getPath().lastIndexOf('/'));
                 SSHUtils.makeDirectory(targetPath, sshSession);
                 // TODO - save updated subtask model with new destination
-                outputDataStaging(taskContext, sshSession, sourceURI, destinationURI, filePath);
+                outputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
                 status.setReason("Successfully staged output data");
             }
         } catch (TException e) {
@@ -283,29 +268,22 @@ public class AdvancedSCPDataStageTask implements Task {
     }
 
     private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI,
URI
-            destinationURI, String filePath) throws SSHApiException, IOException, JSchException
{
-        /**
-         * scp remote client file to airavata local dir.
-         */
-        SSHUtils.scpFrom(sourceURI.getPath(), filePath, sshSession);
-
+            destinationURI) throws SSHApiException, IOException, JSchException {
         /**
-         * scp local file to compute resource.
+         * scp third party file transfer 'to' compute resource.
          */
-        taskContext.getParentProcessContext().getRemoteCluster().scpTo(filePath, destinationURI.getPath());
+        taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
+                destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.TO);
     }
 
-    private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI,
URI destinationURI,
-                                   String filePath) throws SSHApiException, AiravataException,
IOException, JSchException, GFacException {
-        /**
-         * scp remote file from comute resource to airavata local
-         */
-        taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(),
filePath);
+    private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI,
URI destinationURI)
+            throws SSHApiException, AiravataException, IOException, JSchException, GFacException
{
 
         /**
-         * scp local file to remote client
+         * scp third party file transfer 'from' compute resource.
          */
-        SSHUtils.scpTo(filePath, destinationURI.getPath(), sshSession);
+        taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
+                destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM);
         // update output locations
         GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(),
destinationURI.getPath());
         GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(),
destinationURI.getPath());
@@ -355,4 +333,11 @@ public class AdvancedSCPDataStageTask implements Task {
         }
         return temp.getAbsolutePath();
     }
+
+    public URI getDestinationURI(TaskContext taskContext, String fileName) throws URISyntaxException
{
+        String filePath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator)
+
+                taskContext.getParentProcessContext().getProcessId() + File.separator + fileName;
+        return new URI("SCP", hostName, filePath, null);
+
+    }
 }


Mime
View raw message