hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject svn commit: r608950 [3/7] - in /lucene/hadoop/trunk/src: contrib/hod/ contrib/hod/bin/ contrib/hod/conf/ contrib/hod/hodlib/ contrib/hod/hodlib/AllocationManagers/ contrib/hod/hodlib/Common/ contrib/hod/hodlib/GridServices/ contrib/hod/hodlib/Hod/ cont...
Date Fri, 04 Jan 2008 18:20:22 GMT
Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/setup.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/setup.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/setup.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/setup.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,912 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+# $Id:setup.py 5158 2007-04-09 00:14:35Z zim $
+# $Id:setup.py 5158 2007-04-09 00:14:35Z zim $
+#
+# Christopher Zimmerman - zim@yahoo-inc.com - 04/07/2007
+#------------------------------------------------------------------------------
+
+"""'setup' provides for reading and verifing configuration files based on
+   Python's SafeConfigParser class."""
+
+import sys, os, re, pprint
+
+from ConfigParser import SafeConfigParser
+from optparse import OptionParser, IndentedHelpFormatter, OptionGroup
+from util import get_perms
+from types import typeValidator, is_valid_type, typeToString
+
+reEmailAddress = re.compile("^.*@.*$")
+reEmailDelimit = re.compile("@")
+reComma = re.compile("\s*,\s*")
+reDot = re.compile("\.")
+reCommentHack = re.compile("^.*?\s+#|;.*", flags=re.S)
+reCommentNewline = re.compile("\W$")
+reKeyVal = r"(?<!\\)="
+reKeyVal = re.compile(reKeyVal)
+
+errorPrefix = 'error'
+requiredPerms = '0660'
+
+class definition:
+    def __init__(self):
+        """Generates a configuration definition object."""
+        self.__def = {}
+        self.__defOrder = []
+
+    def __repr__(self):
+        return pprint.pformat(self.__def)  
+
+    def __getitem__(self, section):
+        return self.__def[section]
+
+    def __iter__(self):
+        return iter(self.__def)
+
+    def sections(self):
+        """Returns a list of sections/groups."""
+        
+        if len(self.__defOrder):
+            return self.__defOrder
+        else:  
+            return self.__def.keys()
+      
+    def add_section(self, section):
+        """Add a configuration section / option group."""
+        
+        if self.__def.has_key(section):
+            raise Exception("Section already exists: '%s'" % section)
+        else:
+            self.__def[section] = {}
+
+    def add_def(self, section, var, type, desc, help = True, default = None, 
+                req = True, validate = True, short = None):
+        """ Add a variable definition.
+        
+            section  - section name
+            var      - variable name
+            type     - valid hodlib.types
+            desc     - description of variable
+            help     - display help for this variable
+            default  - default value
+            req      - bool, requried?
+            validate - bool, validate type value?
+            short    - short symbol (1 character),
+            help     - bool, display help?"""
+            
+        if self.__def.has_key(section):
+            if not is_valid_type(type):
+                raise Exception("Type (type) is invalid: %s.%s - '%s'" % (section, var, 
+                                                                type))
+            if not isinstance(desc, str):
+                raise Exception("Description (desc) must be a string: %s.%s - '%s'" % (
+                    section, var, desc))
+            if not isinstance(req, bool):
+                raise Exception("Required (req) must be a bool: %s.%s - '%s'" % (section, 
+                                                                       var, 
+                                                                       req))
+            if not isinstance(validate, bool):
+                raise Exception("Validate (validate) must be a bool: %s.%s - '%s'" % (
+                    section, var, validate))
+              
+            if self.__def[section].has_key(var):
+                raise Exception("Variable name already defined: '%s'" % var)
+            else:
+                self.__def[section][var] = { 'type'     : type,
+                                             'desc'     : desc,
+                                             'help'     : help,
+                                             'default'  : default,
+                                             'req'      : req,
+                                             'validate' : validate,
+                                             'short'    : short }                
+        else:    
+            raise Exception("Section does not exist: '%s'" % section)
+          
+    def add_defs(self, defList, defOrder=None):
+        """ Add a series of definitions.
+        
+            defList = { section0 : ((name0, 
+                                     type0, 
+                                     desc0, 
+                                     help0,
+                                     default0,
+                                     req0, 
+                                     validate0,
+                                     short0),
+                                  ....
+                                    (nameN, 
+                                     typeN, 
+                                     descN,
+                                     helpN, 
+                                     defaultN, 
+                                     reqN, 
+                                     validateN,
+                                     shortN)),             
+                           ....
+                           
+                        sectionN : ... }
+                        
+            Where the short synmbol is optional and can only be one char."""
+                        
+        for section in defList.keys():
+            self.add_section(section)
+            for defTuple in defList[section]:
+                if isinstance(defTuple, tuple): 
+                    if len(defTuple) < 7:
+                        raise Exception(
+                            "section %s is missing an element: %s" % (
+                            section, pprint.pformat(defTuple)))
+                else:
+                    raise Exception("section %s of defList is not a tuple" % 
+                                    section)
+                
+                if len(defTuple) == 7:
+                    self.add_def(section, defTuple[0], defTuple[1], 
+                                 defTuple[2], defTuple[3], defTuple[4], 
+                                 defTuple[5], defTuple[6])
+                else:
+                    self.add_def(section, defTuple[0], defTuple[1], 
+                                 defTuple[2], defTuple[3], defTuple[4], 
+                                 defTuple[5], defTuple[6], defTuple[7])                     
+        if defOrder:
+            for section in defOrder:
+                if section in self.__def:
+                    self.__defOrder.append(section)
+                    
+            for section in self.__def:
+                if not section in defOrder:
+                    raise Exception(
+                        "section %s is missing from specified defOrder." % 
+                        section)
+            
+class baseConfig:
+    def __init__(self, configDef, originalDir=None):
+        self.__toString = typeToString()
+        self.__validated = False
+        self._configDef = configDef
+        self._options = None
+        self._mySections = []
+        self._dict = {}
+        self.configFile = None
+        self.__originalDir = originalDir
+
+        if self._configDef:
+            self._mySections = configDef.sections()
+
+    def __repr__(self):
+        """Returns a string representation of a config object including all
+           normalizations."""
+
+        print_string = '';
+        for section in self._mySections:
+            print_string = "%s[%s]\n" % (print_string, section)
+            options = self._dict[section].keys()
+            for option in options:
+                print_string = "%s%s = %s\n" % (print_string, option,
+                    self._dict[section][option])
+
+            print_string = "%s\n" % (print_string)
+
+        print_string = re.sub("\n\n$", "", print_string)
+
+        return print_string
+
+    def __getitem__(self, section):
+        """ Returns a dictionary of configuration name and values by section.
+        """
+        return self._dict[section]
+
+    def __setitem__(self, section, value):
+        self._dict[section] = value
+
+    def __iter__(self):
+        return iter(self._dict)
+
+    def has_key(self, section):
+        status = False
+        if section in self._dict:
+            status = True
+            
+        return status
+
+    # 'private' method which prints an configuration error messages
+    def var_error(self, section, option, *addData):
+        errorStrings = []  
+        if not self._dict[section].has_key(option):
+          self._dict[section][option] = None
+        errorStrings.append("%s: invalid '%s' specified in section %s: %s" % (
+            errorPrefix, option, section, self._dict[section][option]))
+
+        if addData:
+            errorStrings.append("%s: additional info: %s\n" % (errorPrefix,
+                addData[0]))
+        return errorStrings
+
+    def var_error_suggest(self, errorStrings):
+        if self.configFile:
+            errorStrings.append("See configuration file: %s" % \
+                self.configFile)
+        
+        if self._options:
+            errorStrings.append("Configuration can be overridden by options, see -h")
+    
+    def __get_args(self, section):
+        def __dummyToString(type, value):
+            return value
+        
+        toString = __dummyToString
+        if self.__validated:
+            toString = self.__toString
+            
+        args = []
+        if isinstance(self._dict[section], dict):
+            for option in self._dict[section]:
+                if section in self._configDef and \
+                option in self._configDef[section]:
+                  if self._configDef[section][option]['type'] == 'bool':
+                    if self._dict[section][option] == 'True' or \
+                        self._dict[section][option] == True:
+                        args.append("--%s.%s" % (section, option))
+                  else:
+                    args.append("--%s.%s" % (section, option))
+                    args.append(toString(
+                           self._configDef[section][option]['type'], 
+                           self._dict[section][option]))
+        else:
+            if section in self._configDef:
+              if self._configDef[section][option]['type'] == 'bool':
+                if self._dict[section] == 'True' or \
+                    self._dict[section] == True:
+                    args.append("--%s" % section)
+              else:
+                if self._dict[section] != 'config':
+                  args.append("--%s" % section)
+                  args.append(toString(self._configDef[section]['type'], 
+                                             self._dict[section]))
+                    
+        return args
+                
+    def values(self):
+        return self._dict.values()
+      
+    def keys(self):
+        return self._dict.keys()
+    
+    def get_args(self, exclude=None, section=None):
+        """Retrieve a tuple of config arguments."""
+        
+        args = []
+        if section:
+            args = self.__get_args(section)
+        else:
+            for section in self._dict:
+                if exclude:
+                    if not section in exclude:
+                        args.extend(self.__get_args(section))
+                else:
+                    args.extend(self.__get_args(section))
+        
+        return tuple(args)
+        
+    def verify(self):
+        """Verifies each configuration variable, using the configValidator
+           class, based on its type as defined by the dictionary configDef.
+           Upon encountering a problem an error is printed to STDERR and
+           false is returned."""
+        
+        oldDir = os.getcwd()
+        if self.__originalDir:
+          os.chdir(self.__originalDir)
+        
+        status = True
+        statusMsgs = []
+	
+        if self._configDef:
+            errorCount = 0
+            configValidator = typeValidator()
+
+            # foreach section and option by type string as defined in configDef
+            #   add value to be validated to validator
+            for section in self._mySections:
+                for option in self._configDef[section].keys():
+                    configVarName = "%s.%s" % (section, option)
+
+                    if self._dict[section].has_key(option):
+                        if self._configDef[section][option].has_key('validate'):
+                            if self._configDef[section][option]['validate']:
+                                # is the section.option needed to be validated?
+                                configValidator.add(configVarName,
+                                    self._configDef[section][option]['type'],
+                                    self._dict[section][option])
+                            elif self._configDef[section][option]['type'] \
+                                != 'file' and \
+                                self._configDef[section][option]['type'] != \
+                                'directory':
+                                self[section][option] = \
+                                    configValidator.normalize(
+                                    self._configDef[section][option]['type'], 
+                                    self._dict[section][option])
+                            if self._configDef[section][option]['default'] != \
+                                None:
+                                self._configDef[section][option]['default'] = \
+                                    configValidator.normalize(
+                                    self._configDef[section][option]['type'],
+                                    self._configDef[section][option]['default']
+                                    )
+                                self._configDef[section][option]['default'] = \
+                                    self.__toString(
+                                    self._configDef[section][option]['type'], 
+                                    self._configDef[section][option]['default']
+                                    )
+                        else:        
+                            configValidator.add(configVarName,
+                                self._configDef[section][option]['type'],
+                                self._dict[section][option])
+                    elif self._configDef[section][option]['req']:
+                        statusMsgs.append("%s: %s.%s is not defined."
+                             % (errorPrefix, section, option))
+                        errorCount = errorCount + 1                         
+
+            configValidator.validate()
+
+            for valueInfo in configValidator.validatedInfo:
+                sectionsOptions = reDot.split(valueInfo['name'])
+
+                if valueInfo['isValid'] == 1:
+                    self._dict[sectionsOptions[0]][sectionsOptions[1]] = \
+                        valueInfo['normalized']
+                else:
+                    if valueInfo['errorData']:
+                        statusMsgs.extend(self.var_error(sectionsOptions[0],
+                            sectionsOptions[1], valueInfo['errorData']))
+                    else:
+                        statusMsgs.extend(self.var_error(sectionsOptions[0],
+                            sectionsOptions[1]))
+                    errorCount = errorCount + 1
+
+            if errorCount > 1:
+                statusMsgs.append( "%s: %s problems found." % (
+                    errorPrefix, errorCount))
+                self.var_error_suggest(statusMsgs)
+                status = False
+            elif errorCount > 0:
+                statusMsgs.append( "%s: %s problem found." % (
+                    errorPrefix, errorCount))
+                self.var_error_suggest(statusMsgs)
+                status = False
+        
+        self.__validated = True
+
+        if self.__originalDir:
+          os.chdir(oldDir)
+
+        return status,statusMsgs
+
+class config(SafeConfigParser, baseConfig):
+    def __init__(self, configFile, configDef=None, originalDir=None, 
+                 options=None, checkPerms=False):
+        """Constructs config object.
+
+           configFile - configuration file to read
+           configDef  - definition object
+           options    - options object
+           checkPerms - check file permission on config file, 0660
+
+           sample configuration file:
+
+            [snis]
+            modules_dir  = modules/       ; location of infoModules
+            md5_defs_dir = etc/md5_defs   ; location of infoTree md5 defs
+            info_store   = var/info       ; location of nodeInfo store
+            cam_daemon   = localhost:8200 ; cam daemon address"""
+
+
+        SafeConfigParser.__init__(self)
+        baseConfig.__init__(self, configDef, originalDir)
+
+        if(os.path.exists(configFile)):
+          self.configFile = configFile
+        else:
+          raise IOError
+        
+        self._options = options
+        
+	## UNUSED CODE : checkPerms is never True
+  ## zim: this code is used if one instantiates config() with checkPerms set to
+  ## True.
+        if checkPerms: self.__check_perms()
+
+        self.read(configFile)
+
+        self._configDef = configDef
+        if not self._configDef:
+            self._mySections = self.sections()
+
+        self.__initialize_config_dict()
+
+    def __initialize_config_dict(self):
+        """ build a dictionary of config vars keyed by section name defined in
+           configDef, if options defined override config"""
+
+        for section in self._mySections:
+            items = self.items(section)
+            self._dict[section] = {}
+
+            # First fill self._dict with whatever is given in hodrc.
+            # Going by this, options given at the command line either override
+            # options in hodrc, or get appended to the list, like for
+            # hod.client-params. Note that after this dict has _only_ hodrc
+            # params
+            for keyValuePair in items:
+                # stupid commenting bug in ConfigParser class, lines without an
+                #  option value pair or section required that ; or # are at the
+                #  beginning of the line, :(
+                newValue = reCommentHack.sub("", keyValuePair[1])
+                newValue = reCommentNewline.sub("", newValue)
+                self._dict[section][keyValuePair[0]] = newValue
+            # end of filling with options given in hodrc
+            # now start filling in command line options
+            if self._options:    
+                for option in self._configDef[section].keys():
+                    if self._options[section].has_key(option):
+                        # the user has given an option
+                        compoundOpt = "%s.%s" %(section,option)
+                        if ( compoundOpt == \
+                              'gridservice-mapred.final-server-params' \
+                              or compoundOpt == \
+                                    'gridservice-hdfs.final-server-params' \
+                              or compoundOpt == \
+                                    'gridservice-mapred.server-params' \
+                              or compoundOpt == \
+                                    'gridservice-hdfs.server-params' \
+                              or compoundOpt == \
+                                    'hod.client-params' ):
+                 
+                           if ( compoundOpt == \
+                              'gridservice-mapred.final-server-params' \
+                              or compoundOpt == \
+                                    'gridservice-hdfs.final-server-params' ):
+                              overwrite = False
+                           else: overwrite = True
+
+                           # Append to the current list of values in self._dict
+                           if not self._dict[section].has_key(option):
+                             self._dict[section][option] = ""
+                           dictOpts = self._dict[section][option].split(",")
+                           dictOptsKeyVals = {}
+                           for opt in dictOpts:
+                              if opt != '':
+                                # when dict _has_ params from hodrc
+                                if reKeyVal.search(opt):
+                                  (key, val) = reKeyVal.split(opt,1)
+                                  # we only consider the first '=' for splitting
+                                  # we do this to support passing params like
+                                  # mapred.child.java.opts=-Djava.library.path=some_dir
+                                  dictOptsKeyVals[key] = val
+                                else: 
+                                  # this means an invalid option. Leaving it
+                                  #for config.verify to catch
+                                  dictOptsKeyVals[opt] = None
+                                
+                           cmdLineOpts = self._options[section][option].split(",")
+
+                           for opt in cmdLineOpts:
+                              if reKeyVal.search(opt):
+                                # Same as for hodrc options. only consider
+                                # the first =
+                                ( key, val ) = reKeyVal.split(opt,1)
+                              else:
+                                key = opt
+                                val = None
+                              # whatever is given at cmdline overrides
+                              # what is given in hodrc only for non-final params
+                              if dictOptsKeyVals.has_key(key):
+                                if overwrite:
+                                  dictOptsKeyVals[key] = val
+                              else: dictOptsKeyVals[key] = val
+                              
+                           self._dict[section][option] = ""
+                           for key in dictOptsKeyVals:
+                              if self._dict[section][option] == "":
+                                if dictOptsKeyVals[key]:
+                                  self._dict[section][option] = key + "=" + \
+                                    dictOptsKeyVals[key]
+                                else: #invalid option. let config.verify catch
+                                  self._dict[section][option] = key
+                              else:
+                                if dictOptsKeyVals[key]:
+                                  self._dict[section][option] = \
+                                    self._dict[section][option] + "," + key + \
+                                      "=" + dictOptsKeyVals[key]
+                                else:  #invalid option. let config.verify catch
+                                  self._dict[section][option] = \
+                                    self._dict[section][option] + "," + key
+
+                        else:
+                             # for rest of the options, that don't need
+                            # appending business.
+                            # options = cmdline opts + defaults
+                            # dict    = hodrc opts only
+                            # only non default opts can overwrite any opt
+                            # currently in dict
+                           if not self._dict[section].has_key(option):
+                              # options not mentioned in hodrc
+                              self._dict[section][option] = \
+                                               self._options[section][option]
+                           elif self._configDef[section][option]['default'] != \
+                                               self._options[section][option]:
+                              # option mentioned in hodrc but user has given a
+                              # non-default option
+                              self._dict[section][option] = \
+                                               self._options[section][option]
+
+    ## UNUSED METHOD
+    ## zim: is too :)
+    def __check_perms(self):
+        perms = None
+        if self._options:  
+            try:
+                perms = get_perms(self.configFile)
+            except OSError, data:
+                self._options.print_help()
+                raise Exception("*** could not find config file: %s" % data)
+                sys.exit(1)
+        else:
+            perms = get_perms(self.configFile)
+               
+        if perms != requiredPerms:
+            error = "*** '%s' has invalid permission: %s should be %s\n" % \
+                (self.configFile, perms, requiredPerms)
+            raise Exception( error)
+            sys.exit(1)
+
+class formatter(IndentedHelpFormatter):
+    def format_option_strings(self, option):
+        """Return a comma-separated list of option strings & metavariables."""
+        if option.takes_value():
+            metavar = option.metavar or option.dest.upper()
+            short_opts = [sopt
+                          for sopt in option._short_opts]
+            long_opts = [self._long_opt_fmt % (lopt, metavar)
+                         for lopt in option._long_opts]
+        else:
+            short_opts = option._short_opts
+            long_opts = option._long_opts
+
+        if self.short_first:
+            opts = short_opts + long_opts
+        else:
+            opts = long_opts + short_opts
+
+        return ", ".join(opts)    
+
+class options(OptionParser, baseConfig):
+    def __init__(self, optionDef, usage, version, originalDir=None, 
+                 withConfig=False, defaultConfig=None, defaultLocation=None):
+        """Constructs and options object.
+         
+           optionDef     - definition object
+           usage         - usage statement
+           version       - version string
+           withConfig    - used in conjunction with a configuration file
+           defaultConfig - default configuration file
+        
+        """
+        OptionParser.__init__(self, usage=usage)
+        baseConfig.__init__(self, optionDef, originalDir)
+        
+        self.formatter = formatter(4, max_help_position=100, width=180, 
+                                   short_first=1)
+        
+        self.__version = version
+        self.__withConfig = withConfig
+        self.__defaultConfig = defaultConfig
+        self.__defaultLoc = defaultLocation
+        self.args = []
+        self.__optionList = []
+        self.__compoundOpts = []
+        self.__shortMap = {}
+        self.__alphaString = 'abcdefghijklmnopqrstuvxyzABCDEFGHIJKLMNOPQRSTUVXYZ1234567890'
+        self.__alpha = []
+        self.__parsedOptions = {}
+        self.__reserved = [ 'h' ]
+        
+        self.__orig_grps = []
+        self.__orig_grp_lists = {}
+        self.__orig_option_list = []
+        
+        self.__display_grps = []
+        self.__display_grp_lists = {}
+        self.__display_option_list = [] 
+        
+        self.config = None
+        
+        if self.__withConfig:
+            self.__reserved.append('c')
+        self.__reserved.append('v')
+        
+        self.__gen_alpha()            
+
+        # build self.__optionList, so it contains all the options that are
+        # possible. the list elements are of the form section.option
+        for section in self._mySections:
+            if self.__withConfig and section == 'config':
+                raise Exception(
+                    "withConfig set 'config' cannot be used as a section name")
+            for option in self._configDef[section].keys():
+                if '.' in option:
+                    raise Exception("Options cannot contain: '.'")
+                elif self.__withConfig and option == 'config':
+                    raise Exception(
+                        "With config set, option config is not allowed.")
+                elif self.__withConfig and option == 'verbose-help':
+                    raise Exception(
+                        "With config set, option verbose-help is not allowed.")                 
+                self.__optionList.append(self.__splice_compound(section, 
+                                                                option))
+        self.__build_short_map()
+        self.__add_options()
+        self.__init_display_options() 
+        
+        (self.__parsedOptions, self.args) = self.parse_args()
+
+        if self.__withConfig:
+            self.config = self.__parsedOptions.config
+            if not self.config:
+                self.error("configuration file must be specified")
+            if not os.path.exists(self.config):
+                if self.__defaultLoc and not re.search("/", self.config):
+                    self.__parsedOptions.config = os.path.join(
+                        self.__defaultLoc, self.config)
+    
+        self.__build_dict()   
+
+    
+    def __init_display_options(self):
+        self.__orig_option_list = self.option_list[:]
+        optionListTitleMap = {}
+        for option in self.option_list:
+            optionListTitleMap[option._long_opts[0]] = option
+      
+        self.__orig_grps = self.option_groups[:]
+        for group in self.option_groups:
+            self.__orig_grp_lists[group.title] = group.option_list[:]
+                                    
+        groupTitleMap = {}
+        optionTitleMap = {}
+        for group in self.option_groups:
+            groupTitleMap[group.title] = group
+            optionTitleMap[group.title] = {}
+            for option in group.option_list:
+                (sectionName, optionName) = \
+                    self.__split_compound(option._long_opts[0])
+                optionTitleMap[group.title][optionName] = option
+          
+        for section in self._mySections:
+            for option in self._configDef[section]:
+                if self._configDef[section][option]['help']:
+                    if groupTitleMap.has_key(section):
+                        if not self.__display_grp_lists.has_key(section):
+                            self.__display_grp_lists[section] = []
+                        self.__display_grp_lists[section].append(
+                            optionTitleMap[section][option])
+                    
+                    try:    
+                        self.__display_option_list.append(
+                            optionListTitleMap["--" + self.__splice_compound(
+                            section, option)])
+                    except KeyError:
+                        pass
+        try:
+            self.__display_option_list.append(optionListTitleMap['--config'])
+        except KeyError:
+            pass
+          
+        self.__display_option_list.append(optionListTitleMap['--help'])
+        self.__display_option_list.append(optionListTitleMap['--verbose-help'])
+        self.__display_option_list.append(optionListTitleMap['--version'])
+                    
+        self.__display_grps = self.option_groups[:]             
+        for section in self._mySections:
+            if self.__display_grp_lists.has_key(section):
+                self.__orig_grp_lists[section] = \
+                    groupTitleMap[section].option_list
+            else:
+                try:
+                    self.__display_grps.remove(groupTitleMap[section])
+                except KeyError:
+                    pass
+                
+    def __gen_alpha(self):
+        assignedOptions = []
+        for section in self._configDef:
+            for option in self._configDef[section]:
+                if self._configDef[section][option]['short']:
+                    assignedOptions.append(
+                        self._configDef[section][option]['short'])
+        
+        for symbol in self.__alphaString:
+            if not symbol in assignedOptions:
+                self.__alpha.append(symbol)
+
+    def __splice_compound(self, section, option):
+        return "%s.%s" % (section, option)
+        
+    def __split_compound(self, compound):    
+        return compound.split('.')
+        
+    def __build_short_map(self):
+        """ build a short_map of parametername : short_option. This is done
+        only for those parameters that don't have short options already
+        defined in configDef.
+        If possible, the first letter in the option that is not already
+        used/reserved as a short option is allotted. Otherwise the first
+        letter in __alpha that isn't still used is allotted.
+        e.g. { 'hodring.java-home': 'T', 'resource_manager.batch-home': 'B' }
+        """
+
+        optionsKey = {}
+        for compound in self.__optionList:
+            (section, option) = self.__split_compound(compound)
+            if not optionsKey.has_key(section):
+                optionsKey[section] = []
+            optionsKey[section].append(option)
+        
+        for section in self._configDef.sections():
+            options = optionsKey[section]
+            options.sort()
+            for option in options:
+                if not self._configDef[section][option]['short']:
+                    compound = self.__splice_compound(section, option)
+                    shortOptions = self.__shortMap.values()
+                    for i in range(0, len(option)):
+                        letter = option[i]
+                        letter = letter.lower()
+                        if letter in self.__alpha:
+                            if not letter in shortOptions and \
+                                not letter in self.__reserved:
+                                self.__shortMap[compound] = letter
+                                break
+                    if not self.__shortMap.has_key(compound):
+                        for i in range(0, len(self.__alpha)):
+                            letter = self.__alpha[i]
+                            if not letter in shortOptions and \
+                                not letter in self.__reserved:
+                                self.__shortMap[compound] = letter
+
+    def __add_option(self, config, compoundOpt, section, option, group=None):
+        addMethod = self.add_option
+        if group: addMethod=group.add_option
+        
+        self.__compoundOpts.append(compoundOpt)
+        
+        if compoundOpt == 'gridservice-mapred.final-server-params' or \
+           compoundOpt == 'gridservice-hdfs.final-server-params' or \
+           compoundOpt == 'gridservice-mapred.server-params' or \
+           compoundOpt == 'gridservice-hdfs.server-params' or \
+           compoundOpt == 'hod.client-params':
+          _action = 'append'
+        elif config[section][option]['type'] == 'bool':
+          _action = 'store_true'
+        else:
+          _action = 'store'
+
+        if self.__shortMap.has_key(compoundOpt):
+          addMethod("-" + self.__shortMap[compoundOpt],
+                          "--" + compoundOpt, dest=compoundOpt, 
+                          action= _action, 
+                          metavar=config[section][option]['type'],
+                          default=config[section][option]['default'],
+                          help=config[section][option]['desc'])
+        else:
+          if config[section][option]['short']:
+            addMethod("-" + config[section][option]['short'], 
+                              "--" + compoundOpt, dest=compoundOpt, 
+                              action= _action,
+                              metavar=config[section][option]['type'],
+                              default=config[section][option]['default'],
+                              help=config[section][option]['desc'])   
+          else:
+            addMethod('', "--" + compoundOpt, dest=compoundOpt, 
+                              action= _action, 
+                              metavar=config[section][option]['type'],
+                              default=config[section][option]['default'],
+                              help=config[section][option]['desc'])   
+                           
+    def __add_options(self):
+        if self.__withConfig:
+            self.add_option("-c", "--config", dest='config', 
+                action='store', default=self.__defaultConfig, 
+                metavar='config_file',
+                help="Full path to configuration file.")
+
+        self.add_option("", "--verbose-help", 
+            action='help', default=None, 
+            metavar='flag',
+            help="Display verbose help information.")
+        
+        self.add_option("-v", "--version", 
+            action='version', default=None, 
+            metavar='flag',
+            help="Display version information.")
+        
+        self.version = self.__version
+  
+        if len(self._mySections) > 1:
+            for section in self._mySections:
+                group = OptionGroup(self, section)
+                for option in self._configDef[section]:
+                    compoundOpt = self.__splice_compound(section, option)
+                    self.__add_option(self._configDef, compoundOpt, section, 
+                                      option, group)
+                self.add_option_group(group)
+        else:
+            for section in self._mySections:
+                for option in self._configDef[section]:
+                    compoundOpt = self.__splice_compound(section, option)
+                    self.__add_option(self._configDef, compoundOpt, section, 
+                                      option)
+                    
+    def __build_dict(self):
+        if self.__withConfig:
+            self._dict['config'] = str(getattr(self.__parsedOptions, 'config'))
+        for compoundOption in dir(self.__parsedOptions):
+            if compoundOption in self.__compoundOpts:
+                (section, option) = self.__split_compound(compoundOption)
+                if not self._dict.has_key(section):
+                    self._dict[section] = {}
+                
+                if getattr(self.__parsedOptions, compoundOption):
+                    _attr = getattr(self.__parsedOptions, compoundOption)
+                    # when we have multi-valued parameters passed separately
+                    # from command line, python optparser pushes them into a
+                    # list. So converting all such lists to strings
+                    if type(_attr) == type([]):
+                      import string
+                      _attr = string.join(_attr,',')
+                    self._dict[section][option] = _attr
+                    
+        for section in self._configDef:
+            for option in self._configDef[section]: 
+                if self._configDef[section][option]['type'] == 'bool':
+                    compoundOption = self.__splice_compound(section, option)
+                    if not self._dict.has_key(section):
+                        self._dict[section] = {}
+                    
+                    if option not in self._dict[section]:
+                        self._dict[section][option] = False
+ 
+    def __set_display_groups(self):
+        if not '--verbose-help' in sys.argv:
+            self.option_groups = self.__display_grps
+            self.option_list = self.__display_option_list
+            for group in self.option_groups:
+                group.option_list = self.__display_grp_lists[group.title]
+ 
+    def __unset_display_groups(self):
+        if not '--verbose-help' in sys.argv:
+            self.option_groups = self.__orig_grps
+            self.option_list = self.__orig_option_list
+            for group in self.option_groups:
+                group.option_list = self.__orig_grp_lists[group.title]      
+ 
+    def print_help(self, file=None):
+        self.__set_display_groups()
+        OptionParser.print_help(self, file)
+        self.__unset_display_groups()
+                        
+    def verify(self):
+        return baseConfig.verify(self)

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/setup.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/setup.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/setup.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/socketServers.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/socketServers.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/socketServers.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/socketServers.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,622 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+# Various socket server and helper classes.
+#
+# Christopher Zimmerman - zim@yahoo-inc.com - 03/07/2007
+#
+import os, sys, socket, threading, pprint, re, xmlrpclib, time
+  
+from select import select
+from SocketServer import ThreadingMixIn, ForkingMixIn
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer
+from SimpleHTTPServer import SimpleHTTPRequestHandler
+from random import Random
+from urlparse import urlparse
+
+Fault = xmlrpclib.Fault
+
+from hodlib.Common.util import local_fqdn
+from hodlib.Common.logger import hodDummyLogger
+
+class hodHTTPHandler(BaseHTTPRequestHandler):
+  port = -1
+
+  def __init__(self, request, client_address, server, registerService):
+    self.registerService = registerService
+    BaseHTTPRequestHandler.__init__(self, request, client_address, server)
+  
+  def log_message(self, *args):
+    """Forget logging for now."""
+    
+    pass
+      
+  def do_GET(self):
+    self.fullUrl = "http://%s:%s%s" % (self.server.server_address[0],
+                                       self.server.server_address[1], 
+                                       self.path)
+    
+    parsedUrl = urlparse(self.fullUrl)
+    self.writeHeaders()
+    self.writeData(parsedUrl)
+  
+  def w(self, string):
+    self.wfile.write("%s\n" % string)
+  
+  def writeHeaders(self):
+   self.send_response(200, 'OK')
+   self.send_header('Content-type', 'text/html')
+   self.end_headers()   
+     
+  def sendWrongPage(self, userJob):
+    self.w('<font class="alert">')
+    if userJob == False:
+      self.w('invalid URL specified')   
+    elif re.match("^\d+$", userJob):
+      self.w('invalid URL specified, job <b>%s</b> does not exist' % userJob)
+    elif re.match("^\w+$", userJob):
+      self.w('invalid URL specified, user <b>%s</b> does not exist' % userJob) 
+    self.w('</font>')
+    
+  def getServiceHosts(self, serviceInfo):
+    hostInfo = { 'long' : {}, 'short' : {} }
+    for user in serviceInfo:
+      for job in serviceInfo[user]:
+        for host in serviceInfo[user][job]:
+          for serviceItem in serviceInfo[user][job][host]:
+            serviceName = serviceItem.keys()
+            serviceName = serviceName[0]
+            if isinstance(serviceItem[serviceName], str):
+              hostInfo['short'][self.getJobKey(user, job, host)] = True
+            hostInfo['long'][self.getJobKey(user, job, host)] = True
+    
+    return hostInfo
+
+  def getJobInfo(self, job, serviceInfo):
+    jobInfo = {}
+    
+    for user in serviceInfo.keys():
+      for someJob in serviceInfo[user].keys():
+        if job == someJob:
+          jobInfo[user] = { job : serviceInfo[user][job] }
+    
+    return jobInfo
+  
+  def getJobKey(self, user, job, host):
+    return "%s-%s-%s" % (user, job, host)
+  
+  def writeData(self, parsedUrl):
+    options = parsedUrl[4]
+    serviceInfo = self.server.service.getServiceInfo()
+    users = serviceInfo.keys()
+    users.sort()
+
+    self.w("<html>")
+    self.w("<body>")
+    self.w("<head>")
+    self.writeCSS()
+    self.w("</head>")
+    self.w('<font class="header2">HOD Service Registry Information</font>')
+    if serviceInfo == {}:
+      self.w('<br><br><font class="header">&nbsp;&nbsp;No HOD clusters configured.</font>')
+    else:
+      if parsedUrl[2] == '/':
+        self.w('&nbsp;&nbsp;&nbsp;<table class="main">')
+        count = 0
+        for user in users:
+          self.writeUserData(user, options, serviceInfo, count)
+          count = count + 1
+      elif parsedUrl[2][1:] in serviceInfo:
+        self.w('&nbsp;&nbsp;&nbsp;<table class="main">')
+        self.writeUserData(parsedUrl[2][1:], options, serviceInfo, 0)
+      elif re.match("^\d+$", parsedUrl[2][1:]):
+        jobInfo = self.getJobInfo(parsedUrl[2][1:], serviceInfo)
+        if jobInfo.keys():
+          self.w('&nbsp;&nbsp;&nbsp;<table class="main">')
+          for user in jobInfo.keys():
+            self.writeUserData(user, options, jobInfo, 0)   
+        else:
+          self.sendWrongPage(parsedUrl[2][1:]) 
+          self.w('&nbsp;&nbsp;&nbsp;<table class="main">')
+          count = 0
+          for user in users:
+            self.writeUserData(user, options, serviceInfo, count)
+            count = count + 1
+      elif re.match("^\w+$", parsedUrl[2][1:]):
+        self.sendWrongPage(parsedUrl[2][1:]) 
+        self.w('&nbsp;&nbsp;&nbsp;<table class="main">')
+        count = 0
+        for user in users:
+          self.writeUserData(user, options, serviceInfo, count)
+          count = count + 1        
+      else:
+        self.sendWrongPage(False) 
+        self.w('&nbsp;&nbsp;&nbsp;<table class="main">')
+        count = 0
+        for user in users:
+          self.writeUserData(user, options, serviceInfo, count)
+          count = count + 1
+
+    self.w('</table>')
+    self.w("</pre>")
+    self.w("</body>")
+    self.w("</html>")
+
+  def writeCSS(self):
+    self.w('<style type="text/css">')
+    
+    self.w('table.main { border: 0px; padding: 1; background-color: #E1ECE0; width: 70%; margin: 10; }')
+    self.w('table.sub1 { background-color: #F1F1F1; padding: 0; }')
+    self.w('table.sub2 { background-color: #FFFFFF; padding: 0; }')
+    self.w('table.sub3 { border: 1px solid #EEEEEE; background-color: #FFFFFF; padding: 0; }')
+    self.w('td.header { border-bottom: 1px solid #CCCCCC; padding: 2;}')
+    self.w('td.service1 { border: 0px; background-color: #FFFFFF; padding: 2; width: 10%}')
+    self.w('td.service2 { border: 0px; background-color: #FFFFFF; padding: 2; width: 90%}')
+    self.w('td { vertical-align: top; padding: 0; }')
+    self.w('td.noborder { border-style: none; border-collapse: collapse; }')
+    self.w('tr.colored { background-color: #F1F1F1; }')
+    self.w('font { font-family: Helvetica, Arial, sans-serif; font-size: 10pt; color: #666666; }')
+    self.w('font.header { font-family: Helvetica, Arial, sans-serif;  font-size: 10pt; color: #333333; font-style: bold }')
+    self.w('font.header2 { font-family: Helvetica, Arial, sans-serif; font-size: 16pt; color: #333333; }')
+    self.w('font.sml { font-family: Helvetica, Arial, sans-serif; font-size: 8pt; color: #666666; }')
+    self.w('font.alert { font-family: Helvetica, Arial, sans-serif; font-size: 9pt; color: #FF7A22; }')
+    self.w('a { font-family: Helvetica, Arial, sans-serif; text-decoration:none; font-size: 10pt; color: #111111; }')
+    self.w('a:visited { font-family: Helvetica, Arial, sans-serif; color:#2D4628; text-decoration:none; font-size: 10pt; }')
+    self.w('a:hover { font-family: Helvetica, Arial, sans-serif; color:#00A033; text-decoration:none; font-size: 10pt; }')
+    self.w('a.small { font-family:  Helvetica, Arial, sans-serif; text-decoration:none; font-size: 8pt }')
+    self.w('a.small:hover { color:#822499; text-decoration:none; font-size: 8pt }')
+
+    self.w("</style>")
+
+  def writeUserData(self, user, options, serviceInfo, count):
+    hostInfo = self.getServiceHosts(serviceInfo)
+    hostKey = 'short'
+    if options == 'display=long':
+      hostKey = 'long'
+
+    if count == 0:
+      self.w('<tr>')
+      self.w('<td class="header" colspan="2">')
+      self.w('<font class="header">Active Users</font>')
+      self.w('</td>')
+      self.w('</tr>')
+    self.w('<tr>')
+    self.w('<td><font>%s</font></td>' % user)
+    self.w('<td>')
+    jobIDs = serviceInfo[user].keys()
+    jobIDs.sort()
+    for jobID in jobIDs: 
+      self.w('<table class="sub1" width="100%">')
+      if count == 0:
+        self.w('<tr>')
+        self.w('<td class="header" colspan="2">')
+        self.w('<font class="header">PBS Job Identifiers</font>')
+        self.w('</td>')
+        self.w('</tr>')        
+      self.w('<tr>')
+      self.w('<td><font>%s</font></td>' % jobID)
+      self.w('<td>')
+      hosts = serviceInfo[user][jobID].keys()
+      hosts.sort()
+      for host in hosts:
+        if hostInfo[hostKey].has_key(self.getJobKey(user, jobID, host)):
+          self.w('<table class="sub2" width="100%">')
+          if count == 0:
+            self.w('<tr>')
+            self.w('<td class="header" colspan="2">')
+            self.w('<font class="header">Hosts Running Services</font>')
+            self.w('</td>')
+            self.w('</tr>')  
+          self.w('<tr>')
+          self.w('<td><font>%s</font></td>' % host)
+          self.w('<td>')
+          self.w('<table class="sub3" width="100%">')
+          self.w('<tr>')
+          self.w('<td colspan="2">')
+          self.w('<font class="header">Service Information</font>')
+          self.w('</td>')
+          self.w('</tr>')  
+          for serviceItem in serviceInfo[user][jobID][host]:
+            serviceName = serviceItem.keys()
+            serviceName = serviceName[0]
+            if isinstance(serviceItem[serviceName], dict) and \
+              options == 'display=long':
+              self.w('<tr class="colored">')
+              self.w('<td><font>%s</font></td>' % serviceName)
+              self.w('<td>')
+              self.w('<table width="100%">')
+              for key in serviceItem[serviceName]:
+                self.w('<tr>')
+                self.w('<td class="service1"><font>%s</font></td>' % key)
+                self.w('<td class="service2"><font>%s</font></td>' % serviceItem[serviceName][key])
+                self.w('</tr>')
+              self.w('</table>')
+              self.w('</td>')
+              self.w('</tr>')
+            elif isinstance(serviceItem[serviceName], str):
+              self.w('<tr class="colored">')
+              self.w('<td><font class="service1">%s</font></td>' % serviceName)
+              self.w('<td>')
+              (host, port) = serviceItem[serviceName].split(':')
+              hostnameInfo = socket.gethostbyname_ex(host)
+              if serviceName.startswith('mapred'):
+                self.w('<a href="http://%s:%s">Hadoop Job Tracker</a>' % (hostnameInfo[0], port))
+              elif serviceName.startswith('hdfs'):
+                self.w('<a href="http://%s:%s">HDFS Name Node</a>&nbsp' % (hostnameInfo[0], port))
+              else:
+                self.w('<font class="service2">%s</font>' % serviceItem[serviceName])
+              self.w('</td>')
+              self.w('</tr>')
+          self.w('</table>')    
+          self.w('</td>')
+          self.w('</tr>')
+          self.w('</table>')
+          count = count + 1
+      self.w('</td>')  
+      self.w('</tr>')
+      self.w('</table>')
+      count = count + 1
+    self.w('</td>')
+    self.w('</tr>')
+#    self.w("<pre>")
+#    self.w(pprint.pformat(serviceInfo))
+#    self.w("</pre>")
+    
+class baseSocketServer:
+    def __init__(self, host, ports):
+        self.host = host
+        self.ports = ports
+        self.__stopForever = threading.Event()
+        self.__stopForever.clear()
+        self.__run = threading.Event()
+        self.__run.set()    
+        self.server_address = ()
+        self.mThread = None
+        
+    def server_bind(self):
+        """server_bind() method binds to a random range of ports."""
+
+        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+        if len(self.ports) > 1:
+            randomPort = Random(os.getpid())
+            portSequence = range(self.ports[0], self.ports[1])
+
+            maxTryCount = abs(self.ports[0] - self.ports[1])
+            tryCount = 0
+            while True:
+                somePort = randomPort.choice(portSequence)
+                self.server_address = (self.host, somePort)
+                try:
+                    self.socket.bind(self.server_address)
+                except socket.gaierror, errData:
+                    raise socket.gaierror, errData
+                except:
+                    tryCount = tryCount + 1
+                    if tryCount > maxTryCount:
+                        bindError = "bind failure for port range %s:%d" % (
+                            self.ports)
+
+                        raise socket.error, bindError
+                else:
+                    break
+        else:
+            self.server_address = (self.host, int(self.ports[0]))
+            self.socket.bind(self.server_address)
+        
+        if self.host == '':
+            self.server_address = (local_fqdn(), self.server_address[1])
+
+    def _serve_forever(self):
+        """Replacement for serve_forever loop.
+        
+           All baseSocketServers run within a master thread; that thread
+           imitates serve_forever, but checks an event (self.__stopForever) 
+           before processing new connections.
+        """
+        
+        while not self.__stopForever.isSet():
+            (rlist, wlist, xlist) = select([self.socket], [], [], 
+                                           1)
+            
+            if (len(rlist) > 0 and self.socket == rlist[0]):
+                self.handle_request()
+        
+            while not self.__run.isSet():
+                if self.__stopForever.isSet():
+                    break
+                time.sleep(1)
+        
+        self.server_close()
+        
+        return True
+
+    def serve_forever(self):
+        """Handle requests until stopForever event flag indicates stop."""
+
+        self.mThread = threading.Thread(name="baseSocketServer", 
+                                        target=self._serve_forever)
+        self.mThread.start()
+
+        return self.mThread
+
+    def pause(self):
+        """Temporarily stop servicing requests."""
+
+        self.__run.clear()
+
+    def cont(self):
+        """Resume servicing requests."""
+
+        self.__run.set()
+
+    def stop(self):
+        """Set the stopForever flag to tell serve_forever() to exit."""
+    
+        self.__stopForever.set()
+        if self.mThread: self.mThread.join()
+        return True
+
+    def is_alive(self):
+        if self.mThread != None:
+            return self.mThread.isAlive()
+        else:
+            return False
+
+class threadedHTTPServer(baseSocketServer, ThreadingMixIn, HTTPServer):
+    def __init__(self, host, ports):
+        baseSocketServer.__init__(self, host, ports)
+        HTTPServer.__init__(self, self.server_address, SimpleHTTPRequestHandler)
+
+class forkingHTTPServer(baseSocketServer, ForkingMixIn, HTTPServer):
+    def __init__(self, host, ports):
+        baseSocketServer.__init__(self, host, ports)
+        HTTPServer.__init__(self, self.server_address, SimpleHTTPRequestHandler)
+
+class hodHTTPServer(baseSocketServer, ThreadingMixIn, HTTPServer):
+    service = None 
+    def __init__(self, host, ports, serviceobj = None):
+        self.service = serviceobj
+        baseSocketServer.__init__(self, host, ports)
+        HTTPServer.__init__(self, self.server_address, hodHTTPHandler)
+
+    def finish_request(self, request, client_address):
+        self.RequestHandlerClass(request, client_address, self, self.service)
+        
+class hodXMLRPCServer(baseSocketServer, ThreadingMixIn, SimpleXMLRPCServer):
+    def __init__(self, host, ports, 
+                 requestHandler=SimpleXMLRPCRequestHandler, 
+                 logRequests=False, allow_none=False, encoding=None):
+        baseSocketServer.__init__(self, host, ports)
+        SimpleXMLRPCServer.__init__(self, self.server_address, requestHandler, 
+                                    logRequests)
+        
+        self.register_function(self.stop, 'stop')
+
+try:
+    from twisted.web import server, xmlrpc
+    from twisted.internet import reactor, defer
+    from twisted.internet.threads import deferToThread
+    from twisted.python import log
+                
+    class twistedXMLRPC(xmlrpc.XMLRPC):
+        def __init__(self, logger):
+            xmlrpc.XMLRPC.__init__(self)
+            
+            self.__XRMethods = {}
+            self.__numRequests = 0
+            self.__logger = logger
+            self.__pause = False
+    
+        def render(self, request):
+            request.content.seek(0, 0)
+            args, functionPath = xmlrpclib.loads(request.content.read())
+            try:
+                function = self._getFunction(functionPath)
+            except Fault, f:
+                self._cbRender(f, request)
+            else:
+                request.setHeader("content-type", "text/xml")
+                defer.maybeDeferred(function, *args).addErrback(
+                    self._ebRender).addCallback(self._cbRender, request)
+            
+            return server.NOT_DONE_YET
+    
+        def _cbRender(self, result, request):
+            if isinstance(result, xmlrpc.Handler):
+                result = result.result
+            if not isinstance(result, Fault):
+                result = (result,)
+            try:
+                s = xmlrpclib.dumps(result, methodresponse=1)
+            except:
+                f = Fault(self.FAILURE, "can't serialize output")
+                s = xmlrpclib.dumps(f, methodresponse=1)
+            request.setHeader("content-length", str(len(s)))
+            request.write(s)
+            request.finish()
+     
+        def _ebRender(self, failure):
+            if isinstance(failure.value, Fault):
+                return failure.value
+            log.err(failure)
+            return Fault(self.FAILURE, "error")
+        
+        def _getFunction(self, methodName):
+            while self.__pause:
+                time.sleep(1)
+            
+            self.__numRequests = self.__numRequests + 1
+            function = None
+            try:
+                def defer_function(*args):
+                    return deferToThread(self.__XRMethods[methodName], 
+                                         *args)
+                function = defer_function
+                self.__logger.info(
+                    "[%s] processing defered XML-RPC call to: %s ..." % 
+                    (self.__numRequests, methodName))            
+            except KeyError:
+                self.__logger.warn(
+                    "[%s] fault %s on XML-RPC call to %s, method not found." % (
+                    self.__numRequests, self.NOT_FOUND, methodName))
+                raise xmlrpc.NoSuchFunction(self.NOT_FOUND, 
+                                            "method %s not found" % methodName)
+            
+            return function
+        
+        def register_function(self, functionRef, methodName):
+            self.__XRMethods[methodName] = functionRef
+            
+        def list_methods(self):
+            return self.__XRMethods.keys()
+        
+        def num_requests(self):
+            return self.__numRequests
+        
+        def pause(self):
+            self.__pause = True
+        
+        def cont(self):
+            self.__pause = False
+            
+    class twistedXMLRPCServer:
+        def __init__(self, host, ports, logger=None, threadPoolSize=100):
+            self.__host = host
+            self.__ports = ports
+            
+            if logger == None:
+                logger = hodDummyLogger()
+            
+            self.__logger = logger
+                
+            self.server_address = ['', '']
+            reactor.suggestThreadPoolSize(threadPoolSize)    
+    
+            self.__stopForever = threading.Event()
+            self.__stopForever.clear()
+            self.__mThread = None
+                
+            self.__xmlrpc = twistedXMLRPC(self.__logger)
+                
+        def _serve_forever(self):
+            if len(self.__ports) > 1:
+                randomPort = Random(os.getpid())
+                portSequence = range(self.__ports[0], self.__ports[1])
+    
+                maxTryCount = abs(self.__ports[0] - self.__ports[1])
+                tryCount = 0
+                while True:
+                    somePort = randomPort.choice(portSequence)
+                    self.server_address = (self.__host, int(somePort))
+                    if self.__host == '':
+                        self.server_address = (local_fqdn(), self.server_address[1])
+                    try:
+                        reactor.listenTCP(int(somePort), server.Site(
+                            self.__xmlrpc), interface=self.__host)
+                        reactor.run(installSignalHandlers=0)
+                    except:
+                        self.__logger.debug("Failed to bind to: %s:%s." % (
+                            self.__host, somePort))
+                        tryCount = tryCount + 1
+                        if tryCount > maxTryCount:
+                            self.__logger.warn("Failed to bind to: %s:%s" % (
+                                self.__host, self.__ports))
+                            sys.exit(1)
+                    else:
+                        break
+            else:
+                try:
+                    self.server_address = (self.__host, int(self.__ports[0]))
+                    if self.__host == '':
+                        self.server_address = (local_fqdn(), self.server_address[1])
+                    reactor.listenTCP(int(self.__ports[0]), server.Site(self.__xmlrpc), 
+                                      interface=self.__host)
+                    reactor.run(installSignalHandlers=0)
+                except:
+                    self.__logger.warn("Failed to bind to: %s:%s."% (
+                            self.__host, self.__ports[0]))
+                    sys.exit(1)
+            
+        def serve_forever(self):
+            """Handle requests until stopForever event flag indicates stop."""
+    
+            self.__mThread = threading.Thread(name="XRServer",
+                                              target=self._serve_forever)
+            self.__mThread.start()
+            
+            if not self.__mThread.isAlive():
+                raise Exception("Twisted XMLRPC server thread dead.")
+                    
+        def register_function(self, functionRef, methodName):
+            self.__xmlrpc.register_function(functionRef, methodName)
+        
+        def register_introspection_functions(self):
+            pass
+        
+        def register_instance(self, instance):
+            for method in dir(instance):
+                if not method.startswith('_'):
+                    self.register_function(getattr(instance, method), method)
+        
+        def pause(self):
+            self.__xmlrpc.pause()
+        
+        def cont(self):
+            self.__xmlrpc.cont()
+        
+        def stop(self):
+            def stop_thread():
+                time.sleep(2)
+                reactor.stop()
+                
+            self.__stopForever.set()
+            
+            stopThread = threading.Thread(name='XRStop', target=stop_thread)
+            stopThread.start()
+                
+            return True
+            
+        def is_alive(self):
+            status = False
+            if reactor.running == 1:
+                status = True
+            
+            return status
+        
+        def status(self):
+            """Return status information on running XMLRPC Server."""
+            stat = { 'XR server address'     : self.server_address,
+                     'XR methods'            : self.system_listMethods(),
+                     'XR server alive'       : self.is_alive(),
+                     'XR requests processed' : self.__xmlrpc.num_requests(),
+                     'XR server stop flag'   : self.__stopForever.isSet()}
+            return(stat)
+        
+        def system_listMethods(self):
+            return self.__xmlrpc.list_methods()
+        
+        def get_server_address(self):
+            waitCount = 0
+            while self.server_address == '':
+                if waitCount == 9:
+                    break 
+                time.sleep(1)
+                waitCount = waitCount + 1
+                
+            return self.server_address
+except ImportError:
+    pass

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/socketServers.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/socketServers.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/socketServers.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/tcp.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/tcp.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/tcp.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/tcp.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,177 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+# $Id:tcp.py 6172 2007-05-22 20:26:54Z zim $
+#
+# Christopher Zimmerman - zim@yahoo-inc.com - 04/07/2007
+#------------------------------------------------------------------------------
+
+""" TCP related classes. """
+
+import socket, re, string
+reAddress    = re.compile(":")
+reMayBeIp = re.compile("^\d+\.\d+\.\d+\.\d+$")
+reValidPort = re.compile("^\d+$")
+
+class Error(Exception):
+    """Base exception of this module; the text is also kept in .message."""
+
+    def __init__(self, msg=''):
+        Exception.__init__(self, msg)
+        self.message = msg
+
+    def __repr__(self):
+        text = self.message
+        return text
+
+class tcpError(Error):
+    """Raised by tcpSocket when opening or closing a connection fails."""
+
+    def __init__(self, message):
+        parent = Error
+        parent.__init__(self, message)
+
+class tcpSocket:
+    def __init__(self, address, timeout=30, autoflush=0):
+        """Constructs a tcpSocket object.
+
+           address - standard tcp address (HOST:PORT)
+           timeout - socket timeout"""
+
+        self.address = address
+        self.__autoFlush = autoflush
+        self.__remoteSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.__remoteSock.settimeout(timeout)
+        self.host = None
+        self.port = None
+        splitAddress = address
+        if isinstance(address, (tuple, list)):
+            self.host = address[0]
+            self.port = int(address[1])
+        else:
+            splitAddress = get_address_tuple(address)
+            if not splitAddress[0]:
+                self.host = 'localhost'
+            else:
+                self.host = splitAddress[0]
+
+            self.port = int(splitAddress[1])
+
+        self.__fileObjectOut = ''
+        self.__fileObjectIn = ''
+
+    def __repr__(self):
+        return self.address
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        sockLine = self.read()
+        if not sockLine:
+            raise StopIteration
+
+        return sockLine
+
+    def open(self):
+        """Attempts to open a socket to the specified address."""
+
+        socketAddress = (self.host, self.port)
+
+        try:
+            self.__remoteSock.connect(socketAddress)
+            if self.__autoFlush:
+                self.__fileObjectOut = self.__remoteSock.makefile('wb', 0)
+            else:
+                self.__fileObjectOut = self.__remoteSock.makefile('wb')
+
+            self.__fileObjectIn  = self.__remoteSock.makefile('rb', 0)
+        except:
+            raise tcpError, "connection failure: %s" % self.address
+
+    def flush(self):
+        """Flushes write buffer."""
+        self.__fileObjectOut.flush()
+
+    def close(self):
+        """Attempts to close and open socket connection"""
+
+        try:
+            self.__remoteSock.close()
+            self.__fileObjectOut.close()
+            self.__fileObjectIn.close()
+        except socket.error, exceptionObject:
+            exceptionMessage = "close failure %s %s" % (self.address,
+                exceptionObject.__str__())
+            raise tcpError, exceptionMessage
+
+    def verify(self):
+        """Verifies that a given IP address/host and port are valid. This
+           method will not attempt to open a socket to the specified address.
+        """
+
+        isValidAddress = False
+        if reMayBeIp.match(self.host):
+            if check_ip_address(self.host):
+                if reValidPort.match(str(self.port)):
+                    isValidAddress = True
+        else:
+            if reValidPort.match(str(self.port)):
+                isValidAddress = True
+
+        return(isValidAddress)
+
+    def read(self):
+        """Reads a line off of the active socket."""
+
+        return self.__fileObjectIn.readline()
+
+    def write(self, string):
+        """Writes a string to the active socket."""
+
+        print >> self.__fileObjectOut, string
+
+def check_net_address(address):
+    valid = True
+    pieces = string.split(address, '.')
+    if len(pieces) != 4:
+        valid = False
+    else:
+        for piece in pieces:
+            if int(piece) < 0 or int(piece) > 255:
+                valid = False
+
+    return valid
+
+def check_ip_address(address):
+    valid = True
+    pieces = string.split(address, '.')
+    if len(pieces) != 4:
+        valid = False
+    else:
+        if int(pieces[0]) < 1 or int(pieces[0]) > 254:
+            valid = False
+        for i in range(1,4):
+            if int(pieces[i]) < 0 or int(pieces[i]) > 255:
+                valid = False
+
+    return valid
+
+def get_address_tuple(address):
+    """ Returns an address tuple for TCP address.
+
+        address - TCP address of the form host:port
+
+        returns address tuple (host, port)
+    """
+
+    addressList = reAddress.split(address)
+    addressTuple = (addressList[0], int(addressList[1]))
+
+    return addressTuple

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/tcp.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/tcp.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/tcp.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Added: lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/threads.py
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/threads.py?rev=608950&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/threads.py (added)
+++ lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/threads.py Fri Jan  4 10:20:17 2008
@@ -0,0 +1,382 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+
+import threading, time, os, sys, pprint
+
+from popen2 import Popen4, Popen3, MAXFD
+from signal import SIGTERM, SIGKILL
+
+class baseThread(threading.Thread):
+    """Base CAM threading class.  The run method should be overridden."""
+
+    def __init__(self, name):
+        threading.Thread.__init__(self, name=name)
+        self.stopFlag = threading.Event()
+        self.stopFlag.clear()
+        self.running = threading.Event()
+        self.running.set()
+        self.isFinished = threading.Event()
+        self.isFinished.clear()
+
+    def join(self, timeout=None):
+        self.stopFlag.set()
+        threading.Thread.join(self, timeout)
+
+    def pause(self):
+        """Pause thread."""
+
+        self.running.clear()
+
+    def cont(self):
+        """Resume thread operation."""
+
+        self.running.set()
+
+class simpleCommand(baseThread):
+    """Run a shell command in its own thread and capture its results.
+
+       Public attributes:
+
+       cmdString - command to be executed
+       stdin     - pipe to the command's standard input (set by run())
+       stdout    - pipe carrying the command's output; only set when
+                   buffer == False and wait == True
+       stderr    - taken from the Popen object's childerr when mode == 3
+
+       Captured output (stdout + stderr in mode 4) is returned by output();
+       the exit status by wait() and exit_code().
+       """
+
+    def __init__(self, name, cmdString, env=os.environ, mode=4, buffer=True, 
+                 wait=True, chdir=None):
+        """Class initialization.
+
+           name        - thread name to use when running the command
+           cmdString   - command string to execute
+           env         - environment mapping the command is executed with
+           mode        - 3 for popen3 and 4 for popen4 (stderr merged into
+                         the output); any value other than 3 behaves like 4
+           buffer      - when True, output is collected for output()
+           wait        - only consulted when buffer is False: when False,
+                         run() prints the command's output to this
+                         process's stdout; when True, the output pipe is
+                         exposed as self.stdout instead
+           chdir       - directory to change into before starting the
+                         command; run() raises if it does not exist"""
+
+        baseThread.__init__(self, name=name)
+
+        self.cmdString = cmdString
+        self.__mode = mode
+        self.__buffer = buffer
+        self.__wait = wait
+        self.__chdir = chdir
+        self.__outputBuffer = []
+        self.__status = None
+        self.__pid = None
+        # NOTE(review): this private event, not baseThread.isFinished, is
+        # what run() sets and wait()/is_running() check.
+        self.__isFinished = threading.Event()
+        self.__isFinished.clear()
+        
+        self.stdin = None
+        self.stdout = None
+        self.stderr = None
+
+        self.__env = env
+    
+    def run(self):
+        """ Overridden run method.  Most of the work happens here.  start()
+            should be called in place of this method.
+
+            NOTE(review): if anything below raises (invalid chdir, fork
+            failure, ...), __isFinished is never set, so wait() blocks
+            forever and the previous working directory is not restored."""
+            
+        oldDir = None
+        if self.__chdir:
+            if os.path.exists(self.__chdir):
+                # os.chdir changes the cwd of the whole process, not just
+                # this thread.
+                oldDir = os.getcwd()  
+                os.chdir(self.__chdir)
+            else:
+                raise Exception(
+                    "simpleCommand: invalid chdir specified: %s" % 
+                    self.__chdir)
+            
+        cmd = None
+        if self.__mode == 3:
+            cmd = _Popen3Env(self.cmdString, env=self.__env)
+        else:
+            cmd = _Popen4Env(self.cmdString, env=self.__env)
+        self.__pid = cmd.pid
+
+        self.stdin = cmd.tochild
+        
+        if self.__mode == 3:
+            # NOTE(review): _Popen3Env is created above without
+            # capturestderr=True, so childerr (and hence self.stderr) looks
+            # like it will always be None here - confirm intent.
+            self.stderr = cmd.childerr
+
+        # Presumably defensive: the Popen constructors appear to set
+        # fromchild before returning.
+        while cmd.fromchild == None:
+            time.sleep(1)
+        
+        if self.__buffer == True:
+            # Collect output line by line; while paused, hold off (unless a
+            # stop was requested) before storing the next line.
+            output = cmd.fromchild.readline()
+            while output != '':
+                while not self.running.isSet():
+                    if self.stopFlag.isSet():
+                        break
+                    time.sleep(1)
+                self.__outputBuffer.append(output)
+                output = cmd.fromchild.readline()
+
+        elif self.__wait == False:
+            # readlines() reads until EOF, so lines are echoed only after
+            # the command has closed its output, not as they arrive.
+            for output in cmd.fromchild.readlines():
+                while not self.running.isSet():
+                    if self.stopFlag.isSet():
+                        break
+                    time.sleep(1)
+                
+                print output,
+        else:
+            # The caller reads the command's output from self.stdout.
+            self.stdout = cmd.fromchild
+
+        # Poll about once a second until the child exits (poll() returns
+        # -1 while it is still running).
+        self.__status = cmd.poll()
+        while self.__status == -1:
+            while not self.running.isSet():
+                if self.stopFlag.isSet():
+                    break
+                time.sleep(1)
+
+            self.__status = cmd.poll()
+            time.sleep(1)
+
+        if oldDir:
+            os.chdir(oldDir)
+
+        self.__isFinished.set()
+        
+        # SystemExit raised in a thread only ends that thread.
+        sys.exit(0)
+
+    def output(self):
+        """Return a copy of the output lines captured so far (buffer mode)."""
+        return self.__outputBuffer[:]
+
+    def wait(self):
+        """Wait blocking until command execution completes.
+
+           Returns the command's exit code (os.WEXITSTATUS of the status)."""
+
+        self.__isFinished.wait()
+
+        return os.WEXITSTATUS(self.__status)
+
+    def is_running(self):
+        """Returns boolean, are we running?"""
+        
+        status = True
+        if self.__isFinished.isSet():
+            status = False
+            
+        return status 
+
+    def exit_code(self):
+        """ Returns process exit code, or None if no status is known yet."""
+        
+        if self.__status != None:
+            return os.WEXITSTATUS(self.__status)
+        else:
+            return None
+        
+    def exit_status_string(self):
+        """Return a string representation of the command's exit status.
+
+           Returns None while no status is known, and also when the raw
+           status is 0 (the truth test below treats 0 as "no status").
+           The 'signal' field is the boolean os.WIFSIGNALED(), not a
+           signal number."""
+
+        statusString = None
+        if self.__status:
+            exitStatus = os.WEXITSTATUS(self.__status)
+            exitSignal = os.WIFSIGNALED(self.__status)
+            coreDump   = os.WCOREDUMP(self.__status)
+
+            statusString = "exit code: %s | signal: %s | core %s" % \
+                (exitStatus, exitSignal, coreDump)
+
+        return(statusString)
+
+    def stop(self):
+        """Set stopFlag and join the execution thread.  The child process
+           itself is not signalled; see kill()."""
+
+        self.join()
+
+    def kill(self):
+        """Terminate the command: send SIGTERM every 0.1s while it runs,
+           escalate to SIGKILL after 21 SIGTERMs, then join the thread.
+           Any error from os.kill ends the signalling early."""
+        count = 0
+        while self.is_running():
+          try:
+            if count > 20:
+              os.kill(self.__pid, SIGKILL)
+              break
+            else:  
+              os.kill(self.__pid, SIGTERM)
+          except:
+            break
+          
+          time.sleep(.1)
+          count = count + 1
+        
+        self.stop()
+        
+class _Popen3Env(Popen3):
+    def __init__(self, cmd, capturestderr=False, bufsize=-1, env=os.environ):
+        self._env = env
+        Popen3.__init__(self, cmd, capturestderr, bufsize)
+    
+    def _run_child(self, cmd):
+        if isinstance(cmd, basestring):
+            cmd = ['/bin/sh', '-c', cmd]
+        for i in xrange(3, MAXFD):
+            try:
+                os.close(i)
+            except OSError:
+                pass
+
+        try:
+            os.execvpe(cmd[0], cmd, self._env)
+        finally:
+            os._exit(1)
+            
+class _Popen4Env(_Popen3Env, Popen4):
+    """popen2.Popen4 variant (stderr merged into fromchild) whose child is
+       executed with the environment mapping given as env."""
+    # No separate stderr pipe in the Popen4 style of process.
+    childerr = None
+
+    def __init__(self, cmd, bufsize=-1, env=os.environ):
+        # Popen4.__init__ forks and calls self._run_child, which resolves to
+        # _Popen3Env._run_child because _Popen3Env is listed first in the
+        # bases; _env must therefore be set before that call.
+        self._env = env
+        Popen4.__init__(self, cmd, bufsize)
+        
+class loop(baseThread):
+    """ A simple extension of the threading.Thread class which continuously
+        executes a block of code until join().
+    """
+
+    def __init__(self, name, functionRef, functionArgs=None, sleep=1, wait=0,
+        offset=False):
+        """Initialize a loop object.
+
+           name         - thread name
+           functionRef  - a function reference
+           functionArgs - function arguments in the form of a tuple,
+           sleep        - time to wait between function execs
+           wait         - time to wait before executing the first time
+           offset       - set true to sleep as an offset of the start of the
+                          last func exec instead of the end of the last func
+                          exec
+        """
+
+        self.__functionRef  = functionRef
+        self.__functionArgs = functionArgs
+        self.__sleep        = sleep
+        self.__wait         = wait
+        self.__offset       = offset
+
+        baseThread.__init__(self, name=name)
+
+    def run(self):
+        """Do not call this directly.  Call self.start()."""
+
+        startTime = None
+        while not self.stopFlag.isSet():
+            sleep = self.__sleep
+            if self.__wait > 0:
+                startWaitCount = 0
+                while not self.stopFlag.isSet():
+                    while not self.running.isSet():
+                        if self.stopFlag.isSet():
+                            break
+                        time.sleep(1)
+                    time.sleep(0.5)
+                    startWaitCount = startWaitCount + .5
+                    if startWaitCount >= self.__wait:
+                        self.__wait = 0
+                        break
+            startTime = time.time()
+
+            if not self.stopFlag.isSet():
+                if self.running.isSet():
+                    if self.__functionArgs:
+                        self.__functionRef(self.__functionArgs)
+                    else:
+                        self.__functionRef()
+            endTime = time.time()
+
+            while not self.running.isSet():
+                time.sleep(1)
+
+            while not self.stopFlag.isSet():
+                while not self.running.isSet():
+                    if self.stopFlag.isSet():
+                        break
+                    time.sleep(1)
+
+                currentTime = time.time()
+                if self.__offset:
+                    elapsed = time.time() - startTime
+                else:
+                    elapsed = time.time() - endTime
+
+                if elapsed >= self.__sleep:
+                    break
+
+                time.sleep(0.5)
+        
+        self.isFinished.set()
+
+    def set_sleep(self, sleep, wait=None, offset=None):
+        """Modify loop frequency paramaters.
+
+           sleep        - time to wait between function execs
+           wait         - time to wait before executing the first time
+           offset       - set true to sleep as an offset of the start of the
+                          last func exec instead of the end of the last func
+                          exec
+        """
+
+        self.__sleep = sleep
+        if wait != None:
+            self.__wait = wait
+        if offset != None:
+            self.__offset = offset
+
+    def get_sleep(self):
+        """Get loop frequency paramaters.
+        Returns a dictionary with sleep, wait, offset.
+        """
+
+        return {
+            'sleep'  : self.__sleep,
+            'wait'   : self.__wait,
+            'offset' : self.__offset,
+            }
+        
+class func(baseThread):
+    """ A simple extension of the threading.Thread class which executes 
+        a function in a separate thread.
+    """
+
+    def __init__(self, name, functionRef, functionArgs=None):
+        """Initialize a func object.
+
+           name         - thread name
+           functionRef  - a function reference
+           functionArgs - function arguments in the form of a tuple,
+        """
+
+        self.__functionRef  = functionRef
+        self.__functionArgs = functionArgs
+
+        baseThread.__init__(self, name=name)
+
+    def run(self):
+        """Do not call this directly.  Call self.start()."""
+
+        if not self.stopFlag.isSet():
+            if self.running.isSet():
+                if self.__functionArgs:
+                    self.__functionRef(self.__functionArgs)
+                else:
+                    self.__functionRef()
+        sys.exit(0)

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/threads.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/threads.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: lucene/hadoop/trunk/src/contrib/hod/hodlib/Common/threads.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL



Mime
View raw message