hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject svn commit: r608950 [6/7] - in /lucene/hadoop/trunk/src: contrib/hod/ contrib/hod/bin/ contrib/hod/conf/ contrib/hod/hodlib/ contrib/hod/hodlib/AllocationManagers/ contrib/hod/hodlib/Common/ contrib/hod/hodlib/GridServices/ contrib/hod/hodlib/Hod/ cont...
Date Fri, 04 Jan 2008 18:20:22 GMT
Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/idleJobTracker.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/idleJobTracker.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/idleJobTracker.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/idleJobTracker.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,169 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+import os, re, time
+from hodlib.Common.threads import loop, func
+from hodlib.Common.threads import simpleCommand
+from hodlib.Common.util import get_exception_string
+
+class JobTrackerMonitor:
+  """This class monitors the JobTracker of an allocated cluster
+     periodically to detect whether it is idle. If it is found
+     to be idle for more than a configured limit, it calls back
+     registered handlers who can act upon the idle cluster."""
+
+  def __init__(self, log, idleJTHandler, interval, limit,
+                      hadoopDir, javaHome, servInfoProvider):
+    self.__log = log
+    self.__idlenessLimit = limit
+    self.__idleJobTrackerHandler = idleJTHandler
+    self.__hadoopDir = hadoopDir
+    hadoopPath = os.path.join(self.__hadoopDir, "bin", "hadoop")
+    #hadoop directory can be from pkgs or a temp location like tarball. Verify once.
+    if not os.path.exists(hadoopPath):
+      raise Exception('Invalid Hadoop path specified: %s' % hadoopPath)
+    self.__javaHome = javaHome
+    # Note that when this object is created, we don't yet know the JT URL.
+    # The service info provider will be polled until we get the URL.
+    self.__serviceInfoProvider = servInfoProvider
+    self.__jobCountRegExp = re.compile("([0-9]+) jobs currently running.*")
+    self.__firstIdleTime = 0
+    #Assumption: we are not going to support versions older than 0.15 for Idle Job tracker.
+    if not self.__isCompatibleHadoopVersion():
+      raise Exception('Incompatible Hadoop Version: Cannot check status')
+    self.__stopFlag = False
+    self.__jtURLFinderThread = func(name='JTURLFinderThread', functionRef=self.getJobTrackerURL)
+    self.__jtMonitorThread = loop(name='JTMonitorThread', functionRef=self.monitorJobTracker,
+                                  sleep=interval)
+    self.__jobTrackerURL = None
+
+  def start(self):
+    """This method starts a thread that will determine the JobTracker URL"""
+    self.__jtURLFinderThread.start()
+
+  def stop(self):
+    self.__log.debug('Joining the monitoring thread.')
+    self.__stopFlag = True
+    if self.__jtMonitorThread.isAlive():
+      self.__jtMonitorThread.join()
+    self.__log.debug('Joined the monitoring thread.')
+
+  def getJobTrackerURL(self):
+    """This method periodically checks the service info provider for the JT URL"""
+    self.__jobTrackerURL = self.__serviceInfoProvider.getServiceAddr('mapred')
+    while not self.__stopFlag and \
+          (self.__jobTrackerURL is None or \
+            self.__jobTrackerURL == 'not found'):
+      time.sleep(10)
+      if not self.__stopFlag:
+        self.__jobTrackerURL = self.__serviceInfoProvider.getServiceAddr('mapred')
+      else:
+        break
+
+    if (self.__jobTrackerURL != None) and \
+          (self.__jobTrackerURL != 'not found'):
+      self.__log.debug('Got URL %s. Starting monitoring' % self.__jobTrackerURL)
+      self.__jtMonitorThread.start()
+
+  def monitorJobTracker(self):
+    """This method is periodically called to monitor the JobTracker of the cluster."""
+    try:
+      if self.__isIdle():
+        if self.__idleJobTrackerHandler:
+          self.__log.info('Detected cluster as idle. Calling registered callback handler.')
+          self.__idleJobTrackerHandler.handleIdleJobTracker()
+    except:
+      self.__log.debug('Exception while monitoring job tracker. %s' % get_exception_string())
+
+  def __isIdle(self):
+    """This method checks if the JobTracker is idle beyond a certain limit."""
+    if self.__getJobCount() == 0:
+      if self.__firstIdleTime == 0:
+        #detecting idleness for the first time
+        self.__firstIdleTime = time.time()
+      else:
+        if ((time.time()-self.__firstIdleTime) >= self.__idlenessLimit):
+          self.__log.info('Idleness limit crossed for cluster')
+          return True
+    else:
+      # reset idleness time
+      self.__firstIdleTime = 0
+    return False
+
+  def __getJobCount(self):
+    """This method executes the hadoop job -list command and parses the output to detect
+       the number of running jobs."""
+
+    # We assume here that the poll interval is small enough to detect running jobs. 
+    # If jobs start and stop within the poll interval, the cluster would be incorrectly 
+    # treated as idle. Hadoop 2266 will provide a better mechanism than this.
+    jobs = -1
+    jtStatusCommand = self.__initStatusCommand()
+    jtStatusCommand.start()
+    jtStatusCommand.wait()
+    jtStatusCommand.join()
+    if jtStatusCommand.exit_code() == 0:
+      for line in jtStatusCommand.output():
+        match = self.__jobCountRegExp.match(line)
+        if match:
+          jobs = int(match.group(1))
+    return jobs
+
+  def __findHadoopVersion(self):
+    """This method determines the version of hadoop being used by executing the 
+       hadoop version command"""
+    verMap = { 'major' : None, 'minor' : None }
+    hadoopPath = os.path.join(self.__hadoopDir, 'bin', 'hadoop')
+    cmd = "%s version" % hadoopPath
+    self.__log.debug('Executing command %s to find hadoop version' % cmd)
+    env = os.environ
+    env['JAVA_HOME'] = self.__javaHome
+    hadoopVerCmd = simpleCommand('HadoopVersion', cmd, env)
+    hadoopVerCmd.start()
+    hadoopVerCmd.wait()
+    hadoopVerCmd.join()
+    if hadoopVerCmd.exit_code() == 0:
+      verLine = hadoopVerCmd.output()[0]
+      self.__log.debug('Version from hadoop command: %s' % verLine)
+      hadoopVerRegExp = re.compile("Hadoop ([0-9]+)\.([0-9]+).*")
+      verMatch = hadoopVerRegExp.match(verLine)
+      if verMatch != None:
+        verMap['major'] = verMatch.group(1)
+        verMap['minor'] = verMatch.group(2)
+
+    return verMap
+
+  def __isCompatibleHadoopVersion(self):
+    """This method determines whether the version of hadoop being used is one that 
+       provides the hadoop job -list command or not"""
+    ver = self.__findHadoopVersion()
+    ret = False
+  
+    if (ver['major']!=None) and (int(ver['major']) >= 0) \
+      and (ver['minor']!=None) and (int(ver['minor']) >= 15):
+      ret = True
+
+    return ret
+
+  def __initStatusCommand(self):
+    """This method initializes the command to run to check the JT status"""
+    cmd = None
+    hadoopPath = os.path.join(self.__hadoopDir, 'bin', 'hadoop')
+    cmdStr = "%s job -jt %s -list" % (hadoopPath, self.__jobTrackerURL)
+    env = os.environ
+    env['JAVA_HOME'] = self.__javaHome
+    cmd = simpleCommand('HadoopStatus', cmdStr, env)
+    return cmd
+   

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/idleJobTracker.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/idleJobTracker.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/idleJobTracker.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/ringMaster.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/ringMaster.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/ringMaster.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/ringMaster.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,895 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+#!/usr/bin/env python
+"""manages services and nodepool"""
+# -*- python -*-
+
+import os, sys, random, time, sets, shutil, threading
+import urllib, urlparse, re, getpass, pprint, signal, shutil
+
+from pprint import pformat
+from HTMLParser import HTMLParser
+
+# Take the first entry of the module search path and append its parent
+# directory to sys.path (presumably so that the hodlib package imported
+# below can be found -- TODO confirm against the install layout).
+binfile = sys.path[0]
+libdir = os.path.dirname(binfile)
+sys.path.append(libdir)
+
+import hodlib.Common.logger
+from hodlib.RingMaster.idleJobTracker import JobTrackerMonitor
+
+from hodlib.Common.threads import func 
+
+from hodlib.Hod.nodePool import *
+from hodlib.Common.util import *
+from hodlib.Common.nodepoolutil import NodePoolUtil
+from hodlib.Common.socketServers import hodXMLRPCServer
+from hodlib.Common.socketServers import threadedHTTPServer
+from hodlib.NodePools import *
+from hodlib.NodePools.torque import *
+from hodlib.GridServices import *
+from hodlib.Common.descGenerator import *
+from hodlib.Common.xmlrpc import hodXRClient
+from hodlib.Common.miniHTMLParser import miniHTMLParser
+from hodlib.Common.threads import simpleCommand
+
+class ringMasterServer:
+  """The RPC server that exposes all the master config
+  changes. Also, one of these RPC servers runs as a proxy
+  and all the hodring instances register with this proxy"""
+  instance = None
+  xmlrpc = None
+  
+  def __init__(self, cfg, log, logMasterSources, retry=5):
+    try:
+      from hodlib.Common.socketServers import twistedXMLRPCServer
+      ringMasterServer.xmlrpc = twistedXMLRPCServer("", 
+        cfg['ringmaster']['xrs-port-range'])
+    except ImportError:
+      log.info("Twisted interface not found. Using hodXMLRPCServer.")
+      ringMasterServer.xmlrpc = hodXMLRPCServer("", 
+        cfg['ringmaster']['xrs-port-range'])
+
+    ringMasterServer.xmlrpc.register_instance(logMasterSources)
+    self.logMasterSources = logMasterSources
+    ringMasterServer.xmlrpc.serve_forever()
+        
+    while not ringMasterServer.xmlrpc.is_alive():
+      time.sleep(.5)
+          
+    log.debug('Ringmaster RPC Server at %d' % 
+                 ringMasterServer.xmlrpc.server_address[1])
+    
+  def startService(ss, cfg, np, log, rm):
+    logMasterSources = _LogMasterSources(ss, cfg, np, log, rm)
+    ringMasterServer.instance = ringMasterServer(cfg, log, logMasterSources)
+
+  def stopService():
+    ringMasterServer.xmlrpc.stop()
+  
+  def getPort():
+    return ringMasterServer.instance.port
+
+  def getAddress():
+    return 'http://%s:%d/' % (socket.gethostname(), 
+                              ringMasterServer.xmlrpc.server_address[1])
+  
+  startService = staticmethod(startService)
+  stopService = staticmethod(stopService)
+  getPort = staticmethod(getPort)
+  getAddress = staticmethod(getAddress)
+  
+class _LogMasterSources:
+  """All the methods that are run by the RPC server are
+  added into this class """
+  
+  def __init__(self, serviceDict, cfg, np, log, rm):
+    self.serviceDict = serviceDict
+    self.tarSource = []
+    self.tarSourceLock = threading.Lock()
+    self.dict = {}
+    self.count = {}
+    self.logsourceList = []
+    self.logsourceListLock = threading.Lock()
+    self.masterParam = []
+    self.masterParamLock = threading.Lock()
+    self.verify = 'none'
+    self.cmdLock = threading.Lock()
+    self.cfg = cfg
+    self.log = log
+    self.np = np
+    self.rm = rm 
+    self.hdfsHost = None
+    self.mapredHost = None
+    self.maxconnect = self.cfg['ringmaster']['max-connect']
+    self.log.debug("Using max-connect value %s"%self.maxconnect)
+
+   
+  def registerTarSource(self, hostname, url, addr=None):
+    self.log.debug("registering: " + url)
+    lock = self.tarSourceLock
+    lock.acquire()
+    self.dict[url] = url
+    self.count[url] = 0
+    # addr is None when ringMaster himself invokes this method
+    if addr:
+      c = self.count[addr]
+      self.count[addr] = c - 1
+    lock.release()
+    if addr:
+      str = "%s is done" % (addr)
+      self.log.debug(str)
+    return url
+
+  def getTarList(self,hodring):   # this looks useful
+    lock = self.tarSourceLock
+    lock.acquire()
+    leastkey = None
+    leastval = -1
+    for k, v in self.count.iteritems():
+      if (leastval  == -1):
+        leastval = v
+        pass
+      if (v <= leastval and v < self.maxconnect):
+        leastkey = k
+        leastval = v
+    if (leastkey == None):
+      url  = 'none'
+    else:
+      url = self.dict[leastkey]
+      self.count[leastkey] = leastval + 1
+      self.log.debug("%s %d" % (leastkey, self.count[leastkey]))
+    lock.release()
+    self.log.debug('sending url ' + url+" to "+hodring)  # this looks useful
+    return url
+
+  def tarDone(self, uri):
+    str = "%s is done" % (uri)
+    self.log.debug(str)
+    lock = self.tarSourceLock
+    lock.acquire()
+    c = self.count[uri]
+    self.count[uri] = c - 1
+    lock.release()
+    return uri
+
+  def status(self):
+    """Always returns True (presumably a liveness probe for RPC
+    clients -- TODO confirm against the callers)."""
+    return True
+
+# FIXME: this code is broken, it relies on a central service registry
+#
+#  def clusterStart(self, changedClusterParams=[]):
+#    self.log.debug("clusterStart method invoked.")
+#    self.dict = {}
+#    self.count = {}
+#    try:
+#      if (len(changedClusterParams) > 0):
+#        self.log.debug("Updating config.")
+#        for param in changedClusterParams:
+#          (key, sep1, val) = param.partition('=')
+#          (i1, sep2, i2) = key.partition('.')
+#          try:
+#            prev = self.cfg[i1][i2]
+#            self.rm.cfg[i1][i2] = val
+#            self.cfg[i1][i2] = val
+#            self.log.debug("\nModified [%s][%s]=%s to [%s][%s]=%s" % (i1, i2, prev, i1, i2, val))
+#          except KeyError, e:
+#            self.log.info("Skipping %s as no such config parameter found in ringmaster" % param)
+#        self.log.debug("Regenerating Service Description.")
+#        dGen = DescGenerator(self.rm.cfg)
+#        self.rm.cfg['servicedesc'] = dGen.createServiceDescDict()
+#        self.cfg['servicedesc'] = self.rm.cfg['servicedesc']
+#  
+#      self.rm.tar = None
+#      if self.rm.cfg['ringmaster'].has_key('hadoop-tar-ball'):
+#        self.rm.download = True
+#        self.rm.tar = self.rm.cfg['ringmaster']['hadoop-tar-ball']
+#        self.log.debug("self.rm.tar=%s" % self.rm.tar)
+# 
+#      self.rm.cd_to_tempdir()
+#
+#      self.rm.tarAddress = None 
+#      hostname = socket.gethostname()
+#      if (self.rm.download):
+#        self.rm.basename = os.path.basename(self.rm.tar)
+#        dest = os.path.join(os.getcwd(), self.rm.basename)
+#        src =  self.rm.tar  
+#        self.log.debug("cp %s -> %s" % (src, dest))
+#        shutil.copy(src, dest) 
+#        self.rm.tarAddress = "%s%s" % (self.rm.httpAddress, self.rm.basename)
+#        self.registerTarSource(hostname, self.rm.tarAddress)
+#        self.log.debug("Registered new tarAddress %s" % self.rm.tarAddress)
+#      else:
+#        self.log.debug("Download not set.")
+#      
+#      if (self.rm.tar != None):
+#        self.cfg['hodring']['download-addr'] = self.rm.tarAddress
+#        self.rm.cfg['hodring']['download-addr'] = self.rm.tarAddress
+#
+#      sdl = self.rm.cfg['servicedesc']
+#      workDirs = self.rm.getWorkDirs(self.rm.cfg, True)
+#      hdfsDesc = sdl['hdfs']
+#      hdfs = None
+#      if hdfsDesc.isExternal():
+#        hdfs = HdfsExternal(hdfsDesc, workDirs)
+#      else:
+#        hdfs = Hdfs(hdfsDesc, workDirs, 0, False, True)
+#    
+#      self.rm.serviceDict[hdfs.getName()] = hdfs
+#      mrDesc = sdl['mapred']
+#      mr = None
+#      if mrDesc.isExternal():
+#        mr = MapReduceExternal(mrDesc, workDirs)
+#      else:
+#        mr = MapReduce(mrDesc, workDirs, 1)
+#      self.rm.serviceDict[mr.getName()] = mr
+#
+#      ringList = self.rm.serviceClient.getServiceInfo(self.cfg['hodring']['userid'],
+#        self.np.getServiceId(), 'hodring', 'hod') 
+#    
+#      slaveList = ringList
+#      hdfsringXRAddress = None
+#      # Start HDFS Master - Step 1
+#      if not hdfsDesc.isExternal():
+#        masterFound = False
+#        for ring in ringList:
+#          ringXRAddress = ring['xrs']
+#          if ringXRAddress == None:
+#            raise Exception("Could not get hodring XML-RPC server address.")
+#          if  (ringXRAddress.find(self.hdfsHost) != -1):
+#            ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
+#            hdfsringXRAddress = ringXRAddress
+#            self.log.debug("Invoking clusterStart on " + ringXRAddress + " (HDFS Master)")
+#            ringClient.clusterStart()
+#            masterFound = True 
+#            slaveList.remove(ring)
+#            break
+#        if not masterFound:
+#          raise Exception("HDFS Master host not found")
+#        while hdfs.getInfoAddrs() == None:
+#          self.log.debug("Waiting for HDFS Master (Name Node) to register dfs.info.port")
+#          time.sleep(1)
+#
+#      # Start MAPRED Master - Step 2
+#      if not mrDesc.isExternal():
+#        masterFound = False
+#        for ring in ringList:
+#          ringXRAddress = ring['xrs']
+#          if ringXRAddress == None:
+#            raise Exception("Could not get hodring XML-RPC server address.")
+#          if (not mrDesc.isExternal() and ringXRAddress.find(self.mapredHost) != -1):
+#            ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
+#            self.log.debug("Invoking clusterStart on " + ringXRAddress + " (MAPRED Master)")
+#            ringClient.clusterStart()
+#            masterFound = True 
+#            slaveList.remove(ring)
+#            break
+#        if not masterFound:
+#          raise Exception("MAPRED Master host not found")
+#        while mr.getInfoAddrs() == None:
+#          self.log.debug("Waiting for MAPRED Master (Job Tracker) to register \
+# mapred.job.tracker.info.port")
+#          time.sleep(1)
+#
+#      # Start Slaves - Step 3 
+#      for ring in slaveList:
+#          ringXRAddress = ring['xrs']
+#          if ringXRAddress == None:
+#            raise Exception("Could not get hodring XML-RPC server address.")
+#          ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
+#          self.log.debug("Invoking clusterStart on " + ringXRAddress + " (Slaves)")
+#          ringThread = func(name='hodring_slaves_start', functionRef=ringClient.clusterStart())
+#          ring['thread'] = ringThread
+#          ringThread.start()
+#
+#      for ring in slaveList:
+#        ringThread = ring['thread']
+#        if ringThread == None:
+#          raise Exception("Could not get hodring thread (Slave).")
+#        ringThread.join()
+#        self.log.debug("Completed clusterStart on " + ring['xrs'] + " (Slave)")
+#
+#      # Run Admin Commands on HDFS Master - Step 4
+#      if not hdfsDesc.isExternal():
+#        if hdfsringXRAddress == None:
+#          raise Exception("HDFS Master host not found (to Run Admin Commands)")
+#        ringClient = hodXRClient(hdfsringXRAddress, None, None, 0, 0, 0, False, 0)
+#        self.log.debug("Invoking clusterStart(False) - Admin on "
+#                       + hdfsringXRAddress + " (HDFS Master)")
+#        ringClient.clusterStart(False)
+#
+#    except:
+#      self.log.debug(get_exception_string())
+#      return False
+#
+#    self.log.debug("Successfully started cluster.")
+#    return True
+#
+#  def clusterStop(self):
+#    self.log.debug("clusterStop method invoked.")
+#    try:
+#      hdfsAddr = self.getServiceAddr('hdfs')
+#      if hdfsAddr.find(':') != -1:
+#        h, p = hdfsAddr.split(':', 1)
+#        self.hdfsHost = h
+#        self.log.debug("hdfsHost: " + self.hdfsHost)
+#      mapredAddr = self.getServiceAddr('mapred')
+#      if mapredAddr.find(':') != -1:
+#        h, p = mapredAddr.split(':', 1)
+#        self.mapredHost = h
+#        self.log.debug("mapredHost: " + self.mapredHost)
+#      ringList = self.rm.serviceClient.getServiceInfo(self.cfg['hodring']['userid'],
+#                                                      self.np.getServiceId(),
+#                                                      'hodring', 'hod')
+#      for ring in ringList:
+#        ringXRAddress = ring['xrs']
+#        if ringXRAddress == None:
+#          raise Exception("Could not get hodring XML-RPC server address.")
+#        ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False)
+#        self.log.debug("Invoking clusterStop on " + ringXRAddress)
+#        ringThread = func(name='hodring_stop', functionRef=ringClient.clusterStop())
+#        ring['thread'] = ringThread
+#        ringThread.start()
+#
+#      for ring in ringList:
+#        ringThread = ring['thread']
+#        if ringThread == None:
+#          raise Exception("Could not get hodring thread.")
+#        ringThread.join()
+#        self.log.debug("Completed clusterStop on " + ring['xrs'])
+#
+#    except:
+#      self.log.debug(get_exception_string())
+#      return False
+#
+#    self.log.debug("Successfully stopped cluster.")
+#    
+#    return True
+
+  def getCommand(self, addr):
+    """This method is called by the
+    hodrings to get commands from
+    the ringmaster"""
+    lock = self.cmdLock
+    cmdList = []
+    lock.acquire()
+    try:
+      try:
+        for v in self.serviceDict.itervalues():
+          if (not v.isExternal()):
+            if v.isLaunchable(self.serviceDict):
+              if not v.isMasterLaunched():
+                cmdList = v.getMasterCommands(self.serviceDict)
+                v.setlaunchedMaster()
+                v.setMasterAddress(addr)
+                break
+        if cmdList == []:
+          for s in self.serviceDict.itervalues():
+            if (not v.isExternal()):
+              if s.isMasterInitialized():
+                cl = s.getWorkerCommands(self.serviceDict)
+                cmdList.extend(cl)
+              else:
+                cmdList = []
+                break
+      except:
+        self.log.debug(get_exception_string())
+    finally:
+      lock.release()
+      pass
+    
+    cmd = addr + pformat(cmdList)
+    self.log.debug("getCommand returning " + cmd)
+    return cmdList
+  
+  def getAdminCommand(self, addr):
+    """This method is called by the
+    hodrings to get admin commands from
+    the ringmaster"""
+    lock = self.cmdLock
+    cmdList = []
+    lock.acquire()
+    try:
+      try:
+        for v in self.serviceDict.itervalues():
+          cmdList = v.getAdminCommands(self.serviceDict)
+          if cmdList != []:
+            break
+      except Exception, e:
+        self.log.debug(get_exception_string())
+    finally:
+      lock.release()
+      pass
+    cmd = addr + pformat(cmdList)
+    self.log.debug("getAdminCommand returning " + cmd)
+    return cmdList
+
+  def addMasterParams(self, addr, vals):
+    """This method is called by
+    hodring to update any parameters
+    its changed for the commands it was
+    running"""
+    self.log.debug('Comment: adding master params')
+    self.log.debug(pformat(vals))
+    lock = self.masterParamLock
+    lock.acquire()
+    try:
+      for v in self.serviceDict.itervalues():
+        if v.isMasterLaunched():
+          if (v.getMasterAddress() == addr):
+            v.setMasterParams(vals)
+            v.setMasterInitialized()
+
+    except:
+      self.log.debug(get_exception_string())
+      pass
+    lock.release()
+            
+    return addr
+
+  def getKeys(self):
+    lock= self.masterParamLock
+    lock.acquire()
+    keys = self.serviceDict.keys()
+    lock.release()    
+  
+    return keys
+  
+  def getServiceAddr(self, name):
+    addr = 'not found'
+    self.log.debug("getServiceAddr name: %s" % name)
+    lock= self.masterParamLock
+    lock.acquire()
+    try:
+      service = self.serviceDict[name]
+    except KeyError:
+      pass
+    else:
+      self.log.debug("getServiceAddr service: %s" % service)
+      if (service.isMasterInitialized()):
+        addr = service.getMasterAddrs()[0]
+      else:
+        addr = 'not found'
+    lock.release()
+    self.log.debug("getServiceAddr addr %s: %s" % (name, addr))
+    
+    return addr
+
+  def getURLs(self, name):
+    addr = 'none'
+    lock = self.masterParamLock
+    lock.acquire()
+    
+    try:
+      service = self.serviceDict[name]
+    except KeyError:
+      pass
+    else:
+      if (service.isMasterInitialized()):
+        addr = service.getInfoAddrs()[0]
+      
+    lock.release()
+    
+    return addr
+
+  
+class RingMaster:
+  def __init__(self, cfg, log, **kwds):
+    """starts nodepool and services"""
+    self.download = False
+    self.httpServer = None
+    self.cfg = cfg
+    self.log = log
+    self.__hostname = local_fqdn()
+    self.workDirs = None 
+
+    # ref to the idle job tracker object.
+    self.__jtMonitor = None
+    self.__idlenessDetected = False
+    self.__stopInProgress = False
+
+    self.__initialize_signal_handlers()
+    
+    sdd = self.cfg['servicedesc']
+    gsvc = None
+    for key in sdd:
+      gsvc = sdd[key]
+      break
+    
+    npd = self.cfg['nodepooldesc']
+    self.np = NodePoolUtil.getNodePool(npd, cfg, log)
+
+    self.log.debug("Getting service ID.")
+    
+    self.serviceId = self.np.getServiceId()
+    
+    self.log.debug("Got service ID: %s" % self.serviceId)
+
+    self.tarSrcLoc = None
+    if self.cfg['ringmaster'].has_key('hadoop-tar-ball'):
+      self.download = True
+      self.tarSrcLoc = self.cfg['ringmaster']['hadoop-tar-ball']
+ 
+    self.cd_to_tempdir()
+
+    if (self.download):
+      self.__copy_tarball(os.getcwd())
+      self.basename = self.__find_tarball_in_dir(os.getcwd())
+      if self.basename is None:
+        raise Exception('Did not find tarball copied from %s in %s.'
+                          % (self.tarSrcLoc, os.getcwd()))
+      
+    self.serviceAddr = to_http_url(self.cfg['ringmaster']['svcrgy-addr'])
+    
+    self.log.debug("Service registry @ %s" % self.serviceAddr)
+    
+    self.serviceClient = hodXRClient(self.serviceAddr)
+    self.serviceDict  = {}
+    try:
+      sdl = self.cfg['servicedesc']
+
+      workDirs = self.getWorkDirs(cfg)
+
+      hdfsDesc = sdl['hdfs']
+      hdfs = None
+      if hdfsDesc.isExternal():
+        hdfs = HdfsExternal(hdfsDesc, workDirs)
+      else:
+        hdfs = Hdfs(hdfsDesc, workDirs, 0)
+
+      self.serviceDict[hdfs.getName()] = hdfs
+      
+      mrDesc = sdl['mapred']
+      mr = None
+      if mrDesc.isExternal():
+        mr = MapReduceExternal(mrDesc, workDirs)
+      else:
+        mr = MapReduce(mrDesc, workDirs,1)
+
+      self.serviceDict[mr.getName()] = mr
+    except:
+      self.log.debug(get_exception_string)
+
+    # should not be starting these in a constructor
+    ringMasterServer.startService(self.serviceDict, cfg, self.np, log, self)
+    
+    self.rpcserver = ringMasterServer.getAddress()
+    
+    self.httpAddress = None   
+    self.tarAddress = None 
+    hostname = socket.gethostname()
+    if (self.download):
+      self.httpServer = threadedHTTPServer(hostname, 
+        self.cfg['ringmaster']['http-port-range'])
+      
+      self.httpServer.serve_forever()
+      self.httpAddress = "http://%s:%d/" % (self.httpServer.server_address[0], 
+                                 self.httpServer.server_address[1])
+      self.tarAddress = "%s%s" % (self.httpAddress, self.basename)
+      
+      ringMasterServer.instance.logMasterSources.registerTarSource(hostname, 
+                                                                   self.tarAddress)
+    else:
+      self.log.debug("Download not set.")
+    
+    self.log.debug("%s %s %s %s %s" % (self.cfg['ringmaster']['userid'], 
+      self.serviceId, self.__hostname, 'ringmaster', 'hod'))
+    
+    if self.cfg['ringmaster']['register']:      
+      if self.httpAddress:
+        self.serviceClient.registerService(self.cfg['ringmaster']['userid'], 
+          self.serviceId, self.__hostname, 'ringmaster', 'hod', {
+          'xrs' : self.rpcserver, 'http' : self.httpAddress })
+      else:
+        self.serviceClient.registerService(self.cfg['ringmaster']['userid'], 
+          self.serviceId, self.__hostname, 'ringmaster', 'hod', {
+          'xrs' : self.rpcserver, })
+    
+    self.log.debug("Registered with serivce registry: %s." % self.serviceAddr)
+    
+    hodRingPath = os.path.join(cfg['ringmaster']['base-dir'], 'bin', 'hodring')
+    hodRingWorkDir = os.path.join(cfg['hodring']['temp-dir'], 'hodring' + '_' 
+                                  + getpass.getuser())
+    
+    self.cfg['hodring']['hodring'] = [hodRingWorkDir,]
+    self.cfg['hodring']['svcrgy-addr'] = self.cfg['ringmaster']['svcrgy-addr']
+    self.cfg['hodring']['service-id'] = self.np.getServiceId()
+
+    self.cfg['hodring']['ringmaster-xrs-addr'] = self.__url_to_addr(self.rpcserver)
+    
+    if (self.tarSrcLoc != None):
+      cfg['hodring']['download-addr'] = self.tarAddress
+ 
+    self.__init_job_tracker_monitor(ringMasterServer.instance.logMasterSources)
+
+  def __init_job_tracker_monitor(self, logMasterSources):
+    """Create and start the idle job tracker monitor.
+
+    logMasterSources is passed to JobTrackerMonitor as its service info
+    provider, and this RingMaster object itself is passed as the idle
+    handler. Failures while creating or starting the monitor are logged
+    and swallowed; in that case an idle cluster is not detected."""
+    # NOTE: __getHadoopDir() is called outside the try block below, so an
+    # exception raised by it propagates to the caller.
+    hadoopDir = self.__getHadoopDir()
+    self.log.debug('hadoopdir=%s, java-home=%s' % \
+                (hadoopDir, self.cfg['hodring']['java-home']))
+    try:
+      self.__jtMonitor = JobTrackerMonitor(self.log, self, 
+                            self.cfg['ringmaster']['jt-poll-interval'], 
+                            self.cfg['ringmaster']['idleness-limit'],
+                            hadoopDir, self.cfg['hodring']['java-home'],
+                            logMasterSources)
+      self.log.debug('starting jt monitor')
+      self.__jtMonitor.start()
+    except:
+      # The message below is one string literal continued with a
+      # backslash, so the indentation of its second line is part of the
+      # logged text.
+      self.log.critical('Exception in running idle job tracker. This cluster cannot be deallocated if idle.\
+                          Exception message: %s' % get_exception_error_string())
+      self.log.debug('Exception details: %s' % get_exception_string())
+
+
+  def __getHadoopDir(self):
+    hadoopDir = None
+    if self.cfg['ringmaster'].has_key('hadoop-tar-ball'):
+      tarFile = os.path.join(os.getcwd(), self.basename)
+      ret = untar(tarFile, os.getcwd())
+      if not ret:
+        raise Exception('Untarring tarfile %s to directory %s failed. Cannot find hadoop directory.' \
+                            % (tarFile, os.getcwd()))
+      hadoopDir = os.path.join(os.getcwd(), self.__get_dir(tarFile))
+    else:
+      hadoopDir = self.cfg['gridservice-mapred']['pkgs']
+    self.log.debug('Returning Hadoop directory as: %s' % hadoopDir)
+    return hadoopDir
+
+  def __get_dir(self, name):
+    """Return the root directory inside the tarball
+    specified by name. Assumes that the tarball begins
+    with a root directory."""
+    import tarfile
+    myTarFile = tarfile.open(name)
+    hadoopPackage = myTarFile.getnames()[0]
+    self.log.debug("tarball name : %s hadoop package name : %s" %(name,hadoopPackage))
+    return hadoopPackage
+
+  def __find_tarball_in_dir(self, dir):
+    """Find the tarball among files specified in the given 
+    directory. We need this method because how the tarball
+    source URI is given depends on the method of copy and
+    we can't get the tarball name from that.
+    This method will fail if there are multiple tarballs
+    in the directory with the same suffix."""
+    files = os.listdir(dir)
+    for file in files:
+      if self.tarSrcLoc.endswith(file):
+        return file
+    return None
+
+  def __copy_tarball(self, destDir):
+    """Copy the hadoop tar ball from a remote location to the
+    specified destination directory. Based on the URL it executes
+    an appropriate copy command. Throws an exception if the command
+    returns a non-zero exit code."""
+    # for backwards compatibility, treat the default case as file://
+    url = ''
+    if self.tarSrcLoc.startswith('/'):
+      url = 'file:/'
+    src = '%s%s' % (url, self.tarSrcLoc)
+    if src.startswith('file://'):
+      src = src[len('file://')-1:]
+      cpCmd = '/bin/cp'
+      cmd = '%s %s %s' % (cpCmd, src, destDir)
+      self.log.debug('Command to execute: %s' % cmd)
+      copyProc = simpleCommand('remote copy', cmd)
+      copyProc.start()
+      copyProc.wait()
+      copyProc.join()
+      ret = copyProc.exit_code()
+      self.log.debug('Completed command execution. Exit Code: %s.' % ret)
+
+      if ret != 0:
+        output = copyProc.output()
+        raise Exception('Could not copy tarball using command %s. Exit code: %s. Output: %s' 
+                        % (cmd, ret, output))
+    else:
+      raise Exception('Unsupported URL for file: %s' % src)
+
+# input: http://hostname:port/. output: [hostname,port]
+  def __url_to_addr(self, url):
+    addr = url.rstrip('/')
+    if addr.startswith('http://'):
+      addr = addr.replace('http://', '', 1)
+    addr_parts = addr.split(':')
+    return [addr_parts[0], int(addr_parts[1])]
+
+  def __initialize_signal_handlers(self): 
+    def sigStop(sigNum, handler):
+      sig_wrapper(sigNum, self.stop)
+  
+    signal.signal(signal.SIGTERM, sigStop)
+    signal.signal(signal.SIGINT, sigStop)
+    signal.signal(signal.SIGQUIT, sigStop)
+
+  def __clean_up(self):
+    tempDir = self.__get_tempdir()
+    os.chdir(os.path.split(tempDir)[0])
+    if os.path.exists(tempDir):
+      shutil.rmtree(tempDir, True)
+      
+    self.log.debug("Cleaned up temporary dir: %s" % tempDir)
+
+  def __get_tempdir(self):
+    dir = os.path.join(self.cfg['ringmaster']['temp-dir'], 
+                          "%s.%s.ringmaster" % (self.cfg['ringmaster']['userid'], 
+                                                self.np.getServiceId()))
+    return dir
+
+  def getWorkDirs(self, cfg, reUse=False):
+
+    if (not reUse) or (self.workDirs == None):
+      import math
+      frand = random.random()
+      while math.ceil(frand) != math.floor(frand):
+        frand = frand * 100
+
+      irand = int(frand)
+      uniq = '%s-%d-%s' % (socket.gethostname(), os.getpid(), irand)
+      dirs = []
+      parentDirs = cfg['ringmaster']['work-dirs']
+      for p in parentDirs:
+        dir = os.path.join(p, uniq)
+        dirs.append(dir)
+      self.workDirs = dirs
+
+    return self.workDirs
+
+  def _fetchLink(self, link, parentDir):
+    parser = miniHTMLParser()
+    self.log.debug("Checking link %s" %link)
+    while link:
+
+      # Get the file from the site and link
+      input = urllib.urlopen(link)
+      out = None
+      contentType = input.info().gettype()
+      isHtml = contentType == 'text/html'
+
+      #print contentType
+      if isHtml:
+        parser.setBaseUrl(input.geturl())
+      else:
+        parsed = urlparse.urlparse(link)
+        hp = parsed[1]
+        h = hp
+        p = None
+        if hp.find(':') != -1:
+          h, p = hp.split(':', 1)
+        path = parsed[2]
+        path = path.split('/')
+        file = os.path.join(parentDir, h, p)
+        for c in path:
+          if c == '':
+            continue
+          file = os.path.join(file, c)
+
+        try:
+          self.log.debug('Creating %s' % file)
+          dir, tail = os.path.split(file)
+          if not os.path.exists(dir):
+            os.makedirs(dir)
+        except:
+          self.log.debug(get_exception_string())
+
+        out = open(file, 'w')
+
+      bufSz = 8192
+      buf = input.read(bufSz)
+      while len(buf) > 0:
+        if isHtml:
+          # Feed the file into the HTML parser
+          parser.feed(buf)
+        if out:
+          out.write(buf)
+        buf = input.read(bufSz)
+
+      input.close()
+      if out:
+        out.close()
+
+      # Search the retfile here
+
+      # Get the next link in level traversal order
+      link = parser.getNextLink()
+      
+    parser.close()
+    
+  def _finalize(self):
+    try:
+      # FIXME: get dir from config
+      dir = 'HOD-log-P%d' % (os.getpid())
+      dir = os.path.join('.', dir)
+    except:
+      self.log.debug(get_exception_string())
+
+    self.np.finalize()
+
+  def handleIdleJobTracker(self):
+    self.log.critical("Detected idle job tracker for %s seconds. The allocation will be cleaned up." \
+                          % self.cfg['ringmaster']['idleness-limit'])
+    self.__idlenessDetected = True
+
+  def cd_to_tempdir(self):
+    dir = self.__get_tempdir()
+    
+    if not os.path.exists(dir):
+      os.makedirs(dir)
+    os.chdir(dir)
+    
+    return dir
+  
+  def getWorkload(self):
+    """Return the value of self.workload."""
+    return self.workload
+
+  def getHostName(self):
+    """Return the host name stored in the private __hostname attribute."""
+    return self.__hostname
+
+  def start(self):
+    """run the thread main loop"""
+    
+    self.log.debug("Entered start method.")
+    hodring = os.path.join(self.cfg['ringmaster']['base-dir'], 
+                           'bin', 'hodring')
+    largs = [hodring]
+    targs = self.cfg.get_args(section='hodring')
+    largs.extend(targs) 
+    
+    hodringCmd = ""
+    for item in largs:
+      hodringCmd = "%s%s " % (hodringCmd, item)
+      
+    self.log.debug(hodringCmd)
+    
+    if self.np.runWorkers(largs) > 0:
+      self.log.critical("Failed to start worker.")
+    
+    self.log.debug("Returned from runWorkers.")
+    
+    self._finalize()
+
+  def stop(self):
+    """Shut the ringmaster down: job tracker monitor, ringmaster server
+    instance, HTTP server, then the temporary directory.
+
+    A call made while an earlier stop is in progress (or has completed)
+    returns immediately."""
+    self.log.debug("RingMaster stop method invoked.")
+    # Guard against re-entry; the flag is never reset in this method.
+    if self.__stopInProgress:
+      return
+    self.__stopInProgress = True
+    if self.__jtMonitor is not None:
+      self.__jtMonitor.stop()
+    if ringMasterServer.instance is not None:
+      self.log.debug('stopping ringmaster instance')
+      ringMasterServer.stopService()
+    if self.httpServer:
+      self.httpServer.stop()
+      
+    # Temporary files are removed last, after the services have been stopped.
+    self.__clean_up()
+
+  def isClusterIdle(self):
+    """Return the idleness flag, which handleIdleJobTracker() sets to True."""
+    return self.__idlenessDetected
+
+def main(cfg,log):
+  try:
+    rm = None
+    dGen = DescGenerator(cfg)
+    cfg = dGen.initializeDesc()
+    rm = RingMaster(cfg, log)
+    rm.start()
+    while not rm.isClusterIdle():
+      time.sleep(1)
+    rm.stop()
+    log.debug('returning from main')
+  except Exception, e:
+    if log:
+      log.critical(get_exception_string())
+    raise Exception(e)

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/ringMaster.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/ringMaster.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/ringMaster.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/Schedulers/__init__.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/Schedulers/__init__.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/Schedulers/__init__.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/Schedulers/__init__.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,15 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Schedulers/__init__.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Schedulers/__init__.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Schedulers/__init__.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/Schedulers/torque.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/Schedulers/torque.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/Schedulers/torque.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/Schedulers/torque.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,147 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+import os, pprint, re, time
+
+from hodlib.Common.threads import simpleCommand
+from hodlib.Common.util import args_to_string
+from hodlib.Common.logger import hodDummyLogger
+
+reQstatLine = re.compile("^\s*(\w+)\s*=\s*(.*)\s*$")
+
+class torqueInterface:
+  def __init__(self, torqueDir, environment, log=None):
+    self.__qsub = os.path.join(torqueDir, 'bin', 'qsub')
+    self.__qdel = os.path.join(torqueDir, 'bin', 'qdel')
+    self.__qstat = os.path.join(torqueDir, 'bin', 'qstat')
+    self.__pbsNodes = os.path.join(torqueDir, 'bin', 'pbsnodes')
+    self.__pbsdsh = os.path.join(torqueDir, 'bin', 'pbsdsh')
+    self.__env = environment
+    
+    self.__log = log
+    if not self.__log:
+      self.__log = hodDummyLogger()
+        
+  def qsub(self, argList, stdinList):
+    jobID = False
+    exitCode = 0
+
+    qsubCommand = "%s %s" % (self.__qsub, args_to_string(argList))
+    
+    self.__log.debug("qsub -> %s" % qsubCommand)
+    
+    qsubProcess = simpleCommand('qsub', qsubCommand, env=self.__env)
+    qsubProcess.start()
+    
+    while qsubProcess.stdin == None:
+      time.sleep(.2)
+
+    for line in stdinList:
+      self.__log.debug("qsub stdin: %s" % line)
+      print >>qsubProcess.stdin, line
+
+    qsubProcess.stdin.close()
+    qsubProcess.wait()
+    qsubProcess.join()
+    
+    exitCode = qsubProcess.exit_code()
+    if exitCode == 0:
+      buffer = qsubProcess.output()
+      jobID = buffer[0].rstrip('\n')
+      self.__log.debug("qsub jobid: %s" % jobID)
+    else:
+      self.__log.critical("qsub error: %s" % qsubProcess.exit_status_string())    
+    
+    return jobID, exitCode
+  
+  def qstat(self, jobID):
+    qstatInfo = None  
+    
+    qstatCommand = "%s -f -1 %s" % (self.__qstat, jobID)
+    
+    self.__log.debug(qstatCommand)
+
+    qstatProcess = simpleCommand('qstat', qstatCommand, env=self.__env)
+    qstatProcess.start()
+    qstatProcess.wait()
+    qstatProcess.join()
+    
+    exitCode = qstatProcess.exit_code()
+    if exitCode > 0:
+      self.__log.error('qstat error: %s' % qstatProcess.exit_status_string())
+    else:
+      qstatInfo = {}
+      for line in qstatProcess.output():
+        line = line.rstrip()
+        if line.find('=') != -1:
+          qstatMatch = reQstatLine.match(line)
+          if qstatMatch:
+            key = qstatMatch.group(1)
+            value = qstatMatch.group(2)
+            qstatInfo[key] = value
+          
+      if 'exec_host' in qstatInfo:
+        list = qstatInfo['exec_host'].split('+')
+        addrList = []
+        
+        for item in list:
+          [head, end] = item.split('/', 1)
+          addrList.append(head)
+        
+        qstatInfo['exec_host'] = addrList
+        
+    return qstatInfo, exitCode
+  
+  def pbs_nodes(self, argString):
+    pass
+  
+  def qdel(self, jobId, force=False):
+    exitCode = 0
+    qdel = self.__qdel
+    if force:
+      qdel = "%s -p %s" % (qdel, jobId)
+    else:
+      qdel = "%s %s" % (qdel, jobId) 
+
+    self.__log.debug(qdel)
+
+    qdelProcess = simpleCommand('qdel', qdel, env=self.__env)
+    qdelProcess.start()
+    qdelProcess.wait()
+    qdelProcess.join()      
+      
+    exitCode = qdelProcess.exit_code()
+    
+    return exitCode
+  
+  def pbsdsh(self, arguments):
+    status = None
+    
+    pbsdshCommand = "%s %s" % (self.__pbsdsh, args_to_string(arguments))
+    
+    self.__log.debug("pbsdsh command: %s" % pbsdshCommand)
+    
+    pbsdsh = simpleCommand('pbsdsh', pbsdshCommand, env=self.__env)
+    pbsdsh.start()   
+
+    for i in range(0, 30):
+      status = pbsdsh.exit_code()
+      if status:
+        self.__log.error("pbsdsh failed: %s" % pbsdsh.exit_status_string())
+        break  
+    
+    if not status: status = 0
+      
+    return status  

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Schedulers/torque.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Schedulers/torque.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Schedulers/torque.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceProxy/__init__.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceProxy/__init__.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceProxy/__init__.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceProxy/__init__.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,15 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceProxy/__init__.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceProxy/__init__.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceProxy/__init__.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceProxy/serviceProxy.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceProxy/serviceProxy.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceProxy/serviceProxy.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceProxy/serviceProxy.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,49 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+"""HOD Service Proxy Implementation"""
+# -*- python -*-
+
+import sys, time, signal, httplib, socket, threading
+import sha, base64, hmac
+import xml.dom.minidom
+
+from hodlib.Common.socketServers import hodHTTPServer
+from hodlib.Common.hodsvc import hodBaseService
+from hodlib.Common.threads import loop
+from hodlib.Common.tcp import tcpSocket
+from hodlib.Common.util import get_exception_string
+from hodlib.Common.AllocationManagerUtil import *
+
+class svcpxy(hodBaseService):
+    def __init__(self, config):
+        hodBaseService.__init__(self, 'serviceProxy', config['service_proxy'],
+                                xrtype='twisted')
+        self.amcfg=config['allocation_manager']
+
+    def _xr_method_isProjectUserValid(self, userid, project, ignoreErrors = False, timeOut = 15):
+       return self.isProjectUserValid(userid, project, ignoreErrors, timeOut)
+    
+    def isProjectUserValid(self, userid, project, ignoreErrors, timeOut):
+        """Method thats called upon by
+        the hodshell to verify if the 
+        specified (user, project) combination 
+        is valid"""
+        self.logs['main'].info("Begin isProjectUserValid()")
+        am = AllocationManagerUtil.getAllocationManager(self.amcfg['id'], 
+                                                        self.amcfg,
+                                                        self.logs['main'])
+        self.logs['main'].info("End isProjectUserValid()")
+        return am.getQuote(userid, project)

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceProxy/serviceProxy.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceProxy/serviceProxy.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceProxy/serviceProxy.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceRegistry/__init__.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceRegistry/__init__.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceRegistry/__init__.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceRegistry/__init__.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,15 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceRegistry/__init__.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceRegistry/__init__.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceRegistry/__init__.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceRegistry/serviceRegistry.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceRegistry/serviceRegistry.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceRegistry/serviceRegistry.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceRegistry/serviceRegistry.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,109 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+import sys, time, socket, threading, copy, pprint
+
+from hodlib.Common.hodsvc import hodBaseService
+from hodlib.Common.threads import loop
+from hodlib.Common.tcp import tcpSocket
+from hodlib.Common.util import get_exception_string
+
+class svcrgy(hodBaseService):
+    def __init__(self, config):
+        hodBaseService.__init__(self, 'serviceRegistry', config)
+        
+        self.__serviceDict = {}
+        self.__failCount = {}
+        self.__released = {}
+        self.__locked = {}
+        
+        self.__serviceDictLock = threading.Lock()
+    
+    def __get_job_key(self, userid, job):
+        return "%s-%s" % (userid, job)
+    
+    def _xr_method_registerService(self, userid, job, host, name, type, dict):
+       return self.registerService(userid, job, host, name, type, dict)
+    
+    def _xr_method_getServiceInfo(self, userid=None, job=None, name=None, 
+                                  type=None):
+        return self.getServiceInfo(userid, job, name, type)
+        
+    def registerService(self, userid, job, host, name, type, dict):
+        """Method thats called upon by
+        the ringmaster to register to the
+        the service registry"""
+        lock = self.__serviceDictLock
+        lock.acquire()
+        try:
+            self.logs['main'].debug("Registering %s.%s.%s.%s.%s..." % (
+                                    userid, job, host, name, type))    
+            id = "%s.%s" % (name, type) 
+   
+            if userid in self.__serviceDict:
+                if job in self.__serviceDict[userid]:
+                     if host in self.__serviceDict[userid][job]:
+                          self.__serviceDict[userid][job][host].append(
+                              {id : dict,})
+                     else:
+                        self.__serviceDict[userid][job][host] = [
+                            {id : dict,},] 
+                else:
+                    self.__serviceDict[userid][job] = {host : [
+                                                       { id : dict,},]}
+            else:    
+                self.__serviceDict[userid] = {job : {host : [
+                                                     { id : dict,},]}}
+
+        finally:
+            lock.release()
+            
+        return True
+    
+    def getXMLRPCAddr(self):
+      """return the xml rpc server address"""
+      return self._xrc.server_address
+    
+    def getServiceInfo(self, userid=None, job=None, name=None, type=None):
+        """This method is called upon by others
+        to query for a particular service returns
+        a dictionary of elements"""
+        
+        self.logs['main'].debug("inside getServiceInfo: %s.%s.%s" % (userid, job, name))
+        retdict = {}
+        lock = self.__serviceDictLock
+        lock.acquire()
+        try:
+            if userid in self.__serviceDict:
+                if job in self.__serviceDict[userid]:
+                    if name and type:
+                        retdict = []
+                        id = "%s.%s" % (name, type)
+                        for host in self.__serviceDict[userid][job]:
+                            for dict in self.__serviceDict[userid][job][host]:
+                                [loopID, ] = dict.keys()
+                                if loopID == id:
+                                    retdict.append(dict[id])
+                    else:
+                        retdict = copy.deepcopy(
+                            self.__serviceDict[userid][job])
+                elif not job:
+                    retdict = copy.deepcopy(self.__serviceDict[userid])
+            elif not userid:
+                retdict = copy.deepcopy(self.__serviceDict)
+        finally:
+            lock.release()
+        
+        return retdict

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceRegistry/serviceRegistry.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceRegistry/serviceRegistry.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/ServiceRegistry/serviceRegistry.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/__init__.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/__init__.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/__init__.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/__init__.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,16 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/__init__.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/__init__.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/__init__.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL



Mime
View raw message