Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 46A92200CAD for ; Wed, 28 Jun 2017 21:40:00 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 45068160BF7; Wed, 28 Jun 2017 19:40:00 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6A69D160BD9 for ; Wed, 28 Jun 2017 21:39:58 +0200 (CEST) Received: (qmail 85841 invoked by uid 500); 28 Jun 2017 19:39:57 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 85832 invoked by uid 99); 28 Jun 2017 19:39:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Jun 2017 19:39:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 76D11DFA28; Wed, 28 Jun 2017 19:39:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: goshenoy@apache.org To: commits@airavata.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: airavata-sandbox git commit: Add SSH Job Workflow Date: Wed, 28 Jun 2017 19:39:57 +0000 (UTC) archived-at: Wed, 28 Jun 2017 19:40:00 -0000 Repository: airavata-sandbox Updated Branches: refs/heads/master 8b284d8f8 -> 2e56d5ce3 Add SSH Job Workflow Project: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/commit/2e56d5ce Tree: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/tree/2e56d5ce Diff: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/diff/2e56d5ce Branch: refs/heads/master Commit: 2e56d5ce3a30c8f546dc70d80879a86680e0c196 Parents: 8b284d8 Author: Gourav Shenoy Authored: Wed Jun 28 15:39:52 2017 -0400 Committer: Gourav Shenoy Committed: Wed Jun 28 15:39:52 2017 -0400 ---------------------------------------------------------------------- .gitignore | 1 + helix-playground/pom.xml | 9 +- .../iu/helix/airavata/HelixClusterManager.java | 18 +- .../java/edu/iu/helix/airavata/HelixUtil.java | 222 +++++++++-- .../edu/iu/helix/airavata/ParticipantNode.java | 7 + .../java/edu/iu/helix/airavata/ZkUtils.java | 72 ++++ .../airavata/tasks/ssh/Authentication.java | 4 +- .../airavata/tasks/ssh/SSHApiException.java | 35 ++ .../tasks/ssh/SSHCommandOutputReader.java | 90 +++++ .../tasks/ssh/SSHKeyAuthentication.java | 2 +- .../iu/helix/airavata/tasks/ssh/SSHRunner.java | 394 +++++++++++++++++++ .../iu/helix/airavata/tasks/ssh/SSHTask.java | 38 +- .../airavata/tasks/ssh/SSHTaskContext.java | 17 +- .../helix/airavata/tasks/ssh/SSHUserInfo.java | 4 +- .../iu/helix/airavata/tasks/ssh/ServerInfo.java | 4 +- .../src/main/resources/ssh/id_rsa.pub | 1 - 16 files changed, 865 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 4a9b5e3..d6e588b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ *.class +*id_rsa* # Mobile Tools for Java (J2ME) .mtj.tmp/ http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/pom.xml ---------------------------------------------------------------------- diff --git a/helix-playground/pom.xml b/helix-playground/pom.xml index 0b624c6..809c712 100644 --- a/helix-playground/pom.xml +++ b/helix-playground/pom.xml @@ -19,8 +19,8 @@ org.apache.maven.plugins maven-compiler-plugin - 1.6 - 1.6 + 1.8 + 1.8 @@ -54,6 +54,11 @@ jsch 0.1.53 + + org.apache.curator + curator-framework + 2.8.0 + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java ---------------------------------------------------------------------- diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java index 313a674..927b377 100644 --- a/helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java +++ b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java @@ -102,10 +102,10 @@ public class HelixClusterManager { } public boolean submitDag(HelixUtil.DAGType dagType) { - Workflow workflow = HelixUtil.getWorkflow(dagType); - taskDriver.start(workflow); - System.out.println("Started workflow for DagType: " + dagType + ", in cluster: " + clusterName); try { + Workflow workflow = HelixUtil.getWorkflow(dagType); + taskDriver.start(workflow); + System.out.println("Started workflow for DagType: " + dagType + ", in cluster: " + clusterName); taskDriver.pollForWorkflowState(workflow.getName(), TaskState.COMPLETED, TaskState.FAILED); // while (true) { // Thread.sleep(100); @@ -129,8 +129,8 @@ public class HelixClusterManager { } public static void main(String[] args) { - String clusterName = "HelixDemoCluster"; - String zkAddress = "localhost:2199"; + String clusterName = HelixUtil.CLUSTER_NAME; + String zkAddress = HelixUtil.ZK_ADDRESS; int numWorkers = 3; int numPartitions = 1; @@ -142,11 +142,11 @@ public class HelixClusterManager { HelixClusterManager manager = new HelixClusterManager(clusterName, zkAddress, numWorkers, numPartitions); manager.startHelixCluster(); - System.out.println("Submitting Workflow for DagType: " + HelixUtil.DAGType.TYPE_A); - if (manager.submitDag(HelixUtil.DAGType.TYPE_A)) { - System.out.println("Successfully completed workflow for Dag: " + HelixUtil.DAGType.TYPE_A); + System.out.println("Submitting Workflow for DagType: " + HelixUtil.DAGType.SSH); + if (manager.submitDag(HelixUtil.DAGType.SSH)) { + System.out.println("Successfully completed workflow for Dag: " + HelixUtil.DAGType.SSH); } else { - throw new Exception("Failed to run workflow for Dag: " + HelixUtil.DAGType.TYPE_A); + throw new Exception("Failed to run workflow for Dag: " + HelixUtil.DAGType.SSH); } } catch (Exception ex) { logger.error("Something went wrong while running helix cluster manager. Reason: " + ex, ex); http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java ---------------------------------------------------------------------- diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java index 2cea68f..b2b634d 100644 --- a/helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java +++ b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java @@ -4,11 +4,19 @@ import edu.iu.helix.airavata.tasks.HelixTaskA; import edu.iu.helix.airavata.tasks.HelixTaskB; import edu.iu.helix.airavata.tasks.HelixTaskC; import edu.iu.helix.airavata.tasks.HelixTaskD; +import edu.iu.helix.airavata.tasks.ssh.SSHKeyAuthentication; +import edu.iu.helix.airavata.tasks.ssh.SSHServerInfo; +import edu.iu.helix.airavata.tasks.ssh.SSHTask; +import edu.iu.helix.airavata.tasks.ssh.SSHTaskContext; +import org.apache.commons.io.IOUtils; +import org.apache.curator.framework.CuratorFramework; import org.apache.helix.task.*; import org.jboss.netty.util.internal.ThreadLocalRandom; +import java.io.*; import java.util.ArrayList; import java.util.List; +import java.util.UUID; /** * Created by goshenoy on 6/21/17. @@ -16,62 +24,121 @@ import java.util.List; public class HelixUtil { public static final String TASK_STATE_DEF = "Task"; + public static final String ZK_ADDRESS = "localhost:2199"; + public static final String CLUSTER_NAME = "HelixDemoCluster"; + + public static final String SSH_WORKFLOW = "SSH_Workflow"; + public static final String CREATE_DIR_TASK = "Task_CreateDir"; + public static final String COPY_PBS_TASK = "Task_CopyPBS"; + public static final String COPY_PY_TASK = "Task_CopyPY"; + public static final String QSUB_TASK = "Task_QSUB"; + + + public static final String USERNAME = "username"; + public static final String PRIVATE_KEY = "private_key"; + public static final String PUBLIC_KEY = "public_key"; + public static final String HOSTNAME = "hostname"; + public static final String PORT = "port"; + public static final String COMMAND = "command"; + public static final String SRC_PATH = "src_path"; + public static final String DEST_PATH = "dest_path"; + public enum DAGType { TYPE_A, TYPE_B, - TYPE_C + TYPE_C, + SSH } - private static Workflow.Builder getWorkflowBuilder(DAGType dagType) { - // create task configs - List taskConfig1 = new ArrayList(); - List taskConfig2 = new ArrayList(); - List taskConfig3 = new ArrayList(); - List taskConfig4 = new ArrayList(); - taskConfig1.add(new TaskConfig.Builder().setTaskId("helix_task_a").setCommand(HelixTaskA.TASK_COMMAND).build()); - taskConfig2.add(new TaskConfig.Builder().setTaskId("helix_task_b").setCommand(HelixTaskB.TASK_COMMAND).build()); - taskConfig3.add(new TaskConfig.Builder().setTaskId("helix_task_c").setCommand(HelixTaskC.TASK_COMMAND).build()); - taskConfig4.add(new TaskConfig.Builder().setTaskId("helix_task_d").setCommand(HelixTaskD.TASK_COMMAND).build()); - - // create job configs - JobConfig.Builder jobConfig1 = new JobConfig.Builder().addTaskConfigs(taskConfig1).setMaxAttemptsPerTask(3); - JobConfig.Builder jobConfig2 = new JobConfig.Builder().addTaskConfigs(taskConfig2).setMaxAttemptsPerTask(3); - JobConfig.Builder jobConfig3 = new JobConfig.Builder().addTaskConfigs(taskConfig3).setMaxAttemptsPerTask(3); - JobConfig.Builder jobConfig4 = new JobConfig.Builder().addTaskConfigs(taskConfig4).setMaxAttemptsPerTask(3); - - // create workflow - Workflow.Builder workflowBuilder = new Workflow.Builder("helix_workflow").setExpiry(0); - workflowBuilder.addJob("helix_job_a", jobConfig1); - workflowBuilder.addJob("helix_job_b", jobConfig2); - workflowBuilder.addJob("helix_job_c", jobConfig3); - workflowBuilder.addJob("helix_job_d", jobConfig4); + private static Workflow.Builder getWorkflowBuilder(DAGType dagType) throws Exception { + Workflow.Builder workflow = null; + + if (dagType.equals(DAGType.SSH)) { + if (!setWorkflowData()) { + throw new Exception("Failed to create zk data for SSH workflow"); + } + // create dir task - 1 task for job + List createDirTask = new ArrayList(); + createDirTask.add(new TaskConfig.Builder().setTaskId(CREATE_DIR_TASK).setCommand(SSHTask.TASK_COMMAND).build()); + + // copy files task - 2 tasks for job + List copyFilesTask = new ArrayList(); + copyFilesTask.add(new TaskConfig.Builder().setTaskId(COPY_PBS_TASK).setCommand(SSHTask.TASK_COMMAND).build()); + copyFilesTask.add(new TaskConfig.Builder().setTaskId(COPY_PY_TASK).setCommand(SSHTask.TASK_COMMAND).build()); + + // qsub task - 1 task for job + List qsubTask = new ArrayList(); + qsubTask.add(new TaskConfig.Builder().setTaskId(QSUB_TASK).setCommand(SSHTask.TASK_COMMAND).build()); + + // create-dir job config + JobConfig.Builder createDirJob = new JobConfig.Builder().addTaskConfigs(createDirTask).setMaxAttemptsPerTask(3); + // copy-files job config + JobConfig.Builder copyFilesJob = new JobConfig.Builder().addTaskConfigs(copyFilesTask).setMaxAttemptsPerTask(3); + + // qsub job config + JobConfig.Builder qsubJob = new JobConfig.Builder().addTaskConfigs(qsubTask).setMaxAttemptsPerTask(1); + + // create workflow + workflow = new Workflow.Builder(SSH_WORKFLOW).setExpiry(0); + workflow.addJob("createDirJob", createDirJob); + workflow.addJob("copyFilesJob", copyFilesJob); + workflow.addJob("qsubJob", qsubJob); + } else { + // create task configs + List taskConfig1 = new ArrayList(); + List taskConfig2 = new ArrayList(); + List taskConfig3 = new ArrayList(); + List taskConfig4 = new ArrayList(); + taskConfig1.add(new TaskConfig.Builder().setTaskId("helix_task_a").setCommand(HelixTaskA.TASK_COMMAND).build()); + taskConfig2.add(new TaskConfig.Builder().setTaskId("helix_task_b").setCommand(HelixTaskB.TASK_COMMAND).build()); + taskConfig3.add(new TaskConfig.Builder().setTaskId("helix_task_c").setCommand(HelixTaskC.TASK_COMMAND).build()); + taskConfig4.add(new TaskConfig.Builder().setTaskId("helix_task_d").setCommand(HelixTaskD.TASK_COMMAND).build()); + + // create job configs + JobConfig.Builder jobConfig1 = new JobConfig.Builder().addTaskConfigs(taskConfig1).setMaxAttemptsPerTask(3); + JobConfig.Builder jobConfig2 = new JobConfig.Builder().addTaskConfigs(taskConfig2).setMaxAttemptsPerTask(3); + JobConfig.Builder jobConfig3 = new JobConfig.Builder().addTaskConfigs(taskConfig3).setMaxAttemptsPerTask(3); + JobConfig.Builder jobConfig4 = new JobConfig.Builder().addTaskConfigs(taskConfig4).setMaxAttemptsPerTask(3); + + // create workflow + workflow = new Workflow.Builder("helix_workflow").setExpiry(0); + workflow.addJob("helix_job_a", jobConfig1); + workflow.addJob("helix_job_b", jobConfig2); + workflow.addJob("helix_job_c", jobConfig3); + workflow.addJob("helix_job_d", jobConfig4); + } switch (dagType) { case TYPE_A: - workflowBuilder.addParentChildDependency("helix_job_a", "helix_job_b"); - workflowBuilder.addParentChildDependency("helix_job_b", "helix_job_c"); - workflowBuilder.addParentChildDependency("helix_job_c", "helix_job_d"); + workflow.addParentChildDependency("helix_job_a", "helix_job_b"); + workflow.addParentChildDependency("helix_job_b", "helix_job_c"); + workflow.addParentChildDependency("helix_job_c", "helix_job_d"); break; case TYPE_B: - workflowBuilder.addParentChildDependency("helix_job_a", "helix_job_c"); - workflowBuilder.addParentChildDependency("helix_job_c", "helix_job_d"); - workflowBuilder.addParentChildDependency("helix_job_d", "helix_job_b"); + workflow.addParentChildDependency("helix_job_a", "helix_job_c"); + workflow.addParentChildDependency("helix_job_c", "helix_job_d"); + workflow.addParentChildDependency("helix_job_d", "helix_job_b"); break; case TYPE_C: - workflowBuilder.addParentChildDependency("helix_job_a", "helix_job_d"); - workflowBuilder.addParentChildDependency("helix_job_d", "helix_job_b"); - workflowBuilder.addParentChildDependency("helix_job_b", "helix_job_c"); + workflow.addParentChildDependency("helix_job_a", "helix_job_d"); + workflow.addParentChildDependency("helix_job_d", "helix_job_b"); + workflow.addParentChildDependency("helix_job_b", "helix_job_c"); + break; + + case SSH: + workflow.addParentChildDependency("createDirJob", "copyFilesJob"); + workflow.addParentChildDependency("copyFilesJob", "qsubJob"); break; } - return workflowBuilder; + return workflow; } - public static Workflow getWorkflow(DAGType dagType) { + public static Workflow getWorkflow(DAGType dagType) throws Exception { Workflow.Builder workflowBuilder = getWorkflowBuilder(dagType); return workflowBuilder.build(); } @@ -79,4 +146,89 @@ public class HelixUtil { private static String generateWorkflowName() { return "workflow_" + ThreadLocalRandom.current().nextInt(9999); } + + private static boolean setWorkflowData() { + try { + CuratorFramework curatorClient = ZkUtils.getCuratorClient(); + + SSHKeyAuthentication br2SshAuthentication = new SSHKeyAuthentication( + "snakanda", + IOUtils.toByteArray(HelixUtil.class.getClassLoader().getResourceAsStream("ssh/id_rsa")), + IOUtils.toByteArray(HelixUtil.class.getClassLoader().getResourceAsStream("ssh/id_rsa.pub")), + "dummy", + HelixUtil.class.getClassLoader().getResource("ssh/known_hosts").getPath(), + false + ); + + SSHServerInfo br2 = new SSHServerInfo("snakanda", "bigred2.uits.iu.edu", br2SshAuthentication,22); + + SSHTaskContext createDirTC = new SSHTaskContext(SSHTaskContext.TASK_TYPE.EXECUTE_COMMAND, + br2SshAuthentication, null, br2, "mkdir -p airavata"); + + SSHTaskContext qsubTC = new SSHTaskContext(SSHTaskContext.TASK_TYPE.EXECUTE_COMMAND, + br2SshAuthentication, null, br2, "qsub ~/airavata/job_tf.pbs"); + + + SSHTaskContext copyPbsTC = new SSHTaskContext(SSHTaskContext.TASK_TYPE.FILE_COPY, + br2SshAuthentication, null, br2, + HelixUtil.class.getClassLoader().getResource("job_tf.pbs").getPath(), "~/airavata/"); + + SSHTaskContext copyPyTC = new SSHTaskContext(SSHTaskContext.TASK_TYPE.FILE_COPY, + br2SshAuthentication, null, br2, + HelixUtil.class.getClassLoader().getResource("job_tf.pbs").getPath(), "~/airavata/"); + + ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, CREATE_DIR_TASK, getBytes(createDirTC)); + ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, COPY_PBS_TASK, getBytes(copyPbsTC)); + ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, COPY_PY_TASK, getBytes(copyPyTC)); + ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, QSUB_TASK, getBytes(qsubTC)); + } catch (Exception ex) { + ex.printStackTrace(); + return false; + } + return true; + } + + private static byte[] getBytes(Object object) throws Exception { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutput out = new ObjectOutputStream(bos); + out.writeObject(object); + out.flush(); + return bos.toByteArray(); + } + + public static Object getObject(byte[] objectBytes) throws Exception { + ByteArrayInputStream bis = new ByteArrayInputStream(objectBytes); + ObjectInput in = new ObjectInputStream(bis); + return in.readObject(); + } + +// public static void main(String[] args) throws Exception { +// +// CuratorFramework curatorClient = ZkUtils.getCuratorClient(); +// +// // set common data +// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, USERNAME, "snakanda"); +// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, HOSTNAME, "bigred2.uits.iu.edu"); +// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, PORT, "22"); +// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, PUBLIC_KEY, +// IOUtils.toByteArray(HelixUtil.class.getClassLoader().getResourceAsStream("ssh/id_rsa.pub"))); +// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, PRIVATE_KEY, +// IOUtils.toByteArray(HelixUtil.class.getClassLoader().getResourceAsStream("ssh/id_rsa"))); +// +// +// // set data for mkdir +// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW + "/" + CREATE_DIR_TASK, COMMAND, "mkdir -p airavata"); +// +// // set data for copy files +// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW + "/" + COPY_PBS_TASK, SRC_PATH, +// HelixUtil.class.getClassLoader().getResource("job_tf.pbs").getPath()); +// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW + "/" + COPY_PBS_TASK, DEST_PATH, "~/airavata/"); +// +// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW + "/" + COPY_PY_TASK, SRC_PATH, +// HelixUtil.class.getClassLoader().getResource("code_tf.py").getPath()); +// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW + "/" + COPY_PY_TASK, DEST_PATH, "~/airavata/"); +// +// // set data for qsub +// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW + "/" + CREATE_DIR_TASK, COMMAND, "qsub ~/airavata/job_tf.pbs"); +// } } http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java ---------------------------------------------------------------------- diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java b/helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java index 54466f2..9bca76e 100644 --- a/helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java +++ b/helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java @@ -4,6 +4,7 @@ import edu.iu.helix.airavata.tasks.HelixTaskA; import edu.iu.helix.airavata.tasks.HelixTaskB; import edu.iu.helix.airavata.tasks.HelixTaskC; import edu.iu.helix.airavata.tasks.HelixTaskD; +import edu.iu.helix.airavata.tasks.ssh.SSHTask; import org.apache.helix.InstanceType; import org.apache.helix.examples.OnlineOfflineStateModelFactory; import org.apache.helix.manager.zk.ZKHelixAdmin; @@ -93,6 +94,12 @@ public class ParticipantNode implements Runnable { // register task model Map taskRegistry = new HashMap(); + taskRegistry.put(SSHTask.TASK_COMMAND, new TaskFactory() { + @Override + public Task createNewTask(TaskCallbackContext context) { + return new SSHTask(context); + } + }); taskRegistry.put(HelixTaskA.TASK_COMMAND, new TaskFactory() { @Override public Task createNewTask(TaskCallbackContext context) { http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/ZkUtils.java ---------------------------------------------------------------------- diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/ZkUtils.java b/helix-playground/src/main/java/edu/iu/helix/airavata/ZkUtils.java new file mode 100644 index 0000000..d07a17a --- /dev/null +++ b/helix-playground/src/main/java/edu/iu/helix/airavata/ZkUtils.java @@ -0,0 +1,72 @@ +package edu.iu.helix.airavata; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.utils.ZKPaths; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by goshenoy on 6/28/17. + */ +public class ZkUtils { + + private static final Logger logger = LoggerFactory.getLogger(ZkUtils.class); + private static CuratorFramework curatorClient; + + /** + * Get curatorFramework instance + * @return + * @throws Exception + */ + public static CuratorFramework getCuratorClient() throws Exception { + if (curatorClient == null) { + synchronized (ZkUtils.class) { + if (curatorClient == null) { + String connectionSting = HelixUtil.ZK_ADDRESS; + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5); + curatorClient = CuratorFrameworkFactory.newClient(connectionSting, retryPolicy); + curatorClient.start(); + } + } + } + + return curatorClient; + } + + public static String getZkPath(String parentNode, String childNode) throws Exception { + return ZKPaths.makePath(parentNode, childNode); + } + + public static void createZkNode(CuratorFramework curatorClient, String parentNode, String childNode, String data) throws Exception { + String zkPath = getZkPath(parentNode, childNode); + logger.debug("Creating Zk node for: " + zkPath); + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkPath); + + setZkData(curatorClient, zkPath, data.getBytes()); + } + + public static void createZkNode(CuratorFramework curatorClient, String parentNode, String childNode, byte[] data) throws Exception { + String zkPath = getZkPath(parentNode, childNode); + logger.debug("Creating Zk node for: " + zkPath); + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkPath); + + setZkData(curatorClient, zkPath, data); + } + + public static void setZkData(CuratorFramework curatorClient, String zkPath, byte[] data) throws Exception { + curatorClient.setData().withVersion(-1).forPath(zkPath, data); + } + + public static Object getZkData(CuratorFramework curatorClient, String parentNode, String childNode) throws Exception { + String zkPath = getZkPath(parentNode, childNode); + return getZkData(curatorClient, zkPath); + } + + public static Object getZkData(CuratorFramework curatorClient, String zkPath) throws Exception { + byte[] data = curatorClient.getData().forPath(zkPath); + return HelixUtil.getObject(data); + } + } http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/Authentication.java ---------------------------------------------------------------------- diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/Authentication.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/Authentication.java index 49a8177..2d77210 100644 --- a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/Authentication.java +++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/Authentication.java @@ -23,7 +23,9 @@ package edu.iu.helix.airavata.tasks.ssh; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class Authentication { +import java.io.Serializable; + +public class Authentication implements Serializable { private final static Logger logger = LoggerFactory.getLogger(Authentication.class); protected String userName; http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHApiException.java ---------------------------------------------------------------------- diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHApiException.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHApiException.java new file mode 100644 index 0000000..b398248 --- /dev/null +++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHApiException.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package edu.iu.helix.airavata.tasks.ssh; + +/** + * An exception class to wrap SSH command execution related errors. + */ +public class SSHApiException extends Exception { + + public SSHApiException(String message) { + super(message); + } + + public SSHApiException(String message, Exception e) { + super(message, e); + } +} http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHCommandOutputReader.java ---------------------------------------------------------------------- diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHCommandOutputReader.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHCommandOutputReader.java new file mode 100644 index 0000000..89e64c4 --- /dev/null +++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHCommandOutputReader.java @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package edu.iu.helix.airavata.tasks.ssh; + +import com.jcraft.jsch.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +public class SSHCommandOutputReader { + + private static final Logger logger = LoggerFactory.getLogger(SSHCommandOutputReader.class); + String stdOutputString = null; + String stdErrorString = null; + ByteArrayOutputStream errorStream = new ByteArrayOutputStream(); + private int exitCode; + + public void onOutput(Channel channel) { + try { + this.setStdOutputString(getOutputStream(channel, channel.getInputStream())); + this.setStdErrorString(new String(errorStream.toByteArray(), "UTF-8")); + this.exitCode = channel.getExitStatus(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + + } + + private String getOutputStream(Channel channel, InputStream inputStream) throws IOException { + StringBuilder output = new StringBuilder(""); + byte[] tmp = new byte[1024]; + do { + while (inputStream.available() > 0) { + int i = inputStream.read(tmp, 0, 1024); + if (i < 0) break; + output.append(new String(tmp, 0, i)); + } + } while (!channel.isClosed()) ; + return output.toString(); + } + + public void setExitCode(int exitCode) { + this.exitCode = exitCode; + } + + public int getExitCode() { + return exitCode; + } + + public String getStdOutputString() { + return stdOutputString; + } + + public void setStdOutputString(String stdOutputString) { + this.stdOutputString = stdOutputString; + } + + public String getStdErrorString() { + return stdErrorString; + } + + public void setStdErrorString(String stdErrorString) { + this.stdErrorString = stdErrorString; + } + + public ByteArrayOutputStream getErrorStream() { + return errorStream; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHKeyAuthentication.java ---------------------------------------------------------------------- diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHKeyAuthentication.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHKeyAuthentication.java index 0ddf61e..add975c 100644 --- a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHKeyAuthentication.java +++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHKeyAuthentication.java @@ -21,7 +21,7 @@ package edu.iu.helix.airavata.tasks.ssh; -public class SSHKeyAuthentication extends Authentication{ +public class SSHKeyAuthentication extends Authentication { private byte[] privateKey; private byte[] publicKey; private String passphrase; http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHRunner.java ---------------------------------------------------------------------- diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHRunner.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHRunner.java new file mode 100644 index 0000000..b2486ef --- /dev/null +++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHRunner.java @@ -0,0 +1,394 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package edu.iu.helix.airavata.tasks.ssh; + +import com.jcraft.jsch.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.*; + +public class SSHRunner { + private final static Logger log = LoggerFactory.getLogger(SSHRunner.class); + + public Session createSSHSession(SSHServerInfo serverInfo, SSHKeyAuthentication authentication) throws JSchException { + JSch jSch = new JSch(); + jSch.addIdentity(UUID.randomUUID().toString(), authentication.getPrivateKey(), authentication.getPublicKey(), + authentication.getPassphrase().getBytes()); + Session session = jSch.getSession(serverInfo.getUserName(), serverInfo.getHost(), + serverInfo.getSshPort()); + session.setUserInfo(new SSHUserInfo(serverInfo.getUserName(), null, authentication.getPassphrase())); + if (authentication.getStrictHostKeyChecking().equals("yes")) { + jSch.setKnownHosts(authentication.getKnownHostsFilePath()); + } else { + session.setConfig("StrictHostKeyChecking", "no"); + } + session.connect(); + + return session; + } + + public String scpTo(String routingKey, String localFile, String remoteFile, SSHServerInfo serverInfo, + SSHKeyAuthentication authentication) throws IOException, JSchException, SSHApiException { + + Session session = createSSHSession(serverInfo, authentication); + + FileInputStream fis = null; + String prefix = null; + if (new File(localFile).isDirectory()) { + prefix = localFile + File.separator; + } + boolean ptimestamp = true; + + // exec 'scp -t rfile' remotely + String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + remoteFile; + Channel channel = session.openChannel("exec"); + + SSHCommandOutputReader stdOutReader = new SSHCommandOutputReader(); + ((ChannelExec) channel).setCommand(command); + ((ChannelExec) channel).setErrStream(stdOutReader.getErrorStream()); + + // get I/O streams for remote scp + OutputStream out = channel.getOutputStream(); + InputStream in = channel.getInputStream(); + + channel.connect(); + + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + + File _lfile = new File(localFile); + + if (ptimestamp) { + command = "T" + (_lfile.lastModified() / 1000) + " 0"; + // The access time should be sent here, + // but it is not accessible with JavaAPI ;-< + command += (" " + (_lfile.lastModified() / 1000) + " 0\n"); + out.write(command.getBytes()); + out.flush(); + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + } + + // send "C0644 filesize filename", where filename should not include '/' + long filesize = _lfile.length(); + command = "C0644 " + filesize + " "; + if (localFile.lastIndexOf('/') > 0) { + command += localFile.substring(localFile.lastIndexOf('/') + 1); + } else { + command += localFile; + } + command += "\n"; + out.write(command.getBytes()); + out.flush(); + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + + // send a content of localFile + fis = new FileInputStream(localFile); + byte[] buf = new byte[1024]; + while (true) { + int len = fis.read(buf, 0, buf.length); + if (len <= 0) break; + out.write(buf, 0, len); //out.flush(); + } + fis.close(); + fis = null; + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + out.close(); + stdOutReader.onOutput(channel); + + + channel.disconnect(); + if (stdOutReader.getStdErrorString().contains("scp:")) { + throw new SSHApiException(stdOutReader.getStdErrorString()); + } + + session.disconnect(); + + //since remote file is always a file we just return the file + return remoteFile; + } + + /** + * This method will copy a remote file to a local directory + * + * @param remoteFile remote file path, this has to be a full qualified path + * @param localFile This is the local file to copy, this can be a directory too + * @return returns the final local file path of the new file came from the remote resource + */ + public void scpFrom(String routingKey, String remoteFile, String localFile, SSHServerInfo serverInfo, + SSHKeyAuthentication authentication) throws IOException, + JSchException, SSHApiException { + Session session = createSSHSession(serverInfo, authentication); + FileOutputStream fos = null; + try { + String prefix = null; + if (new File(localFile).isDirectory()) { + prefix = localFile + File.separator; + } + + SSHCommandOutputReader stdOutReader = new SSHCommandOutputReader(); + + // exec 'scp -f remotefile' remotely + String command = "scp -f " + remoteFile; + Channel channel = session.openChannel("exec"); + ((ChannelExec) channel).setCommand(command); + ((ChannelExec) channel).setErrStream(stdOutReader.getErrorStream()); + + // get I/O streams for remote scp + OutputStream out = channel.getOutputStream(); + InputStream in = channel.getInputStream(); + + if (!channel.isClosed()){ + channel.connect(); + } + + byte[] buf = new byte[1024]; + + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + + while (true) { + int c = checkAck(in); + if (c != 'C') { + break; + } + + // read '0644 ' + in.read(buf, 0, 5); + + long filesize = 0L; + while (true) { + if (in.read(buf, 0, 1) < 0) { + // error + break; + } + if (buf[0] == ' ') break; + filesize = filesize * 10L + (long) (buf[0] - '0'); + } + + String file = null; + for (int i = 0; ; i++) { + in.read(buf, i, 1); + if (buf[i] == (byte) 0x0a) { + file = new String(buf, 0, i); + break; + } + } + + //System.out.println("filesize="+filesize+", file="+file); + + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + + // read a content of lfile + fos = new FileOutputStream(prefix == null ? localFile : prefix + file); + int foo; + while (true) { + if (buf.length < filesize) foo = buf.length; + else foo = (int) filesize; + foo = in.read(buf, 0, foo); + if (foo < 0) { + // error + break; + } + fos.write(buf, 0, foo); + filesize -= foo; + if (filesize == 0L) break; + } + fos.close(); + fos = null; + + if (checkAck(in) != 0) { + String error = "Error transfering the file content"; + log.error(error); + throw new SSHApiException(error); + } + + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + } + stdOutReader.onOutput(channel); + if (stdOutReader.getStdErrorString().contains("scp:")) { + throw new SSHApiException(stdOutReader.getStdErrorString()); + } + + } catch (Exception e) { + log.error(e.getMessage(), e); + } finally { + try { + if (fos != null) fos.close(); + session.disconnect(); + } catch (Exception ee) { + } + } + } + + public void makeDirectory(String routingKey, String path, SSHServerInfo serverInfo, SSHKeyAuthentication authentication) + throws IOException, JSchException, Exception { + + Session session = createSSHSession(serverInfo, authentication); + + // exec 'scp -t rfile' remotely + String command = "mkdir -p " + path; + Channel channel = session.openChannel("exec"); + SSHCommandOutputReader stdOutReader = new SSHCommandOutputReader(); + + ((ChannelExec) channel).setCommand(command); + ((ChannelExec) channel).setErrStream(stdOutReader.getErrorStream()); + + try { + channel.connect(); + } catch (JSchException e) { + + channel.disconnect(); + log.error("Unable to retrieve command output. Command - " + command + + " on server - " + session.getHost() + ":" + session.getPort() + + " connecting user name - " + + session.getUserName()); + throw e; + } + stdOutReader.onOutput(channel); + if (stdOutReader.getStdErrorString().contains("mkdir:")) { + throw new Exception(stdOutReader.getStdErrorString()); + } + + channel.disconnect(); + session.disconnect(); + } + + public List listDirectory(String routingKey, String path, SSHServerInfo serverInfo, SSHKeyAuthentication authentication) + throws IOException, JSchException, Exception { + + Session session = createSSHSession(serverInfo, authentication); + + // exec 'scp -t rfile' remotely + String command = "ls " + path; + Channel channel = session.openChannel("exec"); + SSHCommandOutputReader stdOutReader = new SSHCommandOutputReader(); + + ((ChannelExec) channel).setCommand(command); + ((ChannelExec) channel).setErrStream(stdOutReader.getErrorStream()); + + try { + channel.connect(); + } catch (JSchException e) { + + channel.disconnect(); + + throw new Exception("Unable to retrieve command output. Command - " + command + + " on server - " + session.getHost() + ":" + session.getPort() + + " connecting user name - " + + session.getUserName(), e); + } + if (stdOutReader.getStdErrorString().contains("ls:")) { + throw new Exception(stdOutReader.getStdErrorString()); + } + channel.disconnect(); + session.disconnect(); + return Arrays.asList(stdOutReader.getStdOutputString().split("\n")); + } + + public SSHCommandOutputReader executeCommand(String routingKey, String command, SSHServerInfo serverInfo, + SSHKeyAuthentication authentication) throws Exception { + + Session session = createSSHSession(serverInfo, authentication); + + Map results = new HashMap<>(); + + Channel channel = session.openChannel("exec"); + SSHCommandOutputReader stdOutReader = new SSHCommandOutputReader(); + + ((ChannelExec) channel).setCommand(command); + ((ChannelExec) channel).setErrStream(stdOutReader.getErrorStream()); + + try { + channel.connect(); + } catch (JSchException e) { + + channel.disconnect(); + + throw new Exception("Unable to retrieve command output. Command - " + command + + " on server - " + session.getHost() + ":" + session.getPort() + + " connecting user name - " + + session.getUserName(), e); + } + stdOutReader.onOutput(channel); + session.disconnect(); + + return stdOutReader; + } + + public SSHCommandOutputReader executeCommand(String routingKey, String[] commands, SSHServerInfo serverInfo, + SSHKeyAuthentication authentication) throws Exception { + return executeCommand(routingKey, String.join(" && ", commands), serverInfo, authentication); + } + + private int checkAck(InputStream in) throws IOException { + int b = in.read(); + if (b == 0) return b; + if (b == -1) return b; + + if (b == 1 || b == 2) { + StringBuffer sb = new StringBuffer(); + int c; + do { + c = in.read(); + sb.append((char) c); + } + while (c != '\n'); + if (b == 1) { // error + System.out.print(sb.toString()); + } + if (b == 2) { // fatal error + System.out.print(sb.toString()); + } + log.warn(sb.toString()); + } + return b; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTask.java ---------------------------------------------------------------------- diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTask.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTask.java index 4e72ade..0ea53a7 100644 --- a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTask.java +++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTask.java @@ -20,19 +20,26 @@ */ package edu.iu.helix.airavata.tasks.ssh; +import edu.iu.helix.airavata.HelixUtil; +import edu.iu.helix.airavata.ZkUtils; import org.apache.helix.task.Task; import org.apache.helix.task.TaskCallbackContext; import org.apache.helix.task.TaskResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.UUID; + public class SSHTask implements Task { private final static Logger logger = LoggerFactory.getLogger(SSHTask.class); + public static final String TASK_COMMAND = "SSH_TASK"; private TaskCallbackContext callbackContext; + private String taskId; public SSHTask(TaskCallbackContext callbackContext){ this.callbackContext = callbackContext; + this.taskId = callbackContext.getTaskConfig().getId(); } @Override @@ -46,7 +53,36 @@ public class SSHTask implements Task { // Todo Deserialize the TaskContext from data store. // byte[] output = curator.getData().forPath(path); // List newList = (List)SerializationUtils.deserialize(output); - return null; + + System.out.println("Running SSH Task for ID: " + taskId); + try { + SSHTaskContext taskContext = (SSHTaskContext) ZkUtils.getZkData(ZkUtils.getCuratorClient(), HelixUtil.SSH_WORKFLOW, taskId); + String routingKey = UUID.randomUUID().toString(); + SSHRunner sshExecutor = new SSHRunner(); + + System.out.println("Task: " + taskId + ", is of Type: " + taskContext.getTask_type()); + switch (taskContext.getTask_type()) { + + case EXECUTE_COMMAND: + SSHCommandOutputReader sshOut = sshExecutor.executeCommand(routingKey, taskContext.getCommand(), + (SSHServerInfo) taskContext.getServerInfo(), taskContext.getSshKeyAuthentication()); + System.out.println("SSH Command Output: " + sshOut.getStdOutputString()); + break; + + case FILE_COPY: + String scpOut = sshExecutor.scpTo(routingKey, taskContext.getSourceFilePath(), taskContext.getDestFilePath(), + (SSHServerInfo) taskContext.getServerInfo(), taskContext.getSshKeyAuthentication()); + System.out.println("SCP Command Output: " + scpOut); + break; + + default: + throw new Exception("Unknown SSH Task Type: " + taskContext.getTask_type()); + } + } catch (Exception ex) { + System.err.println("Something went wrong for task: " + taskId + ", reason: " + ex); + return new TaskResult(TaskResult.Status.FAILED, "SSH command completed!"); + } + return new TaskResult(TaskResult.Status.COMPLETED, "SSH command completed!"); } @Override http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTaskContext.java ---------------------------------------------------------------------- diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTaskContext.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTaskContext.java index 2b2024c..e780331 100644 --- a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTaskContext.java +++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTaskContext.java @@ -23,7 +23,9 @@ package edu.iu.helix.airavata.tasks.ssh; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SSHTaskContext { +import java.io.Serializable; + +public class SSHTaskContext implements Serializable { private final static Logger logger = LoggerFactory.getLogger(SSHTaskContext.class); public static enum TASK_TYPE {FILE_COPY, EXECUTE_COMMAND;} @@ -127,4 +129,17 @@ public class SSHTaskContext { public void setCommand(String command) { this.command = command; } + + @Override + public String toString() { + return "SSHTaskContext{" + + "task_type=" + task_type + + ", sshKeyAuthentication=" + sshKeyAuthentication + + ", sshUserInfo=" + sshUserInfo + + ", serverInfo=" + serverInfo + + ", sourceFilePath='" + sourceFilePath + '\'' + + ", destFilePath='" + destFilePath + '\'' + + ", command='" + command + '\'' + + '}'; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHUserInfo.java ---------------------------------------------------------------------- diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHUserInfo.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHUserInfo.java index a0ac81c..df5a575 100644 --- a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHUserInfo.java +++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHUserInfo.java @@ -22,7 +22,9 @@ package edu.iu.helix.airavata.tasks.ssh; import com.jcraft.jsch.UserInfo; -public class SSHUserInfo implements UserInfo { +import java.io.Serializable; + +public class SSHUserInfo implements UserInfo, Serializable { private String userName; private String password; http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/ServerInfo.java ---------------------------------------------------------------------- diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/ServerInfo.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/ServerInfo.java index f47fc5b..4917e47 100644 --- a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/ServerInfo.java +++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/ServerInfo.java @@ -20,7 +20,9 @@ */ package edu.iu.helix.airavata.tasks.ssh; -public class ServerInfo { +import java.io.Serializable; + +public class ServerInfo implements Serializable { public static enum ComProtocol {SSH, LOCAL} http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/resources/ssh/id_rsa.pub ---------------------------------------------------------------------- diff --git a/helix-playground/src/main/resources/ssh/id_rsa.pub b/helix-playground/src/main/resources/ssh/id_rsa.pub index c3ac0b0..e378be0 100644 --- a/helix-playground/src/main/resources/ssh/id_rsa.pub +++ b/helix-playground/src/main/resources/ssh/id_rsa.pub @@ -1,2 +1 @@ - ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCr9VP22p0I+2W5o/klPv/OvfTihvcBQwZXKPrSLFF+OB9nVNtIfDDETIwwex7mknn3Kks1jFvEdKMrvRjOFeFInDv3N40LjohHu4v2tiawAON7MOLpz/iX5dWp0wteixlDKfGe7PAEMAk054kLSDiB3em2zBK4d9ApedA5k2JG1dmAsNK0KkbfgFPd5+iXrzgTg4XiefHQoaCSUyS7w6t8645djbYOP+b+SJtgslaf2RqeoBVvrA6YQJE1pUYjcm9yL4KwyqaPo+N/2XZ6xys5+WN8svtL3uRduENU1MQSTpdFq+GLCY4SgLMFgLJKoxHjcjPRfKyE/eYk1gQA7b/Z snakanda@149-161-141-51.dhcp-bl.indiana.edu