Mailing-List: contact commits-help@hawq.incubator.apache.org; run by ezmlm
Reply-To: dev@hawq.incubator.apache.org
From: rlei@apache.org
To: commits@hawq.incubator.apache.org
Date: Thu, 05 Nov 2015 03:10:01 -0000
Message-Id: <0991c9be820e42778101536e05dfb212@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [5/8] incubator-hawq git commit: HAWQ-121. Remove legacy command line tools.
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/9932786b/tools/bin/gpmigrator ---------------------------------------------------------------------- diff --git a/tools/bin/gpmigrator b/tools/bin/gpmigrator deleted file mode 100755 index f190184..0000000 --- a/tools/bin/gpmigrator +++ /dev/null @@ -1,3797 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -''' -gpmigrator [options] old-gphome new-gphome - -Options: - -h, --help show this help message and exit - -v, --version show the program's version number and exit - -q quiet mode - -d DIRECTORY the master host data directory - -l DIRECTORY log file directory - -R revert a previous gpmigrator run - --debug debugging information -''' - -#============================================================ -import sys, os -from datetime import datetime as dtdatetime - -# Python version 2.6.2 is expected, must be between 2.5-3.0 -if sys.version_info < (2, 5, 0) or sys.version_info >= (3, 0, 0): - sys.stderr.write("Error: %s is supported on Python versions 2.5 or greater\n" - "Please upgrade python installed on this machine." - % os.path.split(__file__)[-1]) - sys.exit(1) - -try: - from gppylib.operations.gpMigratorUtil import * - from gppylib.operations.hdfs_cmd import * - from gppylib.commands.base import Command -except ImportError, e: - sys.exit('Error: unable to import module: ' + str(e)) - -libdir = os.path.join(sys.path[0], 'lib/') - -logger = get_default_logger() -EXECNAME = os.path.split(__file__)[-1] -MIGRATIONUSER = 'gpmigrator' -LOCKEXT = '.gpmigrator_orig' -WORKDIR = 'gpmigrator' -BACKUPDIR = 'backup' -UPGRADEDIR = 'upgrade' -HDFS_SS = 'gpmigrator' -NEW_FS_POSTFIX = 'new' -PARALLELISM = 16 - -#============================================================ -__version__ = '$Revision: #218 $' - - -#============================================================ -def makeCommand(oldHome, newHome, oldVersion, newVersion, - command, pid, dataDirectories, filespaces, option1, - option2, method, isUpgrade, isRevert, mirrors): - - # space separated list of directories - datadirs = base64.urlsafe_b64encode(pickle.dumps(dataDirectories)) - fse = base64.urlsafe_b64encode(pickle.dumps(filespaces)) - - hasMirrors = len(mirrors) > 0 - - cmd = [ - EXECNAME, - oldHome, - newHome, - '--internal-oldversion=' + urllib.quote(str(oldVersion)), - '--internal-newversion=' + urllib.quote(str(newVersion)), - '--internal-command=' + str(command), - '--internal-pid=' + str(pid), - '--internal-dirs=' + urllib.quote(datadirs), - '--internal-filespaces=' + urllib.quote(fse), - '--internal-option=' + urllib.quote(str(option1)), - '--internal-option2=' + urllib.quote(str(option2)), - '--debug', - '--quiet', - ] - if method: - cmd.append('--internal-method='+ urllib.quote(str(method))) - if isUpgrade: - cmd.append('--internal-isupgrade') - if isRevert: - cmd.append('-R') - if hasMirrors: - cmd.append('--internal-mirrors') - - return cmd - -#============================================================ -class GpControlData(base.Command): - """ - Get the control data for a specified directory. 
- """ - def __init__(self, name, directory, ctxt=base.LOCAL, remoteHost=None): - self.controldata = None - - cmdStr = "$GPHOME/bin/pg_controldata %s" % directory - base.Command.__init__(self, name, cmdStr, ctxt, remoteHost) - - def get_controldata(self): - if not self.controldata: - self.controldata = {} - for line in self.results.stdout.split('\n'): - try: - (key, value) = line.split(':', 1) - self.controldata[key] = value.strip() - except: - pass - return self.controldata - - @staticmethod - def local(name,directory): - cmd=GpControlData(name,directory) - cmd.run(validateAfter=True) - return cmd.get_controldata() - - -#============================================================ -class GPUpgrade(GPUpgradeBase): - ''' - HAWQ Upgrade Utility - ''' - - # INTERNAL command: - # MASTER - default, run on master, [not included in list] - # MKDIR - Create directories, run on every host - # RMDIR - Remove directories, run on every host - # CHKDIR - Check directories, run on every host - # RESETXLOG - Syncronize xlogs - # DEEPLINK - Recursive hardlink - # EXTRACTCATFILES - build a list of catalog files to be copied - # BUILDSKEL - build a skeleton copy of the old system - # TRANSFORMCAT - apply catalog transformations - # SETCATVERSION - set catalog version - # EXTRACTAOSEGS - get ao seg info - # GETHIGHESTOID - find the highest OID in the cluster - commands = ['SETSTATE', 'MKDIR', 'RMDIR', 'CHKDIR', 'RESETXLOG', - 'DEEPLINK', 'CHKDOWN', 'LOCKDOWN', 'UNLOCK', - 'EXTRACTCATFILES', 'BUILDSKEL', 'TRANSFORMCAT', - 'SETCATVERSION', 'EXTRACTAOSEGS', 'GETHIGHESTOID', - 'TOUCHUPCONFIG', 'FIXCONFIG'] - - #------------------------------------------------------------ - def __init__(self): - ''' - The most basic of initialization - ''' - super(GPUpgrade, self).__init__() - - self.resume = False - self.mirrormode = None - - self.pid = os.getpid() # Process ID of the master - - # Non-master variables - self.option2 = None # Argument passed to cmd - - # Environment and system info - self.datadirs = None # Set of all data and mirror directories - self.filespaces= None # Set of filespace directory (dboid as key) - self.host = None # master hostname - self.upgrade = None # True for upgrade, False for downgrade - self.method = None # transform or dumprestore ? - - # Upgrade information and state - self.quiet = False - self.revert = False - self.checkschema = True - self.warnings = False - self.gpperfmon = None - self.oldversion = None - self.newversion = None - self.newmaster = None # directory: workdir/upgrade/gp-1 - self.state = None # file: backup/state - self.config = {} # gp_init_config for new db - self.segments = [] # list of segments - self.seg_prefix = None - - #------------------------------------------------------------ - def Setup(self): - ''' - Basic initialization, separate from __init__ for exception - handling purposes. 
- ''' - - # GPHOME, PYTHONPATH must be setup properly - # GPHOME must match the location of this file - # The first PYTHONPATH must be based on the proper gphome - gphome_bin = os.path.realpath(os.path.split(__file__)[0]) - gphome = os.path.split(gphome_bin)[0] - env_GPHOME = os.path.realpath(os.environ.get('GPHOME')) - if (env_GPHOME != gphome): - logger.fatal(" $GPHOME is set to %s which is not newhome" % env_GPHOME) - logger.fatal(' source the greenplum.sh from the newhome to setup env ') - raise UpgradeError('Initialization failed') - - pythonpath = os.path.join(gphome, "lib", "python") - env_PYTHONPATH = os.path.realpath(os.environ.get('PYTHONPATH').split(':')[0]) - if (env_PYTHONPATH != pythonpath): - logger.fatal(' $PYTHONPATH is incorrect ') - logger.fatal(' source the greenplum.sh from the newhome to setup env ') - raise UpgradeError('Initialization failed') - - # This is the same path used by gpinitsystem - self.path = '/usr/kerberos/bin:/usr/sfw/bin:/opt/sfw/bin' - self.path += ':/usr/local/bin:/bin:/usr/bin:/sbin:/usr/sbin:/usr/ucb' - self.path += ':/sw/bin' - - # Set defaults - self.user = os.environ.get('USER') or os.environ.get('LOGNAME') - self.host = self.RunCmd('hostname') - self.masterdir = os.environ.get('MASTER_DATA_DIRECTORY') - - self.ParseInput() - - # Setup worker pool - self.pool = base.WorkerPool(numWorkers=PARALLELISM); - - # Extract path from input master directory - if self.cmd == 'MASTER': - logger.info('Beginning upgrade') - logger.info('Checking configuration') - - if not self.masterdir or len(self.masterdir) == 0: - raise UpgradeError('MASTER_DATA_DIRECTORY is not defined') - self.masterdir = self.masterdir.rstrip('/') - - # The configuration file - conf = os.path.join(self.masterdir, 'postgresql.conf') - - # Simple function to look for settings in the conf file - def getconf(x): - conf_re = re.compile('^\s*%s\s*=\s*(\w+)' % x) - try: - conf_str = self.RunCmd('grep %s %s' % (x, conf)) - except CmdError: - conf_str = "" # grep returns errorcode on no match - value = None - for line in conf_str.split('\n'): - match = conf_re.search(line) - if match: - value = match.group(1) - return value - - # Find the port for this segment: - self.masterport = getconf('port') - if self.masterport == None: - raise UpgradeError('Could not determine master port from ' + conf) - self.masterport = int(self.masterport) - - # Determine if perfmon is enabled - self.gpperfmon = getconf('gp_enable_gpperfmon') - - self.sock_dir = getconf('unix_socket_directory') - if not self.sock_dir: - self.sock_dir = '/tmp/' - - # Verify that (max_connections == superuser_reserved_connections) - # max_conn = getconf('max_connections') - # reserved = getconf('superuser_reserved_connections') - - masterpath, masterdir = os.path.split(self.masterdir) - self.workdir = os.path.join(masterpath, WORKDIR) - self.newmaster = os.path.join(self.workdir, UPGRADEDIR, masterdir) - self.oldenv = self.SetupEnv(self.oldhome, self.masterdir) - self.newenv = self.SetupEnv(self.newhome, self.newmaster) - - else: - self.oldenv = self.SetupEnv(self.oldhome, None) - self.newenv = self.SetupEnv(self.newhome, None) - - # check for hadoop command set in env - if self.cmd == 'MASTER': - self.hdfs_cmd = HDFS_Cmd() - self.hdfs_cmd.check_hdfs() - - #------------------------------------------------------------ - def ParseInput(self): - ''' - Parses and validates input to the script - ''' - - try: - parser = optparse.OptionParser(usage=(cli_help(EXECNAME) or __doc__), add_help_option=False) - parser.add_option('-v', '--version', 
action='store_true', - help=optparse.SUPPRESS_HELP) - parser.add_option('-h', '--help', action='store_true', - help=optparse.SUPPRESS_HELP) - parser.add_option('-d', dest='directory', - help=optparse.SUPPRESS_HELP) - parser.add_option('-l', dest='logdir', - help=optparse.SUPPRESS_HELP) - parser.add_option('-q', '--quiet', action='store_true', - help=optparse.SUPPRESS_HELP) - parser.add_option('-R', dest='revert', action='store_true', - help=optparse.SUPPRESS_HELP) - parser.add_option('-c', '--check-only', dest='checkonly', action='store_true', - help=optparse.SUPPRESS_HELP) - - parser.add_option('--debug', action='store_true', - help=optparse.SUPPRESS_HELP) - parser.add_option('--internal-disable-checkschema', - dest='nocheckschema', action='store_true', - help=optparse.SUPPRESS_HELP) - parser.add_option('--internal-command', dest='command', - help=optparse.SUPPRESS_HELP) - parser.add_option('--internal-pid', dest='pid', - help=optparse.SUPPRESS_HELP) - parser.add_option('--internal-dirs', dest='datadirs', - help=optparse.SUPPRESS_HELP) - parser.add_option('--internal-filespaces', dest='fse', - help=optparse.SUPPRESS_HELP) - parser.add_option('--internal-option', dest='option', - help=optparse.SUPPRESS_HELP) - parser.add_option('--internal-option2', dest='option2', - help=optparse.SUPPRESS_HELP) - parser.add_option('--internal-method', dest='method', - help=optparse.SUPPRESS_HELP) - parser.add_option('--internal-oldversion', dest='oldversion', - help=optparse.SUPPRESS_HELP) - parser.add_option('--internal-newversion', dest='newversion', - help=optparse.SUPPRESS_HELP) - parser.add_option('--internal-isupgrade', dest='upgrade', - action='store_true', - help=optparse.SUPPRESS_HELP) - parser.add_option('--internal-mirrors', dest='mirrors', - action='store_true', - help=optparse.SUPPRESS_HELP) - parser.add_option('--nocheckcat', dest='nocheckcat', - action='store_true', - help=optparse.SUPPRESS_HELP) - parser.add_option('--fault-injection', dest='faultinjection', - type='int', help=optparse.SUPPRESS_HELP) - (options, args) = parser.parse_args() - - if options.version: - print EXECNAME + ' ' + __version__ - sys.exit(0) - - if options.help: - usage(EXECNAME) - sys.exit(0) - - if len(args) != 2: - usage(EXECNAME) - msg = "incorrect number of arguments" - if len(args) > 0: - msg += ": %s" % str(args) - parser.error(msg) - - except Exception, e: - usage(EXECNAME) - raise UpgradeError('Error parsing input: ' + str(e)) - - if options.revert: - self.revert = True - - if options.checkonly: - self.checkonly = True - - if options.directory: - self.masterdir = options.directory - if options.command: - if options.command not in self.commands: - parser.error('INVALID INTERNAL COMMAND: ' + options.command) - self.cmd = options.command - if options.pid: - if self.cmd == 'MASTER': - parser.error('INVALID INTERNAL COMMAND: ' + self.cmd) - self.pid = int(options.pid) - self.oldhome = args[0].rstrip('/') - self.newhome = args[1].rstrip('/') - if options.datadirs: - self.datadirs = pickle.loads(base64.urlsafe_b64decode((urllib.unquote(options.datadirs)))) - if options.fse: - self.filespaces = pickle.loads(base64.urlsafe_b64decode((urllib.unquote(options.fse)))) - if options.option: - self.option = urllib.unquote(options.option) - if options.option2: - self.option2 = urllib.unquote(options.option2) - if options.debug: - self.debug = True - enable_verbose_logging() - if options.quiet: - self.quiet = True - quiet_stdout_logging() - if options.logdir: - self.logdir = options.logdir - else: - self.logdir = 
os.path.join(os.environ['HOME'], 'gpAdminLogs') - - # --internal_mirrors just indicates that the cluster has mirrors, - # for simplicity we have self.mirrors still point to a list, but - # its contents are not directly useful. - if options.mirrors: - self.mirrors = [True] - - try: - setup_tool_logging(EXECNAME, - unix.getLocalHostname(), - unix.getUserName(), - self.logdir) - except OSError, e: - logger.fatal('cannot log to %s: %s' % (self.logdir, str(e))) - exit(1) - - if options.nocheckschema: - self.checkschema = False - - if self.cmd != 'MASTER': - self.oldversion = GpVersion(urllib.unquote(options.oldversion)) - self.newversion = GpVersion(urllib.unquote(options.newversion)) - self.method = options.method - if options.upgrade: - self.upgrade = True - else: - self.upgrade = False - if options.nocheckcat: - self.checkcat = False - else: - self.checkcat = True - self.faultinjection = options.faultinjection - - if self.cmd == 'MASTER': - self.method = "transform" # Currently the only supported method - - today = date.today().strftime('%Y%m%d') - if not os.path.isdir(self.logdir): - os.makedirs(self.logdir, 0700) - logname = os.path.join(self.logdir, '%s_%s.log' % (EXECNAME, today)) - self.logfile = open(logname, 'a') - - self.pid = os.getpid() - if self.masterdir == None: - logger.fatal('MASTER_DATA_DIRECTORY parameter not set') - raise UpgradeError('Initialization failed') - if not os.path.exists(self.masterdir): - logger.fatal('MASTER_DATA_DIRECTORY: %s not found' - % self.masterdir) - raise UpgradeError('Initialization failed') - if not os.path.isabs(self.masterdir): - self.masterdir = os.path.abspath(self.masterdir) - if not os.path.exists(self.oldhome): - logger.fatal(' directory not found: ' + self.oldhome) - raise UpgradeError('Initialization failed') - if not os.path.exists(self.newhome): - logger.fatal(' directory not found: ' + self.newhome) - raise UpgradeError('Initialization failed') - if not os.path.isabs(self.oldhome): - self.oldhome = os.path.abspath(self.oldhome) - if not os.path.isabs(self.newhome): - self.newhome = os.path.abspath(self.newhome) - - - #------------------------------------------------------------ - def Cleanup(self): - ''' - Cleanup open connections. - Separate from __del__ because that caused weird issues with exit() - behavior. 
- ''' - if self.cmd == 'MASTER': - logger.fatal('Fatal error occurred - Recovering') - try: - self.Shutdown() - except Exception, e: - logger.fatal('Cleanup failure: ' + str(e)) - - try: - self.ReleaseLockdown(self.oldenv) - self.Copy_hawq_config(True) - except Exception, e: - logger.fatal('Cleanup failure: ' + str(e)) - - if self.pool: - del self.pool - - - - #------------------------------------------------------------ - def SetState(self, newstate): - ''' - Sets the state in the upgrader state file - ''' - - # Set state across all hosts, if all hosts succeed then set the - # master state - if self.cmd == 'MASTER': - self.CallSlaves('SETSTATE', newstate) - try: - self.state.write(str(newstate) + "\n") - self.state.flush() - except Exception, e: - raise UpgradeError('Error writing to statefile: (%s)' % str(e)) - else: - - # Build up a unique set of workdirectories on this host - locations = set() - for d in self.datadirs: - (location, _) = os.path.split(d) - locations.add(os.path.join(location, WORKDIR)) - - for d in locations: - statefile = os.path.join(d, 'state') - file = open(statefile, 'a') - file.write(str(newstate) + "\n") - file.close() - - #------------------------------------------------------------ - def CallSlaves(self, cmd, option='', option2='', includeMirrors=False): - ''' - Calls every host to execute the given command with the given option - value - ''' - - logger.debug("Remote call: %s" % cmd) - - # Check for things that should never happen - if self.cmd != 'MASTER': - raise UpgradeError("Recursive communication error") - if not self.array: - raise UpgradeError("Failure initializing array") - if not self.hostcache: - raise UpgradeError("Failure initializing host cache") - if not self.pool: - raise UpgradeError("Failure initializing worker pool") - - if self.upgrade or self.revert: - gphome = self.newhome - else: - gphome = self.oldhome - - # Construct the commands to pass to the worker pool - hosts = self.hostcache.get_hosts() - for host in hosts: - hostname = host.hostname - - # Skip any hosts in the cache that contain no segments for this - # configuration. - if len(host.dbs) == 0: - continue - - # Get the data directories for this host: - datadirs = [] - for seg in host.dbs: - if includeMirrors or seg.isSegmentPrimary(): - datadirs.append(seg.getSegmentDataDirectory()) - - fse = [] - for seg in host.dbs: - if includeMirrors or seg.isSegmentPrimary(): - sfs = seg.getSegmentFilespaces(); - for (oid, dir) in sfs.iteritems(): - fse.append(dir) - - # Skip any hosts that have no applicable data directories - if len(datadirs) == 0: - continue - - cmdList = makeCommand(oldHome=self.oldhome, - newHome=self.newhome, - oldVersion=self.oldversion, - newVersion=self.newversion, - command=cmd, - pid=self.pid, - dataDirectories=datadirs, - filespaces=fse, - option1=option, - option2=option2, - method=self.method, - isUpgrade=self.upgrade, - isRevert=self.revert, - mirrors=self.mirrors) - - c = GpUpgradeCmd("gpmigrator_mirror remote call", - cmdList, - ctxt=base.REMOTE, - remoteHost=hostname) - - self.pool.addCommand(c) - - # Wait for the segments to finish - try: - self.pool.join() - except: - self.pool.haltWork() - self.pool.joinWorkers() - - failure = False - results = [] - for cmd in self.pool.getCompletedItems(): - r = cmd.get_results() - - # Going through the gppylib Command interface all stderr from the - # remote calls gets redirected to stdout, which is unfortunate - # because we'd like to be able to differentiate between the two. 
- # - # We keep the stdout chatter to a minimum by passing --quiet to - # the remote calls which performs quiet stdout logging. - - # sys.stderr.write(r.stderr) - msg = r.stdout.strip() - results.append(msg) - - if not cmd.was_successful(): - log_literal(logger, logging.ERROR, msg) - failure=True - - if failure: - raise UpgradeError("Fatal Segment Error") - - # Warning this output contains everything written to stdout, - # which unfortunately includes some of the logging information - return "\n".join(results) - - - #------------------------------------------------------------ - def CheckDirectories(self): - if self.cmd == 'MASTER': - fsdirs = self.masterfilespace - else: - fsdirs = self.filespaces - - err = False - - # Build up a unique set of workdirectories on this host - locations = set() - for d in fsdirs: - (location, _) = os.path.split(d) - locations.add(os.path.join(location, WORKDIR)) - - # Check that none of them exist - for d in locations: - if os.path.exists(d): - logger.warn('%s : File exists' % d) - err = True - - if err: - raise UpgradeError('Directories exist') - - #------------------------------------------------------------ - def TransformHDFS(self, isRevert=False): - """ - Remove gpseg* from the hdfs file path - And rename relfile to (segfile_id+1) * segnode_num * segfile_postfix_number in relation dir - """ - def rename_hdfs_relfile(path): - # transform old path in hawq1 to the new path for hawq2.0 - (filespace, segid, tablespace, dbid, filenode) = path.rsplit("/", 4) - segid = segid.replace(self.seg_prefix, "") - if filenode=='PG_VERSION': - new_path = os.path.join(filespace, NEW_FS_POSTFIX, tablespace, dbid, filenode) - # need to skip if segid !=0 - if segid!='0': - return - else: - if '.' not in filenode: - relfile = '0' - relation = filenode - if(segid=='0'): - new_path = os.path.join(filespace, NEW_FS_POSTFIX, tablespace, dbid, relation) - self.hdfs_cmd.run_hdfs_mkdir(new_path, True) - # don't need to copy this relfile because it is always 0 size - return - else: - (relation, relfile) = filenode.rsplit(".", 1) - - segnode_num = len(self.segments) - fid = (int(segid)+1)+segnode_num*(int(relfile)-1) - new_path = os.path.join(filespace, NEW_FS_POSTFIX, tablespace, dbid, relation, str(fid)) - - self.hdfs_cmd.run_hdfs_mv(path, new_path) - - def transform_hdfs_path(path): - # if the hdfs snapshot already exists, rename it - ss_path = os.path.join(path, '.snapshot', HDFS_SS) - if self.hdfs_cmd.run_hdfs_test(ss_path, '-e')==0: - self.hdfs_cmd.run_hdfs_rename_snapshot(path, HDFS_SS, ss_oldname) - # create a hdfs snapshot before applying change - result = self.hdfs_cmd.run_hdfs_create_snapshot(path, HDFS_SS) - # now we can change hdfs - result = self.hdfs_cmd.run_hdfs_ls(path, True) - for dir, path1 in result: - if dir!='d': - rename_hdfs_relfile(path1) - - def revert_hdfs_relfile(path): - # revert the transformation old path in hawq1 to the new path for hawq2.0 - (others, filenode) = path.rsplit("/", 1) - if filenode=='PG_VERSION': - (filespace, postfix, tablespace, dbid) = others.rsplit("/", 3) - new_path = os.path.join(filespace, '%s0'%self.seg_prefix, tablespace, dbid, filenode) - else: - (filespace, postfix, tablespace, dbid, relnode) = others.rsplit("/", 4) - segnode_num = len(self.segments) - fid = int((int(filenode)-1)/segnode_num)+1 - sid = (int(filenode)-1) % segnode_num - new_path = os.path.join(filespace, '%s%i'%(self.seg_prefix, sid), tablespace, dbid, '%s.%i'%(relnode, fid)) - - self.hdfs_cmd.run_hdfs_mv(path, new_path) - - def revert_hdfs_fs(fs): - # because 
restore snapshot which calls 'hdfs dfs -cp -p' - # will report error in case that hawq need the hdfs config - # 'dfs.namenode.accesstime.precision' set to invalid - # So we need to using 'hdfs dfs -mv' from the new filspace path - #self.hdfs_cmd.run_hdfs_restore_snapshot(path, HDFS_SS) - result = self.hdfs_cmd.run_hdfs_ls(os.path.join(fs, NEW_FS_POSTFIX), True) - for dir, path1 in result: - if dir!='d': - revert_hdfs_relfile(path1) - self.hdfs_cmd.run_hdfs_rm(os.path.join(fs, NEW_FS_POSTFIX),True) - - def transform_hdfs_fs(fs, isRevert): - if isRevert: - revert_hdfs_fs(fs) - else: - result1 = self.hdfs_cmd.run_hdfs_ls(fs, False) - result2 = self.hdfs_cmd.parse_command_output(result1, [[0, '^(d)$', 1], [1, '^(.*/%s\d+)$'%self.seg_prefix, 1]]) - for dir, path in result2: - if dir=='d': - transform_hdfs_path(path) - - # Beginning of the outer func - # Get hdfs path from all filespaces, here remove the postfix 'gpseg*' - dfs_fs = self.Select("SELECT DISTINCT regexp_replace(fselocation, E'/%s\\\\d+','') FROM pg_filespace_entry WHERE fselocation LIKE 'hdfs%%'" % self.seg_prefix, forceutility=True) - - if not dfs_fs: - logger.fatal("Can't get filespace's hdfs location from catalog pg_filespace_entry.") - raise UpgradeError("Can't get filespace's hdfs location from catalog pg_filespace_entry.") - # force to leave safe mode because it is frequently occurs and make hdfs not writable. - self.hdfs_cmd.run_hdfs_leave_safemode() - # rename old snapshot appending with timestamp - ss_oldname = HDFS_SS + dtdatetime.now().strftime('%Y%m%d_%H%M%S'); - # get the new file space postfix, doesn't overwrite the existing same dir - global NEW_FS_POSTFIX - i = 0 - if self.hdfs_cmd.run_hdfs_test(os.path.join(dfs_fs[0],NEW_FS_POSTFIX), '-e')==0: - i = 1 - while self.hdfs_cmd.run_hdfs_test(os.path.join(dfs_fs[0],NEW_FS_POSTFIX+str(i)), '-e')==0: - i += 1 - # get the existing dir number i-1 - if isRevert: - i = i-1; - if i<0: - logger.fatal("Can't get filespace postfix for %s." % dfs_fs[0]) - exit(1) - if i>0: - NEW_FS_POSTFIX = NEW_FS_POSTFIX + str(i) - - for fs in dfs_fs: - if not isRevert: - self.hdfs_cmd.run_hdfs_mkdir(os.path.join(fs, NEW_FS_POSTFIX), True) - transform_hdfs_fs(fs, isRevert) - - #------------------------------------------------------------ - def Revert(self): - ''' - Revert to backup - ''' - - # Check for indication that something is running: - if not self.dbup: - env = None - try: - env = self.CheckUp() - except UpgradeError: - pass - - # We use the catalog versions we cached during versionCheck to - # determine if the MASTER_DATA_DIRECTORY is actually an old - # master data directory, or a partially upgraded one. - # - # Note: it would be better to adjust gpstop so that it knows how - # to stop old versions and always shutdown using a new environment. - # - # Note: It would also be good to avoid starting and stopping more - # than necessary here. - if env == self.oldenv and self.datacat == self.newcat: - self.newenv['MASTER_DATA_DIRECTORY'] = \ - self.oldenv['MASTER_DATA_DIRECTORY'] - env = self.newenv - - if env: - self.dbup = [env, False, False] - - # Now that we've determined what, if anything, is running, we must - # stop it. 
- try: - if self.faultinjection == -1: - raise UpgradeError("faultinjection=%d" % self.faultinjection) - self.Shutdown() - self.CheckDown(self.revert) - except BaseException, e: - logger.fatal('***************************************') - logger.fatal(str(e)) - logger.fatal('***************************************') - raise UpgradeError("Unable to REVERT\nShutdown the running " + - "database and rerun gpmigrator with '-R'") - - logger.info('Reverting to Backup') - self.revert = True - - statefile = os.path.join(self.workdir, 'state') - if not os.path.exists(statefile): - logger.warn(" '%s' not found" % statefile) - logger.info('Revert aborted') - sys.exit(0) - state = self.RunCmd('cat ' + statefile) - - needrevert = False - needRevertHDFS = False - for line in state.split('\n'): - if line == 'BACKUP_VALID': - needrevert = True - if line == 'TRANSFORMHDFS': - needRevertHDFS = True - - hba = os.path.join(self.masterdir, 'pg_hba.conf'+LOCKEXT) - ident = os.path.join(self.masterdir, 'pg_ident.conf'+LOCKEXT) - if os.path.exists(hba) or os.path.exists(ident): - needunlock = True - else: - needunlock = False - - etc_d = os.path.join(self.newhome, 'etc') - hawq_site_orig = os.path.join(etc_d, 'hawq-site.xml'+LOCKEXT) - hawq_sla_orig = os.path.join(etc_d, 'slaves'+LOCKEXT) - if os.path.exists(hawq_site_orig) or os.path.exists(hawq_sla_orig): - needRevertHawqSiteCfg= True - else: - needRevertHawqSiteCfg = False - - if (needrevert or needunlock): - try: - self.ReadInfo() - except UpgradeError, e: - logger.fatal(str(e)) - logger.info('Revert aborted') - sys.exit(1) - - # Attempt revert - if needrevert: - logger.info('Restoring directories') - self.DeepLink('REVERT') - - # Truncate the statefile - # + Must occur AFTER actual Revert is processed - open(statefile, 'w').truncate() - - if needunlock: - self.ReleaseLockdown(self.oldenv) - if needRevertHawqSiteCfg: - self.Copy_hawq_config(True) - - self.Startup(self.oldenv) - - if needRevertHDFS: - logger.info('Revert HDFS changes') - self.TransformHDFS(True) - - logger.info('Removing upgrade user') - self.Update('DROP USER IF EXISTS ' + MIGRATIONUSER) - self.Shutdown() - - # done - logger.info('Revert Successful') - self.revert = False - - - #------------------------------------------------------------ - def CheckVersions(self): - ''' - Validates that the upgrade from old->new is okay - ''' - - # Log the OS type info - os_type = os.uname()[0] - os_ver = os.uname()[2] - logger.info('Operating System: %s %s' % (os_type, os_ver)) - - # Log version checking - logger.info('Checking version compatibility') - - # If we got a version, but it isn't recognized by the GpVersion class - # then it is likely not a Greenplum database. 
- oldversion = self.getversion(self.oldhome, self.oldenv) - newversion = self.getversion(self.newhome, self.newenv) - try: - oldversion = GpVersion(oldversion) - except: - raise UpgradeError('Source not a HAWQ instance: ' + oldversion) - try: - newversion = GpVersion(newversion) - except: - raise UpgradeError('Target not a HAWQ instance: ' + newversion) - - logger.info('Source Version: HAWQ %s' % str(oldversion)) - logger.info('Target Version: HAWQ %s' % str(newversion)) - - self.oldversion = oldversion - self.newversion = newversion - self.upgrade = (newversion >= oldversion) - - is_supported_version(oldversion, self.upgrade) - is_supported_version(newversion, self.upgrade) - - if newversion == oldversion: - raise UpgradeError("HAWQ is already version '%s'" - % str(newversion)) - elif newversion.isVersionRelease(oldversion): - raise UpgradeError("Upgrade not needed to go from version '%s' to version '%s'" - % (str(oldversion), str(newversion))) - - # We don't support downgrade - if not self.upgrade: - raise UpgradeError("Downgrade to HAWQ %s not supported from gpmigrator" - % str(newversion)) - - if self.upgrade and not newversion.isVersionCurrentRelease(): - main = GpVersion('main') - raise UpgradeError( - "Upgrade from '%s' to '%s' not supported. Target version should be HAWQ %s" - % (str(oldversion), str(newversion), main.getVersionRelease())) - - logger.debug('Using %s method for migration' % self.method) - - # Compare old version catalog number with the catalog in - # MASTER_DATA_DIRECTORY - catalog_re = re.compile('Catalog version number: *(.*)') - oldcat = self.RunCmd('postgres --catalog-version', env=self.oldenv) - logger.info("Source %s" % oldcat) - newcat = self.RunCmd('postgres --catalog-version', env=self.newenv) - logger.info("Target %s" % newcat) - - # May need this info later - self.oldcat = oldcat - self.newcat = newcat - - control = self.RunCmd('pg_controldata ' + self.masterdir, env=self.oldenv) - for line in control.split('\n'): - m = catalog_re.match(line) - if m: - self.datacat = line - logger.info("Data %s " % line) - if ((not self.revert and line != oldcat) or - (self.revert and line != oldcat and line != newcat)): - logger.debug('catalog mismatch: expected %s, found %s' - % (oldcat, line)) - msg = 'Catalog in %s does not match source binary' % self.masterdir - raise UpgradeError(msg) - break - - # For the moment everything goes: - # - Additional checks will be made once the old database is up - logger.info('Versions are compatible') - - - #------------------------------------------------------------ - def RemoveDirectories(self, force=False): - ''' - Removes upgrade/ and backup/ directories on all segments - ''' - - # If force is set then we remove the directories regardless of - # whether we created them or they were created by another process. - # If force is not set then we will throw an exception when we are - # done if there were directories that we failed to remove. 
- doerror = False - if self.cmd == 'MASTER': - logger.info('Removing temporary directories') - self.CallSlaves('RMDIR', force, includeMirrors=True) - fsdirs = self.masterfilespace - else: - fsdirs = self.filespaces - - # Build up a unique set of workdirectories on this host - locations = set() - for d in fsdirs: - (location, _) = os.path.split(d) - locations.add(os.path.join(location, WORKDIR)) - - for d in filter(os.path.exists, locations): - flag = os.path.join(d, 'flag') - if (not force) and os.path.exists(flag): - try: - dpid = self.RunCmd('cat ' + flag) - dpid = int(dpid) - except Exception, e: - logger.warn('Read error on file: ' + str(e)) - doerror = True - continue - - if dpid != self.pid: - logger.warn('Directory %s owned by pid %s' - % (d, dpid)) - doerror = True - continue - - # Regardless of the setting of force, if there is an active - # process running, we won't remove the directory. - running = self.RunCmd('find %s -name postmaster.pid -print' % d) - if (running): - logger.info(running) - logger.fatal('Active processes running') - doerror = True - else: - self.RunCmd('rm -rf ' + d) - - # We hold off throwing the error until the end, because we want to make - # sure we removed all of our own directories. - if doerror: - raise UpgradeError('Failed to remove old upgrade directories') - - #------------------------------------------------------------ - def CreateDirectories(self): - ''' - Creates the upgrade/ and backup/ directories on all segments - ''' - - if self.cmd == 'MASTER': - - dirs_exist = False - try: - self.CheckDirectories() - self.CallSlaves('CHKDIR', includeMirrors=True) - except UpgradeError, e: - dirs_exist = True - if dirs_exist: - self.RemoveDirectories(force=True) - - - logger.info('Creating temporary directories') - - # Have the slaves create their directories - self.CallSlaves('MKDIR', includeMirrors=True) - - fsdirs = self.masterfilespace - - else: - fsdirs = self.filespaces - if self.cmd != 'MKDIR': - raise Exception(self.cmd + ' != MKDIR]') - - # Given all the actual segment datadirs we need to look one - # directory up for the datadir locations and create the gpmigrator - # subdirectory there. - locations = set() - for d in fsdirs: - if not os.path.exists(d): - raise UpgradeError('Missing Data Directory: %s' % d) - (location, _) = os.path.split(d) - workpath = os.path.join(location, WORKDIR) - locations.add(workpath) - - # Actually create the directory, none of the should exist yet! - for d in locations: - # Creating directories is an atomic operation, if mkdir - # succeeds then it did not exist previously making it ours - # This prevents mulitiple upgrades running simultaneously. - os.mkdir(d) - self.RunCmd('chmod 700 %s' % d) - - # Claim it in the name of our parent process. We make use of - # this flag during tear down to confirm that we created it. 
- flag = open(os.path.join(d, 'flag'), 'w') - flag.write('%s\n' % str(self.pid)) - flag.close() - - # Create subdirectories for /upgrade and /backup - upgradepath = os.path.join(d, UPGRADEDIR) - os.mkdir(upgradepath) - self.RunCmd('chmod 700 %s' % upgradepath) - - backuppath = os.path.join(d, BACKUPDIR) - os.mkdir(backuppath) - self.RunCmd('chmod 700 %s' % backuppath) - - if (self.cmd == 'MASTER'): - # Finally now that we have those directories we'll make a state file - self.state = open('%s/state' % self.workdir, 'w') - - #------------------------------------------------------------ - def GenerateConfigFiles(self): - ''' - Creates 'gp_databases' 'gp_array_config' and 'gp_config' files - ''' - - logger.info('Generating config files') - - # Write a dump of the databases hash - db_file = os.path.join(self.workdir, 'gp_databases') - o = open(db_file, 'w') - for (oid, name) in self.dbs.iteritems(): - o.write('%s:%s\n' % (oid, name)) - o.close() - - # Write a dump of the gparray object - array_file = os.path.join(self.workdir, 'gp_array_config') - self.array.dumpToFile(array_file) - - # Write our new gp_config file, this contains the configuration - # information not captured in gp_array_config. - config_file = os.path.join(self.workdir, 'gp_config') - o = open(config_file, 'w') - for item, value in self.config.iteritems(): - if type(value) == int: - o.write('%s=%d\n' % (str(item), value)) - else: - o.write('%s=%s\n' % (str(item), str(value))) - o.close() - logger.info('Configuration files generated') - - - #------------------------------------------------------------ - def ReadInfo(self): - ''' - When restarting an upgrade this method reads the configuration - information from the cached status files. - - It corresponds directly with ExtractInfo() which gathers the same - information from an active database. - - It is also the inverse of GenerateConfigFiles() which creates the files - that this function reads. - - In normal processing usually ExtractInfo() is called rather than - ReadInfo(). This function is used /instead/ when we are doing Revert() - processing, or when resuming an upgrade. E.g. cases when we do not - want to start the database to get the information we need because the - database may not be in a stable state. - ''' - - # self.array - configuration information - array_config = os.path.join(self.workdir, 'gp_array_config') - if not os.path.exists(array_config): - raise UpgradeError(" '%s' not found" % array_config) - self.array = GpArray.initFromFile(array_config) - - # self.dbs - database information - db_file = os.path.join(self.workdir, 'gp_databases') - if not os.path.exists(db_file): - raise UpgradeError(" '%s' not found" % db_file) - f = open(db_file, 'r') - for line in f: - (oid, name) = line.split(':', 1) - self.dbs[oid] = name.strip() - f.close() - - # self.hostcache - cache of hostnames - self.hostcache = GpHostCache(self.array, self.pool) - failed_pings = self.hostcache.ping_hosts(self.pool) - if len(failed_pings) > 0: - raise UpgradeError( - "Cannot upgrade while there are unreachable hosts") - - # Setup the master filespace dir - self.masterfilespace = [] - for (id, loc) in self.array.master.getSegmentFilespaces().iteritems(): - self.masterfilespace.append(loc) - - # self.primaries and self.mirrors - dbs = self.array.getSegDbList() - self.segments = filter(GpDB.isSegmentPrimary, dbs) - self.mirrors = filter(GpDB.isSegmentMirror, dbs) - - # We don't support mirror any more in this script. 
- if (self.mirrors): - raise UpgradeError( - "Please use gpmigrator_mirror to upgrade a cluster with mirroring") - - # self.config - config_file = os.path.join(self.workdir, 'gp_config') - if not os.path.exists(config_file): - raise UpgradeError(" '%s' not found" % config_file) - f = open(config_file, 'r') - for line in f: - (key, value) = line.split('=', 1) - self.config[key] = value - f.close() - - - #------------------------------------------------------------ - def ExtractInfo(self): - ''' - Get all the information we need from the old instance - ''' - - # Can only extract with a running database - env = self.CheckUp() - - # -- Get the array information - logger.info('Extracting configuration information') - port = self.masterport - user = env['USER'] - url = dbconn.DbURL(port=port, dbname='template0', username=user) - array = GpArray.initFromCatalog(url, utility=True) - self.array = array - - # -- Determine if there are any invalid segments - logger.info('Checking that all segments are valid') - invalid = array.get_invalid_segdbs() - if len(invalid) > 0: - for db in invalid: - logger.fatal('INVALID SEGMENT: %s:/%s' % - (db.getSegmentHostName(), - db.getSegmentDataDirectory())) - raise UpgradeError('Cannot upgrade database with invalid segments') - - - # -- Get a list of available databases, note this includes "template0"! - logger.info('Creating list of databases') - databases = self.Select("SELECT oid, datname from pg_database") - for row in databases: - self.dbs[str(row['oid'])] = row['datname'] - - # -- Look for an existing user named MIGRATIONUSER - logger.info('Checking gpmigrator user') - upgradeuser = self.Select( - "SELECT count(*)::numeric FROM pg_user " + - "WHERE usename = '%s'" % MIGRATIONUSER) - if upgradeuser[0] > 0: - logger.warn("Database user '%s' already exists" % MIGRATIONUSER) - self.Update('DROP USER ' + MIGRATIONUSER) - - if (not self.upgrade): - raise UpgradeError("Downgrade not supported") - - # -- Older releases didn't have hostname as part of configuration - # make sure we have an acurate list of address->host lookups. - logger.info('Validating hosts') - self.hostcache = GpHostCache(self.array, self.pool) - for host in self.hostcache.get_hosts(): - for seg in host.dbs: - seg.setSegmentHostName(host.hostname) - failed_pings = self.hostcache.ping_hosts(self.pool) - if len(failed_pings) > 0: - raise UpgradeError("Cannot upgrade while there are unreachable " - "hosts") - - # Setup the master filespace dirs - self.masterfilespace = [] - for (id, loc) in self.array.master.getSegmentFilespaces().iteritems(): - self.masterfilespace.append(loc) - - # get seg prefix from filespace last part - (others, last_part) = self.masterfilespace[0].rsplit("/", 1) - if last_part[-2:]: - self.seg_prefix = last_part[:-2] - - master = array.master - standbymaster = array.standbyMaster - - # Oddly gparray doesn't have methods for fetching primary segments - # specifically - dbs = array.getSegDbList() - self.segments = filter(GpDB.isSegmentPrimary, dbs) - self.mirrors = filter(GpDB.isSegmentMirror, dbs) - - # We don't support mirror any more in this script. 
- if (self.mirrors): - raise UpgradeError( - "Please use gpmigrator_mirror to upgrade a cluster with mirroring") - - # Internal sanity checking - if len(self.segments) == 0: - raise UpgradeError('No segments found') - if standbymaster > 0: - raise UpgradeError('Cannot upgrade while standbymaster is running') - - - # To upgrade mirrors from 3.3 format mirrors to 4.0 format mirrors we - # must have been provided with the replication ports. - if len(self.mirrors) > 0: - if (not self.config['REPLICATION_PORT_BASE'] or - not self.config['MIRROR_REPLICATION_PORT_BASE'] or - not self.mirrormode): - logger.fatal("System configured with mirrors:") - if not self.config['REPLICATION_PORT_BASE']: - logger.fatal(" missing option: " - "--replication_port_base=") - if not self.config['MIRROR_REPLICATION_PORT_BASE']: - logger.fatal(" missing option: " - "--mirror_replication_port_base=") - if not self.mirrormode: - logger.fatal(" missing option: --mirror-mode") - raise UpgradeError("Missing required upgrade options") - - if self.mirrormode not in ('redundant', 'single'): - logger.fatal("Unrecognised mirror-mode: %s" % self.mirrormode) - logger.fatal("Valid modes are: redundant, single") - raise UpgradeError("Invalid mirror-mode") - - # Setup the replication ports for all segments in the array by host - for host in self.hostcache.get_hosts(): - - # The port ranges we are assigning into - p_port = self.config['REPLICATION_PORT_BASE'] + 1 - m_port = self.config['MIRROR_REPLICATION_PORT_BASE'] + 1 - - # Used to determine overlapping port ranges - pri_port = [65535, 0] # [min, max] - mir_port = [65535, 0] - pri_rep_port = [65535, 0] - mir_rep_port = [65535, 0] - - for seg in host.dbs: - port = seg.getSegmentPort() - if seg.isSegmentPrimary(): - seg.setSegmentReplicationPort(p_port) - pri_port[0] = min(pri_port[0], port) - pri_port[1] = max(pri_port[1], port) - pri_rep_port[0] = min(pri_rep_port[0], p_port) - pri_rep_port[1] = max(pri_rep_port[1], p_port) - p_port += 1 - else: - seg.setSegmentReplicationPort(m_port) - mir_port[0] = min(mir_port[0], port) - mir_port[1] = max(mir_port[1], port) - mir_rep_port[0] = min(mir_rep_port[0], m_port) - mir_rep_port[1] = max(mir_rep_port[1], m_port) - m_port += 1 - - # Check for overlapping port ranges: - port_mat = [pri_port, mir_port, pri_rep_port, mir_rep_port] - label_mat = ["Primary", "Mirror", "Replication", - "Mirror-Replication"] - - for x in range(0, 4): - for y in range(x+1, 4): - if ((port_mat[x][0] >= port_mat[y][0] and - port_mat[x][0] <= port_mat[y][1]) or - (port_mat[x][1] >= port_mat[y][0] and - port_mat[x][1] <= port_mat[y][1])): - logger.error("Port collision on host %s" % - host.hostname) - logger.error("... %s port range: [%d-%d]" % - (label_mat[x], - port_mat[x][0], - port_mat[x][1])) - logger.error("... %s port range: [%d-%d]" % - (label_mat[y], - port_mat[y][0], - port_mat[y][1])) - raise UpgradeError("Port collision on host %s" % - host.hostname) - - # Check for unsupported index types. - found = False - logger.info("Validating indexes") - oids = sorted(self.dbs.keys()) - for dboid in oids: - db = self.dbs[dboid] - - if db == "template0": - continue - indexes = self.Select(''' - SELECT quote_ident(n.nspname) || '.' 
|| - quote_ident(c.relname) as index, - m.amname as kind - FROM pg_class c - join pg_namespace n on (c.relnamespace = n.oid) - join pg_am m on (c.relam = m.oid) - WHERE c.relkind = 'i' and m.amname in ('gin', 'hash') - ''', db=db) - if len(indexes) > 0: - if found == False: - logger.fatal("Unable to upgrade the following indexes") - found = True - logger.fatal(" Database: %s" % db) - for row in indexes: - logger.fatal(" %s [%s]" % (row['index'], row['kind'])) - if found: - raise UpgradeError("Deprecated index types must be removed prior " - "to upgrade") - - # Check for unsupported AOCS table. - found = False - logger.info("Validating no AOCS table") - oids = sorted(self.dbs.keys()) - for dboid in oids: - db = self.dbs[dboid] - - #if db == "template0": - #continue - aocs_tbl = self.Select(''' - SELECT quote_ident(n.nspname) || '.' || quote_ident(c.relname) as table - FROM pg_class c join pg_namespace n on (c.relnamespace = n.oid) - WHERE c.relkind = 'r' and c.relstorage='c' - ''', db=db) - if len(aocs_tbl) > 0: - if found == False: - logger.fatal("Unable to upgrade the following AOCS tables") - found = True - logger.fatal(" Database: %s" % db) - for row in aocs_tbl: - logger.fatal(" %s " % (row['table'])) - if found: - raise UpgradeError("Deprecated AOCS tables must be converted to AO or Parquet table " - "prior to upgrade.") - - # Check for SQL_ASCII database encoding - logger.info("Validating database encoding") - encoding = self.Select("SELECT datname FROM pg_database WHERE encoding=0") - if len(encoding) > 0: - logger.error("Deprecated database encodings found:") - for datname in encoding: - logger.error(" %s [SQL_ASCII]" % datname) - raise UpgradeError("Deprecated database encodings found - contact " - "Greenplum Support") - - # ARRAY_NAME - arrayname = self.Select('SELECT gpname FROM gp_id') - self.config['ARRAY_NAME'] = arrayname[0] - - # Determine PORT_BASE, (min of segments - 1) - port_base = array.get_min_primary_port() - 1 - self.config['PORT_BASE'] = port_base - - # Determine MIRROR_PORT_BASE, - if len(self.mirrors) > 0: - mirror_port_base = array.get_min_mirror_port() - 1 - self.config['MIRROR_PORT_BASE'] = mirror_port_base - - # MASTER_HOSTNAME, super easy - self.config['MASTER_HOSTNAME'] = master.getSegmentHostName() - - # MASTER_DIRECTORY, also easy - we already set it up - self.config['MASTER_DIRECTORY'] = self.newmaster - - # MASTER_PORT, super easy - self.config['MASTER_PORT'] = master.getSegmentPort() - - # TRUSTED_SHELL, set manually since there's only one supported value - self.config['TRUSTED_SHELL'] = 'ssh' - - # CHECK_POINT_SEGMENTS - self.config['CHECKPOINT_SEGMENTS'] = \ - self.Select('show checkpoint_segments')[0] - - # ENCODING - self.config['CLIENT_ENCODING'] = \ - self.Select('show client_encoding')[0] - - # LOCALE - self.config['LOCALE'] = \ - self.Select('show lc_ctype')[0] - - # MAX_CONNECTIONS - self.config['MAX_CONNECTIONS'] = \ - self.Select('show max_connections')[0] - - # Add some config needed for hawq2.0 - dfs_url = self.Select("SELECT regexp_replace(fselocation , E'/[^/]*$', '/') \ - FROM pg_filespace, pg_filespace_entry \ - WHERE fsefsoid=pg_filespace.oid AND fselocation LIKE 'hdfs%0' AND fsname='dfs_system'") - if not dfs_url[0]: - logger.fatal("Can't get DFS_URL from catalog.") - raise UpgradeError("Can't get DFS_URL from catalog.") - self.config['DFS_URL'] = dfs_url[0]; - - #check dfs_url exists - if self.hdfs_cmd.run_hdfs_test(dfs_url[0], '-e')!=0: - logger.fatal("hdfs path \"%s\" doesn't exists!"%dfs_url[0]) - raise UpgradeError("hdfs path 
\"%s\" doesn't exists!"%dfs_url[0]) - - self.config['TEMP_DIR'] = ''; - temp_dir_file = os.path.join(self.masterdir, 'gp_temporary_files_directories') - if os.path.exists(temp_dir_file): - temp_dirs = self.RunCmd('cat ' + temp_dir_file) - isFirst = True - for line in temp_dirs.split('\n'): - if(isFirst): - self.config['TEMP_DIR'] += line - isFirst = False - else: - self.config['TEMP_DIR'] += ',' + line - - - logger.info('Configuration acquired') - - - def copy_rewrite_hba(self, frm, to, upgrdfmt): - ''' - Between 3.2 and 3.3 we have a change in pg_hba.conf. The differences - are: - - "ident sameuser" becomes "ident" - - "ident " becomes "ident map=" - - "ldap "ldap url" becomes "ldap ldapserver=..." - - "pam servicename" becomes "pam pamservice="servicename"" - ''' - f = open(frm, 'r') - t = open(to, 'w') - logger.debug('writing %s to %s for upgrade = %s' - % (frm, to, str(upgrdfmt))) - for line in f: - # translate to new pg_hba.conf format - if upgrdfmt: - - # translate from 3.2 format => 3.3 format: - if self.oldversion < '3.3.0.0' and self.newversion >= '3.3.0.0': - # ident - line = re.compile('ident sameuser$').sub('ident', line) - line = re.compile('ident (?!map=)(.*)$').sub('ident map=\\1', line) - # pam - line = re.compile('pam (?!pamservice=)(.*)$').sub('pam pamservice="\\1"', line) - # ldap is a little more complex - # our objective is to parse the following: - # ldap[s]://[:]/[;prefix[;suffix]] - logger.debug("upgrading ldap entries") - r1 = re.compile('ldap "?(ldap[s]?)://([^:/]+)(:\d+)?/([^"]*)"?') - match = r1.search(line) - if match: - s = "ldapserver=" + match.group(2).strip() - if match.group(3): - s += " ldapport=" + str(match.group(3)[1:]).strip() - if match.group(4): - fixes = match.group(4).strip().split(';') - suffix = "" - prefix = "" - basedn = "" - if len(fixes) > 3: - raise UpgradeError('cannot rebuild ldap auth ' + - 'entry: ' + line) - if len(fixes) == 3: - suffix = ' ldapsuffix="' + fixes[2] + '"' - if len(fixes) >= 2: - prefix = ' ldapprefix="' + fixes[1] + '"' - if len(fixes) >= 1: - basedn = "" # not used - s += prefix + suffix - if match.group(1) == "ldaps": - s += " ldaptls=1" - line = re.compile('ldap (.*)$').sub('ldap ' + s, line) - - else: - # downgrade - if self.newversion < '3.3.0.0' and self.oldversion >= '3.3.0.0': - line = re.compile('ident$').sub('ident sameuser', line) - line = re.compile('ident map=(.*)$').sub('ident \\1', line) - - # logger.debug('writing to %s line %s' % (to, line)) - t.write(line) - f.close() - t.close() - - def move_rewrite_hba(self, frm, to, upgrade): - self.copy_rewrite_hba(frm, to, upgrade) - os.unlink(frm) - - #------------------------------------------------------------ - def Copy_hawq_config(self, isRevert=False): - ''' - Copy new hawq config (hawq-site.xml from template-hawq-site.xml & slaves) at new hawq2.0 $GPHOME/etc - If isRevert, it just revert the change with the backup - ''' - etc_d = os.path.join(self.newhome, 'etc') - - #copy hawq_site.xml file - hawq_site = os.path.join(etc_d, 'hawq-site.xml') - hawq_site_orig = os.path.join(etc_d, 'hawq-site.xml'+LOCKEXT) - hawq_site_templ= os.path.join(etc_d, 'template-hawq-site.xml') - - if not os.path.exists(hawq_site_templ): - if os.path.exists(hawq_site): - hawq_site_templ=hawq_site; - else: - raise UpgradeError("Cannot find the hawq-site.xml or template-hawq-site.xml in path: %", etc_d) - - if isRevert: - if os.path.exists(hawq_site_orig): - self.RunCmd('cp -f %s %s' % (hawq_site_orig, hawq_site)) - else: - if os.path.exists(hawq_site_templ): - self.RunCmd('cp -f %s 
%s' % (hawq_site, hawq_site_orig)) - if hawq_site_templ != hawq_site: - self.RunCmd('cp -f %s %s' % (hawq_site_templ, hawq_site)) - - # copy slaves file - hawq_sla = os.path.join(etc_d, 'slaves') - hawq_sla_orig = os.path.join(etc_d, 'slaves'+LOCKEXT) - if isRevert: - if os.path.exists(hawq_sla_orig): - self.RunCmd('cp -f %s %s' % (hawq_sla_orig, hawq_sla)) - else: - if os.path.exists(hawq_sla): - self.RunCmd('cp -f %s %s' % (hawq_sla, hawq_sla_orig)) - - #------------------------------------------------------------ - def update_xml_content(self, xmlfile_cont, property_name, property_value=''): - ''' - input: - xmlfile_cont: is file content read from file.readlines() - property_value: when it is '', just remove this property setting. - - Note that the xml file can not support format as: one line contains several xml tags, or one tag cross several lines - ''' - - # first round to find if this property exists, then overwrite this line - line_id = 0 - while(line_id%s' % property_name, line) - if m: - l1=line_id - line_id+=1; - if (line_id', next_line) - if m2: - l2 = line_id - # if empty string, need to remove this property setting - if(property_value==''): - # find '' backwords - l0 = l1 - while (l0>0): - l0 -= 1; - m3 = re.match('\s*', xmlfile_cont[l0]) - if m3: - l1=l0 - break - # find '' forwards - l0 = l2 - while (l0 < len(xmlfile_cont)-1): - l0 += 1; - m3 = re.match('\s*', xmlfile_cont[l0]) - if m3: - l2=l0 - break - # remove these lines - for l3 in range(l2, l1-1, -1): #delete lines in reverse order without affect line_id - del xmlfile_cont[l3] - return - # property exists, just change this property value - p = re.compile('\s*(.*)') - #p_next_line = p.match(next_line) - p_value = p.match(next_line).group(1) - next_line_new = re.sub(p_value, str(property_value), next_line) - xmlfile_cont[line_id] = next_line_new - return - # Error: property exist, value lost - log.fatal('Config file in xml format error.'); - exit(1); - line_id+=1; - - # propert doesn't exist, then add this setting - for line_id in range(len(xmlfile_cont)-1, -1, -1): - line = xmlfile_cont[line_id] - m = re.match('\s*', line) - if m: - xmlfile_cont.insert(line_id, ' \n'); line_id += 1 - xmlfile_cont.insert(line_id, ' %s\n' % property_name); line_id += 1 - xmlfile_cont.insert(line_id, ' %s\n' % str(property_value)); line_id += 1 - xmlfile_cont.insert(line_id, ' \n'); line_id += 1 - xmlfile_cont.insert(line_id, '\n'); line_id += 1 #blank line - - def Change_hawq_config(self): - ''' - Change options' values in hawq_site.xml according to old setting - ''' - - # because running 'hawq config -c xxx -v xxx' has problems here, just need to change xml config directly. 
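
[Illustrative aside: hawq-site.xml follows the standard Hadoop-style <configuration>/<property>/<name>/<value> layout, and the script above edits it as plain text rather than through an XML parser. A minimal sketch of the same "set or remove a property" operation using xml.etree.ElementTree is shown below for orientation only; the file path in the usage line is a placeholder, not a value from the script, and note that ElementTree discards XML comments when rewriting the file, which is one practical reason to prefer text-level editing.

    import xml.etree.ElementTree as ET

    def set_hawq_property(path, name, value=None):
        # Update (or remove, when value is None) one <property> entry in a
        # Hadoop-style configuration file such as hawq-site.xml.
        tree = ET.parse(path)
        root = tree.getroot()
        for prop in root.findall('property'):
            if prop.findtext('name') == name:
                if value is None:
                    root.remove(prop)                      # drop the setting entirely
                else:
                    prop.find('value').text = str(value)   # overwrite the existing value
                break
        else:
            if value is not None:                          # property absent: append a new one
                prop = ET.SubElement(root, 'property')
                ET.SubElement(prop, 'name').text = name
                ET.SubElement(prop, 'value').text = str(value)
        tree.write(path)

    # Hypothetical usage; the path stands in for $GPHOME/etc/hawq-site.xml.
    set_hawq_property('/usr/local/hawq/etc/hawq-site.xml',
                      'hawq_master_address_port', 5432)

End of aside.]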
- f1 = open("%s/etc/hawq-site.xml" % self.newhome) - file_cont = f1.readlines() - - if self.config['DFS_URL']: - temp_dfs_url = self.config['DFS_URL'].strip() - if temp_dfs_url.startswith("hdfs://"): - temp_dfs_url = temp_dfs_url[7:] - self.update_xml_content(file_cont, 'hawq_dfs_url', temp_dfs_url) - - if self.config['MASTER_HOSTNAME']: - self.update_xml_content(file_cont, 'hawq_master_address_host', self.config['MASTER_HOSTNAME']) - if self.config['MASTER_PORT']: - self.update_xml_content(file_cont, 'hawq_master_address_port', self.config['MASTER_PORT']) - if self.config['MASTER_DIRECTORY']: - self.update_xml_content(file_cont, 'hawq_master_directory', self.config['MASTER_DIRECTORY']) - - # In hawq1.x, temp dir on master or segment are the same - if self.config['TEMP_DIR'] != '': - self.update_xml_content(file_cont, 'hawq_master_temp_directory', self.config['TEMP_DIR']) - self.update_xml_content(file_cont, 'hawq_segment_temp_directory', self.config['TEMP_DIR']) - else: - # set /tmp by default - self.update_xml_content(file_cont, 'hawq_master_temp_directory', '/tmp') - self.update_xml_content(file_cont, 'hawq_segment_temp_directory', '/tmp') - - if self.array.standbyMaster: - self.update_xml_content(file_cont, 'hawq_standby_address_host', self.array.standbyMaster.hostname) - else: - self.update_xml_content(file_cont, 'hawq_standby_address_host') - - if self.config['PORT_BASE']: - self.update_xml_content(file_cont, 'hawq_segment_address_port', self.config['PORT_BASE']+1) - # in hawq2, now all segments has the same data dir - # so here we just set it to the first segment data dir - if self.array.segments[0].primaryDB.datadir: - self.update_xml_content(file_cont, 'hawq_segment_directory', self.array.segments[0].primaryDB.datadir) - - # yarn related is set to default value: yarn disabled - self.update_xml_content(file_cont, 'hawq_global_rm_type', 'none') - self.update_xml_content(file_cont, 'hawq_rm_yarn_address', 'localhost:9980') - self.update_xml_content(file_cont, 'hawq_rm_yarn_scheduler_address', 'localhost:9981') - - # write buffer back to the xml file - f2 = open("%s/etc/hawq-site.xml" % self.newhome, 'w') - f2.writelines(file_cont); - - # Add segment hosts into file slaves - slaves_str = '' - for seg in self.array.segments: - newseg = str(seg.primaryDB.hostname) + "\n" - # remove duplicate segment hosts - if newseg not in slaves_str: - slaves_str += newseg - slaves_file = os.path.join(self.newhome, 'etc', 'slaves') - file = open(slaves_file, 'w') - file.write(slaves_str) - file.close() - - #------------------------------------------------------------ - def SetupLockdown(self): - ''' - Change pg_hba.conf to disallow access to everyone except the - upgrade user. - ''' - - os_type = os.uname()[0].lower() - - if self.cmd == 'MASTER': - logger.info('Locking database') - self.CheckUp() - [env, utility, upgrade] = self.dbup; - if utility: - raise UpgradeError("Cannot lock database in utility mode") - - logger.info('... Creating upgrade user') - self.Update('DROP USER IF EXISTS ' + MIGRATIONUSER) - self.Update('CREATE USER %s with superuser' % MIGRATIONUSER) - - logger.info('... Creating pg_hba.conf ') - - # To write out a new pg_hba.conf we need to know all the ip - # addresses that the master identifies with. - # - # If we re-enable segment locking then this will need to be - # passed to the segments as well. 
-            if os_type == 'sunos':
-                cmd = 'ifconfig -a inet'
-            else:
-                cmd = 'ifconfig'
-            ifconf = self.RunCmd(cmd)
-            ipv4_re = re.compile('inet (?:addr:)?(\d+\.\d+\.\d+\.\d+)')
-            ipv6_re = re.compile('inet6 (?:addr: )?([a-zA-Z0-9:\.]+[a-zA-Z0-9:])')
-            ip = []
-            for line in ifconf.split('\n'):
-                m = ipv4_re.search(line)
-                if m:
-                    ip.append(m.group(1))
-                m = ipv6_re.search(line)
-                if m:
-                    ip.append(m.group(1))
-
-            # Currently we only lock the MASTER node
-            #
-            # locking down the segments requires having gpstart
-            # pass the PGUSER to the segments otherwise the segments
-            # don't have the permissions to startup.
-            #
-            # This leaves a "hole" in the lockdown that still allows
-            # utility connections to the segments.
-            hba = os.path.join(self.masterdir, 'pg_hba.conf')
-            ident = os.path.join(self.masterdir, 'pg_ident.conf')
-
-            if env == self.oldenv or not os.path.exists(hba + LOCKEXT):
-                # pre-upgrade database makes a backup of files and then
-                # writes a new one
-
-                # Solaris doesn't support ident authentication
-                if os_type != 'sunos':
-                    self.RunCmd('mv -f %s %s' % (ident, ident+LOCKEXT))
-                    file = open(ident, 'w')
-                    file.write('%s %s %s\n' %
-                               (MIGRATIONUSER, self.user, MIGRATIONUSER))
-                    file.write('%s %s %s\n' %
-                               (self.user, self.user, self.user))
-                    file.close()
-
-                self.RunCmd('mv -f %s %s' % (hba, hba+LOCKEXT))
-                file = open(hba, 'w')
-                if os_type == 'sunos':
-                    file.write('local all %s trust\n' % MIGRATIONUSER)
-                    file.write('local all %s trust\n' % self.user)
-                else:
-                    file.write('local all %s ident map=%s\n'
-                               % (MIGRATIONUSER, MIGRATIONUSER))
-                    file.write('local all %s ident map=%s\n'
-                               % (self.user, self.user))
-
-                for addr in ip:
-                    cidr_suffix = '/128' if ':' in addr else '/32'  # MPP-15889
-                    file.write('host all %s %s%s trust\n'
-                               % (MIGRATIONUSER, addr, cidr_suffix))
-                    file.write('host all %s %s%s trust\n'
-                               % (self.user, addr, cidr_suffix))
-
-                file.close()
-
-            if env == self.newenv:
-                # upgrade database just copies all locking information
-                # from the pre-upgrade database which should already be
-                # locked
-                dir = self.newmaster
-
-                self.copy_rewrite_hba(hba, os.path.join(dir, 'pg_hba.conf'),
-                                      self.upgrade)
-                self.RunCmd('cp -f %s %s' % (ident, dir))
-
-                # Grab the original versions too (if any)
-                if os.path.exists(hba + LOCKEXT):
-                    self.copy_rewrite_hba(hba+LOCKEXT,
-                                          os.path.join(dir,
-                                                       'pg_hba.conf'+LOCKEXT),
-                                          self.upgrade)
-                if os.path.exists(ident + LOCKEXT):
-                    self.RunCmd('cp -f %s %s' % (ident+LOCKEXT, dir))
-
-            # With the new lockfiles in place force the server to reload
-            logger.info('... Syncing')
-            logger.debug('calling gpstop -u with env: %s' % str(env))
-            self.RunCmd('gpstop -u -a -l %s' % self.logdir, env=env)
-            logger.info('Database Locked')
-
-            # Check if there are any other sessions connected. Since we
-            # have modified the pg_hba.conf and run gpstop -u no new sessions
-            # will be allowed, but any that were already connected still
-            # need to be booted.
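For reference, the lockdown pg_hba.conf written by the branch above would come out roughly as follows, assuming a hypothetical master address of 192.0.2.10 and an administrative user named gpadmin (both invented for illustration):

local  all  gpmigrator                  ident map=gpmigrator
local  all  gpadmin                     ident map=gpadmin
host   all  gpmigrator  192.0.2.10/32   trust
host   all  gpadmin     192.0.2.10/32   trust
host   all  gpmigrator  ::1/128         trust
host   all  gpadmin     ::1/128         trust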
-            active = self.Select("SELECT count(*) FROM pg_stat_activity")
-            if active[0] > 1:
-                [env, utility, upgrade] = self.dbup
-                logger.info('Active sessions detected, restarting server')
-                self.Shutdown()
-                self.Startup(env, utility)
-
-
-    def CheckClusterReady(self, databases, port):
-        '''
-        Pre-flight checks
-        '''
-        for db in databases:
-            xact = self.Select("SELECT count(*)::numeric FROM " +
-                               "pg_prepared_xacts",
-                               db=db['dbname'], port=port);
-            if xact[0] > 0:
-                raise UpgradeError("Database contains prepared transactions");
-
-    #------------------------------------------------------------
-    def ReleaseLockdown(self, env):
-        '''
-        Revert pg_hba.conf to pre-lockdown state
-        '''
-
-        # Two main cases:
-        #   1) Release lockdown on old environment
-        #   2) Release lockdown on newly installed environment
-        # Both cases have the database installed under MASTER_DATA_DIRECTORY
-        # so we actually treat them exactly the same
-
-        if self.cmd == 'MASTER':
-
-            # nothing to do if we haven't setup an array object yet.
-            if not self.array:
-                return
-
-            logger.info('Unlocking database')
-
-            # See comment about disabling lockdown on segments in SetupLockdown()
-            # self.CallSlaves('UNLOCK')
-
-            datadirs = [self.masterdir]
-        else:
-            datadirs = self.datadirs
-
-        for dir in datadirs:
-            hba = os.path.join(dir, 'pg_hba.conf')
-            ident = os.path.join(dir, 'pg_ident.conf')
-            if os.path.exists(hba + LOCKEXT):
-                forward = False
-                if self.upgrade and env == self.newenv:
-                    forward = True
-                elif not self.upgrade and env == self.oldenv:
-                    forward = True
-                self.move_rewrite_hba(hba+LOCKEXT, hba, forward)
-
-            if os.path.exists(ident + LOCKEXT):
-                self.RunCmd('mv -f %s %s' % (ident+LOCKEXT, ident))
-
-
-        # if we don't think the database is up, double check!
-        if not self.dbup:
-            env = None
-            try:
-                env = self.CheckUp()
-            except UpgradeError:
-                pass
-            if env:
-                self.dbup = [env, False, False]
-
-        # Re-source the conf files if the database is up.
-        if self.dbup:
-            if (env == self.oldenv):
-                self.RunCmd('gpstop -u -l %s' % self.logdir, env=env)
-
-                # MPP-10107 - The server is still running with PGUSER set in its
-                # environment, which is wrong since that user doesn't exist.
-                # Ideally that shouldn't matter, but unfortunately it does.  We
-                # deal with this by restarting the server.
-                #
-                # Note: the step above is still required because otherwise we
-                # can't even run this gpstop due to pg_hba.conf lock.
-                #
-                # A better solution would be to have gpstart make a constrained
-                # environment when it starts and stops the server, but that is
-                # more than a one-line change, and this solves this case with
-                # minimal impact to anything else.
-                self.RunCmd('gpstop -ral %s' % self.logdir, env=env)
-            else:
-                self.RunCmd('hawq stop cluster -a -u -l %s' % self.logdir, env=env)
-                self.RunCmd('hawq restart cluster -al %s' % self.logdir, env=env)
-
-        # We would love to actually delete the user, but it's not actually
-        # safe to do so here because we unlock the source database before we
-        # are done with the upgrade.
-        return
-
-
-    #------------------------------------------------------------
-    def ResetXLogs(self):
-        '''
-        Resets the xlog prior to recreating the schema
-        '''
-        if self.cmd == 'MASTER':
-            logger.info('Syncing XLogs')
-            datadirs = [self.masterdir]
-            self.CallSlaves('RESETXLOG')
-        else:
-            datadirs = self.datadirs
-
-        # We should always have at least one data directory...
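To make the next hunk easier to follow: pg_resetxlog -n prints the current pg_control values one per line, which is what the parsing loop below turns into the sync dict. A trimmed, hypothetical example of that output (the field names are the ones the code reads; the values are invented):

Latest checkpoint's TimeLineID:       1
Latest checkpoint's NextXID:          0/10938
Latest checkpoint's NextOID:          24576
Latest checkpoint's NextMultiXactId:  1
Latest checkpoint's NextMultiOffset:  0
Current log file ID:                  0
Next log file segment:                7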
-        if len(self.datadirs) < 1:
-            return
-
-        # Check the mapping in all data directories
-        for olddir in datadirs:
-            (d, seg) = os.path.split(olddir)
-            newdir = os.path.join(d, WORKDIR, UPGRADEDIR, seg)
-
-            # Links are in place, but data will not be visible until
-            # we sync the pg_control info
-            oldsync = self.RunCmd('pg_resetxlog -n ' + olddir,
-                                  env=self.oldenv)
-            newsync = self.RunCmd('pg_resetxlog -n ' + newdir,
-                                  env=self.newenv)
-
-            logger.debug('*****************************')
-            logger.debug(' - old xlog -')
-            logger.debug(oldsync)
-            logger.debug('*****************************')
-            logger.debug(' - new xlog -')
-            logger.debug(newsync)
-            logger.debug('*****************************')
-
-
-            # Nasty bit of code to turn the control data into a
-            # nice dict object
-            sync = {}
-            for line in oldsync.split('\n'):
-                if len(line) == 0: continue
-                (field, value) = line.split(':')
-                sync[field] = value.lstrip()
-
-            # Set the next transaction id in new control
-            x = sync["Latest checkpoint's NextXID"].find('/')
-            if x < 0:
-                nextxid = sync["Latest checkpoint's NextXID"]
-            else:
-                nextxid = sync["Latest checkpoint's NextXID"][x+1:]
-
-            self.RunCmd('pg_resetxlog -f -x %s %s' % (nextxid, newdir),
-                        env=self.newenv)
-
-            # Set the WAL archives
-            self.RunCmd('pg_resetxlog -l %s,%s,%s %s' %
-                        (sync["Latest checkpoint's TimeLineID"],
-                         sync["Current log file ID"],
-                         sync["Next log file segment"],
-                         newdir), env=self.newenv)
-
-            # Set next oid
-            self.RunCmd('pg_resetxlog -o %s %s' %
-                        (sync["Latest checkpoint's NextOID"],
-                         newdir), env=self.newenv)
-
-            # Set next multitransaction ID/offset
-            self.RunCmd('pg_resetxlog -m %s %s' %
-                        (sync["Latest checkpoint's NextMultiXactId"],
-                         newdir), env=self.newenv)
-            self.RunCmd('pg_resetxlog -O %s %s' %
-                        (sync["Latest checkpoint's NextMultiOffset"],
-                         newdir), env=self.newenv)
-
-            # Replace the transaction logs with the old ones
-            # since the old logs actually correspond to the data
-            for f in ['pg_clog',
-                      'pg_distributedlog',
-                      'pg_subtrans',
-                      'pg_multixact']:
-                oldlogs = os.path.join(olddir, f)
-                newlogs = os.path.join(newdir, f)
-
-                # the only one that wouldn't exist is pg_distributedlog
-                # and only if we were upgrading/downgrading to/from a
-                # version < 3.1.1.5
-                if (os.path.exists(newlogs)):
-                    self.RunCmd('rm -rf %s' % newlogs)
-                if (os.path.exists(oldlogs)):
-                    self.RunCmd('cp -rf %s %s' % (oldlogs, newlogs))
-
-    #------------------------------------------------------------
-    def DeepLink(self, style):
-        '''
-        Makes a deep hard link copy of one directory into another
-        '''
-
-        if style == 'CLEANUP':
-            return
-
-        if style not in ['BACKUP', 'INSTALL', 'REVERT', 'SKELETON']:
-            raise UpgradeError('FATAL', ' DeepLink(%s)!?' % style)
-
-        if self.cmd == 'MASTER':
-            fsdirs = self.masterfilespace
-            if style == 'BACKUP':
-                logger.info('Linking backup')
-                logger.info('... This may take a while')
-                includeMirrors = True
-            elif style == 'INSTALL':
-                logger.info('Linking upgrade')
-                logger.info('... This may take a while')
-                includeMirrors = True
-            elif style == 'SKELETON':
-                logger.info('Linking skeleton')
-                logger.info('... This may take a while')
-                includeMirrors = False
-            else:
-                logger.info('Reverting to backup')
-                logger.info('... This may take a while')
-                includeMirrors = True
-
-
-            # Have all hosts perform the link first
-            self.CallSlaves('DEEPLINK', style, includeMirrors=includeMirrors)
-
-        else:
-            fsdirs = self.filespaces
-
-        for dir in fsdirs:
-            (location, content) = os.path.split(dir)
-
-            # For BACKUP fsdirs is the source, for INSTALL it's the target
-            if style == 'BACKUP':
-                source = os.path.join(location, content)
-                destination = os.path.join(location, WORKDIR, BACKUPDIR, content)
-            if style == 'SKELETON':
-                source = os.path.join(location, content)
-                destination = os.path.join(location, WORKDIR, UPGRADEDIR, content)
-            if style == 'INSTALL':
-                source = os.path.join(location, WORKDIR, UPGRADEDIR, content)
-                destination = os.path.join(location, content)
-            if style == 'REVERT':
-                source = os.path.join(location, WORKDIR, BACKUPDIR, content)
-                destination = os.path.join(location, content)
-
-            if not os.path.exists(source):
-                continue
-
-            # Link the log file
-            if os.path.isfile(source + '.log'):
-                oldfile = sou