mesos-commits mailing list archives

From b...@apache.org
Subject svn commit: r1131670 - in /incubator/mesos/trunk/frameworks/torque: qstat.py torquesched.py
Date Sun, 05 Jun 2011 04:55:36 GMT
Author: benh
Date: Sun Jun  5 04:55:36 2011
New Revision: 1131670

URL: http://svn.apache.org/viewvc?rev=1131670&view=rev
Log:
qstat.py wraps qstat and returns an array of Job objects. Checkpoint of progress on torquesched.py,
which is probably broken: it now looks into the queue to decide whether to grab more resources
or to release some.
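
For context, a minimal usage sketch of the new module (not part of this
commit): summing the node demand of every queued job, which is what the
scheduler change below does. The "neednodes" Resource_List key is an
assumption based on typical Torque qstat -x output; values arrive as
strings, hence the int() conversion.

    import qstat

    # Hypothetical usage, mirroring the monitor loop in torquesched.py:
    # total up how many nodes the queued jobs ask for.
    jobs = qstat.getActiveJobs()
    needed = sum(int(j.resourceList["neednodes"]) for j in jobs)
    print "nodes needed by queued jobs: %d" % needed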

Added:
    incubator/mesos/trunk/frameworks/torque/qstat.py
Modified:
    incubator/mesos/trunk/frameworks/torque/torquesched.py

Added: incubator/mesos/trunk/frameworks/torque/qstat.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/torque/qstat.py?rev=1131670&view=auto
==============================================================================
--- incubator/mesos/trunk/frameworks/torque/qstat.py (added)
+++ incubator/mesos/trunk/frameworks/torque/qstat.py Sun Jun  5 04:55:36 2011
@@ -0,0 +1,37 @@
+import re
+import xml.dom.minidom
+
+from subprocess import *
+
+class Job:
+  def __init__(self, xmlJobElt):
+    print "creating a new Job obj out of xml"
+    print "xml element that was passed should be named 'Job', is it?: " + xmlJobElt.nodeName
+    self.resourceList = {}
+    for res in xmlJobElt.getElementsByTagName("Resource_List")[0].childNodes:
+      self.resourceList[res.nodeName] = res.childNodes[0].data
+
+def getActiveJobs():
+  print "in getJobs, grabbing xml output from qstat"
+  jobs = Popen("qstat -x", shell=True, stdout=PIPE).stdout
+  #print "output of qstat: "
+  #for line in qstat:
+  #  print line
+  dom_doc = xml.dom.minidom.parse(jobs)
+  print "grabbing the Job elements from the xml dom doc"
+  xmljobs = dom_doc.getElementsByTagName("Job")
+  jobs = []
+  print "creating a new job object for each job dom elt"
+  for j in xmljobs:
+    jobs.append(Job(j))
+  return jobs
+
+#TODO: DELETE THIS? Might not be used eventually
+def getQueueLength():
+  #print "computing the number of active jobs in the queue"
+  qstat = Popen("qstat -Q",shell=True,stdout=PIPE).stdout
+  jobcount = 0
+  for line in qstat:
+    if re.match('^batch.*', line):
+      jobcount = int(line.split()[5]) + int(line.split()[6]) + int(line.split()[7]) + int(line.split()[8])
+  return jobcount
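
Note: Job.__init__ above assumes qstat -x emits XML with no whitespace
between elements; indented XML would introduce #text child nodes that the
Resource_List loop does not skip. A hand-written, whitespace-free sample
(not captured from a real server) exercising the same parse path:

    import xml.dom.minidom
    import qstat

    # Hypothetical sample of the layout Job.__init__ expects; real
    # qstat -x output carries many more elements per <Job>.
    sample = ('<Data><Job><Resource_List>'
              '<neednodes>2</neednodes><nodect>2</nodect>'
              '</Resource_List></Job></Data>')

    doc = xml.dom.minidom.parseString(sample)
    job = qstat.Job(doc.getElementsByTagName("Job")[0])
    print job.resourceList   # {u'neednodes': u'2', u'nodect': u'2'}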

Modified: incubator/mesos/trunk/frameworks/torque/torquesched.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/torque/torquesched.py?rev=1131670&r1=1131669&r2=1131670&view=diff
==============================================================================
--- incubator/mesos/trunk/frameworks/torque/torquesched.py (original)
+++ incubator/mesos/trunk/frameworks/torque/torquesched.py Sun Jun  5 04:55:36 2011
@@ -9,10 +9,12 @@ import Queue
 import threading
 import re
 import socket
+import qstat
 
 from optparse import OptionParser
 from subprocess import *
 from socket import gethostname
+#from qstat import *
 
 PBS_SERVER_FILE = "/var/spool/torque/server_name"
 
@@ -27,6 +29,7 @@ class MyScheduler(nexus.Scheduler):
     self.ip = ip 
     self.servers = {}
     self.overloaded = False
+    self.numToRegister = 1
   
   def getExecutorInfo(self, driver):
     execPath = os.path.join(os.getcwd(), "start_pbs_mom.sh")
@@ -45,22 +48,23 @@ class MyScheduler(nexus.Scheduler):
     for offer in slave_offers:
       # if we haven't registered this node, accept slot & register w pbs_server
       #TODO: check to see if slot is big enough 
-      if self.queueLength() == 0:
-        print "Rejecting slot, job queue is empty"
+      if self.numToRegister <= 0:
+        print "Rejecting slot, no more nodes needed right now (only keep one slave around)"
         continue
       if offer.host in self.servers.values():
         print "Rejecting slot, already registered node " + offer.host
         continue
-      if len(self.servers) >= SAFE_ALLOCATION:
-        print "Rejecting slot, already at safe allocation"
+      if len(self.servers) >= SAFE_ALLOCATION["cpus"]:
+        print "Rejecting slot, already at safe allocation (i.e. %d CPUS)" % SAFE_ALLOCATION["cpus"]
         continue
       print "Accepting slot, setting up params for it..."
-      params = {"cpus": "%d" % 1, "mem": "%d" % 1073741824}
+      params = {"cpus": "1", "mem": "1073741824"}
       td = nexus.TaskDescription(
           self.id, offer.slaveId, "task %d" % self.id, params, "")
       tasks.append(td)
       self.servers[self.id] = offer.host
       self.regComputeNode(offer.host)
+      self.numToRegister -= 1
       self.id += 1
       print "self.id now set to " + str(self.id)
     print "---"
@@ -94,23 +98,27 @@ class MyScheduler(nexus.Scheduler):
     print("removing node from pbs_server: qmgr -c delete node " + node_name)
     print Popen('qmgr -c "delete node ' + node_name + '"', shell=True, stdout=PIPE).stdout
   
+  #leave one node so that the queue can accept new jobs
   def unregAllNodes(self):
     for key, val in self.servers.items():
-      print "unregistering node " + str(val)
-      self.unregComputeNode(val)
-      self.servers.pop(key)
+      if len(self.servers) > 1:
+        print "unregistering node " + str(val)
+        self.unregComputeNode(val)
+        self.servers.pop(key)
   
+  #unreg up to N random compute nodes, leave at least one
+  def unregNNodes(self, num_nodes):
+    print "unregistering %d nodes" % num_nodes
+    for key, val in self.servers.items():
+      if len(self.servers) > 1 and num_nodes > 0:
+        print "unregistering node " + str(val)
+        self.unregComputeNode(val)
+        self.servers.pop(key)
+        num_nodes = num_nodes - 1
+
   def getFrameworkName(self, driver):
-    return "Nexus torque Framework"
-  
-  def queueLength(self):
-    #print "computing the number of active jobs in the queue"
-    qstat = Popen("qstat -Q",shell=True,stdout=PIPE).stdout
-    jobcount = 0
-    for line in qstat:
-       if re.match('^batch.*', line):
-         jobcount = int(line.split()[5]) + int(line.split()[6]) + int(line.split()[7]) + int(line.split()[8])
-    return jobcount
+    return "Nexus Torque Framework"
+
 
 def monitor(sched):
   while True:
@@ -121,14 +129,31 @@ def monitor(sched):
       #print "no incomplete jobs in queue, attempting to release all slots"
       if len(sched.servers) == 0:
         #print "no servers registered, so no need to call unregAllNodes()"
-        #print "monitor thread releasing lock"
-        #print "\n"
+        print "monitor thread releasing lock"
+        print "\n"
         sched.lock.release()
         continue
       print "unregistering all nodes because no jobs running"
       sched.unregAllNodes()
-    #else:
-      #TODO: if num servers currently registered > num needed by head of queue, drop to num needed
+    else:
+      #adjust num servers registered to match num needed
+      #by first N jobs in the queue
+      print "computing num nodes needed to satisfy first N jobs in queue"
+      needed = 0
+      jobs = qstat.getActiveJobs()
+      print "retreived jobs in queue, count: %d" % len(jobs)
+      for j in jobs:
+        #WARNING: this check should only be used if torque is using fifo queue
+        #if needed + j.needsnodes <= SAFE_ALLOCATION:
+        needed += int(j.resourceList["neednodes"])
+      print "number of nodes needed by jobs in queue: %d" % needed
+      numToRelease = len(sched.servers) - needed
+      if numToRelease > 0:
+        sched.unregNNodes(numToRelease)
+        sched.numToRegister = 0
+      else:
+        print "monitor updating sched.numToRelease from %d to %d" % (sched.numToRegister,
numToRelease * -1)
+        sched.numToRegister = numToRelease * -1
     sched.lock.release()
     print "monitor thread releasing lock"
     print "\n"
@@ -153,7 +178,7 @@ if __name__ == "__main__":
   FILE.write(fqdn)
   FILE.close()
 
-  time.sleep(1)
+  time.sleep(2)
   print "starting pbs_server"
   #Popen("/etc/init.d/pbs_server start", shell=True)
   Popen("pbs_server", shell=True)
@@ -161,7 +186,7 @@ if __name__ == "__main__":
   print "running killall pbs_sched"
   Popen("killall pbs_sched", shell=True)
   
-  time.sleep(1)
+  time.sleep(2)
   print "starting pbs_scheduler"
   #Popen("/etc/init.d/pbs_sched start", shell=True)
   Popen("pbs_sched", shell=True)


