airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject git commit: fixing test cases
Date Wed, 26 Mar 2014 03:40:02 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 9825ab203 -> dc01f7cec


fixing test cases


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/dc01f7ce
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/dc01f7ce
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/dc01f7ce

Branch: refs/heads/master
Commit: dc01f7cec777f77aeb0e8ce3897df7fe4b96de5a
Parents: 9825ab2
Author: lahiru <lahiru@apache.org>
Authored: Tue Mar 25 23:39:48 2014 -0400
Committer: lahiru <lahiru@apache.org>
Committed: Tue Mar 25 23:39:48 2014 -0400

----------------------------------------------------------------------
 .../airavata/job/monitor/AMQPMonitorTest.java   | 140 +++++++++++++++++++
 .../airavata/job/monitor/QstatMonitorTest.java  | 139 ++++++++++++++++++
 .../job/monitor/impl/push/amqp/AMQPMonitor.java |   2 +
 .../airavata/job/monitor/util/CommonUtils.java  |  84 -----------
 .../airavata/job/monitor/AMQPMonitorTest.java   |  31 ++--
 .../airavata/job/monitor/QstatMonitorTest.java  |  26 ++--
 6 files changed, 306 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/dc01f7ce/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
new file mode 100644
index 0000000..107f0dc
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor;
+
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class AMQPMonitorTest {
+    private MonitorManager monitorManager;
+
+    private String myProxyUserName;
+    private String myProxyPassword;
+    private String certificateLocation;
+    private String pbsFilePath;
+    private String workingDirectory;
+    private HostDescription hostDescription;
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty("myproxy.user", "ogce");
+        System.setProperty("myproxy.password", "");
+        System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
+        System.setProperty("gsi.working.directory", "/home/ogce");
+        myProxyUserName = System.getProperty("myproxy.user");
+        myProxyPassword = System.getProperty("myproxy.password");
+        workingDirectory = System.getProperty("gsi.working.directory");
+        String pomDirectory = System.getProperty("basedir");
+        certificateLocation = "/Users/lahirugunathilake/Downloads/certificates";
+        if (myProxyUserName == null || myProxyPassword == null || workingDirectory == null) {
+            System.out.println(">>>>>> Please run tests with my proxy user name and password. " +
+                    "E.g :- mvn clean install -Dmyproxy.user=xxx -Dmyproxy.password=xxx -Dgsi.working.directory=/path<<<<<<<");
+            throw new Exception("Need my proxy user name password to run tests.");
+        }
+
+        monitorManager = new MonitorManager();
+        AMQPMonitor amqpMonitor = new
+                AMQPMonitor(monitorManager.getMonitorPublisher(),
+                monitorManager.getPushQueue(), monitorManager.getFinishQueue(),"/Users/lahirugunathilake/Downloads/x509up_u503876","xsede_private",
+                Arrays.asList("info1.dyn.teragrid.org,info2.dyn.teragrid.org".split(",")));
+        try {
+            monitorManager.addPushMonitor(amqpMonitor);
+            monitorManager.launchMonitor();
+        } catch (AiravataMonitorException e) {
+            e.printStackTrace();
+        }
+
+        hostDescription = new HostDescription(GsisshHostType.type);
+        hostDescription.getType().setHostAddress("gordon.sdsc.xsede.org");
+        hostDescription.getType().setHostName("gsissh-gordon");
+    }
+
+    @Test
+    public void testAMQPMonitor() throws SSHApiException {
+        /* now have to submit a job to some machine and add that job to the queue */
+        //Create authentication
+        GSIAuthenticationInfo authenticationInfo
+                = new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org",
+                7512, 17280000, certificateLocation);
+
+        // Server info
+        ServerInfo serverInfo = new ServerInfo("ogce", "trestles.sdsc.edu");
+
+
+        Cluster pbsCluster = new
+                PBSCluster(serverInfo, authenticationInfo, org.apache.airavata.gsi.ssh.util.CommonUtils.getPBSJobManager("/opt/torque/bin/"));
+
+
+        // Execute command
+        System.out.println("Target PBS file path: " + workingDirectory);
+        // constructing the job object
+        JobDescriptor jobDescriptor = new JobDescriptor();
+        jobDescriptor.setWorkingDirectory(workingDirectory);
+        jobDescriptor.setShellName("/bin/bash");
+        jobDescriptor.setJobName("GSI_SSH_SLEEP_JOB");
+        jobDescriptor.setExecutablePath("/bin/echo");
+        jobDescriptor.setAllEnvExport(true);
+        jobDescriptor.setMailOptions("n");
+        jobDescriptor.setStandardOutFile(workingDirectory + File.separator + "application.out");
+        jobDescriptor.setStandardErrorFile(workingDirectory + File.separator + "application.err");
+        jobDescriptor.setNodes(1);
+        jobDescriptor.setProcessesPerNode(1);
+        jobDescriptor.setQueueName("normal");
+        jobDescriptor.setMaxWallTime("60");
+        jobDescriptor.setAcountString("sds128");
+        List<String> inputs = new ArrayList<String>();
+        jobDescriptor.setOwner("ogce");
+        inputs.add("Hello World");
+        jobDescriptor.setInputValues(inputs);
+        //finished construction of job object
+        System.out.println(jobDescriptor.toXML());
+        String jobID = pbsCluster.submitBatchJob(jobDescriptor);
+        System.out.println(jobID);
+        try {
+            monitorManager.addAJobToMonitor(new MonitorID(hostDescription, jobID,null,null, "ogce"));
+        } catch (AiravataMonitorException e) {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        } catch (InterruptedException e) {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc01f7ce/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
new file mode 100644
index 0000000..68460d4
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
@@ -0,0 +1,139 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor;
+
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+public class QstatMonitorTest {
+    private MonitorManager monitorManager;
+    private String myProxyUserName;
+    private String myProxyPassword;
+    private String certificateLocation;
+    private String pbsFilePath;
+    private String workingDirectory;
+    private HostDescription hostDescription;
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty("myproxy.user", "ogce");
+        System.setProperty("myproxy.password", "");
+        System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
+        System.setProperty("gsi.working.directory", "/home/ogce");
+        myProxyUserName = System.getProperty("myproxy.user");
+        myProxyPassword = System.getProperty("myproxy.password");
+        workingDirectory = System.getProperty("gsi.working.directory");
+        String pomDirectory = System.getProperty("basedir");
+        certificateLocation = "/Users/lahirugunathilake/Downloads/certificates";
+        if (myProxyUserName == null || myProxyPassword == null || workingDirectory == null) {
+            System.out.println(">>>>>> Please run tests with my proxy user name and password. " +
+                    "E.g :- mvn clean install -Dmyproxy.user=xxx -Dmyproxy.password=xxx -Dgsi.working.directory=/path<<<<<<<");
+            throw new Exception("Need my proxy user name password to run tests.");
+        }
+
+        monitorManager = new MonitorManager();
+        QstatMonitor qstatMonitor = new
+                QstatMonitor(monitorManager.getPullQueue(), monitorManager.getMonitorPublisher());
+        try {
+            monitorManager.addPullMonitor(qstatMonitor);
+            monitorManager.launchMonitor();
+        } catch (AiravataMonitorException e) {
+            e.printStackTrace();
+        }
+
+        hostDescription = new HostDescription(GsisshHostType.type);
+        hostDescription.getType().setHostAddress("trestles.sdsc.edu");
+        hostDescription.getType().setHostName("gsissh-gordon");
+    }
+
+    @Test
+    public void testQstatMonitor() throws SSHApiException {
+        /* now have to submit a job to some machine and add that job to the queue */
+        //Create authentication
+        GSIAuthenticationInfo authenticationInfo
+                = new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org",
+                7512, 17280000, certificateLocation);
+
+        // Server info
+        ServerInfo serverInfo = new ServerInfo("ogce", hostDescription.getType().getHostAddress());
+
+
+        Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/bin/"));
+
+
+        // Execute command
+        System.out.println("Target PBS file path: " + workingDirectory);
+        // constructing the job object
+        JobDescriptor jobDescriptor = new JobDescriptor();
+        jobDescriptor.setWorkingDirectory(workingDirectory);
+        jobDescriptor.setShellName("/bin/bash");
+        jobDescriptor.setJobName("GSI_SSH_SLEEP_JOB");
+        jobDescriptor.setExecutablePath("/bin/echo");
+        jobDescriptor.setAllEnvExport(true);
+        jobDescriptor.setMailOptions("n");
+        jobDescriptor.setStandardOutFile(workingDirectory + File.separator + "application.out");
+        jobDescriptor.setStandardErrorFile(workingDirectory + File.separator + "application.err");
+        jobDescriptor.setNodes(1);
+        jobDescriptor.setProcessesPerNode(1);
+        jobDescriptor.setQueueName("normal");
+        jobDescriptor.setMaxWallTime("60");
+        jobDescriptor.setAcountString("sds128");
+        List<String> inputs = new ArrayList<String>();
+        jobDescriptor.setOwner("ogce");
+        inputs.add("Hello World");
+        jobDescriptor.setInputValues(inputs);
+        //finished construction of job object
+        System.out.println(jobDescriptor.toXML());
+        for (int i = 0; i < 1; i++) {
+            String jobID = pbsCluster.submitBatchJob(jobDescriptor);
+            MonitorID monitorID = new MonitorID(hostDescription, jobID,null,null, "ogce");
+            monitorID.setAuthenticationInfo(authenticationInfo);
+            try {
+                monitorManager.addAJobToMonitor(monitorID);
+            } catch (AiravataMonitorException e) {
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            }
+        }
+        try {
+            Thread.sleep(10000000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc01f7ce/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
index 68c2cc9..9af677d 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
@@ -87,6 +87,8 @@ public class AMQPMonitor extends PushMonitor {
         this.connectionName = connectionName;
         this.proxyPath = proxyPath;
         this.amqpHosts = hosts;
+        this.localPublisher = new MonitorPublisher(new EventBus());
+        this.localPublisher.registerListener(this);
     }
 
     public void initialize(String proxyPath, String connectionName, List<String> hosts) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc01f7ce/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
index ec620e1..6d8214d 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
@@ -133,88 +133,4 @@ public class CommonUtils {
                 monitorID.getUserName() + "  and jobID " + monitorID.getJobID());
 
     }
-
-    public static void addJobToMonitor(Map<String, UserMonitorData> queue, MonitorID monitorID) throws AiravataMonitorException {
-        Iterator<UserMonitorData> iterator = queue.values().iterator();
-        while (iterator.hasNext()) {
-            UserMonitorData next = iterator.next();
-            if (next.getUserName().equals(monitorID.getUserName())) {
-                // then this is the right place to update
-                List<HostMonitorData> monitorIDs = next.getHostMonitorData();
-                for (HostMonitorData host : monitorIDs) {
-                    if (host.getHost().equals(monitorID.getHost())) {
-                        // ok we found right place to add this monitorID
-                        host.addMonitorIDForHost(monitorID);
-                        return;
-                    }
-                }
-                // there is a userMonitor object for this user name but no Hosts for this host
-                // so we have to create new Hosts
-                HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost());
-                hostMonitorData.addMonitorIDForHost(monitorID);
-                next.addHostMonitorData(hostMonitorData);
-                return;
-            }
-        }
-        HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost());
-        hostMonitorData.addMonitorIDForHost(monitorID);
-
-        UserMonitorData userMonitorData = new UserMonitorData(monitorID.getUserName());
-        userMonitorData.addHostMonitorData(hostMonitorData);
-        queue.put(monitorID.getUserName(), userMonitorData);
-    }
-
-    public static HostMonitorData getHostMonitorData(Map<String, UserMonitorData> queue, MonitorID monitorID) throws AiravataMonitorException {
-        Iterator<UserMonitorData> iterator = queue.values().iterator();
-        while (iterator.hasNext()) {
-            UserMonitorData next = iterator.next();
-            if (next.getUserName().equals(monitorID.getUserName())) {
-                // then this is the right place to update
-                List<HostMonitorData> monitorIDs = next.getHostMonitorData();
-                for (HostMonitorData host : monitorIDs) {
-                    if (host.getHost().equals(monitorID.getHost())) {
-                        // ok we found right place to add this monitorID
-                        return host;
-                    }
-                }
-            }
-        }
-        return null;
-    }
-
-    public static void removeJobFromMonitor(BlockingQueue<UserMonitorData> queue,MonitorID monitorID) throws AiravataMonitorException {
-        Iterator<UserMonitorData> iterator = queue.iterator();
-        while(iterator.hasNext()){
-            UserMonitorData next = iterator.next();
-            if(next.getUserName().equals(monitorID.getUserName())){
-                // then this is the right place to update
-                List<HostMonitorData> hostMonitorData = next.getHostMonitorData();
-                for(HostMonitorData iHostMonitorID:hostMonitorData){
-                    if(iHostMonitorID.getHost().equals(monitorID.getHost())) {
-                        List<MonitorID> monitorIDs = iHostMonitorID.getMonitorIDs();
-                        for(MonitorID iMonitorID:monitorIDs){
-                            if(iMonitorID.getJobID().equals(monitorID.getJobID())) {
-                                // OK we found the object, we cannot do list.remove(object) states of two objects
-                                // could be different, thats why we check the jobID
-                                monitorIDs.remove(iMonitorID);
-                                if(monitorIDs.size()==0) {
-                                    hostMonitorData.remove(iHostMonitorID);
-                                    if (hostMonitorData.size() == 0) {
-                                        // no useful data so we have to remove the element from the queue
-                                        queue.remove(next);
-                                    }
-                                }
-                                return;
-                            }
-                        }
-                    }
-                }
-            }
-        }
-        throw new AiravataMonitorException("Cannot find the given MonitorID in the queue with userName " +
-                monitorID.getUserName() + "  and jobID " + monitorID.getJobID());
-
-    }
-
-
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc01f7ce/tools/job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java b/tools/job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
index 4e35434..3ab8115 100644
--- a/tools/job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
+++ b/tools/job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
@@ -20,6 +20,7 @@
 */
 package org.apache.airavata.job.monitor;
 
+import com.google.common.eventbus.EventBus;
 import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.gsi.ssh.api.Cluster;
 import org.apache.airavata.gsi.ssh.api.SSHApiException;
@@ -28,6 +29,7 @@ import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
 import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
 import org.apache.airavata.gsi.ssh.impl.PBSCluster;
 import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.job.monitor.event.MonitorPublisher;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
@@ -38,17 +40,19 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 public class AMQPMonitorTest {
-    private MonitorManager monitorManager;
-
     private String myProxyUserName;
     private String myProxyPassword;
     private String certificateLocation;
-    private String pbsFilePath;
     private String workingDirectory;
     private HostDescription hostDescription;
-
+    private BlockingQueue<MonitorID> amqpQueue;
+    private BlockingQueue<MonitorID> finishQueue;
+    private AMQPMonitor amqpMonitor;
     @Before
     public void setUp() throws Exception {
         System.setProperty("myproxy.user", "ogce");
@@ -65,19 +69,12 @@ public class AMQPMonitorTest {
                     "E.g :- mvn clean install -Dmyproxy.user=xxx -Dmyproxy.password=xxx -Dgsi.working.directory=/path<<<<<<<");
             throw new Exception("Need my proxy user name password to run tests.");
         }
-
-        monitorManager = new MonitorManager();
-        AMQPMonitor amqpMonitor = new
-                AMQPMonitor(monitorManager.getMonitorPublisher(),
-                monitorManager.getPullQueue(), monitorManager.getFinishQueue(),"/Users/lahirugunathilake/Downloads/x509up_u503876","xsede_private",
+        amqpQueue = new LinkedBlockingQueue<MonitorID>();
+        finishQueue = new LinkedBlockingQueue<MonitorID>();
+        amqpMonitor = new
+                AMQPMonitor(new MonitorPublisher(new EventBus()),
+                amqpQueue,finishQueue,"/Users/lahirugunathilake/Downloads/x509up_u503876","xsede_private",
                 Arrays.asList("info1.dyn.teragrid.org,info2.dyn.teragrid.org".split(",")));
-        try {
-            monitorManager.addPushMonitor(amqpMonitor);
-            monitorManager.launchMonitor();
-        } catch (AiravataMonitorException e) {
-            e.printStackTrace();
-        }
-
         hostDescription = new HostDescription(GsisshHostType.type);
         hostDescription.getType().setHostAddress("gordon.sdsc.xsede.org");
         hostDescription.getType().setHostName("gsissh-gordon");
@@ -125,7 +122,7 @@ public class AMQPMonitorTest {
         String jobID = pbsCluster.submitBatchJob(jobDescriptor);
         System.out.println(jobID);
         try {
-            monitorManager.addAJobToMonitor(new MonitorID(hostDescription, jobID,null,null, "ogce"));
+             amqpMonitor.registerListener(new MonitorID(hostDescription,jobID,null,null,myProxyUserName));
         } catch (AiravataMonitorException e) {
             e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc01f7ce/tools/job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java b/tools/job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
index 126b8ae..bc6c4c5 100644
--- a/tools/job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
+++ b/tools/job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
@@ -20,6 +20,7 @@
 */
 package org.apache.airavata.job.monitor;
 
+import com.google.common.eventbus.EventBus;
 import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.gsi.ssh.api.Cluster;
 import org.apache.airavata.gsi.ssh.api.SSHApiException;
@@ -28,9 +29,10 @@ import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
 import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
 import org.apache.airavata.gsi.ssh.impl.PBSCluster;
 import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
-import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.job.monitor.event.MonitorPublisher;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
+import org.apache.airavata.job.monitor.util.CommonUtils;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,17 +40,17 @@ import org.junit.Test;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
 
 public class QstatMonitorTest {
-    private MonitorManager monitorManager;
-
     private String myProxyUserName;
     private String myProxyPassword;
     private String certificateLocation;
     private String pbsFilePath;
     private String workingDirectory;
     private HostDescription hostDescription;
-
+    private BlockingQueue<UserMonitorData> q;
+    private QstatMonitor qstatMonitor;
     @Before
     public void setUp() throws Exception {
         System.setProperty("myproxy.user", "ogce");
@@ -66,19 +68,13 @@ public class QstatMonitorTest {
             throw new Exception("Need my proxy user name password to run tests.");
         }
 
-        monitorManager = new MonitorManager();
-        QstatMonitor qstatMonitor = new
-                QstatMonitor(monitorManager.getPullQueue(), monitorManager.getMonitorPublisher());
-        try {
-            monitorManager.addPullMonitor(qstatMonitor);
-            monitorManager.launchMonitor();
-        } catch (AiravataMonitorException e) {
-            e.printStackTrace();
-        }
+        qstatMonitor = new
+                QstatMonitor(q, new MonitorPublisher(new EventBus()));
 
         hostDescription = new HostDescription(GsisshHostType.type);
         hostDescription.getType().setHostAddress("trestles.sdsc.edu");
         hostDescription.getType().setHostName("gsissh-gordon");
+        qstatMonitor.startPulling();
     }
 
     @Test
@@ -93,7 +89,7 @@ public class QstatMonitorTest {
         ServerInfo serverInfo = new ServerInfo("ogce", hostDescription.getType().getHostAddress());
 
 
-        Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/bin/"));
+        Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, org.apache.airavata.gsi.ssh.util.CommonUtils.getPBSJobManager("/opt/torque/bin/"));
 
 
         // Execute command
@@ -124,7 +120,7 @@ public class QstatMonitorTest {
             MonitorID monitorID = new MonitorID(hostDescription, jobID,null,null, "ogce");
             monitorID.setAuthenticationInfo(authenticationInfo);
             try {
-                monitorManager.addAJobToMonitor(monitorID);
+                CommonUtils.addMonitortoQueue(q,monitorID);
             } catch (AiravataMonitorException e) {
                 e.printStackTrace();
             }


Mime
View raw message