airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject [21/81] [abbrv] airavata git commit: Refactored gfac sub modules, merged gfac-ssh, gfac-gsissh, gfac-local, gfac-monitor and gsissh modules and create gface-impl, removed implementation from gfac-core to gfac-impl
Date Thu, 04 Jun 2015 20:15:34 GMT
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/UGEEmailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/UGEEmailParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/UGEEmailParser.java
new file mode 100644
index 0000000..0710d9e
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/UGEEmailParser.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.email.parser;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.gfac.core.monitor.EmailParser;
+import org.apache.airavata.gfac.core.monitor.JobStatusResult;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class UGEEmailParser implements EmailParser {
+
+    private static final Logger log = LoggerFactory.getLogger(UGEEmailParser.class);
+    private static final String REGEX = "[\\w]*[ ]*(?<"+ JOBID + ">[\\d]*)[ ]*\\((?<" + JOBNAME
+            + ">[a-zA-Z0-9]*)\\)[ ]*(?<" + STATUS + ">[a-zA-Z]*)";
+    public static final String STARTED = "Started";
+    public static final String COMPLETE = "Complete";
+    public static final String FAILED = "Failed";
+    private static final String REGEX_EXIT_STATUS = "Exit Status[ ]*=[ ]*(?<" + EXIT_STATUS + ">[\\d]+)";
+    public static final String ABORTED = "Aborted";
+
+
+    @Override
+    public JobStatusResult parseEmail(Message message) throws MessagingException, AiravataException {
+        JobStatusResult jobStatusResult = new JobStatusResult();
+
+        String subject = message.getSubject();
+        Pattern pattern = Pattern.compile(REGEX);
+        Matcher matcher = pattern.matcher(subject);
+        try {
+            if (matcher.find()) {
+                jobStatusResult.setJobId(matcher.group(JOBID));
+                jobStatusResult.setJobName(matcher.group(JOBNAME));
+                String content = (String) message.getContent();
+                jobStatusResult.setState(getJobState(matcher.group(STATUS), content));
+            } else {
+                log.error("[EJM]: No matched found for subject => \n" + subject);
+            }
+        } catch (IOException e) {
+            throw new AiravataException("[EJM]: Error while reading content of the email message");
+        }
+        return jobStatusResult;
+    }
+
+    private JobState getJobState(String status, String content) {
+        switch (status) {
+            case STARTED:
+                return JobState.ACTIVE;
+            case COMPLETE:
+                int exitStatus = getExitStatus(content);
+                if (exitStatus == 0) {
+                    return JobState.COMPLETE;
+                } else {
+                    log.info("[EJM]: Job returns with Exit Status = " + exitStatus + "  , Marked as Failed");
+                    return JobState.FAILED;
+                }
+            case FAILED:
+                return JobState.FAILED;
+            case ABORTED:
+                return JobState.FAILED;
+            default:
+                return JobState.UNKNOWN;
+
+        }
+    }
+
+    private int getExitStatus(String content) {
+        Pattern statusPattern = Pattern.compile(REGEX_EXIT_STATUS);
+        Matcher statusMatcher = statusPattern.matcher(content);
+        if (statusMatcher.find()) {
+            String group = statusMatcher.group(EXIT_STATUS);
+            if (group != null && !group.trim().isEmpty()) {
+                return Integer.valueOf(group.trim());
+            }
+        }
+        return -1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
new file mode 100644
index 0000000..3acef66
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.exception;
+
+/**
+ * Checked exception raised by the gfac job-monitoring code.
+ */
+public class AiravataMonitorException extends Exception {
+    private static final long serialVersionUID = -2849422320139467602L;
+
+    /**
+     * @param e the underlying failure; its {@code toString()} becomes this exception's message
+     */
+    public AiravataMonitorException(Throwable e) {
+        super(e);
+    }
+
+    /**
+     * @param message description of the failure
+     */
+    public AiravataMonitorException(String message) {
+        // Deliberately NOT super(message, null): passing an explicit null cause marks the cause
+        // as already initialised, so a later initCause(...) would throw IllegalStateException.
+        super(message);
+    }
+
+    /**
+     * @param message description of the failure
+     * @param e       the underlying failure, kept as the cause
+     */
+    public AiravataMonitorException(String message, Throwable e) {
+        super(message, e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
new file mode 100644
index 0000000..e31458d
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -0,0 +1,139 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.handlers;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.logger.AiravataLogger;
+import org.apache.airavata.common.logger.AiravataLoggerFactory;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.handler.ThreadedHandler;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.HPCMonitorID;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.gfac.monitor.impl.pull.qstat.HPCPullMonitor;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.gfac.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import java.util.Properties;
+
+/**
+ * Handler responsible for monitoring jobs in pull mode. It supports pull monitoring of
+ * multiple grid resources using commands like qstat and squeue, including Sun Grid Engine
+ * monitoring, which is a slight variation of qstat monitoring.
+ *
+ * <p>The actual polling is delegated to an {@link HPCPullMonitor}: {@link #invoke} enqueues a job,
+ * {@link #run()} runs the monitor loop and {@link #process} records cancelled jobs.
+ * {@link #initProperties} (or {@link #setHpcPullMonitor}) must have supplied the monitor before
+ * {@code run()} or {@code invoke()} is used.
+ */
+public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{
+    private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GridPullMonitorHandler.class);
+
+    private HPCPullMonitor hpcPullMonitor;
+
+    private AuthenticationInfo authenticationInfo;
+
+    /**
+     * Builds the MyProxy authentication info from the server settings and creates the pull monitor.
+     *
+     * @param properties handler properties; not read by this handler
+     * @throws GFacHandlerException if one of the server settings cannot be read
+     */
+    @Override
+    public void initProperties(Properties properties) throws GFacHandlerException {
+        try {
+            String myProxyUser = ServerSettings.getSetting("myproxy.username");
+            String myProxyPass = ServerSettings.getSetting("myproxy.password");
+            String certPath = ServerSettings.getSetting("trusted.cert.location");
+            String myProxyServer = ServerSettings.getSetting("myproxy.server");
+            // NOTE(review): 7512 / 17280000 look like the MyProxy port and the credential
+            // lifetime -- confirm against MyProxyAuthenticationInfo.
+            setAuthenticationInfo(new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer,
+                    7512, 17280000, certPath));
+            hpcPullMonitor = new HPCPullMonitor(null,getAuthenticationInfo());    // we use our own credentials for monitoring, not from the store
+        } catch (ApplicationSettingsException e) {
+            logger.error("Error while  reading server properties", e);
+            throw new GFacHandlerException("Error while  reading server properties", e);
+        }
+    }
+
+    /** Runs the pull monitor loop on the calling thread. */
+    @Override
+    public void run() {
+        hpcPullMonitor.run();
+    }
+
+    /**
+     * Hands the job of the given execution context to the pull monitor.
+     *
+     * <p>A failure to enqueue the job is logged (including its stack trace) but not rethrown.
+     *
+     * @param jobExecutionContext context of the job to monitor
+     * @throws GFacHandlerException if thrown by the superclass {@code invoke}
+     */
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        super.invoke(jobExecutionContext);
+        hpcPullMonitor.setGfac(jobExecutionContext.getGfac());
+        hpcPullMonitor.setPublisher(jobExecutionContext.getMonitorPublisher());
+        MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(), jobExecutionContext);
+        try {
+            // NOTE: registering this handler as a ZooKeeper watcher on the experiment's
+            // "operation" node used to happen here and is currently disabled, so nothing in
+            // this method causes process(WatchedEvent) to be called.
+            CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID, jobExecutionContext);
+            CommonUtils.increaseZkJobCount(monitorID); // update change job count to zookeeper
+        } catch (AiravataMonitorException e) {
+            logger.errorId(monitorID.getJobID(), "Error adding job {} monitorID object to the queue with experiment {}",
+                    monitorID.getJobID(),  monitorID.getExperimentID());
+            // also log the exception itself so the reason and stack trace are not lost
+            logger.error("Error adding job monitorID object to the queue", e);
+        }
+    }
+
+    /** Recovery is not implemented for this handler; the method does nothing. */
+    @Override
+    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        // TODO: Auto generated method body.
+    }
+
+    public AuthenticationInfo getAuthenticationInfo() {
+        return authenticationInfo;
+    }
+
+    public HPCPullMonitor getHpcPullMonitor() {
+        return hpcPullMonitor;
+    }
+
+    public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
+        this.authenticationInfo = authenticationInfo;
+    }
+
+    public void setHpcPullMonitor(HPCPullMonitor hpcPullMonitor) {
+        this.hpcPullMonitor = hpcPullMonitor;
+    }
+
+
+    /**
+     * ZooKeeper watcher callback. A {@code NodeDataChanged} event is treated as a cancellation:
+     * every path segment containing {@code "+"} (an {@code experimentID+TaskID} pair) is added to
+     * the pull monitor's cancel list.
+     *
+     * @param watchedEvent the event delivered by ZooKeeper
+     */
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+        String path = watchedEvent.getPath();
+        logger.info(path);
+        if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){
+            // node data is changed, this means node is cancelled.
+            logger.info("Experiment is cancelled with this path:"+path);
+            if (path == null) {
+                // nothing to split; avoids a NullPointerException on an event without a path
+                return;
+            }
+
+            for(String element : path.split("/")) {
+                if (element.contains("+")) {
+                    logger.info("Adding experimentID+TaskID to be removed from monitoring:"+element);
+                    hpcPullMonitor.getCancelJobList().add(element);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
new file mode 100644
index 0000000..6db7da5
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.handlers;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.handler.ThreadedHandler;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.HPCMonitorID;
+import org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor;
+import org.apache.airavata.gfac.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *   this handler is responsible monitoring jobs in push mode
+ *   and currently this support multiple push monitoring in grid resource
+ */
+public class GridPushMonitorHandler extends ThreadedHandler {
+    private final static Logger logger= LoggerFactory.getLogger(GridPushMonitorHandler.class);
+
+    private AMQPMonitor amqpMonitor;
+
+    private AuthenticationInfo authenticationInfo;
+
+    @Override
+    public void initProperties(Properties properties) throws GFacHandlerException {
+        String myProxyUser=null;
+        try{
+            myProxyUser = ServerSettings.getSetting("myproxy.username");
+            String myProxyPass = ServerSettings.getSetting("myproxy.password");
+            String certPath = ServerSettings.getSetting("trusted.cert.location");
+            String myProxyServer = ServerSettings.getSetting("myproxy.server");
+            setAuthenticationInfo(new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer,
+                    7512, 17280000, certPath));
+
+            String hostList=(String)properties.get("hosts");
+            String proxyFilePath = ServerSettings.getSetting("proxy.file.path");
+            String connectionName=ServerSettings.getSetting("connection.name");
+            LinkedBlockingQueue<MonitorID> pushQueue = new LinkedBlockingQueue<MonitorID>();
+            LinkedBlockingQueue<MonitorID> finishQueue = new LinkedBlockingQueue<MonitorID>();
+            List<String> hosts= Arrays.asList(hostList.split(","));
+            amqpMonitor=new AMQPMonitor(null,pushQueue,finishQueue,proxyFilePath,connectionName,hosts);
+        }catch (ApplicationSettingsException e){
+            logger.error(e.getMessage(), e);
+            throw new GFacHandlerException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void run() {
+        amqpMonitor.run();
+    }
+
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException{
+        super.invoke(jobExecutionContext);
+        MonitorID monitorID=new HPCMonitorID(getAuthenticationInfo(),jobExecutionContext);
+        amqpMonitor.getRunningQueue().add(monitorID);
+    }
+
+    @Override
+    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        // TODO: Auto generated method body.
+    }
+
+    public AMQPMonitor getAmqpMonitor() {
+        return amqpMonitor;
+    }
+
+    public void setAmqpMonitor(AMQPMonitor amqpMonitor) {
+        this.amqpMonitor = amqpMonitor;
+    }
+
+    public AuthenticationInfo getAuthenticationInfo() {
+        return authenticationInfo;
+    }
+
+    public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
+        this.authenticationInfo = authenticationInfo;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
new file mode 100644
index 0000000..553ded9
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -0,0 +1,471 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.pull.qstat;
+
+import com.google.common.eventbus.EventBus;
+import org.apache.airavata.common.logger.AiravataLogger;
+import org.apache.airavata.common.logger.AiravataLoggerFactory;
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.gfac.core.GFac;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.core.GFacThreadPoolExecutor;
+import org.apache.airavata.gfac.core.utils.OutHandlerWorker;
+import org.apache.airavata.gfac.monitor.HostMonitorData;
+import org.apache.airavata.gfac.monitor.UserMonitorData;
+import org.apache.airavata.gfac.monitor.core.PullMonitor;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.gfac.monitor.impl.push.amqp.SimpleJobFinishConsumer;
+import org.apache.airavata.gfac.ssh.api.SSHApiException;
+import org.apache.airavata.gfac.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
+import org.apache.airavata.model.workspace.experiment.JobState;
+
+import java.sql.Timestamp;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This monitor is based on qstat command which can be run
+ * in grid resources and retrieve the job status.
+ */
+public class HPCPullMonitor extends PullMonitor {
+
+    private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(HPCPullMonitor.class);
+    public static final int FAILED_COUNT = 5;
+
+    // I think this should use a delayed blocking queue (e.g. java.util.concurrent.DelayQueue) to do the monitoring
+    private BlockingQueue<UserMonitorData> queue;
+
+    private boolean startPulling = false;
+
+    private Map<String, ResourceConnection> connections;
+
+    private MonitorPublisher publisher;
+
+    private LinkedBlockingQueue<String> cancelJobList;
+
+    private List<String> completedJobsFromPush;
+
+    private GFac gfac;
+
+    private AuthenticationInfo authenticationInfo;
+
+    private ArrayList<MonitorID> removeList;
+
+    /**
+     * Creates a monitor that owns a new, empty work queue and publishes through a
+     * {@link MonitorPublisher} backed by a fresh {@link EventBus}. No authentication info is
+     * assigned by this constructor. A {@link SimpleJobFinishConsumer} is started on the list of
+     * jobs reported as completed by push notifications.
+     */
+    public HPCPullMonitor() {
+        this.connections = new HashMap<>();
+        this.queue = new LinkedBlockingDeque<>();
+        this.publisher = new MonitorPublisher(new EventBus());
+        this.cancelJobList = new LinkedBlockingQueue<>();
+        this.completedJobsFromPush = new ArrayList<>();
+        // the consumer is handed the very list instance this monitor keeps
+        SimpleJobFinishConsumer finishConsumer = new SimpleJobFinishConsumer(this.completedJobsFromPush);
+        finishConsumer.listen();
+        this.removeList = new ArrayList<>();
+    }
+
+    /**
+     * Creates a monitor that owns a new, empty work queue and uses the supplied publisher and
+     * credentials. A {@link SimpleJobFinishConsumer} is started on the list of jobs reported as
+     * completed by push notifications.
+     *
+     * @param monitorPublisher publisher to use; stored as given
+     * @param authInfo         authentication info to use; stored as given
+     */
+    public HPCPullMonitor(MonitorPublisher monitorPublisher, AuthenticationInfo authInfo) {
+        this.connections = new HashMap<>();
+        this.queue = new LinkedBlockingDeque<>();
+        this.publisher = monitorPublisher;
+        this.authenticationInfo = authInfo;
+        this.cancelJobList = new LinkedBlockingQueue<>();
+        this.completedJobsFromPush = new ArrayList<>();
+        // the consumer is handed the very list instance this monitor keeps
+        SimpleJobFinishConsumer finishConsumer = new SimpleJobFinishConsumer(this.completedJobsFromPush);
+        finishConsumer.listen();
+        this.removeList = new ArrayList<>();
+    }
+
+    /**
+     * Creates a monitor that works on a caller-supplied queue and publisher. No authentication
+     * info is assigned by this constructor. A {@link SimpleJobFinishConsumer} is started on the
+     * list of jobs reported as completed by push notifications.
+     *
+     * @param queue     work queue to poll; stored as given, not copied
+     * @param publisher publisher to use; stored as given
+     */
+    public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) {
+        // caller-provided collaborators first, then the state this monitor owns itself
+        this.queue = queue;
+        this.publisher = publisher;
+        this.connections = new HashMap<>();
+        this.cancelJobList = new LinkedBlockingQueue<>();
+        this.completedJobsFromPush = new ArrayList<>();
+        // the consumer is handed the very list instance this monitor keeps
+        SimpleJobFinishConsumer finishConsumer = new SimpleJobFinishConsumer(this.completedJobsFromPush);
+        finishConsumer.listen();
+        this.removeList = new ArrayList<>();
+    }
+
+
+    /**
+     * Polling loop of the monitor.
+     *
+     * <p>While pulling is enabled and {@link ServerSettings#isStopAllThreads()} is false, this
+     * method calls {@link #startPulling()} whenever the queue is non-empty (holding the queue's
+     * monitor lock) and then sleeps for the pull interval. Any exception raised in an iteration
+     * is logged and the loop carries on. Once the loop ends, every cached cluster connection
+     * is disconnected.
+     */
+    public void run() {
+        final long pullIntervalMillis = 10000;
+        this.startPulling = true;
+        while (this.startPulling && !ServerSettings.isStopAllThreads()) {
+            try {
+                synchronized (this.queue) {
+                    if (!this.queue.isEmpty()) {
+                        startPulling();
+                    }
+                }
+                // after each pass over the queue this thread sleeps for the pull interval (10 seconds)
+                Thread.sleep(pullIntervalMillis);
+            } catch (Exception e) {
+                // we catch all the exceptions here because no matter what happens we do not stop running this
+                // thread; proper error reporting is expected from startPulling, and a failure in
+                // Thread.sleep is handled by this catch block as well.
+                // NOTE(review): this also swallows InterruptedException without restoring the interrupt
+                // flag, so an interrupt alone never stops the loop -- confirm that this is intended.
+                logger.error(e.getMessage(),e);
+            }
+        }
+        // thread is going to return so we close all the connections
+        for (ResourceConnection resourceConnection : connections.values()) {
+            try {
+                resourceConnection.getCluster().disconnect();
+            } catch (SSHApiException e) {
+                // a failure for one connection must not prevent closing the remaining ones
+                logger.error("Error while disconnecting from the cluster", e);
+            }
+        }
+    }
+
+    /**
+     * This method can be invoked when PullMonitor needs to start,
+     * and it has to be invoked at the frequency specified below.
+     *
+     * @return if the start process is successful return true else false
+     */
+     public boolean startPulling() throws AiravataMonitorException {
+        // take the top element in the queue and pull the data and put that element
+        // at the tail of the queue
+        //todo this polling will not work with multiple usernames but with single user
+        // and multiple hosts, currently monitoring will work
+        UserMonitorData take = null;
+        JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent();
+        MonitorID currentMonitorID = null;
+        try {
+            take = this.queue.take();
+            List<HostMonitorData> hostMonitorData = take.getHostMonitorData();
+            for (ListIterator<HostMonitorData> hostIterator = hostMonitorData.listIterator(); hostIterator.hasNext();) {
+                HostMonitorData iHostMonitorData = hostIterator.next();
+                if (iHostMonitorData.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
+                    String hostName = iHostMonitorData.getComputeResourceDescription().getHostName();
+                    ResourceConnection connection = null;
+                    if (connections.containsKey(hostName)) {
+                        if (!connections.get(hostName).isConnected()) {
+                            connection = new ResourceConnection(iHostMonitorData, getAuthenticationInfo());
+                            connections.put(hostName, connection);
+                        } else {
+                            logger.debug("We already have this connection so not going to create one");
+                            connection = connections.get(hostName);
+                        }
+                    } else {
+                        connection = new ResourceConnection(iHostMonitorData, getAuthenticationInfo());
+                        connections.put(hostName, connection);
+                    }
+
+                    // before we get the statuses, we check the cancel job list and remove them permanently
+                    List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs();
+                    Iterator<String> iterator1 = cancelJobList.iterator();
+                    ListIterator<MonitorID> monitorIDListIterator = monitorID.listIterator();
+                    while (monitorIDListIterator.hasNext()) {
+                        MonitorID iMonitorID = monitorIDListIterator.next();
+                        while (iterator1.hasNext()) {
+                            String cancelMId = iterator1.next();
+                            if (cancelMId.equals(iMonitorID.getExperimentID() + "+" + iMonitorID.getTaskID())) {
+                                iMonitorID.setStatus(JobState.CANCELED);
+//                                CommonUtils.removeMonitorFromQueue(take, iMonitorID);
+                                removeList.add(iMonitorID);
+                                logger.debugId(cancelMId, "Found a match in cancel monitor queue, hence moved to the " +
+                                                "completed job queue, experiment {}, task {} , job {}",
+                                        iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobID());
+                                logger.info("Job cancelled: marking the Job as ************CANCELLED************ experiment {}, task {}, job name {} .",
+                                        iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName());
+                                sendNotification(iMonitorID);
+                                logger.info("To avoid timing issues we sleep sometime and try to retrieve output files");
+                                Thread.sleep(10000);
+                                GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher));
+                                break;
+                            }
+                        }
+                        iterator1 = cancelJobList.iterator();
+                    }
+
+                    cleanup(take);
+
+                    synchronized (completedJobsFromPush) {
+                        for (ListIterator<String> iterator = completedJobsFromPush.listIterator(); iterator.hasNext(); ) {
+                            String completeId = iterator.next();
+                            for (monitorIDListIterator = monitorID.listIterator(); monitorIDListIterator.hasNext(); ) {
+                                MonitorID iMonitorID = monitorIDListIterator.next();
+                                if (completeId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) {
+                                    logger.info("This job is finished because push notification came with <username,jobName> " + completeId);
+                                    iMonitorID.setStatus(JobState.COMPLETE);
+//                                    CommonUtils.removeMonitorFromQueue(take, iMonitorID);//we have to make this empty everytime we iterate, otherwise this list will accumulate and will lead to a memory leak
+                                    removeList.add(iMonitorID);
+                                    logger.debugId(completeId, "Push notification updated job {} status to {}. " +
+                                                    "experiment {} , task {}.", iMonitorID.getJobID(), JobState.COMPLETE.toString(),
+                                            iMonitorID.getExperimentID(), iMonitorID.getTaskID());
+                                    logger.info("AMQP message recieved: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .",
+                                            iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName());
+
+                                    sendNotification(iMonitorID);
+                                    logger.info("To avoid timing issues we sleep sometime and try to retrieve output files");
+                                    Thread.sleep(10000);
+                                    GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher));
+                                    break;
+                                }
+                            }
+                        }
+                    }
+
+                    cleanup(take);
+
+                    // we have to get this again because we removed the already completed jobs with amqp messages
+                    monitorID = iHostMonitorData.getMonitorIDs();
+                    Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID);
+                    for (Iterator<MonitorID> iterator = monitorID.listIterator(); iterator.hasNext(); ) {
+                        MonitorID iMonitorID = iterator.next();
+                        currentMonitorID = iMonitorID;
+                        if (!JobState.CANCELED.equals(iMonitorID.getStatus()) &&
+                                !JobState.COMPLETE.equals(iMonitorID.getStatus())) {
+                            iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName()));    //IMPORTANT this is NOT a simple setter we have a logic
+                        } else if (JobState.COMPLETE.equals(iMonitorID.getStatus())) {
+                            logger.debugId(iMonitorID.getJobID(), "Moved job {} to completed jobs map, experiment {}, " +
+                                    "task {}", iMonitorID.getJobID(), iMonitorID.getExperimentID(), iMonitorID.getTaskID());
+//                            CommonUtils.removeMonitorFromQueue(take, iMonitorID);
+                            removeList.add(iMonitorID);
+                            logger.info("PULL Notification is complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .",
+                                    iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName());
+                            GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher));
+                        }
+                        iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName()));    //IMPORTANT this is not a simple setter we have a logic
+                        iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+                        sendNotification(iMonitorID);
+                        // if the job is completed we do not have to put the job to the queue again
+                        iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+                    }
+
+                    cleanup(take);
+
+
+                    for (Iterator<MonitorID> iterator = monitorID.listIterator(); iterator.hasNext(); ) {
+                        MonitorID iMonitorID = iterator.next();
+                        if (iMonitorID.getFailedCount() > FAILED_COUNT) {
+                            iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+                            String outputDir = iMonitorID.getJobExecutionContext().getOutputDir();
+                            List<String> stdOut = null;
+                            try {
+                                stdOut = connection.getCluster().listDirectory(outputDir); // check the outputs directory
+                            } catch (SSHApiException e) {
+                                if (e.getMessage().contains("No such file or directory")) {
+                                    // this is because while we run output handler something failed and during exception
+                                    // we store all the jobs in the monitor queue again
+                                    logger.error("We know this  job is already attempted to run out-handlers");
+//                                    CommonUtils.removeMonitorFromQueue(queue, iMonitorID);
+                                }
+                            }
+                            if (stdOut != null && stdOut.size() > 0 && !stdOut.get(0).isEmpty()) { // have to be careful with this
+                                iMonitorID.setStatus(JobState.COMPLETE);
+                                logger.errorId(iMonitorID.getJobID(), "Job monitoring failed {} times, " +
+                                                " Experiment {} , task {}", iMonitorID.getFailedCount(),
+                                        iMonitorID.getExperimentID(), iMonitorID.getTaskID());
+                                logger.info("Listing directory came as complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .",
+                                        iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName());
+                                sendNotification(iMonitorID);
+//                                CommonUtils.removeMonitorFromQueue(take, iMonitorID);
+                                removeList.add(iMonitorID);
+                                GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher));
+                            } else {
+                                iMonitorID.setFailedCount(0);
+                            }
+                        } else {
+                            // Evey
+                            iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+                            // if the job is complete we remove it from the Map, if any of these maps
+                            // get empty this userMonitorData will get delete from the queue
+                        }
+                    }
+
+                    cleanup(take);
+
+
+                } else {
+                    logger.debug("Qstat Monitor doesn't handle non-gsissh hosts , host {}", iHostMonitorData.
+                            getComputeResourceDescription().getHostName());
+                }
+            }
+            // We have finished all the HostMonitorData object in userMonitorData, now we need to put it back
+            // now the userMonitorData goes back to the tail of the queue
+            // during individual monitorID removal we remove the HostMonitorData object if it become empty
+            // so if all the jobs are finished for all the hostMOnitorId objects in userMonitorData object
+            // we should remove it from the queue so here we do not put it back.
+            for (ListIterator<HostMonitorData> iterator1 = take.getHostMonitorData().listIterator(); iterator1.hasNext(); ) {
+                HostMonitorData iHostMonitorID = iterator1.next();
+                if (iHostMonitorID.getMonitorIDs().size() == 0) {
+                    iterator1.remove();
+                    logger.debug("Removed host {} from monitoring queue", iHostMonitorID.getComputeResourceDescription().getHostName());
+                }
+            }
+            if(take.getHostMonitorData().size()!=0) {
+                queue.put(take);
+            }
+        } catch (InterruptedException e) {
+            if (!this.queue.contains(take)) {
+                try {
+                    this.queue.put(take);
+                } catch (InterruptedException e1) {
+                    e1.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                }
+            }
+            logger.error("Error handling the job with Job ID:" + currentMonitorID.getJobID());
+            throw new AiravataMonitorException(e);
+        } catch (SSHApiException e) {
+            logger.error(e.getMessage());
+            if (e.getMessage().contains("Unknown Job Id Error")) {
+                // in this case job is finished or may be the given job ID is wrong
+                jobStatus.setState(JobState.UNKNOWN);
+                JobIdentifier jobIdentifier = new JobIdentifier("UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN");
+                if (currentMonitorID != null){
+                    jobIdentifier.setExperimentId(currentMonitorID.getExperimentID());
+                    jobIdentifier.setTaskId(currentMonitorID.getTaskID());
+                    jobIdentifier.setWorkflowNodeId(currentMonitorID.getWorkflowNodeID());
+                    jobIdentifier.setJobId(currentMonitorID.getJobID());
+                    jobIdentifier.setGatewayId(currentMonitorID.getJobExecutionContext().getGatewayID());
+                }
+                jobStatus.setJobIdentity(jobIdentifier);
+                publisher.publish(jobStatus);
+            } else if (e.getMessage().contains("illegally formed job identifier")) {
+                logger.error("Wrong job ID is given so dropping the job from monitoring system");
+            } else if (!this.queue.contains(take)) {
+                try {
+                    queue.put(take);
+                } catch (InterruptedException e1) {
+                    e1.printStackTrace();
+                }
+            }
+            throw new AiravataMonitorException("Error retrieving the job status", e);
+        } catch (Exception e) {
+            try {
+                queue.put(take);
+            } catch (InterruptedException e1) {
+                e1.printStackTrace();
+            }
+            throw new AiravataMonitorException("Error retrieving the job status", e);
+        }
+        return true;
+    }
+
+    /**
+     * Publishes a job status change request for the given monitored job.
+     * The request carries the job, task, workflow node, experiment and gateway
+     * identifiers read from the monitor entry, together with its current status.
+     *
+     * @param iMonitorID monitor entry whose current status should be announced
+     */
+    private void sendNotification(MonitorID iMonitorID) {
+        JobIdentifier identifier = new JobIdentifier(iMonitorID.getJobID(),
+                iMonitorID.getTaskID(),
+                iMonitorID.getWorkflowNodeID(),
+                iMonitorID.getExperimentID(),
+                iMonitorID.getJobExecutionContext().getGatewayID());
+
+        JobStatusChangeRequestEvent statusRequest = new JobStatusChangeRequestEvent();
+        statusRequest.setJobIdentity(identifier);
+        statusRequest.setState(iMonitorID.getStatus());
+
+        // log what is about to be published, keyed by the job id
+        String publishedJobId = statusRequest.getJobIdentity().getJobId();
+        String publishedExperimentId = statusRequest.getJobIdentity().getExperimentId();
+        String publishedTaskId = statusRequest.getJobIdentity().getTaskId();
+        logger.debugId(publishedJobId, "Published job status change request, " +
+                "experiment {} , task {}", publishedExperimentId, publishedTaskId);
+
+        publisher.publish(statusRequest);
+    }
+
+    /**
+     * Requests the pulling process to stop by clearing the {@code startPulling} flag.
+     *
+     * @return always {@code true}
+     */
+    public boolean stopPulling() {
+        startPulling = false;
+        return true;
+    }
+
+    /** @return the publisher used to send job status events */
+    public MonitorPublisher getPublisher() {
+        return publisher;
+    }
+
+    /** @param publisher the publisher used to send job status events */
+    public void setPublisher(MonitorPublisher publisher) {
+        this.publisher = publisher;
+    }
+
+    /** @return the queue of per-user monitoring data this monitor works on */
+    public BlockingQueue<UserMonitorData> getQueue() {
+        return queue;
+    }
+
+    /** @param queue the queue of per-user monitoring data this monitor works on */
+    public void setQueue(BlockingQueue<UserMonitorData> queue) {
+        this.queue = queue;
+    }
+
+    /**
+     * Stub implementation.
+     *
+     * @return always {@code false}
+     */
+    public boolean authenticate() {
+        return false;  // stub: nothing is authenticated here
+    }
+
+    /** @return the map of resource connections held by this monitor */
+    public Map<String, ResourceConnection> getConnections() {
+        return connections;
+    }
+
+    /** @return the current value of the {@code startPulling} flag */
+    public boolean isStartPulling() {
+        return startPulling;
+    }
+
+    /** @param connections the map of resource connections to use */
+    public void setConnections(Map<String, ResourceConnection> connections) {
+        this.connections = connections;
+    }
+
+    /** @param startPulling new value of the {@code startPulling} flag */
+    public void setStartPulling(boolean startPulling) {
+        this.startPulling = startPulling;
+    }
+
+    /** @return the GFac instance handed to output-handler workers */
+    public GFac getGfac() {
+        return gfac;
+    }
+
+    /** @param gfac the GFac instance handed to output-handler workers */
+    public void setGfac(GFac gfac) {
+        this.gfac = gfac;
+    }
+
+    /** @return the authentication information held by this monitor */
+    public AuthenticationInfo getAuthenticationInfo() {
+        return authenticationInfo;
+    }
+
+    /** @param authenticationInfo the authentication information to hold */
+    public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
+        this.authenticationInfo = authenticationInfo;
+    }
+
+    /** @return the queue of cancel requests consulted while monitoring */
+    public LinkedBlockingQueue<String> getCancelJobList() {
+        return cancelJobList;
+    }
+
+    /** @param cancelJobList the queue of cancel requests to consult while monitoring */
+    public void setCancelJobList(LinkedBlockingQueue<String> cancelJobList) {
+        this.cancelJobList = cancelJobList;
+    }
+
+
+    private void cleanup(UserMonitorData userMonitorData){
+        for(MonitorID iMonitorId:removeList){
+            try {
+                CommonUtils.removeMonitorFromQueue(userMonitorData, iMonitorId);
+            } catch (AiravataMonitorException e) {
+                logger.error(e.getMessage(), e);
+                logger.error("Error deleting the monitor data: " + iMonitorId.getJobID());
+            }
+        }
+        removeList.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
new file mode 100644
index 0000000..41e9bd2
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.pull.qstat;
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.SecurityContext;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
+import org.apache.airavata.gfac.monitor.HostMonitorData;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.api.SSHApiException;
+import org.apache.airavata.gfac.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.ssh.impl.JobStatus;
+import org.apache.airavata.gfac.ssh.impl.PBSCluster;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+
+/**
+ * Holds the {@link PBSCluster} used to query the resource manager for the
+ * state of monitored jobs, and translates the scheduler specific status codes
+ * into {@link JobState} values.
+ */
+public class ResourceConnection {
+    private static final Logger log = LoggerFactory.getLogger(ResourceConnection.class);
+
+    private PBSCluster cluster;
+
+    private AuthenticationInfo authenticationInfo;
+
+
+    /**
+     * Creates a connection that reuses the cluster attached to the security
+     * context of the first job in {@code hostMonitorData}.
+     *
+     * @param hostMonitorData monitoring data of one host; its first monitor entry
+     *                        supplies the job execution context
+     * @param authInfo        authentication information, stored on this instance
+     * @throws SSHApiException declared for callers; not thrown by the visible code
+     */
+    public ResourceConnection(HostMonitorData hostMonitorData,AuthenticationInfo authInfo) throws SSHApiException {
+        MonitorID monitorID = hostMonitorData.getMonitorIDs().get(0);
+        try {
+            SecurityContext securityContext = monitorID.getJobExecutionContext().getSecurityContext(monitorID.getComputeResourceDescription().getHostName());
+            // instanceof is false for null, so a missing security context leaves cluster unset
+            if (securityContext instanceof GSISecurityContext) {
+                cluster = (PBSCluster) ((GSISecurityContext) securityContext).getPbsCluster();
+            } else if (securityContext instanceof SSHSecurityContext) {
+                cluster = (PBSCluster) ((SSHSecurityContext) securityContext).getPbsCluster();
+            }
+            // the cluster of the incoming request is reused as-is; the monitoring
+            // credentials are only remembered here, no new cluster is constructed
+            authenticationInfo = authInfo;
+        } catch (GFacException e) {
+            // keep the cause in the log; cluster stays null in this case
+            log.error("Error reading data from job ExecutionContext", e);
+        }
+    }
+
+    /**
+     * Creates a connection from the GSI security context of the first job in
+     * {@code hostMonitorData} and builds a new cluster from its server info and
+     * job manager configuration.
+     *
+     * @param hostMonitorData monitoring data of one host; its first monitor entry
+     *                        supplies the job execution context
+     * @throws SSHApiException if constructing the new cluster fails
+     */
+    public ResourceConnection(HostMonitorData hostMonitorData) throws SSHApiException {
+        MonitorID monitorID = hostMonitorData.getMonitorIDs().get(0);
+        try {
+            GSISecurityContext securityContext = (GSISecurityContext)
+                    monitorID.getJobExecutionContext().getSecurityContext(monitorID.getComputeResourceDescription().getHostName());
+            cluster = (PBSCluster) securityContext.getPbsCluster();
+
+            // we just use cluster configuration from the incoming request and construct a new cluster because for monitoring
+            // we are using our own credentials and not using one users account to do everything.
+            // NOTE(review): authenticationInfo is never assigned in this constructor, so null is
+            // passed to PBSCluster here - confirm whether this constructor is still meant to be used.
+            cluster = new PBSCluster(cluster.getServerInfo(), authenticationInfo, cluster.getJobManagerConfiguration());
+        } catch (GFacException e) {
+            // keep the cause in the log; cluster stays null in this case
+            log.error("Error reading data from job ExecutionContext", e);
+        }
+    }
+
+    /**
+     * Queries the cluster for the status of a single job.
+     *
+     * @param monitorID monitor entry whose job id is looked up
+     * @return the job state mapped from the status reported by the cluster
+     * @throws SSHApiException if the status query fails
+     */
+    public JobState getJobStatus(MonitorID monitorID) throws SSHApiException {
+        String jobID = monitorID.getJobID();
+        //todo so currently we execute the qstat for each job but we can use user based monitoring
+        //todo or we should concatenate all the commands and execute them in one go and parseSingleJob the response
+        return getStatusFromString(cluster.getJobStatus(jobID).toString());
+    }
+
+    /**
+     * Queries the cluster once for the statuses of all given jobs.
+     *
+     * @param monitorIDs the jobs to look up
+     * @return a sorted map from {@code "<jobID>,<jobName>"} to the mapped job state;
+     *         jobs the cluster did not update keep the preset unknown status
+     * @throws SSHApiException if the status query fails
+     */
+    public Map<String, JobState> getJobStatuses(List<MonitorID> monitorIDs) throws SSHApiException {
+        Map<String, JobStatus> rawStatuses = new TreeMap<String, JobStatus>();
+        // creating a sorted map with all the jobIds and with the predefined
+        // status as UNKNOWN
+        for (MonitorID monitorID : monitorIDs) {
+            rawStatuses.put(monitorID.getJobID()+","+monitorID.getJobName(), JobStatus.U);
+        }
+        String userName = cluster.getServerInfo().getUserName();
+        //todo so currently we execute the qstat for each job but we can use user based monitoring
+        //todo or we should concatenate all the commands and execute them in one go and parseSingleJob the response
+        //
+        cluster.getJobStatuses(userName, rawStatuses);
+
+        Map<String, JobState> jobStates = new TreeMap<String, JobState>();
+        for (Map.Entry<String, JobStatus> entry : rawStatuses.entrySet()) {
+            jobStates.put(entry.getKey(), getStatusFromString(entry.getValue().toString()));
+        }
+        return jobStates;
+    }
+
+    /**
+     * Maps a scheduler status code to a {@link JobState}.
+     *
+     * @param status status code as reported by the cluster, may be {@code null}
+     * @return the matching job state, or {@link JobState#UNKNOWN} for {@code null}
+     *         and for codes that are not recognised
+     */
+    private JobState getStatusFromString(String status) {
+        log.info("parsing the job status returned : " + status);
+        if (status != null) {
+            if ("C".equals(status) || "CD".equals(status) || "E".equals(status) || "CG".equals(status) || "DONE".equals(status)) {
+                return JobState.COMPLETE;
+            } else if ("H".equals(status) || "h".equals(status) || "T".equals(status)) {
+                return JobState.HELD;
+            } else if ("Q".equals(status) || "qw".equals(status) || "PEND".equals(status)
+                    || "W".equals(status) || "PD".equals(status)) {
+                return JobState.QUEUED;
+            } else if ("R".equals(status) || "CF".equals(status) || "r".equals(status) || "RUN".equals(status)) {
+                return JobState.ACTIVE;
+            } else if ("S".equals(status) || "PSUSP".equals(status) || "USUSP".equals(status) || "SSUSP".equals(status)) {
+                return JobState.SUSPENDED;
+            } else if ("CA".equals(status)) {
+                return JobState.CANCELED;
+            } else if ("F".equals(status) || "NF".equals(status) || "TO".equals(status) || "EXIT".equals(status)
+                    || "PR".equals(status) || "Er".equals(status)) {
+                return JobState.FAILED;
+            }
+        }
+        // "U", "UNKWN", null and every unrecognised code end up here
+        return JobState.UNKNOWN;
+    }
+
+    /** @return the cluster used for status queries; may be {@code null} if construction failed */
+    public PBSCluster getCluster() {
+        return cluster;
+    }
+
+    /** @param cluster the cluster to use for status queries */
+    public void setCluster(PBSCluster cluster) {
+        this.cluster = cluster;
+    }
+
+    /**
+     * @return {@code true} if a cluster is set and its session reports being connected;
+     *         {@code false} when no cluster is available
+     */
+    public boolean isConnected(){
+        // cluster is null when the constructor could not read the execution context
+        return this.cluster != null && this.cluster.getSession().isConnected();
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
new file mode 100644
index 0000000..de8cd8c
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
@@ -0,0 +1,280 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.push.amqp;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.core.PushMonitor;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.gfac.monitor.util.AMQPConnectionUtil;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+
+/**
+ * This is the implementation for AMQP based finishQueue, this uses
+ * rabbitmq client to recieve AMQP based monitoring data from
+ * mostly excede resources.
+ */
+public class AMQPMonitor extends PushMonitor {
+    private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
+
+
+    /* this will keep all the channels available in the system, we do not create
+      channels for all the jobs submitted, but we create channels for each user for each
+      host.
+    */
+    private Map<String, Channel> availableChannels;
+
+    private MonitorPublisher publisher;
+
+    private MonitorPublisher localPublisher;
+
+    private BlockingQueue<MonitorID> runningQueue;
+
+    private BlockingQueue<MonitorID> finishQueue;
+
+    private String connectionName;
+
+    private String proxyPath;
+
+    private List<String> amqpHosts;
+
+    private boolean startRegister;
+
+    public AMQPMonitor(){
+
+    }
+    public AMQPMonitor(MonitorPublisher publisher, BlockingQueue<MonitorID> runningQueue,
+                       BlockingQueue<MonitorID> finishQueue,
+                       String proxyPath,String connectionName,List<String> hosts) {
+        this.publisher = publisher;
+        this.runningQueue = runningQueue;        // these will be initialized by the MonitorManager
+        this.finishQueue = finishQueue;          // these will be initialized by the MonitorManager
+        this.availableChannels = new HashMap<String, Channel>();
+        this.connectionName = connectionName;
+        this.proxyPath = proxyPath;
+        this.amqpHosts = hosts;
+        this.localPublisher = new MonitorPublisher(new EventBus());
+        this.localPublisher.registerListener(this);
+    }
+
+    public void initialize(String proxyPath, String connectionName, List<String> hosts) {
+        this.availableChannels = new HashMap<String, Channel>();
+        this.connectionName = connectionName;
+        this.proxyPath = proxyPath;
+        this.amqpHosts = hosts;
+        this.localPublisher = new MonitorPublisher(new EventBus());
+        this.localPublisher.registerListener(this);
+    }
+
+    @Override
+    public boolean registerListener(MonitorID monitorID) throws AiravataMonitorException {
+        // we subscribe to read user-host based subscription
+        ComputeResourceDescription computeResourceDescription = monitorID.getComputeResourceDescription();
+        if (computeResourceDescription.isSetIpAddresses() && computeResourceDescription.getIpAddresses().size() > 0) {
+            // we get first ip address for the moment
+            String hostAddress = computeResourceDescription.getIpAddresses().get(0);
+            // in amqp case there are no multiple jobs per each host, because once a job is put in to the queue it
+            // will be picked by the Monitor, so jobs will not stay in this queueu but jobs will stay in finishQueue
+            String channelID = CommonUtils.getChannelID(monitorID);
+            if (availableChannels.get(channelID) == null) {
+                try {
+                    //todo need to fix this rather getting it from a file
+                    Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath);
+                    Channel channel = null;
+                    channel = connection.createChannel();
+                    availableChannels.put(channelID, channel);
+                    String queueName = channel.queueDeclare().getQueue();
+
+                    BasicConsumer consumer = new
+                            BasicConsumer(new JSONMessageParser(), localPublisher);          // here we use local publisher
+                    channel.basicConsume(queueName, true, consumer);
+                    String filterString = CommonUtils.getRoutingKey(monitorID.getUserName(), hostAddress);
+                    // here we queuebind to a particular user in a particular machine
+                    channel.queueBind(queueName, "glue2.computing_activity", filterString);
+                    logger.info("Using filtering string to monitor: " + filterString);
+                } catch (IOException e) {
+                    logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName());
+                }
+            }
+        } else {
+            throw new AiravataMonitorException("Couldn't register monitor for jobId :" + monitorID.getJobID() +
+                    " , ComputeResourceDescription " + computeResourceDescription.getHostName() + " doesn't has an " +
+                    "IpAddress with it");
+        }
+        return true;
+    }
+
+    public void run() {
+        // before going to the while true mode we start unregister thread
+        startRegister = true; // this will be unset by someone else
+        while (startRegister || !ServerSettings.isStopAllThreads()) {
+            try {
+                MonitorID take = runningQueue.take();
+                this.registerListener(take);
+            } catch (AiravataMonitorException e) { // catch any exceptino inside the loop
+                logger.error(e.getMessage(), e);
+            } catch (InterruptedException e) {
+                logger.error(e.getMessage(), e);
+            } catch (Exception e){
+                logger.error(e.getMessage(), e);
+            }
+        }
+        Set<String> strings = availableChannels.keySet();
+        for(String key:strings) {
+            Channel channel = availableChannels.get(key);
+            try {
+                channel.close();
+            } catch (IOException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+    }
+
+    @Subscribe
+    public boolean unRegisterListener(MonitorID monitorID) throws AiravataMonitorException {
+        Iterator<MonitorID> iterator = finishQueue.iterator();
+        MonitorID next = null;
+        while(iterator.hasNext()){
+            next = iterator.next();
+            if(next.getJobID().endsWith(monitorID.getJobID())){
+                break;
+            }
+        }
+        if(next == null) {
+            logger.error("Job has removed from the queue, old obsolete message recieved");
+            return false;
+        }
+        String channelID = CommonUtils.getChannelID(next);
+        if (JobState.FAILED.equals(monitorID.getStatus()) || JobState.COMPLETE.equals(monitorID.getStatus())) {
+            finishQueue.remove(next);
+
+            // if this is the last job in the queue at this point with the same username and same host we
+            // close the channel and close the connection and remove it from availableChannels
+            if (CommonUtils.isTheLastJobInQueue(finishQueue, next)) {
+                logger.info("There are no jobs to monitor for common ChannelID:" + channelID + " , so we unsubscribe it" +
+                        ", incase new job created we do subscribe again");
+                Channel channel = availableChannels.get(channelID);
+                if (channel == null) {
+                    logger.error("Already Unregistered the listener");
+                    throw new AiravataMonitorException("Already Unregistered the listener");
+                } else {
+                    try {
+                        channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(next));
+                        channel.close();
+                        channel.getConnection().close();
+                        availableChannels.remove(channelID);
+                    } catch (IOException e) {
+                        logger.error("Error unregistering the listener");
+                        throw new AiravataMonitorException("Error unregistering the listener");
+                    }
+                }
+            }
+        }
+        next.setStatus(monitorID.getStatus());
+        JobIdentifier jobIdentity = new JobIdentifier(next.getJobID(),
+                                                     next.getTaskID(),
+                                                     next.getWorkflowNodeID(),
+                                                     next.getExperimentID(),
+                                                     next.getJobExecutionContext().getGatewayID());
+        publisher.publish(new JobStatusChangeEvent(next.getStatus(),jobIdentity));
+        return true;
+    }
+    @Override
+    public boolean stopRegister() throws AiravataMonitorException {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Map<String, Channel> getAvailableChannels() {
+        return availableChannels;
+    }
+
+    public void setAvailableChannels(Map<String, Channel> availableChannels) {
+        this.availableChannels = availableChannels;
+    }
+
+    public MonitorPublisher getPublisher() {
+        return publisher;
+    }
+
+    public void setPublisher(MonitorPublisher publisher) {
+        this.publisher = publisher;
+    }
+
+    public BlockingQueue<MonitorID> getRunningQueue() {
+        return runningQueue;
+    }
+
+    public void setRunningQueue(BlockingQueue<MonitorID> runningQueue) {
+        this.runningQueue = runningQueue;
+    }
+
+    public BlockingQueue<MonitorID> getFinishQueue() {
+        return finishQueue;
+    }
+
+    public void setFinishQueue(BlockingQueue<MonitorID> finishQueue) {
+        this.finishQueue = finishQueue;
+    }
+
+    public String getProxyPath() {
+        return proxyPath;
+    }
+
+    public void setProxyPath(String proxyPath) {
+        this.proxyPath = proxyPath;
+    }
+
+    public List<String> getAmqpHosts() {
+        return amqpHosts;
+    }
+
+    public void setAmqpHosts(List<String> amqpHosts) {
+        this.amqpHosts = amqpHosts;
+    }
+
+    public boolean isStartRegister() {
+        return startRegister;
+    }
+
+    public void setStartRegister(boolean startRegister) {
+        this.startRegister = startRegister;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
new file mode 100644
index 0000000..bd5c625
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.push.amqp;
+
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.core.MessageParser;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Consumer;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.ShutdownSignalException;
+
+/**
+ * RabbitMQ {@link Consumer} that converts each delivered message into a job
+ * status update: the job id is taken from the routing key, the state is
+ * obtained from the configured {@link MessageParser}, and the resulting
+ * {@link MonitorID} is handed to the {@link MonitorPublisher}.
+ *
+ * All other consumer callbacks are intentionally no-ops.
+ */
+public class BasicConsumer implements Consumer {
+    // Log under this class' own category (previously AMQPMonitor.class, which
+    // made these messages appear to originate from a different class).
+    private final static Logger logger = LoggerFactory.getLogger(BasicConsumer.class);
+
+    private MessageParser parser;
+
+    private MonitorPublisher publisher;
+
+    /**
+     * @param parser    parser used to turn a message body into a job state
+     * @param publisher publisher that receives the resulting {@link MonitorID}
+     */
+    public BasicConsumer(MessageParser parser, MonitorPublisher publisher) {
+        this.parser = parser;
+        this.publisher = publisher;
+    }
+
+    /** No-op. */
+    public void handleCancel(String consumerTag) {
+    }
+
+    /** No-op. */
+    public void handleCancelOk(String consumerTag) {
+    }
+
+    /** No-op. */
+    public void handleConsumeOk(String consumerTag) {
+    }
+
+    /**
+     * Parses the delivered message and publishes the job status it carries.
+     *
+     * The job id is the part of the routing key before the first '.'.
+     * An {@link AiravataMonitorException} raised by the parser is logged and
+     * the message is dropped.
+     *
+     * @param consumerTag tag of the consumer receiving the delivery (unused)
+     * @param envelope    delivery envelope; its routing key identifies the job
+     * @param properties  AMQP message properties (unused)
+     * @param body        raw message body
+     */
+    public void handleDelivery(String consumerTag,
+                               Envelope envelope,
+                               AMQP.BasicProperties properties,
+                               byte[] body) {
+
+        String routingKey = envelope.getRoutingKey();
+        logger.debug("job update for: " + routingKey);
+        // Decode explicitly as UTF-8 instead of relying on the platform
+        // default charset, which varies between hosts.
+        String message = new String(body, java.nio.charset.StandardCharsets.UTF_8);
+        // Indent every line of the payload by four spaces; the indented form
+        // is what gets logged and handed to the parser below.
+        message = message.replaceAll("(?m)^", "    ");
+
+        logger.debug("************************************************************");
+        logger.debug("AMQP Message recieved \n" + message);
+        logger.debug("************************************************************");
+        try {
+            String jobID = routingKey.split("\\.")[0];
+            // Only the job id is known at this point; the remaining
+            // MonitorID constructor arguments are left null.
+            MonitorID monitorID = new MonitorID(null, jobID, null, null, null, null,null);
+            monitorID.setStatus(parser.parseMessage(message));
+            publisher.publish(monitorID);
+        } catch (AiravataMonitorException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /** No-op. */
+    public void handleRecoverOk(String consumerTag) {
+    }
+
+    /** No-op. */
+    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java
new file mode 100644
index 0000000..72c77d5
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.push.amqp;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.airavata.ComputingActivity;
+import org.apache.airavata.gfac.monitor.core.MessageParser;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link MessageParser} for JSON job-status messages. The message is bound to
+ * a {@link ComputingActivity} and the last entry of its state list is mapped
+ * to a {@link JobState}.
+ */
+public class JSONMessageParser implements MessageParser {
+    private final static Logger logger = LoggerFactory.getLogger(JSONMessageParser.class);
+
+    // ObjectMapper is expensive to build and safe to share once configured,
+    // so a single instance is reused instead of creating one per message.
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /**
+     * Parses a JSON message and returns the job state it reports.
+     *
+     * @param message JSON document describing a computing activity
+     * @return the state mapped from the last entry of the activity's state
+     *         list, or {@code null} when the message carries no state entries
+     * @throws AiravataMonitorException if the message cannot be read as JSON
+     */
+    public JobState parseMessage(String message)throws AiravataMonitorException {
+        logger.debug(message);
+        try {
+            // Bind straight from the String: going through getBytes() would
+            // re-encode the text with the platform default charset first.
+            ComputingActivity computingActivity = OBJECT_MAPPER.readValue(message, ComputingActivity.class);
+            logger.info(computingActivity.getIDFromEndpoint());
+            List<String> stateList = computingActivity.getState();
+            if (stateList == null || stateList.isEmpty()) {
+                // No state reported; a missing list used to fail with a
+                // NullPointerException, an empty one already yielded null.
+                return null;
+            }
+            // Only the last value of the state array is relevant.
+            return getStatusFromString(stateList.get(stateList.size() - 1));
+        } catch (IOException e) {
+            throw new AiravataMonitorException(e);
+        }
+    }
+
+    /**
+     * Maps an "ipf:"-prefixed state string to a {@link JobState}.
+     *
+     * @param status state string taken from the message, may be {@code null}
+     * @return the matching state, or {@link JobState#UNKNOWN} for
+     *         {@code null} and for any unrecognised value
+     */
+    private JobState getStatusFromString(String status) {
+        logger.info("parsing the job status returned : " + status);
+        if (status == null) {
+            return JobState.UNKNOWN;
+        }
+        if ("ipf:finished".equals(status)) {
+            return JobState.COMPLETE;
+        } else if ("ipf:pending".equals(status) || "ipf:starting".equals(status)) {
+            return JobState.QUEUED;
+        } else if ("ipf:running".equals(status) || "ipf:finishing".equals(status)) {
+            return JobState.ACTIVE;
+        } else if ("ipf:held".equals(status)
+                || "ipf:terminating".equals(status) || "ipf:terminated".equals(status)
+                // The misspelled variants were the only ones matched before;
+                // they are kept so existing behaviour is a strict subset.
+                || "ipf:teminating".equals(status) || "ipf:teminated".equals(status)) {
+            return JobState.HELD;
+        } else if ("ipf:suspending".equals(status) || "ipf:suspended".equals(status)) {
+            // NOTE(review): "ipf:suspended" added on the assumption that it is
+            // the spelling the publisher emits - confirm against the sender.
+            return JobState.SUSPENDED;
+        } else if ("ipf:failed".equals(status)) {
+            return JobState.FAILED;
+        }
+        // Covers "ipf:unknown" as well as anything unrecognised.
+        return JobState.UNKNOWN;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
new file mode 100644
index 0000000..c4275f1
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.push.amqp;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.QueueingConsumer;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Listens on a RabbitMQ queue of the local broker for "job finished" messages
+ * and appends the body of each message to a caller-supplied list.
+ */
+public class SimpleJobFinishConsumer {
+    private final static Logger logger = LoggerFactory.getLogger(SimpleJobFinishConsumer.class);
+
+    private List<String> completedJobsFromPush;
+
+    /**
+     * @param completedJobsFromPush list that receives the body of every
+     *                              message consumed; additions are made while
+     *                              holding the list's monitor
+     */
+    public SimpleJobFinishConsumer(List<String> completedJobsFromPush) {
+        this.completedJobsFromPush = completedJobsFromPush;
+    }
+
+    /**
+     * Connects to {@code amqp://localhost}, declares the queue named after the
+     * {@code GFAC_SERVER_PORT} setting (default "8950") and starts a thread
+     * that consumes from it until an error occurs.
+     *
+     * Failures are logged, never thrown. If setup fails, or when the consumer
+     * thread terminates, the connection opened here is closed again.
+     */
+    public void listen() {
+        Connection conn = null;
+        try {
+            String queueName = ServerSettings.getSetting(Constants.GFAC_SERVER_PORT, "8950");
+            String uri = "amqp://localhost";
+
+            ConnectionFactory connFactory = new ConnectionFactory();
+            connFactory.setUri(uri);
+            conn = connFactory.newConnection();
+            logger.info("--------Created the connection to Rabbitmq server successfully-------");
+
+            final Channel ch = conn.createChannel();
+
+            logger.info("--------Created the channel with Rabbitmq server successfully-------");
+
+            ch.queueDeclare(queueName, false, false, false, null);
+
+            logger.info("--------Declare the queue " + queueName + " in Rabbitmq server successfully-------");
+
+            final QueueingConsumer consumer = new QueueingConsumer(ch);
+            ch.basicConsume(queueName, consumer);
+            final Connection connection = conn;
+            (new Thread() {
+                public void run() {
+                    try {
+                        while (true) {
+                            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
+                            // Decode explicitly as UTF-8 rather than with the
+                            // platform default charset.
+                            String message = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
+                            logger.info("---------------- Job Finish message received:" + message + " --------------");
+                            synchronized (completedJobsFromPush) {
+                                completedJobsFromPush.add(message);
+                            }
+                            // Acknowledge only after the message has been recorded.
+                            ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
+                        }
+                    } catch (InterruptedException ex) {
+                        // Restore the interrupt flag so the interruption is
+                        // not silently lost.
+                        Thread.currentThread().interrupt();
+                        logger.warn("Job finish consumer thread was interrupted, stopping", ex);
+                    } catch (Exception ex) {
+                        logger.error("--------Cannot connect to a RabbitMQ Server--------" , ex);
+                    } finally {
+                        // Nothing else holds this connection once the loop
+                        // has ended, so release it here.
+                        closeQuietly(connection);
+                    }
+                }
+
+            }).start();
+        } catch (Exception ex) {
+            // Setup failed part-way: do not leave a half-initialised
+            // connection open.
+            closeQuietly(conn);
+            logger.error("Cannot connect to a RabbitMQ Server: " , ex);
+            logger.info("------------- Push monitoring for HPC jobs is disabled -------------");
+        }
+    }
+
+    /** Closes the connection if it is still open, logging any failure instead of throwing. */
+    private static void closeQuietly(Connection connection) {
+        if (connection == null || !connection.isOpen()) {
+            return;
+        }
+        try {
+            connection.close();
+        } catch (Exception e) {
+            logger.warn("Failed to close the RabbitMQ connection", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
new file mode 100644
index 0000000..a701326
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.push.amqp;
+
+import com.google.common.eventbus.Subscribe;
+import com.rabbitmq.client.Channel;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Tears down the AMQP channel associated with a job once that job has reached
+ * the FAILED or COMPLETE state.
+ */
+public class UnRegisterWorker{
+    private final static Logger logger = LoggerFactory.getLogger(UnRegisterWorker.class);
+    private Map<String, Channel> availableChannels;
+
+    /**
+     * @param channels map from channel id to open channel; entries are removed
+     *                 from this same map as listeners are unregistered
+     */
+    public UnRegisterWorker(Map<String, Channel> channels) {
+        this.availableChannels = channels;
+    }
+
+    /**
+     * Unbinds and closes the channel registered for {@code monitorID} when
+     * {@code jobStatus} reports FAILED or COMPLETE; does nothing for any
+     * other state.
+     *
+     * NOTE(review): Guava's event bus documents that a {@code @Subscribe}
+     * method must take exactly one parameter; this one takes two and is
+     * private, so it is doubtful that it is ever dispatched - confirm how it
+     * is meant to be invoked.
+     *
+     * @param jobStatus event carrying the new job state
+     * @param monitorID identifies the job whose channel should be released
+     * @return always {@code true} when no exception is thrown
+     * @throws AiravataMonitorException if no channel is registered for the
+     *         job, or if unbinding/closing the channel fails
+     */
+    @Subscribe
+    private boolean unRegisterListener(JobStatusChangeEvent jobStatus, MonitorID monitorID) throws AiravataMonitorException {
+        String channelID = CommonUtils.getChannelID(monitorID);
+        if (JobState.FAILED.equals(jobStatus.getState()) || JobState.COMPLETE.equals(jobStatus.getState())){
+            Channel channel = availableChannels.get(channelID);
+            if (channel == null) {
+                logger.error("Already Unregistered the listener");
+                throw new AiravataMonitorException("Already Unregistered the listener");
+            } else {
+                try {
+                    // NOTE(review): queueDeclare() with no arguments declares a
+                    // new queue, so the unbind below targets that new queue
+                    // rather than one bound earlier - confirm this is intended.
+                    channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(monitorID));
+                    channel.close();
+                    channel.getConnection().close();
+                    availableChannels.remove(channelID);
+                } catch (IOException e) {
+                    // Log the cause as well; the exception thrown below
+                    // carries only the message.
+                    logger.error("Error unregistering the listener", e);
+                    throw new AiravataMonitorException("Error unregistering the listener");
+                }
+            }
+        }
+        return true;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java
new file mode 100644
index 0000000..6a4ed3b
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.util;
+
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DefaultSaslConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.security.KeyStore;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Helpers for opening TLS-secured, certificate-authenticated (SASL EXTERNAL)
+ * connections to an AMQP broker on port 5671.
+ */
+public class AMQPConnectionUtil {
+    private final static Logger logger = LoggerFactory.getLogger(AMQPConnectionUtil.class);
+
+    /**
+     * Tries the given hosts in random order and returns the first connection
+     * that can be established.
+     *
+     * @param hosts     candidate broker hosts; the list itself is not modified
+     * @param vhost     AMQP virtual host to connect to
+     * @param proxyFile path of the PEM file handed to {@code X509Helper}
+     * @return an open connection, or {@code null} if {@code hosts} is
+     *         {@code null}/empty or no host could be reached
+     */
+    public static Connection connect(List<String>hosts,String vhost, String proxyFile) {
+        if (hosts == null || hosts.isEmpty()) {
+            return null;
+        }
+        // Shuffle a private copy so the caller's list is neither reordered nor
+        // required to be mutable.
+        List<String> candidates = new java.util.ArrayList<String>(hosts);
+        Collections.shuffle(candidates);
+        for (String host : candidates) {
+            Connection connection = connect(host, vhost, proxyFile);
+            // Test the connection, not the host name: the previous check on
+            // host returned a null connection after the first failed host
+            // instead of moving on to the next one.
+            if (connection != null) {
+                logger.info("connected to " + host);
+                return connection;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Opens a connection to a single broker host.
+     *
+     * @param host      broker host name
+     * @param vhost     AMQP virtual host to connect to
+     * @param proxyFile path of the PEM file handed to {@code X509Helper}
+     * @return an open connection, or {@code null} if anything fails (the
+     *         failure is logged)
+     */
+    public static Connection connect(String host, String vhost, String proxyFile) {
+        Connection connection;
+        try {
+            // NOTE(review): this pass phrase is hard-coded; it is handed to
+            // X509Helper together with the PEM file and then used to unlock
+            // the resulting key store - confirm it never protects anything
+            // persisted.
+            String keyPassPhrase = "test123";
+            KeyStore ks = X509Helper.keyStoreFromPEM(proxyFile, keyPassPhrase);
+            KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+            kmf.init(ks, keyPassPhrase.toCharArray());
+
+            KeyStore tks = X509Helper.trustKeyStoreFromCertDir();
+            TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
+            tmf.init(tks);
+
+            // Request a generic TLS context rather than "SSLv3": SSLv3 is
+            // obsolete and disabled by default in current JREs.
+            SSLContext c = SSLContext.getInstance("TLS");
+            c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+
+            ConnectionFactory factory = new ConnectionFactory();
+            factory.setHost(host);
+            factory.setPort(5671);
+            factory.useSslProtocol(c);
+            factory.setVirtualHost(vhost);
+            factory.setSaslConfig(DefaultSaslConfig.EXTERNAL);
+
+            connection = factory.newConnection();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return null;
+        }
+        return connection;
+    }
+
+}


Mime
View raw message