airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject [8/9] Separating gfac-monitoring implementation
Date Thu, 01 May 2014 18:29:26 GMT
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
deleted file mode 100644
index a481564..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor;
-
-import org.apache.airavata.commons.gfac.type.HostDescription;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
-import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Timestamp;
-import java.util.Date;
-import java.util.Map;
-
-/**
- * Holds the data that identifies a particular job so that monitoring can be
- * started for it, together with the bookkeeping kept while the job is being
- * monitored (timestamps, UNKNOWN-status counter and the last recorded state).
- */
-public class MonitorID {
-    private final static Logger logger = LoggerFactory.getLogger(MonitorID.class);
-
-    /** Number of UNKNOWN reports that must be exceeded before a job is assumed to be finished. */
-    private static final int MAX_UNKNOWN_STATUS_COUNT = 2;
-
-    /** Time, in milliseconds, to pause whenever an UNKNOWN status is reported. */
-    private static final long UNKNOWN_STATUS_WAIT_MILLIS = 10000L;
-
-    private String userName;
-
-    private Timestamp jobStartedTime;
-
-    private Timestamp lastMonitored;
-
-    private HostDescription host;
-
-    private AuthenticationInfo authenticationInfo = null;
-
-    // Created lazily by addParameter(); stays null until a parameter is added or a map is set.
-    private Map<String, Object> parameters;
-
-    private String experimentID;
-
-    private String workflowNodeID;
-
-    private String taskID;
-
-    private String jobID;
-
-    private int failedCount = 0;
-
-    private JobState state;
-
-    private JobExecutionContext jobExecutionContext;
-
-    /**
-     * Creates a monitor id without authentication information.
-     * The job-started time is set to the moment of construction.
-     */
-    public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName) {
-        this(host, jobID, taskID, workflowNodeID, experimentID, userName, null);
-    }
-
-    /**
-     * Creates a monitor id. The job-started time is set to the moment of construction.
-     *
-     * @param authenticationInfo may be null; when it is a {@link MyProxyAuthenticationInfo}
-     *                           its user name replaces the given {@code userName}
-     */
-    public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName,AuthenticationInfo authenticationInfo) {
-        this.host = host;
-        this.jobStartedTime = new Timestamp(System.currentTimeMillis());
-        this.authenticationInfo = authenticationInfo;
-        // if MyProxy authentication info is given, the MyProxy user is used as the user
-        // (instanceof is false for null, so no separate null check is needed)
-        if (authenticationInfo instanceof MyProxyAuthenticationInfo) {
-            this.userName = ((MyProxyAuthenticationInfo) authenticationInfo).getUserName();
-        } else {
-            this.userName = userName;
-        }
-        this.workflowNodeID = workflowNodeID;
-        this.jobID = jobID;
-        this.taskID = taskID;
-        this.experimentID = experimentID;
-    }
-
-    public HostDescription getHost() {
-        return host;
-    }
-
-    public void setHost(HostDescription host) {
-        this.host = host;
-    }
-
-    public Timestamp getLastMonitored() {
-        return lastMonitored;
-    }
-
-    public void setLastMonitored(Timestamp lastMonitored) {
-        this.lastMonitored = lastMonitored;
-    }
-
-    public String getUserName() {
-        return userName;
-    }
-
-    public void setUserName(String userName) {
-        this.userName = userName;
-    }
-
-    public String getJobID() {
-        return jobID;
-    }
-
-    public void setJobID(String jobID) {
-        this.jobID = jobID;
-    }
-
-    public Timestamp getJobStartedTime() {
-        return jobStartedTime;
-    }
-
-    public void setJobStartedTime(Timestamp jobStartedTime) {
-        this.jobStartedTime = jobStartedTime;
-    }
-
-    public AuthenticationInfo getAuthenticationInfo() {
-        return authenticationInfo;
-    }
-
-    public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
-        this.authenticationInfo = authenticationInfo;
-    }
-
-    /**
-     * Stores a parameter, creating the parameter map first if none exists yet.
-     */
-    public void addParameter(String key,Object value) {
-        if (this.parameters == null) {
-            // fully qualified so that no additional import is required
-            this.parameters = new java.util.HashMap<String, Object>();
-        }
-        this.parameters.put(key, value);
-    }
-
-    /**
-     * @return the value stored under {@code key}, or null when there is no such
-     *         parameter or no parameter map has been created yet
-     */
-    public Object getParameter(String key) {
-        return this.parameters == null ? null : this.parameters.get(key);
-    }
-
-    /**
-     * @return the parameter map itself (not a copy); null until a parameter has
-     *         been added or a map has been set
-     */
-    public Map<String, Object> getParameters() {
-        return parameters;
-    }
-
-    public void setParameters(Map<String, Object> parameters) {
-        this.parameters = parameters;
-    }
-
-    public String getExperimentID() {
-        return experimentID;
-    }
-
-    public void setExperimentID(String experimentID) {
-        this.experimentID = experimentID;
-    }
-
-    public String getTaskID() {
-        return taskID;
-    }
-
-    public void setTaskID(String taskID) {
-        this.taskID = taskID;
-    }
-
-    public int getFailedCount() {
-        return failedCount;
-    }
-
-    public void setFailedCount(int failedCount) {
-        this.failedCount = failedCount;
-    }
-
-    public JobState getStatus() {
-        return state;
-    }
-
-    /**
-     * Records a newly reported job state.
-     * <p>
-     * This logic is useful for fast finishing jobs: on some machines the job state
-     * vanishes quickly once the job is done, and the state is then reported as
-     * UNKNOWN. When a state has already been recorded and UNKNOWN is reported:
-     * <ul>
-     * <li>while the failed count is not above the limit, the calling thread sleeps
-     * and the failed count is incremented; the recorded state is kept;</li>
-     * <li>once the failed count is above the limit, an ACTIVE or QUEUED job is
-     * marked COMPLETE; any other recorded state is left untouched.</li>
-     * </ul>
-     * Note that this method may block the caller for several seconds.
-     *
-     * @param status the reported state; null is stored like any ordinary state
-     */
-    public void setStatus(JobState status) {
-        if (this.state == null || status != JobState.UNKNOWN) {
-            // normal scenario
-            this.state = status;
-            return;
-        }
-        if (getFailedCount() > MAX_UNKNOWN_STATUS_COUNT) {
-            switch (this.state) {
-                case ACTIVE:
-                case QUEUED:
-                    this.state = JobState.COMPLETE;
-                    break;
-                default:
-                    // other states are deliberately left unchanged
-                    break;
-            }
-            return;
-        }
-        try {
-            // when state becomes unknown we sleep for a while
-            Thread.sleep(UNKNOWN_STATUS_WAIT_MILLIS);
-        } catch (InterruptedException e) {
-            logger.warn("Interrupted while waiting after an UNKNOWN status of job " + jobID, e);
-            // restore the interrupt flag so that the caller can react to it
-            Thread.currentThread().interrupt();
-        }
-        setFailedCount(getFailedCount() + 1);
-    }
-
-    public String getWorkflowNodeID() {
-        return workflowNodeID;
-    }
-
-    public void setWorkflowNodeID(String workflowNodeID) {
-        this.workflowNodeID = workflowNodeID;
-    }
-
-    public JobExecutionContext getJobExecutionContext() {
-        return jobExecutionContext;
-    }
-
-    public void setJobExecutionContext(JobExecutionContext jobExecutionContext) {
-        this.jobExecutionContext = jobExecutionContext;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorManager.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorManager.java
deleted file mode 100644
index b703a0a..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorManager.java
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor;
-
-import com.google.common.eventbus.EventBus;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.monitor.core.Monitor;
-import org.apache.airavata.gfac.monitor.core.PullMonitor;
-import org.apache.airavata.gfac.monitor.core.PushMonitor;
-import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.gfac.monitor.impl.LocalJobMonitor;
-import org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor;
-import org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor;
-import org.apache.airavata.gfac.monitor.util.CommonUtils;
-import org.apache.airavata.persistance.registry.jpa.impl.RegistryImpl;
-import org.apache.airavata.registry.cpi.Registry;
-import org.apache.airavata.schemas.gfac.GlobusHostType;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.apache.airavata.schemas.gfac.SSHHostType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * Manager class for the monitoring system of Airavata; it simply handles the
- * monitoring flow of the system. Jobs available for monitoring are kept in
- * queues. According to the original author's note, finished jobs are removed
- * from the queue by AiravataJobUpdator (that class is not part of this file).
- */
-public class MonitorManager {
-    private final static Logger logger = LoggerFactory.getLogger(MonitorManager.class);
-
-    private final static String ACTIVITY_LISTENERS = "activity.listeners";
-
-    private List<PullMonitor> pullMonitors;    //todo though we have a List we only support one at a time
-
-    private List<PushMonitor> pushMonitors;   //todo we need to support multiple monitors dynamically
-
-    private BlockingQueue<UserMonitorData> pullQueue;
-
-    private BlockingQueue<MonitorID> pushQueue;
-
-    private BlockingQueue<MonitorID> localJobQueue;
-
-    private BlockingQueue<MonitorID> finishQueue;
-
-    private MonitorPublisher monitorPublisher;
-
-    private Monitor localJobMonitor;
-
-    private Registry registry;
-
-    /** Threads started by launchMonitor(), remembered so that stopMonitor() can interrupt them. */
-    private final List<Thread> monitorThreads = new ArrayList<Thread>();
-
-    /**
-     * This will initialize the major monitoring system, using a new
-     * {@link RegistryImpl} as the registry.
-     */
-    public MonitorManager() {
-        this(new RegistryImpl());
-    }
-
-    /**
-     * Initializes the monitoring system with the given registry: creates the
-     * monitor lists, the queues and the publisher, then loads the activity
-     * listeners named in the server settings.
-     *
-     * @param registry registry handed to every {@link AbstractActivityListener} that gets registered
-     */
-    public MonitorManager(Registry registry) {
-        pullMonitors = new ArrayList<PullMonitor>();
-        pushMonitors = new ArrayList<PushMonitor>();
-        pullQueue = new LinkedBlockingQueue<UserMonitorData>();
-        pushQueue = new LinkedBlockingQueue<MonitorID>();
-        finishQueue = new LinkedBlockingQueue<MonitorID>();
-        localJobQueue = new LinkedBlockingQueue<MonitorID>();
-        monitorPublisher = new MonitorPublisher(new EventBus());
-        this.registry = registry;
-        loadActivityMonitors();
-    }
-
-    /**
-     * Reads the comma separated class names stored under the
-     * "activity.listeners" setting, instantiates each class and registers it as
-     * a listener. A class that cannot be loaded, instantiated or cast is logged
-     * and skipped; the remaining classes are still processed.
-     */
-    private void loadActivityMonitors(){
-        String activityListenersString;
-        try {
-            activityListenersString = ServerSettings.getSetting(ACTIVITY_LISTENERS);
-        } catch (ApplicationSettingsException e1) {
-            logger.warn("Error in reading activity monitors!!!", e1);
-            return;
-        }
-        if (activityListenersString == null) {
-            return;
-        }
-        for (String activityListenerClassName : activityListenersString.split(",")) {
-            activityListenerClassName = activityListenerClassName.trim();
-            if (activityListenerClassName.length() == 0) {
-                // blank entry (empty setting or stray comma): nothing to load
-                continue;
-            }
-            try {
-                Class<?> classInstance = MonitorManager.class
-                        .getClassLoader().loadClass(activityListenerClassName);
-                AbstractActivityListener monitor = (AbstractActivityListener) classInstance.newInstance();
-                registerListener(monitor);
-            } catch (ClassNotFoundException e) {
-                logger.error("Error while locating activity monitor implementation \""+activityListenerClassName+"\"!!!",e);
-            } catch (InstantiationException e) {
-                logger.error("Error while initiating activity monitor instance \""+activityListenerClassName+"\"!!!",e);
-            } catch (IllegalAccessException e) {
-                logger.error("Error while initiating activity monitor instance \""+activityListenerClassName+"\"!!!",e);
-            } catch (ClassCastException e){
-                logger.error("Invalid activity monitor \""+activityListenerClassName+"\"!!!",e);
-            }
-        }
-    }
-
-    /**
-     * This can be used to add an empty AMQPMonitor object to the monitor system;
-     * this method takes care of the initialization (publisher, finish queue and
-     * running queue) and adds it as a push monitor.
-     * todo may be we need to move this to some other class
-     * @param monitor the AMQP monitor to initialize and add
-     */
-    public void addAMQPMonitor(AMQPMonitor monitor) {
-        monitor.setPublisher(this.getMonitorPublisher());
-        monitor.setFinishQueue(this.getFinishQueue());
-        monitor.setRunningQueue(this.getPushQueue());
-        addPushMonitor(monitor);
-    }
-
-
-    /**
-     * This can be used to set the LocalJobMonitor of the monitor system; this
-     * method takes care of the initialization (publisher and local job queue).
-     * A previously set local monitor is replaced.
-     * todo may be we need to move this to some other class
-     * @param monitor the local job monitor to initialize and use
-     */
-    public void addLocalMonitor(LocalJobMonitor monitor) {
-        monitor.setPublisher(this.getMonitorPublisher());
-        monitor.setJobQueue(this.getLocalJobQueue());
-        localJobMonitor = monitor;
-    }
-
-    /**
-     * This can be used to add a QstatMonitor; it takes care of the
-     * initialization of the QstatMonitor (publisher and pull queue) and adds it
-     * as a pull monitor.
-     * todo may be we need to move this to some other class
-     * @param qstatMonitor the qstat monitor to initialize and add
-     */
-    public void addQstatMonitor(QstatMonitor qstatMonitor) {
-        qstatMonitor.setPublisher(this.getMonitorPublisher());
-        qstatMonitor.setQueue(this.getPullQueue());
-        addPullMonitor(qstatMonitor);
-    }
-
-    /**
-     * To deal with the statuses users can write their own listener and implement their own logic.
-     * An {@link AbstractActivityListener} is additionally set up with the registry,
-     * the finish queue, the publisher and this manager.
-     *
-     * @param listener Any class can be written and if you want the JobStatus object to be taken from the bus, just
-     *                 have to put @subscribe as an annotation to your method to receive the JobStatus object from the bus.
-     */
-    public void registerListener(Object listener) {
-        monitorPublisher.registerListener(listener);
-        if (listener instanceof AbstractActivityListener){
-            ((AbstractActivityListener)listener).setup(registry, getFinishQueue(), getMonitorPublisher(), this);
-        }
-    }
-
-    /**
-     * Typed convenience overload; delegates to {@link #registerListener(Object)}.
-     *
-     * @param listener the activity listener to register and set up
-     */
-    public void registerListener(AbstractActivityListener listener) {
-        registerListener((Object)listener);
-    }
-
-    /**
-     * To remove listeners of changing statuses
-     *
-     * @param listener a listener that was previously passed to {@link #registerListener(Object)}
-     */
-    public void unregisterListener(Object listener) {
-        monitorPublisher.unregisterListener(listener);
-    }
-
-    /**
-     * Adds a push monitor; it is started by the next call of {@link #launchMonitor()}.
-     *
-     * @param monitor the push monitor to add
-     */
-    public void addPushMonitor(PushMonitor monitor) {
-        pushMonitors.add(monitor);
-    }
-
-    /**
-     * Adds a pull monitor; it is started by the next call of {@link #launchMonitor()}.
-     *
-     * @param monitor the pull monitor to add
-     */
-    public void addPullMonitor(PullMonitor monitor) {
-        pullMonitors.add(monitor);
-    }
-
-
-    /**
-     * Hands a job to the queue that matches the type of its host:
-     * <ul>
-     * <li>GsisshHostType: the pull queue when the monitor mode is null, empty or
-     * PULL; the push queue and the finish queue when the mode is PUSH; any other
-     * mode is logged and the job is not queued;</li>
-     * <li>GlobusHostType: not supported, an error is logged and the job is not queued;</li>
-     * <li>every other host type (including SSHHostType): the local job queue.</li>
-     * </ul>
-     *
-     * @param monitorID identifies the job to monitor
-     * @throws AiravataMonitorException declared for callers; may come from the pull-queue helper
-     * @throws InterruptedException if interrupted while putting the job into a queue
-     */
-    public void addAJobToMonitor(MonitorID monitorID) throws AiravataMonitorException, InterruptedException {
-        Object hostType = monitorID.getHost().getType();
-        if (hostType instanceof GsisshHostType) {
-            GsisshHostType host = (GsisshHostType) hostType;
-            if (host.getMonitorMode() == null || "".equals(host.getMonitorMode())
-                    || Constants.PULL.equals(host.getMonitorMode())) {
-                CommonUtils.addMonitortoQueue(pullQueue, monitorID);
-            } else if (Constants.PUSH.equals(host.getMonitorMode())) {
-                pushQueue.put(monitorID);
-                finishQueue.put(monitorID);
-            } else {
-                logger.warn("Unsupported monitor mode \"" + host.getMonitorMode()
-                        + "\", job " + monitorID.getJobID() + " will not be monitored");
-            }
-        } else if (hostType instanceof GlobusHostType) {
-            logger.error("Monitoring does not support GlobusHostType resources");
-        } else {
-            // SSHHostType and everything else is treated as a local job type
-            localJobQueue.add(monitorID);
-        }
-    }
-
-    /**
-     * Starts one thread for the local job monitor (when one is set) and one
-     * thread for every pull and push monitor that has been added.
-     * This method should be invoked before adding any elements to the monitor queues.
-     * Calling it again starts a further set of threads.
-     *
-     * @throws AiravataMonitorException declared for callers; not thrown by this implementation
-     */
-    public void launchMonitor() throws AiravataMonitorException {
-        if(localJobMonitor != null){
-            startMonitorThread(localJobMonitor);
-        }
-
-        for (PullMonitor monitor : pullMonitors) {
-            startMonitorThread(monitor);
-        }
-
-        //todo fix this
-        for (PushMonitor monitor : pushMonitors) {
-            startMonitorThread(monitor);
-        }
-    }
-
-    /**
-     * Interrupts every thread that was started by {@link #launchMonitor()} and
-     * forgets them. Whether a monitor actually terminates depends on how its
-     * run loop reacts to the interruption, which is not visible from this class.
-     *
-     * @throws AiravataMonitorException declared for callers; not thrown by this implementation
-     */
-    public void stopMonitor() throws AiravataMonitorException {
-        synchronized (monitorThreads) {
-            for (Thread thread : monitorThreads) {
-                thread.interrupt();
-            }
-            monitorThreads.clear();
-        }
-    }
-
-    /** Starts the given monitor in a new thread and remembers the thread for stopMonitor(). */
-    private void startMonitorThread(Runnable monitor) {
-        Thread thread = new Thread(monitor);
-        synchronized (monitorThreads) {
-            monitorThreads.add(thread);
-        }
-        thread.start();
-    }
-
-    /* getter setters for the private variables */
-
-    public List<PullMonitor> getPullMonitors() {
-        return pullMonitors;
-    }
-
-    public void setPullMonitors(List<PullMonitor> pullMonitors) {
-        this.pullMonitors = pullMonitors;
-    }
-
-    public List<PushMonitor> getPushMonitors() {
-        return pushMonitors;
-    }
-
-    public void setPushMonitors(List<PushMonitor> pushMonitors) {
-        this.pushMonitors = pushMonitors;
-    }
-
-    public BlockingQueue<UserMonitorData> getPullQueue() {
-        return pullQueue;
-    }
-
-    public void setPullQueue(BlockingQueue<UserMonitorData> pullQueue) {
-        this.pullQueue = pullQueue;
-    }
-
-    public MonitorPublisher getMonitorPublisher() {
-        return monitorPublisher;
-    }
-
-    public void setMonitorPublisher(MonitorPublisher monitorPublisher) {
-        this.monitorPublisher = monitorPublisher;
-    }
-
-    public BlockingQueue<MonitorID> getFinishQueue() {
-        return finishQueue;
-    }
-
-    public void setFinishQueue(BlockingQueue<MonitorID> finishQueue) {
-        this.finishQueue = finishQueue;
-    }
-
-    public BlockingQueue<MonitorID> getPushQueue() {
-        return pushQueue;
-    }
-
-    public void setPushQueue(BlockingQueue<MonitorID> pushQueue) {
-        this.pushQueue = pushQueue;
-    }
-
-    public BlockingQueue<MonitorID> getLocalJobQueue() {
-        return localJobQueue;
-    }
-
-    public void setLocalJobQueue(BlockingQueue<MonitorID> localJobQueue) {
-        this.localJobQueue = localJobQueue;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/TaskIdentity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/TaskIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/TaskIdentity.java
deleted file mode 100644
index c6d386e..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/TaskIdentity.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.gfac.monitor;
-
-/**
- * Identity of a task: the experiment id and workflow node id handled by
- * {@link WorkflowNodeIdentity}, plus the id of the task itself.
- */
-public class TaskIdentity extends WorkflowNodeIdentity {
-	private String taskId;
-
-	/**
-	 * @param experimentId   passed on to the super class
-	 * @param workflowNodeId passed on to the super class
-	 * @param taskId         id of the task
-	 */
-	public TaskIdentity(String experimentId, String workflowNodeId, String taskId) {
-		super(experimentId,workflowNodeId);
-		// Assigned directly instead of through setTaskId(): an overridable method
-		// called from a constructor would run before a subclass is initialised.
-		this.taskId = taskId;
-	}
-
-	public String getTaskId() {
-		return taskId;
-	}
-
-	public void setTaskId(String taskId) {
-		this.taskId = taskId;
-	}
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
deleted file mode 100644
index 022d17c..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor;
-
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * User centric view of the jobs to monitor: instead of keeping individual
- * jobs, the monitoring data is grouped per user, as a list of
- * {@link HostMonitorData} entries belonging to that user.
- */
-public class UserMonitorData {
-    private final static Logger logger = LoggerFactory.getLogger(UserMonitorData.class);
-
-    private String  userName;
-
-    private List<HostMonitorData> hostMonitorData;
-
-
-    /**
-     * Creates the data of a user with an empty host list.
-     *
-     * @param userName name of the user
-     */
-    public UserMonitorData(String userName) {
-        this(userName, new ArrayList<HostMonitorData>());
-    }
-
-    /**
-     * Creates the data of a user on top of an existing host list.
-     *
-     * @param userName            name of the user
-     * @param hostMonitorDataList list that is used as is (it is not copied)
-     */
-    public UserMonitorData(String userName, List<HostMonitorData> hostMonitorDataList) {
-        this.userName = userName;
-        this.hostMonitorData = hostMonitorDataList;
-    }
-
-    public String getUserName() {
-        return this.userName;
-    }
-
-    public void setUserName(String userName) {
-        this.userName = userName;
-    }
-
-    public List<HostMonitorData> getHostMonitorData() {
-        return this.hostMonitorData;
-    }
-
-    public void setHostMonitorData(List<HostMonitorData> hostMonitorData) {
-        this.hostMonitorData = hostMonitorData;
-    }
-
-    /**
-     * Appends an entry to the host list of this user. No duplicate check is
-     * performed, so callers have to take care not to add the same host twice.
-     *
-     * @param hostMonitorData the entry to append
-     * @throws AiravataMonitorException declared for callers; not thrown by this implementation
-     */
-    public void addHostMonitorData(HostMonitorData hostMonitorData) throws AiravataMonitorException {
-        getHostMonitorData().add(hostMonitorData);
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/WorkflowNodeIdentity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/WorkflowNodeIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/WorkflowNodeIdentity.java
deleted file mode 100644
index e569c52..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/WorkflowNodeIdentity.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.gfac.monitor;
-
-/**
- * Identity of a workflow node: the experiment id handled by
- * {@link ExperimentIdentity}, plus the id of the workflow node itself.
- */
-public class WorkflowNodeIdentity extends ExperimentIdentity {
-	private String workflowNodeID;
-
-	/**
-	 * @param experimentId   passed on to the super class
-	 * @param workflowNodeId id of the workflow node
-	 */
-	public WorkflowNodeIdentity(String experimentId, String workflowNodeId) {
-		super(experimentId);
-		// Assigned directly instead of through setWorkflowNodeID(): an overridable
-		// method called from a constructor would run before a subclass is initialised.
-		this.workflowNodeID = workflowNodeId;
-	}
-
-	public String getWorkflowNodeID() {
-		return workflowNodeID;
-	}
-
-	public void setWorkflowNodeID(String workflowNodeID) {
-		this.workflowNodeID = workflowNodeID;
-	}
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
deleted file mode 100644
index f19decf..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.gfac.monitor.command;
-
-/**
- * Command object that carries the id of the experiment to be cancelled.
- */
-public class ExperimentCancelRequest {
-	private String experimentId;
-
-	/**
-	 * @param experimentId id of the experiment this request refers to
-	 */
-	public ExperimentCancelRequest(String experimentId) {
-		this.experimentId = experimentId;
-	}
-
-	/**
-	 * @param experimentId new id of the experiment this request refers to
-	 */
-	public void setExperimentId(String experimentId) {
-		this.experimentId = experimentId;
-	}
-
-	/**
-	 * @return id of the experiment this request refers to
-	 */
-	public String getExperimentId() {
-		return this.experimentId;
-	}
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
deleted file mode 100644
index b45e01c..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.gfac.monitor.command;
-
-public class TaskCancelRequest {
-	private String experimentId;
-	private String nodeId;
-	private String taskId;
-	
-	public TaskCancelRequest(String experimentId, String nodeId, String taskId) {
-		this.experimentId = experimentId;
-		this.setNodeId(nodeId);
-		this.taskId = taskId;
-	}
-	public String getExperimentId() {
-		return experimentId;
-	}
-	public void setExperimentId(String experimentId) {
-		this.experimentId = experimentId;
-	}
-	public String getTaskId() {
-		return taskId;
-	}
-	public void setTaskId(String taskId) {
-		this.taskId = taskId;
-	}
-	public String getNodeId() {
-		return nodeId;
-	}
-	public void setNodeId(String nodeId) {
-		this.nodeId = nodeId;
-	}
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
deleted file mode 100644
index da6baf8..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.core;
-
-import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is the abstract Monitor which needs to be used by
- * any Monitoring implementation which expects to store
- * the status to the registry, because they have to
- * use the MonitorPublisher to publish the monitoring statuses
- * to the Event Bus. All the Monitor statuses published to the eventbus
- * will be saved to the Registry.
- */
-public abstract class AiravataAbstractMonitor implements Monitor {
-    private final static Logger logger = LoggerFactory.getLogger(AiravataAbstractMonitor.class);
-    protected MonitorPublisher publisher;
-
-    public MonitorPublisher getPublisher() {
-        return publisher;
-    }
-
-    public void setPublisher(MonitorPublisher publisher) {
-        this.publisher = publisher;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
deleted file mode 100644
index a003f55..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.core;
-
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.model.workspace.experiment.JobState;
-
-/**
- * This is an interface to implement messageparser, it could be
- * pull based or push based still monitor has to parse the content of
- * the message it gets from remote monitoring system and finalize
- * them to internal job state, Ex: JSON parser for AMQP and Qstat reader
- * for pull based monitor.
- */
-public interface MessageParser {
-    /**
-     * This method is to implement how to parse the incoming message
-     * and implement a logic to finalize the status of the job,
-     * we have to make sure the correct message is given to the messageparser
-     * parse method, it will not do any filtering
-     * @param message content of the message
-     * @return
-     */
-    JobState parseMessage(String message)throws AiravataMonitorException;
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
deleted file mode 100644
index d2fede5..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.core;
-
-
-/**
- * This is the primary interface for Monitors,
- * This can be used to implement different methods of monitoring
- */
-public interface Monitor extends Runnable {
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
deleted file mode 100644
index efdf89c..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.core;
-
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-
-/**
- * PullMonitors can extend this abstract class.
- * Since pull and push based monitoring require different
- * operations, PullMonitor will be useful.
- * This will allow users to program Pull monitors separately
- */
-public abstract class PullMonitor extends AiravataAbstractMonitor {
-
-    private int pollingFrequence;
-    /**
-     * This method can be invoked when PullMonitor needs to start
-     * and it has to be invoked at the frequency specified below,
-     * @return if the start process is successful return true else false
-     */
-    public abstract boolean startPulling() throws AiravataMonitorException;
-
-    /**
-     * This is the method to stop the polling process
-     * @return if the stopping process is successful return true else false
-     */
-    public abstract boolean stopPulling()throws AiravataMonitorException;
-
-    /**
-     * this method can be used to set the polling frequency or otherwise
-     * can implement a polling mechanism, and implement how to do
-     * @param frequence
-     */
-    public void setPollingFrequence(int frequence){
-        this.pollingFrequence = frequence;
-    }
-
-    /**
-     * this method can be used to get the polling frequency or otherwise
-     * can implement a polling mechanism, and implement how to do
-     * @return
-     */
-    public int getPollingFrequence(){
-        return this.pollingFrequence;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
deleted file mode 100644
index 8e13252..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.core;
-
-import org.apache.airavata.gfac.monitor.MonitorID;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-
-/**
- * PushMonitors can extend this abstract class.
- * Since pull and push based monitoring require different
- * operations, PushMonitor will be useful.
- * This abstract class will allow users to program Push monitors separately
- */
-public abstract class PushMonitor extends AiravataAbstractMonitor {
-    /**
-     * This method can be invoked to register a listener with the
-     * remote monitoring system, ideally inside this method users will be
-     * writing some client listener code for the remote monitoring system,
-     * this will be a simple wrapper around any client for the remote Monitor.
-     * @param monitorID
-     * @return
-     */
-    public abstract boolean registerListener(MonitorID monitorID)throws AiravataMonitorException;
-
-    /**
-     * This method can be invoked to unregister a listener with the
-     * remote monitoring system, ideally inside this method users will be
-     * writing some client listener code for the remote monitoring system,
-     * this will be a simple wrapper around any client for the remote Monitor.
-     * @param monitorID
-     * @return
-     */
-    public abstract boolean unRegisterListener(MonitorID monitorID)throws AiravataMonitorException;
-
-    /**
-     * This can be used to stop the registration thread
-     * @return
-     * @throws org.apache.airavata.gfac.monitor.exception.AiravataMonitorException
-     */
-    public abstract boolean stopRegister()throws AiravataMonitorException;
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/event/MonitorPublisher.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/event/MonitorPublisher.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/event/MonitorPublisher.java
deleted file mode 100644
index 52487fe..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/event/MonitorPublisher.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.event;
-
-import com.google.common.eventbus.EventBus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MonitorPublisher{
-    private final static Logger logger = LoggerFactory.getLogger(MonitorPublisher.class);
-    private EventBus eventBus;
-    
-    public MonitorPublisher(EventBus eventBus) {
-        this.eventBus = eventBus;
-    }
-
-    public void registerListener(Object listener) {
-        eventBus.register(listener);
-    }
-    
-    public void unregisterListener(Object listener) {
-        eventBus.unregister(listener);
-    }
-
-    public void publish(Object o) {
-        eventBus.post(o);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
deleted file mode 100644
index 3acef66..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.exception;
-
-public class AiravataMonitorException extends Exception {
-    private static final long serialVersionUID = -2849422320139467602L;
-
-    public AiravataMonitorException(Throwable e) {
-        super(e);
-    }
-
-    public AiravataMonitorException(String message) {
-        super(message, null);
-    }
-
-    public AiravataMonitorException(String message, Throwable e) {
-        super(message, e);
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java
deleted file mode 100644
index a64b484..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl;
-
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.monitor.JobIdentity;
-import org.apache.airavata.gfac.monitor.MonitorID;
-import org.apache.airavata.gfac.monitor.core.AiravataAbstractMonitor;
-import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest;
-import org.apache.airavata.model.workspace.experiment.JobState;
-
-import java.util.concurrent.BlockingQueue;
-
-/**
- * This monitor can be used to monitor a job which runs locally.
- * Since it's a local job, the job doesn't have states; once it gets executed
- * then the job starts running
- */
-public class LocalJobMonitor extends AiravataAbstractMonitor {
-    // Though we have a queue here, it is not going to be used in local jobs
-    BlockingQueue<MonitorID> jobQueue;
-
-    public void run() {
-        do {
-            try {
-                MonitorID take = jobQueue.take();
-                getPublisher().publish(new JobStatusChangeRequest(take, new JobIdentity(take.getExperimentID(), take.getWorkflowNodeID(), take.getTaskID(), take.getJobID()), JobState.COMPLETE));
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        } while (!ServerSettings.isStopAllThreads());
-    }
-
-    public BlockingQueue<MonitorID> getJobQueue() {
-        return jobQueue;
-    }
-
-    public void setJobQueue(BlockingQueue<MonitorID> jobQueue) {
-        this.jobQueue = jobQueue;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/QstatMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/QstatMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/QstatMonitor.java
deleted file mode 100644
index 020d9ae..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/QstatMonitor.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl.pull.qstat;
-
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.commons.gfac.type.HostDescription;
-import org.apache.airavata.gfac.monitor.HostMonitorData;
-import org.apache.airavata.gfac.monitor.JobIdentity;
-import org.apache.airavata.gfac.monitor.MonitorID;
-import org.apache.airavata.gfac.monitor.UserMonitorData;
-import org.apache.airavata.gfac.monitor.core.PullMonitor;
-import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest;
-import org.apache.airavata.gfac.monitor.util.CommonUtils;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Timestamp;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * This monitor is based on qstat command which can be run
- * in grid resources and retrieve the job status.
- */
-public class QstatMonitor extends PullMonitor {
-    private final static Logger logger = LoggerFactory.getLogger(QstatMonitor.class);
-
-    // I think this should use DelayedBlocking Queue to do the monitoring*/
-    private BlockingQueue<UserMonitorData> queue;
-
-    private boolean startPulling = false;
-
-    private Map<String, ResourceConnection> connections;
-
-    private MonitorPublisher publisher;
-
-    public QstatMonitor(){
-        connections = new HashMap<String, ResourceConnection>();
-    }
-    public QstatMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) {
-        this.queue = queue;
-        this.publisher = publisher;
-        connections = new HashMap<String, ResourceConnection>();
-    }
-
-    public void run() {
-        /* implement a logic to pick each monitorID object from the queue and do the
-        monitoring
-         */
-        this.startPulling = true;
-        while (this.startPulling && !ServerSettings.isStopAllThreads()) {
-            try {
-                startPulling();
-                // After finishing one iteration of the full queue this thread sleeps 10 seconds
-                Thread.sleep(10000);
-            } catch (Exception e){
-                // we catch all the exceptions here because no matter what happens we do not stop running this
-                // thread, but ideally we should report proper error messages, but this is handled in startPulling
-                // method, incase something happen in Thread.sleep we handle it with this catch block.
-                e.printStackTrace();
-                logger.error(e.getMessage());
-            }
-        }
-        // thread is going to return so we close all the connections
-        Iterator<String> iterator = connections.keySet().iterator();
-        while(iterator.hasNext()){
-            String next = iterator.next();
-            ResourceConnection resourceConnection = connections.get(next);
-            try {
-                resourceConnection.getCluster().disconnect();
-            } catch (SSHApiException e) {
-                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-            }
-        }
-    }
-
-    /**
-     * This method can be invoked when PullMonitor needs to start
-     * and it has to be invoked at the frequency specified below,
-     *
-     * @return if the start process is successful return true else false
-     */
-    public boolean startPulling() throws AiravataMonitorException {
-        // take the top element in the queue and pull the data and put that element
-        // at the tail of the queue
-        //todo this polling will not work with multiple usernames but with single user
-        // and multiple hosts, currently monitoring will work
-        UserMonitorData take = null;
-        JobStatusChangeRequest jobStatus = new JobStatusChangeRequest();
-        MonitorID currentMonitorID = null;
-        HostDescription currentHostDescription = null;
-        try {
-            take = this.queue.take();
-            List<MonitorID> completedJobs = new ArrayList<MonitorID>();
-            List<HostMonitorData> hostMonitorData = take.getHostMonitorData();
-            for (HostMonitorData iHostMonitorData : hostMonitorData) {
-                if (iHostMonitorData.getHost().getType() instanceof GsisshHostType) {
-                    currentHostDescription = iHostMonitorData.getHost();
-                    GsisshHostType gsisshHostType = (GsisshHostType) iHostMonitorData.getHost().getType();
-                    String hostName = gsisshHostType.getHostAddress();
-                    ResourceConnection connection = null;
-                    if (connections.containsKey(hostName)) {
-                        logger.debug("We already have this connection so not going to create one");
-                        connection = connections.get(hostName);
-                    } else {
-                        connection = new ResourceConnection(take.getUserName(), iHostMonitorData, gsisshHostType.getInstalledPath());
-                        connections.put(hostName, connection);
-                    }
-                    List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs();
-                    Map<String, JobState> jobStatuses = connection.getJobStatuses(take.getUserName(), monitorID);
-                    for (MonitorID iMonitorID : monitorID) {
-                        currentMonitorID = iMonitorID;
-                        iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()));
-                        jobStatus = new JobStatusChangeRequest(iMonitorID);
-                        // we have this JobStatus class to handle amqp monitoring
-
-                        publisher.publish(jobStatus);
-                        // if the job is completed we do not have to put the job to the queue again
-                        iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
-
-                        // After successful monitoring perform following actions to cleanup the queue, if necessary
-                        if (jobStatus.getState().equals(JobState.COMPLETE)) {
-                            completedJobs.add(iMonitorID);
-                        } else if (iMonitorID.getFailedCount() > 2 && iMonitorID.getStatus().equals(JobState.UNKNOWN)) {
-                            logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed 3 times, so skip this Job from Monitor");
-                            iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
-                            completedJobs.add(iMonitorID);
-                        } else {
-                            // Evey
-                            iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
-                            // if the job is complete we remove it from the Map, if any of these maps
-                            // get empty this userMonitorData will get delete from the queue
-                        }
-                    }
-                } else {
-                    logger.debug("Qstat Monitor doesn't handle non-gsissh hosts");
-                }
-            }
-            // We have finished all the HostMonitorData object in userMonitorData, now we need to put it back
-            // now the userMonitorData goes back to the tail of the queue
-            queue.put(take);
-            // cleaning up the completed jobs, this method will remove some of the userMonitorData from the queue if
-            // they become empty
-            for(MonitorID completedJob:completedJobs){
-                CommonUtils.removeMonitorFromQueue(queue, completedJob);
-            }
-        } catch (InterruptedException e) {
-            if (!this.queue.contains(take)) {
-                try {
-                    this.queue.put(take);
-                } catch (InterruptedException e1) {
-                    e1.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                }
-            }
-            logger.error("Error handling the job with Job ID:" + currentMonitorID.getJobID());
-            throw new AiravataMonitorException(e);
-        } catch (SSHApiException e) {
-            logger.error(e.getMessage());
-            if (e.getMessage().contains("Unknown Job Id Error")) {
-                // in this case job is finished or may be the given job ID is wrong
-                jobStatus.setState(JobState.UNKNOWN);
-                publisher.publish(jobStatus);
-            } else if (e.getMessage().contains("illegally formed job identifier")) {
-                logger.error("Wrong job ID is given so dropping the job from monitoring system");
-            } else if (!this.queue.contains(take)) {   // we put the job back to the queue only if its state is not unknown
-                if (currentMonitorID == null) {
-                    logger.error("Monitoring the jobs failed, for user: " + take.getUserName()
-                            + " in Host: " + currentHostDescription.getType().getHostAddress());
-                } else {
-                    if (currentMonitorID != null) {
-                        if (currentMonitorID.getFailedCount() < 2) {
-                            try {
-                                currentMonitorID.setFailedCount(currentMonitorID.getFailedCount() + 1);
-                                this.queue.put(take);
-                            } catch (InterruptedException e1) {
-                                e1.printStackTrace();
-                            }
-                        } else {
-                            logger.error(e.getMessage());
-                            logger.error("Tried to monitor the job 3 times, so dropping of the the Job with ID: " + currentMonitorID.getJobID());
-                        }
-                    }
-                }
-            }
-            throw new AiravataMonitorException("Error retrieving the job status", e);
-        } catch (Exception e) {
-            if (currentMonitorID != null) {
-                if (currentMonitorID.getFailedCount() < 3) {
-                    try {
-                        currentMonitorID.setFailedCount(currentMonitorID.getFailedCount() + 1);
-                        this.queue.put(take);
-                        // if we get a wrong status we wait for a while and request again
-                        Thread.sleep(10000);
-                    } catch (InterruptedException e1) {
-                        e1.printStackTrace();
-                    }
-                } else {
-                    logger.error(e.getMessage());
-                    logger.error("Tryied to monitor the job 3 times, so dropping of the the Job with ID: " + currentMonitorID.getJobID());
-                }
-            }
-            throw new AiravataMonitorException("Error retrieving the job status", e);
-        }
-
-
-        return true;
-    }
-
-
-    /**
-     * This is the method to stop the polling process
-     *
-     * @return if the stopping process is successful return true else false
-     */
-    public boolean stopPulling() {
-        this.startPulling = false;
-        return true;
-    }
-
-    public MonitorPublisher getPublisher() {
-        return publisher;
-    }
-
-    public void setPublisher(MonitorPublisher publisher) {
-        this.publisher = publisher;
-    }
-
-    public BlockingQueue<UserMonitorData> getQueue() {
-        return queue;
-    }
-
-    public void setQueue(BlockingQueue<UserMonitorData> queue) {
-        this.queue = queue;
-    }
-
-    public boolean authenticate() {
-        return false;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
deleted file mode 100644
index 7a37b88..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl.pull.qstat;
-
-import org.apache.airavata.gfac.monitor.HostMonitorData;
-import org.apache.airavata.gfac.monitor.MonitorID;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.ServerInfo;
-import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
-import org.apache.airavata.gsi.ssh.api.job.JobManagerConfiguration;
-import org.apache.airavata.gsi.ssh.impl.JobStatus;
-import org.apache.airavata.gsi.ssh.impl.PBSCluster;
-import org.apache.airavata.gsi.ssh.util.CommonUtils;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-
-public class ResourceConnection {
-    private static final Logger log = LoggerFactory.getLogger(ResourceConnection.class);
-
-    private PBSCluster cluster;
-
-    public ResourceConnection(MonitorID monitorID, String installedPath) throws SSHApiException {
-        AuthenticationInfo authenticationInfo = monitorID.getAuthenticationInfo();
-        String hostAddress = monitorID.getHost().getType().getHostAddress();
-        String userName = monitorID.getUserName();
-        String jobManager = ((GsisshHostType)monitorID.getHost().getType()).getJobManager();
-        JobManagerConfiguration jConfig = null;
-        if (jobManager == null) {
-            log.error("No Job Manager is configured, so we are picking pbs as the default job manager");
-            jConfig = CommonUtils.getPBSJobManager(installedPath);
-        } else {
-            if (org.apache.airavata.gfac.monitor.util.CommonUtils.isPBSHost(monitorID.getHost())) {
-                jConfig = CommonUtils.getPBSJobManager(installedPath);
-            } else if(org.apache.airavata.gfac.monitor.util.CommonUtils.isSlurm(monitorID.getHost())) {
-                jConfig = CommonUtils.getSLURMJobManager(installedPath);
-            } else if(org.apache.airavata.gfac.monitor.util.CommonUtils.isSGE(monitorID.getHost())) {
-                jConfig = CommonUtils.getSGEJobManager(installedPath);
-            }
-            //todo support br2 etc
-        }
-        ServerInfo serverInfo = new ServerInfo(userName, hostAddress, ((GsisshHostType)monitorID.getHost().getType()).getPort());
-        cluster = new PBSCluster(serverInfo, authenticationInfo, jConfig);
-    }
-
-    public ResourceConnection(String userName, HostMonitorData hostMonitorData, String installedPath) throws SSHApiException {
-        AuthenticationInfo authenticationInfo = hostMonitorData.getMonitorIDs().get(0).getAuthenticationInfo();
-        String hostAddress = hostMonitorData.getHost().getType().getHostAddress();
-        String jobManager = ((GsisshHostType)hostMonitorData.getHost().getType()).getJobManager();
-        JobManagerConfiguration jConfig = null;
-        if (jobManager == null) {
-            log.error("No Job Manager is configured, so we are picking pbs as the default job manager");
-            jConfig = CommonUtils.getPBSJobManager(installedPath);
-        } else {
-            if (org.apache.airavata.gfac.monitor.util.CommonUtils.isPBSHost(hostMonitorData.getHost())) {
-                jConfig = CommonUtils.getPBSJobManager(installedPath);
-            } else if(org.apache.airavata.gfac.monitor.util.CommonUtils.isSlurm(hostMonitorData.getHost())) {
-                jConfig = CommonUtils.getSLURMJobManager(installedPath);
-            }else if(org.apache.airavata.gfac.monitor.util.CommonUtils.isSGE(hostMonitorData.getHost())) {
-                jConfig = CommonUtils.getSGEJobManager(installedPath);
-            }
-            //todo support br2 etc
-        }
-        ServerInfo serverInfo = new ServerInfo(userName, hostAddress, ((GsisshHostType)hostMonitorData.getHost().getType()).getPort());
-        cluster = new PBSCluster(serverInfo, authenticationInfo, jConfig);
-    }
-    public JobState getJobStatus(MonitorID monitorID) throws SSHApiException {
-        String jobID = monitorID.getJobID();
-        //todo so currently we execute the qstat for each job but we can use user based monitoring
-        //todo or we should concatenate all the commands and execute them in one go and parse the response
-        return getStatusFromString(cluster.getJobStatus(jobID).toString());
-    }
-
-    public Map<String,JobState> getJobStatuses(String userName,List<MonitorID> monitorIDs) throws SSHApiException {
-        Map<String,JobStatus> treeMap = new TreeMap<String,JobStatus>();
-        Map<String,JobState> treeMap1 = new TreeMap<String,JobState>();
-        // creating a sorted map with all the jobIds and with the predefined
-        // status as UNKNOWN
-        for (MonitorID monitorID : monitorIDs) {
-            treeMap.put(monitorID.getJobID(), JobStatus.U);
-        }
-        //todo so currently we execute the qstat for each job but we can use user based monitoring
-        //todo or we should concatenate all the commands and execute them in one go and parse the response
-        cluster.getJobStatuses(userName,treeMap);
-        for(String key:treeMap.keySet()){
-            treeMap1.put(key,getStatusFromString(treeMap.get(key).toString()));
-        }
-        return treeMap1;
-    }
-    private JobState getStatusFromString(String status) {
-        log.info("parsing the job status returned : " + status);
-        if(status != null){
-            if("C".equals(status) || "CD".equals(status)|| "E".equals(status) || "CG".equals(status)){
-                return JobState.COMPLETE;
-            }else if("H".equals(status) || "h".equals(status)){
-                return JobState.HELD;
-            }else if("Q".equals(status) || "qw".equals(status)){
-                return JobState.QUEUED;
-            }else if("R".equals(status)  || "CF".equals(status) || "r".equals(status)){
-                return JobState.ACTIVE;
-            }else if ("T".equals(status)) {
-                return JobState.HELD;
-            } else if ("W".equals(status) || "PD".equals(status)) {
-                return JobState.QUEUED;
-            } else if ("S".equals(status)) {
-                return JobState.SUSPENDED;
-            }else if("CA".equals(status)){
-                return JobState.CANCELED;
-            }else if ("F".equals(status) || "NF".equals(status) || "TO".equals(status)) {
-                return JobState.FAILED;
-            }else if ("PR".equals(status) || "Er".equals(status)) {
-                return JobState.FAILED;
-            }else if ("U".equals(status)){
-                return JobState.UNKNOWN;
-            }
-        }
-        return JobState.UNKNOWN;
-    }
-
-    public PBSCluster getCluster() {
-        return cluster;
-    }
-
-    public void setCluster(PBSCluster cluster) {
-        this.cluster = cluster;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
deleted file mode 100644
index fbf6e21..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl.push.amqp;
-
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.commons.gfac.type.HostDescription;
-import org.apache.airavata.gfac.monitor.JobIdentity;
-import org.apache.airavata.gfac.monitor.MonitorID;
-import org.apache.airavata.gfac.monitor.core.PushMonitor;
-import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest;
-import org.apache.airavata.gfac.monitor.util.AMQPConnectionUtil;
-import org.apache.airavata.gfac.monitor.util.CommonUtils;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * This is the implementation for AMQP based finishQueue, this uses
- * rabbitmq client to receive AMQP based monitoring data from
- * mostly excede resources.
- */
-public class AMQPMonitor extends PushMonitor {
-    private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
-
-
-    /* this will keep all the channels available in the system, we do not create
-      channels for all the jobs submitted, but we create channels for each user for each
-      host.
-    */
-    private Map<String, Channel> availableChannels;
-
-    private MonitorPublisher publisher;
-
-    private MonitorPublisher localPublisher;
-
-    private BlockingQueue<MonitorID> runningQueue;
-
-    private BlockingQueue<MonitorID> finishQueue;
-
-    private String connectionName;
-
-    private String proxyPath;
-
-    private List<String> amqpHosts;
-
-    private boolean startRegister;
-
-    public AMQPMonitor(){
-
-    }
-    public AMQPMonitor(MonitorPublisher publisher, BlockingQueue<MonitorID> runningQueue,
-                       BlockingQueue<MonitorID> finishQueue,
-                       String proxyPath,String connectionName,List<String> hosts) {
-        this.publisher = publisher;
-        this.runningQueue = runningQueue;        // these will be initialized by the MonitorManager
-        this.finishQueue = finishQueue;          // these will be initialized by the MonitorManager
-        this.availableChannels = new HashMap<String, Channel>();
-        this.connectionName = connectionName;
-        this.proxyPath = proxyPath;
-        this.amqpHosts = hosts;
-        this.localPublisher = new MonitorPublisher(new EventBus());
-        this.localPublisher.registerListener(this);
-    }
-
-    public void initialize(String proxyPath, String connectionName, List<String> hosts) {
-        this.availableChannels = new HashMap<String, Channel>();
-        this.connectionName = connectionName;
-        this.proxyPath = proxyPath;
-        this.amqpHosts = hosts;
-        this.localPublisher = new MonitorPublisher(new EventBus());
-        this.localPublisher.registerListener(this);
-    }
-
-    @Override
-    public boolean registerListener(MonitorID monitorID) throws AiravataMonitorException {
-        // we subscribe to read user-host based subscription
-        HostDescription host = monitorID.getHost();
-        String hostAddress = host.getType().getHostAddress();
-        // in amqp case there are no multiple jobs per each host, because once a job is put in to the queue it
-        // will be picked by the Monitor, so jobs will not stay in this queue but jobs will stay in finishQueue
-        String channelID = CommonUtils.getChannelID(monitorID);
-        if(availableChannels.get(channelID) == null){
-        try {
-            //todo need to fix this rather getting it from a file
-            Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath);
-            Channel channel = null;
-            channel = connection.createChannel();
-            availableChannels.put(channelID, channel);
-            String queueName = channel.queueDeclare().getQueue();
-
-            BasicConsumer consumer = new
-                    BasicConsumer(new JSONMessageParser(), localPublisher);          // here we use local publisher
-            channel.basicConsume(queueName, true, consumer);
-            String filterString = CommonUtils.getRoutingKey(monitorID.getUserName(), hostAddress);
-            // here we queuebind to a particular user in a particular machine
-            channel.queueBind(queueName, "glue2.computing_activity", filterString);
-            logger.info("Using filtering string to monitor: " + filterString);
-        } catch (IOException e) {
-            logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName());
-        }
-        }
-        return true;
-    }
-
-    public void run() {
-        // before going to the while true mode we start unregister thread
-        startRegister = true; // this will be unset by someone else
-        while (startRegister || !ServerSettings.isStopAllThreads()) {
-            try {
-                MonitorID take = runningQueue.take();
-                this.registerListener(take);
-            } catch (AiravataMonitorException e) { // catch any exception inside the loop
-                e.printStackTrace();
-            } catch (InterruptedException e) {
-                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-            } catch (Exception e){
-                e.printStackTrace();
-            }
-        }
-        Set<String> strings = availableChannels.keySet();
-        for(String key:strings) {
-            Channel channel = availableChannels.get(key);
-            try {
-                channel.close();
-            } catch (IOException e) {
-                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-            }
-        }
-    }
-
-    @Subscribe
-    public boolean unRegisterListener(MonitorID monitorID) throws AiravataMonitorException {
-        Iterator<MonitorID> iterator = finishQueue.iterator();
-        MonitorID next = null;
-        while(iterator.hasNext()){
-            next = iterator.next();
-            if(next.getJobID().endsWith(monitorID.getJobID())){
-                break;
-            }
-        }
-        if(next == null) {
-            logger.error("Job has removed from the queue, old obsolete message recieved");
-            return false;
-        }
-        String channelID = CommonUtils.getChannelID(next);
-        if (JobState.FAILED.equals(monitorID.getStatus()) || JobState.COMPLETE.equals(monitorID.getStatus())) {
-            finishQueue.remove(next);
-
-            // if this is the last job in the queue at this point with the same username and same host we
-            // close the channel and close the connection and remove it from availableChannels
-            if (CommonUtils.isTheLastJobInQueue(finishQueue, next)) {
-                logger.info("There are no jobs to monitor for common ChannelID:" + channelID + " , so we unsubscribe it" +
-                        ", incase new job created we do subscribe again");
-                Channel channel = availableChannels.get(channelID);
-                if (channel == null) {
-                    logger.error("Already Unregistered the listener");
-                    throw new AiravataMonitorException("Already Unregistered the listener");
-                } else {
-                    try {
-                        channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(next));
-                        channel.close();
-                        channel.getConnection().close();
-                        availableChannels.remove(channelID);
-                    } catch (IOException e) {
-                        logger.error("Error unregistering the listener");
-                        throw new AiravataMonitorException("Error unregistering the listener");
-                    }
-                }
-            }
-        }
-        next.setStatus(monitorID.getStatus());
-        publisher.publish(new JobStatusChangeRequest(next, new JobIdentity(next.getExperimentID(), next.getWorkflowNodeID(), next.getTaskID(), next.getJobID()),next.getStatus()));
-        return true;
-    }
-    @Override
-    public boolean stopRegister() throws AiravataMonitorException {
-        return false;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public Map<String, Channel> getAvailableChannels() {
-        return availableChannels;
-    }
-
-    public void setAvailableChannels(Map<String, Channel> availableChannels) {
-        this.availableChannels = availableChannels;
-    }
-
-    public MonitorPublisher getPublisher() {
-        return publisher;
-    }
-
-    public void setPublisher(MonitorPublisher publisher) {
-        this.publisher = publisher;
-    }
-
-    public BlockingQueue<MonitorID> getRunningQueue() {
-        return runningQueue;
-    }
-
-    public void setRunningQueue(BlockingQueue<MonitorID> runningQueue) {
-        this.runningQueue = runningQueue;
-    }
-
-    public BlockingQueue<MonitorID> getFinishQueue() {
-        return finishQueue;
-    }
-
-    public void setFinishQueue(BlockingQueue<MonitorID> finishQueue) {
-        this.finishQueue = finishQueue;
-    }
-
-    public String getProxyPath() {
-        return proxyPath;
-    }
-
-    public void setProxyPath(String proxyPath) {
-        this.proxyPath = proxyPath;
-    }
-
-    public List<String> getAmqpHosts() {
-        return amqpHosts;
-    }
-
-    public void setAmqpHosts(List<String> amqpHosts) {
-        this.amqpHosts = amqpHosts;
-    }
-
-    public boolean isStartRegister() {
-        return startRegister;
-    }
-
-    public void setStartRegister(boolean startRegister) {
-        this.startRegister = startRegister;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
deleted file mode 100644
index 1d60c45..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl.push.amqp;
-
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Consumer;
-import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.ShutdownSignalException;
-import org.apache.airavata.gfac.monitor.MonitorID;
-import org.apache.airavata.gfac.monitor.core.MessageParser;
-import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BasicConsumer implements Consumer {
-    private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
-
-    private MessageParser parser;
-
-    private MonitorPublisher publisher;
-
-    public BasicConsumer(MessageParser parser, MonitorPublisher publisher) {
-        this.parser = parser;
-        this.publisher = publisher;
-    }
-
-    public void handleCancel(String consumerTag) {
-    }
-
-    public void handleCancelOk(String consumerTag) {
-    }
-
-    public void handleConsumeOk(String consumerTag) {
-    }
-
-    public void handleDelivery(String consumerTag,
-                               Envelope envelope,
-                               AMQP.BasicProperties properties,
-                               byte[] body) {
-
-        logger.debug("job update for: " + envelope.getRoutingKey());
-        String message = new String(body);
-        message = message.replaceAll("(?m)^", "    ");
-        // Here we parse the message and get the job status and push it
-        // to the Event bus, this will be picked by
-//        AiravataJobStatusUpdator and store in to registry
-
-        logger.debug("************************************************************");
-        logger.debug("AMQP Message recieved \n" + message);
-        logger.debug("************************************************************");
-        try {
-            String jobID = envelope.getRoutingKey().split("\\.")[0];
-            MonitorID monitorID = new MonitorID(null, jobID, null, null, null, null);
-            monitorID.setStatus(parser.parseMessage(message));
-            publisher.publish(monitorID);
-        } catch (AiravataMonitorException e) {
-            e.printStackTrace();
-        }
-    }
-
-    public void handleRecoverOk(String consumerTag) {
-    }
-
-    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
-    }
-
-}


Mime
View raw message