airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject git commit: fixing monitoring issue - AIRAVATA-1023
Date Tue, 11 Mar 2014 15:10:00 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 2e8dbe16f -> 58872cec2


fixing monitoring issue - AIRAVATA-1023


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/58872cec
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/58872cec
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/58872cec

Branch: refs/heads/master
Commit: 58872cec2dcc121995d5db0541251a179d5b019e
Parents: 2e8dbe1
Author: lahiru <lahiru@apache.org>
Authored: Tue Mar 11 11:09:52 2014 -0400
Committer: lahiru <lahiru@apache.org>
Committed: Tue Mar 11 11:09:52 2014 -0400

----------------------------------------------------------------------
 .../airavata/job/monitor/core/MessageParser.java   |  3 ++-
 .../job/monitor/impl/pull/qstat/QstatMonitor.java  |  3 ++-
 .../job/monitor/impl/push/amqp/AMQPMonitor.java    | 17 ++++++++++++-----
 .../job/monitor/impl/push/amqp/BasicConsumer.java  |  7 ++++++-
 .../monitor/impl/push/amqp/JSONMessageParser.java  | 13 ++++++++++++-
 .../airavata/common/utils/ServerSettings.java      |  9 +++++++++
 .../main/resources/conf/airavata-server.properties |  3 ++-
 .../orchestrator/server/OrchestratorServer.java    |  4 ++--
 .../server/OrchestratorServerHandler.java          |  8 ++++++++
 .../org/apache/airavata/server/ServerMain.java     |  1 +
 10 files changed, 56 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/58872cec/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
index cd827ca..c70e372 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
@@ -21,6 +21,7 @@
 package org.apache.airavata.job.monitor.core;
 
 import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.job.monitor.state.JobStatus;
 
 /**
@@ -40,5 +41,5 @@ public interface MessageParser {
      * @param monitorID monitorID object
      * @return
      */
-    JobStatus parseMessage(String message,MonitorID monitorID);
+    JobStatus parseMessage(String message,MonitorID monitorID)throws AiravataMonitorException;
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/58872cec/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
index 1978ad8..5168da0 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
@@ -21,6 +21,7 @@
 package org.apache.airavata.job.monitor.impl.pull.qstat;
 
 import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gsi.ssh.api.SSHApiException;
 import org.apache.airavata.job.monitor.MonitorID;
 import org.apache.airavata.job.monitor.core.PullMonitor;
@@ -69,7 +70,7 @@ public class QstatMonitor extends PullMonitor {
         monitoring
          */
         this.startPulling = true;
-        while (this.startPulling) {
+        while (this.startPulling || !ServerSettings.isStopAllThreads()) {
             try {
                 startPulling();
                 // After finishing one iteration of the full queue this thread sleeps 1 second

http://git-wip-us.apache.org/repos/asf/airavata/blob/58872cec/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
index 0929acb..06d21a1 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
@@ -23,6 +23,7 @@ package org.apache.airavata.job.monitor.impl.push.amqp;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.job.monitor.MonitorID;
 import org.apache.airavata.job.monitor.core.PushMonitor;
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
@@ -34,10 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 
 /**
@@ -125,7 +123,7 @@ public class AMQPMonitor extends PushMonitor {
     public void run() {
         // before going to the while true mode we start unregister thread
         startRegister = true; // this will be unset by someone else
-        while (startRegister) {
+        while (startRegister || !ServerSettings.isStopAllThreads()) {
             try {
                 MonitorID take = runningQueue.take();
                 this.registerListener(take);
@@ -137,6 +135,15 @@ public class AMQPMonitor extends PushMonitor {
                 e.printStackTrace();
             }
         }
+        Set<String> strings = availableChannels.keySet();
+        for(String key:strings) {
+            Channel channel = availableChannels.get(key);
+            try {
+                channel.close();
+            } catch (IOException e) {
+                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            }
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/58872cec/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
index f6704ca..ad25b95 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
@@ -27,6 +27,7 @@ import com.rabbitmq.client.ShutdownSignalException;
 import org.apache.airavata.job.monitor.MonitorID;
 import org.apache.airavata.job.monitor.core.MessageParser;
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
+import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,7 +67,11 @@ public class BasicConsumer implements Consumer {
         // Here we parse the message and get the job status and push it
         // to the Event bus, this will be picked by
         // AiravataJobStatusUpdator and store in to registry
-        publisher.publish(parser.parseMessage(message,monitorID));
+        try {
+            publisher.publish(parser.parseMessage(message,monitorID));
+        } catch (AiravataMonitorException e) {
+            e.printStackTrace();
+        }
     }
 
     public void handleRecoverOk(java.lang.String consumerTag) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/58872cec/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
index d281c0f..f91176b 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
@@ -22,16 +22,27 @@ package org.apache.airavata.job.monitor.impl.push.amqp;
 
 import org.apache.airavata.job.monitor.MonitorID;
 import org.apache.airavata.job.monitor.core.MessageParser;
+import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.job.monitor.state.JobStatus;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 public class JSONMessageParser implements MessageParser {
     private final static Logger logger = LoggerFactory.getLogger(JSONMessageParser.class);
 
-    public JobStatus parseMessage(String message, MonitorID monitorID) {
+    public JobStatus parseMessage(String message, MonitorID monitorID)throws AiravataMonitorException{
         /*todo write a json message parser here*/
         logger.info("Mesage parse invoked");
+        ObjectMapper mapper = new ObjectMapper();
+        try {
+            mapper.readTree(message);
+        } catch (IOException e) {
+            throw new AiravataMonitorException(e);
+        }
         return new JobStatus();
     }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/58872cec/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 7766e8e..8685e35 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -56,6 +56,7 @@ public class ServerSettings extends ApplicationSettings{
     private static final String REGISTRY_DB_DRIVER ="registry.jdbc.driver";
     private static final String ENABLE_HTTPS = "enable.https";
     private static final String HOST_SCHEDULER = "host.scheduler";
+    private static boolean stopAllThreads = false;
 
     public static String getSystemUser() throws ApplicationSettingsException{
     	return getSetting(SYSTEM_USER);
@@ -171,4 +172,12 @@ public class ServerSettings extends ApplicationSettings{
     public static String getHostScheduler() throws ApplicationSettingsException {
         return getSetting(HOST_SCHEDULER);
     }
+
+    public static boolean isStopAllThreads() {
+        return stopAllThreads;
+    }
+
+    public static void setStopAllThreads(boolean stopAllThreads) {
+        ServerSettings.stopAllThreads = stopAllThreads;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/58872cec/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties b/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties
index d0c94c9..74220c2 100644
--- a/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties
+++ b/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties
@@ -256,7 +256,8 @@ TwoPhase=true
 ###---------------------------Monitoring module Configurations---------------------------###
 #This will be the primary monitoring tool which runs in airavata, in future there will be multiple monitoring
 #mechanisms and one would be able to start a monitor
-monitors=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor,org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor
+monitors=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor
+#,org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor
 #This is the amqp related configuration and this lists down the Rabbitmq host, this is an xsede specific configuration
 amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
 proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876

http://git-wip-us.apache.org/repos/asf/airavata/blob/58872cec/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
index 5df526b..1f9a15d 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
@@ -27,6 +27,7 @@ import org.apache.airavata.common.utils.IServer.ServerStatus;
 import org.apache.airavata.orchestrator.cpi.OrchestratorService;
 import org.apache.airavata.orchestrator.util.Constants;
 import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServerEventHandler;
 import org.apache.thrift.server.TSimpleServer;
 import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TServerTransport;
@@ -44,7 +45,6 @@ public class OrchestratorServer implements IServer{
 
 	private TSimpleServer server;
 
-    public static final String TESTARGUMENTTOHANDLER = "testing";
 	public OrchestratorServer() {
 		setStatus(ServerStatus.STOPPED);
 	}
@@ -103,7 +103,7 @@ public class OrchestratorServer implements IServer{
 
 	@Override
 	public void stop() throws Exception {
-		if (server!=null && server.isServing()){
+        if (server!=null && server.isServing()){
 			setStatus(ServerStatus.STOPING);
 			server.stop();
 		}

http://git-wip-us.apache.org/repos/asf/airavata/blob/58872cec/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 0ac5ed7..9dc561b 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -191,6 +191,14 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
         return true;
     }
 
+    public MonitorManager getMonitorManager() {
+        return monitorManager;
+    }
+
+    public void setMonitorManager(MonitorManager monitorManager) {
+        this.monitorManager = monitorManager;
+    }
+
     @Override
     public boolean terminateExperiment(String experimentId) throws TException {
         return false;

http://git-wip-us.apache.org/repos/asf/airavata/blob/58872cec/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
----------------------------------------------------------------------
diff --git a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
index 41d2f16..3a25884 100644
--- a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
+++ b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
@@ -110,6 +110,7 @@ public class ServerMain {
 			}
 		}
 		if (hasStopRequested()){
+            ServerSettings.setStopAllThreads(true);
 			stopAllServers();
 			System.exit(0);
 		}


Mime
View raw message