airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject [1/2] git commit: implement GridPushMonitorHandler
Date Thu, 22 May 2014 20:27:52 GMT
Repository: airavata
Updated Branches:
  refs/heads/master f06e75f4c -> bb1cecbae


implement GridPushMonitorHandler


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/249a390f
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/249a390f
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/249a390f

Branch: refs/heads/master
Commit: 249a390f30ba4446d81db4b19da3f39692fe998e
Parents: a365830
Author: Udara <y.b.n.udara@gmail.com>
Authored: Fri May 23 01:46:12 2014 +0530
Committer: Udara <y.b.n.udara@gmail.com>
Committed: Fri May 23 01:46:12 2014 +0530

----------------------------------------------------------------------
 .../gsissh/provider/impl/GSISSHProvider.java    |  17 ++-
 .../handlers/GridPushMonitorHandler.java        | 105 +++++++++++++++++++
 2 files changed, 119 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/249a390f/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index eb3b4ce..c9b3aa1 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -99,20 +99,31 @@ public class GSISSHProvider extends AbstractProvider {
             // to perform monitoring, daemon handlers can be accessed from anywhere
             List<ThreadedHandler> daemonHandlers = GFacImpl.getDaemonHandlers();
             ThreadedHandler pullMonitorHandler = null;
+            ThreadedHandler pushMonitorHandler = null;
+            String monitorMode = ((GsisshHostType) host).getMonitorMode();
             for(ThreadedHandler threadedHandler:daemonHandlers){
                 if("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())){
                     pullMonitorHandler = threadedHandler;
-                    String monitorMode = ((GsisshHostType) host).getMonitorMode();
                     if("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)){
                         log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned:  " + jobID);
                         pullMonitorHandler.invoke(jobExecutionContext);
                     }else{
-                        log.error("Currently we only support Pull monitoring");
+                        log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" +
+                                " to handle by the GridPullMonitorHandler");
+                    }
+                }else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())){
+                    pushMonitorHandler = threadedHandler;
+                    if("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PUSH.equals(monitorMode)){
+                        log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned:  " + jobID);
+                        pushMonitorHandler.invoke(jobExecutionContext);
+                    }else{
+                        log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
+                                " to handle by the GridPushMonitorHandler");
                     }
                 }
                 // have to handle the GridPushMonitorHandler logic
             }
-            if(pullMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())){
+            if(pullMonitorHandler == null && pushMonitorHandler==null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())){
                 log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
                         ", execution is configured as asynchronous, so Outhandler will not be invoked");
             }

http://git-wip-us.apache.org/repos/asf/airavata/blob/249a390f/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
new file mode 100644
index 0000000..66f8467
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.handlers;
+
+import com.google.common.eventbus.EventBus;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.cpi.GFacImpl;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.handler.ThreadedHandler;
+
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.core.notification.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.HPCMonitorID;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *   this handler is responsible monitoring jobs in push mode
+ *   and currently this support multiple push monitoring in grid resource
+ */
+public class GridPushMonitorHandler extends ThreadedHandler {
+    private final static Logger logger= LoggerFactory.getLogger(GridPushMonitorHandler.class);
+
+    private AMQPMonitor amqpMonitor;
+
+    private AuthenticationInfo authenticationInfo;
+
+    @Override
+    public void initProperties(Properties properties) throws GFacHandlerException {
+        String myProxyUser=null;
+        try{
+            myProxyUser = ServerSettings.getSetting("myproxy.username");
+            String myProxyPass = ServerSettings.getSetting("myproxy.password");
+            String certPath = ServerSettings.getSetting("trusted.cert.location");
+            String myProxyServer = ServerSettings.getSetting("myproxy.server");
+            setAuthenticationInfo(new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer,
+                    7512, 17280000, certPath));
+
+            String hostList=(String)properties.get("hosts");
+            String proxyFilePath = ServerSettings.getSetting("proxy.file.path");
+            String connectionName=ServerSettings.getSetting("connection.name");
+            LinkedBlockingQueue<MonitorID> pushQueue = new LinkedBlockingQueue<MonitorID>();
+            LinkedBlockingQueue<MonitorID> finishQueue = new LinkedBlockingQueue<MonitorID>();
+            List<String> hosts= Arrays.asList(hostList.split(","));
+            amqpMonitor=new AMQPMonitor(GFacImpl.getMonitorPublisher(),pushQueue,finishQueue,proxyFilePath,connectionName,hosts);
+        }catch (ApplicationSettingsException e){
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void run() {
+        amqpMonitor.run();
+    }
+
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException{
+        super.invoke(jobExecutionContext);
+        MonitorID monitorID=new HPCMonitorID(getAuthenticationInfo(),jobExecutionContext);
+        amqpMonitor.getRunningQueue().add(monitorID);
+    }
+
+    public AMQPMonitor getAmqpMonitor() {
+        return amqpMonitor;
+    }
+
+    public void setAmqpMonitor(AMQPMonitor amqpMonitor) {
+        this.amqpMonitor = amqpMonitor;
+    }
+
+    public AuthenticationInfo getAuthenticationInfo() {
+        return authenticationInfo;
+    }
+
+    public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
+        this.authenticationInfo = authenticationInfo;
+    }
+}


Mime
View raw message