airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From goshe...@apache.org
Subject airavata-sandbox git commit: Add SSH Job Workflow
Date Wed, 28 Jun 2017 19:39:57 GMT
Repository: airavata-sandbox
Updated Branches:
  refs/heads/master 8b284d8f8 -> 2e56d5ce3


Add SSH Job Workflow


Project: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/commit/2e56d5ce
Tree: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/tree/2e56d5ce
Diff: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/diff/2e56d5ce

Branch: refs/heads/master
Commit: 2e56d5ce3a30c8f546dc70d80879a86680e0c196
Parents: 8b284d8
Author: Gourav Shenoy <shenoy.200@gmail.com>
Authored: Wed Jun 28 15:39:52 2017 -0400
Committer: Gourav Shenoy <shenoy.200@gmail.com>
Committed: Wed Jun 28 15:39:52 2017 -0400

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 helix-playground/pom.xml                        |   9 +-
 .../iu/helix/airavata/HelixClusterManager.java  |  18 +-
 .../java/edu/iu/helix/airavata/HelixUtil.java   | 222 +++++++++--
 .../edu/iu/helix/airavata/ParticipantNode.java  |   7 +
 .../java/edu/iu/helix/airavata/ZkUtils.java     |  72 ++++
 .../airavata/tasks/ssh/Authentication.java      |   4 +-
 .../airavata/tasks/ssh/SSHApiException.java     |  35 ++
 .../tasks/ssh/SSHCommandOutputReader.java       |  90 +++++
 .../tasks/ssh/SSHKeyAuthentication.java         |   2 +-
 .../iu/helix/airavata/tasks/ssh/SSHRunner.java  | 394 +++++++++++++++++++
 .../iu/helix/airavata/tasks/ssh/SSHTask.java    |  38 +-
 .../airavata/tasks/ssh/SSHTaskContext.java      |  17 +-
 .../helix/airavata/tasks/ssh/SSHUserInfo.java   |   4 +-
 .../iu/helix/airavata/tasks/ssh/ServerInfo.java |   4 +-
 .../src/main/resources/ssh/id_rsa.pub           |   1 -
 16 files changed, 865 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 4a9b5e3..d6e588b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
 *.class
+*id_rsa*
 
 # Mobile Tools for Java (J2ME)
 .mtj.tmp/

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/pom.xml
----------------------------------------------------------------------
diff --git a/helix-playground/pom.xml b/helix-playground/pom.xml
index 0b624c6..809c712 100644
--- a/helix-playground/pom.xml
+++ b/helix-playground/pom.xml
@@ -19,8 +19,8 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
                 <configuration>
-                    <source>1.6</source>
-                    <target>1.6</target>
+                    <source>1.8</source>
+                    <target>1.8</target>
                 </configuration>
             </plugin>
         </plugins>
@@ -54,6 +54,11 @@
             <artifactId>jsch</artifactId>
             <version>0.1.53</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>2.8.0</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java
index 313a674..927b377 100644
--- a/helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java
@@ -102,10 +102,10 @@ public class HelixClusterManager {
     }
 
     public boolean submitDag(HelixUtil.DAGType dagType) {
-        Workflow workflow = HelixUtil.getWorkflow(dagType);
-        taskDriver.start(workflow);
-        System.out.println("Started workflow for DagType: " + dagType + ", in cluster: " + clusterName);
         try {
+            Workflow workflow = HelixUtil.getWorkflow(dagType);
+            taskDriver.start(workflow);
+            System.out.println("Started workflow for DagType: " + dagType + ", in cluster: " + clusterName);
             taskDriver.pollForWorkflowState(workflow.getName(), TaskState.COMPLETED, TaskState.FAILED);
 //            while (true) {
 //                Thread.sleep(100);
@@ -129,8 +129,8 @@ public class HelixClusterManager {
     }
 
     public static void main(String[] args) {
-        String clusterName = "HelixDemoCluster";
-        String zkAddress = "localhost:2199";
+        String clusterName = HelixUtil.CLUSTER_NAME;
+        String zkAddress = HelixUtil.ZK_ADDRESS;
         int numWorkers = 3;
         int numPartitions = 1;
 
@@ -142,11 +142,11 @@ public class HelixClusterManager {
             HelixClusterManager manager = new HelixClusterManager(clusterName, zkAddress, numWorkers, numPartitions);
             manager.startHelixCluster();
 
-            System.out.println("Submitting Workflow for DagType: " + HelixUtil.DAGType.TYPE_A);
-            if (manager.submitDag(HelixUtil.DAGType.TYPE_A)) {
-                System.out.println("Successfully completed workflow for Dag: " + HelixUtil.DAGType.TYPE_A);
+            System.out.println("Submitting Workflow for DagType: " + HelixUtil.DAGType.SSH);
+            if (manager.submitDag(HelixUtil.DAGType.SSH)) {
+                System.out.println("Successfully completed workflow for Dag: " + HelixUtil.DAGType.SSH);
             } else {
-                throw new Exception("Failed to run workflow for Dag: " + HelixUtil.DAGType.TYPE_A);
+                throw new Exception("Failed to run workflow for Dag: " + HelixUtil.DAGType.SSH);
             }
         } catch (Exception ex) {
             logger.error("Something went wrong while running helix cluster manager. Reason: " + ex, ex);

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java
index 2cea68f..b2b634d 100644
--- a/helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java
@@ -4,11 +4,19 @@ import edu.iu.helix.airavata.tasks.HelixTaskA;
 import edu.iu.helix.airavata.tasks.HelixTaskB;
 import edu.iu.helix.airavata.tasks.HelixTaskC;
 import edu.iu.helix.airavata.tasks.HelixTaskD;
+import edu.iu.helix.airavata.tasks.ssh.SSHKeyAuthentication;
+import edu.iu.helix.airavata.tasks.ssh.SSHServerInfo;
+import edu.iu.helix.airavata.tasks.ssh.SSHTask;
+import edu.iu.helix.airavata.tasks.ssh.SSHTaskContext;
+import org.apache.commons.io.IOUtils;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.helix.task.*;
 import org.jboss.netty.util.internal.ThreadLocalRandom;
 
+import java.io.*;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 
 /**
  * Created by goshenoy on 6/21/17.
@@ -16,62 +24,121 @@ import java.util.List;
 public class HelixUtil {
 
     public static final String TASK_STATE_DEF = "Task";
+    public static final String ZK_ADDRESS = "localhost:2199";
+    public static final String CLUSTER_NAME = "HelixDemoCluster";
+
+    public static final String SSH_WORKFLOW = "SSH_Workflow";
+    public static final String CREATE_DIR_TASK = "Task_CreateDir";
+    public static final String COPY_PBS_TASK = "Task_CopyPBS";
+    public static final String COPY_PY_TASK = "Task_CopyPY";
+    public static final String QSUB_TASK = "Task_QSUB";
+
+
+    public static final String USERNAME = "username";
+    public static final String PRIVATE_KEY = "private_key";
+    public static final String PUBLIC_KEY  = "public_key";
+    public static final String HOSTNAME = "hostname";
+    public static final String PORT = "port";
+    public static final String COMMAND = "command";
+    public static final String SRC_PATH = "src_path";
+    public static final String DEST_PATH = "dest_path";
+
 
     public enum DAGType {
         TYPE_A,
         TYPE_B,
-        TYPE_C
+        TYPE_C,
+        SSH
     }
 
-    private static Workflow.Builder getWorkflowBuilder(DAGType dagType) {
-        // create task configs
-        List<TaskConfig> taskConfig1 = new ArrayList<TaskConfig>();
-        List<TaskConfig> taskConfig2 = new ArrayList<TaskConfig>();
-        List<TaskConfig> taskConfig3 = new ArrayList<TaskConfig>();
-        List<TaskConfig> taskConfig4 = new ArrayList<TaskConfig>();
-        taskConfig1.add(new TaskConfig.Builder().setTaskId("helix_task_a").setCommand(HelixTaskA.TASK_COMMAND).build());
-        taskConfig2.add(new TaskConfig.Builder().setTaskId("helix_task_b").setCommand(HelixTaskB.TASK_COMMAND).build());
-        taskConfig3.add(new TaskConfig.Builder().setTaskId("helix_task_c").setCommand(HelixTaskC.TASK_COMMAND).build());
-        taskConfig4.add(new TaskConfig.Builder().setTaskId("helix_task_d").setCommand(HelixTaskD.TASK_COMMAND).build());
-
-        // create job configs
-        JobConfig.Builder jobConfig1 = new JobConfig.Builder().addTaskConfigs(taskConfig1).setMaxAttemptsPerTask(3);
-        JobConfig.Builder jobConfig2 = new JobConfig.Builder().addTaskConfigs(taskConfig2).setMaxAttemptsPerTask(3);
-        JobConfig.Builder jobConfig3 = new JobConfig.Builder().addTaskConfigs(taskConfig3).setMaxAttemptsPerTask(3);
-        JobConfig.Builder jobConfig4 = new JobConfig.Builder().addTaskConfigs(taskConfig4).setMaxAttemptsPerTask(3);
-
-        // create workflow
-        Workflow.Builder workflowBuilder = new Workflow.Builder("helix_workflow").setExpiry(0);
-        workflowBuilder.addJob("helix_job_a", jobConfig1);
-        workflowBuilder.addJob("helix_job_b", jobConfig2);
-        workflowBuilder.addJob("helix_job_c", jobConfig3);
-        workflowBuilder.addJob("helix_job_d", jobConfig4);
+    private static Workflow.Builder getWorkflowBuilder(DAGType dagType) throws Exception {
+        Workflow.Builder workflow = null;
+
+        if (dagType.equals(DAGType.SSH)) {
+            if (!setWorkflowData()) {
+                throw new Exception("Failed to create zk data for SSH workflow");
+            }
+            // create dir task - 1 task for job
+            List<TaskConfig> createDirTask = new ArrayList<TaskConfig>();
+            createDirTask.add(new TaskConfig.Builder().setTaskId(CREATE_DIR_TASK).setCommand(SSHTask.TASK_COMMAND).build());
+
+            // copy files task - 2 tasks for job
+            List<TaskConfig> copyFilesTask = new ArrayList<TaskConfig>();
+            copyFilesTask.add(new TaskConfig.Builder().setTaskId(COPY_PBS_TASK).setCommand(SSHTask.TASK_COMMAND).build());
+            copyFilesTask.add(new TaskConfig.Builder().setTaskId(COPY_PY_TASK).setCommand(SSHTask.TASK_COMMAND).build());
+
+            // qsub task - 1 task for job
+            List<TaskConfig> qsubTask = new ArrayList<TaskConfig>();
+            qsubTask.add(new TaskConfig.Builder().setTaskId(QSUB_TASK).setCommand(SSHTask.TASK_COMMAND).build());
+
+            // create-dir job config
+            JobConfig.Builder createDirJob = new JobConfig.Builder().addTaskConfigs(createDirTask).setMaxAttemptsPerTask(3);
 
+            // copy-files job config
+            JobConfig.Builder copyFilesJob = new JobConfig.Builder().addTaskConfigs(copyFilesTask).setMaxAttemptsPerTask(3);
+
+            // qsub job config
+            JobConfig.Builder qsubJob = new JobConfig.Builder().addTaskConfigs(qsubTask).setMaxAttemptsPerTask(1);
+
+            // create workflow
+            workflow = new Workflow.Builder(SSH_WORKFLOW).setExpiry(0);
+            workflow.addJob("createDirJob", createDirJob);
+            workflow.addJob("copyFilesJob", copyFilesJob);
+            workflow.addJob("qsubJob", qsubJob);
+        } else {
+            // create task configs
+            List<TaskConfig> taskConfig1 = new ArrayList<TaskConfig>();
+            List<TaskConfig> taskConfig2 = new ArrayList<TaskConfig>();
+            List<TaskConfig> taskConfig3 = new ArrayList<TaskConfig>();
+            List<TaskConfig> taskConfig4 = new ArrayList<TaskConfig>();
+            taskConfig1.add(new TaskConfig.Builder().setTaskId("helix_task_a").setCommand(HelixTaskA.TASK_COMMAND).build());
+            taskConfig2.add(new TaskConfig.Builder().setTaskId("helix_task_b").setCommand(HelixTaskB.TASK_COMMAND).build());
+            taskConfig3.add(new TaskConfig.Builder().setTaskId("helix_task_c").setCommand(HelixTaskC.TASK_COMMAND).build());
+            taskConfig4.add(new TaskConfig.Builder().setTaskId("helix_task_d").setCommand(HelixTaskD.TASK_COMMAND).build());
+
+            // create job configs
+            JobConfig.Builder jobConfig1 = new JobConfig.Builder().addTaskConfigs(taskConfig1).setMaxAttemptsPerTask(3);
+            JobConfig.Builder jobConfig2 = new JobConfig.Builder().addTaskConfigs(taskConfig2).setMaxAttemptsPerTask(3);
+            JobConfig.Builder jobConfig3 = new JobConfig.Builder().addTaskConfigs(taskConfig3).setMaxAttemptsPerTask(3);
+            JobConfig.Builder jobConfig4 = new JobConfig.Builder().addTaskConfigs(taskConfig4).setMaxAttemptsPerTask(3);
+
+            // create workflow
+            workflow = new Workflow.Builder("helix_workflow").setExpiry(0);
+            workflow.addJob("helix_job_a", jobConfig1);
+            workflow.addJob("helix_job_b", jobConfig2);
+            workflow.addJob("helix_job_c", jobConfig3);
+            workflow.addJob("helix_job_d", jobConfig4);
+        }
 
         switch (dagType) {
             case TYPE_A:
-                workflowBuilder.addParentChildDependency("helix_job_a", "helix_job_b");
-                workflowBuilder.addParentChildDependency("helix_job_b", "helix_job_c");
-                workflowBuilder.addParentChildDependency("helix_job_c", "helix_job_d");
+                workflow.addParentChildDependency("helix_job_a", "helix_job_b");
+                workflow.addParentChildDependency("helix_job_b", "helix_job_c");
+                workflow.addParentChildDependency("helix_job_c", "helix_job_d");
                 break;
 
             case TYPE_B:
-                workflowBuilder.addParentChildDependency("helix_job_a", "helix_job_c");
-                workflowBuilder.addParentChildDependency("helix_job_c", "helix_job_d");
-                workflowBuilder.addParentChildDependency("helix_job_d", "helix_job_b");
+                workflow.addParentChildDependency("helix_job_a", "helix_job_c");
+                workflow.addParentChildDependency("helix_job_c", "helix_job_d");
+                workflow.addParentChildDependency("helix_job_d", "helix_job_b");
                 break;
 
             case TYPE_C:
-                workflowBuilder.addParentChildDependency("helix_job_a", "helix_job_d");
-                workflowBuilder.addParentChildDependency("helix_job_d", "helix_job_b");
-                workflowBuilder.addParentChildDependency("helix_job_b", "helix_job_c");
+                workflow.addParentChildDependency("helix_job_a", "helix_job_d");
+                workflow.addParentChildDependency("helix_job_d", "helix_job_b");
+                workflow.addParentChildDependency("helix_job_b", "helix_job_c");
+                break;
+
+            case SSH:
+                workflow.addParentChildDependency("createDirJob", "copyFilesJob");
+                workflow.addParentChildDependency("copyFilesJob", "qsubJob");
                 break;
         }
 
-        return workflowBuilder;
+        return workflow;
     }
 
-    public static Workflow getWorkflow(DAGType dagType) {
+    public static Workflow getWorkflow(DAGType dagType) throws Exception {
         Workflow.Builder workflowBuilder = getWorkflowBuilder(dagType);
         return workflowBuilder.build();
     }
@@ -79,4 +146,89 @@ public class HelixUtil {
     private static String generateWorkflowName() {
         return "workflow_" + ThreadLocalRandom.current().nextInt(9999);
     }
+
+    private static boolean setWorkflowData() {
+        try {
+            CuratorFramework curatorClient = ZkUtils.getCuratorClient();
+
+            SSHKeyAuthentication br2SshAuthentication = new SSHKeyAuthentication(
+                    "snakanda",
+                    IOUtils.toByteArray(HelixUtil.class.getClassLoader().getResourceAsStream("ssh/id_rsa")),
+                    IOUtils.toByteArray(HelixUtil.class.getClassLoader().getResourceAsStream("ssh/id_rsa.pub")),
+                    "dummy",
+                    HelixUtil.class.getClassLoader().getResource("ssh/known_hosts").getPath(),
+                    false
+            );
+
+            SSHServerInfo br2 = new SSHServerInfo("snakanda", "bigred2.uits.iu.edu", br2SshAuthentication,22);
+
+            SSHTaskContext createDirTC = new SSHTaskContext(SSHTaskContext.TASK_TYPE.EXECUTE_COMMAND,
+                    br2SshAuthentication, null, br2, "mkdir -p airavata");
+
+            SSHTaskContext qsubTC = new SSHTaskContext(SSHTaskContext.TASK_TYPE.EXECUTE_COMMAND,
+                    br2SshAuthentication, null, br2, "qsub ~/airavata/job_tf.pbs");
+
+
+            SSHTaskContext copyPbsTC = new SSHTaskContext(SSHTaskContext.TASK_TYPE.FILE_COPY,
+                    br2SshAuthentication, null, br2,
+                    HelixUtil.class.getClassLoader().getResource("job_tf.pbs").getPath(), "~/airavata/");
+
+            SSHTaskContext copyPyTC = new SSHTaskContext(SSHTaskContext.TASK_TYPE.FILE_COPY,
+                    br2SshAuthentication, null, br2,
+                    HelixUtil.class.getClassLoader().getResource("job_tf.pbs").getPath(), "~/airavata/");
+
+            ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, CREATE_DIR_TASK, getBytes(createDirTC));
+            ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, COPY_PBS_TASK, getBytes(copyPbsTC));
+            ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, COPY_PY_TASK, getBytes(copyPyTC));
+            ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, QSUB_TASK, getBytes(qsubTC));
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            return false;
+        }
+        return true;
+    }
+
+    private static byte[] getBytes(Object object) throws Exception {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        ObjectOutput out = new ObjectOutputStream(bos);
+        out.writeObject(object);
+        out.flush();
+        return bos.toByteArray();
+    }
+
+    public static Object getObject(byte[] objectBytes) throws Exception {
+        ByteArrayInputStream bis = new ByteArrayInputStream(objectBytes);
+        ObjectInput in = new ObjectInputStream(bis);
+        return in.readObject();
+    }
+
+//    public static void main(String[] args) throws  Exception {
+//
+//        CuratorFramework curatorClient = ZkUtils.getCuratorClient();
+//
+//        // set common data
+//        ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, USERNAME, "snakanda");
+//        ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, HOSTNAME, "bigred2.uits.iu.edu");
+//        ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, PORT, "22");
+//        ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, PUBLIC_KEY,
+//                IOUtils.toByteArray(HelixUtil.class.getClassLoader().getResourceAsStream("ssh/id_rsa.pub")));
+//        ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, PRIVATE_KEY,
+//                IOUtils.toByteArray(HelixUtil.class.getClassLoader().getResourceAsStream("ssh/id_rsa")));
+//
+//
+//        // set data for mkdir
+//        ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW + "/" + CREATE_DIR_TASK, COMMAND, "mkdir -p airavata");
+//
+//        // set data for copy files
+//        ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW + "/" + COPY_PBS_TASK, SRC_PATH,
+//                HelixUtil.class.getClassLoader().getResource("job_tf.pbs").getPath());
+//        ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW + "/" + COPY_PBS_TASK, DEST_PATH, "~/airavata/");
+//
+//        ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW + "/" + COPY_PY_TASK, SRC_PATH,
+//                HelixUtil.class.getClassLoader().getResource("code_tf.py").getPath());
+//        ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW + "/" + COPY_PY_TASK, DEST_PATH, "~/airavata/");
+//
+//        // set data for qsub
+//        ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW + "/" + CREATE_DIR_TASK, COMMAND, "qsub ~/airavata/job_tf.pbs");
+//    }
 }

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java b/helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java
index 54466f2..9bca76e 100644
--- a/helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java
@@ -4,6 +4,7 @@ import edu.iu.helix.airavata.tasks.HelixTaskA;
 import edu.iu.helix.airavata.tasks.HelixTaskB;
 import edu.iu.helix.airavata.tasks.HelixTaskC;
 import edu.iu.helix.airavata.tasks.HelixTaskD;
+import edu.iu.helix.airavata.tasks.ssh.SSHTask;
 import org.apache.helix.InstanceType;
 import org.apache.helix.examples.OnlineOfflineStateModelFactory;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -93,6 +94,12 @@ public class ParticipantNode implements Runnable {
 
             // register task model
             Map<String, TaskFactory> taskRegistry = new HashMap<String, TaskFactory>();
+            taskRegistry.put(SSHTask.TASK_COMMAND, new TaskFactory() {
+                @Override
+                public Task createNewTask(TaskCallbackContext context) {
+                    return new SSHTask(context);
+                }
+            });
             taskRegistry.put(HelixTaskA.TASK_COMMAND, new TaskFactory() {
                 @Override
                 public Task createNewTask(TaskCallbackContext context) {

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/ZkUtils.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/ZkUtils.java b/helix-playground/src/main/java/edu/iu/helix/airavata/ZkUtils.java
new file mode 100644
index 0000000..d07a17a
--- /dev/null
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/ZkUtils.java
@@ -0,0 +1,72 @@
+package edu.iu.helix.airavata;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by goshenoy on 6/28/17.
+ */
+public class ZkUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(ZkUtils.class);
+    private static CuratorFramework curatorClient;
+
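+    /**
+     * Returns the shared CuratorFramework client, creating and starting it on the first call.
+     * @return the client built for the HelixUtil.ZK_ADDRESS connection string
+     * @throws Exception as declared by the method signature
+     */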
+    public static CuratorFramework getCuratorClient() throws Exception {
+        if (curatorClient == null) {
+            synchronized (ZkUtils.class) {
+                if (curatorClient == null) {
+                    String connectionSting = HelixUtil.ZK_ADDRESS;
+                    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
+                    curatorClient = CuratorFrameworkFactory.newClient(connectionSting, retryPolicy);
+                    curatorClient.start();
+                }
+            }
+        }
+
+        return curatorClient;
+    }
+
+    public static String getZkPath(String parentNode, String childNode) throws Exception {
+        return ZKPaths.makePath(parentNode, childNode);
+    }
+
+    public static void createZkNode(CuratorFramework curatorClient, String parentNode, String childNode, String data) throws Exception {
+        String zkPath = getZkPath(parentNode, childNode);
+        logger.debug("Creating Zk node for: " + zkPath);
+        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkPath);
+
+        setZkData(curatorClient, zkPath, data.getBytes());
+    }
+
+    public static void createZkNode(CuratorFramework curatorClient, String parentNode, String childNode, byte[] data) throws Exception {
+        String zkPath = getZkPath(parentNode, childNode);
+        logger.debug("Creating Zk node for: " + zkPath);
+        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkPath);
+
+        setZkData(curatorClient, zkPath, data);
+    }
+
+    public static void setZkData(CuratorFramework curatorClient, String zkPath, byte[] data) throws Exception {
+        curatorClient.setData().withVersion(-1).forPath(zkPath, data);
+    }
+
+    public static Object getZkData(CuratorFramework curatorClient, String parentNode, String childNode) throws Exception {
+        String zkPath = getZkPath(parentNode, childNode);
+        return getZkData(curatorClient, zkPath);
+    }
+
+    public static Object getZkData(CuratorFramework curatorClient, String zkPath) throws Exception {
+        byte[] data = curatorClient.getData().forPath(zkPath);
+        return HelixUtil.getObject(data);
+    }
+ }

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/Authentication.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/Authentication.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/Authentication.java
index 49a8177..2d77210 100644
--- a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/Authentication.java
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/Authentication.java
@@ -23,7 +23,9 @@ package edu.iu.helix.airavata.tasks.ssh;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class Authentication {
+import java.io.Serializable;
+
+public class Authentication implements Serializable {
     private final static Logger logger = LoggerFactory.getLogger(Authentication.class);
 
     protected String userName;

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHApiException.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHApiException.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHApiException.java
new file mode 100644
index 0000000..b398248
--- /dev/null
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHApiException.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package edu.iu.helix.airavata.tasks.ssh;
+
+/**
+ * An exception class to wrap SSH command execution related errors.
+ */
+public class SSHApiException extends Exception {
+
+    public SSHApiException(String message) {
+        super(message);
+    }
+
+    public SSHApiException(String message, Exception e) {
+        super(message, e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHCommandOutputReader.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHCommandOutputReader.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHCommandOutputReader.java
new file mode 100644
index 0000000..89e64c4
--- /dev/null
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHCommandOutputReader.java
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package edu.iu.helix.airavata.tasks.ssh;
+
+import com.jcraft.jsch.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class SSHCommandOutputReader {
+
+    private static final Logger logger = LoggerFactory.getLogger(SSHCommandOutputReader.class);
+    String stdOutputString = null;
+    String stdErrorString = null;
+    ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
+    private int exitCode;
+
+    public void onOutput(Channel channel) {
+        try {
+            this.setStdOutputString(getOutputStream(channel, channel.getInputStream()));
+            this.setStdErrorString(new String(errorStream.toByteArray(), "UTF-8"));
+            this.exitCode = channel.getExitStatus();
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+        }
+
+    }
+
+    private String getOutputStream(Channel channel, InputStream inputStream) throws IOException {
+        StringBuilder output = new StringBuilder("");
+        byte[] tmp = new byte[1024];
+        do {
+            while (inputStream.available() > 0) {
+                int i = inputStream.read(tmp, 0, 1024);
+                if (i < 0) break;
+                output.append(new String(tmp, 0, i));
+            }
+        } while (!channel.isClosed()) ;
+        return  output.toString();
+    }
+
+    public void setExitCode(int exitCode) {
+        this.exitCode = exitCode;
+    }
+
+    public int getExitCode() {
+        return exitCode;
+    }
+
+    public String getStdOutputString() {
+        return stdOutputString;
+    }
+
+    public void setStdOutputString(String stdOutputString) {
+        this.stdOutputString = stdOutputString;
+    }
+
+    public String getStdErrorString() {
+        return stdErrorString;
+    }
+
+    public void setStdErrorString(String stdErrorString) {
+        this.stdErrorString = stdErrorString;
+    }
+
+    public ByteArrayOutputStream getErrorStream() {
+        return errorStream;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHKeyAuthentication.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHKeyAuthentication.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHKeyAuthentication.java
index 0ddf61e..add975c 100644
--- a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHKeyAuthentication.java
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHKeyAuthentication.java
@@ -21,7 +21,7 @@
 package edu.iu.helix.airavata.tasks.ssh;
 
 
-public class SSHKeyAuthentication extends Authentication{
+public class SSHKeyAuthentication extends Authentication {
     private byte[] privateKey;
     private byte[] publicKey;
     private String passphrase;

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHRunner.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHRunner.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHRunner.java
new file mode 100644
index 0000000..b2486ef
--- /dev/null
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHRunner.java
@@ -0,0 +1,394 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package edu.iu.helix.airavata.tasks.ssh;
+
+import com.jcraft.jsch.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.*;
+
+public class SSHRunner {
+    private final static Logger log = LoggerFactory.getLogger(SSHRunner.class);
+
+    public Session createSSHSession(SSHServerInfo serverInfo, SSHKeyAuthentication authentication) throws JSchException {
+            JSch jSch = new JSch();
+            jSch.addIdentity(UUID.randomUUID().toString(), authentication.getPrivateKey(), authentication.getPublicKey(),
+                    authentication.getPassphrase().getBytes());
+            Session session = jSch.getSession(serverInfo.getUserName(), serverInfo.getHost(),
+                    serverInfo.getSshPort());
+            session.setUserInfo(new SSHUserInfo(serverInfo.getUserName(), null, authentication.getPassphrase()));
+            if (authentication.getStrictHostKeyChecking().equals("yes")) {
+                jSch.setKnownHosts(authentication.getKnownHostsFilePath());
+            } else {
+                session.setConfig("StrictHostKeyChecking", "no");
+            }
+            session.connect();
+
+        return session;
+    }
+
+    /**
+     * Copies a local file to the remote host by driving {@code scp -p -t} over an exec channel.
+     *
+     * <p>The local modification time is sent for both the modification and access time, and the
+     * file is always announced with mode 0644.
+     *
+     * @param routingKey     not used by this method
+     * @param localFile      path of the local file to send
+     * @param remoteFile     remote destination path; appended to the remote command line as-is
+     * @param serverInfo     target server
+     * @param authentication key material used to open the session
+     * @return {@code remoteFile}, unchanged
+     * @throws IOException     if the local file or the channel streams cannot be read or written
+     * @throws JSchException   if the session or channel cannot be established
+     * @throws SSHApiException if the remote side rejects a protocol step or reports an scp error
+     */
+    public String scpTo(String routingKey, String localFile, String remoteFile, SSHServerInfo serverInfo,
+                        SSHKeyAuthentication authentication) throws IOException, JSchException, SSHApiException {
+
+        Session session = createSSHSession(serverInfo, authentication);
+        Channel channel = null;
+        try {
+            // exec 'scp -p -t rfile' remotely
+            // NOTE(review): remoteFile is concatenated unquoted into a shell command.
+            String command = "scp -p -t " + remoteFile;
+            channel = session.openChannel("exec");
+
+            SSHCommandOutputReader stdOutReader = new SSHCommandOutputReader();
+            ((ChannelExec) channel).setCommand(command);
+            ((ChannelExec) channel).setErrStream(stdOutReader.getErrorStream());
+
+            // get I/O streams for remote scp
+            OutputStream out = channel.getOutputStream();
+            InputStream in = channel.getInputStream();
+
+            channel.connect();
+            expectScpAck(in);
+
+            File _lfile = new File(localFile);
+
+            // The access time should be sent as the second pair, but only the
+            // modification time is available here, so it is sent twice.
+            long mtimeSeconds = _lfile.lastModified() / 1000;
+            command = "T" + mtimeSeconds + " 0 " + mtimeSeconds + " 0\n";
+            out.write(command.getBytes());
+            out.flush();
+            expectScpAck(in);
+
+            // send "C0644 filesize filename", where filename must not include a directory part
+            command = "C0644 " + _lfile.length() + " " + _lfile.getName() + "\n";
+            out.write(command.getBytes());
+            out.flush();
+            expectScpAck(in);
+
+            // send the content of localFile
+            byte[] buf = new byte[1024];
+            try (FileInputStream fis = new FileInputStream(localFile)) {
+                int len;
+                while ((len = fis.read(buf, 0, buf.length)) > 0) {
+                    out.write(buf, 0, len);
+                }
+            }
+            // send '\0' to mark the end of the file content
+            buf[0] = 0;
+            out.write(buf, 0, 1);
+            out.flush();
+            expectScpAck(in);
+            out.close();
+
+            stdOutReader.onOutput(channel);
+            if (stdOutReader.getStdErrorString().contains("scp:")) {
+                throw new SSHApiException(stdOutReader.getStdErrorString());
+            }
+
+            //since remote file is always a file  we just return the file
+            return remoteFile;
+        } finally {
+            // Release the channel and session on every path, including failures.
+            if (channel != null) {
+                channel.disconnect();
+            }
+            session.disconnect();
+        }
+    }
+
+    /** Reads one SCP acknowledgement from {@code in} and fails unless it is the success byte (0). */
+    private void expectScpAck(InputStream in) throws IOException, SSHApiException {
+        if (checkAck(in) != 0) {
+            String error = "Error Reading input Stream";
+            log.error(error);
+            throw new SSHApiException(error);
+        }
+    }
+
+    /**
+     * This method will copy a remote file to a local directory
+     *
+     * @param remoteFile remote file path, this has to be a full qualified path
+     * @param localFile  This is the local file to copy, this can be a directory too
+     * @return returns the final local file path of the new file came from the remote resource
+     */
+    public void scpFrom(String routingKey, String remoteFile, String localFile, SSHServerInfo serverInfo,
+                        SSHKeyAuthentication authentication) throws IOException,
+            JSchException, SSHApiException {
+        Session session = createSSHSession(serverInfo, authentication);
+        FileOutputStream fos = null;
+        try {
+            String prefix = null;
+            if (new File(localFile).isDirectory()) {
+                prefix = localFile + File.separator;
+            }
+
+            SSHCommandOutputReader stdOutReader = new SSHCommandOutputReader();
+
+            // exec 'scp -f remotefile' remotely
+            String command = "scp -f " + remoteFile;
+            Channel channel = session.openChannel("exec");
+            ((ChannelExec) channel).setCommand(command);
+            ((ChannelExec) channel).setErrStream(stdOutReader.getErrorStream());
+
+            // get I/O streams for remote scp
+            OutputStream out = channel.getOutputStream();
+            InputStream in = channel.getInputStream();
+
+            if (!channel.isClosed()){
+                channel.connect();
+            }
+
+            byte[] buf = new byte[1024];
+
+            // send '\0'
+            buf[0] = 0;
+            out.write(buf, 0, 1);
+            out.flush();
+
+            while (true) {
+                int c = checkAck(in);
+                if (c != 'C') {
+                    break;
+                }
+
+                // read '0644 '
+                in.read(buf, 0, 5);
+
+                long filesize = 0L;
+                while (true) {
+                    if (in.read(buf, 0, 1) < 0) {
+                        // error
+                        break;
+                    }
+                    if (buf[0] == ' ') break;
+                    filesize = filesize * 10L + (long) (buf[0] - '0');
+                }
+
+                String file = null;
+                for (int i = 0; ; i++) {
+                    in.read(buf, i, 1);
+                    if (buf[i] == (byte) 0x0a) {
+                        file = new String(buf, 0, i);
+                        break;
+                    }
+                }
+
+                //System.out.println("filesize="+filesize+", file="+file);
+
+                // send '\0'
+                buf[0] = 0;
+                out.write(buf, 0, 1);
+                out.flush();
+
+                // read a content of lfile
+                fos = new FileOutputStream(prefix == null ? localFile : prefix + file);
+                int foo;
+                while (true) {
+                    if (buf.length < filesize) foo = buf.length;
+                    else foo = (int) filesize;
+                    foo = in.read(buf, 0, foo);
+                    if (foo < 0) {
+                        // error
+                        break;
+                    }
+                    fos.write(buf, 0, foo);
+                    filesize -= foo;
+                    if (filesize == 0L) break;
+                }
+                fos.close();
+                fos = null;
+
+                if (checkAck(in) != 0) {
+                    String error = "Error transfering the file content";
+                    log.error(error);
+                    throw new SSHApiException(error);
+                }
+
+                // send '\0'
+                buf[0] = 0;
+                out.write(buf, 0, 1);
+                out.flush();
+            }
+            stdOutReader.onOutput(channel);
+            if (stdOutReader.getStdErrorString().contains("scp:")) {
+                throw new SSHApiException(stdOutReader.getStdErrorString());
+            }
+
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        } finally {
+            try {
+                if (fos != null) fos.close();
+                session.disconnect();
+            } catch (Exception ee) {
+            }
+        }
+    }
+
+    /**
+     * Creates a directory (and any missing parents) on the remote host by running {@code mkdir -p}.
+     *
+     * @param routingKey     not used by this method
+     * @param path           remote directory path; appended to the remote command line as-is
+     * @param serverInfo     target server
+     * @param authentication key material used to open the session
+     * @throws JSchException if the session or channel cannot be established
+     * @throws Exception     if the collected error output contains {@code "mkdir:"}
+     */
+    public void makeDirectory(String routingKey, String path, SSHServerInfo serverInfo, SSHKeyAuthentication authentication)
+            throws IOException, JSchException, Exception {
+
+        Session session = createSSHSession(serverInfo, authentication);
+        Channel channel = null;
+        try {
+            // exec 'mkdir -p path' remotely
+            // NOTE(review): path is concatenated unquoted into a shell command.
+            String command = "mkdir -p " + path;
+            channel = session.openChannel("exec");
+            SSHCommandOutputReader stdOutReader = new SSHCommandOutputReader();
+
+            ((ChannelExec) channel).setCommand(command);
+            ((ChannelExec) channel).setErrStream(stdOutReader.getErrorStream());
+
+            try {
+                channel.connect();
+            } catch (JSchException e) {
+                log.error("Unable to retrieve command output. Command - " + command +
+                        " on server - " + session.getHost() + ":" + session.getPort() +
+                        " connecting user name - "
+                        + session.getUserName(), e);
+                throw e;
+            }
+            stdOutReader.onOutput(channel);
+            if (stdOutReader.getStdErrorString().contains("mkdir:")) {
+                throw new Exception(stdOutReader.getStdErrorString());
+            }
+        } finally {
+            // Release the channel and session on every path, including failures.
+            if (channel != null) {
+                channel.disconnect();
+            }
+            session.disconnect();
+        }
+    }
+
+    /**
+     * Lists a remote directory by running {@code ls} and splitting its standard output into lines.
+     *
+     * @param routingKey     not used by this method
+     * @param path           remote path to list; appended to the remote command line as-is
+     * @param serverInfo     target server
+     * @param authentication key material used to open the session
+     * @return one entry per output line; an empty list if the command printed nothing
+     * @throws JSchException if the session cannot be established
+     * @throws Exception     if the channel cannot connect or the error output contains {@code "ls:"}
+     */
+    public List<String> listDirectory(String routingKey, String path, SSHServerInfo serverInfo, SSHKeyAuthentication authentication)
+            throws IOException, JSchException, Exception {
+
+        Session session = createSSHSession(serverInfo, authentication);
+        Channel channel = null;
+        try {
+            // exec 'ls path' remotely
+            // NOTE(review): path is concatenated unquoted into a shell command.
+            String command = "ls " + path;
+            channel = session.openChannel("exec");
+            SSHCommandOutputReader stdOutReader = new SSHCommandOutputReader();
+
+            ((ChannelExec) channel).setCommand(command);
+            ((ChannelExec) channel).setErrStream(stdOutReader.getErrorStream());
+
+            try {
+                channel.connect();
+            } catch (JSchException e) {
+                throw new Exception("Unable to retrieve command output. Command - " + command +
+                        " on server - " + session.getHost() + ":" + session.getPort() +
+                        " connecting user name - "
+                        + session.getUserName(), e);
+            }
+
+            // Collect the command output first, as makeDirectory and executeCommand do,
+            // before inspecting the reader's strings.
+            stdOutReader.onOutput(channel);
+
+            String stdErr = stdOutReader.getStdErrorString();
+            if (stdErr != null && stdErr.contains("ls:")) {
+                throw new Exception(stdErr);
+            }
+
+            String stdOut = stdOutReader.getStdOutputString();
+            if (stdOut == null || stdOut.isEmpty()) {
+                return Collections.emptyList();
+            }
+            return Arrays.asList(stdOut.split("\n"));
+        } finally {
+            // Release the channel and session on every path, including failures.
+            if (channel != null) {
+                channel.disconnect();
+            }
+            session.disconnect();
+        }
+    }
+
+    /**
+     * Runs a single command on the remote host over an exec channel and collects its output.
+     *
+     * @param routingKey     not used by this method
+     * @param command        command line to execute remotely
+     * @param serverInfo     target server
+     * @param authentication key material used to open the session
+     * @return the reader after {@code onOutput} has been run against the channel
+     * @throws Exception if the session or channel cannot be established or the output cannot be read
+     */
+    public SSHCommandOutputReader executeCommand(String routingKey, String command, SSHServerInfo serverInfo,
+                                              SSHKeyAuthentication authentication) throws Exception {
+
+        Session session = createSSHSession(serverInfo, authentication);
+        Channel channel = null;
+        try {
+            channel = session.openChannel("exec");
+            SSHCommandOutputReader stdOutReader = new SSHCommandOutputReader();
+
+            ((ChannelExec) channel).setCommand(command);
+            ((ChannelExec) channel).setErrStream(stdOutReader.getErrorStream());
+
+            try {
+                channel.connect();
+            } catch (JSchException e) {
+                throw new Exception("Unable to retrieve command output. Command - " + command +
+                        " on server - " + session.getHost() + ":" + session.getPort() +
+                        " connecting user name - "
+                        + session.getUserName(), e);
+            }
+            stdOutReader.onOutput(channel);
+
+            return stdOutReader;
+        } finally {
+            // Release the channel and session on every path, including failures.
+            if (channel != null) {
+                channel.disconnect();
+            }
+            session.disconnect();
+        }
+    }
+
+    public SSHCommandOutputReader executeCommand(String routingKey, String[] commands, SSHServerInfo serverInfo,
+                                              SSHKeyAuthentication authentication) throws Exception {
+        return executeCommand(routingKey, String.join(" && ", commands), serverInfo, authentication);
+    }
+
+    /**
+     * Reads one response byte from the remote scp process.
+     *
+     * <p>For the values 1 (error) and 2 (fatal error) the accompanying message is read up to and
+     * including the next line feed, or to the end of the stream, and logged as a warning.
+     *
+     * @param in stream connected to the remote scp process
+     * @return the byte that was read, unchanged: 0, 1, 2, -1 at end of stream, or any other
+     *         value (callers in this class compare it against {@code 'C'})
+     * @throws IOException if reading from {@code in} fails
+     */
+    private int checkAck(InputStream in) throws IOException {
+        int b = in.read();
+
+        if (b == 1 || b == 2) {
+            StringBuilder message = new StringBuilder();
+            int c;
+            // Stop at end of stream as well, so a truncated message cannot loop forever.
+            while ((c = in.read()) >= 0) {
+                message.append((char) c);
+                if (c == '\n') {
+                    break;
+                }
+            }
+            log.warn(message.toString());
+        }
+        return b;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTask.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTask.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTask.java
index 4e72ade..0ea53a7 100644
--- a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTask.java
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTask.java
@@ -20,19 +20,26 @@
 */
 package edu.iu.helix.airavata.tasks.ssh;
 
+import edu.iu.helix.airavata.HelixUtil;
+import edu.iu.helix.airavata.ZkUtils;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.UUID;
+
 public class SSHTask implements Task {
     private final static Logger logger = LoggerFactory.getLogger(SSHTask.class);
 
+    public static final String TASK_COMMAND = "SSH_TASK";
     private TaskCallbackContext callbackContext;
+    private String taskId;
 
     public SSHTask(TaskCallbackContext callbackContext){
         this.callbackContext = callbackContext;
+        this.taskId = callbackContext.getTaskConfig().getId();
     }
 
     @Override
@@ -46,7 +53,36 @@ public class SSHTask implements Task {
 // Todo Deserialize the TaskContext from data store.
 //        byte[] output = curator.getData().forPath(path);
 //        List<String> newList = (List<String>)SerializationUtils.deserialize(output);
-        return null;
+
+        System.out.println("Running SSH Task for ID: " + taskId);
+        try {
+            SSHTaskContext taskContext = (SSHTaskContext) ZkUtils.getZkData(ZkUtils.getCuratorClient(), HelixUtil.SSH_WORKFLOW, taskId);
+            String routingKey = UUID.randomUUID().toString();
+            SSHRunner sshExecutor = new SSHRunner();
+
+            System.out.println("Task: " + taskId + ", is of Type: " + taskContext.getTask_type());
+            switch (taskContext.getTask_type()) {
+
+                case EXECUTE_COMMAND:
+                    SSHCommandOutputReader sshOut = sshExecutor.executeCommand(routingKey, taskContext.getCommand(),
+                            (SSHServerInfo) taskContext.getServerInfo(), taskContext.getSshKeyAuthentication());
+                    System.out.println("SSH Command Output: " + sshOut.getStdOutputString());
+                    break;
+
+                case FILE_COPY:
+                    String scpOut = sshExecutor.scpTo(routingKey, taskContext.getSourceFilePath(), taskContext.getDestFilePath(),
+                            (SSHServerInfo) taskContext.getServerInfo(), taskContext.getSshKeyAuthentication());
+                    System.out.println("SCP Command Output: " + scpOut);
+                    break;
+
+                default:
+                    throw new Exception("Unknown SSH Task Type: " + taskContext.getTask_type());
+            }
+        } catch (Exception ex) {
+            System.err.println("Something went wrong for task: " + taskId + ", reason: " + ex);
+            return new TaskResult(TaskResult.Status.FAILED, "SSH command completed!");
+        }
+        return new TaskResult(TaskResult.Status.COMPLETED, "SSH command completed!");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTaskContext.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTaskContext.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTaskContext.java
index 2b2024c..e780331 100644
--- a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTaskContext.java
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTaskContext.java
@@ -23,7 +23,9 @@ package edu.iu.helix.airavata.tasks.ssh;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SSHTaskContext {
+import java.io.Serializable;
+
+public class SSHTaskContext implements Serializable {
     private final static Logger logger = LoggerFactory.getLogger(SSHTaskContext.class);
 
     public static enum TASK_TYPE {FILE_COPY, EXECUTE_COMMAND;}
@@ -127,4 +129,17 @@ public class SSHTaskContext {
     public void setCommand(String command) {
         this.command = command;
     }
+
+    @Override
+    public String toString() {
+        return "SSHTaskContext{" +
+                "task_type=" + task_type +
+                ", sshKeyAuthentication=" + sshKeyAuthentication +
+                ", sshUserInfo=" + sshUserInfo +
+                ", serverInfo=" + serverInfo +
+                ", sourceFilePath='" + sourceFilePath + '\'' +
+                ", destFilePath='" + destFilePath + '\'' +
+                ", command='" + command + '\'' +
+                '}';
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHUserInfo.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHUserInfo.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHUserInfo.java
index a0ac81c..df5a575 100644
--- a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHUserInfo.java
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHUserInfo.java
@@ -22,7 +22,9 @@ package edu.iu.helix.airavata.tasks.ssh;
 
 import com.jcraft.jsch.UserInfo;
 
-public class SSHUserInfo implements UserInfo {
+import java.io.Serializable;
+
+public class SSHUserInfo implements UserInfo, Serializable {
 
     private String userName;
     private String password;

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/ServerInfo.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/ServerInfo.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/ServerInfo.java
index f47fc5b..4917e47 100644
--- a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/ServerInfo.java
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/ServerInfo.java
@@ -20,7 +20,9 @@
 */
 package edu.iu.helix.airavata.tasks.ssh;
 
-public class ServerInfo {
+import java.io.Serializable;
+
+public class ServerInfo implements Serializable {
 
     public static enum ComProtocol {SSH, LOCAL}
 

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/resources/ssh/id_rsa.pub
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/resources/ssh/id_rsa.pub b/helix-playground/src/main/resources/ssh/id_rsa.pub
index c3ac0b0..e378be0 100644
--- a/helix-playground/src/main/resources/ssh/id_rsa.pub
+++ b/helix-playground/src/main/resources/ssh/id_rsa.pub
@@ -1,2 +1 @@
-
 ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCr9VP22p0I+2W5o/klPv/OvfTihvcBQwZXKPrSLFF+OB9nVNtIfDDETIwwex7mknn3Kks1jFvEdKMrvRjOFeFInDv3N40LjohHu4v2tiawAON7MOLpz/iX5dWp0wteixlDKfGe7PAEMAk054kLSDiB3em2zBK4d9ApedA5k2JG1dmAsNK0KkbfgFPd5+iXrzgTg4XiefHQoaCSUyS7w6t8645djbYOP+b+SJtgslaf2RqeoBVvrA6YQJE1pUYjcm9yL4KwyqaPo+N/2XZ6xys5+WN8svtL3uRduENU1MQSTpdFq+GLCY4SgLMFgLJKoxHjcjPRfKyE/eYk1gQA7b/Z snakanda@149-161-141-51.dhcp-bl.indiana.edu


Mime
View raw message