hadoop-common-commits mailing list archives

From ni...@apache.org
Subject svn commit: r608950 [5/7] - in /lucene/hadoop/trunk/src: contrib/hod/ contrib/hod/bin/ contrib/hod/conf/ contrib/hod/hodlib/ contrib/hod/hodlib/AllocationManagers/ contrib/hod/hodlib/Common/ contrib/hod/hodlib/GridServices/ contrib/hod/hodlib/Hod/ cont...
Date Fri, 04 Jan 2008 18:20:22 GMT
Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/hadoop.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/hadoop.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/hadoop.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/hadoop.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,576 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+"""define WorkLoad as abstract interface for user job"""
+# -*- python -*-
+
+import os, time, sys, shutil, exceptions, re, threading, signal, urllib, pprint, math
+
+from HTMLParser import HTMLParser
+
+import xml.dom.minidom
+import xml.dom.pulldom
+from xml.dom import getDOMImplementation
+
+from hodlib.Common.util import *
+from hodlib.Common.xmlrpc import hodXRClient
+from hodlib.Common.miniHTMLParser import miniHTMLParser
+from hodlib.Common.nodepoolutil import NodePoolUtil
+from hodlib.Common.tcp import tcpError, tcpSocket
+
+reCommandDelimeterString = r"(?<!\\);"
+reCommandDelimeter = re.compile(reCommandDelimeterString)
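+# matches a ';' that is not preceded by a backslash, so command strings can
+# be split on unescaped semicolons, e.g.:
+#   reCommandDelimeter.split(r'cmd1;cmd2\;more') -> ['cmd1', r'cmd2\;more']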
+
+class hadoopConfig:
+  def __create_xml_element(self, doc, name, value, description, final = False):
+    prop = doc.createElement("property")
+    nameP = doc.createElement("name")
+    string = doc.createTextNode(name)
+    nameP.appendChild(string)
+    valueP = doc.createElement("value")
+    string = doc.createTextNode(value)
+    valueP.appendChild(string)
+    if final:
+      finalP = doc.createElement("final")
+      string = doc.createTextNode("true")
+      finalP.appendChild(string)
+    desc = doc.createElement("description")
+    string = doc.createTextNode(description)
+    desc.appendChild(string)
+    prop.appendChild(nameP)
+    prop.appendChild(valueP)
+    if final:
+      prop.appendChild(finalP)
+    prop.appendChild(desc)
+    
+    return prop
+
+  def gen_site_conf(self, confDir, numNodes, hdfsAddr, mapredAddr=None,\
+             clientParams=None, serverParams=None,\
+             finalServerParams=None, clusterFactor=None):
+    if not mapredAddr:
+      mapredAddr = "dummy:8181"
+    
+    implementation = getDOMImplementation()
+    doc = implementation.createDocument('', 'configuration', None)
+    comment = doc.createComment(
+      "This is an auto generated hadoop-site.xml, do not modify")
+    topElement = doc.documentElement
+    topElement.appendChild(comment)
+    prop = self.__create_xml_element(doc, 'mapred.job.tracker', 
+                                     mapredAddr, "description")
+    topElement.appendChild(prop)
+    prop = self.__create_xml_element(doc, 'fs.default.name', hdfsAddr, 
+                                   "description")
+    topElement.appendChild(prop)
+    mapredAddrSplit = mapredAddr.split(":")
+    mapredsystem = os.path.join('/mapredsystem', mapredAddrSplit[0])
+    prop = self.__create_xml_element(doc, 'mapred.system.dir', mapredsystem, 
+                                   "description", True )
+    topElement.appendChild(prop)
+    prop = self.__create_xml_element(doc, 'hadoop.tmp.dir', confDir, 
+                                   "description")
+    topElement.appendChild(prop)
+    prop = self.__create_xml_element(doc, 'dfs.client.buffer.dir', 
+                                     confDir, "description")
+    topElement.appendChild(prop)
+
+    # clientParams are enabled now
+    if clientParams:
+      for k, v in clientParams.iteritems():
+        prop = self.__create_xml_element(doc, k, v[0], "client param")
+        topElement.appendChild(prop)
+
+    # end
+
+    # serverParams
+    if serverParams:
+      for k, v in serverParams.iteritems():
+        prop = self.__create_xml_element(doc, k, v[0], "server param")
+        topElement.appendChild(prop)
+
+    # finalServerParams
+    if finalServerParams:
+      for k, v in finalServerParams.iteritems():
+        prop = self.__create_xml_element(doc, k, v[0], "server param", True)
+        topElement.appendChild(prop)
+
+   
+    # mapred-default.xml is no longer used now.
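+    # e.g. with cluster-factor 1.75 and 5 nodes: floor(1.75 * (5 - 1)) = 7
+    # reduce tasks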
+    numred = int(math.floor(clusterFactor * (int(numNodes) - 1)))
+    prop = self.__create_xml_element(doc, "mapred.reduce.tasks", str(numred), 
+                                 "description")
+    topElement.appendChild(prop)
+    # end
+
+    siteName = os.path.join(confDir, "hadoop-site.xml")
+    sitefile = file(siteName, 'w')
+    print >> sitefile, topElement.toxml()
+    sitefile.close()
+
+class hadoopCluster:
+  def __init__(self, cfg, log):
+    self.__cfg = cfg
+    self.__log = log
+    self.__changedClusterParams = []
+    
+    self.__hostname = local_fqdn()    
+    self.__svcrgyClient = None
+    self.__nodePool = NodePoolUtil.getNodePool(self.__cfg['nodepooldesc'], 
+                                               self.__cfg, self.__log)        
+    self.__hadoopCfg = hadoopConfig()
+    self.jobId = None
+    self.mapredInfo = None
+    self.hdfsInfo = None
+    self.ringmasterXRS = None
+
+  def __get_svcrgy_client(self):
+    svcrgyUrl = to_http_url(self.__cfg['hod']['xrs-address'])
+    return hodXRClient(svcrgyUrl)
+
+  def __get_service_status(self):
+    serviceData = self.__get_service_data()
+    
+    status = True
+    hdfs = False
+    mapred = False
+    
+    for host in serviceData.keys():
+      for item in serviceData[host]:
+        service = item.keys()
+        if service[0] == 'hdfs.grid' and \
+          self.__cfg['gridservice-hdfs']['external'] == False:
+          hdfs = True
+        elif service[0] == 'mapred.grid':
+          mapred = True
+    
+    if not mapred:
+      status = "mapred"
+    
+    if not hdfs and self.__cfg['gridservice-hdfs']['external'] == False:
+      if status != True:
+        status = "mapred and hdfs"
+      else:
+        status = "hdfs"
+      
+    return status
+  
+  def __get_service_data(self):
+    serviceData = self.__svcrgyClient.getServiceInfo(
+      self.__cfg['hod']['userid'], self.__nodePool.getNodePoolId())
+    
+    return serviceData
+  
+  def __check_allocation_manager(self):
+    userValid = True
+    try:
+      self.serviceProxyClient = hodXRClient(
+        to_http_url(self.__cfg['hod']['proxy-xrs-address']), None, None, 0,
+        0, 1, False, 15)
+      
+      userValid = self.serviceProxyClient.isProjectUserValid(
+        self.__cfg['hod']['userid'], 
+        self.__cfg['resource_manager']['pbs-account'], True)
+      
+      if userValid:
+        self.__log.debug("Validated that user %s is part of project %s." %
+          (self.__cfg['hod']['userid'], 
+           self.__cfg['resource_manager']['pbs-account']))
+      else:
+        self.__log.debug("User %s is not part of project: %s." % (
+          self.__cfg['hod']['userid'], 
+          self.__cfg['resource_manager']['pbs-account']))
+        self.__log.error("Please specify a valid project in "
+                      + "resource_manager.pbs-account. If you still have "
+                      + "issues, please contact operations")
+        userValid = False
+        # ignore invalid project for now - TODO
+    except Exception:
+      # ignore failures - non critical for now
+      self.__log.debug(
+        "Unable to contact Allocation Manager Proxy - ignoring...")
+      #userValid = False
+        
+    return userValid
+
+  def __check_job_status(self):
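+    # Poll the batch system while the job sits queued ('Q' in PBS/Torque
+    # terms): every half second for the first ~10 seconds, then every 10
+    # seconds. A final state of 'C' (completed) means the job already exited.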
+    initWaitCount = 20
+    count = 0
+    status = False
+    state = 'Q'
+    while state == 'Q':
+      state = self.__nodePool.getJobState()
+      if (state==False) or (state!='Q'):
+        break
+      count = count + 1
+      if count < initWaitCount:
+        time.sleep(0.5)
+      else:
+        time.sleep(10)
+    
+    if state and state != 'C':
+      status = True
+    
+    return status
+  
+  def __get_ringmaster_client(self):
+    ringmasterXRS = None
+   
+    ringList = self.__svcrgyClient.getServiceInfo(
+      self.__cfg['ringmaster']['userid'], self.__nodePool.getServiceId(), 
+      'ringmaster', 'hod')
+
+    if ringList and len(ringList):
+      if isinstance(ringList, list):
+        ringmasterXRS = ringList[0]['xrs']
+    else:    
+      count = 0
+      waitTime = self.__cfg['hod']['allocate-wait-time']
+  
+      while count < waitTime:
+        ringList = self.__svcrgyClient.getServiceInfo(
+          self.__cfg['ringmaster']['userid'], self.__nodePool.getServiceId(), 
+          'ringmaster', 
+          'hod')
+        
+        if ringList and len(ringList):
+          if isinstance(ringList, list):        
+            ringmasterXRS = ringList[0]['xrs']
+        
+        if ringmasterXRS is not None:
+          break
+        else:
+          time.sleep(1)
+          count = count + 1
+          # check to see if the job exited by any chance in that time:
+          if (count % 10 == 0):
+            if not self.__check_job_status():
+              break
+
+    return ringmasterXRS
+ 
+  def __init_hadoop_service(self, serviceName, xmlrpcClient):
+    status = True
+    serviceAddress = None
+    serviceInfo = None
+ 
+    for i in range(0, 250):
+      try:
+        serviceAddress = xmlrpcClient.getServiceAddr(serviceName)
+        if serviceAddress:
+          if serviceAddress == 'not found':
+            time.sleep(.5)
+            # check to see if the job exited by any chance in that time:
+            if (i % 10 == 0):
+              if not self.__check_job_status():
+                break
+          else:
+            serviceInfo = xmlrpcClient.getURLs(serviceName)           
+            break 
+      except:
+        self.__log.critical("'%s': ringmaster xmlrpc error." % serviceName)
+        self.__log.debug(get_exception_string())
+        status = False
+        break
+    
+    if serviceAddress == 'not found' or not serviceAddress:
+      self.__log.critical("Failed to retrieve '%s' service address." % 
+                          serviceName)
+      status = False
+    else:
+      try:
+        self.__svcrgyClient.registerService(self.__cfg['hodring']['userid'], 
+                                            self.jobId, self.__hostname, 
+                                            serviceName, 'grid', serviceInfo)
+        
+      except:
+        self.__log.critical("'%s': registry xmlrpc error." % serviceName)    
+        self.__log.debug(get_exception_string())
+        status = False
+        
+    return status, serviceAddress, serviceInfo
+
+  def __collect_jobtracker_ui(self, dir):
+
+     link = self.mapredInfo + "/jobtracker.jsp"
+     parser = miniHTMLParser()
+     parser.setBaseUrl(self.mapredInfo)
+     node_cache = {}
+
+     self.__log.debug("collect_jobtracker_ui seeded with " + link)
+
+     def alarm_handler(number, stack):
+         raise AlarmException("timeout")
+       
+     signal.signal(signal.SIGALRM, alarm_handler)
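+     # SIGALRM acts as a coarse read timeout: signal.alarm(10) is armed
+     # before each page is read, and AlarmException aborts a slow fetch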
+
+     input = None
+     while link:
+       self.__log.debug("link: %s" % link)
+       # taskstats.jsp,taskdetails.jsp not included since too many to collect
+       if re.search(
+         "jobfailures\.jsp|jobtracker\.jsp|jobdetails\.jsp|jobtasks\.jsp", 
+         link):
+
+         for i in range(1,5):
+           try:
+             input = urllib.urlopen(link)
+             break
+           except:
+             self.__log.debug(get_exception_string())
+             time.sleep(1)
+  
+         if input:
+           out = None
+    
+           self.__log.debug("collecting " + link + "...")
+           filename = re.sub(self.mapredInfo, "", link)
+           filename = dir + "/"  + filename
+           filename = re.sub("http://","", filename)
+           filename = re.sub("[\?\&=:]","_",filename)
+           filename = filename + ".html"
+    
+           try:
+             tempdir, tail = os.path.split(filename)
+             if not os.path.exists(tempdir):
+               os.makedirs(tempdir)
+           except:
+             self.__log.debug(get_exception_string())
+    
+           out = open(filename, 'w')
+           
+           bufSz = 8192
+           
+           signal.alarm(10)
+           
+           try:
+             self.__log.debug("Starting to grab: %s" % link)
+             buf = input.read(bufSz)
+      
+             while len(buf) > 0:
+               # Feed the file into the HTML parser
+               parser.feed(buf)
+        
+               # Re-write the hrefs in the file
+               p = re.compile("\?(.+?)=(.+?)")
+               buf = p.sub(r"_\1_\2",buf)
+               p= re.compile("&(.+?)=(.+?)")
+               buf = p.sub(r"_\1_\2",buf)
+               p = re.compile("http://(.+?):(\d+)?")
+               buf = p.sub(r"\1_\2/",buf)
+               buf = re.sub("href=\"/","href=\"",buf)
+               p = re.compile("href=\"(.+?)\"")
+               buf = p.sub(r"href=\1.html",buf)
+ 
+               out.write(buf)
+               buf = input.read(bufSz)
+      
+             signal.alarm(0)
+             input.close()
+             if out:
+               out.close()
+               
+             self.__log.debug("Finished grabbing: %s" % link)
+           except AlarmException:
+             if out: out.close()
+             if input: input.close()
+             
+             self.__log.debug("Failed to retrieve: %s" % link)
+         else:
+           self.__log.debug("Failed to retrieve: %s" % link)
+         
+       # Get the next link in level traversal order
+       link = parser.getNextLink()
+
+     parser.close()
+     
+  def check_cluster(self, clusterInfo):
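+    # Status codes consumed by hodRunner: 12 - alive, 14 - mapred dead,
+    # 13 - hdfs dead, 10 - both mapred and hdfs dead, 15 - not allocated.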
+    status = 0
+
+    if 'mapred' in clusterInfo:
+      mapredAddress = clusterInfo['mapred'][7:]
+      hdfsAddress = clusterInfo['hdfs'][7:]
+  
+      mapredSocket = tcpSocket(mapredAddress)
+        
+      try:
+        mapredSocket.open()
+        mapredSocket.close()
+      except tcpError:
+        status = 14
+  
+      hdfsSocket = tcpSocket(hdfsAddress)
+        
+      try:
+        hdfsSocket.open()
+        hdfsSocket.close()
+      except tcpError:
+        if status > 0:
+          status = 10
+        else:
+          status = 13
+      
+      if status == 0:
+        status = 12
+    else:
+      status = 15
+      
+    return status
+  
+  def cleanup(self):
+    if self.__nodePool: self.__nodePool.finalize()     
+
+  def get_job_id(self):
+    return self.jobId
+
+  def delete_job(self, jobId):
+    '''Delete a job given its ID'''
+    ret = 0
+    if self.__nodePool: 
+      ret = self.__nodePool.deleteJob(jobId)
+    else:
+      raise Exception("Invalid state: Node pool is not initialized to delete the given job.")
+    return ret
+         
+  def allocate(self, clusterDir, min, max=None):
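+    # Non-zero status codes: 2 - fewer than 3 nodes requested, 4 - resource
+    # manager/scheduler failure, 5 - job exited before the cluster came up,
+    # 6 - no ringmaster, 7 - hdfs did not start, 8 - mapred did not start,
+    # 9 - allocation manager check failed.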
+    status = 0  
+    self.__svcrgyClient = self.__get_svcrgy_client()
+        
+    self.__log.debug("allocate %s %s %s" % (clusterDir, min, max))
+    
+    if min < 3:
+      self.__log.critical("Minimum nodes must be greater than 2.")
+      status = 2
+    else:
+      if self.__check_allocation_manager():
+        nodeSet = self.__nodePool.newNodeSet(min)
+        self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet)
+        if self.jobId:                 
+          if self.__check_job_status():
+            self.ringmasterXRS = self.__get_ringmaster_client()
+            if self.ringmasterXRS:
+              ringClient =  hodXRClient(self.ringmasterXRS)
+              
+              hdfsStatus, hdfsAddr, self.hdfsInfo = \
+                self.__init_hadoop_service('hdfs', ringClient)
+              
+              if hdfsStatus:
+                mapredStatus, mapredAddr, self.mapredInfo = \
+                  self.__init_hadoop_service('mapred', ringClient)
+                  
+                if mapredStatus:
+                  self.__log.info("HDFS UI on http://%s" % self.hdfsInfo)
+                  self.__log.info("Mapred UI on http://%s" % self.mapredInfo)
+ 
+                  # Go generate the client side hadoop-site.xml now
+                  # adding final-params as well, just so that conf on 
+                  # client-side and server-side are (almost) the same
+                  clientParams = None
+                  serverParams = {}
+                  finalServerParams = {}
+
+                  # client-params
+                  if self.__cfg['hod'].has_key('client-params'):
+                    clientParams = self.__cfg['hod']['client-params']
+
+                  # server-params
+                  if self.__cfg['gridservice-mapred'].has_key('server-params'):
+                    serverParams.update(\
+                      self.__cfg['gridservice-mapred']['server-params'])
+                  if self.__cfg['gridservice-hdfs'].has_key('server-params'):
+                    # note that if there are params in both mapred and hdfs
+                    # sections, the ones in hdfs overwrite the ones in mapred
+                    serverParams.update(\
+                        self.__cfg['gridservice-hdfs']['server-params'])
+                  
+                  # final-server-params
+                  if self.__cfg['gridservice-mapred'].has_key(\
+                                                    'final-server-params'):
+                    finalServerParams.update(\
+                      self.__cfg['gridservice-mapred']['final-server-params'])
+                  if self.__cfg['gridservice-hdfs'].has_key(
+                                                    'final-server-params'):
+                    finalServerParams.update(\
+                        self.__cfg['gridservice-hdfs']['final-server-params'])
+
+                  clusterFactor = self.__cfg['hod']['cluster-factor']
+                  self.__hadoopCfg.gen_site_conf(clusterDir, min,
+                            hdfsAddr, mapredAddr, clientParams,\
+                            serverParams, finalServerParams,\
+                            clusterFactor)
+                  # end of hadoop-site.xml generation
+                else:
+                  status = 8
+              else:
+                status = 7  
+            else:
+              status = 6
+            if status != 0:
+              self.__log.info("Cleaning up job id %s, as cluster could not be allocated." % self.jobId)
+              self.delete_job(self.jobId)
+          else:
+            self.__log.critical("No job found, ringmaster failed to run.")
+            status = 5 
+ 
+        elif self.jobId == False:
+          if exitCode == 188:
+            self.__log.critical("Request execeeded maximum resource allocation.")
+          else:
+            self.__log.critical("Insufficient resources available.")
+          status = 4
+        else:    
+          self.__log.critical("Scheduler failure, allocation failed.\n\n")        
+          status = 4
+      else:
+        status = 9
+    
+    return status
+
+  def deallocate(self, clusterDir, clusterInfo):
+    status = 0 
+    
+    nodeSet = self.__nodePool.newNodeSet(clusterInfo['min'], 
+                                         id=clusterInfo['jobid'])
+    self.mapredInfo = clusterInfo['mapred']
+    self.hdfsInfo = clusterInfo['hdfs']
+    try:
+      if self.__cfg['hod'].has_key('hadoop-ui-log-dir'):
+        clusterStatus = self.check_cluster(clusterInfo)
+        if clusterStatus != 14 and clusterStatus != 10:   
+          # If JT is still alive
+          self.__collect_jobtracker_ui(self.__cfg['hod']['hadoop-ui-log-dir'])
+      else:
+        self.__log.debug('hadoop-ui-log-dir not specified. Skipping Hadoop UI log collection.')
+    except:
+      self.__log.info("Exception in collecting Job tracker logs. Ignoring.")
+    status = self.__nodePool.finalize()
+   
+    return status
+  
+class hadoopScript:
+  def __init__(self, conf, execDir):
+    self.__environ = os.environ.copy()
+    self.__environ['HADOOP_CONF_DIR'] = conf
+    self.__execDir = execDir
+    
+  def run(self, script):
+    scriptThread = simpleCommand(script, script, self.__environ, 4, False, 
+                                 False, self.__execDir)
+    scriptThread.start()
+    scriptThread.wait()
+    scriptThread.join()
+    
+    return scriptThread.exit_code()
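
For reference, the hadoop-site.xml generation above uses only the standard
library DOM API. A minimal, self-contained sketch of the same
property-element pattern (create_property stands in for the private
__create_xml_element, and the host names are placeholders):

from xml.dom import getDOMImplementation

def create_property(doc, name, value, description, final=False):
  # builds <property><name/><value/>[<final/>]<description/></property>
  prop = doc.createElement("property")
  for tag, text in (("name", name), ("value", value)):
    elem = doc.createElement(tag)
    elem.appendChild(doc.createTextNode(text))
    prop.appendChild(elem)
  if final:
    finalElem = doc.createElement("final")
    finalElem.appendChild(doc.createTextNode("true"))
    prop.appendChild(finalElem)
  desc = doc.createElement("description")
  desc.appendChild(doc.createTextNode(description))
  prop.appendChild(desc)
  return prop

doc = getDOMImplementation().createDocument('', 'configuration', None)
top = doc.documentElement
top.appendChild(create_property(doc, "mapred.job.tracker",
                                "jt.example.com:50030", "description"))
top.appendChild(create_property(doc, "mapred.system.dir",
                                "/mapredsystem/jt.example.com",
                                "description", final=True))
print(top.toxml())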

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/hadoop.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/hadoop.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/hadoop.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/hod.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/hod.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/hod.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/hod.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,376 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+# -*- python -*-
+
+import sys, os, getpass, pprint, re, cPickle, random, shutil
+
+import hodlib.Common.logger
+
+from hodlib.ServiceRegistry.serviceRegistry import svcrgy
+from hodlib.Common.xmlrpc import hodXRClient
+from hodlib.Common.util import to_http_url, get_exception_string
+from hodlib.Common.util import get_exception_error_string
+from hodlib.Common.nodepoolutil import NodePoolUtil
+from hodlib.Hod.hadoop import hadoopCluster, hadoopScript
+
+CLUSTER_DATA_FILE = 'clusters'
+
+class hodState:
+  def __init__(self, store):
+    self.__store = store
+    self.__stateFile = None
+    self.__init_store()
+    self.__STORE_EXT = ".state"
+   
+  def __init_store(self):
+    if not os.path.exists(self.__store):
+      os.mkdir(self.__store)
+  
+  def __set_state_file(self, id=None):
+    if id:
+      self.__stateFile = os.path.join(self.__store, "%s%s" % (id, 
+                                      self.__STORE_EXT))
+    else:
+      for item in os.listdir(self.__store):
+        if item.endswith(self.__STORE_EXT):  
+          self.__stateFile = os.path.join(self.__store, item)          
+          
+  def read(self, id=None):
+    info = {}
+    
+    self.__set_state_file(id)
+  
+    if self.__stateFile:
+      if os.path.isfile(self.__stateFile):
+        stateFile = open(self.__stateFile, 'r')
+        try:
+          info = cPickle.load(stateFile)
+        except EOFError:
+          pass
+        
+        stateFile.close()
+    
+    return info
+           
+  def write(self, id, info):
+    self.__set_state_file(id)
+    if not os.path.exists(self.__stateFile):
+      self.clear(id)
+      
+    stateFile = open(self.__stateFile, 'w')
+    cPickle.dump(info, stateFile)
+    stateFile.close()
+  
+  def clear(self, id=None):
+    self.__set_state_file(id)
+    if self.__stateFile and os.path.exists(self.__stateFile):
+      os.remove(self.__stateFile)
+    else:
+      for item in os.listdir(self.__store):
+        if item.endswith(self.__STORE_EXT):
+          os.remove(os.path.join(self.__store, item))
+        
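+# A minimal usage sketch for hodState (the store path is a placeholder);
+# entries are pickled to <store>/<id>.state:
+#   state = hodState('/tmp/hod-state')
+#   state.write('clusters', {'/home/user/c1': 42})
+#   state.read('clusters')    # -> {'/home/user/c1': 42}
+#   state.clear('clusters')   # removes /tmp/hod-state/clusters.state
+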
+class hodRunner:
+  def __init__(self, cfg):
+    self.__ops = [ 'prepare', 'allocate', 'deallocate', 
+                   'list', 'info', 'help' ]           
+    self.__cfg = cfg  
+    self.__npd = self.__cfg['nodepooldesc']
+    self.__opCode = 0
+    self.__user = getpass.getuser()
+    self.__registry = None
+    self.__baseLogger = None
+    self.__setup_logger()
+    
+    self.__userState = hodState(self.__cfg['hod']['user_state']) 
+    
+    self.__clusterState = None
+    self.__clusterStateInfo = { 'env' : None, 'hdfs' : None, 'mapred' : None }
+    
+    self.__cluster = hadoopCluster(self.__cfg, self.__log)
+  
+  def __setup_logger(self):
+    self.__baseLogger = hodlib.Common.logger.hodLog('hod')
+    self.__log = self.__baseLogger.add_logger(self.__user )
+ 
+    if self.__cfg['hod']['stream']:
+      self.__baseLogger.add_stream(level=self.__cfg['hod']['debug'], 
+                            addToLoggerNames=(self.__user ,))
+  
+    if self.__cfg['hod'].has_key('syslog-address'):
+      self.__baseLogger.add_syslog(self.__cfg['hod']['syslog-address'], 
+                                   level=self.__cfg['hod']['debug'], 
+                                   addToLoggerNames=(self.__user ,))
+
+  def __setup_cluster_logger(self, directory):
+    self.__baseLogger.add_file(logDirectory=directory, level=4, 
+                               addToLoggerNames=(self.__user ,))
+
+  def __setup_cluster_state(self, directory):
+    self.__clusterState = hodState(directory)
+
+  def __norm_cluster_dir(self, directory):
+    directory = os.path.expanduser(directory)
+    directory = os.path.abspath(directory)
+    
+    return directory
+  
+  def __setup_service_registry(self):
+    cfg = self.__cfg['hod'].copy()
+    cfg['debug'] = 0
+    self.__registry = svcrgy(cfg)
+    self.__registry.start()
+    self.__log.debug(self.__registry.getXMLRPCAddr())
+    self.__cfg['hod']['xrs-address'] = self.__registry.getXMLRPCAddr()
+    self.__cfg['ringmaster']['svcrgy-addr'] = self.__cfg['hod']['xrs-address']
+
+  def __set_cluster_state_info(self, env, hdfs, mapred, ring, jobid, min, max):
+    self.__clusterStateInfo['env'] = env
+    self.__clusterStateInfo['hdfs'] = "http://%s" % hdfs
+    self.__clusterStateInfo['mapred'] = "http://%s" % mapred
+    self.__clusterStateInfo['ring'] = ring
+    self.__clusterStateInfo['jobid'] = jobid
+    self.__clusterStateInfo['min'] = min
+    self.__clusterStateInfo['max'] = max
+    
+  def __set_user_state_info(self, info):
+    userState = self.__userState.read(CLUSTER_DATA_FILE)
+    for key in info.keys():
+      userState[key] = info[key]
+      
+    self.__userState.write(CLUSTER_DATA_FILE, userState)  
+
+  def __remove_cluster(self, clusterDir):
+    clusterInfo = self.__userState.read(CLUSTER_DATA_FILE)
+    if clusterDir in clusterInfo:
+      del(clusterInfo[clusterDir])
+      self.__userState.write(CLUSTER_DATA_FILE, clusterInfo)
+      
+  def __cleanup(self):
+    if self.__registry: self.__registry.stop()
+    
+  def __check_operation(self, operation):    
+    opList = operation.split()
+    
+    if not opList[0] in self.__ops:
+      self.__log.critical("Invalid hod operation specified: %s" % operation)
+      self._op_help(None)
+      self.__opCode = 2
+         
+    return opList 
+  
+  def _op_allocate(self, args):
+    operation = "allocate"
+    argLength = len(args)
+    min = 0
+    max = 0
+    if argLength == 3:
+      nodes = args[2]
+      clusterDir = self.__norm_cluster_dir(args[1])
+      if os.path.isdir(clusterDir):
+        self.__setup_cluster_logger(clusterDir)
+        if re.match('\d+-\d+', nodes):
+          (min, max) = nodes.split("-")
+          min = int(min)
+          max = int(max)
+        else:
+          try:
+            nodes = int(nodes)
+            min = nodes
+            max = nodes
+          except ValueError:
+            self.__log.critical(
+              "%s operation requires its nodes argument to be n nodes, or n-m nodes." % 
+              operation)
+            self.__opCode = 3
+
+        if self.__opCode == 0:
+          self.__setup_cluster_state(clusterDir)
+          clusterInfo = self.__clusterState.read()
+          self.__opCode = self.__cluster.check_cluster(clusterInfo)
+          if self.__opCode == 0 or self.__opCode == 15:
+            self.__setup_service_registry()   
+            allocateStatus = self.__cluster.allocate(clusterDir, min, max)    
+            if allocateStatus == 0:
+              self.__set_cluster_state_info(os.environ, 
+                                            self.__cluster.hdfsInfo, 
+                                            self.__cluster.mapredInfo, 
+                                            self.__cluster.ringmasterXRS,
+                                            self.__cluster.jobId,
+                                            min, max)
+              self.__setup_cluster_state(clusterDir)
+              self.__clusterState.write(self.__cluster.jobId, 
+                                        self.__clusterStateInfo)
+              self.__set_user_state_info( 
+                { clusterDir : self.__cluster.jobId, } )
+            self.__opCode = allocateStatus
+          elif self.__opCode == 12:
+            self.__log.critical("Cluster %s already allocated." % clusterDir)
+          elif self.__opCode == 10:
+            self.__log.critical("dead\t%s\t%s" % (clusterInfo['jobid'], 
+                                                  clusterDir))
+          elif self.__opCode == 13:
+            self.__log.warn("hdfs dead\t%s\t%s" % (clusterInfo['jobid'], 
+                                                   clusterDir))
+          elif self.__opCode == 14:
+            self.__log.warn("mapred dead\t%s\t%s" % (clusterInfo['jobid'], 
+                                                     clusterDir))   
+          
+          if self.__opCode > 0 and self.__opCode != 15:
+            self.__log.critical("Cannot allocate cluster %s" % clusterDir)
+            
+      else:
+        self.__log.critical("Invalid cluster directory '%s' specified." % 
+                          clusterDir)
+        self.__opCode = 3
+    else:
+      self.__log.critical("%s operation requires two arguments. "  % operation
+                        + "A cluster path and n nodes, or min-max nodes.")
+      self.__opCode = 3
+  
+  def _op_deallocate(self, args):
+    operation = "deallocate"
+    argLength = len(args)
+    if argLength == 2:
+      clusterDir = self.__norm_cluster_dir(args[1])
+      if os.path.isdir(clusterDir):
+        self.__setup_cluster_state(clusterDir)
+        clusterInfo = self.__clusterState.read()
+        if clusterInfo == {}:
+          self.__opCode = 15
+          self.__log.critical("Cluster %s not allocated." % clusterDir)
+        else:
+          self.__opCode = \
+            self.__cluster.deallocate(clusterDir, clusterInfo)
+          # irrespective of whether deallocate failed or not,
+          # remove the cluster state.
+          self.__clusterState.clear()
+          self.__remove_cluster(clusterDir)
+      else:
+        self.__log.critical("Invalid cluster directory '%s' specified." % 
+                            clusterDir)
+        self.__opCode = 3        
+    else:
+      self.__log.critical("%s operation requires one argument. "  % operation
+                        + "A cluster path.")
+      self.__opCode = 3
+            
+  def _op_list(self, args):
+    clusterList = self.__userState.read(CLUSTER_DATA_FILE)
+    for path in clusterList.keys():
+      self.__setup_cluster_state(path)
+      clusterInfo = self.__clusterState.read()
+      clusterStatus = self.__cluster.check_cluster(clusterInfo)
+      if clusterStatus == 12:
+        self.__log.info("alive\t%s\t%s" % (clusterList[path], path))
+      elif clusterStatus == 10:
+        self.__log.info("dead\t%s\t%s" % (clusterList[path], path))
+      elif clusterStatus == 13:
+        self.__log.info("hdfs dead\t%s\t%s" % (clusterList[path], path))
+      elif clusterStatus == 14:
+        self.__log.info("mapred dead\t%s\t%s" % (clusterList[path], path))    
+         
+  def _op_info(self, args):
+    operation = 'info'
+    argLength = len(args)  
+    if argLength == 2:
+      clusterDir = self.__norm_cluster_dir(args[1])
+      if os.path.isdir(clusterDir):
+        self.__setup_cluster_state(clusterDir)
+        clusterInfo = self.__clusterState.read()
+        clusterStatus = self.__cluster.check_cluster(clusterInfo)
+        if clusterStatus == 12:
+          self.__log.info(clusterDir)
+          keys = clusterInfo.keys()
+          keys.sort()
+          for key in keys:
+            if key != 'env':
+              self.__log.info("%s\t%s" % (key, clusterInfo[key]))  
+            
+          if self.__cfg['hod']['debug'] == 4:
+            for var in clusterInfo['env'].keys():
+              self.__log.debug("%s = %s" % (var, clusterInfo['env'][var]))
+        elif clusterStatus == 10:
+          self.__log.critical("%s cluster is dead" % clusterDir)
+        elif clusterStatus == 13:
+          self.__log.warn("%s cluster hdfs is dead" % clusterDir)
+        elif clusterStatus == 14:
+          self.__log.warn("%s cluster mapred is dead" % clusterDir)
+        
+        if clusterStatus != 12:
+          if clusterStatus == 15:
+            self.__log.critical("Cluster %s not allocated." % clusterDir)
+            
+          self.__opCode = clusterStatus
+      else:
+        self.__log.critical("'%s' does not exist." % clusterDir)
+        self.__opCode = 3 
+    else:
+      self.__log.critical("%s operation requires one argument. "  % operation
+                        + "A cluster path.")
+      self.__opCode = 3      
+  
+  def _op_help(self, args):  
+    print "hod operations:\n"
+    print " allocate <directory> <nodes> - Allocates a cluster of n nodes using the specified cluster"
+    print "                                directory to store cluster state information.  The Hadoop site XML" 
+    print "                                is also stored in this location."
+    print ""
+    print " deallocate <directory>       - Deallocates a cluster using the pecified cluster directory.  This"
+    print "                                operation is also required to clean up a dead cluster."      
+    print ""
+    print " list                         - List all clusters currently allocated by a user, along with" 
+    print "                                limited status information and the cluster's job ID."
+    print ""
+    print " info <directory>             - Provide detailed information on an allocated cluster."
+  
+  def operation(self):  
+    operation = self.__cfg['hod']['operation']
+    try:
+      opList = self.__check_operation(operation)
+      if self.__opCode == 0:
+        getattr(self, "_op_%s" % opList[0])(opList)
+    except:
+      self.__log.critical("op: %s failed: %s" % (operation,
+                          get_exception_error_string()))
+      self.__log.debug(get_exception_string())
+    
+    self.__cleanup()
+    
+    self.__log.debug("return code: %s" % self.__opCode)
+    
+    return self.__opCode
+  
+  def script(self):
+    script = self.__cfg['hod']['script']
+    nodes = self.__cfg['hod']['min-nodes']
+    clusterDir = "/tmp/%s.%s" % (self.__cfg['hod']['userid'], 
+                                 random.randint(0, 20000))
+    os.mkdir(clusterDir)
+    try:
+      self._op_allocate(('allocate', clusterDir, str(nodes)))
+      scriptRunner = hadoopScript(clusterDir, 
+                                  self.__cfg['hod']['original-dir'])
+      self.__opCode = scriptRunner.run(script)
+      self._op_deallocate(('deallocate', clusterDir))
+      shutil.rmtree(clusterDir, True)
+    except:
+      self.__log.critical("script: %s failed: %s" % (script,
+                          get_exception_error_string()))
+      self.__log.debug(get_exception_string())
+    
+    self.__cleanup()      
+    
+    return self.__opCode
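
hodRunner.operation above routes an operation string such as
"allocate <dir> <nodes>" to the matching _op_* method via getattr. A
minimal sketch of the same dispatch pattern, using hypothetical operations:

class Runner(object):
  def __init__(self):
    self.ops = ['greet', 'help']

  def _op_greet(self, args):
    print("hello %s" % args[1])

  def _op_help(self, args):
    print("operations: %s" % ", ".join(self.ops))

  def operation(self, opString):
    opList = opString.split()
    if not opList[0] in self.ops:
      return 2                         # invalid operation
    getattr(self, "_op_%s" % opList[0])(opList)
    return 0

Runner().operation("greet world")      # prints "hello world"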

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/hod.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/hod.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/hod.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/nodePool.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/nodePool.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/nodePool.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/nodePool.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,116 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+"""defines nodepool and nodeset as abstract interface for batch system"""
+# -*- python -*-
+
+from hodlib.GridServices.service import *
+
+class NodeSet:
+  """a set of nodes as one allocation unit"""
+
+  PENDING, COMMITTED, COMPLETE = range(3)
+
+  def __init__(self, id, numNodes, preferredList, isPreemptee):
+    self.id = id
+    self.numNodes = numNodes
+    self.isPreemptee = isPreemptee
+    self.preferredList = preferredList
+    self.cmdDescSet = []
+
+  def getId(self):
+    """returns a unique id of the nodeset"""
+    return self.id
+
+  def registerCommand(self, cmdDesc):
+    """register a command to the nodeset"""
+    self.cmdDescSet.append(cmdDesc)
+
+  def getAddrList(self):
+    """get list of node host names
+    May return empty list if node set is not allocated yet"""
+    raise NotImplementedError
+
+  def _getNumNodes(self):
+    return self.numNodes
+
+  def _isPreemptee(self):
+    return self.isPreemptee
+
+  def _getPreferredList(self):
+    return self.preferredList
+
+  def _getCmdSet(self):
+    return self.cmdDescSet
+
+class NodePool:
+  """maintains a collection of node sets as they get allocated.
+  Also the base class for all kinds of nodepools. """
+
+  def __init__(self, nodePoolDesc, cfg, log):
+    self.nodePoolDesc = nodePoolDesc
+    self.nodeSetDict = {}
+    self._cfg = cfg
+    self.nextNodeSetId = 0
+    self._log = log
+    
+
+  def newNodeSet(self, numNodes, preferred=[], isPreemptee=True, id=None):
+    """create a nodeset possibly with asked properties"""
+    raise NotImplementedError
+
+  def submitNodeSet(self, nodeSet, walltime = None, qosLevel = None, 
+                    account = None, resourcelist = None):
+    """submit the nodeset request to nodepool
+    return False if error happened"""
+    raise NotImplementedError
+
+  def pollNodeSet(self, nodeSet):
+    """return status of node set"""
+    raise NotImplementedError
+
+  def getWorkers(self):
+    """return the hosts that comprise this nodepool"""
+    raise NotImplementedError
+
+  def runWorkers(self, nodeSet = None, args = []):
+    """Run node set workers."""
+    
+    raise NotImplementedError
+  
+  def freeNodeSet(self, nodeset):
+    """free a node set"""
+    raise NotImplementedError
+
+  def finalize(self):
+    """cleans up all nodesets"""
+    raise NotImplementedError
+
+  def getServiceId(self):
+    raise NotImplementedError
+ 
+  def getJobState(self):
+    raise NotImplementedError
+
+  def deleteJob(self, jobId):
+    """Delete a job, given it's id"""
+    raise NotImplementedError
+
+  def getNextNodeSetId(self):
+    id = self.nextNodeSetId
+    self.nextNodeSetId += 1
+    
+    return id  
+  
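
NodeSet and NodePool above are abstract; concrete pools (such as the
Torque-based one elsewhere in contrib/hod) supply the methods that raise
NotImplementedError. Purely as an illustration of the contract, a trivial
in-memory subclass might look like this (LocalNodeSet and LocalNodePool are
invented names, and NodeSet/NodePool are assumed imported from this module):

class LocalNodeSet(NodeSet):
  def getAddrList(self):
    # a real pool would return the hosts granted by the batch system
    return ['localhost'] * self._getNumNodes()

class LocalNodePool(NodePool):
  def newNodeSet(self, numNodes, preferred=[], isPreemptee=True, id=None):
    if id is None:
      id = self.getNextNodeSetId()
    nodeSet = LocalNodeSet(id, numNodes, preferred, isPreemptee)
    self.nodeSetDict[nodeSet.getId()] = nodeSet
    return nodeSet

  def finalize(self):
    # 0 signals success to hadoopCluster.deallocate
    self.nodeSetDict.clear()
    return 0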

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/nodePool.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/nodePool.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Hod/nodePool.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/HodRing/__init__.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/HodRing/__init__.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/HodRing/__init__.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/HodRing/__init__.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,15 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/HodRing/__init__.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/HodRing/__init__.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/HodRing/__init__.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/HodRing/hodRing.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/HodRing/hodRing.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/HodRing/hodRing.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/HodRing/hodRing.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,866 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+#!/usr/bin/env python
+"""hodring launches hadoop commands on work node and 
+ cleans up all the work dirs afterward
+"""
+# -*- python -*-
+import os, sys, time, shutil, getpass, xml.dom.minidom, xml.dom.pulldom
+import socket, sets, urllib, csv, signal, pprint, random, re
+
+from xml.dom import getDOMImplementation
+from pprint import pformat
+from optparse import OptionParser
+from urlparse import urlparse
+from hodlib.Common.util import local_fqdn
+
+binfile = sys.path[0]
+libdir = os.path.dirname(binfile)
+sys.path.append(libdir)
+
+import hodlib.Common.logger
+
+from hodlib.GridServices.service import *
+from hodlib.Common.util import *
+from hodlib.Common.socketServers import threadedHTTPServer
+from hodlib.Common.hodsvc import hodBaseService
+from hodlib.Common.threads import simpleCommand
+from hodlib.Common.xmlrpc import hodXRClient
+
+mswindows = (sys.platform == "win32")
+originalcwd = os.getcwd()
+
+reHdfsURI = re.compile("hdfs://(.*?:\d+)(.*)")
+
+class CommandDesc:
+  """A class that represents the commands that
+  are run by hodring"""
+  def __init__(self, dict, log):
+    self.log = log
+    self.log.debug("In command desc")
+    self.log.debug("Done in command desc")
+    dict.setdefault('argv', [])
+    dict.setdefault('envs', {})
+    dict.setdefault('java-opts', [])
+    dict.setdefault('workdirs', [])
+    dict.setdefault('attrs', {})
+    dict.setdefault('final-attrs', {})
+    dict.setdefault('fg', False)
+    dict.setdefault('ignorefailures', False)
+    dict.setdefault('stdin', None)
+
+    self.log.debug("Printing dict")
+    self._checkRequired(dict)
+    self.dict = dict
+
+  def _checkRequired(self, dict):
+    if 'name' not in dict:
+      raise ValueError, "Command description lacks 'name'"
+    if 'program' not in dict:
+      raise ValueError, "Command description lacks 'program'"
+    if 'pkgdirs' not in dict:
+      raise ValueError, "Command description lacks 'pkgdirs'"
+
+  def getName(self):
+    return self.dict['name']
+
+  def getProgram(self):
+    return self.dict['program']
+
+  def getArgv(self):
+    return self.dict['argv']
+
+  def getEnvs(self):
+    return self.dict['envs']
+
+  def getJavaOpts(self):
+    return self.dict['java-opts']
+
+  def getPkgDirs(self):
+    return self.dict['pkgdirs']
+
+  def getWorkDirs(self):
+    return self.dict['workdirs']
+
+  def getAttrs(self):
+    return self.dict['attrs']
+
+  def getfinalAttrs(self):
+    return self.dict['final-attrs']
+  
+  def isForeground(self):
+    return self.dict['fg']
+
+  def isIgnoreFailures(self):
+    return self.dict['ignorefailures']
+
+  def getStdin(self):
+    return self.dict['stdin']
+
+  def parseDesc(str):
+
+    dict = CommandDesc._parseMap(str)
+
+    dict['argv'] = CommandDesc._parseList(dict['argv'])
+    dict['envs'] = CommandDesc._parseMap(dict['envs'])
+    dict['pkgdirs'] = CommandDesc._parseList(dict['pkgdirs'], ':')
+    dict['workdirs'] = CommandDesc._parseList(dict['workdirs'], ':')
+    dict['attrs'] = CommandDesc._parseMap(dict['attrs'])
+    dict['final-attrs'] = CommandDesc._parseMap(dict['final-attrs'])
+						
+    return CommandDesc(dict)
+
+  parseDesc = staticmethod(parseDesc)
+
+  def _parseList(str, delim = ','):
+    list = []
+    for row in csv.reader([str], delimiter=delim, escapechar='\\', 
+                          quoting=csv.QUOTE_NONE, doublequote=False):
+      list.extend(row)
+    return list
+
+  _parseList = staticmethod(_parseList)
+
+  def _parseMap(str):
+    """Parses key value pairs"""
+    dict = {}
+    for row in csv.reader([str], escapechar='\\', quoting=csv.QUOTE_NONE, doublequote=False):
+      for f in row:
+        [k, v] = f.split('=', 1)
+        dict[k] = v
+    return dict
+
+  _parseMap = staticmethod(_parseMap)
+
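+# For illustration, the CSV-based parsing above behaves like:
+#   CommandDesc._parseList(r'a,b\,c')        -> ['a', 'b,c']
+#   CommandDesc._parseMap(r'k1=v1,k2=v\,2')  -> {'k1': 'v1', 'k2': 'v,2'}
+# escapechar='\\' is what allows values to contain the delimiter.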
+      
+class HadoopCommand:
+  """Runs a single hadoop command"""
+    
+  def __init__(self, id, desc, tempdir, tardir, log, javahome, restart=False):
+    self.desc = desc
+    self.log = log
+    self.javahome = javahome
+    self.program = desc.getProgram()
+    self.name = desc.getName()
+    self.workdirs = desc.getWorkDirs()
+    self.hadoopdir = tempdir
+    self.confdir = os.path.join(self.hadoopdir, '%d-%s' % (id, self.name), 
+                                "confdir")
+    self.logdir = os.path.join(self.hadoopdir, '%d-%s' % (id, self.name), 
+                               "logdir")
+    self.child = None
+    self.restart = restart
+    self.filledInKeyVals = []
+    self._createWorkDirs()
+    self._createHadoopSiteXml()
+    self._createHadoopLogDir()
+    self.__hadoopThread = None
+
+  def _createWorkDirs(self):
+    for dir in self.workdirs:
+      if os.path.exists(dir):
+        if not os.access(dir, os.F_OK | os.R_OK | os.W_OK | os.X_OK):
+          raise ValueError, "Workdir %s does not allow rwx permission." % (dir)
+        continue
+      try:
+        os.makedirs(dir)
+      except:
+        pass
+
+  def getFilledInKeyValues(self):
+    return self.filledInKeyVals
+
+  def createXML(self, doc, attr, topElement, final):
+    for k,v in attr.iteritems():
+      self.log.debug('_createHadoopSiteXml: ' + str(k) + " " + str(v))
+      if ( v == "fillinport" ):
+        v = "%d" % (ServiceUtil.getUniqRandomPort(low=50000))
+
+      keyvalpair = ''
+      if isinstance(v, (tuple, list)):
+        for item in v:
+          keyvalpair = "%s%s=%s," % (keyvalpair, k, item)
+        keyvalpair = keyvalpair[:-1]
+      else:
+        keyvalpair = k + '=' + v
+
+      self.filledInKeyVals.append(keyvalpair)
+      if(k == "mapred.job.tracker"): # total hack for time's sake
+        keyvalpair = k + "=" + v
+        self.filledInKeyVals.append(keyvalpair)
+	
+      if ( v == "fillinhostport"):
+        port = "%d" % (ServiceUtil.getUniqRandomPort(low=50000))
+        self.log.debug('Setting hostname to: %s' % local_fqdn())
+        v = local_fqdn() + ':' + port
+      
+      keyvalpair = ''
+      if isinstance(v, (tuple, list)):
+        for item in v:
+          keyvalpair = "%s%s=%s," % (keyvalpair, k, item)
+        keyvalpair = keyvalpair[:-1]
+      else:
+        keyvalpair = k + '=' + v
+      
+      self.filledInKeyVals.append(keyvalpair)
+      if ( v == "fillindir"):
+        v = os.path.join('/mapredsystem', local_fqdn())
+        pass
+      
+      prop = None
+      if isinstance(v, (tuple, list)):
+        for item in v:
+          prop = self._createXmlElement(doc, k, item, "No description", final)
+          topElement.appendChild(prop)
+      else:
+        prop = self._createXmlElement(doc, k, v, "No description", final)
+        topElement.appendChild(prop)
+	
+  def _createHadoopSiteXml(self):
+    if self.restart:
+      if not os.path.exists(self.confdir):
+        os.makedirs(self.confdir)
+    else:
+      assert os.path.exists(self.confdir) == False
+      os.makedirs(self.confdir)
+
+    implementation = getDOMImplementation()
+    doc = implementation.createDocument('', 'configuration', None)
+    comment = doc.createComment("This is an auto generated hadoop-site.xml, do not modify")
+    topElement = doc.documentElement
+    topElement.appendChild(comment)
+    
+    attr = self.desc.getfinalAttrs()
+    self.createXML(doc, attr, topElement, True)
+    attr = self.desc.getAttrs()
+    self.createXML(doc, attr, topElement, False)
+              
+    
+    siteName = os.path.join(self.confdir, "hadoop-site.xml")
+    sitefile = file(siteName, 'w')
+    print >> sitefile, topElement.toxml()
+    sitefile.close()
+    self.log.debug('created %s' % (siteName))
+
+  def _createHadoopLogDir(self):
+    if self.restart:
+      if not os.path.exists(self.logdir):
+        os.makedirs(self.logdir)
+    else:
+      assert os.path.exists(self.logdir) == False
+      os.makedirs(self.logdir)
+
+  def _createXmlElement(self, doc, name, value, description, final):
+    prop = doc.createElement("property")
+    nameP = doc.createElement("name")
+    string = doc.createTextNode(name)
+    nameP.appendChild(string)
+    valueP = doc.createElement("value")
+    string = doc.createTextNode(value)
+    valueP.appendChild(string)
+    desc = doc.createElement("description")
+    string = doc.createTextNode(description)
+    desc.appendChild(string)
+    prop.appendChild(nameP)
+    prop.appendChild(valueP)
+    prop.appendChild(desc)
+    if (final):
+      felement = doc.createElement("final")
+      string = doc.createTextNode("true")
+      felement.appendChild(string)
+      prop.appendChild(felement)
+      pass
+    
+    return prop
+
+  def run(self, dir):
+    status = True
+    args = []
+    desc = self.desc
+    
+    self.log.debug(pprint.pformat(desc.dict))
+    
+    
+    self.log.debug("Got package dir of %s" % dir)
+    
+    self.path = os.path.join(dir, self.program)
+    
+    self.log.debug("path: %s" % self.path)
+    args.append(self.path)
+    args.extend(desc.getArgv())
+    envs = desc.getEnvs()
+    javaOpts = desc.getJavaOpts()
+    fenvs = os.environ
+    
+    for k, v in envs.iteritems():
+      fenvs[k] = v[0]
+    
+    self.log.debug(javaOpts)
+    fenvs['HADOOP_OPTS'] = ''
+    for option in javaOpts:
+      fenvs['HADOOP_OPTS'] = "%s%s " % (fenvs['HADOOP_OPTS'], option)
+    
+    fenvs['JAVA_HOME'] = self.javahome
+    fenvs['HADOOP_CONF_DIR'] = self.confdir
+    fenvs['HADOOP_LOG_DIR'] = self.logdir
+
+    self.log.info(pprint.pformat(fenvs))
+
+    hadoopCommand = ''
+    for item in args:
+        hadoopCommand = "%s%s " % (hadoopCommand, item)
+        
+    self.log.debug('running command: %s' % (hadoopCommand)) 
+    self.log.debug('hadoop env: %s' % fenvs)
+
+    self.__hadoopThread = simpleCommand('hadoop', hadoopCommand, env=fenvs)
+    self.__hadoopThread.start()
+    
+    while self.__hadoopThread.stdin == None:
+      time.sleep(.2)
+      self.log.debug("hadoopThread still == None ...")
+    
+    input = desc.getStdin()
+    self.log.debug("hadoop input: %s" % input)
+    if input:
+      if self.__hadoopThread.is_running():
+        print >>self.__hadoopThread.stdin, input
+      else:
+        self.log.error("hadoop command failed to start")
+    
+    self.__hadoopThread.stdin.close()  
+    
+    self.log.debug("isForground: %s" % desc.isForeground())
+    if desc.isForeground():
+      self.log.debug("Waiting on hadoop to finish...")
+      self.__hadoopThread.wait()
+      
+      self.log.debug("Joining hadoop thread...")
+      self.__hadoopThread.join()
+      if self.__hadoopThread.exit_code() != 0:
+        status = False
+    else:
+      code = self.__hadoopThread.exit_code()
+      if code != 0 and code != None:
+        status = False
+        
+    self.log.debug("hadoop run status: %s" % status)    
+    
+    if status == False:
+      for item in self.__hadoopThread.output():
+        self.log.error(item)
+      self.log.error('hadoop error: %s' % (
+                       self.__hadoopThread.exit_status_string()))
+   
+    if (status == True) or (not desc.isIgnoreFailures()):
+      return status
+    else:
+      self.log.error("Ignoring Failure")
+      return True
+
+  def kill(self):
+    self.__hadoopThread.kill()
+    if self.__hadoopThread:
+      self.__hadoopThread.join()
+
+  def addCleanup(self, list):
+    list.extend(self.workdirs)
+    list.append(self.confdir)
+
+class HodRing(hodBaseService):
+  """The main class for hodring that
+  polls the commands it runs"""
+  def __init__(self, config):
+    hodBaseService.__init__(self, 'hodring', config['hodring'])
+    self.log = self.logs['main']
+    self._http = None
+    self.__pkg = None
+    self.__pkgDir = None 
+    self.__tempDir = None
+    self.__running = {}
+    self.__hadoopLogDirs = []
+    self.__init_temp_dir()
+
+  def __init_temp_dir(self):
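+    """Create a temporary directory named after the userid and
+    service-id and make it the current working directory."""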
+    self.__tempDir = os.path.join(self._cfg['temp-dir'], 
+                                  "%s.%s.hodring" % (self._cfg['userid'], 
+                                                      self._cfg['service-id']))
+    if not os.path.exists(self.__tempDir):
+      os.makedirs(self.__tempDir)
+    os.chdir(self.__tempDir)  
+
+  def __fetch(self, url, spath):
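+    """Fetch the file at url into the local path spath, retrying up
+    to three times before giving up."""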
+    retry = 3
+    success = False
+    while retry > 0 and not success:
+      try:
+        input = urllib.urlopen(url)
+        bufsz = 81920
+        buf = input.read(bufsz)
+        out = open(spath, 'w')
+        while len(buf) > 0:
+          out.write(buf)
+          buf = input.read(bufsz)
+        input.close()
+        out.close()
+        success = True
+      except:
+        self.log.debug("Failed to copy file: %s" % get_exception_string())
+        retry = retry - 1
+    if retry == 0 and not success:
+      raise IOError, "Failed to copy the files"
+
+      
+  def __get_name(self, addr):
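+    """Return the tarball name embedded in the download URL, i.e. the
+    URL path minus its leading slash."""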
+    parsedUrl = urlparse(addr)
+    path = parsedUrl[2]
+    split = path.split('/', 1)
+    return split[1]
+
+  def __get_dir(self, name):
+    """Return the root directory inside the tarball
+    specified by name. Assumes that the tarball begins
+    with a root directory."""
+    import tarfile
+    myTarFile = tarfile.open(name)
+    hadoopPackage = myTarFile.getnames()[0]
+    self.log.debug("tarball name : %s hadoop package name : %s" %(name,hadoopPackage))
+    return hadoopPackage
+
+  def __download_package(self, ringClient):
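+    """Poll the ringmaster for the tarball location, fetch it,
+    register this hodring's HTTP server as another source for it, and
+    untar it into the current directory."""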
+    self.log.debug("Found download address: %s" % 
+                   self._cfg['download-addr'])
+    try:
+      addr = 'none'
+      downloadTime = self._cfg['tarball-retry-initial-time']           # download time depends on tarball size and network bandwidth
+      
+      increment = 0
+      
+      addr = ringClient.getTarList(self.hostname)
+
+      while(addr == 'none'):
+        rand = self._cfg['tarball-retry-initial-time'] + increment + \
+                        random.uniform(0,self._cfg['tarball-retry-interval'])
+        increment = increment + 1
+        self.log.debug("got no tarball. Retrying again in %s seconds." % rand)
+        time.sleep(rand)
+        addr = ringClient.getTarList(self.hostname)
+
+    
+      self.log.debug("got this address %s" % addr)
+      
+      tarName = self.__get_name(addr)
+      self.log.debug("tar package name: %s" % tarName)
+      
+      fetchPath = os.path.join(os.getcwd(), tarName) 
+      self.log.debug("fetch path: %s" % fetchPath)
+      
+      self.__fetch(addr, fetchPath)
+      self.log.debug("done fetching")
+    
+      tarUrl = "http://%s:%d/%s" % (self._http.server_address[0], 
+                                    self._http.server_address[1], 
+                                    tarName)
+      try: 
+        ringClient.registerTarSource(self.hostname, tarUrl, addr)
+        #ringClient.tarDone(addr)
+      except KeyError, e:
+        self.log.error("registerTarSource failed: %s" % e)
+        raise KeyError(e)
+      
+      check = untar(fetchPath, os.getcwd())
+      
+      if not check:
+        raise IOError, "Untarring failed."
+      
+      self.__pkg = self.__get_dir(tarName)
+      self.__pkgDir = os.path.join(os.getcwd(), self.__pkg)      
+    except Exception, e:
+      self.log.error("Failed download tar package: %s" % 
+                     get_exception_error_string())
+      raise Exception(e)
+      
+  def __run_hadoop_commands(self, restart=True):
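+    """Build and run a HadoopCommand for each configured command
+    descriptor, recording the background ones in self.__running."""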
+    id = 0
+    for desc in self._cfg['commanddesc']:
+      self.log.debug(pprint.pformat(desc.dict))
+      cmd = HadoopCommand(id, desc, self.__tempDir, self.__pkgDir, self.log, 
+                          self._cfg['java-home'], restart)
+    
+      self.__hadoopLogDirs.append(cmd.logdir)
+      self.log.debug("hadoop log directory: %s" % self.__hadoopLogDirs)
+      
+      try:
+        # if the tarball isn't there, we use the pkgs dir given.
+        if self.__pkgDir == None:
+          pkgdir = desc.getPkgDirs()
+        else:
+          pkgdir = self.__pkgDir
+
+        self.log.debug('This is the package dir %s ' % (pkgdir))
+        if not cmd.run(pkgdir):
+          raise ValueError, "Can't launch command from package dir: %s" % pkgdir
+      except Exception, e:
+        self.log.error(get_exception_string())
+        self.__running[id] = cmd
+        raise Exception(e)
+
+      id += 1
+      if desc.isForeground():
+        continue
+      self.__running[id-1] = cmd
+
+  def stop(self):
+    self.log.debug("Entered hodring stop.")
+    if self._http: 
+      self.log.debug("stopping http server...")
+      self._http.stop()
+    
+    self.log.debug("call hodsvcrgy stop...")
+    hodBaseService.stop(self)
+    
+    self.clean_up()
+    
+  def clean_up(self):
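+    """Detach from the parent process, archive the hadoop logs, and
+    remove the working directories before exiting."""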
+    os.chdir(originalcwd)
+    if not mswindows:
+      # do the UNIX double-fork magic, see Stevens' "Advanced 
+      # Programming in the UNIX Environment" for details (ISBN 0201563177)
+      try: 
+        pid = os.fork() 
+        if pid > 0:
+          # exit first parent
+          sys.exit(0) 
+      except OSError, e: 
+        self.log.error("fork #1 failed: %d (%s)" % (e.errno, e.strerror)) 
+        sys.exit(1)
+
+      # decouple from parent environment
+      os.chdir("/") 
+      os.setsid() 
+      os.umask(0) 
+
+      # do second fork
+      try: 
+        pid = os.fork() 
+        if pid > 0:
+          # exit from second parent
+          sys.exit(0) 
+      except OSError, e: 
+        self.log.error("fork #2 failed: %d (%s)" % (e.errno, e.strerror))
+        sys.exit(1) 
+
+    try:
+#      for cmd in self.__running.values():
+#        self.log.debug("killing %s..." % cmd)
+#        cmd.kill()
+  
+      removeList = []
+      
+      for cmd in self.__running.values():
+        self.log.debug("adding %s to cleanup list..." % cmd)
+        cmd.addCleanup(removeList)
+      
+      removeList.append(self.__tempDir)
+         
+      self.__archive_logs()   
+          
+      for dir in removeList:
+        if os.path.exists(dir):
+          self.log.debug('removing %s' % (dir))
+          shutil.rmtree(dir, True)
+    except:
+      self.log.error(get_exception_string())
+    sys.exit(0)
+
+  def _xr_method_clusterStart(self, initialize=True):
+    return self.clusterStart(initialize)
+
+  def _xr_method_clusterStop(self):
+    return self.clusterStop()
+ 
+  def __copy_archive_to_dfs(self, archiveFile):        
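+    """Copy a log archive into the DFS named by log-destination-uri,
+    writing a temporary hadoop-site.xml to work around the hdfs://
+    command-line problem described below."""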
+    hdfsURIMatch = reHdfsURI.match(self._cfg['log-destination-uri'])
+    
+    # FIXME this is a complete and utter hack. Currently hadoop is broken
+    # and does not understand hdfs:// syntax on the command line :(
+    
+    pid = os.getpid()
+    tempConfDir = '/tmp/%s' % pid
+    os.mkdir(tempConfDir)
+    tempConfFileName = '%s/hadoop-site.xml' % tempConfDir
+    tempHadoopConfig = open(tempConfFileName, 'w')
+    print >>tempHadoopConfig, "<configuration>"
+    print >>tempHadoopConfig, "  <property>"
+    print >>tempHadoopConfig, "    <name>fs.default.name</name>"
+    print >>tempHadoopConfig, "    <value>%s</value>" % hdfsURIMatch.group(1)
+    print >>tempHadoopConfig, "    <description>No description</description>"
+    print >>tempHadoopConfig, "  </property>"
+    print >>tempHadoopConfig, "</configuration>"
+    tempHadoopConfig.close()
+    
+    # END LAME HACK
+    
+    (head, tail) = os.path.split(archiveFile)
+    destFile = os.path.join(hdfsURIMatch.group(2), self._cfg['userid'], 
+                            self._cfg['service-id'], tail)
+    
+    self.log.info("copying archive %s to DFS %s ..." % (archiveFile, destFile))
+    
+    runningHadoops = self.__running.values()
+    if (len(runningHadoops) == 0):
+      self.log.info("len(runningHadoops) == 0, No running cluster?")
+      self.log.info("Skipping __copy_archive_to_dfs")
+      return
+ 
+    run = runningHadoops[0]
+    hadoopCmd = run.path
+    if self._cfg.has_key('pkgs'):
+      hadoopCmd = os.path.join(self._cfg['pkgs'], 'bin', 'hadoop')
+
+    # LAME HACK AGAIN, using config generated above :( 
+    copyCommand = "%s --config %s dfs -copyFromLocal %s %s" % (hadoopCmd, 
+      tempConfDir, archiveFile, destFile)
+    
+    self.log.debug(copyCommand)
+    
+    copyThread = simpleCommand('hadoop', copyCommand)
+    copyThread.start()
+    copyThread.wait()
+    copyThread.join()
+    self.log.debug(pprint.pformat(copyThread.output()))
+    
+    # LAME HACK AGAIN, deleting config generated above :( 
+    os.unlink(tempConfFileName)
+    os.rmdir(tempConfDir)
+    os.unlink(archiveFile)
+  
+  def __archive_logs(self):
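+    """Tar up each hadoop log directory and, if so configured, ship
+    the archive to the log destination."""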
+    status = True
+    if self._cfg.has_key("log-destination-uri"):
+      try:
+        if self.__hadoopLogDirs:
+          date = time.localtime()
+          for logDir in self.__hadoopLogDirs:
+            (head, tail) = os.path.split(logDir)
+            (head, logType) = os.path.split(head)
+            tarBallFile = "%s-%s-%04d%02d%02d%02d%02d%02d-%s.tar.gz" % (
+              logType, local_fqdn(), date[0], date[1], date[2], date[3], 
+              date[4], date[5], random.randint(0,1000))
+            
+            if self._cfg["log-destination-uri"].startswith('file://'):
+              tarBallFile = os.path.join(self._cfg["log-destination-uri"][7:], 
+                                         tarBallFile)
+            else:
+              tarBallFile = os.path.join(self._cfg['temp-dir'], tarBallFile)
+            
+            self.log.info('archiving log files to: %s' % tarBallFile)
+            status = tar(tarBallFile, logDir, ['*',])
+            self.log.info('archive %s status: %s' % (tarBallFile, status))
+            if status and \
+              self._cfg["log-destination-uri"].startswith('hdfs://'):
+              self.__copy_archive_to_dfs(tarBallFile)
+      except:
+        self.log.error(get_exception_string())
+      
+    return status
+      
+  def start(self):
+    """Run and maintain hodring commands"""
+    
+    try:
+      if self._cfg.has_key('download-addr'):
+        self._http = threadedHTTPServer('', self._cfg['http-port-range'])
+        self.log.info("Starting http server...")
+        self._http.serve_forever()
+        self.log.debug("http://%s:%d" % (self._http.server_address[0],
+                     self._http.server_address[1]))
+      
+      hodBaseService.start(self)
+      
+      ringXRAddress = None
+      if self._cfg.has_key('ringmaster-xrs-addr'):
+        ringXRAddress = "http://%s:%s/" % (self._cfg['ringmaster-xrs-addr'][0],
+                          self._cfg['ringmaster-xrs-addr'][1])
+        self.log.debug("Ringmaster at %s" % ringXRAddress)
+
+      self.log.debug("Creating service registry XML-RPC client.")
+      serviceClient = hodXRClient(to_http_url(
+                                  self._cfg['svcrgy-addr']))
+      if ringXRAddress == None:
+        self.log.info("Did not get ringmaster XML-RPC address. Fetching information from service registry.")
+        ringList = serviceClient.getServiceInfo(self._cfg['userid'], 
+            self._cfg['service-id'], 'ringmaster', 'hod')
+      
+        self.log.debug(pprint.pformat(ringList))
+      
+        if ringList and isinstance(ringList, list):
+          ringXRAddress = ringList[0]['xrs']
+      
+        count = 0
+        while (ringXRAddress == None and count < 3000):
+          ringList = serviceClient.getServiceInfo(self._cfg['userid'], 
+            self._cfg['service-id'], 'ringmaster', 'hod')
+        
+          if ringList and isinstance(ringList, list):
+            ringXRAddress = ringList[0]['xrs']
+        
+          count = count + 1
+          time.sleep(.2)
+      
+      if ringXRAddress == None:
+        raise Exception("Could not get ringmaster XML-RPC server address.")
+        
+      self.log.debug("Creating ringmaster XML-RPC client.")
+      ringClient = hodXRClient(ringXRAddress)    
+      
+      id = self.hostname + "_" + str(os.getpid())
+      
+      if 'download-addr' in self._cfg:
+        self.__download_package(ringClient)
+      else:
+        self.log.debug("Did not find a download address.")
+          
+      cmdlist = []
+      firstTime = True
+      increment = 0
+      hadoopStartupTime = 2
+       
+      cmdlist = ringClient.getCommand(id)
+
+      while (cmdlist == []):
+        if firstTime:
+          sleepTime = increment + self._cfg['cmd-retry-initial-time'] + hadoopStartupTime\
+                        + random.uniform(0,self._cfg['cmd-retry-interval'])
+          firstTime = False
+        else:
+          sleepTime = increment + self._cfg['cmd-retry-initial-time'] + \
+                        random.uniform(0,self._cfg['cmd-retry-interval'])
+        self.log.debug("Did not get command list. Waiting for %s seconds." % (sleepTime))
+        time.sleep(sleepTime)
+        increment = increment + 1
+        cmdlist = ringClient.getCommand(id)
+
+      self.log.debug(pprint.pformat(cmdlist)) 
+      cmdDescs = []
+      for cmds in cmdlist:
+        cmdDescs.append(CommandDesc(cmds['dict'], self.log))
+  
+      self._cfg['commanddesc'] = cmdDescs
+      
+      self.log.info("Running hadoop commands...")
+
+      self.__run_hadoop_commands(False)
+        
+      masterParams = []
+      for k, cmd in self.__running.iteritems():
+        masterParams.extend(cmd.filledInKeyVals)
+  
+      self.log.debug("printing getparams")
+      self.log.debug(pprint.pformat(id))
+      self.log.debug(pprint.pformat(masterParams))
+      # when this is on a required host, the ringMaster already has our masterParams
+      if len(masterParams) > 0:
+        ringClient.addMasterParams(id, masterParams)
+    except Exception, e:
+      raise Exception(e)
+
+  def clusterStart(self, initialize=True):
+    """Start a stopped mapreduce/dfs cluster"""
+    if initialize:
+      self.log.debug('clusterStart Method Invoked - Initialize')
+    else:
+      self.log.debug('clusterStart Method Invoked - No Initialize')
+    try:
+      self.log.debug("Creating service registry XML-RPC client.")
+      serviceClient = hodXRClient(to_http_url(self._cfg['svcrgy-addr']),
+                                  None, None, 0, 0, 0)
+
+      self.log.info("Fetching ringmaster information from service registry.")
+      count = 0
+      ringXRAddress = None
+      while (ringXRAddress == None and count < 3000):
+        ringList = serviceClient.getServiceInfo(self._cfg['userid'],
+          self._cfg['service-id'], 'ringmaster', 'hod')
+        if ringList and isinstance(ringList, list):
+          ringXRAddress = ringList[0]['xrs']
+        count = count + 1
+        time.sleep(.2)
+
+      if ringXRAddress == None:
+        raise Exception("Could not get ringmaster XML-RPC server address.")
+
+      self.log.debug("Creating ringmaster XML-RPC client.")
+      ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0)
+
+      id = self.hostname + "_" + str(os.getpid())
+
+      cmdlist = []
+      if initialize:
+        if 'download-addr' in self._cfg:
+          self.__download_package(ringClient)
+        else:
+          self.log.debug("Did not find a download address.")
+        while (cmdlist == []):
+          cmdlist = ringClient.getCommand(id)
+      else:
+        while (cmdlist == []):
+          cmdlist = ringClient.getAdminCommand(id)
+
+      self.log.debug(pprint.pformat(cmdlist))
+      cmdDescs = []
+      for cmds in cmdlist:
+        cmdDescs.append(CommandDesc(cmds['dict'], self.log))
+
+      self._cfg['commanddesc'] = cmdDescs
+
+      if initialize:
+        self.log.info("Running hadoop commands again... - Initialize")
+        self.__run_hadoop_commands()
+        masterParams = []
+        for k, cmd in self.__running.iteritems():
+          self.log.debug(cmd)
+          masterParams.extend(cmd.filledInKeyVals)
+
+        self.log.debug("printing getparams")
+        self.log.debug(pprint.pformat(id))
+        self.log.debug(pprint.pformat(masterParams))
+        # when this is on a required host, the ringMaster already has our masterParams
+        if len(masterParams) > 0:
+          ringClient.addMasterParams(id, masterParams)
+      else:
+        self.log.info("Running hadoop commands again... - No Initialize")
+        self.__run_hadoop_commands()
+
+    except:
+      self.log.error(get_exception_string())
+
+    return True
+
+  def clusterStop(self):
+    """Stop a running mapreduce/dfs cluster without stopping the hodring"""
+    self.log.debug('clusterStop Method Invoked')
+    try:
+      for cmd in self.__running.values():
+        cmd.kill()
+      self.__running = {}
+    except:
+      self.log.error(get_exception_string())
+
+    return True

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/HodRing/hodRing.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/HodRing/hodRing.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/HodRing/hodRing.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/NodePools/__init__.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/NodePools/__init__.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/NodePools/__init__.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/NodePools/__init__.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,15 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/NodePools/__init__.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/NodePools/__init__.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/NodePools/__init__.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/NodePools/torque.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/NodePools/torque.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/NodePools/torque.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/NodePools/torque.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,292 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+"""Maui/Torque implementation of NodePool"""
+# -*- python -*-
+
+import os, sys, csv, socket, time, re, pprint
+
+from hodlib.Hod.nodePool import *
+from hodlib.Schedulers.torque import torqueInterface
+from hodlib.Common.threads import simpleCommand
+from hodlib.Common.util import get_exception_string, args_to_string, local_fqdn
+
+class TorqueNodeSet(NodeSet):
+  def __init__(self, id, numNodes, preferredList, isPreemptee):
+    NodeSet.__init__(self, id, numNodes, preferredList, isPreemptee)
+    self.qsubId = None
+    self.addrList = []
+
+  def _setQsubId(self, qsubId):
+    self.qsubId = qsubId
+
+  def _setAddrList(self, addrList):
+    self.addrList = addrList
+
+  def getAddrList(self):
+    return self.addrList
+
+class TorquePool(NodePool):
+  def __init__(self, nodePoolDesc, cfg, log):
+    NodePool.__init__(self, nodePoolDesc, cfg, log)
+
+    environ = os.environ.copy()
+    
+    if self._cfg['resource_manager'].has_key('pbs-server'):
+      environ['PBS_DEFAULT'] = self._cfg['resource_manager']['pbs-server']
+
+    self.__torque = torqueInterface(
+      self._cfg['resource_manager']['batch-home'], environ, self._log)
+
+  def __gen_submit_params(self, nodeSet, walltime = None, qosLevel = None, 
+                          account = None):
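+    """Build the qsub argument list and the stdin script (which execs
+    the ringmaster) used to submit this node set."""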
+    argList = []
+    stdinList = []
+    
+    npd = self.nodePoolDesc
+    
+    def gen_stdin_list():
+      # Here we are basically generating the standard input for qsub.
+      #  Specifically a script to exec ringmaster.
+      stdinList.append('#!/bin/sh')
+      
+      ringBin = os.path.join(self._cfg['hod']['base-dir'], 'bin', 
+                             'ringmaster')
+      ringArgs = [ringBin,]
+      ringArgs.extend(self._cfg.get_args(exclude=('hod')))
+      
+      ringMasterCommand = args_to_string(ringArgs)
+      
+      self._log.debug("ringmaster cmd: %s" % ringMasterCommand)
+      
+      stdinList.append(ringMasterCommand)
+      
+    def gen_arg_list():      
+      def process_qsub_attributes():
+        rawAttributes = self.nodePoolDesc.getAttrs()
+    
+        # 'W:x' is used to specify torque management extensions, i.e. -W x=...
+        resourceManagementExtensions = ''
+        if 'W:x' in rawAttributes:
+          resourceManagementExtensions = rawAttributes['W:x']
+    
+        if qosLevel:
+          if len(resourceManagementExtensions) > 0:
+            resourceManagementExtensions += ';'
+          resourceManagementExtensions += 'QOS:%s' % (qosLevel)
+    
+        rawAttributes['W:x'] = resourceManagementExtensions
+        
+        hostname = local_fqdn()
+    
+        rawAttributes['l:nodes'] = nodeSet._getNumNodes()
+        
+        if walltime:
+          rawAttributes['l:walltime'] = walltime
+        
+        #create a dict of dictionaries for 
+        # various arguments of torque
+        cmds = {}
+        for key in rawAttributes:
+          value = rawAttributes[key]
+    
+          if key.find(':') == -1:
+            raise ValueError, 'Syntax error: missing colon after %s in %s=%s' % (
+              key, key, value)
+    
+          [option, subOption] = key.split(':', 1)
+          if not option in cmds:
+            cmds[option] = {}
+          cmds[option][subOption] = value
+        
+        opts = []
+        #create the qsub option string list from the dictionary
+        #of dictionaries created above, e.g.
+        #  {'l': {'nodes': 3}, 'W': {'x': 'QOS:high'}}
+        #  becomes ['-l', 'nodes=3', '-W', 'x=QOS:high']
+        for option in cmds:
+          commandList = []
+          for subOption in cmds[option]:
+            value = cmds[option][subOption]
+            if len(subOption) == 0:
+              commandList.append(value)
+            else:
+              commandList.append("%s=%s" % (subOption, value))
+          opts.append('-%s' % option)
+          opts.append(','.join(commandList))
+          
+        return opts
+      
+      pkgdir = npd.getPkgDir()
+  
+      qsub = os.path.join(pkgdir, 'bin', 'qsub')
+      
+      argList.extend(process_qsub_attributes())
+      argList.extend(('-N', 'HOD'))
+      argList.extend(('-r','n'))
+
+      if 'pbs-user' in self._cfg['resource_manager']:
+        argList.extend(('-u', self._cfg['resource_manager']['pbs-user']))
+  
+      argList.extend(('-d','/tmp/'))
+      if 'queue' in self._cfg['resource_manager']:
+        queue = self._cfg['resource_manager']['queue']
+        argList.extend(('-q',queue))
+  
+      # accounting should recognize userid:pbs-account as being "charged"
+      argList.extend(('-A', (self._cfg['hod']['userid'] + ':' + 
+                   self._cfg['resource_manager']['pbs-account'])))
+    
+      if 'env-vars' in self._cfg['resource_manager']:
+        qsub_envs = self._cfg['resource_manager']['env-vars']
+        argList.extend(('-v', self.__keyValToString(qsub_envs)))
+
+    gen_arg_list()
+    gen_stdin_list()
+    
+    return argList, stdinList
+    
+  def __keyValToString(self, keyValList):
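+    """Flatten a dict of {key: [value, ...]} into a 'k1=v1,k2=v2'
+    string (using each key's first value) suitable for qsub -v."""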
+    ret = ""
+    for key in keyValList:
+      ret = "%s%s=%s," % (ret, key, keyValList[key][0])
+    return ret[:-1]
+  
+  def newNodeSet(self, numNodes, preferred=[], isPreemptee=True, id=None):
+    if not id:
+      id = self.getNextNodeSetId()
+    
+    nodeSet = TorqueNodeSet(id, numNodes, preferred, isPreemptee)
+
+    self.nodeSetDict[nodeSet.getId()] = nodeSet
+    
+    return nodeSet
+      
+  def submitNodeSet(self, nodeSet, walltime = None, qosLevel = None, 
+                    account = None):
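+    """Submit the node set to Torque via qsub and return the job id
+    and qsub's exit code."""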
+
+    argList, stdinList = self.__gen_submit_params(nodeSet, walltime, qosLevel, 
+                                                  account)
+    
+    jobId, exitCode = self.__torque.qsub(argList, stdinList)
+    
+    nodeSet._setQsubId(jobId)
+
+    return jobId, exitCode
+
+  def freeNodeSet(self, nodeSet):
+    
+    exitCode = self.deleteJob(nodeSet.getId())
+    
+    del self.nodeSetDict[nodeSet.getId()]
+  
+    return exitCode
+  
+  def finalize(self):
+    status = 0
+    for nodeSet in self.nodeSetDict.values():
+      exitCode = self.freeNodeSet(nodeSet)
+      if exitCode > 0 and exitCode != 153:
+        status = 4
+      
+    return status
+    
+  def getWorkers(self):
+    hosts = []
+    
+    qstatInfo, exitCode = self.__torque.qstat(self.getServiceId())
+    if qstatInfo:
+      hosts = qstatInfo['exec_host']
+    
+    return hosts
+ 
+  def pollNodeSet(self, nodeSet):
+    status = NodeSet.COMPLETE  
+    nodeSet = self.nodeSetDict[0] 
+
+    qstatInfo, exitCode = self.__torque.qstat(self.getServiceId())
+
+    jobstate = None
+    exechost = None
+    if qstatInfo:    
+      jobstate = qstatInfo['job_state']
+      exechost = qstatInfo['exec_host']
+
+    if jobstate == 'Q':
+      status = NodeSet.PENDING
+    elif exechost == None:
+      status = NodeSet.COMMITTED
+    else:
+      nodeSet._setAddrList(exechost)
+
+    return status
+        
+  def getServiceId(self):
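+    """Return the Torque job id for this allocation, falling back to
+    the PBS_JOBID environment variable when no node set has one."""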
+    id = None
+    
+    nodeSets = self.nodeSetDict.values()
+    if len(nodeSets):
+      id = nodeSets[0].qsubId
+      
+    if id == None:
+      id = os.getenv('PBS_JOBID')
+      
+    return id
+
+  def getJobState(self):
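+    """Return the job_state reported by qstat, or False if qstat
+    fails; transient credential failures are retried."""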
+    #torque error code when credentials fail, a temporary condition sometimes.
+    credFailureErrorCode = 171 
+    credFailureRetries = 10
+    i = 0
+    jobState = False
+    
+    while i < credFailureRetries:
+      qstatInfo, exitCode = self.__torque.qstat(self.getServiceId())
+      if exitCode == 0:
+        jobState = qstatInfo['job_state'] 
+        break
+      else:
+        if exitCode == credFailureErrorCode:
+          time.sleep(1)
+          i = i+1
+        else:
+          break
+    return jobState
+
+  def deleteJob(self, jobId):
+    exitCode = self.__torque.qdel(jobId)
+    return exitCode
+    
+  def runWorkers(self, args):
+    return self.__torque.pbsdsh(args)
+
+
+

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/NodePools/torque.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/NodePools/torque.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/NodePools/torque.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/__init__.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/__init__.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/__init__.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/__init__.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,15 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/__init__.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/__init__.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/RingMaster/__init__.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL


