airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AIRAVATA-1088 - fixing
Date Tue, 25 Mar 2014 18:53:20 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 8d9a5cab4 -> f60565b39


https://issues.apache.org/jira/browse/AIRAVATA-1088 - fixing


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/f60565b3
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/f60565b3
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/f60565b3

Branch: refs/heads/master
Commit: f60565b393c62e50d9c8d2f46641cf3f4e7b7377
Parents: 8d9a5ca
Author: lahiru <lahiru@apache.org>
Authored: Tue Mar 25 14:53:03 2014 -0400
Committer: lahiru <lahiru@apache.org>
Committed: Tue Mar 25 14:53:03 2014 -0400

----------------------------------------------------------------------
 modules/distribution/airavata-server/pom.xml    |   7 +-
 .../src/main/assembly/bin-assembly.xml          |   3 +-
 modules/gfac/gfac-monitor/.pom.xml.swp          | Bin 0 -> 16384 bytes
 modules/gfac/gfac-monitor/pom.xml               |  95 +++++++
 .../job/monitor/AiravataJobStatusUpdator.java   | 131 +++++++++
 .../airavata/job/monitor/MonitorManager.java    | 266 +++++++++++++++++++
 .../src/main/resources/PBSTemplate.xslt         |  77 ++++++
 .../src/main/resources/gsissh.properties        |  26 ++
 .../src/test/resources/gsissh.properties        |  26 ++
 .../src/test/resources/monitor.properties       |   3 +
 modules/gfac/pom.xml                            |   1 +
 .../airavata-orchestrator-service/pom.xml       |   7 +-
 tools/job-monitor/pom.xml                       |  30 +--
 .../job/monitor/AiravataJobStatusUpdator.java   | 131 ---------
 .../airavata/job/monitor/MonitorManager.java    | 266 -------------------
 .../monitor/impl/pull/qstat/QstatMonitor.java   |   2 +-
 .../job/monitor/impl/push/amqp/AMQPMonitor.java |   2 -
 17 files changed, 647 insertions(+), 426 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/f60565b3/modules/distribution/airavata-server/pom.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/airavata-server/pom.xml b/modules/distribution/airavata-server/pom.xml
index 16aa30c..d7a1644 100644
--- a/modules/distribution/airavata-server/pom.xml
+++ b/modules/distribution/airavata-server/pom.xml
@@ -419,7 +419,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-job-monitor</artifactId>
+            <artifactId>job-monitor-tool</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>gfac-monitor</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/airavata/blob/f60565b3/modules/distribution/airavata-server/src/main/assembly/bin-assembly.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/airavata-server/src/main/assembly/bin-assembly.xml b/modules/distribution/airavata-server/src/main/assembly/bin-assembly.xml
index f4a335b..06ef99b 100644
--- a/modules/distribution/airavata-server/src/main/assembly/bin-assembly.xml
+++ b/modules/distribution/airavata-server/src/main/assembly/bin-assembly.xml
@@ -225,7 +225,8 @@
                 <include>org.apache.airavata:airavata-workflow-tracking:jar</include>
                 <include>org.apache.airavata:gsissh:jar</include>
                 <include>org.apache.airavata:airavata-model-utils:jar</include>
-                <include>org.apache.airavata:airavata-job-monitor:jar</include>
+                <include>org.apache.airavata:job-monitor-tool:jar</include>
+                <include>org.apache.airavata:gfac-monitor:jar</include>
                 <include>org.apache.airavata:airavata-api-server:jar</include>
                 <include>org.apache.airavata:airavata-api-stubs:jar</include>
                 <include>org.apache.openjpa:openjpa-all:jar</include>

http://git-wip-us.apache.org/repos/asf/airavata/blob/f60565b3/modules/gfac/gfac-monitor/.pom.xml.swp
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/.pom.xml.swp b/modules/gfac/gfac-monitor/.pom.xml.swp
new file mode 100644
index 0000000..f65293b
Binary files /dev/null and b/modules/gfac/gfac-monitor/.pom.xml.swp differ

http://git-wip-us.apache.org/repos/asf/airavata/blob/f60565b3/modules/gfac/gfac-monitor/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/pom.xml b/modules/gfac/gfac-monitor/pom.xml
new file mode 100644
index 0000000..0582e52
--- /dev/null
+++ b/modules/gfac/gfac-monitor/pom.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file 
+    distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under 
+    the Apache License, Version 2.0 (theÏ "License"); you may not use this file except in compliance with the License. You may 
+    obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to 
+    in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 
+    ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under 
+    the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.airavata</groupId>
+        <artifactId>airavata</artifactId>
+        <version>0.12-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>gfac-monitor</artifactId>
+    <name>GFAC Job Monitor</name>
+    <description>This component handle the Airavata Job monitoring funcationality and consume the job-monitor tool</description>
+    <url>http://airavata.apache.org/</url>
+
+    <dependencies>
+        <!-- Logging -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+	<!-- monitoring tool from tools/job-monitor -->
+	 <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-job-monitor</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-registry-cpi</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-jpa-registry</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.bouncycastle</groupId>
+                    <artifactId>bcprov-jdk16</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <!-- Test -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <version>6.1.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <skip>false</skip>
+                    <forkMode>always</forkMode>
+                    <failIfNoTests>false</failIfNoTests>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/airavata/blob/f60565b3/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
new file mode 100644
index 0000000..b755e16
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
@@ -0,0 +1,131 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor;
+
+import com.google.common.eventbus.Subscribe;
+
+import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.CompositeIdentifier;
+import org.apache.airavata.registry.cpi.DataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+import java.util.concurrent.BlockingQueue;
+
+public class AiravataJobStatusUpdator{
+    private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class);
+
+    private Registry airavataRegistry;
+
+    private BlockingQueue<MonitorID> jobsToMonitor;
+
+
+    public AiravataJobStatusUpdator(Registry airavataRegistry, BlockingQueue<MonitorID> jobsToMonitor) {
+        this.airavataRegistry = airavataRegistry;
+        this.jobsToMonitor = jobsToMonitor;
+    }
+
+    public Registry getAiravataRegistry() {
+        return airavataRegistry;
+    }
+
+    public void setAiravataRegistry(Registry airavataRegistry) {
+        this.airavataRegistry = airavataRegistry;
+    }
+
+    public BlockingQueue<MonitorID> getJobsToMonitor() {
+        return jobsToMonitor;
+    }
+
+    public void setJobsToMonitor(BlockingQueue<MonitorID> jobsToMonitor) {
+        this.jobsToMonitor = jobsToMonitor;
+    }
+
+    @Subscribe
+    public void updateRegistry(JobStatus jobStatus) {
+        /* Here we need to parse the jobStatus message and update
+                the registry accordingly, for now we are just printing to standard Out
+                 */
+        JobState state = jobStatus.getState();
+        if (state != null) {
+            try {
+                String taskID = jobStatus.getMonitorID().getTaskID();
+                String jobID = jobStatus.getMonitorID().getJobID();
+                updateJobStatus(taskID, jobID, state);
+            } catch (Exception e) {
+                logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+            }
+            switch (state) {
+                case COMPLETE:
+                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is DONE");
+                    jobsToMonitor.remove(jobStatus.getMonitorID());
+                    break;
+                case UNKNOWN:
+                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is UNKNOWN");
+                    jobsToMonitor.remove(jobStatus.getMonitorID());
+                    //todo implement this logic
+                    break;
+                case QUEUED:
+                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is QUEUED");
+                    break;
+                case SUBMITTED:
+                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is SUBMITTED");
+                    break;
+                case ACTIVE:
+                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is ACTIVE");
+                    break;
+                case CANCELED:
+                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is CANCELED");
+                    jobsToMonitor.remove(jobStatus.getMonitorID());
+                    break;
+                case FAILED:
+                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is FAILED");
+                    jobsToMonitor.remove(jobStatus.getMonitorID());
+                    break;
+                case HELD:
+                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is HELD");
+                    break;
+                case SUSPENDED:
+                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is SUSPENDED");
+                    jobsToMonitor.remove(jobStatus.getMonitorID());
+                    break;
+            }
+        }
+    }
+    public  void updateJobStatus(String taskId, String jobID, JobState state) throws Exception {
+        CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID);
+        JobDetails details = (JobDetails)airavataRegistry.get(DataType.JOB_DETAIL, ids);
+        if(details == null) {
+            details = new JobDetails();
+        }
+        org.apache.airavata.model.workspace.experiment.JobStatus status = new org.apache.airavata.model.workspace.experiment.JobStatus();
+        status.setJobState(state);
+        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+        details.setJobStatus(status);
+        details.setJobID(jobID);
+        airavataRegistry.update(org.apache.airavata.registry.cpi.DataType.JOB_DETAIL, details, ids);
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/f60565b3/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
new file mode 100644
index 0000000..7c7d31c
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
@@ -0,0 +1,266 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor;
+
+import com.google.common.eventbus.EventBus;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.job.monitor.core.Monitor;
+import org.apache.airavata.job.monitor.core.PullMonitor;
+import org.apache.airavata.job.monitor.core.PushMonitor;
+import org.apache.airavata.job.monitor.event.MonitorPublisher;
+import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.job.monitor.impl.LocalJobMonitor;
+import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
+import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
+import org.apache.airavata.job.monitor.impl.push.amqp.UnRegisterThread;
+import org.apache.airavata.job.monitor.util.CommonUtils;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryImpl;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/*
+this is the manager class for monitoring system of airavata,
+This simply handle the monitoring flow of the system.
+Keeps available jobs to monitor in a queue and once they are done
+remove them from the queue, this will be done by AiravataJobUpdator.
+ */
+public class MonitorManager {
+    private final static Logger logger = LoggerFactory.getLogger(MonitorManager.class);
+
+    private List<PullMonitor> pullMonitors;    //todo though we have a List we only support one at a time
+
+    private List<PushMonitor> pushMonitors;   //todo we need to support multiple monitors dynamically
+
+    private BlockingQueue<UserMonitorData> pullQueue;
+
+    private BlockingQueue<UserMonitorData> pushQueue;
+
+    private BlockingQueue<MonitorID> localJobQueue;
+
+    private BlockingQueue<MonitorID> finishQueue;
+
+    private MonitorPublisher monitorPublisher;
+
+    private Monitor localJobMonitor;
+
+    /**
+     * This will initialize the major monitoring system.
+     */
+    public MonitorManager() {
+        pullMonitors = new ArrayList<PullMonitor>();
+        pushMonitors = new ArrayList<PushMonitor>();
+        pullQueue = new LinkedBlockingQueue<UserMonitorData>();
+        pushQueue = new LinkedBlockingQueue<UserMonitorData>();
+        finishQueue = new LinkedBlockingQueue<MonitorID>();
+        localJobQueue = new LinkedBlockingQueue<MonitorID>();
+        monitorPublisher = new MonitorPublisher(new EventBus());
+        registerListener(new AiravataJobStatusUpdator(new RegistryImpl(), finishQueue));
+    }
+
+    /**
+     * This can be use to add an empty AMQPMonitor object to the monitor system
+     * and tihs method will take care of the initialization
+     * todo may be we need to move this to some other class
+     * @param monitor
+     */
+    public void addAMQPMonitor(AMQPMonitor monitor) {
+        monitor.setPublisher(this.getMonitorPublisher());
+        monitor.setFinishQueue(this.getFinishQueue());
+        monitor.setRunningQueue(this.getPushQueue());
+        addPushMonitor(monitor);
+    }
+
+
+    /**
+     * This can be use to add an empty AMQPMonitor object to the monitor system
+     * and tihs method will take care of the initialization
+     * todo may be we need to move this to some other class
+     * @param monitor
+     */
+    public void addLocalMonitor(LocalJobMonitor monitor) {
+        monitor.setPublisher(this.getMonitorPublisher());
+        monitor.setJobQueue(this.getLocalJobQueue());
+        localJobMonitor = monitor;
+    }
+
+    /**
+     * This can be used to adda a QstatMonitor and it will take care of
+     * the initialization of QstatMonitor
+     * //todo may be we need to move this to some other class
+     * @param qstatMonitor
+     */
+    public void addQstatMonitor(QstatMonitor qstatMonitor) {
+        qstatMonitor.setPublisher(this.getMonitorPublisher());
+        qstatMonitor.setQueue(this.getPullQueue());
+        addPullMonitor(qstatMonitor);
+
+    }
+
+    /**
+     * To deal with the statuses users can write their own listener and implement their own logic
+     *
+     * @param listener Any class can be written and if you want the JobStatus object to be taken from the bus, just
+     *                 have to put @subscribe as an annotation to your method to recieve the JobStatus object from the bus.
+     */
+    public void registerListener(Object listener) {
+        monitorPublisher.registerListener(listener);
+    }
+
+    /**
+     * todo write
+     *
+     * @param monitor
+     */
+    public void addPushMonitor(PushMonitor monitor) {
+        pushMonitors.add(monitor);
+    }
+
+    /**
+     * todo write
+     *
+     * @param monitor
+     */
+    public void addPullMonitor(PullMonitor monitor) {
+        pullMonitors.add(monitor);
+    }
+
+    /**
+     * Adding this method will trigger the thread in launchMonitor and notify it
+     * This is going to be useful during the startup of the launching process
+     *
+     * @param monitorID
+     * @throws AiravataMonitorException
+     */
+    public void addAJobToMonitor(MonitorID monitorID) throws AiravataMonitorException, InterruptedException {
+
+        if (monitorID.getHost().getType() instanceof GsisshHostType) {
+            GsisshHostType host = (GsisshHostType) monitorID.getHost().getType();
+            if ("".equals(host.getMonitorMode()) || host.getMonitorMode() == null
+                    || Constants.PULL.equals(host.getMonitorMode())) {
+                CommonUtils.addMonitortoQueue(pullQueue, monitorID);
+            } else if (Constants.PUSH.equals(host.getMonitorMode())) {
+                CommonUtils.addMonitortoQueue(pushQueue, monitorID);
+            }
+        } else if(monitorID.getHost().getType() instanceof GlobusHostType){
+            logger.error("Monitoring does not support GlubusHostType resources");
+        } else {
+            // we assume this is a type of localJobtype
+            localJobQueue.add(monitorID);
+        }
+    }
+
+    /**
+     * This method should be invoked before adding any elements to monitorQueue
+     * In this method we assume that we give higher preference to Push
+     * Monitorig mechanism if there's any configured, otherwise Pull
+     * monitoring will be launched.
+     * Ex: If there's a reasource which doesn't support Push, we have
+     * to live with Pull MOnitoring.
+     *
+     * @throws AiravataMonitorException
+     */
+    public void launchMonitor() throws AiravataMonitorException {
+        //no push monitor is configured so we launch pull monitor
+        int index = 0;
+        if(localJobMonitor != null){
+            (new Thread(localJobMonitor)).start();
+        }
+
+        for (PullMonitor monitor : pullMonitors) {
+            (new Thread(monitor)).start();
+        }
+
+        //todo fix this
+        for (PushMonitor monitor : pushMonitors) {
+            (new Thread(monitor)).start();
+            if (monitor instanceof AMQPMonitor) {
+                UnRegisterThread unRegisterThread = new
+                        UnRegisterThread(((AMQPMonitor) monitor).getFinishQueue(), ((AMQPMonitor) monitor).getAvailableChannels());
+                unRegisterThread.start();
+            }
+        }
+    }
+
+    /* getter setters for the private variables */
+
+    public List<PullMonitor> getPullMonitors() {
+        return pullMonitors;
+    }
+
+    public void setPullMonitors(List<PullMonitor> pullMonitors) {
+        this.pullMonitors = pullMonitors;
+    }
+
+    public List<PushMonitor> getPushMonitors() {
+        return pushMonitors;
+    }
+
+    public void setPushMonitors(List<PushMonitor> pushMonitors) {
+        this.pushMonitors = pushMonitors;
+    }
+
+    public BlockingQueue<UserMonitorData> getPullQueue() {
+        return pullQueue;
+    }
+
+    public void setPullQueue(BlockingQueue<UserMonitorData> pullQueue) {
+        this.pullQueue = pullQueue;
+    }
+
+    public MonitorPublisher getMonitorPublisher() {
+        return monitorPublisher;
+    }
+
+    public void setMonitorPublisher(MonitorPublisher monitorPublisher) {
+        this.monitorPublisher = monitorPublisher;
+    }
+
+    public BlockingQueue<MonitorID> getFinishQueue() {
+        return finishQueue;
+    }
+
+    public void setFinishQueue(BlockingQueue<MonitorID> finishQueue) {
+        this.finishQueue = finishQueue;
+    }
+
+    public BlockingQueue<UserMonitorData> getPushQueue() {
+        return pushQueue;
+    }
+
+    public void setPushQueue(BlockingQueue<UserMonitorData> pushQueue) {
+        this.pushQueue = pushQueue;
+    }
+
+    public BlockingQueue<MonitorID> getLocalJobQueue() {
+        return localJobQueue;
+    }
+
+    public void setLocalJobQueue(BlockingQueue<MonitorID> localJobQueue) {
+        this.localJobQueue = localJobQueue;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/f60565b3/modules/gfac/gfac-monitor/src/main/resources/PBSTemplate.xslt
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/resources/PBSTemplate.xslt b/modules/gfac/gfac-monitor/src/main/resources/PBSTemplate.xslt
new file mode 100644
index 0000000..e4398e0
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/resources/PBSTemplate.xslt
@@ -0,0 +1,77 @@
+<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file
+	distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under
+	the Apache License, Version 2.0 (theÏ "License"); you may not use this file except in compliance with the License. You may
+	obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to
+	in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+	ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under
+	the License. -->
+<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:ns="http://airavata.apache.org/gsi/ssh/2012/12">
+<xsl:output method="text" />
+<xsl:template match="/ns:JobDescriptor">
+#! /bin/sh
+#   <xsl:choose>
+    <xsl:when test="ns:shellName">
+##PBS -S <xsl:value-of select="ns:shellName"/>
+    </xsl:when></xsl:choose>
+    <xsl:choose>
+    <xsl:when test="ns:queueName">
+#PBS -q <xsl:value-of select="ns:queueName"/>
+    </xsl:when>
+    </xsl:choose>
+    <xsl:choose>
+    <xsl:when test="ns:mailOptions">
+#PBS -m <xsl:value-of select="ns:mailOptions"/>
+    </xsl:when>
+    </xsl:choose>
+    <xsl:choose>
+<xsl:when test="ns:acountString">
+#PBS -A <xsl:value-of select="ns:acountString"/>
+    </xsl:when>
+    </xsl:choose>
+    <xsl:choose>
+    <xsl:when test="ns:maxWallTime">
+#PBS -l walltime=<xsl:value-of select="ns:maxWallTime"/>
+    </xsl:when>
+    </xsl:choose>
+    <xsl:choose>
+    <xsl:when test="ns:jobName">
+#PBS -N <xsl:value-of select="ns:jobName"/>
+    </xsl:when>
+    </xsl:choose>
+    <xsl:choose>
+    <xsl:when test="ns:standardOutFile">
+#PBS -o <xsl:value-of select="ns:standardOutFile"/>
+    </xsl:when>
+    </xsl:choose>
+    <xsl:choose>
+    <xsl:when test="ns:standardOutFile">
+#PBS -e <xsl:value-of select="ns:standardErrorFile"/>
+    </xsl:when>
+    </xsl:choose>
+    <xsl:choose>
+    <xsl:when test="(ns:nodes) and (ns:processesPerNode)">
+#PBS -l nodes=<xsl:value-of select="ns:nodes"/>:ppn=<xsl:value-of select="ns:processesPerNode"/>
+<xsl:text>&#xa;</xsl:text>
+    </xsl:when>
+    </xsl:choose>
+<xsl:for-each select="ns:exports/ns:name">
+<xsl:value-of select="."/>=<xsl:value-of select="./@value"/><xsl:text>&#xa;</xsl:text>
+export<xsl:text>   </xsl:text><xsl:value-of select="."/>
+<xsl:text>&#xa;</xsl:text>
+</xsl:for-each>
+<xsl:for-each select="ns:preJobCommands/ns:command">
+      <xsl:value-of select="."/><xsl:text>   </xsl:text>
+    </xsl:for-each>
+cd <xsl:text>   </xsl:text><xsl:value-of select="ns:workingDirectory"/><xsl:text>&#xa;</xsl:text>
+    <xsl:choose><xsl:when test="ns:jobSubmitterCommand">
+<xsl:value-of select="ns:jobSubmitterCommand"/><xsl:text>   </xsl:text></xsl:when></xsl:choose><xsl:value-of select="ns:executablePath"/><xsl:text>   </xsl:text>
+<xsl:for-each select="ns:inputs/ns:input">
+      <xsl:value-of select="."/><xsl:text>   </xsl:text>
+    </xsl:for-each>
+<xsl:for-each select="ns:postJobCommands/ns:command">
+      <xsl:value-of select="."/><xsl:text>   </xsl:text>
+</xsl:for-each>
+
+</xsl:template>
+
+</xsl:stylesheet>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/f60565b3/modules/gfac/gfac-monitor/src/main/resources/gsissh.properties
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/resources/gsissh.properties b/modules/gfac/gfac-monitor/src/main/resources/gsissh.properties
new file mode 100644
index 0000000..3fdf76d
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/resources/gsissh.properties
@@ -0,0 +1,26 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+###########################################################################
+# Specifies system level configurations as a key/value pairs.
+###########################################################################
+
+StrictHostKeyChecking=no
+ssh.session.timeout=360000

http://git-wip-us.apache.org/repos/asf/airavata/blob/f60565b3/modules/gfac/gfac-monitor/src/test/resources/gsissh.properties
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/resources/gsissh.properties b/modules/gfac/gfac-monitor/src/test/resources/gsissh.properties
new file mode 100644
index 0000000..3fdf76d
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/test/resources/gsissh.properties
@@ -0,0 +1,26 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+###########################################################################
+# Specifies system level configurations as a key/value pairs.
+###########################################################################
+
+StrictHostKeyChecking=no
+ssh.session.timeout=360000

http://git-wip-us.apache.org/repos/asf/airavata/blob/f60565b3/modules/gfac/gfac-monitor/src/test/resources/monitor.properties
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/resources/monitor.properties b/modules/gfac/gfac-monitor/src/test/resources/monitor.properties
new file mode 100644
index 0000000..a4d68cf
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/test/resources/monitor.properties
@@ -0,0 +1,3 @@
+amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
+proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
+connection.name=xsede_private
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/f60565b3/modules/gfac/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/pom.xml b/modules/gfac/pom.xml
index fb11144..2b57a11 100644
--- a/modules/gfac/pom.xml
+++ b/modules/gfac/pom.xml
@@ -32,6 +32,7 @@
             <modules>
                 <module>gfac-core</module>
                 <module>gfac-ec2</module>
+		<module>gfac-monitor</module>
             </modules>
         </profile>
     </profiles>

http://git-wip-us.apache.org/repos/asf/airavata/blob/f60565b3/modules/orchestrator/airavata-orchestrator-service/pom.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/pom.xml b/modules/orchestrator/airavata-orchestrator-service/pom.xml
index c1af70b..2b151da 100644
--- a/modules/orchestrator/airavata-orchestrator-service/pom.xml
+++ b/modules/orchestrator/airavata-orchestrator-service/pom.xml
@@ -43,12 +43,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-job-monitor</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-gfac-core</artifactId>
+            <artifactId>gfac-monitor</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/airavata/blob/f60565b3/tools/job-monitor/pom.xml
----------------------------------------------------------------------
diff --git a/tools/job-monitor/pom.xml b/tools/job-monitor/pom.xml
index 185b068..f782dff 100644
--- a/tools/job-monitor/pom.xml
+++ b/tools/job-monitor/pom.xml
@@ -18,7 +18,7 @@
     </parent>
 
     <modelVersion>4.0.0</modelVersion>
-    <artifactId>airavata-job-monitor</artifactId>
+    <artifactId>job-monitor-tool</artifactId>
     <name>Airavata Job Monitor</name>
     <description>This component handle the Airavata Job monitoring funcationality</description>
     <url>http://airavata.apache.org/</url>
@@ -55,24 +55,8 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-registry-cpi</artifactId>
-            <version>${project.version}</version>
-        </dependency>
 
         <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-jpa-registry</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.bouncycastle</groupId>
-                    <artifactId>bcprov-jdk16</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
             <groupId>com.rabbitmq</groupId>
             <artifactId>amqp-client</artifactId>
             <version>3.2.3</version>
@@ -116,13 +100,23 @@
             <version>12.0</version>
         </dependency>
 
-        <!-- gsi-ssh api dependencies -->
+        <!-- gsi-ssh api and other util dependencies -->
         <dependency>
             <groupId>org.apache.airavata</groupId>
             <artifactId>gsissh</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-gfac-schema-utils</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-data-models</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
             <groupId>com.jcraft</groupId>
             <artifactId>jsch</artifactId>
             <version>0.1.50</version>

http://git-wip-us.apache.org/repos/asf/airavata/blob/f60565b3/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
deleted file mode 100644
index b755e16..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor;
-
-import com.google.common.eventbus.Subscribe;
-
-import org.apache.airavata.job.monitor.state.JobStatus;
-import org.apache.airavata.model.workspace.experiment.JobDetails;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.CompositeIdentifier;
-import org.apache.airavata.registry.cpi.DataType;
-import org.apache.airavata.registry.cpi.Registry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Calendar;
-import java.util.concurrent.BlockingQueue;
-
-public class AiravataJobStatusUpdator{
-    private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class);
-
-    private Registry airavataRegistry;
-
-    private BlockingQueue<MonitorID> jobsToMonitor;
-
-
-    public AiravataJobStatusUpdator(Registry airavataRegistry, BlockingQueue<MonitorID> jobsToMonitor) {
-        this.airavataRegistry = airavataRegistry;
-        this.jobsToMonitor = jobsToMonitor;
-    }
-
-    public Registry getAiravataRegistry() {
-        return airavataRegistry;
-    }
-
-    public void setAiravataRegistry(Registry airavataRegistry) {
-        this.airavataRegistry = airavataRegistry;
-    }
-
-    public BlockingQueue<MonitorID> getJobsToMonitor() {
-        return jobsToMonitor;
-    }
-
-    public void setJobsToMonitor(BlockingQueue<MonitorID> jobsToMonitor) {
-        this.jobsToMonitor = jobsToMonitor;
-    }
-
-    @Subscribe
-    public void updateRegistry(JobStatus jobStatus) {
-        /* Here we need to parse the jobStatus message and update
-                the registry accordingly, for now we are just printing to standard Out
-                 */
-        JobState state = jobStatus.getState();
-        if (state != null) {
-            try {
-                String taskID = jobStatus.getMonitorID().getTaskID();
-                String jobID = jobStatus.getMonitorID().getJobID();
-                updateJobStatus(taskID, jobID, state);
-            } catch (Exception e) {
-                logger.error("Error persisting data" + e.getLocalizedMessage(), e);
-            }
-            switch (state) {
-                case COMPLETE:
-                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is DONE");
-                    jobsToMonitor.remove(jobStatus.getMonitorID());
-                    break;
-                case UNKNOWN:
-                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is UNKNOWN");
-                    jobsToMonitor.remove(jobStatus.getMonitorID());
-                    //todo implement this logic
-                    break;
-                case QUEUED:
-                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is QUEUED");
-                    break;
-                case SUBMITTED:
-                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is SUBMITTED");
-                    break;
-                case ACTIVE:
-                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is ACTIVE");
-                    break;
-                case CANCELED:
-                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is CANCELED");
-                    jobsToMonitor.remove(jobStatus.getMonitorID());
-                    break;
-                case FAILED:
-                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is FAILED");
-                    jobsToMonitor.remove(jobStatus.getMonitorID());
-                    break;
-                case HELD:
-                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is HELD");
-                    break;
-                case SUSPENDED:
-                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is SUSPENDED");
-                    jobsToMonitor.remove(jobStatus.getMonitorID());
-                    break;
-            }
-        }
-    }
-    public  void updateJobStatus(String taskId, String jobID, JobState state) throws Exception {
-        CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID);
-        JobDetails details = (JobDetails)airavataRegistry.get(DataType.JOB_DETAIL, ids);
-        if(details == null) {
-            details = new JobDetails();
-        }
-        org.apache.airavata.model.workspace.experiment.JobStatus status = new org.apache.airavata.model.workspace.experiment.JobStatus();
-        status.setJobState(state);
-        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
-        details.setJobStatus(status);
-        details.setJobID(jobID);
-        airavataRegistry.update(org.apache.airavata.registry.cpi.DataType.JOB_DETAIL, details, ids);
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/f60565b3/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
deleted file mode 100644
index 7c7d31c..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor;
-
-import com.google.common.eventbus.EventBus;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.job.monitor.core.Monitor;
-import org.apache.airavata.job.monitor.core.PullMonitor;
-import org.apache.airavata.job.monitor.core.PushMonitor;
-import org.apache.airavata.job.monitor.event.MonitorPublisher;
-import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.impl.LocalJobMonitor;
-import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
-import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
-import org.apache.airavata.job.monitor.impl.push.amqp.UnRegisterThread;
-import org.apache.airavata.job.monitor.util.CommonUtils;
-import org.apache.airavata.persistance.registry.jpa.impl.RegistryImpl;
-import org.apache.airavata.schemas.gfac.GlobusHostType;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/*
-this is the manager class for monitoring system of airavata,
-This simply handle the monitoring flow of the system.
-Keeps available jobs to monitor in a queue and once they are done
-remove them from the queue, this will be done by AiravataJobUpdator.
- */
-public class MonitorManager {
-    private final static Logger logger = LoggerFactory.getLogger(MonitorManager.class);
-
-    private List<PullMonitor> pullMonitors;    //todo though we have a List we only support one at a time
-
-    private List<PushMonitor> pushMonitors;   //todo we need to support multiple monitors dynamically
-
-    private BlockingQueue<UserMonitorData> pullQueue;
-
-    private BlockingQueue<UserMonitorData> pushQueue;
-
-    private BlockingQueue<MonitorID> localJobQueue;
-
-    private BlockingQueue<MonitorID> finishQueue;
-
-    private MonitorPublisher monitorPublisher;
-
-    private Monitor localJobMonitor;
-
-    /**
-     * This will initialize the major monitoring system.
-     */
-    public MonitorManager() {
-        pullMonitors = new ArrayList<PullMonitor>();
-        pushMonitors = new ArrayList<PushMonitor>();
-        pullQueue = new LinkedBlockingQueue<UserMonitorData>();
-        pushQueue = new LinkedBlockingQueue<UserMonitorData>();
-        finishQueue = new LinkedBlockingQueue<MonitorID>();
-        localJobQueue = new LinkedBlockingQueue<MonitorID>();
-        monitorPublisher = new MonitorPublisher(new EventBus());
-        registerListener(new AiravataJobStatusUpdator(new RegistryImpl(), finishQueue));
-    }
-
-    /**
-     * This can be use to add an empty AMQPMonitor object to the monitor system
-     * and tihs method will take care of the initialization
-     * todo may be we need to move this to some other class
-     * @param monitor
-     */
-    public void addAMQPMonitor(AMQPMonitor monitor) {
-        monitor.setPublisher(this.getMonitorPublisher());
-        monitor.setFinishQueue(this.getFinishQueue());
-        monitor.setRunningQueue(this.getPushQueue());
-        addPushMonitor(monitor);
-    }
-
-
-    /**
-     * This can be use to add an empty AMQPMonitor object to the monitor system
-     * and tihs method will take care of the initialization
-     * todo may be we need to move this to some other class
-     * @param monitor
-     */
-    public void addLocalMonitor(LocalJobMonitor monitor) {
-        monitor.setPublisher(this.getMonitorPublisher());
-        monitor.setJobQueue(this.getLocalJobQueue());
-        localJobMonitor = monitor;
-    }
-
-    /**
-     * This can be used to adda a QstatMonitor and it will take care of
-     * the initialization of QstatMonitor
-     * //todo may be we need to move this to some other class
-     * @param qstatMonitor
-     */
-    public void addQstatMonitor(QstatMonitor qstatMonitor) {
-        qstatMonitor.setPublisher(this.getMonitorPublisher());
-        qstatMonitor.setQueue(this.getPullQueue());
-        addPullMonitor(qstatMonitor);
-
-    }
-
-    /**
-     * To deal with the statuses users can write their own listener and implement their own logic
-     *
-     * @param listener Any class can be written and if you want the JobStatus object to be taken from the bus, just
-     *                 have to put @subscribe as an annotation to your method to recieve the JobStatus object from the bus.
-     */
-    public void registerListener(Object listener) {
-        monitorPublisher.registerListener(listener);
-    }
-
-    /**
-     * todo write
-     *
-     * @param monitor
-     */
-    public void addPushMonitor(PushMonitor monitor) {
-        pushMonitors.add(monitor);
-    }
-
-    /**
-     * todo write
-     *
-     * @param monitor
-     */
-    public void addPullMonitor(PullMonitor monitor) {
-        pullMonitors.add(monitor);
-    }
-
-    /**
-     * Adding this method will trigger the thread in launchMonitor and notify it
-     * This is going to be useful during the startup of the launching process
-     *
-     * @param monitorID
-     * @throws AiravataMonitorException
-     */
-    public void addAJobToMonitor(MonitorID monitorID) throws AiravataMonitorException, InterruptedException {
-
-        if (monitorID.getHost().getType() instanceof GsisshHostType) {
-            GsisshHostType host = (GsisshHostType) monitorID.getHost().getType();
-            if ("".equals(host.getMonitorMode()) || host.getMonitorMode() == null
-                    || Constants.PULL.equals(host.getMonitorMode())) {
-                CommonUtils.addMonitortoQueue(pullQueue, monitorID);
-            } else if (Constants.PUSH.equals(host.getMonitorMode())) {
-                CommonUtils.addMonitortoQueue(pushQueue, monitorID);
-            }
-        } else if(monitorID.getHost().getType() instanceof GlobusHostType){
-            logger.error("Monitoring does not support GlubusHostType resources");
-        } else {
-            // we assume this is a type of localJobtype
-            localJobQueue.add(monitorID);
-        }
-    }
-
-    /**
-     * This method should be invoked before adding any elements to monitorQueue
-     * In this method we assume that we give higher preference to Push
-     * Monitorig mechanism if there's any configured, otherwise Pull
-     * monitoring will be launched.
-     * Ex: If there's a reasource which doesn't support Push, we have
-     * to live with Pull MOnitoring.
-     *
-     * @throws AiravataMonitorException
-     */
-    public void launchMonitor() throws AiravataMonitorException {
-        //no push monitor is configured so we launch pull monitor
-        int index = 0;
-        if(localJobMonitor != null){
-            (new Thread(localJobMonitor)).start();
-        }
-
-        for (PullMonitor monitor : pullMonitors) {
-            (new Thread(monitor)).start();
-        }
-
-        //todo fix this
-        for (PushMonitor monitor : pushMonitors) {
-            (new Thread(monitor)).start();
-            if (monitor instanceof AMQPMonitor) {
-                UnRegisterThread unRegisterThread = new
-                        UnRegisterThread(((AMQPMonitor) monitor).getFinishQueue(), ((AMQPMonitor) monitor).getAvailableChannels());
-                unRegisterThread.start();
-            }
-        }
-    }
-
-    /* getter setters for the private variables */
-
-    public List<PullMonitor> getPullMonitors() {
-        return pullMonitors;
-    }
-
-    public void setPullMonitors(List<PullMonitor> pullMonitors) {
-        this.pullMonitors = pullMonitors;
-    }
-
-    public List<PushMonitor> getPushMonitors() {
-        return pushMonitors;
-    }
-
-    public void setPushMonitors(List<PushMonitor> pushMonitors) {
-        this.pushMonitors = pushMonitors;
-    }
-
-    public BlockingQueue<UserMonitorData> getPullQueue() {
-        return pullQueue;
-    }
-
-    public void setPullQueue(BlockingQueue<UserMonitorData> pullQueue) {
-        this.pullQueue = pullQueue;
-    }
-
-    public MonitorPublisher getMonitorPublisher() {
-        return monitorPublisher;
-    }
-
-    public void setMonitorPublisher(MonitorPublisher monitorPublisher) {
-        this.monitorPublisher = monitorPublisher;
-    }
-
-    public BlockingQueue<MonitorID> getFinishQueue() {
-        return finishQueue;
-    }
-
-    public void setFinishQueue(BlockingQueue<MonitorID> finishQueue) {
-        this.finishQueue = finishQueue;
-    }
-
-    public BlockingQueue<UserMonitorData> getPushQueue() {
-        return pushQueue;
-    }
-
-    public void setPushQueue(BlockingQueue<UserMonitorData> pushQueue) {
-        this.pushQueue = pushQueue;
-    }
-
-    public BlockingQueue<MonitorID> getLocalJobQueue() {
-        return localJobQueue;
-    }
-
-    public void setLocalJobQueue(BlockingQueue<MonitorID> localJobQueue) {
-        this.localJobQueue = localJobQueue;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/f60565b3/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
index d4af60f..1a098da 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
@@ -146,7 +146,7 @@ public class QstatMonitor extends PullMonitor {
                             iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
                             completedJobs.add(iMonitorID);
                         } else {
-                            // Everything is good but jjob is not yet completed, so we just change the last monitor
+                            // Evey
                             iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
                             // if the job is complete we remove it from the Map, if any of these maps
                             // get empty this userMonitorData will get delete from the queue

http://git-wip-us.apache.org/repos/asf/airavata/blob/f60565b3/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
index 3e2d0fe..44d4653 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
@@ -22,7 +22,6 @@ package org.apache.airavata.job.monitor.impl.push.amqp;
 
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
-import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.job.monitor.HostMonitorData;
 import org.apache.airavata.job.monitor.MonitorID;
@@ -32,7 +31,6 @@ import org.apache.airavata.job.monitor.event.MonitorPublisher;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.job.monitor.util.AMQPConnectionUtil;
 import org.apache.airavata.job.monitor.util.CommonUtils;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 


Mime
View raw message