Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org
Received: (qmail 44030 invoked from network); 28 Jan 2008 16:10:13 -0000
Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2)
  by minotaur.apache.org with SMTP; 28 Jan 2008 16:10:13 -0000
Received: (qmail 31045 invoked by uid 500); 28 Jan 2008 16:10:03 -0000
Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org
Received: (qmail 31026 invoked by uid 500); 28 Jan 2008 16:10:03 -0000
Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm
Precedence: bulk
Reply-To: core-dev@hadoop.apache.org
Delivered-To: mailing list core-commits@hadoop.apache.org
Received: (qmail 31015 invoked by uid 99); 28 Jan 2008 16:10:03 -0000
Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230)
  by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 28 Jan 2008 08:10:03 -0800
X-ASF-Spam-Status: No, hits=-98.0 required=10.0 tests=ALL_TRUSTED,URIBL_BLACK
X-Spam-Check-By: apache.org
Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3)
  by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 28 Jan 2008 16:09:56 +0000
Received: by eris.apache.org (Postfix, from userid 65534)
  id 2BC831A9832; Mon, 28 Jan 2008 08:09:47 -0800 (PST)
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: svn commit: r615924 - in /hadoop/core/trunk/src/contrib/hod: bin/checknodes bin/hodcleanup support/ support/logcondense.py
Date: Mon, 28 Jan 2008 16:09:44 -0000
To: core-commits@hadoop.apache.org
From: nigel@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20080128160947.2BC831A9832@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Author: nigel
Date: Mon Jan 28 08:09:43 2008
New Revision: 615924

URL: http://svn.apache.org/viewvc?rev=615924&view=rev
Log:
HADOOP-2720. Adding missed files from previous commit

Added:
    hadoop/core/trunk/src/contrib/hod/bin/checknodes
    hadoop/core/trunk/src/contrib/hod/bin/hodcleanup
    hadoop/core/trunk/src/contrib/hod/support/
    hadoop/core/trunk/src/contrib/hod/support/logcondense.py   (with props)

Added: hadoop/core/trunk/src/contrib/hod/bin/checknodes
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/bin/checknodes?rev=615924&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/bin/checknodes (added)
+++ hadoop/core/trunk/src/contrib/hod/bin/checknodes Mon Jan 28 08:09:43 2008
@@ -0,0 +1,31 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements. See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership. The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License. You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+#!/bin/bash
+PBS_NODES_PATH=`which pbsnodes 2>/dev/null`
+if [ -z $PBS_NODES_PATH ]
+then
+  echo Could not find pbsnodes in path. Cannot check available number of nodes.
+  exit 1
+fi
+if [ -z $1 ]
+then
+  echo Usage: checknodes queue-name
+  exit 2
+fi
+num=`$PBS_NODES_PATH :$1 2>&1 | grep "state = " | egrep -v "state = down" | grep -v "state = offline" | wc -l`
+totalused=`$PBS_NODES_PATH :$1 2>&1 | grep "jobs =" | wc -l`
+numleft=`expr $num - $totalused`
+echo $numleft

Added: hadoop/core/trunk/src/contrib/hod/bin/hodcleanup
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/bin/hodcleanup?rev=615924&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/bin/hodcleanup (added)
+++ hadoop/core/trunk/src/contrib/hod/bin/hodcleanup Mon Jan 28 08:09:43 2008
@@ -0,0 +1,233 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements. See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership. The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License. You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+#!/bin/sh
+""":"
+work_dir=$(dirname $0)
+base_name=$(basename $0)
+original_dir=$PWD
+cd $work_dir
+
+if [ $HOD_PYTHON_HOME ]; then
+  exec $HOD_PYTHON_HOME -u -OO $base_name ${1+"$@"}
+elif [ -e /usr/bin/python ]; then
+  exec /usr/bin/python -u -OO $base_name ${1+"$@"}
+elif [ -e /usr/local/bin/python ]; then
+  exec /usr/local/bin/python -u -OO $base_name ${1+"$@"}
+else
+  exec python -u -OO $base_name ${1+"$@"}
+fi
+":"""

+"""The executable to be used by the user"""
+import sys, os, re, pwd, threading, sys, random, time, pprint, shutil, time, re
+from pprint import pformat
+from optparse import OptionParser
+
+myName = os.path.basename(sys.argv[0])
+myName = re.sub(".*/", "", myName)
+binDirectory = os.path.realpath(sys.argv[0])
+rootDirectory = re.sub("/bin/.*", "", binDirectory)
+libDirectory = rootDirectory
+
+sys.path.append(libDirectory)
+
+from hodlib.Common.threads import simpleCommand
+from hodlib.Common.util import local_fqdn, tar, filter_warnings,\
+    get_exception_string, get_exception_error_string
+from hodlib.Common.logger import hodLog
+
+filter_warnings()
+
+reVersion = re.compile(".*(\d+_\d+).*")
+reHdfsURI = re.compile("hdfs://(.*?:\d+)(.*)")
+
+VERSION = None
+if os.path.exists("./VERSION"):
+  vFile = open("./VERSION", 'r')
+  VERSION = vFile.readline()
+  vFile.close()
+
+def getLogger(hodRingOptions):
+  # print >>sys.stderr,"now"
+  if hodRingOptions['hodring-debug'] > 0:
+    _baseLogger = hodLog('hodring')
+    log = _baseLogger.add_logger('main')
+
+    if hodRingOptions.has_key('hodring-stream'):
+      if hodRingOptions['hodring-stream']:
+        _baseLogger.add_stream(level=hodRingOptions['hodring-debug'],
+                               addToLoggerNames=('main',))
+
+    _serviceID = hodRingOptions['service-id']
+    if hodRingOptions['hodring-log-dir']:
+      if _serviceID:
+        __logDir = os.path.join(hodRingOptions['hodring-log-dir'], "%s.%s" % (
+            hodRingOptions['user-id'], _serviceID))
+      else:
+        __logDir = os.path.join(hodRingOptions['hodring-log-dir'],
+                                hodRingOptions['user-id'])
+      if not os.path.exists(__logDir):
+        os.mkdir(__logDir)
+      _baseLogger.add_file(logDirectory=__logDir,
+          level=hodRingOptions['hodring-debug'], addToLoggerNames=('main',))
+
+    if hodRingOptions['hodring-syslog-address']:
+      _baseLogger.add_syslog(hodRingOptions['hodring-syslog-address'],
+          level=hodRingOptions['hodring-debug'], addToLoggerNames=('main',))
+
+  return log
+
+def ensureLogDir(logDir):
+  """Verify that the passed in log directory exists, and if it doesn't
+  create it."""
+  if not os.path.exists(logDir):
+    try:
+      old_mask = os.umask(0)
+      os.makedirs(logDir, 01777)
+      os.umask(old_mask)
+    except Exception, e:
+      print >>sys.stderr, "Could not create log directories %s. Exception: %s. Stack Trace: %s" \
+          % (logDir, get_exception_error_string(), get_exception_string())
+      raise e
+
+def __archive_logs(conf, log):
+  # need log-destination-uri, __hadoopLogDirs, temp-dir
+  status = True
+  logUri = conf['log-destination-uri']
+  hadoopLogDirs = conf['hadoop-log-dirs']
+  if logUri:
+    try:
+      if hadoopLogDirs:
+        date = time.localtime()
+        for logDir in hadoopLogDirs:
+          (head, tail) = os.path.split(logDir)
+          (head, logType) = os.path.split(head)
+          tarBallFile = "%s-%s-%04d%02d%02d%02d%02d%02d-%s.tar.gz" % (
+              logType, local_fqdn(), date[0], date[1], date[2], date[3],
+              date[4], date[5], random.randint(0,1000))
+
+          if logUri.startswith('file://'):
+            tarBallFile = os.path.join(logUri[7:], tarBallFile)
+          else:
+            tarBallFile = os.path.join(conf['temp-dir'], tarBallFile)
+
+          log.debug('archiving log files to: %s' % tarBallFile)
+          status = tar(tarBallFile, logDir, ['*',])
+          log.info('archive %s status: %s' % (tarBallFile, status))
+          if status and logUri.startswith('hdfs://'):
+            __copy_archive_to_dfs(conf, tarBallFile)
+            log.info("copying archive to dfs finished")
+        dict = {}
+    except:
+      log.error(get_exception_string())
+      status = False
+  return status
+
+def __copy_archive_to_dfs(conf, archiveFile):
+  # need log-destination-uri, hadoopCommandstring and/or pkgs
+  hdfsURIMatch = reHdfsURI.match(conf['log-destination-uri'])
+
+  # FIXME this is a complete and utter hack. Currently hadoop is broken
+  # and does not understand hdfs:// syntax on the command line :(
+
+  pid = os.getpid()
+  tempConfDir = '/tmp/%s' % pid
+  os.mkdir(tempConfDir)
+  tempConfFileName = '%s/hadoop-site.xml' % tempConfDir
+  tempHadoopConfig = open(tempConfFileName, 'w')
+  print >>tempHadoopConfig, "<configuration>"
+  print >>tempHadoopConfig, "  <property>"
+  print >>tempHadoopConfig, "    <name>fs.default.name</name>"
+  print >>tempHadoopConfig, "    <value>%s</value>" % hdfsURIMatch.group(1)
+  print >>tempHadoopConfig, "    <description>No description</description>"
+  print >>tempHadoopConfig, "  </property>"
+  print >>tempHadoopConfig, "</configuration>"
+  tempHadoopConfig.close()
+
+  # END LAME HACK
+
+  (head, tail) = os.path.split(archiveFile)
+  destFile = os.path.join(hdfsURIMatch.group(2), conf['user-id'], conf['service-id'], tail)
+
+  log.info("copying archive %s to DFS %s ..." % (archiveFile, destFile))
+
+  hadoopCmd = conf['hadoop-command-string']
+  if conf['pkgs']:
+    hadoopCmd = os.path.join(conf['pkgs'], 'bin', 'hadoop')
+
+  # LAME HACK AGAIN, using config generated above :(
+  copyCommand = "%s --config %s dfs -copyFromLocal %s %s" % (hadoopCmd,
+      tempConfDir, archiveFile, destFile)
+
+  log.debug(copyCommand)
+
+  copyThread = simpleCommand('hadoop', copyCommand)
+  copyThread.start()
+  copyThread.wait()
+  copyThread.join()
+  log.debug(pprint.pformat(copyThread.output()))
+
+  # LAME HACK AGAIN, deleting config generated above :(
+  os.unlink(tempConfFileName)
+  os.rmdir(tempConfDir)
+  os.unlink(archiveFile)
+
+def unpack():
+  parser = OptionParser()
+  option_list=["--log-destination-uri", "--hadoop-log-dirs", \
+    "--temp-dir", "--hadoop-command-string", "--pkgs", "--user-id", \
+    "--service-id", "--hodring-debug", "--hodring-log-dir", \
+    "--hodring-syslog-address", "--hodring-cleanup-list"]
+  regexp = re.compile("^--")
+  for opt in option_list:
+    parser.add_option(opt,dest=regexp.sub("",opt),action="store")
+  option_list.append("--hodring-stream")
+  parser.add_option("--hodring-stream",dest="hodring-stream",metavar="bool",\
+      action="store_true")
+
+  (options, args) = parser.parse_args()
+
+  _options = {}
+  for opt in dir(options):
+    if "--"+opt in option_list:
+      _options[opt] = getattr(options,opt)
+
+  if _options.has_key('hadoop-log-dirs') and _options['hadoop-log-dirs']:
+    _options['hadoop-log-dirs'] = _options['hadoop-log-dirs'].split(",")
+
+  _options['hodring-debug'] = int(_options['hodring-debug'])
+
+  return _options
+
+if __name__ == '__main__':
+  try:
+    conf = unpack()
+    log = getLogger(conf,)
+    log.debug("Logger initialised successfully")
+    status = __archive_logs(conf,log)
+    log.info("Archive status : %s" % status)
+    list = conf['hodring-cleanup-list'].split(',')
+    log.info("now removing %s" % list)
+    for dir in list:
+      if os.path.exists(dir):
+        log.debug('removing %s' % (dir))
+        shutil.rmtree(dir, True)
+    log.debug("done")
+    log.info("Cleanup successfully completed")
+  except Exception, e:
+    if log:
+      log.info("Stack trace:\n%s" %(get_exception_error_string(),get_exception_string()))

Added: hadoop/core/trunk/src/contrib/hod/support/logcondense.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/support/logcondense.py?rev=615924&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/support/logcondense.py (added)
+++ hadoop/core/trunk/src/contrib/hod/support/logcondense.py Mon Jan 28 08:09:43 2008
@@ -0,0 +1,205 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements. See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership. The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License. You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+#!/bin/sh
+""":"
+work_dir=$(dirname $0)
+base_name=$(basename $0)
+cd $work_dir
+
+if [ $HOD_PYTHON_HOME ]; then
+  exec $HOD_PYTHON_HOME -OO -u $base_name ${1+"$@"}
+elif [ -e /usr/bin/python ]; then
+  exec /usr/bin/python -OO -u $base_name ${1+"$@"}
+elif [ -e /usr/local/bin/python ]; then
+  exec /usr/local/bin/python -OO -u $base_name ${1+"$@"}
+else
+  exec python -OO -u $base_name ${1+"$@"}
+fi
+":"""
+
+from os import popen3
+import os, sys
+import re
+import time
+from datetime import datetime
+from optparse import OptionParser
+
+myName = os.path.basename(sys.argv[0])
+myName = re.sub(".*/", "", myName)
+
+reVersion = re.compile(".*(\d+_\d+).*")
+
+VERSION = '$HeadURL$'
+
+reMatch = reVersion.match(VERSION)
+if reMatch:
+  VERSION = reMatch.group(1)
+  VERSION = re.sub("_", ".", VERSION)
+else:
+  VERSION = 'DEV'
+
+options = ( {'short'   : "-p",
+             'long'    : "--package",
+             'type'    : "string",
+             'action'  : "store",
+             'dest'    : "package",
+             'metavar' : " ",
+             'default' : 'hadoop',
+             'help'    : "Bin file for hadoop"},
+
+            {'short'   : "-d",
+             'long'    : "--days",
+             'type'    : "int",
+             'action'  : "store",
+             'dest'    : "days",
+             'metavar' : " ",
+             'default' : 7,
+             'help'    : "Number of days before logs are deleted"},
+
+            {'short'   : "-c",
+             'long'    : "--config",
+             'type'    : "string",
+             'action'  : "store",
+             'dest'    : "config",
+             'metavar' : " ",
+             'default' : None,
+             'help'    : "config directory for hadoop"},
+
+            {'short'   : "-l",
+             'long'    : "--logs",
+             'type'    : "string",
+             'action'  : "store",
+             'dest'    : "log",
+             'metavar' : " ",
+             'default' : "/user/hod/logs",
+             'help'    : "directory where the logs are stored"},
+
+            {'short'   : "-n",
+             'long'    : "--dynamicdfs",
+             'type'    : "string",
+             'action'  : "store",
+             'dest'    : "dynamicdfs",
+             'metavar' : " ",
+             'default' : "false",
+             'help'    : "'true', if the cluster is used to bring up dynamic dfs clusters, 'false' otherwise"}
+            )
+
+def getDfsCommand(options, args):
+  if (options.config == None):
+    cmd = options.package + " " + "dfs " + args
+  else:
+    cmd = options.package + " " + "--config " + options.config + " dfs " + args
+  return cmd
+
+def runcondense():
+  import shutil
+
+  options = process_args()
+  # if the cluster is used to bring up dynamic dfs, we must leave NameNode and JobTracker logs,
+  # otherwise only JobTracker logs. Likewise, in case of dynamic dfs, we must also look for
+  # deleting datanode logs
+  filteredNames = ['jobtracker']
+  deletedNamePrefixes = ['0-tasktracker-*']
+  if options.dynamicdfs == 'true':
+    filteredNames.append('namenode')
+    deletedNamePrefixes.append('1-tasktracker-*')
+    deletedNamePrefixes.append('0-datanode-*')
+
+  cmd = getDfsCommand(options, "-lsr " + options.log)
+  (stdin, stdout, stderr) = popen3(cmd)
+  lastjobid = 'none'
+  toPurge = { }
+  for line in stdout:
+    m = re.match("^(.*?)\s.*$", line)
+    filename = m.group(1)
+    # file name format: ///[0-1]-[jobtracker|tasktracker|datanode|namenode|]-hostname-YYYYMMDDtime-random.tar.gz
+    # first strip prefix:
+    if filename.startswith(options.log):
+      filename = filename.lstrip(options.log)
+      if not filename.startswith('/'):
+        filename = '/' + filename
+    else:
+      continue
+
+    # Now get other details from filename.
+    k = re.match("/(.*)/(.*)/.*-.*-([0-9][0-9][0-9][0-9])([0-9][0-9])([0-9][0-9]).*$", filename)
+    if k:
+      username = k.group(1)
+      jobid = k.group(2)
+      datetimefile = datetime(int(k.group(3)), int(k.group(4)), int(k.group(5)))
+      datetimenow = datetime.utcnow()
+      diff = datetimenow - datetimefile
+      filedate = k.group(3) + k.group(4) + k.group(5)
+      newdate = datetimenow.strftime("%Y%m%d")
+      print "%s %s %s %d" % (filename, filedate, newdate, diff.days)
+
+      # if the cluster is used to bring up dynamic dfs, we must also leave NameNode logs.
+      foundFilteredName = False
+      for name in filteredNames:
+        if filename.find(name) >= 0:
+          foundFilteredName = True
+          break
+
+      if foundFilteredName:
+        continue
+
+      if (diff.days > options.days):
+        desttodel = filename
+        if not toPurge.has_key(jobid):
+          toPurge[jobid] = options.log.rstrip("/") + "/" + username + "/" + jobid
+
+  for job in toPurge.keys():
+    for prefix in deletedNamePrefixes:
+      cmd = getDfsCommand(options, "-rm " + toPurge[job] + '/' + prefix)
+      print cmd
+      ret = 0
+      ret = os.system(cmd)
+      if (ret != 0):
+        print >> sys.stderr, "Command failed to delete file " + cmd
+
+
+def process_args():
+  global options, myName, VERSION
+
+  usage = "usage: %s " % (myName)
+
+  version = "%s %s" % (myName, VERSION)
+
+  argParser = OptionParser(usage=usage, version=VERSION)
+
+  for option_element in options:
+    argParser.add_option(option_element['short'], option_element['long'],
+                         type=option_element['type'], action=option_element['action'],
+                         dest=option_element['dest'], default=option_element['default'],
+                         metavar=option_element['metavar'], help=option_element['help'])
+
+  (parsedOptions, args) = argParser.parse_args()
+
+  if not os.path.exists(parsedOptions.package):
+    argParser.error("Could not find path to hadoop binary: %s" % parsedOptions.package)
+  if not os.path.exists(parsedOptions.config):
+    argParser.error("Could not find config: %s" % parsedOptions.config)
+  if parsedOptions.days <= 0:
+    argParser.error("Invalid number of days specified, must be > 0: %s" % parsedOptions.config)
+  if parsedOptions.dynamicdfs!='true' and parsedOptions.dynamicdfs!='false':
+    argParser.error("Invalid option for dynamicdfs, must be true or false: %s" % parsedOptions.dynamicdfs)

+  return parsedOptions
+
+
+if __name__ == '__main__':
+  runcondense()
+

Propchange: hadoop/core/trunk/src/contrib/hod/support/logcondense.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/core/trunk/src/contrib/hod/support/logcondense.py
------------------------------------------------------------------------------
    svn:executable = *

Propchange: hadoop/core/trunk/src/contrib/hod/support/logcondense.py
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL
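For reference, a minimal invocation sketch of the two user-facing scripts added above. The queue name and filesystem paths are illustrative assumptions, not values taken from the commit; the option names, defaults, and validation (hadoop binary and config directory must exist, days must be positive, dynamicdfs must be 'true' or 'false') come from the scripts themselves.

    # checknodes: print the number of free nodes in a Torque queue (queue name is hypothetical)
    ./checknodes default

    # logcondense.py: purge HOD log archives older than 7 days from a statically configured DFS
    ./logcondense.py -p /path/to/hadoop/bin/hadoop -d 7 -c /path/to/hadoop/conf -l /user/hod/logs -n false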