airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject git commit: More improvements to amqp monitor
Date Wed, 26 Mar 2014 02:52:34 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 6435ad35b -> 9825ab203


More improvements to amqp monitor


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/9825ab20
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/9825ab20
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/9825ab20

Branch: refs/heads/master
Commit: 9825ab203ac52b931780810fd4983bc8d5e8a61f
Parents: 6435ad3
Author: lahiru <lahiru@apache.org>
Authored: Tue Mar 25 22:52:26 2014 -0400
Committer: lahiru <lahiru@apache.org>
Committed: Tue Mar 25 22:52:26 2014 -0400

----------------------------------------------------------------------
 modules/distribution/airavata-client/pom.xml    |   2 +-
 modules/gfac/gfac-monitor/.pom.xml.swp          | Bin 16384 -> 0 bytes
 .../airavata/job/monitor/MonitorManager.java    |  15 +-
 .../airavata/job/monitor/AMQPMonitorTest.java   | 138 +++++++++++++++++++
 .../airavata/job/monitor/QstatMonitorTest.java  | 136 ++++++++++++++++++
 .../server/OrchestratorServerHandler.java       |   1 -
 .../orchestrator-client-sdks/pom.xml            |   2 +-
 modules/orchestrator/orchestrator-core/pom.xml  |   2 +-
 .../job/monitor/core/MessageParser.java         |   4 +-
 .../airavata/job/monitor/core/PushMonitor.java  |   2 +-
 .../job/monitor/event/MonitorPublisher.java     |   4 +
 .../job/monitor/impl/push/amqp/AMQPMonitor.java | 121 +++++++++-------
 .../monitor/impl/push/amqp/BasicConsumer.java   |  30 ++--
 .../impl/push/amqp/JSONMessageParser.java       |   4 +-
 .../airavata/job/monitor/util/CommonUtils.java  |  87 +++++++++++-
 15 files changed, 457 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/9825ab20/modules/distribution/airavata-client/pom.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/airavata-client/pom.xml b/modules/distribution/airavata-client/pom.xml
index 90ce9a3..b7bb975 100644
--- a/modules/distribution/airavata-client/pom.xml
+++ b/modules/distribution/airavata-client/pom.xml
@@ -289,7 +289,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-job-monitor</artifactId>
+            <artifactId>gfac-monitor</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/airavata/blob/9825ab20/modules/gfac/gfac-monitor/.pom.xml.swp
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/.pom.xml.swp b/modules/gfac/gfac-monitor/.pom.xml.swp
deleted file mode 100644
index f65293b..0000000
Binary files a/modules/gfac/gfac-monitor/.pom.xml.swp and /dev/null differ

http://git-wip-us.apache.org/repos/asf/airavata/blob/9825ab20/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
index 2334048..272053c 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
@@ -30,7 +30,6 @@ import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.job.monitor.impl.LocalJobMonitor;
 import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
 import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
-import org.apache.airavata.job.monitor.impl.push.amqp.UnRegisterWorker;
 import org.apache.airavata.job.monitor.util.CommonUtils;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryImpl;
 import org.apache.airavata.schemas.gfac.GlobusHostType;
@@ -59,7 +58,7 @@ public class MonitorManager {
 
     private BlockingQueue<UserMonitorData> pullQueue;
 
-    private BlockingQueue<UserMonitorData> pushQueue;
+    private BlockingQueue<MonitorID> pushQueue;
 
     private BlockingQueue<MonitorID> localJobQueue;
 
@@ -77,11 +76,11 @@ public class MonitorManager {
         pullMonitors = new ArrayList<PullMonitor>();
         pushMonitors = new ArrayList<PushMonitor>();
         pullQueue = new LinkedBlockingQueue<UserMonitorData>();
-        pushQueue = new LinkedBlockingQueue<UserMonitorData>();
+        pushQueue = new LinkedBlockingQueue<MonitorID>();
         finishQueue = new LinkedBlockingQueue<MonitorID>();
         localJobQueue = new LinkedBlockingQueue<MonitorID>();
         monitorPublisher = new MonitorPublisher(new EventBus());
-        registerListener(new AiravataJobStatusUpdator(new RegistryImpl(), finishQueue));
+        registerListener(new AiravataJobStatusUpdator(new RegistryImpl(), getFinishQueue()));
     }
 
     /**
@@ -95,7 +94,6 @@ public class MonitorManager {
         monitor.setFinishQueue(this.getFinishQueue());
         monitor.setRunningQueue(this.getPushQueue());
         addPushMonitor(monitor);
-        registerListener(new UnRegisterWorker(monitor.getAvailableChannels()));
     }
 
 
@@ -167,7 +165,8 @@ public class MonitorManager {
                     || Constants.PULL.equals(host.getMonitorMode())) {
                 CommonUtils.addMonitortoQueue(pullQueue, monitorID);
             } else if (Constants.PUSH.equals(host.getMonitorMode())) {
-                CommonUtils.addMonitortoQueue(pushQueue, monitorID);
+                pushQueue.put(monitorID);
+                finishQueue.put(monitorID);
             }
         } else if(monitorID.getHost().getType() instanceof GlobusHostType){
             logger.error("Monitoring does not support GlubusHostType resources");
@@ -249,11 +248,11 @@ public class MonitorManager {
         this.finishQueue = finishQueue;
     }
 
-    public BlockingQueue<UserMonitorData> getPushQueue() {
+    public BlockingQueue<MonitorID> getPushQueue() {
         return pushQueue;
     }
 
-    public void setPushQueue(BlockingQueue<UserMonitorData> pushQueue) {
+    public void setPushQueue(BlockingQueue<MonitorID> pushQueue) {
         this.pushQueue = pushQueue;
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/9825ab20/modules/gfac/gfac-monitor/src/main/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java b/modules/gfac/gfac-monitor/src/main/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
new file mode 100644
index 0000000..4e35434
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor;
+
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class AMQPMonitorTest {
+    private MonitorManager monitorManager;
+
+    private String myProxyUserName;
+    private String myProxyPassword;
+    private String certificateLocation;
+    private String pbsFilePath;
+    private String workingDirectory;
+    private HostDescription hostDescription;
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty("myproxy.user", "ogce");
+        System.setProperty("myproxy.password", "");
+        System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
+        System.setProperty("gsi.working.directory", "/home/ogce");
+        myProxyUserName = System.getProperty("myproxy.user");
+        myProxyPassword = System.getProperty("myproxy.password");
+        workingDirectory = System.getProperty("gsi.working.directory");
+        String pomDirectory = System.getProperty("basedir");
+        certificateLocation = "/Users/lahirugunathilake/Downloads/certificates";
+        if (myProxyUserName == null || myProxyPassword == null || workingDirectory == null) {
+            System.out.println(">>>>>> Please run tests with my proxy user name and password. " +
+                    "E.g :- mvn clean install -Dmyproxy.user=xxx -Dmyproxy.password=xxx -Dgsi.working.directory=/path<<<<<<<");
+            throw new Exception("Need my proxy user name password to run tests.");
+        }
+
+        monitorManager = new MonitorManager();
+        AMQPMonitor amqpMonitor = new
+                AMQPMonitor(monitorManager.getMonitorPublisher(),
+                monitorManager.getPullQueue(), monitorManager.getFinishQueue(),"/Users/lahirugunathilake/Downloads/x509up_u503876","xsede_private",
+                Arrays.asList("info1.dyn.teragrid.org,info2.dyn.teragrid.org".split(",")));
+        try {
+            monitorManager.addPushMonitor(amqpMonitor);
+            monitorManager.launchMonitor();
+        } catch (AiravataMonitorException e) {
+            e.printStackTrace();
+        }
+
+        hostDescription = new HostDescription(GsisshHostType.type);
+        hostDescription.getType().setHostAddress("gordon.sdsc.xsede.org");
+        hostDescription.getType().setHostName("gsissh-gordon");
+    }
+
+    @Test
+    public void testAMQPMonitor() throws SSHApiException {
+        /* now have to submit a job to some machine and add that job to the queue */
+        //Create authentication
+        GSIAuthenticationInfo authenticationInfo
+                = new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org",
+                7512, 17280000, certificateLocation);
+
+        // Server info
+        ServerInfo serverInfo = new ServerInfo("ogce", "trestles.sdsc.edu");
+
+
+        Cluster pbsCluster = new
+                PBSCluster(serverInfo, authenticationInfo, org.apache.airavata.gsi.ssh.util.CommonUtils.getPBSJobManager("/opt/torque/bin/"));
+
+
+        // Execute command
+        System.out.println("Target PBS file path: " + workingDirectory);
+        // constructing the job object
+        JobDescriptor jobDescriptor = new JobDescriptor();
+        jobDescriptor.setWorkingDirectory(workingDirectory);
+        jobDescriptor.setShellName("/bin/bash");
+        jobDescriptor.setJobName("GSI_SSH_SLEEP_JOB");
+        jobDescriptor.setExecutablePath("/bin/echo");
+        jobDescriptor.setAllEnvExport(true);
+        jobDescriptor.setMailOptions("n");
+        jobDescriptor.setStandardOutFile(workingDirectory + File.separator + "application.out");
+        jobDescriptor.setStandardErrorFile(workingDirectory + File.separator + "application.err");
+        jobDescriptor.setNodes(1);
+        jobDescriptor.setProcessesPerNode(1);
+        jobDescriptor.setQueueName("normal");
+        jobDescriptor.setMaxWallTime("60");
+        jobDescriptor.setAcountString("sds128");
+        List<String> inputs = new ArrayList<String>();
+        jobDescriptor.setOwner("ogce");
+        inputs.add("Hello World");
+        jobDescriptor.setInputValues(inputs);
+        //finished construction of job object
+        System.out.println(jobDescriptor.toXML());
+        String jobID = pbsCluster.submitBatchJob(jobDescriptor);
+        System.out.println(jobID);
+        try {
+            monitorManager.addAJobToMonitor(new MonitorID(hostDescription, jobID,null,null, "ogce"));
+        } catch (AiravataMonitorException e) {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9825ab20/modules/gfac/gfac-monitor/src/main/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java b/modules/gfac/gfac-monitor/src/main/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
new file mode 100644
index 0000000..548b9c5
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
@@ -0,0 +1,136 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor;
+
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+public class QstatMonitorTest {
+    private String myProxyUserName;
+    private String myProxyPassword;
+    private String certificateLocation;
+    private String pbsFilePath;
+    private String workingDirectory;
+    private HostDescription hostDescription;
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty("myproxy.user", "ogce");
+        System.setProperty("myproxy.password", "");
+        System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
+        System.setProperty("gsi.working.directory", "/home/ogce");
+        myProxyUserName = System.getProperty("myproxy.user");
+        myProxyPassword = System.getProperty("myproxy.password");
+        workingDirectory = System.getProperty("gsi.working.directory");
+        String pomDirectory = System.getProperty("basedir");
+        certificateLocation = "/Users/lahirugunathilake/Downloads/certificates";
+        if (myProxyUserName == null || myProxyPassword == null || workingDirectory == null) {
+            System.out.println(">>>>>> Please run tests with my proxy user name and password. " +
+                    "E.g :- mvn clean install -Dmyproxy.user=xxx -Dmyproxy.password=xxx -Dgsi.working.directory=/path<<<<<<<");
+            throw new Exception("Need my proxy user name password to run tests.");
+        }
+
+        monitorManager = new MonitorManager();
+        QstatMonitor qstatMonitor = new
+                QstatMonitor(monitorManager.getPullQueue(), monitorManager.getMonitorPublisher());
+        try {
+            monitorManager.addPullMonitor(qstatMonitor);
+            monitorManager.launchMonitor();
+        } catch (AiravataMonitorException e) {
+            e.printStackTrace();
+        }
+
+        hostDescription = new HostDescription(GsisshHostType.type);
+        hostDescription.getType().setHostAddress("trestles.sdsc.edu");
+        hostDescription.getType().setHostName("gsissh-gordon");
+    }
+
+    @Test
+    public void testQstatMonitor() throws SSHApiException {
+        /* now have to submit a job to some machine and add that job to the queue */
+        //Create authentication
+        GSIAuthenticationInfo authenticationInfo
+                = new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org",
+                7512, 17280000, certificateLocation);
+
+        // Server info
+        ServerInfo serverInfo = new ServerInfo("ogce", hostDescription.getType().getHostAddress());
+
+
+        Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/bin/"));
+
+
+        // Execute command
+        System.out.println("Target PBS file path: " + workingDirectory);
+        // constructing the job object
+        JobDescriptor jobDescriptor = new JobDescriptor();
+        jobDescriptor.setWorkingDirectory(workingDirectory);
+        jobDescriptor.setShellName("/bin/bash");
+        jobDescriptor.setJobName("GSI_SSH_SLEEP_JOB");
+        jobDescriptor.setExecutablePath("/bin/echo");
+        jobDescriptor.setAllEnvExport(true);
+        jobDescriptor.setMailOptions("n");
+        jobDescriptor.setStandardOutFile(workingDirectory + File.separator + "application.out");
+        jobDescriptor.setStandardErrorFile(workingDirectory + File.separator + "application.err");
+        jobDescriptor.setNodes(1);
+        jobDescriptor.setProcessesPerNode(1);
+        jobDescriptor.setQueueName("normal");
+        jobDescriptor.setMaxWallTime("60");
+        jobDescriptor.setAcountString("sds128");
+        List<String> inputs = new ArrayList<String>();
+        jobDescriptor.setOwner("ogce");
+        inputs.add("Hello World");
+        jobDescriptor.setInputValues(inputs);
+        //finished construction of job object
+        System.out.println(jobDescriptor.toXML());
+        for (int i = 0; i < 1; i++) {
+            String jobID = pbsCluster.submitBatchJob(jobDescriptor);
+            MonitorID monitorID = new MonitorID(hostDescription, jobID,null,null, "ogce");
+            monitorID.setAuthenticationInfo(authenticationInfo);
+            try {
+                monitorManager.addAJobToMonitor(monitorID);
+            } catch (AiravataMonitorException e) {
+                e.printStackTrace();
+            }
+        }
+        try {
+            Thread.sleep(10000000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9825ab20/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 763549d..e2f25fc 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -115,7 +115,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
                     } else if (monitor instanceof PushMonitor) {
                         if (monitor instanceof AMQPMonitor) {
                             ((AMQPMonitor) monitor).initialize(proxyPath, connectionName, list);
-                            monitorManager.registerListener(monitor);
                             monitorManager.addAMQPMonitor((AMQPMonitor) monitor);
                         }
                     } else if(monitor instanceof LocalJobMonitor){

http://git-wip-us.apache.org/repos/asf/airavata/blob/9825ab20/modules/orchestrator/orchestrator-client-sdks/pom.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-client-sdks/pom.xml b/modules/orchestrator/orchestrator-client-sdks/pom.xml
index d9f2e4e..0888556 100644
--- a/modules/orchestrator/orchestrator-client-sdks/pom.xml
+++ b/modules/orchestrator/orchestrator-client-sdks/pom.xml
@@ -48,7 +48,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-job-monitor</artifactId>
+            <artifactId>gfac-monitor</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/airavata/blob/9825ab20/modules/orchestrator/orchestrator-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/pom.xml b/modules/orchestrator/orchestrator-core/pom.xml
index e5081d6..cbe10b0 100644
--- a/modules/orchestrator/orchestrator-core/pom.xml
+++ b/modules/orchestrator/orchestrator-core/pom.xml
@@ -72,7 +72,7 @@ the License. -->
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-job-monitor</artifactId>
+            <artifactId>gfac-monitor</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/airavata/blob/9825ab20/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
index f44cf65..1a79a17 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
@@ -25,6 +25,7 @@ import org.apache.airavata.job.monitor.MonitorID;
 import org.apache.airavata.job.monitor.UserMonitorData;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.model.workspace.experiment.JobState;
 
 /**
  * This is an interface to implement messageparser, it could be
@@ -40,8 +41,7 @@ public interface MessageParser {
      * we have to makesure the correct message is given to the messageparser
      * parse method, it will not do any filtering
      * @param message content of the message
-     * @param hostMonitorData monitorID object
      * @return
      */
-    JobStatus parseMessage(String message,HostMonitorData hostMonitorData)throws AiravataMonitorException;
+    JobState parseMessage(String message)throws AiravataMonitorException;
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/9825ab20/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PushMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PushMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PushMonitor.java
index 77ae1e7..f172ece 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PushMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PushMonitor.java
@@ -39,7 +39,7 @@ public abstract class PushMonitor extends AiravataAbstractMonitor {
      * @param monitorID
      * @return
      */
-    public abstract boolean registerListener(UserMonitorData monitorID)throws AiravataMonitorException;
+    public abstract boolean registerListener(MonitorID monitorID)throws AiravataMonitorException;
 
     /**
      * This method can be invoked to unregister a listener with the

http://git-wip-us.apache.org/repos/asf/airavata/blob/9825ab20/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
index 95b64ab..0f75206 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
@@ -41,4 +41,8 @@ public class MonitorPublisher {
     public void publish(JobStatus jobState) {
         eventBus.post(jobState);
     }
+
+    public void publish(MonitorID monitorID){
+        eventBus.post(monitorID);
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/9825ab20/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
index 8025b15..68c2cc9 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
@@ -20,18 +20,22 @@
 */
 package org.apache.airavata.job.monitor.impl.push.amqp;
 
+import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.job.monitor.HostMonitorData;
 import org.apache.airavata.job.monitor.MonitorID;
 import org.apache.airavata.job.monitor.UserMonitorData;
 import org.apache.airavata.job.monitor.core.PushMonitor;
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.job.monitor.state.JobStatus;
 import org.apache.airavata.job.monitor.util.AMQPConnectionUtil;
 import org.apache.airavata.job.monitor.util.CommonUtils;
+import org.apache.airavata.model.workspace.experiment.JobState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,7 +60,9 @@ public class AMQPMonitor extends PushMonitor {
 
     private MonitorPublisher publisher;
 
-    private BlockingQueue<UserMonitorData> runningQueue;
+    private MonitorPublisher localPublisher;
+
+    private BlockingQueue<MonitorID> runningQueue;
 
     private BlockingQueue<MonitorID> finishQueue;
 
@@ -71,7 +77,7 @@ public class AMQPMonitor extends PushMonitor {
     public AMQPMonitor(){
 
     }
-    public AMQPMonitor(MonitorPublisher publisher, BlockingQueue<UserMonitorData> runningQueue,
+    public AMQPMonitor(MonitorPublisher publisher, BlockingQueue<MonitorID> runningQueue,
                        BlockingQueue<MonitorID> finishQueue,
                        String proxyPath,String connectionName,List<String> hosts) {
         this.publisher = publisher;
@@ -88,38 +94,37 @@ public class AMQPMonitor extends PushMonitor {
         this.connectionName = connectionName;
         this.proxyPath = proxyPath;
         this.amqpHosts = hosts;
+        this.localPublisher = new MonitorPublisher(new EventBus());
+        this.localPublisher.registerListener(this);
     }
 
     @Override
-    public boolean registerListener(UserMonitorData userMonitorData) throws AiravataMonitorException {
-        List<HostMonitorData> hostNames = userMonitorData.getHostMonitorData();
-        String userName = userMonitorData.getUserName();
-        for (HostMonitorData host : hostNames) {
-            // with amqp monitor we do not use individual monitorID list but
-            // we subscribe to read user-host based subscription
-            String hostAddress = host.getHost().getType().getHostAddress();
-            // in amqp case there are no multiple jobs per each host, because once a job is put in to the queue it
-            // will be picked by the Monitor, so always new usermonitorData object will get create
-            MonitorID monitorID = host.getMonitorIDs().get(0);
-            String channelID = CommonUtils.getChannelID(monitorID);
-            try {
-                    //todo need to fix this rather getting it from a file
-                    Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath);
-                    Channel channel = null;
-                    channel = connection.createChannel();
-                    availableChannels.put(channelID, channel);
-                    String queueName = channel.queueDeclare().getQueue();
-
-                    BasicConsumer consumer = new BasicConsumer(new JSONMessageParser(), publisher, host);
-                    channel.basicConsume(queueName, true, consumer);
-                    String filterString = CommonUtils.getRoutingKey(userName, hostAddress);
-                    // here we queuebind to a particular user in a particular machine
-                    channel.queueBind(queueName, "glue2.computing_activity", filterString);
-                    logger.info("Using filtering string to monitor: " + filterString);
-            } catch (IOException e) {
-                logger.error("Error creating the connection to finishQueue the job:" + userMonitorData.getUserName());
-            }
-
+    public boolean registerListener(MonitorID monitorID) throws AiravataMonitorException {
+        // we subscribe to read user-host based subscription
+        HostDescription host = monitorID.getHost();
+        String hostAddress = host.getType().getHostAddress();
+        // in amqp case there are no multiple jobs per each host, because once a job is put in to the queue it
+        // will be picked by the Monitor, so always new usermonitorData object will get create
+        String channelID = CommonUtils.getChannelID(monitorID);
+        if(availableChannels.get(channelID) == null){
+        try {
+            //todo need to fix this rather getting it from a file
+            Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath);
+            Channel channel = null;
+            channel = connection.createChannel();
+            availableChannels.put(channelID, channel);
+            String queueName = channel.queueDeclare().getQueue();
+
+            BasicConsumer consumer = new
+                    BasicConsumer(new JSONMessageParser(), localPublisher);          // here we use local publisher
+            channel.basicConsume(queueName, true, consumer);
+            String filterString = CommonUtils.getRoutingKey(monitorID.getUserName(), hostAddress);
+            // here we queuebind to a particular user in a particular machine
+            channel.queueBind(queueName, "glue2.computing_activity", filterString);
+            logger.info("Using filtering string to monitor: " + filterString);
+        } catch (IOException e) {
+            logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName());
+        }
         }
         return true;
     }
@@ -129,7 +134,7 @@ public class AMQPMonitor extends PushMonitor {
         startRegister = true; // this will be unset by someone else
         while (startRegister || !ServerSettings.isStopAllThreads()) {
             try {
-                UserMonitorData take = runningQueue.take();
+                MonitorID take = runningQueue.take();
                 this.registerListener(take);
             } catch (AiravataMonitorException e) { // catch any exceptino inside the loop
                 e.printStackTrace();
@@ -150,31 +155,43 @@ public class AMQPMonitor extends PushMonitor {
         }
     }
 
-
-
-
-
-
     @Subscribe
     public boolean unRegisterListener(MonitorID monitorID) throws AiravataMonitorException {
         String channelID = CommonUtils.getChannelID(monitorID);
-        Channel channel = availableChannels.get(channelID);
-        if (channel == null) {
-            logger.error("Already Unregistered the listener");
-            throw new AiravataMonitorException("Already Unregistered the listener");
-        } else {
-            try {
-                channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(monitorID));
-                channel.close();
-                channel.getConnection().close();
-            } catch (IOException e) {
-                logger.error("Error unregistering the listener");
-                throw new AiravataMonitorException("Error unregistering the listener");
+        Iterator<MonitorID> iterator = finishQueue.iterator();
+        MonitorID next = null;
+        while(iterator.hasNext()){
+             next = iterator.next();
+            if(next.getJobID().endsWith(monitorID.getJobID())){
+                break;
             }
         }
+        if(next == null) {
+            logger.error("Job has removed from the queue, old obsolete message recieved");
+            return false;
+        }
+        if (JobState.FAILED.equals(monitorID.getStatus()) || JobState.COMPLETE.equals(monitorID.getStatus())) {
+            finishQueue.remove(next);
+            Channel channel = availableChannels.get(channelID);
+            if (channel == null) {
+                logger.error("Already Unregistered the listener");
+                throw new AiravataMonitorException("Already Unregistered the listener");
+            } else {
+                try {
+                    channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(monitorID));
+                    channel.close();
+                    channel.getConnection().close();
+                    availableChannels.remove(channelID);
+                } catch (IOException e) {
+                    logger.error("Error unregistering the listener");
+                    throw new AiravataMonitorException("Error unregistering the listener");
+                }
+            }
+        }
+        next.setStatus(monitorID.getStatus());
+        publisher.publish(new JobStatus(next,next.getStatus()));
         return true;
     }
-
     @Override
     public boolean stopRegister() throws AiravataMonitorException {
         return false;  //To change body of implemented methods use File | Settings | File Templates.
@@ -196,11 +213,11 @@ public class AMQPMonitor extends PushMonitor {
         this.publisher = publisher;
     }
 
-    public BlockingQueue<UserMonitorData> getRunningQueue() {
+    public BlockingQueue<MonitorID> getRunningQueue() {
         return runningQueue;
     }
 
-    public void setRunningQueue(BlockingQueue<UserMonitorData> runningQueue) {
+    public void setRunningQueue(BlockingQueue<MonitorID> runningQueue) {
         this.runningQueue = runningQueue;
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/9825ab20/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
index 9c08399..94fee5b 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
@@ -31,24 +31,23 @@ import org.apache.airavata.job.monitor.core.MessageParser;
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.model.workspace.experiment.JobState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.Map;
 
 public class BasicConsumer implements Consumer {
     private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
 
-    MessageParser parser;
+    private MessageParser parser;
 
-    MonitorPublisher publisher;
+    private MonitorPublisher publisher;
 
-    HostMonitorData hostMonitorData;
-
-    public BasicConsumer(MessageParser parser, MonitorPublisher publisher, HostMonitorData hostMonitorData) {
+    public BasicConsumer(MessageParser parser, MonitorPublisher publisher) {
         this.parser = parser;
         this.publisher = publisher;
-        this.hostMonitorData = hostMonitorData;
     }
 
     public void handleCancel(java.lang.String consumerTag) {
@@ -71,26 +70,15 @@ public class BasicConsumer implements Consumer {
         // Here we parse the message and get the job status and push it
         // to the Event bus, this will be picked by
 //        AiravataJobStatusUpdator and store in to registry
+
         logger.debug("************************************************************");
         logger.debug("AMQP Message recieved \n" + message);
         logger.debug("************************************************************");
         try {
             String jobID = envelope.getRoutingKey().split("\\.")[0];
-            List<MonitorID> monitorIDs = hostMonitorData.getMonitorIDs();
-            MonitorID currentMonitorID = null;
-            for(MonitorID iMonitorID:monitorIDs){
-                if(jobID.equals(iMonitorID.getJobID())){
-                   currentMonitorID = iMonitorID;
-                   break;
-                }
-            }
-            if(currentMonitorID == null) {
-                logger.error("Wrong message came for the Consumer, so skipping this notification");
-                return;
-            }
-            JobStatus jobStatus = parser.parseMessage(message, hostMonitorData);
-            jobStatus.setMonitorID(currentMonitorID);
-            publisher.publish(jobStatus);
+            MonitorID monitorID = new MonitorID(null, jobID, null, null, null);
+            monitorID.setStatus(parser.parseMessage(message));
+            publisher.publish(monitorID);
         } catch (AiravataMonitorException e) {
             e.printStackTrace();
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/9825ab20/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
index 8c6eeb9..1f85fc4 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
@@ -38,7 +38,7 @@ import java.util.List;
 public class JSONMessageParser implements MessageParser {
     private final static Logger logger = LoggerFactory.getLogger(JSONMessageParser.class);
 
-    public JobStatus parseMessage(String message, HostMonitorData userMonitorData)throws AiravataMonitorException{
+    public JobState parseMessage(String message)throws AiravataMonitorException{
         /*todo write a json message parser here*/
         logger.debug(message);
         ObjectMapper objectMapper = new ObjectMapper();
@@ -51,7 +51,7 @@ public class JSONMessageParser implements MessageParser {
                 jobState = getStatusFromString(aState);
             }
             // we get the last value of the state array
-            return new JobStatus(null, jobState);
+            return jobState;
         } catch (IOException e) {
             throw new AiravataMonitorException(e);
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/9825ab20/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
index 8a7b160..ec620e1 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
@@ -29,6 +29,7 @@ import org.apache.airavata.schemas.gfac.GsisshHostType;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 
 public class CommonUtils {
@@ -50,7 +51,7 @@ public class CommonUtils {
         }
     }
     public static String getChannelID(MonitorID monitorID) {
-        return monitorID.getUserName() + "-" + monitorID.getHost().getType().getHostName() + "-" + monitorID.getJobID();
+        return monitorID.getUserName() + "-" + monitorID.getHost().getType().getHostName();
     }
 
     public static String getRoutingKey(MonitorID monitorID) {
@@ -132,4 +133,88 @@ public class CommonUtils {
                 monitorID.getUserName() + "  and jobID " + monitorID.getJobID());
 
     }
+
+    public static void addJobToMonitor(Map<String, UserMonitorData> queue, MonitorID monitorID) throws AiravataMonitorException {
+        Iterator<UserMonitorData> iterator = queue.values().iterator();
+        while (iterator.hasNext()) {
+            UserMonitorData next = iterator.next();
+            if (next.getUserName().equals(monitorID.getUserName())) {
+                // then this is the right place to update
+                List<HostMonitorData> monitorIDs = next.getHostMonitorData();
+                for (HostMonitorData host : monitorIDs) {
+                    if (host.getHost().equals(monitorID.getHost())) {
+                        // ok we found right place to add this monitorID
+                        host.addMonitorIDForHost(monitorID);
+                        return;
+                    }
+                }
+                // there is a userMonitor object for this user name but no Hosts for this host
+                // so we have to create new Hosts
+                HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost());
+                hostMonitorData.addMonitorIDForHost(monitorID);
+                next.addHostMonitorData(hostMonitorData);
+                return;
+            }
+        }
+        HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost());
+        hostMonitorData.addMonitorIDForHost(monitorID);
+
+        UserMonitorData userMonitorData = new UserMonitorData(monitorID.getUserName());
+        userMonitorData.addHostMonitorData(hostMonitorData);
+        queue.put(monitorID.getUserName(), userMonitorData);
+    }
+
+    public static HostMonitorData getHostMonitorData(Map<String, UserMonitorData> queue, MonitorID monitorID) throws AiravataMonitorException {
+        Iterator<UserMonitorData> iterator = queue.values().iterator();
+        while (iterator.hasNext()) {
+            UserMonitorData next = iterator.next();
+            if (next.getUserName().equals(monitorID.getUserName())) {
+                // then this is the right place to update
+                List<HostMonitorData> monitorIDs = next.getHostMonitorData();
+                for (HostMonitorData host : monitorIDs) {
+                    if (host.getHost().equals(monitorID.getHost())) {
+                        // ok we found right place to add this monitorID
+                        return host;
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    public static void removeJobFromMonitor(BlockingQueue<UserMonitorData> queue,MonitorID monitorID) throws AiravataMonitorException {
+        Iterator<UserMonitorData> iterator = queue.iterator();
+        while(iterator.hasNext()){
+            UserMonitorData next = iterator.next();
+            if(next.getUserName().equals(monitorID.getUserName())){
+                // then this is the right place to update
+                List<HostMonitorData> hostMonitorData = next.getHostMonitorData();
+                for(HostMonitorData iHostMonitorID:hostMonitorData){
+                    if(iHostMonitorID.getHost().equals(monitorID.getHost())) {
+                        List<MonitorID> monitorIDs = iHostMonitorID.getMonitorIDs();
+                        for(MonitorID iMonitorID:monitorIDs){
+                            if(iMonitorID.getJobID().equals(monitorID.getJobID())) {
+                                // OK we found the object, we cannot do list.remove(object) states of two objects
+                                // could be different, thats why we check the jobID
+                                monitorIDs.remove(iMonitorID);
+                                if(monitorIDs.size()==0) {
+                                    hostMonitorData.remove(iHostMonitorID);
+                                    if (hostMonitorData.size() == 0) {
+                                        // no useful data so we have to remove the element from the queue
+                                        queue.remove(next);
+                                    }
+                                }
+                                return;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        throw new AiravataMonitorException("Cannot find the given MonitorID in the queue with userName " +
+                monitorID.getUserName() + "  and jobID " + monitorID.getJobID());
+
+    }
+
+
 }


Mime
View raw message