airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From samin...@apache.org
Subject [1/2] git commit: https://issues.apache.org/jira/browse/AIRAVATA-1145
Date Fri, 18 Apr 2014 12:24:05 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 56574dd19 -> d4e398545


https://issues.apache.org/jira/browse/AIRAVATA-1145


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/20306cd0
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/20306cd0
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/20306cd0

Branch: refs/heads/master
Commit: 20306cd044679cf5d0f61e4832eae02a2ae69e83
Parents: 56574dd
Author: Saminda Wijeratne <samindaw@gmail.com>
Authored: Fri Apr 18 04:14:02 2014 -0700
Committer: Saminda Wijeratne <samindaw@gmail.com>
Committed: Fri Apr 18 04:14:02 2014 -0700

----------------------------------------------------------------------
 .../main/resources/airavata-server.properties   |  2 +-
 .../job/monitor/AbstractActivityListener.java   | 27 +++++++++++++
 .../monitor/AbstractActivityMonitorClient.java  | 27 -------------
 .../job/monitor/AiravataJobStatusUpdator.java   | 42 ++++++++++++++++++--
 .../airavata/job/monitor/MonitorManager.java    | 42 ++++++++++----------
 .../SingleAppIntegrationTestBase.java           |  1 +
 .../orchestrator/server/OrchestratorServer.java |  3 --
 .../apache/airavata/job/monitor/MonitorID.java  | 11 ++---
 .../job/monitor/event/MonitorPublisher.java     | 17 ++++++--
 9 files changed, 107 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 193e9c8..13f78d5 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -262,7 +262,7 @@ monitors=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor,org.apache
 amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
 proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
 connection.name=xsede
-activity.monitors=org.apache.airavata.job.monitor.AiravataJobStatusUpdator
+activity.listeners=org.apache.airavata.job.monitor.AiravataJobStatusUpdator,org.apache.airavata.job.monitor.AiravataTaskStatusUpdator,org.apache.airavata.job.monitor.AiravataExperimentStatusUpdator
 
 ###---------------------------Orchestrator module Configurations---------------------------###
 job.submitter=org.apache.airavata.orchestrator.core.impl.EmbeddedGFACJobSubmitter

http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityListener.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityListener.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityListener.java
new file mode 100644
index 0000000..49927e6
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityListener.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.job.monitor;
+
+
+public interface AbstractActivityListener {
+	public void setup(Object...configurations);
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityMonitorClient.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityMonitorClient.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityMonitorClient.java
deleted file mode 100644
index 9124cc3..0000000
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AbstractActivityMonitorClient.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.job.monitor;
-
-
-public interface AbstractActivityMonitorClient {
-	public void setup(Object...configurations);
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
index 37045a8..ec03d71 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
@@ -23,9 +23,12 @@ package org.apache.airavata.job.monitor;
 import java.util.Calendar;
 import java.util.concurrent.BlockingQueue;
 
+import org.apache.airavata.job.monitor.event.MonitorPublisher;
 import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.state.TaskStatus;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.TaskState;
 import org.apache.airavata.registry.cpi.CompositeIdentifier;
 import org.apache.airavata.registry.cpi.DataType;
 import org.apache.airavata.registry.cpi.Registry;
@@ -34,11 +37,13 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.eventbus.Subscribe;
 
-public class AiravataJobStatusUpdator implements AbstractActivityMonitorClient{
+public class AiravataJobStatusUpdator implements AbstractActivityListener{
     private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class);
 
     private Registry airavataRegistry;
 
+    private MonitorPublisher monitorPublisher;
+    
     private BlockingQueue<MonitorID> jobsToMonitor;
 
     public Registry getAiravataRegistry() {
@@ -105,9 +110,39 @@ public class AiravataJobStatusUpdator implements AbstractActivityMonitorClient{
                     logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is SUSPENDED");
                     jobsToMonitor.remove(jobStatus.getMonitorID());
                     break;
+			default:
+				break;
             }
         }
     }
+    
+    @Subscribe
+    public void setupTaskStatus(JobStatus jobStatus){
+    	TaskState state=TaskState.UNKNOWN;
+    	switch(jobStatus.getState()){
+    	case ACTIVE:
+    		state=TaskState.EXECUTING; break;
+    	case CANCELED:
+    		state=TaskState.CANCELED; break;
+    	case COMPLETE:
+    		state=TaskState.COMPLETED; break;
+    	case FAILED:
+    		state=TaskState.FAILED; break;
+    	case HELD: case SUSPENDED: case QUEUED:
+    		state=TaskState.WAITING; break;
+    	case SETUP:
+    		state=TaskState.PRE_PROCESSING; break;
+    	case SUBMITTED:
+    		state=TaskState.STARTED; break;
+    	case UN_SUBMITTED:
+    		state=TaskState.CANCELED; break;
+		default:
+			break;
+    	}
+    	logger.debug("Publishing Task Status "+state.toString());
+    	monitorPublisher.publish(new TaskStatus(jobStatus.getMonitorID(),state));
+    }
+    
     public  void updateJobStatus(String taskId, String jobID, JobState state) throws Exception {
         CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID);
         JobDetails details = (JobDetails)airavataRegistry.get(DataType.JOB_DETAIL, ids);
@@ -126,12 +161,13 @@ public class AiravataJobStatusUpdator implements AbstractActivityMonitorClient{
 	@Override
 	public void setup(Object... configurations) {
 		for (Object configuration : configurations) {
-			
 			if (configuration instanceof Registry){
 				this.airavataRegistry=(Registry)configuration;
 			} else if (configuration instanceof BlockingQueue<?>){
 				this.jobsToMonitor=(BlockingQueue<MonitorID>) configuration;
-			}
+			} else if (configuration instanceof MonitorPublisher){
+				this.monitorPublisher=(MonitorPublisher) configuration;
+			} 
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
index 1929057..ed89230 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
@@ -55,6 +55,8 @@ remove them from the queue, this will be done by AiravataJobUpdator.
  */
 public class MonitorManager {
     private final static Logger logger = LoggerFactory.getLogger(MonitorManager.class);
+    
+	private final static String ACTIVITY_LISTENERS = "activity.listeners";
 
     private List<PullMonitor> pullMonitors;    //todo though we have a List we only support one at a time
 
@@ -72,7 +74,7 @@ public class MonitorManager {
 
     private Monitor localJobMonitor;
     
-    private List<AbstractActivityMonitorClient> activityMonitors;
+    private List<AbstractActivityListener> activityListeners;
 
     private Registry registry;
 
@@ -97,27 +99,27 @@ public class MonitorManager {
     
     private void loadActivityMonitors(){
 		try {
-			activityMonitors=new ArrayList<AbstractActivityMonitorClient>();
-			String activityMonitorsString = ServerSettings.getSetting("activity.monitors");
-			if (activityMonitorsString!=null){
-				String[] activityMonitorClasses = activityMonitorsString.split(",");
-				for (String activityMonitorClassName : activityMonitorClasses) {
-					Class<?> classInstance;
+			activityListeners=new ArrayList<AbstractActivityListener>();
+			String activityListenersString = ServerSettings.getSetting(ACTIVITY_LISTENERS);
+			if (activityListenersString!=null){
+				String[] activityListenerClasses = activityListenersString.split(",");
+				for (String activityListenerClassName : activityListenerClasses) {
 					try {
-						classInstance = MonitorManager.class
-						        .getClassLoader().loadClass(activityMonitorClassName);
-						AbstractActivityMonitorClient monitor=(AbstractActivityMonitorClient)classInstance.newInstance();
-						monitor.setup(registry,getFinishQueue());
-						activityMonitors.add(monitor);
+						activityListenerClassName=activityListenerClassName.trim();
+						Class<?>  classInstance = MonitorManager.class
+						        .getClassLoader().loadClass(activityListenerClassName);
+						AbstractActivityListener monitor=(AbstractActivityListener)classInstance.newInstance();
+						monitor.setup(registry, getFinishQueue(), getMonitorPublisher());
+						activityListeners.add(monitor);
 						registerListener(monitor);
 					} catch (ClassNotFoundException e) {
-						logger.error("Error while locating activity monitor implementation \""+activityMonitorClassName+"\"!!!",e);
+						logger.error("Error while locating activity monitor implementation \""+activityListenerClassName+"\"!!!",e);
 					} catch (InstantiationException e) {
-						logger.error("Error while initiating activity monitor instance \""+activityMonitorClassName+"\"!!!",e);
+						logger.error("Error while initiating activity monitor instance \""+activityListenerClassName+"\"!!!",e);
 					} catch (IllegalAccessException e) {
-						logger.error("Error while initiating activity monitor instance \""+activityMonitorClassName+"\"!!!",e);
+						logger.error("Error while initiating activity monitor instance \""+activityListenerClassName+"\"!!!",e);
 					} catch (ClassCastException e){
-						logger.error("Invalid activity monitor \""+activityMonitorClassName+"\"!!!",e);
+						logger.error("Invalid activity monitor \""+activityListenerClassName+"\"!!!",e);
 					}
 				}
 			}
@@ -234,7 +236,6 @@ public class MonitorManager {
      */
     public void launchMonitor() throws AiravataMonitorException {
         //no push monitor is configured so we launch pull monitor
-        int index = 0;
         if(localJobMonitor != null){
             (new Thread(localJobMonitor)).start();
         }
@@ -261,18 +262,17 @@ public class MonitorManager {
      */
     public void stopMonitor() throws AiravataMonitorException {
         //no push monitor is configured so we launch pull monitor
-        int index = 0;
         if(localJobMonitor != null){
-            (new Thread(localJobMonitor)).stop();
+            (new Thread(localJobMonitor)).interrupt();
         }
 
         for (PullMonitor monitor : pullMonitors) {
-            (new Thread(monitor)).stop();
+            (new Thread(monitor)).interrupt();
         }
 
         //todo fix this
         for (PushMonitor monitor : pushMonitors) {
-            (new Thread(monitor)).stop();
+            (new Thread(monitor)).interrupt();
         }
     }
     /* getter setters for the private variables */

http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/integration-tests/src/test/java/org/apache/airavata/integration/SingleAppIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/modules/integration-tests/src/test/java/org/apache/airavata/integration/SingleAppIntegrationTestBase.java b/modules/integration-tests/src/test/java/org/apache/airavata/integration/SingleAppIntegrationTestBase.java
index 11e6a77..6592bf8 100644
--- a/modules/integration-tests/src/test/java/org/apache/airavata/integration/SingleAppIntegrationTestBase.java
+++ b/modules/integration-tests/src/test/java/org/apache/airavata/integration/SingleAppIntegrationTestBase.java
@@ -136,6 +136,7 @@ public class SingleAppIntegrationTestBase {
                 Map<String, JobStatus> jobStatuses = null;
                 while (true) {
                     try {
+                    	System.out.println("*********Experiment status*** : "+client.getExperimentStatus(expId));
                         jobStatuses = client.getJobStatuses(expId);
                         Set<String> strings = jobStatuses.keySet();
                         for (String key : strings) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
index 115ea3f..d6ff5c3 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
@@ -23,12 +23,9 @@ package org.apache.airavata.orchestrator.server;
 
 import org.apache.airavata.common.utils.IServer;
 import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.IServer.ServerStatus;
 import org.apache.airavata.orchestrator.cpi.OrchestratorService;
 import org.apache.airavata.orchestrator.util.Constants;
 import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TServerEventHandler;
-import org.apache.thrift.server.TSimpleServer;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TServerTransport;

http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
index f65241a..718177c 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
@@ -20,20 +20,17 @@
 */
 package org.apache.airavata.job.monitor;
 
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.Map;
+
 import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
 import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
-import org.apache.airavata.job.monitor.state.JobStatus;
 import org.apache.airavata.model.workspace.experiment.JobState;
-import org.omg.PortableInterceptor.ACTIVE;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.Timestamp;
-import java.util.Date;
-import java.util.Map;
-import java.util.Properties;
-
 /*
 This is the object which contains the data to identify a particular
 Job to start the monitoring

http://git-wip-us.apache.org/repos/asf/airavata/blob/20306cd0/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
index 0f75206..cc85e58 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
@@ -20,12 +20,15 @@
 */
 package org.apache.airavata.job.monitor.event;
 
-import com.google.common.eventbus.EventBus;
 import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.state.ExperimentStatus;
 import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.state.TaskStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.eventbus.EventBus;
+
 public class MonitorPublisher {
     private final static Logger logger = LoggerFactory.getLogger(MonitorPublisher.class);
     private EventBus eventBus;
@@ -38,10 +41,18 @@ public class MonitorPublisher {
         eventBus.register(listener);
     }
 
-    public void publish(JobStatus jobState) {
-        eventBus.post(jobState);
+    public void publish(JobStatus jobStatus) {
+        eventBus.post(jobStatus);
     }
 
+    public void publish(TaskStatus taskStatus) {
+        eventBus.post(taskStatus);
+    }
+    
+    public void publish(ExperimentStatus experimentStatus) {
+        eventBus.post(experimentStatus);
+    }
+    
     public void publish(MonitorID monitorID){
         eventBus.post(monitorID);
     }


Mime
View raw message