airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject [1/2] git commit: Optimized zookeeper job count update process to update job addition and job deletion steps
Date Sat, 13 Sep 2014 00:52:47 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 2207eceab -> d1d8759fd


Optimized zookeeper job count update process to update job addition and job deletion steps


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/9c23aa81
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/9c23aa81
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/9c23aa81

Branch: refs/heads/master
Commit: 9c23aa81147268b70472df3001bd04575a457e9e
Parents: e45607a
Author: shamrath <shameerainfo@gmail.com>
Authored: Fri Sep 12 17:24:59 2014 -0400
Committer: shamrath <shameerainfo@gmail.com>
Committed: Fri Sep 12 17:24:59 2014 -0400

----------------------------------------------------------------------
 .../handlers/GridPullMonitorHandler.java        |   1 +
 .../monitor/impl/pull/qstat/HPCPullMonitor.java | 160 +++----------------
 .../airavata/gfac/monitor/util/CommonUtils.java | 122 ++++++++++++++
 3 files changed, 149 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/9c23aa81/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
index 9ca5235..ff467bf 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -100,6 +100,7 @@ public class GridPullMonitorHandler extends ThreadedHandler implements
Watcher{
                 e.printStackTrace();
             }
             CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID);
+            CommonUtils.increaseZkJobCount(monitorID); // update change job count to zookeeper
         } catch (AiravataMonitorException e) {
             logger.error("Error adding monitorID object to the queue with experiment ", monitorID.getExperimentID());
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c23aa81/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index b4c5819..dac9499 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -20,22 +20,7 @@
 */
 package org.apache.airavata.gfac.monitor.impl.pull.qstat;
 
-import java.io.IOException;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.Constants;
+import com.google.common.eventbus.EventBus;
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.commons.gfac.type.HostDescription;
@@ -57,16 +42,20 @@ import org.apache.airavata.model.workspace.experiment.JobState;
 import org.apache.airavata.model.workspace.experiment.TaskState;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.apache.airavata.schemas.gfac.SSHHostType;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.eventbus.EventBus;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * This monitor is based on qstat command which can be run
@@ -275,10 +264,24 @@ public class HPCPullMonitor extends PullMonitor {
             queue.put(take);
             // cleaning up the completed jobs, this method will remove some of the userMonitorData
from the queue if
             // they become empty
+            Map<String, Integer> jobRemoveCountMap = new HashMap<String, Integer>();
+            ZooKeeper zk = null;
             for (MonitorID completedJob : completedJobs) {
                 CommonUtils.removeMonitorFromQueue(queue, completedJob);
+                if (zk == null) {
+                    zk = completedJob.getJobExecutionContext().getZk();
+                }
+                String key = CommonUtils.getJobCountUpdatePath(completedJob);
+                int i = 0;
+                if (jobRemoveCountMap.containsKey(key)) {
+                    i = Integer.valueOf(jobRemoveCountMap.get(key));
+                }
+                jobRemoveCountMap.put(key, ++i);
+            }
+            if (completedJobs.size() > 0) {
+                // reduce completed job count from zookeeper
+                CommonUtils.updateZkWithJobCount(zk, jobRemoveCountMap, false);
             }
-//            updateZkWithJobCount(take , completedJobs);
         } catch (InterruptedException e) {
             if (!this.queue.contains(take)) {
                 try {
@@ -342,117 +345,6 @@ public class HPCPullMonitor extends PullMonitor {
     }
 
     /**
-     * Build the /stat/{username}/{hostAddress}/job znode path and store job count
-     *
-     * @param userMonitorData
-     * @param completedJobs
-     * @throws ApplicationSettingsException
-     * @throws IOException
-     * @throws KeeperException
-     * @throws InterruptedException
-     */
-    private void updateZkWithJobCount(UserMonitorData userMonitorData, List<MonitorID>
completedJobs) {
-        try {
-            final CountDownLatch latch = new CountDownLatch(1);
-            ZooKeeper zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, new Watcher()
{
-                @Override
-                public void process(WatchedEvent event) {
-                    if (event.getState() == Event.KeeperState.SyncConnected) {
-                        latch.countDown();
-                    }
-                }
-            });
-            latch.await();
-
-            try {
-                List<String> updatedPathList = new ArrayList<String>();
-                String pathToUserName = new StringBuilder("/").append(Constants.STAT)
-                        .append("/").append(userMonitorData.getUserName()).toString();
-                StringBuilder jobPathBuilder;
-                for (HostMonitorData hostData : userMonitorData.getHostMonitorData()) {
-                    jobPathBuilder = new StringBuilder(pathToUserName).append("/")
-                            .append(hostData.getHost().getType().getHostAddress()).append("/").append(Constants.JOB);
-                    checkAndCreateZNode(zk, jobPathBuilder.toString());
-                    int jobCount = 0;
-                    String jobCountStr = new String(zk.getData(jobPathBuilder.toString(),
null, null));
-                    try {
-                        jobCount = Integer.parseInt(jobCountStr);
-                    } catch (NumberFormatException e) {
-                        // do nothing , keep jobCount 0
-                    }
-                    List<MonitorID> idList = hostData.getMonitorIDs();
-                    boolean updatePath = true;
-                    if (idList != null) {
-                        if (jobCount == idList.size()) {
-                            updatePath = false;
-                        } else {
-                            jobCount = idList.size();
-                        }
-                        // removed already updated jobs from complete jobs
-                        for (MonitorID monitorID : idList) {
-                            if (completedJobs.contains(monitorID)) {
-                                completedJobs.remove(monitorID);
-                            }
-                        }
-                    }
-                    if (updatePath) {
-                        zk.setData(jobPathBuilder.toString(), String.valueOf(jobCount).getBytes(),
-1);
-                        updatedPathList.add(jobPathBuilder.toString());
-                    }
-                }
-
-                //handle completed jobs
-                /* If all jobs are completed in a host then monitor queue remove such hosts
from monitoring ,but we need
-                     to update those host's stat with JobCount 0 */
-                for (MonitorID monitorID : completedJobs) {
-                    jobPathBuilder = new StringBuilder(pathToUserName).append("/")
-                            .append(monitorID.getHost().getType().getHostAddress()).append("/").append(Constants.JOB);
-                    zk.setData(jobPathBuilder.toString(), "0".getBytes(), -1);
-                    updatedPathList.add(jobPathBuilder.toString());
-                }
-                // trigger orchestrator watcher by saving the updated list to zookeeper
-                if (updatedPathList.size() > 0) {
-                    StringBuilder strBuilder = new StringBuilder();
-                    for (String updatedPath : updatedPathList) {
-                        strBuilder.append(updatedPath).append(":");
-                    }
-                    strBuilder.deleteCharAt(strBuilder.length() - 1);
-                    zk.setData(("/" + Constants.STAT), strBuilder.toString().getBytes(),
-1);
-                }
-                zk.close();
-            } catch (KeeperException e) {
-                logger.error("Error while storing job count to zookeeper", e);
-            } catch (InterruptedException e) {
-                logger.error("Error while storing job count to zookeeper", e);
-            }
-        } catch (IOException e) {
-            logger.error("Error while connecting to the zookeeper server", e);
-        } catch (ApplicationSettingsException e) {
-            logger.error("Error while getting zookeeper hostport property", e);
-        } catch (InterruptedException e) {
-            logger.error("Error while waiting for SyncConnected message", e);
-        }
-
-    }
-
-    /**
-     * Check whether znode is exist in given path if not create a new znode
-     * @param zk - zookeeper instance
-     * @param path - path to check znode
-     * @throws KeeperException
-     * @throws InterruptedException
-     */
-    private void checkAndCreateZNode(ZooKeeper zk , String path) throws KeeperException,
InterruptedException {
-        if (zk.exists(path, null) == null) { // if znode doesn't exist
-            if (path.lastIndexOf("/") > 1) {  // recursively traverse to parent znode
and check parent exist
-                checkAndCreateZNode(zk, (path.substring(0, path.lastIndexOf("/"))));
-            }
-            zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//
create a znode
-        }
-    }
-
-
-    /**
      * This is the method to stop the polling process
      *
      * @return if the stopping process is successful return true else false

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c23aa81/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
index 6db4550..9cb544c 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
@@ -20,6 +20,9 @@
 */
 package org.apache.airavata.gfac.monitor.util;
 
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
@@ -30,12 +33,22 @@ import org.apache.airavata.gfac.monitor.HostMonitorData;
 import org.apache.airavata.gfac.monitor.UserMonitorData;
 import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
 
 public class CommonUtils {
     private final static Logger logger = LoggerFactory.getLogger(CommonUtils.class);
@@ -204,4 +217,113 @@ public class CommonUtils {
             }
         }
     }
+
+        /**
+         * Applies a per-path job count change to ZooKeeper and then records the touched
+         * paths on the stat znode.
+         *
+         * For every path key in changeCountMap the znode data is read, interpreted as an
+         * integer, increased or decreased by the mapped value and written back with
+         * version -1. When isAdd is true the znode is created first (through
+         * checkAndCreateZNode) if it does not exist. A znode holding null data is set to
+         * the change count on add, or reset to "0" with a warning on reduce. On reduce, a
+         * removal count larger than the stored count is only logged as an error and the
+         * znode is left unchanged. After the loop, if the map is not empty, the processed
+         * paths joined with ":" are written to "/" + Constants.STAT.
+         *
+         * KeeperException and InterruptedException are logged here and not rethrown.
+         *
+         * NOTE(review): when zk is null or not connected a new client is created from
+         * AiravataZKUtils.getZKhostPort(); a failure there is only logged, so zk may still
+         * be null when the loop runs. The latch is released on any watcher event, and the
+         * replacement client is not closed in this method - confirm this is intended.
+         * NOTE(review): a path is appended to the changed-paths list even when its count
+         * was not modified, and a NumberFormatException from Integer.parseInt is not
+         * caught here.
+         *
+         * @param zk - zookeeper instance to use; may be null or disconnected, in which case
+         *             a new connection is attempted
+         * @param changeCountMap - map of znode path to the job count change for that path
+         * @param isAdd - true to add the given counts to the existing job counts, false to
+         *                subtract them
+         */
+    public static void updateZkWithJobCount(ZooKeeper zk, final Map<String, Integer>
changeCountMap, boolean isAdd) {
+        StringBuilder changeZNodePaths = new StringBuilder();
+        try {
+            if (zk == null || !zk.getState().isConnected()) {
+                try {
+                    final CountDownLatch countDownLatch = new CountDownLatch(1);
+                    zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, new Watcher()
{
+                        @Override
+                        public void process(WatchedEvent event) {
+                            countDownLatch.countDown();
+                        }
+                    });
+                    countDownLatch.await();
+                } catch (ApplicationSettingsException e) {
+                    logger.error("Error while reading zookeeper hostport string");
+                } catch (IOException e) {
+                    logger.error("Error while reconnect attempt to zookeeper where zookeeper
connection loss state");
+                }
+            }
+
+            for (String path : changeCountMap.keySet()) {
+                if (isAdd) {
+                    CommonUtils.checkAndCreateZNode(zk, path);
+                }
+                byte[] byteData = zk.getData(path, null, null);
+                String nodeData;
+                if (byteData == null) {
+                    if (isAdd) {
+                        zk.setData(path, String.valueOf(changeCountMap.get(path)).getBytes(),
-1);
+                    } else {
+                        // This is not possible, but we handle in case there any data zookeeper
communication failure
+                        logger.warn("Couldn't reduce job count in " + path + " as it returns
null data. Hence reset the job count to 0");
+                        zk.setData(path, "0".getBytes(), -1);
+                    }
+                } else {
+                    nodeData = new String(byteData);
+                    if (isAdd) {
+                        zk.setData(path, String.valueOf(changeCountMap.get(path) + Integer.parseInt(nodeData)).getBytes(),
-1);
+                    } else {
+                        int previousCount = Integer.parseInt(nodeData);
+                        int removeCount = changeCountMap.get(path);
+                        if (previousCount >= removeCount) {
+                            zk.setData(path, String.valueOf(previousCount - removeCount).getBytes(),
-1);
+                        } else {
+                            // This is not possible, do we need to reset the job count to
0 ?
+                            logger.error("Requested remove job count is " + removeCount +
+                                    " which is higher than the existing job count " + previousCount
+                                    + " in  " + path + " path.");
+                        }
+                    }
+                }
+                changeZNodePaths.append(path).append(":");
+            }
+
+            // update stat node to trigger orchestrator watchers
+            if (changeCountMap.size() > 0) {
+                changeZNodePaths.deleteCharAt(changeZNodePaths.length() - 1);
+                zk.setData("/" + Constants.STAT, changeZNodePaths.toString().getBytes(),
-1);
+            }
+        } catch (KeeperException e) {
+            logger.error("Error while writing job count to zookeeper", e);
+        } catch (InterruptedException e) {
+            logger.error("Error while writing job count to zookeeper", e);
+        }
+
+    }
+
+    /**
+     * Increases the ZooKeeper job count for the given job by one.
+     * Builds a single-entry map from the job's count path (getJobCountUpdatePath) to 1 and
+     * passes it to updateZkWithJobCount in add mode, using the ZooKeeper instance taken
+     * from the monitorID's job execution context.
+     * @param monitorID - Job monitorId whose user/host job count should be increased
+     */
+    public static void increaseZkJobCount(MonitorID monitorID) {
+        Map<String, Integer> addMap = new HashMap<String, Integer>();
+        addMap.put(CommonUtils.getJobCountUpdatePath(monitorID), 1);
+        updateZkWithJobCount(monitorID.getJobExecutionContext().getZk(), addMap, true);
+    }
+
+    /**
+     * Constructs the job count znode path for a given MonitorID, built as
+     * "/" + Constants.STAT + "/" + user name + "/" + host address + "/" + Constants.JOB,
+     * eg: /stat/{username}/{resourceName}/job
+     * @param monitorID - Job monitorId supplying the user name and host address
+     * @return the znode path string for this job's user and host
+     */
+    public static String getJobCountUpdatePath(MonitorID monitorID){
+        return new StringBuilder("/").append(Constants.STAT).append("/").append(monitorID.getUserName())
+                .append("/").append(monitorID.getHost().getType().getHostAddress()).append("/").append(Constants.JOB).toString();
+    }
+
+    /**
+     * Checks whether a znode exists at the given path and creates it if it does not.
+     * Before creating, the method recurses on the parent path (everything before the last
+     * "/") while the last "/" is beyond index 1, so missing parents are created first.
+     * New znodes are created with null data, ZooDefs.Ids.OPEN_ACL_UNSAFE and
+     * CreateMode.PERSISTENT.
+     * @param zk - zookeeper instance
+     * @param path - path of the znode to check and, if absent, create
+     * @throws KeeperException - propagated from the zookeeper exists/create calls
+     * @throws InterruptedException - propagated from the zookeeper exists/create calls
+     */
+    private static void checkAndCreateZNode(ZooKeeper zk , String path) throws KeeperException,
InterruptedException {
+        if (zk.exists(path, null) == null) { // if znode doesn't exist
+            if (path.lastIndexOf("/") > 1) {  // recursively traverse to parent znode
and check parent exist
+                checkAndCreateZNode(zk, (path.substring(0, path.lastIndexOf("/"))));
+            }
+            zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//
create a znode
+        }
+    }
 }


Mime
View raw message