hawq-commits mailing list archives

From r...@apache.org
Subject [5/8] incubator-hawq git commit: HAWQ-121. Remove legacy command line tools.
Date Thu, 05 Nov 2015 03:10:01 GMT
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/9932786b/tools/bin/gpmigrator
----------------------------------------------------------------------
diff --git a/tools/bin/gpmigrator b/tools/bin/gpmigrator
deleted file mode 100755
index f190184..0000000
--- a/tools/bin/gpmigrator
+++ /dev/null
@@ -1,3797 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-'''
-gpmigrator [options] old-gphome new-gphome
-
-Options:
-  -h, --help            show this help message and exit
-  -v, --version         show the program's version number and exit
-  -q                    quiet mode
-  -d DIRECTORY          the master host data directory
-  -l DIRECTORY          log file directory
-  -R                    revert a previous gpmigrator run
-  --debug               debugging information
-'''
-
-#============================================================
-import sys, os
-from datetime import datetime as dtdatetime
-
-# Python version 2.6.2 is expected, must be between 2.5-3.0
-if sys.version_info < (2, 5, 0) or sys.version_info >= (3, 0, 0):
-    sys.stderr.write("Error: %s is supported on Python versions 2.5 or greater\n"
-                     "Please upgrade python installed on this machine."
-                     % os.path.split(__file__)[-1])
-    sys.exit(1)
-
-try:
-    from gppylib.operations.gpMigratorUtil import *
-    from gppylib.operations.hdfs_cmd import *
-    from gppylib.commands.base import Command
-except ImportError, e:
-    sys.exit('Error: unable to import module: ' + str(e))
-
-libdir = os.path.join(sys.path[0], 'lib/')
-
-logger         = get_default_logger()
-EXECNAME       = os.path.split(__file__)[-1]
-MIGRATIONUSER  = 'gpmigrator'
-LOCKEXT        = '.gpmigrator_orig'
-WORKDIR        = 'gpmigrator'
-BACKUPDIR      = 'backup'
-UPGRADEDIR     = 'upgrade'
-HDFS_SS        = 'gpmigrator'
-NEW_FS_POSTFIX = 'new'
-PARALLELISM    = 16
-
-#============================================================
-__version__ = '$Revision: #218 $'
-
-
-#============================================================
-def makeCommand(oldHome, newHome, oldVersion, newVersion,
-                 command, pid, dataDirectories, filespaces, option1,
-                 option2, method, isUpgrade, isRevert, mirrors):
-
-        # pickle and base64-encode the directory lists so they can be passed on the command line
-        datadirs = base64.urlsafe_b64encode(pickle.dumps(dataDirectories))
-        fse      = base64.urlsafe_b64encode(pickle.dumps(filespaces))
-
-        hasMirrors = len(mirrors) > 0
-
-        cmd = [
-            EXECNAME,
-            oldHome,
-            newHome,
-            '--internal-oldversion=' + urllib.quote(str(oldVersion)),
-            '--internal-newversion=' + urllib.quote(str(newVersion)),
-            '--internal-command=' + str(command),
-            '--internal-pid=' + str(pid),
-            '--internal-dirs=' + urllib.quote(datadirs),
-            '--internal-filespaces=' + urllib.quote(fse),
-            '--internal-option=' + urllib.quote(str(option1)),
-            '--internal-option2=' + urllib.quote(str(option2)),
-            '--debug',
-            '--quiet',
-            ]
-        if method:
-            cmd.append('--internal-method='+ urllib.quote(str(method)))
-        if isUpgrade:
-            cmd.append('--internal-isupgrade')
-        if isRevert:
-            cmd.append('-R')
-        if hasMirrors:
-            cmd.append('--internal-mirrors')
-
-        return cmd
-
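makeCommand passes the per-host directory lists over the remote command line by pickling them, base64-encoding the pickle, and URL-quoting the result; ParseInput() below reverses the chain with urllib.unquote, base64.urlsafe_b64decode and pickle.loads. A minimal round-trip sketch (Python 2, as in the script; the paths are hypothetical):

    import base64, pickle, urllib

    datadirs = ['/data/primary/gpseg0', '/data/primary/gpseg1']   # hypothetical paths

    # master side, as in makeCommand(): pickle -> urlsafe base64 -> URL-quote
    encoded = urllib.quote(base64.urlsafe_b64encode(pickle.dumps(datadirs)))
    arg = '--internal-dirs=' + encoded

    # segment side, as in ParseInput(): unquote -> base64-decode -> unpickle
    decoded = pickle.loads(base64.urlsafe_b64decode(urllib.unquote(encoded)))
    assert decoded == datadirs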
-#============================================================
-class GpControlData(base.Command):
-    """
-    Get the control data for a specified directory.
-    """
-    def __init__(self, name, directory, ctxt=base.LOCAL, remoteHost=None):
-        self.controldata = None
-
-        cmdStr = "$GPHOME/bin/pg_controldata %s" % directory
-        base.Command.__init__(self, name, cmdStr, ctxt, remoteHost)
-
-    def get_controldata(self):
-        if not self.controldata:
-            self.controldata = {}
-            for line in self.results.stdout.split('\n'):
-                try:
-                    (key, value) = line.split(':', 1)
-                    self.controldata[key] = value.strip()
-                except:
-                    pass
-        return self.controldata
-
-    @staticmethod
-    def local(name,directory):
-        cmd=GpControlData(name,directory)
-        cmd.run(validateAfter=True)
-        return cmd.get_controldata()
-
-
-#============================================================
-class GPUpgrade(GPUpgradeBase):
-    '''
-    HAWQ Upgrade Utility
-    '''
-
-    # INTERNAL command:
-    #   MASTER    - default, run on master, [not included in list]
-    #   MKDIR     - Create directories, run on every host
-    #   RMDIR     - Remove directories, run on every host
-    #   CHKDIR    - Check directories, run on every host
-    #   RESETXLOG - Synchronize xlogs
-    #   DEEPLINK  - Recursive hardlink
-    #   EXTRACTCATFILES - build a list of catalog files to be copied
-    #   BUILDSKEL - build a skeleton copy of the old system
-    #   TRANSFORMCAT - apply catalog transformations
-    #   SETCATVERSION - set catalog version
-    #   EXTRACTAOSEGS - get ao seg info
-    #   GETHIGHESTOID - find the highest OID in the cluster
-    commands = ['SETSTATE', 'MKDIR', 'RMDIR', 'CHKDIR', 'RESETXLOG',
-                'DEEPLINK', 'CHKDOWN', 'LOCKDOWN', 'UNLOCK',
-                'EXTRACTCATFILES', 'BUILDSKEL', 'TRANSFORMCAT',
-                'SETCATVERSION', 'EXTRACTAOSEGS', 'GETHIGHESTOID',
-                'TOUCHUPCONFIG', 'FIXCONFIG']
-
-    #------------------------------------------------------------
-    def __init__(self):
-        '''
-        The most basic of initialization
-        '''
-        super(GPUpgrade, self).__init__()
-
-        self.resume     = False
-        self.mirrormode = None
-
-        self.pid       = os.getpid()   # Process ID of the master
-
-        # Non-master variables
-        self.option2   = None     # Argument passed to cmd
-
-        # Environment and system info
-        self.datadirs  = None     # Set of all data and mirror directories
-        self.filespaces= None     # Set of filespace directory (dboid as key)
-        self.host      = None     # master hostname
-        self.upgrade   = None     # True for upgrade, False for downgrade
-        self.method    = None     # transform or dumprestore ?
-
-        # Upgrade information and state
-        self.quiet      = False
-        self.revert     = False
-        self.checkschema = True
-        self.warnings   = False
-        self.gpperfmon  = None
-        self.oldversion = None
-        self.newversion = None
-        self.newmaster  = None     # directory: workdir/upgrade/gp-1
-        self.state      = None     # file:      backup/state
-        self.config     = {}       # gp_init_config for new db
-        self.segments   = []       # list of segments
-        self.seg_prefix = None
-
-    #------------------------------------------------------------
-    def Setup(self):
-        '''
-        Basic initialization, separate from __init__ for exception
-        handling purposes.
-        '''
-
-        # GPHOME, PYTHONPATH must be setup properly
-        #    GPHOME must match the location of this file
-        #    The first PYTHONPATH must be based on the proper gphome
-        gphome_bin = os.path.realpath(os.path.split(__file__)[0])
-        gphome     = os.path.split(gphome_bin)[0]
-        env_GPHOME = os.path.realpath(os.environ.get('GPHOME'))
-        if (env_GPHOME != gphome):
-            logger.fatal(" $GPHOME is set to %s which is not newhome" % env_GPHOME)
-            logger.fatal(' source the greenplum.sh from the newhome to setup env ')
-            raise UpgradeError('Initialization failed')
-
-        pythonpath     = os.path.join(gphome, "lib", "python")
-        env_PYTHONPATH = os.path.realpath(os.environ.get('PYTHONPATH').split(':')[0])
-        if (env_PYTHONPATH != pythonpath):
-            logger.fatal(' $PYTHONPATH is incorrect ')
-            logger.fatal(' source the greenplum.sh from the new home to set up the environment ')
-            raise UpgradeError('Initialization failed')
-
-        # This is the same path used by gpinitsystem
-        self.path = '/usr/kerberos/bin:/usr/sfw/bin:/opt/sfw/bin'
-        self.path += ':/usr/local/bin:/bin:/usr/bin:/sbin:/usr/sbin:/usr/ucb'
-        self.path += ':/sw/bin'
-
-        # Set defaults
-        self.user      = os.environ.get('USER') or os.environ.get('LOGNAME')
-        self.host      = self.RunCmd('hostname')
-        self.masterdir = os.environ.get('MASTER_DATA_DIRECTORY')
-
-        self.ParseInput()
-
-        # Setup worker pool
-        self.pool = base.WorkerPool(numWorkers=PARALLELISM);
-
-        # Extract path from input master directory
-        if self.cmd == 'MASTER':
-            logger.info('Beginning upgrade')
-            logger.info('Checking configuration')
-
-            if not self.masterdir or len(self.masterdir) == 0:
-                raise UpgradeError('MASTER_DATA_DIRECTORY is not defined')
-            self.masterdir = self.masterdir.rstrip('/')
-
-            # The configuration file
-            conf = os.path.join(self.masterdir, 'postgresql.conf')
-
-            # Simple function to look for settings in the conf file
-            def getconf(x):
-                conf_re  = re.compile('^\s*%s\s*=\s*(\w+)' % x)
-                try:
-                    conf_str = self.RunCmd('grep %s %s' % (x, conf))
-                except CmdError:
-                    conf_str = ""     # grep returns errorcode on no match
-                value = None
-                for line in conf_str.split('\n'):
-                    match = conf_re.search(line)
-                    if match:
-                        value = match.group(1)
-                return value
-
-            # Find the port for this segment:
-            self.masterport = getconf('port')
-            if self.masterport == None:
-                raise UpgradeError('Could not determine master port from ' + conf)
-            self.masterport = int(self.masterport)
-
-            # Determine if perfmon is enabled
-            self.gpperfmon = getconf('gp_enable_gpperfmon')
-
-            self.sock_dir = getconf('unix_socket_directory')
-            if not self.sock_dir:
-                self.sock_dir = '/tmp/'
-
-            # Verify that (max_connections == superuser_reserved_connections)
-            # max_conn = getconf('max_connections')
-            # reserved = getconf('superuser_reserved_connections')
-
-            masterpath, masterdir = os.path.split(self.masterdir)
-            self.workdir   = os.path.join(masterpath, WORKDIR)
-            self.newmaster = os.path.join(self.workdir, UPGRADEDIR, masterdir)
-            self.oldenv    = self.SetupEnv(self.oldhome, self.masterdir)
-            self.newenv    = self.SetupEnv(self.newhome, self.newmaster)
-
-        else:
-            self.oldenv = self.SetupEnv(self.oldhome, None)
-            self.newenv = self.SetupEnv(self.newhome, None)
-
-        # check for hadoop command set in env
-        if self.cmd == 'MASTER':
-            self.hdfs_cmd = HDFS_Cmd()
-            self.hdfs_cmd.check_hdfs()
-
-    #------------------------------------------------------------
-    def ParseInput(self):
-        '''
-        Parses and validates input to the script
-        '''
-
-        try:
-            parser = optparse.OptionParser(usage=(cli_help(EXECNAME) or __doc__), add_help_option=False)
-            parser.add_option('-v', '--version', action='store_true',
-                              help=optparse.SUPPRESS_HELP)
-            parser.add_option('-h', '--help', action='store_true',
-                              help=optparse.SUPPRESS_HELP)
-            parser.add_option('-d', dest='directory',
-                              help=optparse.SUPPRESS_HELP)
-            parser.add_option('-l', dest='logdir',
-                              help=optparse.SUPPRESS_HELP)
-            parser.add_option('-q', '--quiet', action='store_true',
-                              help=optparse.SUPPRESS_HELP)
-            parser.add_option('-R', dest='revert', action='store_true',
-                              help=optparse.SUPPRESS_HELP)
-            parser.add_option('-c', '--check-only', dest='checkonly', action='store_true',
-                              help=optparse.SUPPRESS_HELP)
-
-            parser.add_option('--debug', action='store_true',
-                              help=optparse.SUPPRESS_HELP)
-            parser.add_option('--internal-disable-checkschema',
-                              dest='nocheckschema', action='store_true',
-                              help=optparse.SUPPRESS_HELP)
-            parser.add_option('--internal-command', dest='command',
-                              help=optparse.SUPPRESS_HELP)
-            parser.add_option('--internal-pid', dest='pid',
-                              help=optparse.SUPPRESS_HELP)
-            parser.add_option('--internal-dirs', dest='datadirs',
-                              help=optparse.SUPPRESS_HELP)
-            parser.add_option('--internal-filespaces', dest='fse',
-                              help=optparse.SUPPRESS_HELP)
-            parser.add_option('--internal-option', dest='option',
-                              help=optparse.SUPPRESS_HELP)
-            parser.add_option('--internal-option2', dest='option2',
-                              help=optparse.SUPPRESS_HELP)
-            parser.add_option('--internal-method', dest='method',
-                              help=optparse.SUPPRESS_HELP)
-            parser.add_option('--internal-oldversion', dest='oldversion',
-                              help=optparse.SUPPRESS_HELP)
-            parser.add_option('--internal-newversion', dest='newversion',
-                              help=optparse.SUPPRESS_HELP)
-            parser.add_option('--internal-isupgrade', dest='upgrade',
-                              action='store_true',
-                              help=optparse.SUPPRESS_HELP)
-            parser.add_option('--internal-mirrors', dest='mirrors',
-                              action='store_true',
-                              help=optparse.SUPPRESS_HELP)
-            parser.add_option('--nocheckcat', dest='nocheckcat',
-                              action='store_true',
-                              help=optparse.SUPPRESS_HELP)
-            parser.add_option('--fault-injection', dest='faultinjection',
-                              type='int', help=optparse.SUPPRESS_HELP)
-            (options, args) = parser.parse_args()
-
-            if options.version:
-                print EXECNAME + ' ' + __version__
-                sys.exit(0)
-
-            if options.help:
-                usage(EXECNAME)
-                sys.exit(0)
-
-            if len(args) != 2:
-                usage(EXECNAME)
-                msg = "incorrect number of arguments"
-                if len(args) > 0:
-                    msg += ": %s" % str(args)
-                parser.error(msg)
-
-        except Exception, e:
-            usage(EXECNAME)
-            raise UpgradeError('Error parsing input: ' + str(e))
-
-        if options.revert:
-            self.revert = True
-
-        if options.checkonly:
-            self.checkonly = True
-
-        if options.directory:
-            self.masterdir = options.directory
-        if options.command:
-            if options.command not in self.commands:
-                parser.error('INVALID INTERNAL COMMAND: ' + options.command)
-            self.cmd = options.command
-        if options.pid:
-            if self.cmd == 'MASTER':
-                parser.error('INVALID INTERNAL COMMAND: ' + self.cmd)
-            self.pid = int(options.pid)
-        self.oldhome   = args[0].rstrip('/')
-        self.newhome   = args[1].rstrip('/')
-        if options.datadirs:
-            self.datadirs = pickle.loads(base64.urlsafe_b64decode((urllib.unquote(options.datadirs))))
-        if options.fse:
-            self.filespaces = pickle.loads(base64.urlsafe_b64decode((urllib.unquote(options.fse))))
-        if options.option:
-            self.option   = urllib.unquote(options.option)
-        if options.option2:
-            self.option2  = urllib.unquote(options.option2)
-        if options.debug:
-            self.debug = True
-            enable_verbose_logging()
-        if options.quiet:
-            self.quiet = True
-            quiet_stdout_logging()
-        if options.logdir:
-            self.logdir = options.logdir
-        else:
-            self.logdir = os.path.join(os.environ['HOME'], 'gpAdminLogs')
-
-        # --internal-mirrors just indicates that the cluster has mirrors,
-        # for simplicity we have self.mirrors still point to a list, but
-        # its contents are not directly useful.
-        if options.mirrors:
-            self.mirrors = [True]
-
-        try:
-            setup_tool_logging(EXECNAME,
-                               unix.getLocalHostname(),
-                               unix.getUserName(),
-                               self.logdir)
-        except OSError, e:
-            logger.fatal('cannot log to %s: %s' % (self.logdir, str(e)))
-            exit(1)
-
-        if options.nocheckschema:
-            self.checkschema = False
-
-        if self.cmd != 'MASTER':
-            self.oldversion = GpVersion(urllib.unquote(options.oldversion))
-            self.newversion = GpVersion(urllib.unquote(options.newversion))
-            self.method = options.method
-            if options.upgrade:
-                self.upgrade = True
-            else:
-                self.upgrade = False
-        if options.nocheckcat:
-            self.checkcat = False
-        else:
-            self.checkcat = True
-        self.faultinjection = options.faultinjection
-
-        if self.cmd == 'MASTER':
-            self.method = "transform"  # Currently the only supported method
-
-            today = date.today().strftime('%Y%m%d')
-            if not os.path.isdir(self.logdir):
-                os.makedirs(self.logdir, 0700)
-            logname = os.path.join(self.logdir, '%s_%s.log' % (EXECNAME, today))
-            self.logfile = open(logname, 'a')
-
-            self.pid = os.getpid()
-            if self.masterdir == None:
-                logger.fatal('MASTER_DATA_DIRECTORY parameter not set')
-                raise UpgradeError('Initialization failed')
-            if not os.path.exists(self.masterdir):
-                logger.fatal('MASTER_DATA_DIRECTORY: %s not found'
-                             % self.masterdir)
-                raise UpgradeError('Initialization failed')
-            if not os.path.isabs(self.masterdir):
-                self.masterdir = os.path.abspath(self.masterdir)
-            if not os.path.exists(self.oldhome):
-                logger.fatal(' directory not found: ' + self.oldhome)
-                raise UpgradeError('Initialization failed')
-            if not os.path.exists(self.newhome):
-                logger.fatal(' directory not found: ' + self.newhome)
-                raise UpgradeError('Initialization failed')
-            if not os.path.isabs(self.oldhome):
-                self.oldhome = os.path.abspath(self.oldhome)
-            if not os.path.isabs(self.newhome):
-                self.newhome = os.path.abspath(self.newhome)
-
-
-    #------------------------------------------------------------
-    def Cleanup(self):
-        '''
-        Cleanup open connections.
-        Separate from __del__ because that caused weird issues with exit()
-        behavior.
-        '''
-        if self.cmd == 'MASTER':
-            logger.fatal('Fatal error occurred - Recovering')
-            try:
-                self.Shutdown()
-            except Exception, e:
-                logger.fatal('Cleanup failure: ' + str(e))
-
-            try:
-                self.ReleaseLockdown(self.oldenv)
-                self.Copy_hawq_config(True)
-            except Exception, e:
-                logger.fatal('Cleanup failure: ' + str(e))
-
-            if self.pool:
-                del self.pool
-
-
-
-    #------------------------------------------------------------
-    def SetState(self, newstate):
-        '''
-        Sets the state in the upgrader state file
-        '''
-
-        # Set state across all hosts, if all hosts succeed then set the
-        # master state
-        if self.cmd == 'MASTER':
-            self.CallSlaves('SETSTATE', newstate)
-            try:
-                self.state.write(str(newstate) + "\n")
-                self.state.flush()
-            except Exception, e:
-                raise UpgradeError('Error writing to statefile: (%s)' % str(e))
-        else:
-
-            # Build up a unique set of workdirectories on this host
-            locations = set()
-            for d in self.datadirs:
-                (location, _) = os.path.split(d)
-                locations.add(os.path.join(location, WORKDIR))
-
-            for d in locations:
-                statefile = os.path.join(d, 'state')
-                file = open(statefile, 'a')
-                file.write(str(newstate) + "\n")
-                file.close()
-
-    #------------------------------------------------------------
-    def CallSlaves(self, cmd, option='', option2='', includeMirrors=False):
-        '''
-        Calls every host to execute the given command with the given option
-        value
-        '''
-
-        logger.debug("Remote call: %s" % cmd)
-
-        # Check for things that should never happen
-        if self.cmd != 'MASTER':
-            raise UpgradeError("Recursive communication error")
-        if not self.array:
-            raise UpgradeError("Failure initializing array")
-        if not self.hostcache:
-            raise UpgradeError("Failure initializing host cache")
-        if not self.pool:
-            raise UpgradeError("Failure initializing worker pool")
-
-        if self.upgrade or self.revert:
-            gphome = self.newhome
-        else:
-            gphome = self.oldhome
-
-        # Construct the commands to pass to the worker pool
-        hosts = self.hostcache.get_hosts()
-        for host in hosts:
-            hostname = host.hostname
-
-            # Skip any hosts in the cache that contain no segments for this
-            # configuration.
-            if len(host.dbs) == 0:
-                continue
-
-            # Get the data directories for this host:
-            datadirs = []
-            for seg in host.dbs:
-                if includeMirrors or seg.isSegmentPrimary():
-                    datadirs.append(seg.getSegmentDataDirectory())
-
-            fse = []
-            for seg in host.dbs:
-                if includeMirrors or seg.isSegmentPrimary():
-                    sfs = seg.getSegmentFilespaces();
-                    for (oid, dir) in sfs.iteritems():
-                        fse.append(dir)
-
-            # Skip any hosts that have no applicable data directories
-            if len(datadirs) == 0:
-               continue
-
-            cmdList = makeCommand(oldHome=self.oldhome,
-                             newHome=self.newhome,
-                             oldVersion=self.oldversion,
-                             newVersion=self.newversion,
-                             command=cmd,
-                             pid=self.pid,
-                             dataDirectories=datadirs,
-                             filespaces=fse,
-                             option1=option,
-                             option2=option2,
-                             method=self.method,
-                             isUpgrade=self.upgrade,
-                             isRevert=self.revert,
-                             mirrors=self.mirrors)
-
-            c = GpUpgradeCmd("gpmigrator_mirror remote call",
-                             cmdList,
-                             ctxt=base.REMOTE,
-                             remoteHost=hostname)
-
-            self.pool.addCommand(c)
-
-        # Wait for the segments to finish
-        try:
-            self.pool.join()
-        except:
-            self.pool.haltWork()
-            self.pool.joinWorkers()
-
-        failure = False
-        results = []
-        for cmd in self.pool.getCompletedItems():
-            r = cmd.get_results()
-
-            # Going through the gppylib Command interface all stderr from the
-            # remote calls gets redirected to stdout, which is unfortunate
-            # because we'd like to be able to differentiate between the two.
-            #
-            # We keep the stdout chatter to a minimum by passing --quiet to
-            # the remote calls which performs quiet stdout logging.
-
-            # sys.stderr.write(r.stderr)
-            msg = r.stdout.strip()
-            results.append(msg)
-
-            if not cmd.was_successful():
-                log_literal(logger, logging.ERROR, msg)
-                failure=True
-
-        if failure:
-            raise UpgradeError("Fatal Segment Error")
-
-        # Warning this output contains everything written to stdout,
-        # which unfortunately includes some of the logging information
-        return "\n".join(results)
-
-
-    #------------------------------------------------------------
-    def CheckDirectories(self):
-        if self.cmd == 'MASTER':
-            fsdirs = self.masterfilespace
-        else:
-            fsdirs = self.filespaces
-
-        err = False
-
-        # Build up a unique set of workdirectories on this host
-        locations = set()
-        for d in fsdirs:
-            (location, _) = os.path.split(d)
-            locations.add(os.path.join(location, WORKDIR))
-
-        # Check that none of them exist
-        for d in locations:
-            if os.path.exists(d):
-                logger.warn('%s : File exists' % d)
-                err = True
-
-        if err:
-            raise UpgradeError('Directories exist')
-
-    #------------------------------------------------------------
-    def TransformHDFS(self, isRevert=False):
-        """
-        Remove gpseg* from the hdfs file path
-        And rename each relfile to (segid+1) + segnode_num*(relfile-1) in the relation dir
-        """
-        def rename_hdfs_relfile(path):
-            # transform old path in hawq1 to the new path for hawq2.0
-            (filespace, segid, tablespace, dbid, filenode) = path.rsplit("/", 4)
-            segid = segid.replace(self.seg_prefix, "")
-            if filenode=='PG_VERSION':
-                new_path = os.path.join(filespace, NEW_FS_POSTFIX, tablespace, dbid, filenode)
-                # need to skip if segid !=0
-                if segid!='0':
-                    return
-            else:
-                if '.' not in filenode:
-                    relfile = '0'
-                    relation = filenode
-                    if(segid=='0'):
-                        new_path =  os.path.join(filespace, NEW_FS_POSTFIX, tablespace, dbid, relation)
-                        self.hdfs_cmd.run_hdfs_mkdir(new_path, True)
-                    # don't need to copy this relfile because it is always 0 size
-                    return
-                else:
-                    (relation, relfile) = filenode.rsplit(".", 1)
-
-                segnode_num = len(self.segments)
-                fid = (int(segid)+1)+segnode_num*(int(relfile)-1)
-                new_path = os.path.join(filespace, NEW_FS_POSTFIX, tablespace, dbid, relation, str(fid))
-
-            self.hdfs_cmd.run_hdfs_mv(path, new_path)
-
-        def transform_hdfs_path(path):
-            # if the hdfs snapshot already exists, rename it
-            ss_path =  os.path.join(path, '.snapshot', HDFS_SS)
-            if self.hdfs_cmd.run_hdfs_test(ss_path, '-e')==0:
-                self.hdfs_cmd.run_hdfs_rename_snapshot(path, HDFS_SS, ss_oldname)
-            # create a hdfs snapshot before applying change
-            result = self.hdfs_cmd.run_hdfs_create_snapshot(path, HDFS_SS)
-            # now we can change hdfs
-            result = self.hdfs_cmd.run_hdfs_ls(path, True)
-            for dir, path1 in result:
-                if dir!='d':
-                    rename_hdfs_relfile(path1)
-
-        def revert_hdfs_relfile(path):
-            # revert the transformation: map the new hawq2.0 path back to the old hawq1 path
-            (others, filenode) =  path.rsplit("/", 1)
-            if filenode=='PG_VERSION':
-                (filespace, postfix, tablespace, dbid) = others.rsplit("/", 3)
-                new_path = os.path.join(filespace, '%s0'%self.seg_prefix, tablespace, dbid, filenode)
-            else:
-                (filespace, postfix, tablespace, dbid, relnode) = others.rsplit("/", 4)
-                segnode_num = len(self.segments)
-                fid = int((int(filenode)-1)/segnode_num)+1
-                sid = (int(filenode)-1) % segnode_num
-                new_path = os.path.join(filespace, '%s%i'%(self.seg_prefix, sid), tablespace, dbid, '%s.%i'%(relnode, fid))
-
-            self.hdfs_cmd.run_hdfs_mv(path, new_path)
-
-        def revert_hdfs_fs(fs):
-            # Because restoring the snapshot (which calls 'hdfs dfs -cp -p')
-            # will report an error when hawq needs the hdfs config
-            # 'dfs.namenode.accesstime.precision' set to an invalid value,
-            # we need to use 'hdfs dfs -mv' from the new filespace path instead.
-            #self.hdfs_cmd.run_hdfs_restore_snapshot(path, HDFS_SS)
-            result = self.hdfs_cmd.run_hdfs_ls(os.path.join(fs, NEW_FS_POSTFIX), True)
-            for dir, path1 in result:
-                if dir!='d':
-                    revert_hdfs_relfile(path1)
-            self.hdfs_cmd.run_hdfs_rm(os.path.join(fs, NEW_FS_POSTFIX),True)
-
-        def transform_hdfs_fs(fs, isRevert):
-            if isRevert:
-                revert_hdfs_fs(fs)
-            else:
-                result1 = self.hdfs_cmd.run_hdfs_ls(fs, False)
-                result2 = self.hdfs_cmd.parse_command_output(result1, [[0, '^(d)$', 1], [1, '^(.*/%s\d+)$'%self.seg_prefix, 1]])
-                for dir, path in result2:
-                    if dir=='d':
-                        transform_hdfs_path(path)
-
-        # Beginning of the outer func
-        # Get the hdfs path from all filespaces, removing the 'gpseg*' postfix
-        dfs_fs = self.Select("SELECT DISTINCT regexp_replace(fselocation, E'/%s\\\\d+','') FROM pg_filespace_entry WHERE fselocation LIKE 'hdfs%%'" % self.seg_prefix, forceutility=True)
-
-        if not dfs_fs:
-            logger.fatal("Can't get filespace's hdfs location from catalog pg_filespace_entry.")
-            raise UpgradeError("Can't get filespace's hdfs location from catalog pg_filespace_entry.")
-        # force hdfs to leave safe mode, because safe mode occurs frequently and makes hdfs not writable.
-        self.hdfs_cmd.run_hdfs_leave_safemode()
-        # rename old snapshot appending with timestamp
-        ss_oldname = HDFS_SS + dtdatetime.now().strftime('%Y%m%d_%H%M%S');
-        # get the new filespace postfix; don't overwrite an existing dir with the same name
-        global NEW_FS_POSTFIX
-        i = 0
-        if self.hdfs_cmd.run_hdfs_test(os.path.join(dfs_fs[0],NEW_FS_POSTFIX), '-e')==0:
-            i = 1
-            while self.hdfs_cmd.run_hdfs_test(os.path.join(dfs_fs[0],NEW_FS_POSTFIX+str(i)), '-e')==0:
-                i += 1
-        # get the existing dir number i-1
-        if isRevert:
-            i = i-1;
-        if i<0:
-            logger.fatal("Can't get filespace postfix for %s." % dfs_fs[0])
-            exit(1)
-        if i>0:
-            NEW_FS_POSTFIX = NEW_FS_POSTFIX + str(i)
-
-        for fs in dfs_fs:
-            if not isRevert:
-                self.hdfs_cmd.run_hdfs_mkdir(os.path.join(fs, NEW_FS_POSTFIX), True)
-            transform_hdfs_fs(fs, isRevert)
-
-    #------------------------------------------------------------
-    def Revert(self):
-        '''
-        Revert to backup
-        '''
-
-        # Check for indication that something is running:
-        if not self.dbup:
-            env = None
-            try:
-                env = self.CheckUp()
-            except UpgradeError:
-                pass
-
-            # We use the catalog versions we cached during versionCheck to
-            # determine if the MASTER_DATA_DIRECTORY is actually an old
-            # master data directory, or a partially upgraded one.
-            #
-            # Note: it would be better to adjust gpstop so that it knows how
-            # to stop old versions and always shutdown using a new environment.
-            #
-            # Note: It would also be good to avoid starting and stopping more
-            # than necessary here.
-            if env == self.oldenv and self.datacat == self.newcat:
-                self.newenv['MASTER_DATA_DIRECTORY'] = \
-                    self.oldenv['MASTER_DATA_DIRECTORY']
-                env = self.newenv
-
-            if env:
-                self.dbup = [env, False, False]
-
-        # Now that we've determined what, if anything, is running, we must
-        # stop it.
-        try:
-            if self.faultinjection == -1:
-                raise UpgradeError("faultinjection=%d" % self.faultinjection)
-            self.Shutdown()
-            self.CheckDown(self.revert)
-        except BaseException, e:
-            logger.fatal('***************************************')
-            logger.fatal(str(e))
-            logger.fatal('***************************************')
-            raise UpgradeError("Unable to REVERT\nShutdown the running " +
-                               "database and rerun gpmigrator with '-R'")
-
-        logger.info('Reverting to Backup')
-        self.revert = True
-
-        statefile = os.path.join(self.workdir, 'state')
-        if not os.path.exists(statefile):
-            logger.warn("  '%s' not found" % statefile)
-            logger.info('Revert aborted')
-            sys.exit(0)
-        state = self.RunCmd('cat ' + statefile)
-
-        needrevert = False
-        needRevertHDFS = False
-        for line in state.split('\n'):
-            if line == 'BACKUP_VALID':
-                needrevert = True
-            if line == 'TRANSFORMHDFS':
-                needRevertHDFS = True
-
-        hba   = os.path.join(self.masterdir, 'pg_hba.conf'+LOCKEXT)
-        ident = os.path.join(self.masterdir, 'pg_ident.conf'+LOCKEXT)
-        if os.path.exists(hba) or os.path.exists(ident):
-            needunlock = True
-        else:
-            needunlock = False
-
-        etc_d = os.path.join(self.newhome, 'etc')
-        hawq_site_orig = os.path.join(etc_d, 'hawq-site.xml'+LOCKEXT)
-        hawq_sla_orig = os.path.join(etc_d, 'slaves'+LOCKEXT)
-        if os.path.exists(hawq_site_orig) or os.path.exists(hawq_sla_orig):
-            needRevertHawqSiteCfg= True
-        else:
-            needRevertHawqSiteCfg = False
-
-        if (needrevert or needunlock):
-            try:
-                self.ReadInfo()
-            except UpgradeError, e:
-                logger.fatal(str(e))
-                logger.info('Revert aborted')
-                sys.exit(1)
-
-        # Attempt revert
-        if needrevert:
-            logger.info('Restoring directories')
-            self.DeepLink('REVERT')
-
-        # Truncate the statefile
-        # + Must occur AFTER actual Revert is processed
-        open(statefile, 'w').truncate()
-
-        if needunlock:
-            self.ReleaseLockdown(self.oldenv)
-        if needRevertHawqSiteCfg:
-            self.Copy_hawq_config(True)
-
-        self.Startup(self.oldenv)
-
-        if needRevertHDFS:
-            logger.info('Revert HDFS changes')
-            self.TransformHDFS(True)
-
-        logger.info('Removing upgrade user')
-        self.Update('DROP USER IF EXISTS ' + MIGRATIONUSER)
-        self.Shutdown()
-
-        # done
-        logger.info('Revert Successful')
-        self.revert = False
-
-
-    #------------------------------------------------------------
-    def CheckVersions(self):
-        '''
-        Validates that the upgrade from old->new is okay
-        '''
-
-        # Log the OS type info
-        os_type = os.uname()[0]
-        os_ver  = os.uname()[2]
-        logger.info('Operating System: %s %s' % (os_type, os_ver))
-
-        # Log version checking
-        logger.info('Checking version compatibility')
-
-        # If we got a version, but it isn't recognized by the GpVersion class
-        # then it is likely not a Greenplum database.
-        oldversion = self.getversion(self.oldhome, self.oldenv)
-        newversion = self.getversion(self.newhome, self.newenv)
-        try:
-            oldversion = GpVersion(oldversion)
-        except:
-            raise UpgradeError('Source not a HAWQ instance: ' + oldversion)
-        try:
-            newversion = GpVersion(newversion)
-        except:
-            raise UpgradeError('Target not a HAWQ instance: ' + newversion)
-
-        logger.info('Source Version: HAWQ %s' % str(oldversion))
-        logger.info('Target Version: HAWQ %s' % str(newversion))
-
-        self.oldversion = oldversion
-        self.newversion = newversion
-        self.upgrade = (newversion >= oldversion)
-
-        is_supported_version(oldversion, self.upgrade)
-        is_supported_version(newversion, self.upgrade)
-
-        if newversion == oldversion:
-            raise UpgradeError("HAWQ is already version '%s'"
-                               % str(newversion))
-        elif newversion.isVersionRelease(oldversion):
-            raise UpgradeError("Upgrade not needed to go from version '%s' to version '%s'"
-                               % (str(oldversion), str(newversion)))
-
-        # We don't support downgrade
-        if not self.upgrade:
-            raise UpgradeError("Downgrade to HAWQ %s not supported from gpmigrator"
-                               % str(newversion))
-
-        if self.upgrade and not newversion.isVersionCurrentRelease():
-            main = GpVersion('main')
-            raise UpgradeError(
-                "Upgrade from '%s' to '%s' not supported.  Target version should be HAWQ %s"
-                % (str(oldversion), str(newversion), main.getVersionRelease()))
-
-        logger.debug('Using %s method for migration' % self.method)
-
-        # Compare old version catalog number with the catalog in
-        # MASTER_DATA_DIRECTORY
-        catalog_re = re.compile('Catalog version number: *(.*)')
-        oldcat = self.RunCmd('postgres --catalog-version', env=self.oldenv)
-        logger.info("Source %s" % oldcat)
-        newcat = self.RunCmd('postgres --catalog-version', env=self.newenv)
-        logger.info("Target %s" % newcat)
-
-        # May need this info later
-        self.oldcat = oldcat
-        self.newcat = newcat
-
-        control = self.RunCmd('pg_controldata ' + self.masterdir, env=self.oldenv)
-        for line in control.split('\n'):
-            m = catalog_re.match(line)
-            if m:
-                self.datacat = line
-                logger.info("Data   %s " % line)
-                if ((not self.revert and line != oldcat) or
-                    (self.revert and line != oldcat and line != newcat)):
-                    logger.debug('catalog mismatch: expected %s, found %s'
-                             % (oldcat, line))
-                    msg = 'Catalog in %s does not match source binary' % self.masterdir
-                    raise UpgradeError(msg)
-                break
-
-        # For the moment everything goes:
-        #   - Additional checks will be made once the old database is up
-        logger.info('Versions are compatible')
-
-
-    #------------------------------------------------------------
-    def RemoveDirectories(self, force=False):
-        '''
-        Removes upgrade/ and backup/ directories on all segments
-        '''
-
-        # If force is set then we remove the directories regardless of
-        # whether we created them or they were created by another process.
-        # If force is not set then we will throw an exception when we are
-        # done if there were directories that we failed to remove.
-        doerror = False
-        if self.cmd == 'MASTER':
-            logger.info('Removing temporary directories')
-            self.CallSlaves('RMDIR', force, includeMirrors=True)
-            fsdirs = self.masterfilespace
-        else:
-            fsdirs = self.filespaces
-
-        # Build up a unique set of workdirectories on this host
-        locations = set()
-        for d in fsdirs:
-            (location, _) = os.path.split(d)
-            locations.add(os.path.join(location, WORKDIR))
-
-        for d in filter(os.path.exists, locations):
-            flag = os.path.join(d, 'flag')
-            if (not force) and os.path.exists(flag):
-                try:
-                    dpid = self.RunCmd('cat ' + flag)
-                    dpid = int(dpid)
-                except Exception, e:
-                    logger.warn('Read error on file:  ' + str(e))
-                    doerror = True
-                    continue
-
-                if dpid != self.pid:
-                    logger.warn('Directory %s owned by pid %s'
-                             % (d, dpid))
-                    doerror = True
-                    continue
-
-            # Regardless of the setting of force, if there is an active
-            # process running, we won't remove the directory.
-            running = self.RunCmd('find %s -name postmaster.pid -print' % d)
-            if (running):
-                logger.info(running)
-                logger.fatal('Active processes running')
-                doerror = True
-            else:
-                self.RunCmd('rm -rf ' + d)
-
-        # We hold off throwing the error until the end, because we want to make
-        # sure we removed all of our own directories.
-        if doerror:
-            raise UpgradeError('Failed to remove old upgrade directories')
-
-    #------------------------------------------------------------
-    def CreateDirectories(self):
-        '''
-        Creates the upgrade/ and backup/ directories on all segments
-        '''
-
-        if self.cmd == 'MASTER':
-
-            dirs_exist = False
-            try:
-                self.CheckDirectories()
-                self.CallSlaves('CHKDIR', includeMirrors=True)
-            except UpgradeError, e:
-                dirs_exist = True
-            if dirs_exist:
-                self.RemoveDirectories(force=True)
-
-
-            logger.info('Creating temporary directories')
-
-            # Have the slaves create their directories
-            self.CallSlaves('MKDIR', includeMirrors=True)
-
-            fsdirs = self.masterfilespace
-
-        else:
-            fsdirs = self.filespaces
-            if self.cmd != 'MKDIR':
-                raise Exception(self.cmd + ' != MKDIR')
-
-        # Given all the actual segment datadirs we need to look one
-        # directory up for the datadir locations and create the gpmigrator
-        # subdirectory there.
-        locations = set()
-        for d in fsdirs:
-            if not os.path.exists(d):
-                raise UpgradeError('Missing Data Directory: %s' % d)
-            (location, _) = os.path.split(d)
-            workpath = os.path.join(location, WORKDIR)
-            locations.add(workpath)
-
-        # Actually create the directories; none of these should exist yet!
-        for d in locations:
-            # Creating a directory is an atomic operation: if mkdir
-            # succeeds then it did not exist previously, making it ours.
-            # This prevents multiple upgrades running simultaneously.
-            os.mkdir(d)
-            self.RunCmd('chmod 700 %s' % d)
-
-            # Claim it in the name of our parent process.  We make use of
-            # this flag during tear down to confirm that we created it.
-            flag = open(os.path.join(d, 'flag'), 'w')
-            flag.write('%s\n' % str(self.pid))
-            flag.close()
-
-            # Create subdirectories for /upgrade and /backup
-            upgradepath = os.path.join(d, UPGRADEDIR)
-            os.mkdir(upgradepath)
-            self.RunCmd('chmod 700 %s' % upgradepath)
-
-            backuppath = os.path.join(d, BACKUPDIR)
-            os.mkdir(backuppath)
-            self.RunCmd('chmod 700 %s' % backuppath)
-
-        if (self.cmd == 'MASTER'):
-            # Finally now that we have those directories we'll make a state file
-            self.state = open('%s/state' % self.workdir, 'w')
-
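CreateDirectories() leans on the fact that os.mkdir() is atomic: it either creates the work directory or raises, so two concurrent gpmigrator runs cannot both claim the same WORKDIR. A sketch of that claim-or-fail pattern (Python 2; workpath is a hypothetical location, and the explicit EEXIST handling is illustrative, not part of the original code):

    import errno, os

    workpath = '/data/master/gpmigrator'   # hypothetical WORKDIR location
    try:
        os.mkdir(workpath, 0700)           # atomic: succeeds only if it did not already exist
    except OSError, e:
        if e.errno == errno.EEXIST:
            raise RuntimeError('another upgrade already owns %s' % workpath)
        raise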
-    #------------------------------------------------------------
-    def GenerateConfigFiles(self):
-        '''
-        Creates 'gp_databases' 'gp_array_config' and 'gp_config' files
-        '''
-
-        logger.info('Generating config files')
-
-        # Write a dump of the databases hash
-        db_file = os.path.join(self.workdir, 'gp_databases')
-        o = open(db_file, 'w')
-        for (oid, name) in self.dbs.iteritems():
-            o.write('%s:%s\n' % (oid, name))
-        o.close()
-
-        # Write a dump of the gparray object
-        array_file = os.path.join(self.workdir, 'gp_array_config')
-        self.array.dumpToFile(array_file)
-
-        # Write our new gp_config file, this contains the configuration
-        # information not captured in gp_array_config.
-        config_file = os.path.join(self.workdir, 'gp_config')
-        o = open(config_file, 'w')
-        for item, value in self.config.iteritems():
-            if type(value) == int:
-                o.write('%s=%d\n' % (str(item), value))
-            else:
-                o.write('%s=%s\n' % (str(item), str(value)))
-        o.close()
-        logger.info('Configuration files generated')
-
-
-    #------------------------------------------------------------
-    def ReadInfo(self):
-        '''
-        When restarting an upgrade this method reads the configuration
-        information from the cached status files.
-
-        It corresponds directly with ExtractInfo() which gathers the same
-        information from an active database.
-
-        It is also the inverse of GenerateConfigFiles() which creates the files
-        that this function reads.
-
-        In normal processing usually ExtractInfo() is called rather than
-        ReadInfo().  This function is used /instead/ when we are doing Revert()
-        processing, or when resuming an upgrade.  E.g. cases when we do not
-        want to start the database to get the information we need because the
-        database may not be in a stable state.
-        '''
-
-        # self.array - configuration information
-        array_config = os.path.join(self.workdir, 'gp_array_config')
-        if not os.path.exists(array_config):
-            raise UpgradeError("  '%s' not found" % array_config)
-        self.array = GpArray.initFromFile(array_config)
-
-        # self.dbs - database information
-        db_file = os.path.join(self.workdir, 'gp_databases')
-        if not os.path.exists(db_file):
-            raise UpgradeError("  '%s' not found" % db_file)
-        f = open(db_file, 'r')
-        for line in f:
-            (oid, name) = line.split(':', 1)
-            self.dbs[oid] = name.strip()
-        f.close()
-
-        # self.hostcache - cache of hostnames
-        self.hostcache = GpHostCache(self.array, self.pool)
-        failed_pings = self.hostcache.ping_hosts(self.pool)
-        if len(failed_pings) > 0:
-            raise UpgradeError(
-                "Cannot upgrade while there are unreachable hosts")
-
-        # Setup the master filespace dir
-        self.masterfilespace = []
-        for (id, loc) in self.array.master.getSegmentFilespaces().iteritems():
-            self.masterfilespace.append(loc)
-
-        # self.primaries and self.mirrors
-        dbs = self.array.getSegDbList()
-        self.segments = filter(GpDB.isSegmentPrimary, dbs)
-        self.mirrors  = filter(GpDB.isSegmentMirror, dbs)
-
-        # We don't support mirrors any more in this script.
-        if (self.mirrors):
-            raise UpgradeError(
-                "Please use gpmigrator_mirror to upgrade a cluster with mirroring")
-
-        # self.config
-        config_file = os.path.join(self.workdir, 'gp_config')
-        if not os.path.exists(config_file):
-            raise UpgradeError("  '%s' not found" % config_file)
-        f = open(config_file, 'r')
-        for line in f:
-            (key, value) = line.split('=', 1)
-            self.config[key] = value
-        f.close()
-
-
-    #------------------------------------------------------------
-    def ExtractInfo(self):
-        '''
-        Get all the information we need from the old instance
-        '''
-
-        # Can only extract with a running database
-        env = self.CheckUp()
-
-        # -- Get the array information
-        logger.info('Extracting configuration information')
-        port  = self.masterport
-        user  = env['USER']
-        url   = dbconn.DbURL(port=port, dbname='template0', username=user)
-        array = GpArray.initFromCatalog(url, utility=True)
-        self.array = array
-
-        # -- Determine if there are any invalid segments
-        logger.info('Checking that all segments are valid')
-        invalid = array.get_invalid_segdbs()
-        if len(invalid) > 0:
-            for db in invalid:
-                logger.fatal('INVALID SEGMENT: %s:/%s' %
-                             (db.getSegmentHostName(),
-                              db.getSegmentDataDirectory()))
-            raise UpgradeError('Cannot upgrade database with invalid segments')
-
-
-        # -- Get a list of available databases, note this includes "template0"!
-        logger.info('Creating list of databases')
-        databases = self.Select("SELECT oid, datname from pg_database")
-        for row in databases:
-            self.dbs[str(row['oid'])] = row['datname']
-
-        # -- Look for an existing user named MIGRATIONUSER
-        logger.info('Checking gpmigrator user')
-        upgradeuser = self.Select(
-            "SELECT count(*)::numeric FROM pg_user " +
-            "WHERE usename = '%s'" % MIGRATIONUSER)
-        if upgradeuser[0] > 0:
-            logger.warn("Database user '%s' already exists" % MIGRATIONUSER)
-            self.Update('DROP USER ' + MIGRATIONUSER)
-
-        if (not self.upgrade):
-            raise UpgradeError("Downgrade not supported")
-
-        # -- Older releases didn't have hostname as part of configuration
-        # make sure we have an accurate list of address->host lookups.
-        logger.info('Validating hosts')
-        self.hostcache = GpHostCache(self.array, self.pool)
-        for host in self.hostcache.get_hosts():
-            for seg in host.dbs:
-                seg.setSegmentHostName(host.hostname)
-        failed_pings = self.hostcache.ping_hosts(self.pool)
-        if len(failed_pings) > 0:
-            raise UpgradeError("Cannot upgrade while there are unreachable "
-                               "hosts")
-
-        # Setup the master filespace dirs
-        self.masterfilespace = []
-        for (id, loc) in self.array.master.getSegmentFilespaces().iteritems():
-            self.masterfilespace.append(loc)
-
-        # get seg prefix from filespace last part
-        (others, last_part) = self.masterfilespace[0].rsplit("/", 1)
-        if last_part[-2:]:
-            self.seg_prefix = last_part[:-2]
-
-        master = array.master
-        standbymaster = array.standbyMaster
-
-        # Oddly gparray doesn't have methods for fetching primary segments
-        # specifically
-        dbs = array.getSegDbList()
-        self.segments = filter(GpDB.isSegmentPrimary, dbs)
-        self.mirrors  = filter(GpDB.isSegmentMirror, dbs)
-
-        # We don't support mirrors any more in this script.
-        if (self.mirrors):
-            raise UpgradeError(
-                "Please use gpmigrator_mirror to upgrade a cluster with mirroring")
-
-        # Internal sanity checking
-        if len(self.segments) == 0:
-            raise UpgradeError('No segments found')
-        if standbymaster > 0:
-            raise UpgradeError('Cannot upgrade while standbymaster is running')
-
-
-        # To upgrade mirrors from 3.3 format mirrors to 4.0 format mirrors we
-        # must have been provided with the replication ports.
-        if len(self.mirrors) > 0:
-            if (not self.config['REPLICATION_PORT_BASE'] or
-                not self.config['MIRROR_REPLICATION_PORT_BASE'] or
-                not self.mirrormode):
-                logger.fatal("System configured with mirrors:")
-                if not self.config['REPLICATION_PORT_BASE']:
-                    logger.fatal("  missing option:  "
-                                 "--replication_port_base=<port>")
-                if not self.config['MIRROR_REPLICATION_PORT_BASE']:
-                    logger.fatal("  missing option:  "
-                                 "--mirror_replication_port_base=<port>")
-                if not self.mirrormode:
-                    logger.fatal("  missing option:  --mirror-mode")
-                raise UpgradeError("Missing required upgrade options")
-
-            if self.mirrormode not in ('redundant', 'single'):
-                logger.fatal("Unrecognised mirror-mode: %s" % self.mirrormode)
-                logger.fatal("Valid modes are: redundant, single")
-                raise UpgradeError("Invalid mirror-mode")
-
-            # Setup the replication ports for all segments in the array by host
-            for host in self.hostcache.get_hosts():
-
-                # The port ranges we are assigning into
-                p_port = self.config['REPLICATION_PORT_BASE'] + 1
-                m_port = self.config['MIRROR_REPLICATION_PORT_BASE'] + 1
-
-                # Used to determine overlapping port ranges
-                pri_port     = [65535, 0]  # [min, max]
-                mir_port     = [65535, 0]
-                pri_rep_port = [65535, 0]
-                mir_rep_port = [65535, 0]
-
-                for seg in host.dbs:
-                    port = seg.getSegmentPort()
-                    if seg.isSegmentPrimary():
-                        seg.setSegmentReplicationPort(p_port)
-                        pri_port[0]     = min(pri_port[0], port)
-                        pri_port[1]     = max(pri_port[1], port)
-                        pri_rep_port[0] = min(pri_rep_port[0], p_port)
-                        pri_rep_port[1] = max(pri_rep_port[1], p_port)
-                        p_port += 1
-                    else:
-                        seg.setSegmentReplicationPort(m_port)
-                        mir_port[0]     = min(mir_port[0], port)
-                        mir_port[1]     = max(mir_port[1], port)
-                        mir_rep_port[0] = min(mir_rep_port[0], m_port)
-                        mir_rep_port[1] = max(mir_rep_port[1], m_port)
-                        m_port += 1
-
-                # Check for overlapping port ranges:
-                port_mat = [pri_port, mir_port, pri_rep_port, mir_rep_port]
-                label_mat = ["Primary", "Mirror", "Replication",
-                             "Mirror-Replication"]
-
-                for x in range(0, 4):
-                    for y in range(x+1, 4):
-                        if ((port_mat[x][0] >= port_mat[y][0] and
-                             port_mat[x][0] <= port_mat[y][1]) or
-                            (port_mat[x][1] >= port_mat[y][0] and
-                             port_mat[x][1] <= port_mat[y][1])):
-                            logger.error("Port collision on host %s" %
-                                         host.hostname)
-                            logger.error("... %s port range: [%d-%d]" %
-                                         (label_mat[x],
-                                          port_mat[x][0],
-                                          port_mat[x][1]))
-                            logger.error("... %s port range: [%d-%d]" %
-                                         (label_mat[y],
-                                          port_mat[y][0],
-                                          port_mat[y][1]))
-                            raise UpgradeError("Port collision on host %s" %
-                                               host.hostname)
-
-        # Check for unsupported index types.
-        found = False
-        logger.info("Validating indexes")
-        oids = sorted(self.dbs.keys())
-        for dboid in oids:
-            db = self.dbs[dboid]
-
-            if db == "template0":
-                continue
-            indexes = self.Select('''
-               SELECT quote_ident(n.nspname) || '.' ||
-                      quote_ident(c.relname) as index,
-                      m.amname as kind
-               FROM   pg_class c
-                 join pg_namespace n on (c.relnamespace = n.oid)
-                 join pg_am m on (c.relam = m.oid)
-               WHERE  c.relkind = 'i' and m.amname in ('gin', 'hash')
-            ''', db=db)
-            if len(indexes) > 0:
-                if not found:
-                    logger.fatal("Unable to upgrade the following indexes")
-                    found = True
-                logger.fatal("  Database: %s" % db)
-                for row in indexes:
-                    logger.fatal("     %s  [%s]" % (row['index'], row['kind']))
-        if found:
-            raise UpgradeError("Deprecated index types must be removed prior "
-                               "to upgrade")
-
-        # Check for unsupported AOCS tables.
-        found = False
-        logger.info("Validating that no AOCS tables exist")
-        oids = sorted(self.dbs.keys())
-        for dboid in oids:
-            db = self.dbs[dboid]
-
-            # template0 does not accept connections; skip it here as well
-            if db == "template0":
-                continue
-            aocs_tbl = self.Select('''
-                    SELECT quote_ident(n.nspname) || '.' || quote_ident(c.relname) as table
-                    FROM   pg_class c join pg_namespace n on (c.relnamespace = n.oid)
-                    WHERE  c.relkind = 'r' and c.relstorage='c'
-                ''', db=db)
-            if len(aocs_tbl) > 0:
-                if not found:
-                    logger.fatal("Unable to upgrade the following AOCS tables")
-                    found = True
-                logger.fatal("  Database: %s" % db)
-                for row in aocs_tbl:
-                    logger.fatal("     %s  " % (row['table']))
-        if found:
-            raise UpgradeError("Deprecated AOCS tables must be converted to AO or Parquet tables "
-                               "prior to upgrade.")
-
-        # Check for SQL_ASCII database encoding
-        logger.info("Validating database encoding")
-        encoding = self.Select("SELECT datname FROM pg_database WHERE encoding=0")
-        if len(encoding) > 0:
-            logger.error("Deprecated database encodings found:")
-            for datname in encoding:
-                logger.error("    %s  [SQL_ASCII]" % datname)
-            raise UpgradeError("Deprecated database encodings found - contact "
-                               "Greenplum Support")
-
-        # ARRAY_NAME
-        arrayname = self.Select('SELECT gpname FROM gp_id')
-        self.config['ARRAY_NAME'] = arrayname[0]
-
-        # Determine PORT_BASE, (min of segments - 1)
-        port_base = array.get_min_primary_port() - 1
-        self.config['PORT_BASE'] = port_base
-
-        # Determine MIRROR_PORT_BASE,
-        if len(self.mirrors) > 0:
-            mirror_port_base = array.get_min_mirror_port() - 1
-            self.config['MIRROR_PORT_BASE'] = mirror_port_base
-
-        # MASTER_HOSTNAME, super easy
-        self.config['MASTER_HOSTNAME'] = master.getSegmentHostName()
-
-        # MASTER_DIRECTORY, also easy - we already set it up
-        self.config['MASTER_DIRECTORY'] = self.newmaster
-
-        # MASTER_PORT, super easy
-        self.config['MASTER_PORT'] = master.getSegmentPort()
-
-        # TRUSTED_SHELL, set manually since there's only one supported value
-        self.config['TRUSTED_SHELL'] = 'ssh'
-
-        # CHECKPOINT_SEGMENTS
-        self.config['CHECKPOINT_SEGMENTS'] = \
-            self.Select('show checkpoint_segments')[0]
-
-        # ENCODING
-        self.config['CLIENT_ENCODING'] = \
-            self.Select('show client_encoding')[0]
-
-        # LOCALE
-        self.config['LOCALE'] = \
-            self.Select('show lc_ctype')[0]
-
-        # MAX_CONNECTIONS
-        self.config['MAX_CONNECTIONS'] = \
-            self.Select('show max_connections')[0]
-
-        # Additional config needed for hawq 2.0
-        dfs_url = self.Select("SELECT regexp_replace(fselocation , E'/[^/]*$', '/') \
-            FROM pg_filespace, pg_filespace_entry \
-            WHERE fsefsoid=pg_filespace.oid AND fselocation LIKE 'hdfs%0' AND fsname='dfs_system'")
-        if not dfs_url or not dfs_url[0]:
-            logger.fatal("Can't get DFS_URL from catalog.")
-            raise UpgradeError("Can't get DFS_URL from catalog.")
-        self.config['DFS_URL'] = dfs_url[0]
-
-        # check that dfs_url exists in HDFS
-        if self.hdfs_cmd.run_hdfs_test(dfs_url[0], '-e') != 0:
-            logger.fatal("hdfs path \"%s\" doesn't exist!" % dfs_url[0])
-            raise UpgradeError("hdfs path \"%s\" doesn't exist!" % dfs_url[0])
-
-        self.config['TEMP_DIR'] = ''
-        temp_dir_file = os.path.join(self.masterdir, 'gp_temporary_files_directories')
-        if os.path.exists(temp_dir_file):
-            temp_dirs = self.RunCmd('cat ' + temp_dir_file)
-            # join the non-empty lines into a comma-separated list
-            self.config['TEMP_DIR'] = \
-                ','.join([line for line in temp_dirs.split('\n') if line.strip()])
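-        # For illustration: a gp_temporary_files_directories file listing
-        # /data1/tmp and /data2/tmp on separate lines yields
-        # TEMP_DIR = '/data1/tmp,/data2/tmp' (paths are examples only).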
-
-
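-        # At this point self.config holds the values gathered above, for
-        # example (values are illustrative only):
-        #   {'ARRAY_NAME': 'hawq', 'PORT_BASE': 40000, 'MASTER_HOSTNAME': 'mdw',
-        #    'MASTER_PORT': 5432, 'MASTER_DIRECTORY': '/data/master/gpseg-1',
-        #    'TRUSTED_SHELL': 'ssh', 'DFS_URL': 'hdfs://namenode:8020/hawq/',
-        #    'TEMP_DIR': '/data1/tmp,/data2/tmp', ...}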
-        logger.info('Configuration acquired')
-
-
-    def copy_rewrite_hba(self, frm, to, upgrdfmt):
-        '''
-        Between 3.2 and 3.3 we have a change in pg_hba.conf. The differences
-        are:
-        - "ident sameuser" becomes "ident"
-        - "ident <map name>" becomes "ident map=<map name>"
-        - "ldap <ldap url>" becomes "ldap ldapserver=..."
-        - "pam servicename" becomes "pam pamservice="servicename""
-        '''
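-        # Illustrative translations (hostnames and DNs are examples only):
-        #   "local all all ident sameuser"
-        #     -> "local all all ident"
-        #   "host all all 0.0.0.0/0 ldap "ldaps://ldap.example.com:636/dc=example,dc=com;uid=;,ou=people""
-        #     -> "host all all 0.0.0.0/0 ldap ldapserver=ldap.example.com ldapport=636
-        #         ldapprefix="uid=" ldapsuffix=",ou=people" ldaptls=1"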
-        f = open(frm, 'r')
-        t = open(to, 'w')
-        logger.debug('writing %s to %s for upgrade = %s'
-                 % (frm, to, str(upgrdfmt)))
-        for line in f:
-            # translate to new pg_hba.conf format
-            if upgrdfmt:
-
-                # translate from 3.2 format => 3.3 format:
-                if self.oldversion < '3.3.0.0' and self.newversion >= '3.3.0.0':
-                    # ident
-                    line = re.compile('ident sameuser$').sub('ident', line)
-                    line = re.compile('ident (?!map=)(.*)$').sub('ident map=\\1', line)
-                    # pam
-                    line = re.compile('pam (?!pamservice=)(.*)$').sub('pam pamservice="\\1"', line)
-                    # ldap is a little more complex
-                    # our objective is to parse the following:
-                    # ldap[s]://<server>[:<port>]/<basedn>[;prefix[;suffix]]
-                    logger.debug("upgrading ldap entries")
-                    r1 = re.compile('ldap "?(ldap[s]?)://([^:/]+)(:\d+)?/([^"]*)"?')
-                    match = r1.search(line)
-                    if match:
-                        s = "ldapserver=" + match.group(2).strip()
-                        if match.group(3):
-                            s += " ldapport=" + str(match.group(3)[1:]).strip()
-                        if match.group(4):
-                            fixes = match.group(4).strip().split(';')
-                            suffix = ""
-                            prefix = ""
-                            basedn = ""
-                            if len(fixes) > 3:
-                                raise UpgradeError('cannot rebuild ldap auth ' +
-                                                   'entry: ' + line)
-                            if len(fixes) == 3:
-                                suffix = ' ldapsuffix="' + fixes[2] + '"'
-                            if len(fixes) >= 2:
-                                prefix = ' ldapprefix="' + fixes[1] + '"'
-                            if len(fixes) >= 1:
-                                basedn = "" # not used
-                            s += prefix + suffix
-                        if match.group(1) == "ldaps":
-                            s += " ldaptls=1"
-                        line = re.compile('ldap (.*)$').sub('ldap ' + s, line)
-
-            else:
-                # downgrade
-                if self.newversion < '3.3.0.0' and self.oldversion >= '3.3.0.0':
-                    line = re.compile('ident$').sub('ident sameuser', line)
-                    line = re.compile('ident map=(.*)$').sub('ident \\1', line)
-
-            # logger.debug('writing to %s line %s' % (to, line))
-            t.write(line)
-        f.close()
-        t.close()
-
-    def move_rewrite_hba(self, frm, to, upgrade):
-        self.copy_rewrite_hba(frm, to, upgrade)
-        os.unlink(frm)
-
-    #------------------------------------------------------------
-    def Copy_hawq_config(self, isRevert=False):
-        '''
-        Copy the new hawq 2.0 config into $GPHOME/etc: hawq-site.xml (from
-        template-hawq-site.xml) and the slaves file.
-        If isRevert is set, revert the change using the backed-up copies.
-        '''
-        etc_d = os.path.join(self.newhome, 'etc')
-
-        # copy hawq-site.xml file
-        hawq_site   = os.path.join(etc_d, 'hawq-site.xml')
-        hawq_site_orig = os.path.join(etc_d, 'hawq-site.xml'+LOCKEXT)
-        hawq_site_templ= os.path.join(etc_d, 'template-hawq-site.xml')
-
-        if not os.path.exists(hawq_site_templ):
-            if os.path.exists(hawq_site):
-                hawq_site_templ = hawq_site
-            else:
-                raise UpgradeError("Cannot find hawq-site.xml or template-hawq-site.xml in path: %s" % etc_d)
-
-        if isRevert:
-            if os.path.exists(hawq_site_orig):
-                self.RunCmd('cp -f %s %s' % (hawq_site_orig, hawq_site))
-        else:
-            if os.path.exists(hawq_site_templ):
-                self.RunCmd('cp -f %s %s' % (hawq_site, hawq_site_orig))
-                if hawq_site_templ != hawq_site:
-                    self.RunCmd('cp -f %s %s' % (hawq_site_templ, hawq_site))
-
-        # copy slaves file
-        hawq_sla = os.path.join(etc_d, 'slaves')
-        hawq_sla_orig = os.path.join(etc_d, 'slaves'+LOCKEXT)
-        if isRevert:
-            if os.path.exists(hawq_sla_orig):
-                self.RunCmd('cp -f %s %s' % (hawq_sla_orig, hawq_sla))
-        else:
-            if os.path.exists(hawq_sla):
-                self.RunCmd('cp -f %s %s' % (hawq_sla, hawq_sla_orig))
-
-    #------------------------------------------------------------
-    def update_xml_content(self, xmlfile_cont, property_name, property_value=''):
-        '''
-        Input:
-            xmlfile_cont:   file content as a list of lines (from file.readlines())
-            property_value: when '', the whole property setting is removed.
-
-        Note: the parser assumes one XML tag per line; it cannot handle several
-        tags on one line or a single tag spread across multiple lines.
-        '''
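-        # Expected layout, for illustration (the property shown is an example):
-        #     <property>
-        #         <name>hawq_master_address_port</name>
-        #         <value>5432</value>
-        #     </property>
-        # update_xml_content(cont, 'hawq_master_address_port', 5433) rewrites the
-        # <value> line in place; omitting the value deletes the whole
-        # <property> block; an unknown name is appended as a new property just
-        # before </configuration>.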
-        
-        # first pass: if the property already exists, overwrite its value line
-        line_id = 0
-        while(line_id<len(xmlfile_cont)):
-            line = xmlfile_cont[line_id]
-            m = re.match('\s*<name>%s' % property_name, line)
-            if m:
-                l1=line_id
-                line_id+=1;
-                if (line_id<len(xmlfile_cont)):
-                    next_line = xmlfile_cont[line_id]
-                    m2 = re.match('\s*<value>', next_line)
-                    if m2:
-                        l2 = line_id
-                        # if empty string, need to remove this property setting
-                        if(property_value==''): 
-                            # find '<property>' backwards
-                            l0 = l1
-                            while (l0>0):
-                                l0 -= 1;
-                                m3 = re.match('\s*<property>',  xmlfile_cont[l0])
-                                if m3:
-                                    l1=l0
-                                    break
-                            # find '</property>' forwards
-                            l0 = l2
-                            while (l0 < len(xmlfile_cont)-1):
-                                l0 += 1;
-                                m3 = re.match('\s*</property>',  xmlfile_cont[l0])
-                                if m3:
-                                    l2=l0
-                                    break
-                            # remove these lines
-                            for l3 in range(l2, l1-1, -1):  # delete in reverse order so earlier indexes stay valid
-                                del xmlfile_cont[l3]
-                            return
-                        # property exists, just replace the whole <value> element;
-                        # substituting the element (not the old value) avoids
-                        # treating the old value as a regular expression
-                        next_line_new = re.sub(r'<value>.*</value>',
-                                               '<value>%s</value>' % str(property_value),
-                                               next_line)
-                        xmlfile_cont[line_id] = next_line_new
-                        return
-                # Error: <name> was found but the matching <value> line is missing
-                logger.fatal('Malformed xml config file: <name>%s</name> has no <value> line.'
-                             % property_name)
-                sys.exit(1)
-            line_id += 1
-            
-        # property doesn't exist, so append the setting just before </configuration>
-        for line_id in range(len(xmlfile_cont)-1, -1, -1):
-            line = xmlfile_cont[line_id]
-            m = re.match('\s*</configuration>', line)
-            if m:
-                xmlfile_cont.insert(line_id, '    <property>\n'); line_id += 1
-                xmlfile_cont.insert(line_id, '       <name>%s</name>\n' % property_name); line_id += 1
-                xmlfile_cont.insert(line_id, '       <value>%s</value>\n' % str(property_value)); line_id += 1
-                xmlfile_cont.insert(line_id, '    </property>\n'); line_id += 1
-                xmlfile_cont.insert(line_id, '\n'); line_id += 1  #blank line
-
-    def Change_hawq_config(self):
-        '''
-        Change option values in hawq-site.xml according to the old cluster's settings
-        '''
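-        # Roughly, the values collected in self.config map onto hawq-site.xml
-        # properties as follows (example values only):
-        #   DFS_URL         'hdfs://namenode:8020/hawq/' -> hawq_dfs_url 'namenode:8020/hawq/'
-        #   MASTER_HOSTNAME 'mdw'                        -> hawq_master_address_host
-        #   MASTER_PORT     5432                         -> hawq_master_address_port
-        #   PORT_BASE + 1   40001                        -> hawq_segment_address_port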
-
-        # Running 'hawq config -c xxx -v xxx' is problematic at this point, so edit the xml config directly.
-        f1 = open("%s/etc/hawq-site.xml" % self.newhome)
-        file_cont = f1.readlines()
-        f1.close()
-
-        if self.config['DFS_URL']:
-            temp_dfs_url = self.config['DFS_URL'].strip()
-            if temp_dfs_url.startswith("hdfs://"):
-                temp_dfs_url = temp_dfs_url[7:]
-            self.update_xml_content(file_cont, 'hawq_dfs_url', temp_dfs_url)
-
-        if self.config['MASTER_HOSTNAME']:
-            self.update_xml_content(file_cont, 'hawq_master_address_host', self.config['MASTER_HOSTNAME'])
-        if self.config['MASTER_PORT']:
-            self.update_xml_content(file_cont, 'hawq_master_address_port', self.config['MASTER_PORT'])
-        if self.config['MASTER_DIRECTORY']:
-            self.update_xml_content(file_cont, 'hawq_master_directory', self.config['MASTER_DIRECTORY'])
-
-        # In hawq 1.x the master and segment temp directories are the same
-        if self.config['TEMP_DIR'] != '':
-            self.update_xml_content(file_cont, 'hawq_master_temp_directory', self.config['TEMP_DIR'])
-            self.update_xml_content(file_cont, 'hawq_segment_temp_directory', self.config['TEMP_DIR'])
-        else:
-            # set /tmp by default
-            self.update_xml_content(file_cont, 'hawq_master_temp_directory', '/tmp')
-            self.update_xml_content(file_cont, 'hawq_segment_temp_directory', '/tmp')
-
-        if self.array.standbyMaster:
-            self.update_xml_content(file_cont, 'hawq_standby_address_host', self.array.standbyMaster.hostname)
-        else:
-            self.update_xml_content(file_cont, 'hawq_standby_address_host')
-
-        if self.config['PORT_BASE']:
-            self.update_xml_content(file_cont, 'hawq_segment_address_port', self.config['PORT_BASE']+1)
-        # in hawq 2.x all segments share the same data directory,
-        # so set it to the first segment's data directory
-        if self.array.segments[0].primaryDB.datadir:
-            self.update_xml_content(file_cont, 'hawq_segment_directory', self.array.segments[0].primaryDB.datadir)
-
-        # yarn-related settings are left at their defaults: yarn disabled
-        self.update_xml_content(file_cont, 'hawq_global_rm_type', 'none')
-        self.update_xml_content(file_cont, 'hawq_rm_yarn_address', 'localhost:9980')
-        self.update_xml_content(file_cont, 'hawq_rm_yarn_scheduler_address', 'localhost:9981')
-
-        # write buffer back to the xml file
-        f2 = open("%s/etc/hawq-site.xml" % self.newhome, 'w')
-        f2.writelines(file_cont)
-        f2.close()
-
-        # Add segment hosts to the slaves file, one hostname per line,
-        # keeping the first occurrence of each host
-        slave_hosts = []
-        for seg in self.array.segments:
-            hostname = str(seg.primaryDB.hostname)
-            if hostname not in slave_hosts:
-                slave_hosts.append(hostname)
-        slaves_file = os.path.join(self.newhome, 'etc', 'slaves')
-        f = open(slaves_file, 'w')
-        f.write('\n'.join(slave_hosts) + '\n')
-        f.close()
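-        # The resulting etc/slaves file simply lists one segment host per line,
-        # e.g. (hostnames illustrative):
-        #   sdw1
-        #   sdw2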
-
-    #------------------------------------------------------------
-    def SetupLockdown(self):
-        '''
-        Change pg_hba.conf to disallow access to everyone except the
-        upgrade user.
-        '''
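-        # After lockdown the master pg_hba.conf permits only the migration user
-        # and the administrative user, e.g. (user names and addresses are
-        # illustrative):
-        #   local all gpmigrator               ident map=gpmigrator
-        #   local all gpadmin                  ident map=gpadmin
-        #   host  all gpmigrator 10.0.0.1/32   trust
-        #   host  all gpadmin    ::1/128       trust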
-
-        os_type = os.uname()[0].lower()
-
-        if self.cmd == 'MASTER':
-            logger.info('Locking database')
-            self.CheckUp()
-            [env, utility, upgrade] = self.dbup
-            if utility:
-                raise UpgradeError("Cannot lock database in utility mode")
-
-            logger.info('... Creating upgrade user')
-            self.Update('DROP USER IF EXISTS ' + MIGRATIONUSER)
-            self.Update('CREATE USER %s with superuser' % MIGRATIONUSER)
-
-            logger.info('... Creating pg_hba.conf ')
-
-            # To write out a new pg_hba.conf we need to know all the ip
-            # addresses that the master identifies with.
-            #
-            # If we re-enable segment locking then this will need to be
-            # passed to the segments as well.
-            if os_type == 'sunos':
-                cmd = 'ifconfig -a inet'
-            else:
-                cmd = 'ifconfig'
-            ifconf = self.RunCmd(cmd)
-            ipv4_re = re.compile('inet (?:addr:)?(\d+\.\d+\.\d+\.\d+)')
-            ipv6_re = re.compile('inet6 (?:addr: )?([a-zA-Z0-9:\.]+[a-zA-Z0-9:])')
-            ip = []
-            for line in ifconf.split('\n'):
-                m = ipv4_re.search(line)
-                if m:
-                    ip.append(m.group(1))
-                m = ipv6_re.search(line)
-                if m:
-                    ip.append(m.group(1))
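-            # e.g. a line 'inet addr:192.168.0.10  Bcast:...' contributes
-            # '192.168.0.10' and 'inet6 addr: ::1/128 Scope:Host' contributes
-            # '::1' (addresses are examples)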
-
-            # Currently we only lock the MASTER node
-            #
-            # locking down the segments requires having gpstart
-            # pass the PGUSER to the segments otherwise the segments
-            # don't have the permissions to startup.
-            #
-            # This leaves a "hole" in the lockdown that still allows
-            # utility connections to the segments.
-            hba   = os.path.join(self.masterdir, 'pg_hba.conf')
-            ident = os.path.join(self.masterdir, 'pg_ident.conf')
-
-            if env == self.oldenv or not os.path.exists(hba + LOCKEXT):
-                # pre-upgrade database makes a backup of files and then
-                # writes a new one
-
-                # Solaris doesn't support ident authentication
-                if os_type != 'sunos':
-                    self.RunCmd('mv -f %s %s' % (ident, ident+LOCKEXT))
-                    file = open(ident, 'w')
-                    file.write('%s %s %s\n' %
-                               (MIGRATIONUSER, self.user, MIGRATIONUSER))
-                    file.write('%s %s %s\n' %
-                               (self.user, self.user, self.user))
-                    file.close()
-
-                self.RunCmd('mv -f %s %s' % (hba, hba+LOCKEXT))
-                file = open(hba, 'w')
-                if os_type == 'sunos':
-                    file.write('local all %s trust\n' % MIGRATIONUSER)
-                    file.write('local all %s trust\n' % self.user)
-                else:
-                    file.write('local all %s ident map=%s\n'
-                                % (MIGRATIONUSER, MIGRATIONUSER))
-                    file.write('local all %s ident map=%s\n'
-                                % (self.user, self.user))
-
-                for addr in ip:
-                    cidr_suffix = '/128' if ':' in addr else '/32'  # MPP-15889
-                    file.write('host all %s %s%s trust\n'
-                               % (MIGRATIONUSER, addr, cidr_suffix))
-                    file.write('host all %s %s%s trust\n'
-                               % (self.user, addr, cidr_suffix))
-
-                file.close()
-
-            if env == self.newenv:
-                # upgrade database just copies all locking information
-                # from the pre-upgrade database which should already be
-                # locked
-                dir = self.newmaster
-
-                self.copy_rewrite_hba(hba, os.path.join(dir, 'pg_hba.conf'),
-                                      self.upgrade)
-                self.RunCmd('cp -f %s %s' % (ident, dir))
-
-                # Grab the original versions too (if any)
-                if os.path.exists(hba + LOCKEXT):
-                    self.copy_rewrite_hba(hba+LOCKEXT,
-                                          os.path.join(dir,
-                                                       'pg_hba.conf'+LOCKEXT),
-                                          self.upgrade)
-                if os.path.exists(ident + LOCKEXT):
-                    self.RunCmd('cp -f %s %s' % (ident+LOCKEXT, dir))
-
-            # With the new lockfiles in place force the server to reload
-            logger.info('... Syncing')
-            logger.debug('calling gpstop -u with env: %s' % str(env))
-            self.RunCmd('gpstop -u -a -l %s' % self.logdir, env=env)
-            logger.info('Database Locked')
-
-            # Check if there are any other sessions connected.  Since we
-            # have modified the pg_hba.conf and run gpstop -u no new sessions
-            # will be allowed, but any that were already connected still
-            # need to be booted.
-            active = self.Select("SELECT count(*) FROM pg_stat_activity")
-            if active[0] > 1:
-                [env, utility, upgrade] = self.dbup
-                logger.info('Active sessions detected, restarting server')
-                self.Shutdown()
-                self.Startup(env, utility)
-
-
-    def CheckClusterReady(self, databases, port):
-        '''
-        Pre-flight checks
-        '''
-        for db in databases:
-            xact = self.Select("SELECT count(*)::numeric FROM " +
-                               "pg_prepared_xacts",
-                               db=db['dbname'], port=port)
-            if xact[0] > 0:
-                raise UpgradeError("Database %s contains prepared transactions" %
-                                   db['dbname'])
-
-    #------------------------------------------------------------
-    def ReleaseLockdown(self, env):
-        '''
-        Revert pg_hba.conf to pre-lockdown state
-        '''
-
-        # Two main cases:
-        #   1) Release lockdown on old environment
-        #   2) Release lockdown on newly installed environment
-        # Both cases have the database installed under MASTER_DATA_DIRECTORY
-        # so we actually treat them exactly the same
-
-        if self.cmd == 'MASTER':
-
-            # nothing to do if we haven't setup an array object yet.
-            if not self.array:
-                return
-
-            logger.info('Unlocking database')
-
-            # See comment about disabling lockdown on segments in SetupLockdown()
-            # self.CallSlaves('UNLOCK')
-
-            datadirs = [self.masterdir]
-        else:
-            datadirs = self.datadirs
-
-        for dir in datadirs:
-            hba   = os.path.join(dir, 'pg_hba.conf')
-            ident = os.path.join(dir, 'pg_ident.conf')
-            if os.path.exists(hba + LOCKEXT):
-                forward = False
-                if self.upgrade and env == self.newenv:
-                    forward = True
-                elif not self.upgrade and env == self.oldenv:
-                    forward = True
-                self.move_rewrite_hba(hba+LOCKEXT, hba, forward)
-
-            if os.path.exists(ident + LOCKEXT):
-                self.RunCmd('mv -f %s %s' % (ident+LOCKEXT, ident))
-
-
-        # if we don't think the database is up, double check!
-        if not self.dbup:
-            env = None
-            try:
-                env = self.CheckUp()
-            except UpgradeError:
-                pass
-            if env:
-                self.dbup = [env, False, False]
-
-        # Re-source the conf files if the database is up.
-        if self.dbup:
-            if (env == self.oldenv):
-                self.RunCmd('gpstop -u -l %s' % self.logdir, env=env)
-
-                # MPP-10107 - The server is still running with PGUSER set in its
-                # environment, which is wrong since that user doesn't exist.
-                # Ideally that shouldn't matter, but unfortunately it does.  We
-                # deal with this by restarting the server.
-                #
-                # Note: the step above is still required because otherwise we
-                # can't even run this gpstop due to pg_hba.conf lock.
-                #
-                # A better solution would be to have gpstart make a constrained
-                # environment when it starts and stops the server, but that is
-                # more than a one-line change, and this solves this case with
-                # minimal impact to anything else.
-                self.RunCmd('gpstop -ral %s' % self.logdir, env=env)
-            else:
-                self.RunCmd('hawq stop cluster -a -u -l %s' % self.logdir, env=env)
-                self.RunCmd('hawq restart cluster -al %s' % self.logdir, env=env)
-
-        # We would like to delete the migration user here, but it is not safe
-        # to do so because we unlock the source database before the upgrade
-        # is complete.
-        return
-
-
-    #------------------------------------------------------------
-    def ResetXLogs(self):
-        '''
-        Resets the xlog prior to recreating the schema
-        '''
-        if self.cmd == 'MASTER':
-            logger.info('Syncing XLogs')
-            datadirs = [self.masterdir]
-            self.CallSlaves('RESETXLOG')
-        else:
-            datadirs = self.datadirs
-
-        # We should always have at least one data directory...
-        if len(self.datadirs) < 1:
-            return
-
-        # Check the mapping in all data directories
-        for olddir in datadirs:
-            (d, seg) = os.path.split(olddir)
-            newdir = os.path.join(d, WORKDIR, UPGRADEDIR, seg)
-
-            # Links are in place, but data will not be visible until
-            # we sync the pg_control info
-            oldsync = self.RunCmd('pg_resetxlog -n ' + olddir,
-                                  env=self.oldenv)
-            newsync = self.RunCmd('pg_resetxlog -n ' + newdir,
-                                  env=self.newenv)
-
-            logger.debug('*****************************')
-            logger.debug(' - old xlog -')
-            logger.debug(oldsync)
-            logger.debug('*****************************')
-            logger.debug(' - new xlog -')
-            logger.debug(newsync)
-            logger.debug('*****************************')
-
-
-            # Nasty bit of code to turn the control data into a
-            # nice dict object
-            sync = {}
-            for line in oldsync.split('\n'):
-                if len(line) == 0: continue
-                (field, value) = line.split(':', 1)  # split on the first ':' only
-                sync[field] = value.lstrip()
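-            # e.g. a pg_resetxlog -n line such as
-            #     "Latest checkpoint's NextXID:          0/1054"
-            # becomes sync["Latest checkpoint's NextXID"] == "0/1054",
-            # hence the '/'-splitting below to pick out the xid part
-            # (the sample value is illustrative).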
-
-            # Set the next transaction id in new control
-            x = sync["Latest checkpoint's NextXID"].find('/')
-            if x < 0:
-                nextxid = sync["Latest checkpoint's NextXID"]
-            else:
-                nextxid = sync["Latest checkpoint's NextXID"][x+1:]
-
-            self.RunCmd('pg_resetxlog -f -x %s %s' % (nextxid, newdir),
-                        env=self.newenv)
-
-            # Set the WAL archives
-            self.RunCmd('pg_resetxlog -l %s,%s,%s %s' %
-                        (sync["Latest checkpoint's TimeLineID"],
-                         sync["Current log file ID"],
-                         sync["Next log file segment"],
-                         newdir), env=self.newenv)
-
-            # Set next oid
-            self.RunCmd('pg_resetxlog -o %s %s' %
-                        (sync["Latest checkpoint's NextOID"],
-                         newdir), env=self.newenv)
-
-            # Set next multitransaction ID/offset
-            self.RunCmd('pg_resetxlog -m %s %s' %
-                        (sync["Latest checkpoint's NextMultiXactId"],
-                         newdir), env=self.newenv)
-            self.RunCmd('pg_resetxlog -O %s %s' %
-                        (sync["Latest checkpoint's NextMultiOffset"],
-                         newdir), env=self.newenv)
-
-            # Replace the transaction logs with the old ones
-            # since the old logs actually correspond to the data
-            for f in ['pg_clog',
-                      'pg_distributedlog',
-                      'pg_subtrans',
-                      'pg_multixact']:
-                oldlogs = os.path.join(olddir, f)
-                newlogs = os.path.join(newdir, f)
-
-                # the only one that wouldn't exist is pg_distributedlog
-                # and only if we were upgrading/downgrading to/from a
-                # version < 3.1.1.5
-                if (os.path.exists(newlogs)):
-                    self.RunCmd('rm -rf %s' % newlogs)
-                    if (os.path.exists(oldlogs)):
-                        self.RunCmd('cp -rf %s %s' % (oldlogs, newlogs))
-
-    #------------------------------------------------------------
-    def DeepLink(self, style):
-        '''
-        Makes a deep hard link copy of one directory into another
-        '''
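-        # For a filespace directory such as /data/primary/gpseg0 (path
-        # illustrative), BACKUP hard-links its contents into
-        # /data/primary/<WORKDIR>/<BACKUPDIR>/gpseg0, SKELETON and INSTALL work
-        # against the <WORKDIR>/<UPGRADEDIR> copy, and REVERT links the backup
-        # copy back into place (see the source/destination mapping below).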
-
-        if style == 'CLEANUP':
-            return
-
-        if style not in ['BACKUP', 'INSTALL', 'REVERT', 'SKELETON']:
-            raise UpgradeError('FATAL: unexpected DeepLink style %s' % style)
-
-        if self.cmd == 'MASTER':
-            fsdirs = self.masterfilespace
-            if style == 'BACKUP':
-                logger.info('Linking backup')
-                logger.info('... This may take a while')
-                includeMirrors = True
-            elif style == 'INSTALL':
-                logger.info('Linking upgrade')
-                logger.info('... This may take a while')
-                includeMirrors = True
-            elif style == 'SKELETON':
-                logger.info('Linking skeleton')
-                logger.info('... This may take a while')
-                includeMirrors = False
-            else:
-                logger.info('Reverting to backup')
-                logger.info('... This may take a while')
-                includeMirrors = True
-
-
-            # Have all hosts perform the link first
-            self.CallSlaves('DEEPLINK', style, includeMirrors=includeMirrors)
-
-        else:
-            fsdirs = self.filespaces
-
-        for dir in fsdirs:
-            (location, content) = os.path.split(dir)
-
-            # For Backup fsdirs is the source, for INSTALL it's the target
-            if style == 'BACKUP':
-                source      = os.path.join(location, content)
-                destination = os.path.join(location, WORKDIR, BACKUPDIR, content)
-            if style == 'SKELETON':
-                source      = os.path.join(location, content)
-                destination = os.path.join(location, WORKDIR, UPGRADEDIR, content)
-            if style == 'INSTALL':
-                source      = os.path.join(location, WORKDIR, UPGRADEDIR, content)
-                destination = os.path.join(location, content)
-            if style == 'REVERT':
-                source      = os.path.join(location, WORKDIR, BACKUPDIR, content)
-                destination = os.path.join(location, content)
-
-                if not os.path.exists(source):
-                    continue
-
-            # Link the log file
-            if os.path.isfile(source + '.log'):
-                oldfile = sou

<TRUNCATED>

