airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject [1/3] git commit: GFac pull monitor Write job count to zookeeper
Date Fri, 12 Sep 2014 02:43:42 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 68e81ef80 -> 45f0d68fd


GFac pull monitor: write job count to ZooKeeper


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/78ad2ef5
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/78ad2ef5
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/78ad2ef5

Branch: refs/heads/master
Commit: 78ad2ef58b04051bc5c41791b7c03f90da914e5c
Parents: 92ad9f1
Author: shamrath <shameerainfo@gmail.com>
Authored: Thu Sep 11 21:35:41 2014 -0400
Committer: shamrath <shameerainfo@gmail.com>
Committed: Thu Sep 11 21:35:41 2014 -0400

----------------------------------------------------------------------
 .../apache/airavata/common/utils/Constants.java |   2 +
 .../monitor/impl/pull/qstat/HPCPullMonitor.java | 109 ++++++++++++++++++-
 2 files changed, 110 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/78ad2ef5/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
index b8f999a..8335e0c 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -46,4 +46,6 @@ public final class Constants {
     public static final String ZOOKEEPER_GFAC_SERVER_NAME = "gfac-server-name";
     public static final String ZOOKEEPER_ORCHESTRATOR_SERVER_NAME = "orchestrator-server-name";
     public static final String ZOOKEEPER_API_SERVER_NAME = "api-server-name";
+    public static final String STAT = "stat";
+    public static final String JOB = "job";
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/78ad2ef5/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 93e1aa9..3742179 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -20,6 +20,7 @@
 */
 package org.apache.airavata.gfac.monitor.impl.pull.qstat;
 
+import java.io.IOException;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Date;
@@ -28,9 +29,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.commons.gfac.type.HostDescription;
@@ -48,10 +53,15 @@ import org.apache.airavata.gfac.monitor.util.CommonUtils;
 import org.apache.airavata.gsi.ssh.api.SSHApiException;
 import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
 import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.model.workspace.experiment.JobStatus;
 import org.apache.airavata.model.workspace.experiment.TaskState;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.apache.airavata.schemas.gfac.SSHHostType;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -242,6 +252,7 @@ public class HPCPullMonitor extends PullMonitor {
             for (MonitorID completedJob : completedJobs) {
                 CommonUtils.removeMonitorFromQueue(queue, completedJob);
             }
+            updateZkWithJobCount(take , completedJobs);
         } catch (InterruptedException e) {
             if (!this.queue.contains(take)) {
                 try {
@@ -304,6 +315,102 @@ public class HPCPullMonitor extends PullMonitor {
         return true;
     }
 
+    /**
+     * Publishes the per-host job counts of one user to ZooKeeper.
+     * <p>
+     * For every host in {@code userMonitorData} the znode
+     * {@code /stat/{userName}/{hostAddress}/job} is created if missing and set to the size of
+     * that host's monitor-id list. Hosts that are only reachable through {@code completedJobs}
+     * (i.e. were not written from the live host data in this pass) get a job count of {@code 0}.
+     * Finally the colon-separated list of updated paths is stored in {@code /stat}, which is
+     * intended to trigger the orchestrator's watcher.
+     * <p>
+     * This method is best-effort: connection, settings and ZooKeeper failures are logged and not
+     * propagated. The ZooKeeper client is always closed, and the thread's interrupt status is
+     * restored if an {@link InterruptedException} was caught.
+     *
+     * @param userMonitorData monitoring data (user name and hosts) that was just processed
+     * @param completedJobs   jobs that completed during this monitoring pass; not modified
+     */
+    private void updateZkWithJobCount(UserMonitorData userMonitorData, List<MonitorID> completedJobs) {
+        ZooKeeper zk = null;
+        boolean interrupted = false;
+        try {
+            final CountDownLatch latch = new CountDownLatch(1);
+            zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, new Watcher() {
+                @Override
+                public void process(WatchedEvent event) {
+                    if (event.getState() == Event.KeeperState.SyncConnected) {
+                        latch.countDown();
+                    }
+                }
+            });
+            // block until the session is actually connected before issuing requests
+            latch.await();
+
+            try {
+                List<String> updatedPathList = new ArrayList<String>();
+                String pathToUserName = "/" + Constants.STAT + "/" + userMonitorData.getUserName();
+
+                // hosts that are still being monitored: store the current number of jobs
+                for (HostMonitorData hostData : userMonitorData.getHostMonitorData()) {
+                    String jobPath = pathToUserName + "/"
+                            + hostData.getHost().getType().getHostAddress() + "/" + Constants.JOB;
+                    checkAndCreateZNode(zk, jobPath);
+                    List<MonitorID> idList = hostData.getMonitorIDs();
+                    int jobCount = (idList == null) ? 0 : idList.size();
+                    zk.setData(jobPath, String.valueOf(jobCount).getBytes(), -1);
+                    updatedPathList.add(jobPath);
+                }
+
+                // If all jobs of a host are completed, the monitor queue drops that host, but its
+                // stat still has to be reset to 0. Hosts already written above keep their live
+                // count instead of being overwritten with 0.
+                for (MonitorID monitorID : completedJobs) {
+                    String jobPath = pathToUserName + "/"
+                            + monitorID.getHost().getType().getHostAddress() + "/" + Constants.JOB;
+                    if (updatedPathList.contains(jobPath)) {
+                        continue;
+                    }
+                    // the znode may never have been created for this host
+                    checkAndCreateZNode(zk, jobPath);
+                    zk.setData(jobPath, "0".getBytes(), -1);
+                    updatedPathList.add(jobPath);
+                }
+
+                // trigger orchestrator watcher by saving the updated list to zookeeper
+                if (!updatedPathList.isEmpty()) {
+                    StringBuilder strBuilder = new StringBuilder();
+                    for (String updatedPath : updatedPathList) {
+                        if (strBuilder.length() > 0) {
+                            strBuilder.append(":");
+                        }
+                        strBuilder.append(updatedPath);
+                    }
+                    zk.setData("/" + Constants.STAT, strBuilder.toString().getBytes(), -1);
+                }
+            } catch (KeeperException e) {
+                logger.error("Error while storing job count to zookeeper", e);
+            } catch (InterruptedException e) {
+                interrupted = true;
+                logger.error("Error while storing job count to zookeeper", e);
+            }
+        } catch (IOException e) {
+            logger.error("Error while connecting to the zookeeper server", e);
+        } catch (ApplicationSettingsException e) {
+            logger.error("Error while getting zookeeper hostport property", e);
+        } catch (InterruptedException e) {
+            interrupted = true;
+            logger.error("Error while waiting for SyncConnected message" , e);
+        } finally {
+            // always release the session, also on failure paths
+            if (zk != null) {
+                try {
+                    zk.close();
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                    logger.error("Error while closing the zookeeper connection", e);
+                }
+            }
+            // restore the interrupt status only after close() so the close itself is not cut short
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Ensures that a persistent znode exists at the given path, creating it and any missing
+     * ancestor znodes (with no data and an open ACL) when necessary.
+     *
+     * @param zk   zookeeper instance
+     * @param path absolute path of the znode to check
+     * @throws KeeperException      if a ZooKeeper operation fails for a reason other than the
+     *                              znode having been created concurrently
+     * @throws InterruptedException if the calling thread is interrupted while waiting on ZooKeeper
+     */
+    private void checkAndCreateZNode(ZooKeeper zk , String path) throws KeeperException, InterruptedException {
+        if (zk.exists(path, null) != null) {
+            return; // znode already exists
+        }
+        int lastSlash = path.lastIndexOf("/");
+        if (lastSlash > 0) { // parent is not the root: make sure the parent exists first
+            checkAndCreateZNode(zk, path.substring(0, lastSlash));
+        }
+        try {
+            zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch (KeeperException.NodeExistsException ignored) {
+            // another client created the znode between exists() and create(); nothing left to do
+        }
+    }
+
 
     /**
      * This is the method to stop the polling process


Mime
View raw message