airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject [1/2] git commit: Committing basic Qstat implementation: AIRAVATA-1023
Date Mon, 24 Feb 2014 19:16:53 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 5c7fa2df3 -> 81faafb91


Committing basic  Qstat implementation: AIRAVATA-1023


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/34d6c266
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/34d6c266
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/34d6c266

Branch: refs/heads/master
Commit: 34d6c266d03a6da99277b547c8c6a086288ba2a6
Parents: 197daf1
Author: lahiru <lahiru@apache.org>
Authored: Mon Feb 24 14:14:32 2014 -0500
Committer: lahiru <lahiru@apache.org>
Committed: Mon Feb 24 14:14:32 2014 -0500

----------------------------------------------------------------------
 modules/airavata-job-monitor/pom.xml            |   7 +-
 .../job/monitor/AiravataJobStatusUpdator.java   |  36 ++-
 .../apache/airavata/job/monitor/MonitorID.java  |  39 +++
 .../airavata/job/monitor/MonitorManager.java    |  71 +++--
 .../monitor/impl/pull/qstat/QstatMonitor.java   | 114 +++++++-
 .../impl/pull/qstat/ResourceConnection.java     | 279 +++++++++++++++++++
 .../airavata/job/monitor/state/JobStatus.java   |   3 +
 .../airavata/job/monitor/AMQPMonitorTest.java   | 149 ++++++++++
 .../airavata/job/monitor/QstatMonitorTest.java  | 159 +++++++++++
 .../airavata/job/monitor/SimpleMonitorTest.java | 135 ---------
 10 files changed, 813 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/34d6c266/modules/airavata-job-monitor/pom.xml
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/pom.xml b/modules/airavata-job-monitor/pom.xml
index 4476a5d..d42e25e 100644
--- a/modules/airavata-job-monitor/pom.xml
+++ b/modules/airavata-job-monitor/pom.xml
@@ -48,14 +48,13 @@
         </dependency>
         <dependency>
             <groupId>org.bouncycastle</groupId>
-            <artifactId>bcprov-jdk15on</artifactId>
-            <version>1.48</version>
+            <artifactId>bcprov-jdk16</artifactId>
         </dependency>
-        <dependency>
+        <!--dependency>
             <groupId>org.bouncycastle</groupId>
             <artifactId>bcpkix-jdk15on</artifactId>
             <version>1.48</version>
-        </dependency>
+        </dependency-->
         <!-- Logging -->
         <dependency>
             <groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/airavata/blob/34d6c266/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
index c94c8f8..db76e1d 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
@@ -67,10 +67,38 @@ public class AiravataJobStatusUpdator{
         System.out.println("Job ID: " + jobStatus.getMonitorID().getJobID());
         System.out.println("Username: " + jobStatus.getMonitorID().getUserName());
         System.out.println("Job Status: " + jobStatus.getState().toString());
-        if (JobState.COMPLETE.equals(jobStatus.getState())) {
-            // When job is done we remove the job from the queue
-            logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + "is DONE");
-            jobsToMonitor.add(jobStatus.getMonitorID());
+
+
+        switch (state) {
+            case COMPLETE:
+                logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + "is DONE");
+                jobsToMonitor.remove(jobStatus.getMonitorID());
+                break;
+            case UNKNOWN:
+                logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + "is UNKNOWN");
+                System.out.println("Unknown job status came, if the old job status is RUNNING or something active, we have to make it complete");
+                //todo implement this logic
+                break;
+            case QUEUED:
+                logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + "is QUEUED");
+
+            case SUBMITTED:
+                logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + "is SUBMITTED");
+            case ACTIVE:
+                logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + "is ACTIVE");
+                break;
+            case CANCELED:
+                logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + "is CANCELED");
+                break;
+            case FAILED:
+                logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + "is FAILED");
+                break;
+            case HELD:
+                logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + "is HELD");
+                break;
+            case SUSPENDED:
+                logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + "is SUSPENDED");
+                break;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/34d6c266/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
index 302faa1..eae078a 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
@@ -21,10 +21,12 @@
 package org.apache.airavata.job.monitor;
 
 import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Timestamp;
+import java.util.Date;
 
 /*
 This is the object which contains the data to identify a particular
@@ -37,17 +39,30 @@ public class MonitorID {
 
     private String jobID;
 
+    private Timestamp jobStartedTime;
+
     private Timestamp lastMonitored;
 
     private HostDescription host;
 
+    private int port = 22;
+
+    private AuthenticationInfo authenticationInfo = null;
 
     public MonitorID(HostDescription host, String jobID, String userName) {
         this.host = host;
         this.jobID = jobID;
+        this.jobStartedTime = new Timestamp((new Date()).getTime());
         this.userName = userName;
     }
 
+    public MonitorID(HostDescription host, String jobID, String userName,AuthenticationInfo authenticationInfo) {
+        this.host = host;
+        this.jobID = jobID;
+        this.jobStartedTime = new Timestamp((new Date()).getTime());
+        this.authenticationInfo = authenticationInfo;
+        this.userName = userName;
+    }
     public HostDescription getHost() {
         return host;
     }
@@ -79,4 +94,28 @@ public class MonitorID {
     public void setJobID(String jobID) {
         this.jobID = jobID;
     }
+
+    public Timestamp getJobStartedTime() {
+        return jobStartedTime;
+    }
+
+    public void setJobStartedTime(Timestamp jobStartedTime) {
+        this.jobStartedTime = jobStartedTime;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    public AuthenticationInfo getAuthenticationInfo() {
+        return authenticationInfo;
+    }
+
+    public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
+        this.authenticationInfo = authenticationInfo;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/34d6c266/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
index 6cc5566..77fecc8 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
@@ -25,6 +25,7 @@ import org.apache.airavata.job.monitor.core.PullMonitor;
 import org.apache.airavata.job.monitor.core.PushMonitor;
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
 import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
 import org.apache.airavata.job.monitor.impl.push.amqp.UnRegisterThread;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryImpl;
@@ -64,13 +65,33 @@ public class MonitorManager {
         runningQueue = new LinkedBlockingDeque<MonitorID>();
         finishQueue = new LinkedBlockingDeque<MonitorID>();
         monitorPublisher = new MonitorPublisher(new EventBus());
-        monitorPublisher.registerListener(new AiravataJobStatusUpdator(new RegistryImpl(), finishQueue));
+        registerListener(new AiravataJobStatusUpdator(new RegistryImpl(), finishQueue));
     }
 
+    /**
+     * To deal with the statuses users can write their own listener and implement their own logic
+     *
+     * @param listener Any class can be written and if you want the JobStatus object to be taken from the bus, just
+     *                 have to put @subscribe as an annotation to your method to recieve the JobStatus object from the bus.
+     */
+    public void registerListener(Object listener) {
+        monitorPublisher.registerListener(listener);
+    }
+
+    /**
+     * todo write
+     *
+     * @param monitor
+     */
     public void addPushMonitor(PushMonitor monitor) {
         pushMonitors.add(monitor);
     }
 
+    /**
+     * todo write
+     *
+     * @param monitor
+     */
     public void addPullMonitor(PullMonitor monitor) {
         pullMonitors.add(monitor);
     }
@@ -78,6 +99,7 @@ public class MonitorManager {
     /**
      * Adding this method will trigger the thread in launchMonitor and notify it
      * This is going to be useful during the startup of the launching process
+     *
      * @param monitorID
      */
     public void addAJobToMonitor(MonitorID monitorID) throws AiravataMonitorException {
@@ -96,38 +118,29 @@ public class MonitorManager {
      * monitoring will be launched.
      * Ex: If there's a reasource which doesn't support Push, we have
      * to live with Pull MOnitoring.
+     *
      * @throws AiravataMonitorException
      */
     public void launchMonitor() throws AiravataMonitorException {
-        new Thread(){
-            public void run() {
-                if (pushMonitors.isEmpty()) {
-                    if (pullMonitors.isEmpty()) {
-                        logger.error("Before launching MonitorManager should have atleast one Monitor");
-                        return;
-                    } else {
-                        //no push monitor is configured so we launch pull monitor
-                        PullMonitor pullMonitor = pullMonitors.get(0);
-                        try {
-                            pullMonitor.startPulling();
-                        } catch (AiravataMonitorException e) {
-                            logger.error(e.getLocalizedMessage());
-                        }
-                    }
-                } else {
-                    // there is a push monitor configured, so we schedule the push monitor
-                    // We currently support dealing with one type of monitor
-                    PushMonitor pushMonitor = pushMonitors.get(0);
-                    if(pushMonitor instanceof AMQPMonitor){
-                        ((AMQPMonitor) pushMonitor).run();
-                    }
-                    UnRegisterThread unRegisterThread = new
-                            UnRegisterThread(((AMQPMonitor) pushMonitor).getFinishQueue(),((AMQPMonitor) pushMonitor).getAvailableChannels());
-                    unRegisterThread.run();
-
-                }
+        if (pushMonitors.isEmpty()) {
+            if (pullMonitors.isEmpty()) {
+                logger.error("Before launching MonitorManager should have atleast one Monitor");
+                return;
+            } else {
+                //no push monitor is configured so we launch pull monitor
+                QstatMonitor pullMonitor = (QstatMonitor)pullMonitors.get(0);
+                (new Thread(pullMonitor)).start();
             }
-        }.start();
+        } else {
+            // there is a push monitor configured, so we schedule the push monitor
+            // We currently support dealing with one type of monitor
+            AMQPMonitor pushMonitor = (AMQPMonitor) pushMonitors.get(0);
+            (new Thread(pushMonitor)).start();
+
+            UnRegisterThread unRegisterThread = new
+                    UnRegisterThread(pushMonitor.getFinishQueue(), pushMonitor.getAvailableChannels());
+            unRegisterThread.start();
+        }
 
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/34d6c266/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
index 30f6899..a5d1553 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
@@ -20,48 +20,148 @@
 */
 package org.apache.airavata.job.monitor.impl.pull.qstat;
 
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
 import org.apache.airavata.job.monitor.MonitorID;
 import org.apache.airavata.job.monitor.core.PullMonitor;
+import org.apache.airavata.job.monitor.event.MonitorPublisher;
+import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.model.experiment.JobState;
+import org.apache.derby.client.am.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLEngineResult;
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 
 /**
  * This monitor is based on qstat command which can be run
  * in grid resources and retrieve the job status.
  */
-public class QstatMonitor extends PullMonitor implements Runnable{
-   private final static Logger logger = LoggerFactory.getLogger(QstatMonitor.class);
+public class QstatMonitor extends PullMonitor implements Runnable {
+    private final static Logger logger = LoggerFactory.getLogger(QstatMonitor.class);
 
     // I think this should use DelayedBlocking Queue to do the monitoring*/
-   private BlockingQueue<MonitorID> queue;
+    private BlockingQueue<MonitorID> queue;
 
-    public QstatMonitor(BlockingQueue<MonitorID> queue) {
+    private boolean startPulling = false;
+
+    private Map<String, ResourceConnection> connections;
+
+    private MonitorPublisher publisher;
+
+    public QstatMonitor(BlockingQueue<MonitorID> queue, MonitorPublisher publisher) {
         this.queue = queue;
+        this.publisher = publisher;
+        connections = new HashMap<String, ResourceConnection>();
     }
 
     public void run() {
         /* implement a logic to pick each monitorID object from the queue and do the
         monitoring
          */
+        this.startPulling = true;
+        while (this.startPulling) {
+            try {
+                startPulling();
+                // After finishing one iteration of the full queue this thread sleeps 1 second
+                Thread.sleep(1000);
+            } catch (AiravataMonitorException e) {
+                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            } catch (InterruptedException e) {
+                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            }
+        }
     }
 
     /**
      * This method will can invoke when PullMonitor needs to start
      * and it has to invoke in the frequency specified below,
+     *
      * @return if the start process is successful return true else false
      */
-    public boolean startPulling(){
+    public boolean startPulling() throws AiravataMonitorException {
+        // take the top element in the queue and pull the data and put that element
+        // at the tail of the queue
+        MonitorID take = null;
+        JobStatus jobStatus = new JobStatus();
+        while (!this.queue.isEmpty()) {
+            try {
+                take = this.queue.take();
+                long monitorDiff = 0;
+                long startedDiff = 0;
+                if (take.getLastMonitored() != null) {
+                    monitorDiff = (new Timestamp((new Date()).getTime())).getTime() - take.getLastMonitored().getTime();
+                    startedDiff = (new Timestamp((new Date()).getTime())).getTime() - take.getJobStartedTime().getTime();
+//                    System.out.println(monitorDiff + "-" + startedDiff);
+                    if ((monitorDiff / 1000) < 5) {
+                        // its too early to monitor this job, so we put it at the tail of the queue
+                        this.queue.put(take);
+                    }
+                }
+                if(take.getLastMonitored() == null || ((monitorDiff/1000) >= 5)){
+                        String hostName = take.getHost().getType().getHostAddress();
+                        ResourceConnection connection = null;
+                        if (connections.containsKey(hostName)) {
+                            logger.debug("We already have this connection so not going to create one");
+                            connection = connections.get(hostName);
+                        } else {
+                            connection = new ResourceConnection(take, "/opt/torque/bin");
+                        }
+                        jobStatus.setMonitorID(take);
+                        jobStatus.setState(connection.getJobStatus(take));
+                        publisher.publish(jobStatus);
+                        // if the job is completed we do not have to put the job to the queue again
+                        if (!jobStatus.getState().equals(JobState.COMPLETE)) {
+                            take.setLastMonitored(new Timestamp((new Date()).getTime()));
+                            this.queue.put(take);
+                        }
+                    }
+            } catch (InterruptedException e) {
+                if(!this.queue.contains(take)){
+                    try {
+                        this.queue.put(take);
+                    } catch (InterruptedException e1) {
+                        e1.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                    }
+                }
+                logger.error("Error handling the job with Job ID:" + take.getJobID());
+                throw new AiravataMonitorException(e);
+            } catch (SSHApiException e) {
+                if(e.getMessage().contains("Unknown Job Id Error")){
+                    // in this case job is finished or may be the given job ID is wrong
+                    jobStatus.setState(JobState.UNKNOWN);
+                    publisher.publish(jobStatus);
+                }else if(!this.queue.contains(take)){   // we put the job back to the queue only if its state is not unknown
+                    try {
+                        this.queue.put(take);
+                    } catch (InterruptedException e1) {
+                        e1.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                    }
+                }
+                logger.error("Error retrieving the job status");
+                throw new AiravataMonitorException("Error retrieving the job status", e);
+            }
+        }
+
+
         return true;
     }
 
+
     /**
      * This is the method to stop the polling process
+     *
      * @return if the stopping process is successful return true else false
      */
-    public boolean stopPulling(){
-         return true;
+    public boolean stopPulling() {
+        this.startPulling = false;
+        return true;
     }
 
     public boolean authenticate() {

http://git-wip-us.apache.org/repos/asf/airavata/blob/34d6c266/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java
new file mode 100644
index 0000000..2867f74
--- /dev/null
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java
@@ -0,0 +1,279 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor.impl.pull.qstat;
+
+import com.jcraft.jsch.*;
+import org.apache.airavata.gsi.ssh.api.CommandExecutor;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.authentication.*;
+import org.apache.airavata.gsi.ssh.config.ConfigReader;
+import org.apache.airavata.gsi.ssh.impl.RawCommandInfo;
+import org.apache.airavata.gsi.ssh.impl.StandardOutReader;
+import org.apache.airavata.gsi.ssh.jsch.ExtendedJSch;
+import org.apache.airavata.gsi.ssh.util.SSHAPIUIKeyboardInteractive;
+import org.apache.airavata.gsi.ssh.util.SSHKeyPasswordHandler;
+import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.model.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class ResourceConnection {
+    static {
+        JSch.setConfig("gssapi-with-mic.x509", "org.apache.airavata.gsi.ssh.GSSContextX509");
+        JSch.setConfig("userauth.gssapi-with-mic", "com.jcraft.jsch.UserAuthGSSAPIWithMICGSSCredentials");
+
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(ResourceConnection.class);
+    public static final String X509_CERT_DIR = "X509_CERT_DIR";
+    public static final String SSH_SESSION_TIMEOUT = "ssh.session.timeout";
+
+    private Session session;
+
+    private ConfigReader configReader;
+
+    private String installedPath;
+
+    public ResourceConnection(MonitorID monitorID, String installedPath) throws SSHApiException {
+        AuthenticationInfo authenticationInfo = monitorID.getAuthenticationInfo();
+        String hostAddress = monitorID.getHost().getType().getHostAddress();
+        String userName = monitorID.getUserName();
+        int port = monitorID.getPort();
+        if (authenticationInfo instanceof GSIAuthenticationInfo) {
+            System.setProperty(X509_CERT_DIR, (String) ((GSIAuthenticationInfo) authenticationInfo).getProperties().
+                    get("X509_CERT_DIR"));
+        }
+        if (installedPath == null) {
+            throw new SSHApiException("Installed path cannot be null !!");
+        }
+        if (installedPath.endsWith("/")) {
+            this.installedPath = installedPath;
+        } else {
+            this.installedPath = installedPath + "/";
+        }
+
+
+        try {
+            this.configReader = new ConfigReader();
+        } catch (IOException e) {
+            throw new SSHApiException("Unable to load system configurations.", e);
+        }
+        JSch jSch = new ExtendedJSch();
+
+        log.debug("Connecting to server - " + monitorID.getHost().getType().getHostName() + ":" + "22" + " with user name - "
+                + userName);
+
+        try {
+            session = jSch.getSession(userName, hostAddress, 22);
+            session.setTimeout(Integer.parseInt(configReader.getConfiguration(SSH_SESSION_TIMEOUT)));
+        } catch (Exception e) {
+            throw new SSHApiException("An exception occurred while creating SSH session." +
+                    "Connecting server - " + hostAddress + ":" + 22 +
+                    " connecting user name - "
+                    + userName, e);
+        }
+
+        java.util.Properties config = this.configReader.getProperties();
+        session.setConfig(config);
+
+
+        //=============================================================
+        // Handling vanilla SSH pieces
+        //=============================================================
+        if (authenticationInfo instanceof SSHPasswordAuthentication) {
+            String password = ((SSHPasswordAuthentication) authenticationInfo).
+                    getPassword(userName, hostAddress);
+
+            session.setUserInfo(new SSHAPIUIKeyboardInteractive(password));
+
+            // TODO figure out why we need to set password to session
+            session.setPassword(password);
+
+        } else if (authenticationInfo instanceof SSHPublicKeyFileAuthentication) {
+            SSHPublicKeyFileAuthentication sshPublicKeyFileAuthentication
+                    = (SSHPublicKeyFileAuthentication) authenticationInfo;
+
+            String privateKeyFile = sshPublicKeyFileAuthentication.
+                    getPrivateKeyFile(userName, hostAddress);
+
+            log.debug("The private key file for vanilla SSH " + privateKeyFile);
+
+            String publicKeyFile = sshPublicKeyFileAuthentication.
+                    getPrivateKeyFile(userName, hostAddress);
+
+            log.debug("The public key file for vanilla SSH " + publicKeyFile);
+
+            Identity identityFile;
+
+            try {
+                identityFile = GSISSHIdentityFile.newInstance(privateKeyFile, null, jSch);
+            } catch (JSchException e) {
+                throw new SSHApiException("An exception occurred while initializing keys using files. " +
+                        "(private key and public key)." +
+                        "Connecting server - " + hostAddress + ":" + port +
+                        " connecting user name - "
+                        + userName + " private key file - " + privateKeyFile + ", public key file - " +
+                        publicKeyFile, e);
+            }
+
+            // Add identity to identity repository
+            GSISSHIdentityRepository identityRepository = new GSISSHIdentityRepository(jSch);
+            identityRepository.add(identityFile);
+
+            // Set repository to session
+            session.setIdentityRepository(identityRepository);
+
+            // Set the user info
+            SSHKeyPasswordHandler sshKeyPasswordHandler
+                    = new SSHKeyPasswordHandler((SSHKeyAuthentication) authenticationInfo);
+
+            session.setUserInfo(sshKeyPasswordHandler);
+
+        } else if (authenticationInfo instanceof SSHPublicKeyAuthentication) {
+
+            SSHPublicKeyAuthentication sshPublicKeyAuthentication
+                    = (SSHPublicKeyAuthentication) authenticationInfo;
+
+            Identity identityFile;
+
+            try {
+                String name = userName + "_" + hostAddress;
+                identityFile = GSISSHIdentityFile.newInstance(name,
+                        sshPublicKeyAuthentication.getPrivateKey(userName, hostAddress),
+                        sshPublicKeyAuthentication.getPublicKey(userName, hostAddress), jSch);
+            } catch (JSchException e) {
+                throw new SSHApiException("An exception occurred while initializing keys using byte arrays. " +
+                        "(private key and public key)." +
+                        "Connecting server - " + hostAddress + ":" + port +
+                        " connecting user name - "
+                        + userName, e);
+            }
+
+            // Add identity to identity repository
+            GSISSHIdentityRepository identityRepository = new GSISSHIdentityRepository(jSch);
+            identityRepository.add(identityFile);
+
+            // Set repository to session                                                                            j
+            session.setIdentityRepository(identityRepository);
+
+            // Set the user info
+            SSHKeyPasswordHandler sshKeyPasswordHandler
+                    = new SSHKeyPasswordHandler((SSHKeyAuthentication) authenticationInfo);
+
+            session.setUserInfo(sshKeyPasswordHandler);
+
+        }
+
+        // Not a good way, but we dont have any choice
+        if (session instanceof ExtendedSession) {
+            if (authenticationInfo instanceof GSIAuthenticationInfo) {
+                ((ExtendedSession) session).setAuthenticationInfo((GSIAuthenticationInfo) authenticationInfo);
+            }
+        }
+
+        try {
+            session.connect();
+        } catch (JSchException e) {
+            throw new SSHApiException("An exception occurred while connecting to server." +
+                    "Connecting server - " + hostAddress + ":" + port +
+                    " connecting user name - "
+                    + userName, e);
+        }
+        System.out.println(session.isConnected());
+    }
+
+    public JobState getJobStatus(MonitorID monitorID) throws SSHApiException {
+        String jobID = monitorID.getJobID();
+        RawCommandInfo rawCommandInfo = new RawCommandInfo(this.installedPath + "qstat -f " + jobID);
+
+        StandardOutReader stdOutReader = new StandardOutReader();
+        CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), stdOutReader);
+
+
+        String result = getOutputifAvailable(stdOutReader, "Error getting job status with job ID: " + jobID);
+        String[] info = result.split("\n");
+        String[] line = null;
+        for (String anInfo : info) {
+            if (anInfo.contains("=")) {
+                line = anInfo.split("=", 2);
+                if (line.length != 0) {
+                    if (line[0].contains("job_state")) {
+                       return getStatusFromString(line[1].replaceAll(" ", ""));
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    private JobState getStatusFromString(String status) {
+        if(status != null){
+            if("C".equals(status)){
+                return JobState.COMPLETE;
+            }else if("E".equals(status)){
+                return JobState.COMPLETE;
+            }else if("H".equals(status)){
+                return JobState.HELD;
+            }else if("Q".equals(status)){
+                return JobState.QUEUED;
+            }else if("R".equals(status)){
+                return JobState.ACTIVE;
+            }else if ("T".equals(status)) {
+                return JobState.HELD;
+            } else if ("W".equals(status)) {
+                return JobState.QUEUED;
+            } else if ("S".equals(status)) {
+                return JobState.SUSPENDED;
+            }
+        }
+        return null;
+    }
+    public Session getSession() {
+        return session;
+    }
+
+    public void setSession(Session session) {
+        this.session = session;
+    }
+
+    /**
+     * This method will read standard output and if there's any it will be parsed
+     *
+     * @param jobIDReaderCommandOutput
+     * @param errorMsg
+     * @return
+     * @throws SSHApiException
+     */
+    private String getOutputifAvailable(StandardOutReader jobIDReaderCommandOutput, String errorMsg) throws SSHApiException {
+        String stdOutputString = jobIDReaderCommandOutput.getStdOutputString();
+        String stdErrorString = jobIDReaderCommandOutput.getStdErrorString();
+
+        if (stdOutputString == null && "".equals(stdOutputString) ||
+                ((stdErrorString != null) && !("".equals(stdErrorString)))) {
+            log.error("Standard Error output : " + stdErrorString);
+            throw new SSHApiException(errorMsg + stdErrorString);
+        }
+        return stdOutputString;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/34d6c266/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
index 7cf2e38..56b2798 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
@@ -21,6 +21,7 @@
 package org.apache.airavata.job.monitor.state;
 
 import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.model.experiment.JobState;
 
 /**
  * This is the primary job state object used in
@@ -49,4 +50,6 @@ public class JobStatus {
     public void setMonitorID(MonitorID monitorID) {
         this.monitorID = monitorID;
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/34d6c266/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java b/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
new file mode 100644
index 0000000..50b6c38
--- /dev/null
+++ b/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
@@ -0,0 +1,149 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor;
+
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+public class AMQPMonitorTest {
+    private MonitorManager monitorManager;
+
+    private String myProxyUserName;
+    private String myProxyPassword;
+    private String certificateLocation;
+    private String pbsFilePath;
+    private String workingDirectory;
+    private HostDescription hostDescription;
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty("myproxy.user", "ogce");
+        System.setProperty("myproxy.password", "");
+        System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
+        System.setProperty("gsi.working.directory", "/home/ogce");
+        myProxyUserName = System.getProperty("myproxy.user");
+        myProxyPassword = System.getProperty("myproxy.password");
+        workingDirectory = System.getProperty("gsi.working.directory");
+        String pomDirectory = System.getProperty("basedir");
+        certificateLocation = "/Users/lahirugunathilake/Downloads/certificates";
+        if (myProxyUserName == null || myProxyPassword == null || workingDirectory == null) {
+            System.out.println(">>>>>> Please run tests with my proxy user name and password. " +
+                    "E.g :- mvn clean install -Dmyproxy.user=xxx -Dmyproxy.password=xxx -Dgsi.working.directory=/path<<<<<<<");
+            throw new Exception("Need my proxy user name password to run tests.");
+        }
+
+        monitorManager = new MonitorManager();
+        AMQPMonitor amqpMonitor = new
+                AMQPMonitor(monitorManager.getMonitorPublisher(),
+                monitorManager.getRunningQueue(), monitorManager.getFinishQueue());
+        try {
+            monitorManager.addPushMonitor(amqpMonitor);
+            monitorManager.launchMonitor();
+        } catch (AiravataMonitorException e) {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+
+        hostDescription = new HostDescription(GsisshHostType.type);
+        hostDescription.getType().setHostAddress("gordon.sdsc.xsede.org");
+        hostDescription.getType().setHostName("gsissh-gordon");
+    }
+
+    @Test
+    public void testAMQPMonitor() throws SSHApiException {
+        /* now have to submit a job to some machine and add that job to the queue */
+        //Create authentication
+        GSIAuthenticationInfo authenticationInfo
+                = new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org",
+                7512, 17280000, certificateLocation);
+
+        // Server info
+        ServerInfo serverInfo = new ServerInfo("ogce", "trestles.sdsc.edu");
+
+
+        Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, "/opt/torque/bin/");
+
+
+        // Execute command
+        System.out.println("Target PBS file path: " + workingDirectory);
+        // constructing the job object
+        JobDescriptor jobDescriptor = new JobDescriptor();
+        jobDescriptor.setWorkingDirectory(workingDirectory);
+        jobDescriptor.setShellName("/bin/bash");
+        jobDescriptor.setJobName("GSI_SSH_SLEEP_JOB");
+        jobDescriptor.setExecutablePath("/bin/echo");
+        jobDescriptor.setAllEnvExport(true);
+        jobDescriptor.setMailOptions("n");
+        jobDescriptor.setStandardOutFile(workingDirectory + File.separator + "application.out");
+        jobDescriptor.setStandardErrorFile(workingDirectory + File.separator + "application.err");
+        jobDescriptor.setNodes(1);
+        jobDescriptor.setProcessesPerNode(1);
+        jobDescriptor.setQueueName("normal");
+        jobDescriptor.setMaxWallTime("60");
+        jobDescriptor.setAcountString("sds128");
+        List<String> inputs = new ArrayList<String>();
+        jobDescriptor.setOwner("ogce");
+        inputs.add("Hello World");
+        jobDescriptor.setInputValues(inputs);
+        //finished construction of job object
+        System.out.println(jobDescriptor.toXML());
+        String jobID = pbsCluster.submitBatchJob(jobDescriptor);
+
+        Thread test = new TestThread(monitorManager);
+        test.start();
+        try {
+            test.join();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    private class TestThread extends Thread {
+        private MonitorManager manager;
+
+        public TestThread(MonitorManager manager) {
+            this.manager = manager;
+        }
+
+        @Override
+        public void run() {
+            try {
+                monitorManager.addAJobToMonitor(new MonitorID(hostDescription, "gordon.sdsc.xsede.org", "ogce"));
+            } catch (AiravataMonitorException e) {
+                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/34d6c266/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java b/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
new file mode 100644
index 0000000..8985ecd
--- /dev/null
+++ b/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
@@ -0,0 +1,159 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor;
+
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
+import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+public class QstatMonitorTest {
+    private MonitorManager monitorManager;
+
+    private String myProxyUserName;
+    private String myProxyPassword;
+    private String certificateLocation;
+    private String pbsFilePath;
+    private String workingDirectory;
+    private HostDescription hostDescription;
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty("myproxy.user", "ogce");
+        System.setProperty("myproxy.password", "");
+        System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
+        System.setProperty("gsi.working.directory", "/home/ogce");
+        myProxyUserName = System.getProperty("myproxy.user");
+        myProxyPassword = System.getProperty("myproxy.password");
+        workingDirectory = System.getProperty("gsi.working.directory");
+        String pomDirectory = System.getProperty("basedir");
+        certificateLocation = "/Users/lahirugunathilake/Downloads/certificates";
+        if (myProxyUserName == null || myProxyPassword == null || workingDirectory == null) {
+            System.out.println(">>>>>> Please run tests with my proxy user name and password. " +
+                    "E.g :- mvn clean install -Dmyproxy.user=xxx -Dmyproxy.password=xxx -Dgsi.working.directory=/path<<<<<<<");
+            throw new Exception("Need my proxy user name password to run tests.");
+        }
+
+        monitorManager = new MonitorManager();
+        QstatMonitor qstatMonitor = new
+                QstatMonitor(monitorManager.getRunningQueue(), monitorManager.getMonitorPublisher());
+        try {
+            monitorManager.addPullMonitor(qstatMonitor);
+            monitorManager.launchMonitor();
+        } catch (AiravataMonitorException e) {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+
+        hostDescription = new HostDescription(GsisshHostType.type);
+        hostDescription.getType().setHostAddress("trestles.sdsc.edu");
+        hostDescription.getType().setHostName("gsissh-gordon");
+    }
+
+    @Test
+    public void testAMQPMonitor() throws SSHApiException {
+        /* now have to submit a job to some machine and add that job to the queue */
+        //Create authentication
+        GSIAuthenticationInfo authenticationInfo
+                = new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org",
+                7512, 17280000, certificateLocation);
+
+        // Server info
+        ServerInfo serverInfo = new ServerInfo("ogce", hostDescription.getType().getHostAddress());
+
+
+        Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, "/opt/torque/bin/");
+
+
+        // Execute command
+        System.out.println("Target PBS file path: " + workingDirectory);
+        // constructing the job object
+        JobDescriptor jobDescriptor = new JobDescriptor();
+        jobDescriptor.setWorkingDirectory(workingDirectory);
+        jobDescriptor.setShellName("/bin/bash");
+        jobDescriptor.setJobName("GSI_SSH_SLEEP_JOB");
+        jobDescriptor.setExecutablePath("/bin/echo");
+        jobDescriptor.setAllEnvExport(true);
+        jobDescriptor.setMailOptions("n");
+        jobDescriptor.setStandardOutFile(workingDirectory + File.separator + "application.out");
+        jobDescriptor.setStandardErrorFile(workingDirectory + File.separator + "application.err");
+        jobDescriptor.setNodes(1);
+        jobDescriptor.setProcessesPerNode(1);
+        jobDescriptor.setQueueName("normal");
+        jobDescriptor.setMaxWallTime("60");
+        jobDescriptor.setAcountString("sds128");
+        List<String> inputs = new ArrayList<String>();
+        jobDescriptor.setOwner("ogce");
+        inputs.add("Hello World");
+        jobDescriptor.setInputValues(inputs);
+        //finished construction of job object
+        System.out.println(jobDescriptor.toXML());
+        String jobID = pbsCluster.submitBatchJob(jobDescriptor);
+
+        Thread test = new TestThread(monitorManager,jobID);
+        test.start();
+        try {
+            Thread.sleep(1000000);
+            test.join();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    private class TestThread extends Thread {
+        private MonitorManager manager;
+
+        private String jobID;
+
+        public TestThread(MonitorManager manager,String jobID) {
+            this.manager = manager;
+            this.jobID = jobID;
+        }
+
+        @Override
+        public void run() {
+            try {
+                MonitorID monitorID = new MonitorID(hostDescription, jobID, "ogce");
+                GSIAuthenticationInfo authenticationInfo
+                = new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org",
+                7512, 17280000, certificateLocation);
+                monitorID.setAuthenticationInfo(authenticationInfo);
+
+                monitorManager.addAJobToMonitor(monitorID);
+            } catch (AiravataMonitorException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/34d6c266/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/SimpleMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/SimpleMonitorTest.java b/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/SimpleMonitorTest.java
deleted file mode 100644
index 5257cbe..0000000
--- a/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/SimpleMonitorTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor;
-
-import org.apache.airavata.commons.gfac.type.HostDescription;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.ServerInfo;
-import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
-import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
-import org.apache.airavata.gsi.ssh.impl.PBSCluster;
-import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
-import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-public class SimpleMonitorTest {
-    private MonitorManager monitorManager;
-
-    private String myProxyUserName;
-    private String myProxyPassword;
-    private String certificateLocation;
-    private String pbsFilePath;
-    private String workingDirectory;
-    private HostDescription hostDescription;
-
-    @Before
-    public void setUp() throws Exception {
-        System.setProperty("myproxy.user", "ogce");
-        System.setProperty("myproxy.password", "");
-        System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
-        System.setProperty("gsi.working.directory", "/home/ogce");
-        myProxyUserName = System.getProperty("myproxy.user");
-        myProxyPassword = System.getProperty("myproxy.password");
-        workingDirectory = System.getProperty("gsi.working.directory");
-        String pomDirectory = System.getProperty("basedir");
-        certificateLocation = "/Users/lahirugunathilake/Downloads/certificates";
-        if (myProxyUserName == null || myProxyPassword == null || workingDirectory == null) {
-            System.out.println(">>>>>> Please run tests with my proxy user name and password. " +
-                    "E.g :- mvn clean install -Dmyproxy.user=xxx -Dmyproxy.password=xxx -Dgsi.working.directory=/path<<<<<<<");
-            throw new Exception("Need my proxy user name password to run tests.");
-        }
-
-        monitorManager = new MonitorManager();
-        AMQPMonitor amqpMonitor = new
-                AMQPMonitor(monitorManager.getMonitorPublisher(),
-                monitorManager.getRunningQueue(), monitorManager.getFinishQueue());
-        try {
-            monitorManager.addPushMonitor(amqpMonitor);
-            monitorManager.launchMonitor();
-        } catch (AiravataMonitorException e) {
-            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-        }
-
-        hostDescription = new HostDescription(GsisshHostType.type);
-        hostDescription.getType().setHostAddress("gordon.sdsc.xsede.org");
-        hostDescription.getType().setHostName("gsissh-gordon");
-    }
-
-    @Test
-    public void testAMQPMonitor() throws SSHApiException {
-        /* now have to submit a job to some machine and add that job to the queue */
-//Create authentication
-      /*  GSIAuthenticationInfo authenticationInfo
-                = new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org",
-                7512, 17280000, certificateLocation);
-//
-//        // Server info
-        ServerInfo serverInfo = new ServerInfo("ogce", "trestles.sdsc.edu");
-//
-//
-        Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, "/opt/torque/bin/");
-//
-//
-//        // Execute command
-        System.out.println("Target PBS file path: " + workingDirectory);
-//        // constructing the job object
-        JobDescriptor jobDescriptor = new JobDescriptor();
-        jobDescriptor.setWorkingDirectory(workingDirectory);
-        jobDescriptor.setShellName("/bin/bash");
-        jobDescriptor.setJobName("GSI_SSH_SLEEP_JOB");
-        jobDescriptor.setExecutablePath("/bin/echo");
-        jobDescriptor.setAllEnvExport(true);
-        jobDescriptor.setMailOptions("n");
-        jobDescriptor.setStandardOutFile(workingDirectory + File.separator + "application.out");
-        jobDescriptor.setStandardErrorFile(workingDirectory + File.separator + "application.err");
-        jobDescriptor.setNodes(1);
-        jobDescriptor.setProcessesPerNode(1);
-        jobDescriptor.setQueueName("normal");
-        jobDescriptor.setMaxWallTime("60");
-        jobDescriptor.setAcountString("sds128");
-        List<String> inputs = new ArrayList<String>();
-        jobDescriptor.setOwner("ogce");
-        inputs.add("Hello World");
-        jobDescriptor.setInputValues(inputs);
-//        //finished construction of job object
-        System.out.println(jobDescriptor.toXML());
-//        String jobID = pbsCluster.submitBatchJob(jobDescriptor);   */
-
-        try {
-            monitorManager.addAJobToMonitor(new MonitorID(hostDescription, "gordon.sdsc.xsede.org", "ogce"));
-        } catch (AiravataMonitorException e) {
-            e.printStackTrace();
-        }
-        try {
-            Thread.sleep(100000000);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-    }
-}


Mime
View raw message