Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 54896 invoked from network); 20 Mar 2008 09:06:46 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 20 Mar 2008 09:06:46 -0000 Received: (qmail 70717 invoked by uid 500); 20 Mar 2008 09:06:44 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 70582 invoked by uid 500); 20 Mar 2008 09:06:44 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 70573 invoked by uid 99); 20 Mar 2008 09:06:44 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Mar 2008 02:06:43 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Mar 2008 09:06:02 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 7F83C1A9832; Thu, 20 Mar 2008 02:06:21 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r639214 - in /hadoop/core/trunk: CHANGES.txt src/contrib/hod/bin/hod src/contrib/hod/hodlib/Hod/hod.py src/contrib/hod/testing/lib.py Date: Thu, 20 Mar 2008 09:06:20 -0000 To: core-commits@hadoop.apache.org From: ddas@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080320090621.7F83C1A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ddas Date: Thu Mar 20 02:06:19 2008 New Revision: 639214 URL: http://svn.apache.org/viewvc?rev=639214&view=rev Log: HADOOP-2848. [HOD]hod -o list and deallocate works even after deleting the cluster directory. 
Contributed by Vinod Kumar Vavilapalli. Modified: hadoop/core/trunk/CHANGES.txt hadoop/core/trunk/src/contrib/hod/bin/hod hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py hadoop/core/trunk/src/contrib/hod/testing/lib.py Modified: hadoop/core/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=639214&r1=639213&r2=639214&view=diff ============================================================================== --- hadoop/core/trunk/CHANGES.txt (original) +++ hadoop/core/trunk/CHANGES.txt Thu Mar 20 02:06:19 2008 @@ -117,6 +117,9 @@ HADOOP-2910. Throttle IPC Client/Server during bursts of requests or server slowdown. (Hairong Kuang via dhruba) + HADOOP-2848. [HOD]hod -o list and deallocate works even after deleting + the cluster directory. (Vinod Kumar Vavilapalli via ddas) + OPTIMIZATIONS HADOOP-2790. Fixed inefficient method hasSpeculativeTask by removing Modified: hadoop/core/trunk/src/contrib/hod/bin/hod URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/bin/hod?rev=639214&r1=639213&r2=639214&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/hod/bin/hod (original) +++ hadoop/core/trunk/src/contrib/hod/bin/hod Thu Mar 20 02:06:19 2008 @@ -86,7 +86,7 @@ ('clusterdir', 'directory', 'Directory where cluster state information and hadoop-site.xml' + ' will be stored.', - True, None, False, True, 'd'), + True, None, False, False, 'd'), ('syslog-address', 'address', 'Syslog address.', False, None, False, True, 'y'), Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py?rev=639214&r1=639213&r2=639214&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py (original) +++ hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py Thu Mar 20 02:06:19 2008 @@ -87,7 +87,8 @@ 
os.remove(item) class hodRunner: - def __init__(self, cfg): + + def __init__(self, cfg, log=None, cluster=None): self.__hodhelp = hodHelp() self.__ops = self.__hodhelp.ops self.__cfg = cfg @@ -96,14 +97,22 @@ self.__user = getpass.getuser() self.__registry = None self.__baseLogger = None - self.__setup_logger() + # Allowing to pass in log object to help testing - a stub can be passed in + if log is None: + self.__setup_logger() + else: + self.__log = log self.__userState = hodState(self.__cfg['hod']['user_state']) self.__clusterState = None self.__clusterStateInfo = { 'env' : None, 'hdfs' : None, 'mapred' : None } - self.__cluster = hadoopCluster(self.__cfg, self.__log) + # Allowing to pass in cluster object to help testing - a stub can be passed in + if cluster is None: + self.__cluster = hadoopCluster(self.__cfg, self.__log) + else: + self.__cluster = cluster def __setup_logger(self): self.__baseLogger = hodlib.Common.logger.hodLog('hod') @@ -206,6 +215,12 @@ self.__opCode = 3 return + clusterList = self.__userState.read(CLUSTER_DATA_FILE) + if clusterDir in clusterList.keys(): + self.__log.critical("Found a previously allocated cluster at cluster directory '%s'. Deallocate the cluster first." % (clusterDir)) + self.__opCode = 12 + return + self.__setup_cluster_logger(clusterDir) if re.match('\d+-\d+', nodes): (min, max) = nodes.split("-") @@ -292,8 +307,7 @@ self.__setup_cluster_state(clusterDir) clusterInfo = self.__clusterState.read() if clusterInfo == {}: - self.__opCode = 15 - self.__log.critical("Cluster %s not allocated." % clusterDir) + self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True) else: self.__opCode = \ self.__cluster.deallocate(clusterDir, clusterInfo) @@ -302,9 +316,7 @@ self.__clusterState.clear() self.__remove_cluster(clusterDir) else: - self.__log.critical("Invalid cluster directory '%s' specified."
% - clusterDir) - self.__opCode = 3 + self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True) else: print self.__hodhelp.help_deallocate() self.__log.critical("%s operation requires one argument. " % operation @@ -314,8 +326,15 @@ def _op_list(self, args): clusterList = self.__userState.read(CLUSTER_DATA_FILE) for path in clusterList.keys(): + if not os.path.isdir(path): + self.__log.info("cluster state unknown\t%s\t%s" % (clusterList[path], path)) + continue self.__setup_cluster_state(path) clusterInfo = self.__clusterState.read() + if clusterInfo == {}: + # something wrong with the cluster directory. + self.__log.info("cluster state unknown\t%s\t%s" % (clusterList[path], path)) + continue clusterStatus = self.__cluster.check_cluster(clusterInfo) if clusterStatus == 12: self.__log.info("alive\t%s\t%s" % (clusterList[path], path)) @@ -334,34 +353,51 @@ if os.path.isdir(clusterDir): self.__setup_cluster_state(clusterDir) clusterInfo = self.__clusterState.read() - clusterStatus = self.__cluster.check_cluster(clusterInfo) - if clusterStatus == 12: - self.__print_cluster_info(clusterInfo) - self.__log.info("hadoop-site.xml at %s" % clusterDir) - elif clusterStatus == 10: - self.__log.critical("%s cluster is dead" % clusterDir) - elif clusterStatus == 13: - self.__log.warn("%s cluster hdfs is dead" % clusterDir) - elif clusterStatus == 14: - self.__log.warn("%s cluster mapred is dead" % clusterDir) - - if clusterStatus != 12: - if clusterStatus == 15: - self.__log.critical("Cluster %s not allocated." % clusterDir) - else: + if clusterInfo == {}: + # something wrong with the cluster directory. 
+ self.__handle_invalid_cluster_directory(clusterDir) + else: + clusterStatus = self.__cluster.check_cluster(clusterInfo) + if clusterStatus == 12: self.__print_cluster_info(clusterInfo) self.__log.info("hadoop-site.xml at %s" % clusterDir) + elif clusterStatus == 10: + self.__log.critical("%s cluster is dead" % clusterDir) + elif clusterStatus == 13: + self.__log.warn("%s cluster hdfs is dead" % clusterDir) + elif clusterStatus == 14: + self.__log.warn("%s cluster mapred is dead" % clusterDir) + + if clusterStatus != 12: + if clusterStatus == 15: + self.__log.critical("Cluster %s not allocated." % clusterDir) + else: + self.__print_cluster_info(clusterInfo) + self.__log.info("hadoop-site.xml at %s" % clusterDir) - self.__opCode = clusterStatus + self.__opCode = clusterStatus else: - self.__log.critical("'%s' does not exist." % clusterDir) - self.__opCode = 3 + self.__handle_invalid_cluster_directory(clusterDir) else: print self.__hodhelp.help_info() self.__log.critical("%s operation requires one argument. " % operation + "A cluster path.") self.__opCode = 3 - + + def __handle_invalid_cluster_directory(self, clusterDir, cleanUp=False): + clusterList = self.__userState.read(CLUSTER_DATA_FILE) + if clusterDir in clusterList.keys(): + # previously allocated cluster. + self.__log.critical("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (clusterList[clusterDir], clusterDir)) + if cleanUp: + self.__cluster.delete_job(clusterList[clusterDir]) + self.__remove_cluster(clusterDir) + self.__log.critical("Freeing resources allocated to the cluster.") + self.__opCode = 3 + else: + self.__log.critical("'%s' is not a valid cluster directory." 
% (clusterDir)) + self.__opCode = 15 + def __print_cluster_info(self, clusterInfo): keys = clusterInfo.keys() Modified: hadoop/core/trunk/src/contrib/hod/testing/lib.py URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/testing/lib.py?rev=639214&r1=639213&r2=639214&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/hod/testing/lib.py (original) +++ hadoop/core/trunk/src/contrib/hod/testing/lib.py Thu Mar 20 02:06:19 2008 @@ -62,3 +62,49 @@ for i in range(0,79): str = str + "*" print >>sys.stderr, "\n", str, "\n" + +# This class captures all log messages logged by hodRunner and other classes. +# It is then used to verify that certain log messages have come. This is one +# way to validate that messages printed to the logger are correctly written. +class MockLogger: + def __init__(self): + self.__logLines = {} + + def info(self, message): + self.__logLines[message] = 'info' + + def critical(self, message): + self.__logLines[message] = 'critical' + + def warn(self, message): + self.__logLines[message] = 'warn' + + def debug(self, message): + # don't track debug lines. + pass + + # verify a certain message has been logged at the defined level of severity. + def hasMessage(self, message, level): + if not self.__logLines.has_key(message): + return False + return self.__logLines[message] == level + +# Stub class to test cluster manipulation operations. +class MockHadoopCluster: + + def __init__(self): + # store the operations received. + self.__operations = {} + + def delete_job(self, jobid): + self.__operations['delete_job'] = [jobid] + + def wasOperationPerformed(self, operation, args): + if self.__operations.has_key(operation): + actualArgs = self.__operations[operation] + for arg in actualArgs: + if arg not in args: + break + else: + return True + return False