airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From samin...@apache.org
Subject [1/2] git commit: refactor to load monitor listeners to load from the server settings
Date Thu, 17 Apr 2014 20:18:53 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 9edda85fa -> 56574dd19


refactor to load monitor listeners to load from the server settings


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/0c0b6d4a
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/0c0b6d4a
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/0c0b6d4a

Branch: refs/heads/master
Commit: 0c0b6d4a6ea0eaddfa220291ee6961964a7c8c34
Parents: eeb2c0d
Author: Saminda Wijeratne <samindaw@gmail.com>
Authored: Thu Apr 17 13:18:18 2014 -0700
Committer: Saminda Wijeratne <samindaw@gmail.com>
Committed: Thu Apr 17 13:18:18 2014 -0700

----------------------------------------------------------------------
 .../main/resources/airavata-server.properties   |  2 +-
 .../monitor/AbstractActivityMonitorClient.java  | 27 +++++++++
 .../job/monitor/AiravataJobStatusUpdator.java   | 28 +++++----
 .../airavata/job/monitor/MonitorManager.java    | 60 +++++++++++++++-----
 .../job/monitor/impl/push/amqp/AMQPMonitor.java | 21 ++++---
 5 files changed, 103 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/0c0b6d4a/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 2768f62..193e9c8 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -262,7 +262,7 @@ monitors=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor,org.apache
 amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
 proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
 connection.name=xsede
-
+activity.monitors=org.apache.airavata.job.monitor.AiravataJobStatusUpdator
 
 ###---------------------------Orchestrator module Configurations---------------------------###
 job.submitter=org.apache.airavata.orchestrator.core.impl.EmbeddedGFACJobSubmitter

http://git-wip-us.apache.org/repos/asf/airavata/blob/0c0b6d4a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityMonitorClient.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityMonitorClient.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityMonitorClient.java
new file mode 100644
index 0000000..9124cc3
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityMonitorClient.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.job.monitor;
+
+
+public interface AbstractActivityMonitorClient {
+	public void setup(Object...configurations);
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0c0b6d4a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
index b755e16..37045a8 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
@@ -20,34 +20,27 @@
 */
 package org.apache.airavata.job.monitor;
 
-import com.google.common.eventbus.Subscribe;
+import java.util.Calendar;
+import java.util.concurrent.BlockingQueue;
 
 import org.apache.airavata.job.monitor.state.JobStatus;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.CompositeIdentifier;
 import org.apache.airavata.registry.cpi.DataType;
 import org.apache.airavata.registry.cpi.Registry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Calendar;
-import java.util.concurrent.BlockingQueue;
+import com.google.common.eventbus.Subscribe;
 
-public class AiravataJobStatusUpdator{
+public class AiravataJobStatusUpdator implements AbstractActivityMonitorClient{
     private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class);
 
     private Registry airavataRegistry;
 
     private BlockingQueue<MonitorID> jobsToMonitor;
 
-
-    public AiravataJobStatusUpdator(Registry airavataRegistry, BlockingQueue<MonitorID> jobsToMonitor) {
-        this.airavataRegistry = airavataRegistry;
-        this.jobsToMonitor = jobsToMonitor;
-    }
-
     public Registry getAiravataRegistry() {
         return airavataRegistry;
     }
@@ -128,4 +121,17 @@ public class AiravataJobStatusUpdator{
         details.setJobID(jobID);
         airavataRegistry.update(org.apache.airavata.registry.cpi.DataType.JOB_DETAIL, details, ids);
     }
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void setup(Object... configurations) {
+		for (Object configuration : configurations) {
+			
+			if (configuration instanceof Registry){
+				this.airavataRegistry=(Registry)configuration;
+			} else if (configuration instanceof BlockingQueue<?>){
+				this.jobsToMonitor=(BlockingQueue<MonitorID>) configuration;
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/0c0b6d4a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
index 45b7230..ac5b6d1 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
@@ -20,8 +20,14 @@
 */
 package org.apache.airavata.job.monitor;
 
-import com.google.common.eventbus.EventBus;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.job.monitor.core.Monitor;
 import org.apache.airavata.job.monitor.core.PullMonitor;
 import org.apache.airavata.job.monitor.core.PushMonitor;
@@ -39,10 +45,7 @@ import org.apache.airavata.schemas.gfac.SSHHostType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.eventbus.EventBus;
 
 /*
 this is the manager class for monitoring system of airavata,
@@ -68,20 +71,16 @@ public class MonitorManager {
     private MonitorPublisher monitorPublisher;
 
     private Monitor localJobMonitor;
+    
+    private List<AbstractActivityMonitorClient> activityMonitors;
 
+    private Registry registry;
 
     /**
      * This will initialize the major monitoring system.
      */
     public MonitorManager() {
-        pullMonitors = new ArrayList<PullMonitor>();
-        pushMonitors = new ArrayList<PushMonitor>();
-        pullQueue = new LinkedBlockingQueue<UserMonitorData>();
-        pushQueue = new LinkedBlockingQueue<MonitorID>();
-        finishQueue = new LinkedBlockingQueue<MonitorID>();
-        localJobQueue = new LinkedBlockingQueue<MonitorID>();
-        monitorPublisher = new MonitorPublisher(new EventBus());
-        registerListener(new AiravataJobStatusUpdator(new RegistryImpl(), getFinishQueue()));
+    	this(new RegistryImpl());
     }
 
     public MonitorManager(Registry registry) {
@@ -92,7 +91,40 @@ public class MonitorManager {
         finishQueue = new LinkedBlockingQueue<MonitorID>();
         localJobQueue = new LinkedBlockingQueue<MonitorID>();
         monitorPublisher = new MonitorPublisher(new EventBus());
-        registerListener(new AiravataJobStatusUpdator(registry, getFinishQueue()));
+        this.registry = registry;
+        loadActivityMonitors();
+    }
+    
+    private void loadActivityMonitors(){
+		try {
+			activityMonitors=new ArrayList<AbstractActivityMonitorClient>();
+			String activityMonitorsString = ServerSettings.getSetting("activity.monitors");
+			if (activityMonitorsString!=null){
+				String[] activityMonitorClasses = activityMonitorsString.split(",");
+				for (String activityMonitorClassName : activityMonitorClasses) {
+					Class<?> classInstance;
+					try {
+						classInstance = MonitorManager.class
+						        .getClassLoader().loadClass(activityMonitorClassName);
+						AbstractActivityMonitorClient monitor=(AbstractActivityMonitorClient)classInstance.newInstance();
+						monitor.setup(registry,getFinishQueue());
+						activityMonitors.add(monitor);
+						registerListener(monitor);
+					} catch (ClassNotFoundException e) {
+						logger.error("Error while locating activity monitor implementation \""+activityMonitorClassName+"\"!!!",e);
+					} catch (InstantiationException e) {
+						logger.error("Error while initiating activity monitor instance \""+activityMonitorClassName+"\"!!!",e);
+					} catch (IllegalAccessException e) {
+						logger.error("Error while initiating activity monitor instance \""+activityMonitorClassName+"\"!!!",e);
+					} catch (ClassCastException e){
+						logger.error("Invalid activity monitor \""+activityMonitorClassName+"\"!!!",e);
+					}
+				}
+			}
+		} catch (ApplicationSettingsException e1) {
+			logger.warn("Error in reading activity monitors!!!", e1);
+		}
+		
     }
     /**
      * This can be use to add an empty AMQPMonitor object to the monitor system

http://git-wip-us.apache.org/repos/asf/airavata/blob/0c0b6d4a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
index 058bcd0..addedbb 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
@@ -20,15 +20,17 @@
 */
 package org.apache.airavata.job.monitor.impl.push.amqp;
 
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.commons.gfac.type.HostDescription;
-import org.apache.airavata.job.monitor.HostMonitorData;
 import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.UserMonitorData;
 import org.apache.airavata.job.monitor.core.PushMonitor;
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
@@ -39,9 +41,10 @@ import org.apache.airavata.model.workspace.experiment.JobState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
 
 /**
  * This is the implementation for AMQP based finishQueue, this uses


Mime
View raw message