hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject [7/8] incubator-hawq git commit: HAWQ-121. Remove legacy command line tools.
Date Thu, 05 Nov 2015 03:10:03 GMT
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/9932786b/tools/bin/gpexpand
----------------------------------------------------------------------
diff --git a/tools/bin/gpexpand b/tools/bin/gpexpand
deleted file mode 100755
index eda97d6..0000000
--- a/tools/bin/gpexpand
+++ /dev/null
@@ -1,2931 +0,0 @@
-#!/usr/bin/env python
-# Line too long            - pylint: disable=C0301
-# Invalid name             - pylint: disable=C0103
-#
-# Copyright (c) Pivotal Inc 2014. All Rights Reserved.
-#
-from gppylib.mainUtils import getProgramName
-
-import copy
-import datetime
-import os
-import sys
-import socket
-import signal
-import traceback
-from time import strftime, sleep
-
-try:
-    from gppylib.commands.unix import *
-    from gppylib.fault_injection import inject_fault
-    from gppylib.commands.gp import *
-    from gppylib.commands.pg import PgControlData
-    from gppylib.gparray import GpArray, MODE_CHANGELOGGING, STATUS_DOWN, GpDB
-    from gppylib.gpparseopts import OptParser, OptChecker
-    from gppylib.gplog import *
-    from gppylib.db import catalog
-    from gppylib.db import dbconn
-    from gppylib.userinput import *
-    from gppylib.operations.startSegments import MIRROR_MODE_MIRRORLESS
-    from gppylib.system import configurationInterface, configurationImplGpdb
-    from gppylib.system.environment import GpMasterEnvironment
-    from pygresql.pgdb import DatabaseError
-    from pygresql import pg
-    from gppylib.gpcoverage import GpCoverage
-    from gppylib.gpcatalog import MASTER_ONLY_TABLES
-    from gppylib.operations.package import SyncPackages
-    from gppylib.operations.utils import ParallelOperation
-    from gppylib.parseutils import line_reader, parse_gpexpand_segment_line, \
-        canonicalize_address
-    from gppylib.operations.filespace import PG_SYSTEM_FILESPACE, GP_TRANSACTION_FILES_FILESPACE,\
-         GP_TEMPORARY_FILES_FILESPACE, GetCurrentFilespaceEntries, GetFilespaceEntries, GetFilespaceEntriesDict,\
-         RollBackFilespaceChanges, GetMoveOperationList, FileType, UpdateFlatFiles
-
-except ImportError, e:
-    sys.exit('ERROR: Cannot import modules.  Please check that you have sourced greenplum_path.sh.  Detail: ' + str(e))
-
-
-# constants
-MAX_PARALLEL_EXPANDS=16
-MAX_BATCH_SIZE=128
-
-GPDB_STOPPED=1
-GPDB_STARTED=2
-GPDB_UTILITY=3
-
-FILE_SPACES_INPUT_FILENAME_SUFFIX = ".fs"
-SEGMENT_CONFIGURATION_BACKUP_FILE = "gpexpand.gp_segment_configuration"
-FILE_SPACES_INPUT_FILE_LINE_1_PREFIX = "filespaceOrder"
-
-description = ("""
-Adds additional segments to a pre-existing GPDB Array.
-""")
-
-_help = ["""
-The input file should be be a plain text file with a line for each segment
-to add with the format:
-
-  <hostname>:<port>:<data_directory>:<dbid>:<content>:<definedprimary>
-""",
-"""
-If an input file is not specified, gpexpand will ask a series of questions
-and create one.
-""",
-]
-
-_TODO = ["""
-
-Remaining TODO items:
-====================
-""",
-
-"""* smarter heuristics on setting ranks. """,
-
-"""* make sure system isn't in "readonly mode" during setup. """,
-
-"""* need a startup validation where we check the status detail
-    with gp_distribution_policy and make sure that our book
-    keeping matches reality. we don't have a perfect transactional
-    model since the tables can be in a different database from
-    where the gpexpand schema is kept. """,
-
-"""* currently requires that GPHOME and PYTHONPATH be set on all of the remote hosts of
-     the system.  should get rid of this requirement. """
-]
-
-_usage = """[-f hosts_file] [-D database_name]
-
-gpexpand -i input_file [-D database_name] [-B batch_size] [-V]
-
-gpexpand [-d duration[hh][:mm[:ss]] | [-e 'YYYY-MM-DD hh:mm:ss']]
-         [-a] [-n parallel_processes] [-D database_name]
-
-gpexpand -r [-D database_name]
-
-gpexpand -c [-D database_name]
-
-gpexpand -? | -h | --help | --verbose | -v"""
-
-EXECNAME = os.path.split(__file__)[-1]
-
-
-
-#----------------------- Command line option parser ----------------------
-
-def parseargs():
-    parser = OptParser(option_class=OptChecker,
-                description=' '.join(description.split()),
-                version='%prog version $Revision: #174 $')
-    parser.setHelp(_help)
-    parser.set_usage('%prog ' + _usage)
-    parser.remove_option('-h')
-
-    parser.add_option('-c','--clean', action='store_true',
-                        help='remove the expansion schema.')
-    parser.add_option('-r', '--rollback', action='store_true',
-                      help='rollback failed expansion setup.')
-    parser.add_option('-a', '--analyze', action='store_true',
-                      help='Analyze the expanded table after redistribution.')
-    parser.add_option('-d','--duration', type='duration', metavar='[h][:m[:s]]',
-                      help='duration from beginning to end.')
-    parser.add_option('-e','--end', type='datetime', metavar='datetime',
-                       help="ending date and time in the format 'YYYY-MM-DD hh:mm:ss'.")
-    parser.add_option('-i','--input', dest="filename",
-                        help="input expansion configuration file.", metavar="FILE")
-    parser.add_option('-f', '--hosts-file', metavar='<hosts_file>',
-                      help='file containing new host names used to generate input file')
-    parser.add_option('-D','--database', dest='database',
-                      help='Database to create the gpexpand schema and tables in.  If this ' \
-                      'option is not given, PGDATABASE will be used.  The template1, ' \
-                      'template0 and postgres databases cannot be used.')
-    parser.add_option('-B', '--batch-size', type='int', default=16, metavar="<batch_size>",
-                      help='Expansion configuration batch size. Valid values are 1-%d' % MAX_BATCH_SIZE)
-    parser.add_option('-n', '--parallel', type="int", default=1, metavar="<parallel_processes>",
-                        help='number of tables to expand at a time. Valid values are 1-%d.' % MAX_PARALLEL_EXPANDS)
-    parser.add_option('-v','--verbose', action='store_true',
-                      help='debug output.')
-    parser.add_option('-h', '-?', '--help', action='help',
-                        help='show this help message and exit.')
-    parser.add_option('-s', '--silent', action='store_true',
-                      help='Do not prompt for confirmation to proceed on warnings')
-    parser.add_option('--usage', action="briefhelp")
-
-    parser.set_defaults(verbose=False,filters=[], slice=(None, None))
-
-    # Parse the command line arguments
-    (options, args) = parser.parse_args()
-
-    if len(args) > 0:
-        logger.error('Unknown argument %s' % args[0])
-        parser.exit(1)
-
-    #-n sanity check
-    if options.parallel > MAX_PARALLEL_EXPANDS or options.parallel < 1:
-        logger.error('Invalid argument.  parallel value must be >= 1 and <= %d' % MAX_PARALLEL_EXPANDS)
-        parser.print_help()
-        parser.exit(1)
-
-    proccount=os.environ.get('GP_MGMT_PROCESS_COUNT')
-    if options.batch_size == 16 and proccount is not None:
-        options.batch_size = int(proccount)
-
-    if options.batch_size < 1 or options.batch_size > 128:
-        logger.error('Invalid argument.  -B value must be >= 1 and <= %s' % MAX_BATCH_SIZE)
-        parser.print_help()
-        parser.exit(1)
-
-    # OptParse can return date instead of datetime so we might need to convert
-    if options.end and not isinstance(options.end, datetime.datetime):
-        options.end = datetime.datetime.combine(options.end, datetime.time(0))
-
-    if options.end and options.end < datetime.datetime.now():
-        logger.error('End time occurs in the past')
-        parser.print_help()
-        parser.exit(1)
-
-    if options.end and options.duration:
-        logger.warn('Both end and duration options were given.')
-        # Both a duration and an end time were given.
-        if options.end > datetime.datetime.now() + options.duration:
-            logger.warn('The duration argument will be used for the expansion end time.')
-            options.end = datetime.datetime.now() + options.duration
-        else:
-            logger.warn('The end argument will be used for the expansion end time.')
-    elif options.duration:
-        options.end = datetime.datetime.now() + options.duration
-
-    # -c and -r options are mutually exclusive
-    if options.rollback and options.clean:
-        rollbackOpt = "--rollback" if "--rollback" in sys.argv else "-r"
-        cleanOpt = "--clean" if "--clean" in sys.argv else "-c"
-        logger.error("%s and %s options cannot be specified together." % (rollbackOpt, cleanOpt))
-        parser.exit(1)
-
-    try:
-        options.master_data_directory = get_masterdatadir()
-        options.gphome = get_gphome()
-    except GpError, msg:
-        logger.error(msg)
-        parser.exit(1)
-
-    if not os.path.exists(options.master_data_directory):
-         logger.error('Master data directory does not exist.')
-         parser.exit(1)
-
-    if options.database and (options.database.lower() == 'template0'
-                             or options.database.lower() == 'template1'
-                             or options.database.lower() == 'postgres'):
-        logger.error('%s cannot be used to store the gpexpand schema and tables' % options.database)
-        parser.exit(1)
-    elif not options.database:
-        options.database = os.getenv('PGDATABASE', os.getenv('LOGNAME'))
-
-    options.pgport = int(os.getenv('PGPORT', 5432))
-
-    return options, args
-
-#-------------------------------------------------------------------------
-# process information functions
-def create_pid_file(master_data_directory):
-    """Creates gpexpand pid file"""
-    try:
-        fp = open(master_data_directory + '/gpexpand.pid', 'w')
-        fp.write(str(os.getpid()))
-    except IOError:
-        raise
-    finally:
-        if fp: fp.close()
-
-def remove_pid_file(master_data_directory):
-    """Removes gpexpand pid file"""
-    try:
-        os.unlink(master_data_directory + '/gpexpand.pid')
-    except:
-        pass
-
-def is_gpexpand_running(master_data_directory):
-    """Checks if there is another instance of gpexpand running"""
-    is_running = False
-    try:
-        fp = open(master_data_directory + '/gpexpand.pid', 'r')
-        pid = int(fp.readline().strip())
-        fp.close()
-        is_running = check_pid(pid)
-    except IOError:
-        pass
-    except Exception, msg:
-        raise
-
-    return is_running
-
-
-def gpexpand_status_file_exists(master_data_directory):
-    """Checks if gpexpand.pid exists"""
-    return os.path.exists(master_data_directory + '/gpexpand.status')
-
-
-#-------------------------------------------------------------------------
-#expansion schema
-
-undone_status = "NOT STARTED"
-start_status = "IN PROGRESS"
-done_status = "COMPLETED"
-does_not_exist_status = 'NO LONGER EXISTS'
-
-gpexpand_schema='gpexpand'
-create_schema_sql = "CREATE SCHEMA " + gpexpand_schema
-drop_schema_sql = "DROP schema IF EXISTS %s CASCADE" % gpexpand_schema
-
-status_table='status'
-status_table_sql="""CREATE TABLE %s.%s
-                        ( status text,
-                          updated timestamp ) """ % (gpexpand_schema,status_table)
-
-status_detail_table='status_detail'
-status_detail_table_sql="""CREATE TABLE %s.%s
-                        ( dbname text,
-                          fq_name text,
-                          schema_oid oid,
-                          table_oid oid,
-                          distribution_policy smallint[],
-                          distribution_policy_names text,
-                          distribution_policy_coloids text,
-                          storage_options text,
-                          rank int,
-                          status text,
-                          expansion_started timestamp,
-                          expansion_finished timestamp,
-                          source_bytes numeric,
-                          updated timestamp) """ % (gpexpand_schema, status_detail_table)
-
-class StatusDetailCols(object):
-    """
-    StatusDetailCols represents columns for status_detail table.
-    When updating the table definition, this class also needs to update.
-    """
-
-    class Column(object):
-        def __init__(self, colorder, default):
-            self.order = colorder
-            self.value = default
-
-        def set_value(self, val):
-            self.value = val
-
-        def get_value(self):
-            return self.value
-
-    def __init__(self, **kwargs):
-        """
-        @param kwargs with key column name and value select expression.
-        """
-        cols = [
-            'dbname',
-            'fq_name',
-            'schema_oid',
-            'table_oid',
-            'distribution_policy',
-            'distribution_policy_names',
-            'distribution_policy_coloids',
-            'storage_options',
-            'rank',
-            'status',
-            'expansion_started',
-            'expansion_finished',
-            'source_bytes',
-            'updated'
-            ]
-        colmap = {}
-        for i, col in enumerate(cols):
-            colmap[col] = StatusDetailCols.Column(i, col)
-        # default value for updated.  We need to use python datetime.now()
-        # instead of database now(), as we compare it user input elsewhere.
-        colmap['updated'].set_value(quote(datetime.datetime.now()))
-        for k, v in kwargs.iteritems():
-            colmap[k].set_value(v)
-        self.cols = cols
-        self.colmap = colmap
-
-    def view_list(self):
-        """
-        view_list returns a list of columns used in view definition.
-        """
-        return ', '.join(filter(lambda x: x != 'updated', self.cols))
-
-    def into_list(self):
-        """
-        into_list returns a list of column names used in INSERT INTO.
-        """
-        return ', '.join(self.cols)
-
-    def value_list(self):
-        """
-        value_list returns a list of values referenced in SELECT targets.
-        """
-        return ', '.join(map(lambda x: self.colmap[x].get_value(), self.cols))
-
-def quote(arg):
-    """Does someone care escapes?"""
-    return "'" + str(arg) + "'"
-
-
-logical_status_view='logical_status'
-logical_status_view_sql="""CREATE VIEW {schema}.{viewname} AS
-SELECT
-    {select}
-FROM(
-    SELECT
-        *,
-        max(updated) OVER (PARTITION BY dbname, schema_oid, table_oid) AS last_updated
-    FROM
-        {schema}.{table}
-)s
-WHERE updated = last_updated
-""".format(
-        schema=gpexpand_schema,
-        viewname=logical_status_view,
-        table=status_detail_table,
-        select=StatusDetailCols().view_list())
-
-# gpexpand views
-progress_view='expansion_progress'
-progress_view_sql="""CREATE VIEW %s.%s AS
-SELECT
-    CASE status
-        WHEN '%s' THEN 'Tables Expanded'
-        WHEN '%s' THEN 'Tables Left'
-        WHEN '%s' THEN 'Tables In Progress'
-    END AS Name,
-    count(*)::text AS Value
-FROM %s.%s GROUP BY status
-
-UNION
-
-SELECT
-    CASE status
-        WHEN '%s' THEN 'Bytes Done'
-        WHEN '%s' THEN 'Bytes Left'
-        WHEN '%s' THEN 'Bytes In Progress'
-    END AS Name,
-    SUM(source_bytes)::text AS Value
-FROM %s.%s GROUP BY status
-
-UNION
-
-SELECT
-    'Estimated Expansion Rate' AS Name,
-    (SUM(source_bytes) / (1 + extract(epoch FROM (max(expansion_finished) - min(expansion_started)))) / 1024 / 1024)::text || ' MB/s' AS Value
-FROM %s.%s
-WHERE status = '%s'
-AND
-expansion_started > (SELECT updated FROM %s.%s WHERE status = '%s' ORDER BY updated DESC LIMIT 1)
-
-UNION
-
-SELECT
-'Estimated Time to Completion' AS Name,
-CAST((SUM(source_bytes) / (
-SELECT 1 + SUM(source_bytes) / (1 + (extract(epoch FROM (max(expansion_finished) - min(expansion_started)))))
-FROM %s.%s
-WHERE status = '%s'
-AND
-expansion_started > (SELECT updated FROM %s.%s WHERE status = '%s' ORDER BY
-updated DESC LIMIT 1)))::text || ' seconds' as interval)::text AS Value
-FROM %s.%s
-WHERE status = '%s'
-  OR status = '%s'""" % (gpexpand_schema, progress_view,
-                         done_status, undone_status, start_status,
-                         gpexpand_schema, logical_status_view,
-                         done_status, undone_status, start_status,
-                         gpexpand_schema, logical_status_view,
-                         gpexpand_schema, logical_status_view,
-                         done_status,
-                         gpexpand_schema, status_table,
-                         'EXPANSION STARTED',
-                         gpexpand_schema, logical_status_view,
-                         done_status,
-                         gpexpand_schema, status_table,
-                         'EXPANSION STARTED',
-                         gpexpand_schema, logical_status_view,
-                         start_status, undone_status)
-
-unalterable_table_sql = """
-SELECT
-    current_database() AS database,
-    pg_catalog.quote_ident(nspname) || '.' ||
-    pg_catalog.quote_ident(relname) AS table,
-    attnum,
-    attlen,
-    attbyval,
-    attstorage,
-    attalign,
-    atttypmod,
-    attndims,
-    reltoastrelid != 0 AS istoasted
-FROM
-    pg_catalog.pg_attribute,
-    pg_catalog.pg_class,
-    pg_catalog.pg_namespace
-WHERE
-    attisdropped
-    AND attnum >= 0
-    AND attrelid = pg_catalog.pg_class.oid
-    AND relnamespace = pg_catalog.pg_namespace.oid
-    AND (attlen, attbyval, attalign, attstorage) NOT IN
-        (SELECT typlen, typbyval, typalign, typstorage
-        FROM pg_catalog.pg_type
-        WHERE typisdefined AND typtype='b' )
-ORDER BY
-    attrelid, attnum
-"""
-
-has_unique_index_sql = """
-SELECT
-    current_database() || '.' || pg_catalog.quote_ident(nspname) || '.' || pg_catalog.quote_ident(relname) AS table
-FROM
-    pg_class c,
-    pg_namespace n,
-    pg_index i
-WHERE
-  i.indrelid = c.oid
-  AND c.relnamespace = n.oid
-  AND i.indisunique
-  AND n.nspname not in ('pg_catalog', 'information_schema', 'pg_toast',
-                        'pg_bitmapindex', 'pg_aoseg')
-"""
-
-#-------------------------------------------------------------------------
-class InvalidStatusError(Exception): pass
-class ValidationError(Exception): pass
-
-#-------------------------------------------------------------------------
-class GpExpandStatus():
-    """Class that manages gpexpand status file.
-
-    The status file is placed in the master data directory on both the master and
-    the standby master.  it's used to keep track of where we are in the progression.
-    """
-    def __init__(self, logger, master_data_directory, master_mirror=None):
-        self.logger=logger
-
-        self._status_values = { 'UNINITIALIZED': 1,
-                                'EXPANSION_PREPARE_STARTED': 2,
-                                'BUILD_SEGMENT_TEMPLATE_STARTED': 3,
-                                'BUILD_SEGMENT_TEMPLATE_DONE': 4,
-                                'BUILD_SEGMENTS_STARTED': 5,
-                                'BUILD_SEGMENTS_DONE': 6,
-                                'UPDATE_OLD_SEGMENTS_STARTED': 7,
-                                'UPDATE_OLD_SEGMENTS_DONE': 8,
-                                'UPDATE_CATALOG_STARTED': 9,
-                                'UPDATE_CATALOG_DONE': 10,
-                                'SETUP_EXPANSION_SCHEMA_STARTED': 11,
-                                'SETUP_EXPANSION_SCHEMA_DONE': 12,
-                                'PREPARE_EXPANSION_SCHEMA_STARTED': 13,
-                                'PREPARE_EXPANSION_SCHEMA_DONE': 14,
-                                'EXPANSION_PREPARE_DONE': 15
-                                }
-        self._status = []
-        self._status_info = []
-        self._master_data_directory = master_data_directory
-        self._master_mirror = master_mirror
-        self._status_filename = master_data_directory + '/gpexpand.status'
-        self._status_standby_filename = master_data_directory + '/gpexpand.standby.status'
-        self._fp = None
-        self._fp_standby = None
-        self._temp_dir = None
-        self._input_filename = None
-        self._original_primary_count = None
-        self._gp_segment_configuration_backup = None
-
-        if os.path.exists(self._status_filename):
-            self._read_status_file()
-
-    def _read_status_file(self):
-        """Reads in an existing gpexpand status file"""
-        self.logger.debug("Trying to read in a pre-existing gpexpand status file")
-        try:
-            self._fp = open(self._status_filename, 'a+')
-            self._fp.seek(0)
-
-            for line in self._fp:
-                (status, status_info) = line.rstrip().split(':')
-                if status == 'BUILD_SEGMENT_TEMPLATE_STARTED':
-                    self._temp_dir = status_info
-                elif status == 'BUILD_SEGMENTS_STARTED':
-                    self._tar_filename = status_info
-                elif status == 'BUILD_SEGMENTS_DONE':
-                    self._number_new_segments = status_info
-                elif status == 'EXPANSION_PREPARE_STARTED':
-                    self._input_filename = status_info
-                elif status == 'UPDATE_OLD_SEGMENTS_STARTED':
-                    self._original_primary_count = int(status_info)
-                elif status == 'UPDATE_CATALOG_STARTED':
-                    self._gp_segment_configuration_backup = status_info
-
-                self._status.append(status)
-                self._status_info.append(status_info)
-        except IOError:
-            raise
-
-        if not self._status_values.has_key(self._status[-1]):
-            raise InvalidStatusError('Invalid status file.  Unknown status %s' % self._status)
-
-    def create_status_file(self):
-        """Creates a new gpexpand status file"""
-        try:
-            self._fp = open(self._status_filename, 'w')
-            if self._master_mirror:
-                self._fp_standby = open(self._status_standby_filename, 'w')
-                self._fp_standby.write('UNINITIALIZED:None\n')
-                self._fp_standby.flush()
-            self._fp.write('UNINITIALIZED:None\n')
-            self._fp.flush()
-            self._status.append('UNINITIALIZED')
-            self._status_info.append('None')
-        except IOError:
-            raise
-
-        self._sync_status_file()
-
-
-    def _sync_status_file(self):
-        """Syncs the gpexpand status file with the master mirror"""
-        if self._master_mirror:
-            cpCmd = RemoteCopy('gpexpand copying status file to master mirror',
-                               self._status_standby_filename, self._master_mirror.getSegmentHostName(),
-                               self._status_filename)
-            cpCmd.run(validateAfter=True)
-
-    def set_status(self, status, status_info=None):
-        """Sets the current status.  gpexpand status must be set in
-           proper order.  Any out of order status result in an
-           InvalidStatusError exception"""
-        self.logger.debug("Transitioning from %s to %s" % (self._status[-1],status))
-
-        if not self._fp:
-            raise InvalidStatusError('The status file is invalid and cannot be written to')
-        if not self._status_values.has_key(status):
-            raise InvalidStatusError('%s is an invalid gpexpand status' % status)
-        # Only allow state transitions forward or backward 1
-        if self._status and \
-           self._status_values[status] != self._status_values[self._status[-1]] + 1:
-                raise InvalidStatusError('Invalid status transition from %s to %s' % (self._status[-1], status))
-        if self._master_mirror:
-            self._fp_standby.write('%s:%s\n' % (status, status_info))
-            self._fp_standby.flush()
-            self._sync_status_file()
-        self._fp.write('%s:%s\n' % (status, status_info))
-        self._fp.flush()
-        self._status.append(status)
-        self._status_info.append(status_info)
-
-    def get_current_status(self):
-        """Gets the current status that has been written to the gpexpand
-           status file"""
-        if (len(self._status) > 0 and len(self._status_info) > 0):
-            return (self._status[-1], self._status_info[-1])
-        else:
-            return (None, None)
-
-    def get_status_history(self):
-        """Gets the full status history"""
-        return zip(self._status, self._status_info)
-
-    def remove_status_file(self):
-        """Closes and removes the gpexand status file"""
-        if self._fp:
-            self._fp.close()
-            self._fp = None
-        if self._fp_standby:
-            self._fp_standby.close()
-            self._fp_standby = None
-        if os.path.exists(self._status_filename):
-            os.unlink(self._status_filename)
-        if os.path.exists(self._status_standby_filename):
-            os.unlink(self._status_standby_filename)
-        if self._master_mirror:
-            RemoveFiles.remote('gpexpand master mirror status file cleanup',
-                               self._master_mirror.getSegmentHostName(),
-                               self._status_filename)
-
-    def remove_segment_configuration_backup_file(self):
-        """ Remove the segment configuration backup file """
-        self.logger.debug("Removing segment configuration backup file")
-        if self._gp_segment_configuration_backup != None and os.path.exists(self._gp_segment_configuration_backup) == True:
-           os.unlink(self._gp_segment_configuration_backup)
-
-    def get_temp_dir(self):
-        """Gets temp dir that was used during template creation"""
-        return self._temp_dir
-
-    def get_input_filename(self):
-        """Gets input file that was used by expansion setup"""
-        return self._input_filename
-
-    def get_tar_filename(self):
-        """Gets tar file that was used during template creation"""
-        return self._tar_filename
-
-    def get_number_new_segments(self):
-        """ Gets the number of new segments added """
-        return self._number_new_segments
-
-    def get_original_primary_count(self):
-        """Returns the original number of primary segments"""
-        return self._original_primary_count
-
-    def get_gp_segment_configuration_backup(self):
-        """Gets the filename of the gp_segment_configuration backup file
-        created during expansion setup"""
-        return self._gp_segment_configuration_backup
-
-    def set_gp_segment_configuration_backup(self, filename):
-        """Sets the filename of the gp_segment_configuration backup file"""
-        self._gp_segment_configuration_backup = filename
-
-    def is_standby(self):
-        """Returns True if running on standby"""
-        return os.path.exists(self._master_data_directory + self._status_standby_filename)
-
-#-------------------------------------------------------------------------
-
-class ExpansionError(Exception): pass
-class SegmentTemplateError(Exception): pass
-
-#-------------------------------------------------------------------------
-class SegmentTemplate:
-    """Class for creating, distributing and deploying new segments to an
-    existing GPDB array"""
-
-    def __init__(self, logger, statusLogger, pool,
-                 gparray, masterDataDirectory,
-                 dburl,conn, tempDir,
-                 schemaTarFile='gpexpand_schema.tar'):
-        self.logger = logger
-        self.statusLogger = statusLogger
-        self.pool = pool
-        self.gparray = gparray
-        self.tempDir = tempDir
-        self.dburl = dburl
-        self.conn = conn
-        self.gparray = gparray
-        self.masterDataDirectory = masterDataDirectory
-        self.schema_tar_file = schemaTarFile
-        self.maxDbId = self.gparray.get_max_dbid()
-
-        hosts = []
-        for seg in gparray.getExpansionSegDbList():
-            hosts.append(seg.getSegmentHostName())
-        self.hosts = SegmentTemplate.consolidate_hosts(pool, hosts)
-        logger.debug('Hosts: %s' % self.hosts)
-
-    @staticmethod
-    def consolidate_hosts(pool, hosts):
-        tmpHosts = {}
-        consolidatedHosts = []
-
-        for host in hosts:
-            tmpHosts[host] = 0
-
-        for host in tmpHosts.keys():
-            hostnameCmd = Hostname('gpexpand associating hostnames with segments', ctxt=REMOTE, remoteHost=host)
-            pool.addCommand(hostnameCmd)
-
-        pool.join()
-
-        finished_cmds = pool.getCompletedItems()
-
-        for cmd in finished_cmds:
-            if not cmd.was_successful():
-                raise SegmentTemplateError(cmd.get_results())
-            if cmd.get_hostname() not in consolidatedHosts:
-                logger.debug('Adding %s to host list' % cmd.get_hostname())
-                consolidatedHosts.append(cmd.get_hostname())
-
-        return consolidatedHosts
-
-
-    def build_segment_template(self):
-        """Builds segment template tar file"""
-        self.statusLogger.set_status('BUILD_SEGMENT_TEMPLATE_STARTED', self.tempDir)
-        self._create_template()
-        self._fixup_template()
-        self._tar_template()
-        self.statusLogger.set_status('BUILD_SEGMENT_TEMPLATE_DONE')
-
-
-    def build_new_segments(self):
-        """Deploys the template tar file and configures the new segments"""
-        self.statusLogger.set_status('BUILD_SEGMENTS_STARTED', os.path.abspath(self.schema_tar_file))
-        self._distribute_template()
-        self._configure_new_segments()
-        numNewSegments = len(self.gparray.getExpansionSegDbList())
-        self.statusLogger.set_status('BUILD_SEGMENTS_DONE', numNewSegments)
-
-
-    def _create_template(self):
-        """Creates the schema template that is used by new segments"""
-        self.logger.info('Creating segment template')
-
-        MakeDirectory.local('gpexpand create temp dir',self.tempDir)
-
-        self.oldSegCount = self.gparray.get_segment_count()
-
-        GpStop.local('gpexpand _create_template stop gpdb', masterOnly=True, fast=True)
-
-        # Verify that we actually stopped
-        self.logger.debug('Validating array state')
-        pgControlDataCmd = PgControlData('Validate stopped', self.masterDataDirectory)
-        state = None
-        try:
-            pgControlDataCmd.run(validateAfter=True)
-        except Exception, e:
-            raise SegmentTemplateError(e)
-        state = pgControlDataCmd.get_value('Database cluster state')
-        if state != 'shut down':
-            raise SegmentTemplateError('Failed to stop the array.  pg_controldata return state of %s' % state)
-
-        try:
-            srcSeg = self._select_src_segment()
-            srcSeg.createTemplate(dstDir=self.tempDir)
-        except Exception, msg:
-            raise SegmentTemplateError(msg)
-
-
-    def _select_src_segment(self):
-        """Gets a segment to use as a source for pg_hba.conf
-        and postgresql.conf files"""
-        seg = self.gparray.segments[0]
-        if seg.primaryDB.valid:
-            self.srcSegHostname = seg.primaryDB.getSegmentHostName()
-            self.srcSegDataDir = seg.primaryDB.getSegmentDataDirectory()
-            return seg.primaryDB
-        else:
-            raise SegmentTemplateError("no valid segdb for content=0 to use as a template")
-
-
-    def _distribute_template(self):
-        """Distributes the template tar file to the new segments and expands it"""
-        self.logger.info('Distributing template tar file to new hosts')
-
-        self._distribute_tarfile()
-
-    def _distribute_tarfile(self):
-        """Distributes template tar file to hosts"""
-        for host in self.hosts:
-            logger.debug('Copying tar file to %s' % host)
-            cpCmd = RemoteCopy('gpexpand distribute tar file to new hosts', self.schema_tar_file, host, '.')
-            self.pool.addCommand(cpCmd)
-
-        self.pool.join()
-        self.pool.check_results()
-
-
-    def _configure_new_segments(self):
-        """Configures new segments.  This includes modifying the postgresql.conf file
-        and setting up the gp_id table"""
-
-        self.logger.info('Configuring new segments (primary)')
-        new_segment_info = ConfigureNewSegment.buildSegmentInfoForNewSegment(self.gparray.getExpansionSegDbList(), primaryMirror = 'primary')
-        for host in iter(new_segment_info):
-            segCfgCmd = ConfigureNewSegment('gpexpand configure new segments', new_segment_info[host],
-                                           tarFile=self.schema_tar_file, newSegments=True,
-                                           verbose=gplog.logging_is_verbose(), batchSize=options.batch_size,
-                                           ctxt=REMOTE, remoteHost=host)
-            self.pool.addCommand(segCfgCmd)
-
-        self.pool.join()
-        self.pool.check_results()
-
-    def _get_transaction_filespace_dir(self, transaction_flat_file): 
-        filespace_dir = None
-
-        with open(transaction_flat_file) as file:
-            for line in file:
-                fs_info = line.split()
-                if len(fs_info) != 2:
-                    continue
-                filespace_dir = fs_info[1]
-
-        return filespace_dir
-    
-    def _fixup_template(self):
-        """Copies postgresql.conf and pg_hba.conf files from a valid segment on the system.
-        Then modifies the template copy of pg_hba.conf"""
-
-        self.logger.info('Copying postgresql.conf from existing segment into template')
-
-        localHostname = self.gparray.master.getSegmentHostName()
-        cpCmd = RemoteCopy('gpexpand copying postgresql.conf to %s:%s/postgresql.conf' % (self.srcSegHostname, self.srcSegDataDir),
-                           self.srcSegDataDir + '/postgresql.conf', localHostname,
-                           self.tempDir, ctxt=REMOTE, remoteHost=self.srcSegHostname)
-        cpCmd.run(validateAfter=True)
-
-
-        self.logger.info('Copying pg_hba.conf from existing segment into template')
-        cpCmd = RemoteCopy('gpexpand copy pg_hba.conf to %s:%s/pg_hba.conf' % (self.srcSegHostname, self.srcSegDataDir),
-                           self.srcSegDataDir + '/pg_hba.conf', localHostname,
-                           self.tempDir, ctxt=REMOTE, remoteHost=self.srcSegHostname)
-        cpCmd.run(validateAfter=True)
-
-        #Copy the transaction directories into template
-        pg_system_filespace_entries = GetFilespaceEntriesDict(GetFilespaceEntries(self.gparray, 
-                                                                                  PG_SYSTEM_FILESPACE).run()).run()
-        transaction_flat_file = os.path.join(pg_system_filespace_entries[1][2], GP_TRANSACTION_FILES_FILESPACE)
-        filespace_dir = None
-        if os.path.exists(transaction_flat_file):
-            filespace_dir = self._get_transaction_filespace_dir(transaction_flat_file) 
-            logger.debug('Filespace location = %s' % filespace_dir)
-
-            if filespace_dir:
-                transaction_files_dir = ['pg_xlog', 'pg_multixact', 'pg_subtrans', 'pg_clog',
-                                         'pg_distributedlog', 'pg_distributedxidmap']
-                for directory in transaction_files_dir:
-                    dst_dir = os.path.join(self.tempDir, directory)
-                    src_dir = os.path.join(filespace_dir, directory)
-
-                    mkCmd = MakeDirectory('gpexpand creating transaction directories in template', dst_dir)
-                    mkCmd.run(validateAfter=True)
-                    cpCmd = LocalDirCopy('gpexpand copying dir %s' % src_dir, src_dir, dst_dir)
-                    cpCmd.run(validateAfter=True)
-    
-        # Don't need log files and gpperfmon files in template.
-        rmCmd = RemoveFiles('gpexpand remove gppermfon data from template',
-                            self.tempDir + '/gpperfmon/data')
-        rmCmd.run(validateAfter=True)
-        rmCmd = RemoveFiles('gpexpand remove logs from template',
-                            self.tempDir + '/pg_log/*')
-        rmCmd.run(validateAfter=True)
-
-        #other files not needed
-        rmCmd = RemoveFiles('gpexpand remove postmaster.opt from template',
-                            self.tempDir + '/postmaster.opts')
-        rmCmd.run(validateAfter=True)
-        rmCmd = RemoveFiles('gpexpand remove postmaster.pid from template',
-                            self.tempDir + '/postmaster.pid')
-        rmCmd.run(validateAfter=True)
-        rmCmd = RemoveFiles('gpexpand remove gpexpand files from template',
-                            self.tempDir + '/gpexpand.*')
-        rmCmd.run(validateAfter=True)
-
-        #We dont need the flat files 
-        rmCmd = RemoveFiles('gpexpand remove transaction flat file from template',
-                            self.tempDir + '/' + GP_TRANSACTION_FILES_FILESPACE)
-        rmCmd.run(validateAfter=True)
-        rmCmd = RemoveFiles('gpexpand remove temporary flat file from template',
-                            self.tempDir + '/' + GP_TEMPORARY_FILES_FILESPACE)
-        rmCmd.run(validateAfter=True)
-
-        self.logger.info('Adding new segments into template pg_hba.conf')
-        try:
-            fp = open(self.tempDir + '/pg_hba.conf', 'a')
-            try:
-                new_host_set = set()
-                for newSeg in self.gparray.getExpansionSegDbList():
-                    address = newSeg.getSegmentAddress()
-                    host = newSeg.getSegmentHostName()
-                    new_host_set.add(host)
-                    addrinfo = socket.getaddrinfo(address, None)
-                    ipaddrlist = list(set([ (ai[0], ai[4][0]) for ai in addrinfo]))
-                    fp.write('# %s\n' % address)
-                    for addr in ipaddrlist:
-                        fp.write('host\tall\tall\t%s/%s\ttrust\n' % (addr[1], '32' if addr[0] == socket.AF_INET else '128'))
-
-                for new_host in new_host_set:
-                    addrinfo = socket.getaddrinfo(new_host, None)
-                    ipaddrlist = list(set([ (ai[0], ai[4][0]) for ai in addrinfo]))
-                    fp.write('# %s\n' % new_host)
-                    for addr in ipaddrlist:
-                        fp.write('host\tall\tall\t%s/%s\ttrust\n' % (addr[1], '32' if addr[0] == socket.AF_INET else '128'))
-
-            finally:
-                fp.close()
-        except IOError, msg:
-            raise SegmentTemplateError('Failed to open %s/pg_hba.conf' % self.tempDir)
-        except Exception, msg:
-            raise SegmentTemplateError('Failed to add new segments to template pg_hba.conf')
-
-
-    def _tar_template(self):
-        """Tars up the template files"""
-        self.logger.info('Creating schema tar file')
-        tarCmd = CreateTar('gpexpand tar segment template', self.tempDir, self.schema_tar_file)
-        tarCmd.run(validateAfter=True)
-
-
-    @staticmethod
-    def cleanup_build_segment_template(tarFile, tempDir):
-        """Reverts the work done by build_segment_template.  Deletes the temp
-        directory and local tar file"""
-        rmCmd = RemoveFiles('gpexpand remove temp dir: %s' % tempDir, tempDir)
-        rmCmd.run(validateAfter=True)
-        rmCmd = RemoveFiles('gpexpand remove segment template file', tarFile)
-        rmCmd.run(validateAfter=True)
-
-
-    @staticmethod
-    def cleanup_build_new_segments(pool, tarFile, gparray, hosts=None, removeDataDirs=False):
-        """Cleans up the work done by build_new_segments.  Deletes remote tar files and
-        and removes remote data directories"""
-
-        if not hosts:
-            hosts = []
-            for seg in gparray.getExpansionSegDbList():
-                hosts.append(seg.getSegmentHostName())
-
-        # Remove template tar file
-        for host in hosts:
-            rmCmd = RemoveFiles('gpexpand remove segment template file on host: %s' % host,
-                                tarFile, ctxt=REMOTE, remoteHost=host)
-            pool.addCommand(rmCmd)
-
-        if removeDataDirs:
-            for seg in gparray.getExpansionSegDbList():
-                hostname = seg.getSegmentHostName()
-                filespaces = seg.getSegmentFilespaces()
-                for oid in filespaces:
-                    datadir = filespaces[oid]
-                    rmCmd = RemoveFiles('gpexpand remove new segment data directory: %s:%s' % (hostname, datadir),
-                                        datadir, ctxt=REMOTE, remoteHost=hostname)
-                    pool.addCommand(rmCmd)
-        pool.join()
-        pool.check_results()
-
-
-    def cleanup(self):
-        """Cleans up temporary files from the local system and new segment hosts"""
-
-        self.logger.info('Cleaning up temporary template files')
-        SegmentTemplate.cleanup_build_segment_template(self.schema_tar_file, self.tempDir)
-        SegmentTemplate.cleanup_build_new_segments(self.pool, self.schema_tar_file, self.gparray, self.hosts)
-
-
-
-
-
-#------------------------------------------------------------------------------------------------------
-#------------------------------------------------------------------------------------------------------
-class NewSegmentInput:
-    
-    def __init__(self, hostname, address, port, datadir, dbid, contentId, role, replicationPort = None, fileSpaces = None):
-        self.hostname = hostname
-        self.address = address
-        self.port = port
-        self.datadir = datadir
-        self.dbid = dbid
-        self.contentId = contentId
-        self.role = role
-        self.replicationPort = replicationPort
-        self.fileSpaces = fileSpaces
-        
-
-#------------------------------------------------------------------------------------------------------
-#------------------------------------------------------------------------------------------------------
-class gpexpand:
-
-    def __init__(self,logger,gparray,dburl,parallel=1):
-        self.pastThePointOfNoReturn = False
-        self.logger = logger
-        self.dburl = dburl
-        self.numworkers=parallel
-        self.gparray = gparray
-        self.unique_index_tables = {}
-        self.conn = dbconn.connect(self.dburl,utility=True, encoding='UTF8', allowSystemTableMods = 'dml')
-        self.old_segments = gparray.getSegDbList()
-
-        if dburl.pgdb == 'template0' or dburl.pgdb == 'template1' or dburl.pgdb == 'postgres':
-            raise ExpansionError("Invalid database '%s' specified.  Cannot use a template database.\n"
-                                 "Please set the environment variable PGDATABASE to a different "
-                                 "database or use the -D option to specify a database and re-run" % dburl.pgdb)
-
-
-        datadir = gparray.master.getSegmentDataDirectory()
-        self.statusLogger = GpExpandStatus(logger, datadir, gparray.standbyMaster)
-
-        # Adjust batch size if it's too high given the number of segments
-        seg_count = len(gparray.getSegDbList())
-        if options.batch_size > seg_count:
-            options.batch_size = seg_count
-        self.pool = WorkerPool(numWorkers=options.batch_size)
-
-        self.tempDir = self.statusLogger.get_temp_dir()
-        if not self.tempDir:
-            self.tempDir = createTempDirectoryName(options.master_data_directory, "gpexpand")
-        self.queue = None
-        self.segTemplate = None
-        pass
-
-    @staticmethod
-    def prepare_gpdb_state(logger, dburl):
-        """ Gets GPDB in the appropriate state for an expansion.
-        This state will depend on if this is a new expansion setup,
-        a continuation of a previous expansion or a rollback """
-        # Get the database in the expected state for the expansion/rollback
-        status_file_exists = os.path.exists(options.master_data_directory + '/gpexpand.status')
-        gpexpand_db_status = None
-
-        if status_file_exists:
-            # gpexpand status file exists so the last run of gpexpand didn't finish properly
-            gpexpand.get_gpdb_in_state(GPDB_UTILITY)
-        else:
-            gpexpand.get_gpdb_in_state(GPDB_STARTED)
-
-            logger.info('Querying gpexpand schema for current expansion state')
-            try:
-                gpexpand_db_status = gpexpand.get_status_from_db(dburl)
-            except Exception, e:
-                raise Exception('Error while trying to query the gpexpand schema: %s' % e)
-            logger.debug('Expansion status returned is %s' % gpexpand_db_status)
-
-            if (not gpexpand_db_status and options.filename) and not options.clean:
-                # New expansion, need to be in master only
-                logger.info('Readying HAWQ for a new expansion')
-                gpexpand.get_gpdb_in_state(GPDB_UTILITY)
-
-        return gpexpand_db_status
-
-    @staticmethod
-    def get_gpdb_in_state(state):
-        runningStatus = chk_local_db_running(options.master_data_directory, options.pgport)
-        gpdb_running = runningStatus[0] and runningStatus[1] and runningStatus[2] and runningStatus[3]
-        if gpdb_running:
-            gpdb_mode = get_local_db_mode(options.master_data_directory)
-
-        if state == GPDB_STARTED:
-            if gpdb_running:
-                 if gpdb_mode != 'UTILITY':
-                     return
-                 else:
-                     GpStop.local('Stop GPDB', masterOnly=True, fast=True)
-            GpStart.local('Start GPDB')
-        elif state == GPDB_STOPPED:
-            if gpdb_running:
-                if gpdb_mode != 'UTILITY':
-                    GpStop.local('Stop GPDB', fast=True)
-                else:
-                    GpStop.local('Stop GPDB', masterOnly=True, fast=True)
-        elif state == GPDB_UTILITY:
-            if gpdb_running:
-                if gpdb_mode == 'UTILITY':
-                    return
-                GpStop.local('Stop GPDB', fast=True)
-            GpStart.local('Start GPDB in master only mode', masterOnly=True)
-        else:
-            raise Exception('Unkown gpdb state')
-
-    @staticmethod
-    def get_status_from_db(dburl):
-        """Gets gpexpand status from the gpexpand schema"""
-        status_conn = None
-        gpexpand_db_status = None
-        if get_local_db_mode(options.master_data_directory) == 'NORMAL':
-            try:
-                status_conn = dbconn.connect(dburl, encoding='UTF8')
-                # Get the last status entry
-                cursor = dbconn.execSQL(status_conn,'SELECT status FROM gpexpand.status ORDER BY updated DESC LIMIT 1')
-                if cursor.rowcount == 1:
-                    gpexpand_db_status = cursor.fetchone()[0]
-
-            except Exception, e:
-                # expansion schema doesn't exists or there was a connection failure.
-                pass
-            finally:
-                if status_conn: status_conn.close()
-
-        # make sure gpexpand schema doesn't exist since it wasn't in DB provided
-        if not gpexpand_db_status:
-            """
-            MPP-14145 - If there's no discernable status, the schema must not exist.
-
-            The checks in get_status_from_db claim to look for existence of the 'gpexpand' schema, but more accurately they're
-            checking for non-emptiness of the gpexpand.status table. If the table were empty, but the schema did exist, gpexpand would presume 
-            a new expansion was taking place and it would try to CREATE SCHEMA later, which would fail. So, here, if this is the case, we error out. 
-
-            Note: -c/--clean will not necessarily work either, as it too has assumptions about the non-emptiness of the gpexpand schema.
-            """
-            with dbconn.connect(dburl, encoding='UTF8', utility=True) as conn:
-                count = dbconn.execSQLForSingleton(conn, "select count(n.nspname) from pg_catalog.pg_namespace n where n.nspname = 'gpexpand'")
-                if count > 0:
-                    raise ExpansionError("Existing expansion state could not be determined, but a gpexpand schema already exists. Cannot proceed.")
-
-            # now determine whether gpexpand schema merely resides in another DB
-            status_conn = dbconn.connect(dburl, encoding='UTF8')
-            db_list = catalog.getDatabaseList(status_conn)
-            status_conn.close()
-
-            for db in db_list:
-                dbname=db[0]
-                if dbname in ['template0', 'template1', 'postgres', dburl.pgdb]:
-                    continue
-                logger.debug('Looking for gpexpand schema in %s' % dbname.decode('utf-8'))
-                test_url = copy.deepcopy(dburl)
-                test_url.pgdb = dbname
-                c = dbconn.connect(test_url, encoding='UTF8')
-                try:
-                    cursor = dbconn.execSQL(c, 'SELECT status FROM gpexpand.status ORDER BY updated DESC LIMIT 1')
-                except:
-                    # Not in here
-                    pass
-                else:
-                    raise ExpansionError("""gpexpand schema exists in database %s, not in %s.
-Set PGDATABASE or use the -D option to specify the correct database to use.""" % (dbname.decode('utf-8'), options.database))
-                finally:
-                    if c:
-                        c.close()
-
-        return gpexpand_db_status
-
-    def validate_max_connections(self):
-        try:
-            conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
-            max_connections = int(catalog.getSessionGUC(conn, 'max_connections'))
-        except DatabaseError,ex:
-            if options.verbose:
-                logger.exception(ex)
-            logger.error('Failed to check max_connections GUC')
-            if conn: conn.close()
-            raise ex
-
-        if max_connections < options.parallel * 2 + 1:
-            self.logger.error('max_connections is too small to expand %d tables at' % options.parallel)
-            self.logger.error('a time.  This will lead to connection errors.  Either')
-            self.logger.error('reduce the value for -n passed to gpexpand or raise')
-            self.logger.error('max_connections in postgresql.conf')
-            return False
-
-        return True
-
-    def validate_unalterable_tables(self):
-        conn = None
-        max_connections = 0
-        unalterable_tables = []
-
-        try:
-            conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
-            databases = catalog.getDatabaseList(conn)
-            conn.close()
-
-            tempurl = copy.deepcopy(self.dburl)
-            for db in databases:
-                if db[0] == 'template0':
-                    continue
-                self.logger.info('Checking database %s for unalterable tables...' % db[0].decode('utf-8'))
-                tempurl.pgdb = db[0]
-                conn = dbconn.connect(tempurl, utility=True, encoding='UTF8')
-                cursor = dbconn.execSQL(conn, unalterable_table_sql)
-                for row in cursor:
-                    unalterable_tables.append(row)
-                cursor.close()
-                conn.close()
-
-        except DatabaseError,ex:
-            if options.verbose:
-                logger.exception(ex)
-            logger.error('Failed to check for unalterable tables.')
-            if conn: conn.close()
-            raise ex
-
-        if len(unalterable_tables) > 0:
-            self.logger.error('The following tables cannot be altered because they contain')
-            self.logger.error('dropped columns of user defined types:')
-            for t in unalterable_tables:
-                self.logger.error('\t%s.%s' % (t[0].decode('utf-8'), t[1].decode('utf-8')))
-            self.logger.error('Please consult the documentation for instructions on how to')
-            self.logger.error('correct this issue, then run gpexpand again')
-            return False
-
-        return True
-
-    def check_unique_indexes(self):
-        """ Checks if there are tables with unique indexes.
-        Returns true if unique indexes exist"""
-
-        conn = None
-        has_unique_indexes = False
-
-        try:
-            conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
-            databases = catalog.getDatabaseList(conn)
-            conn.close()
-
-            tempurl = copy.deepcopy(self.dburl)
-            for db in databases:
-                if db[0] == 'template0':
-                    continue
-                self.logger.info('Checking database %s for tables with unique indexes...' % db[0].decode('utf-8'))
-                tempurl.pgdb = db[0]
-                conn = dbconn.connect(tempurl, utility=True, encoding='UTF8')
-                cursor = dbconn.execSQL(conn, has_unique_index_sql)
-                for row in cursor:
-                    has_unique_indexes = True
-                    self.unique_index_tables[row[0]] = True
-                cursor.close()
-                conn.close()
-
-        except DatabaseError,ex:
-            if options.verbose:
-                logger.exception(ex)
-            logger.error('Failed to check for unique indexes.')
-            if conn: conn.close()
-            raise ex
-
-        return has_unique_indexes
-
-    def rollback(self):
-        """Rolls back and expansion setup that didn't successfully complete"""
-        cleanSchema = False
-        status_history = self.statusLogger.get_status_history()
-        if not status_history:
-            raise ExpansionError('No status history to rollback.')
-
-        if (status_history[-1])[0] == 'EXPANSION_PREPARE_DONE':
-            raise ExpansionError('Expansion preparation complete.  Nothing to rollback')
-
-        for status in reversed(status_history):
-            if status[0] == 'BUILD_SEGMENT_TEMPLATE_STARTED':
-                if self.statusLogger.is_standby():
-                    self.logger.info('Running on standby master, skipping segment template rollback')
-                    continue
-                self.logger.info('Rolling back segment template build')
-                SegmentTemplate.cleanup_build_segment_template('gpexpand_schema.tar', status[1])
-
-            elif status[0] == 'BUILD_SEGMENTS_STARTED':
-                self.logger.info('Rolling back building of new segments')
-                newSegList = self.read_input_files(self.statusLogger.get_input_filename())
-                self.addNewSegments(newSegList)
-                SegmentTemplate.cleanup_build_new_segments(self.pool, self.statusLogger.get_tar_filename(),
-                                                           self.gparray, removeDataDirs=True)
-
-            elif status[0] == 'UPDATE_OLD_SEGMENTS_STARTED':
-                self.logger.info('Rolling back update of original segments')
-                self.restore_original_segments()
-
-            elif status[0] == 'UPDATE_CATALOG_STARTED':
-                self.logger.info('Rolling back master update')
-                self.restore_master()
-                self.gparray = GpArray.initFromCatalog(dburl, utility=True, useAllSegmentFileSpaces=True)
-
-            elif status[0] == 'SETUP_EXPANSION_SCHEMA_STARTED':
-                cleanSchema = True
-            else:
-                self.logger.debug('Skipping %s' % status[0])
-
-        GpStop.local('gpexpand rollback', masterOnly=True, fast=True)
-
-        if cleanSchema:
-            GpStart.local('gpexpand rollback start database restricted', restricted=True)
-            self.logger.info('Dropping expansion expansion schema')
-            schema_conn = dbconn.connect(self.dburl, encoding='UTF8', allowSystemTableMods='dml')
-            try:
-                dbconn.execSQL(schema_conn, drop_schema_sql)
-                schema_conn.commit()
-                schema_conn.close()
-            except:
-                pass # schema wasn't created yet.
-            GpStop('gpexpand rollback stop database', fast=True)
-
-        self.statusLogger.remove_status_file()
-        self.statusLogger.remove_segment_configuration_backup_file()
-
-    def get_state(self):
-        """Returns expansion state from status logger"""
-        return self.statusLogger.get_current_status()[0]
-
-    def generate_inputfile(self):
-        """Writes a gpexpand input file based on expansion segments
-        added to gparray by the gpexpand interview"""
-        outputfile = 'gpexpand_inputfile_' + strftime("%Y%m%d_%H%M%S")
-        outfile = open(outputfile, 'w')
-
-        logger.info("Generating input file...")
-
-        for db in gparray.getExpansionSegDbList():
-            tempStr = "%s:%s:%d:%s:%d:%d:%s" % ( canonicalize_address( db.getSegmentHostName() )
-                                               , canonicalize_address( db.getSegmentAddress() )
-                                               , db.getSegmentPort()
-                                               , db.getSegmentDataDirectory()
-                                               , db.getSegmentDbId()
-                                               , db.getSegmentContentId()
-                                               , db.getSegmentPreferredRole()
-                                               )
-            if db.getSegmentReplicationPort() != None:
-               tempStr = tempStr + ':' + str(db.getSegmentReplicationPort())                              
-            outfile.write(tempStr + "\n")
-            
-        outfile.close()
-
-        return outputfile
-
-    #------------------------------------------------------------------------
-    def generate_filespaces_inputfile(self, outFileNamePrefix):
-        """
-        Writes a gpexpand filespace input file based on expansion segments
-        added to gparray by the gpexpand interview. If the new segments 
-        contain filespaces, then return the name of the file, else return
-        None.
-        """
-        filespaces = self.gparray.getNonSystemFilespaces()
-        if filespaces != None and len(filespaces) > 0:
-           outputfile = outFileNamePrefix + FILE_SPACES_INPUT_FILENAME_SUFFIX
-        else:
-           outputfile = None
-       
-        if outputfile != None:
-            outfileFD = open(outputfile, 'w')
-    
-            logger.info("Generating filespaces input file...")
-    
-            firstLine = FILE_SPACES_INPUT_FILE_LINE_1_PREFIX + "="
-            firstFs = True
-            for fs in filespaces:
-                if firstFs == True:
-                   firstLine = firstLine + fs.getName()
-                   firstFs = False
-                else:
-                   firstLine = firstLine + ":" + fs.getName()
-            outfileFD.write(firstLine + '\n')
-            
-            for db in gparray.getExpansionSegDbList():
-                dbid = db.getSegmentDbId()
-                outLine = str(dbid)
-                segmentFilespaces = db.getSegmentFilespaces()
-                for fs in filespaces:
-                    oid = fs.getOid()
-                    path = segmentFilespaces[oid]
-                    outLine = outLine + "|" + path
-                outfileFD.write(outLine + '\n')
-            
-            outfileFD.close()
-        
-        return outputfile
-    
-    
-    def addNewSegments(self, inputFileEntryList):
-        for seg in inputFileEntryList:
-            self.gparray.addExpansionSeg( content = int(seg.contentId)
-                                        , preferred_role = seg.role
-                                        , dbid = int(seg.dbid)
-                                        , role = seg.role
-                                        , hostname = seg.hostname.strip()
-                                        , address = seg.address.strip()
-                                        , port = int(seg.port)
-                                        , datadir = os.path.abspath(seg.datadir.strip())
-                                        , replication_port = seg.replicationPort
-                                        , fileSpaces = seg.fileSpaces
-                                        )
-        try:
-            self.gparray.validateExpansionSegs()
-        except Exception, e:
-            raise ExpansionError('Invalid input file: %s' % e)
-
-
-    def read_input_files(self, inputFilename=None):
-        """Reads and validates line format of the input file passed
-        in on the command line via the -i arg"""
-        
-        retValue = []
-        
-        if not options.filename and not inputFilename:
-            raise ExpansionError('Missing input file')
-
-        if options.filename:
-            inputFilename = options.filename
-        fsInputFilename = inputFilename + FILE_SPACES_INPUT_FILENAME_SUFFIX
-        fsOidList = []
-        fsDictionary = {}
-        f = None
-        try:
-           existsCmd = FileDirExists(name = "gpexpand see if .fs file exists", directory = fsInputFilename)
-           existsCmd.run(validateAfter = True)
-           exists = existsCmd.filedir_exists()
-           if exists == False and len(self.gparray.getNonSystemFilespaces()) != 0:
-              raise ExpansionError("Expecting filespaces input file: " + fsInputFilename)
-           if exists == True:
-              # We'll save filespace paths for segment 0 to verify if the Filespace paths in the input .fs file
-              # match with corrosponding shared filesystem filespace location prefixes
-              # E.g. Seg 0 filespace location -> /gpsql/gpseg0
-              # If Input Input file contains /gpsql/ as the filespace location for new segment that's correct
-              # But instead if Input file contains /xyz/ as the filespace location that is incorrect
-              fsPaths = []
-              filespaceDict = gparray.segments[0].primaryDB.getSegmentFilespaces()
-
-              f = open(fsInputFilename, 'r')
-              for lineNumber, l in line_reader(f):
-                  if lineNumber == 1:
-                     fsNameString = l.strip().split("=")
-                     fsNameList = fsNameString[1].strip().split(":")
-                     for name in fsNameList:
-                         oid = self.gparray.getFileSpaceOid(name)
-                         if oid == None:
-                            raise ExpansionError("Unknown filespace name: " + str(name))
-                         fsOidList.append(oid)
-
-                         # These prefix paths will be used for matching prefix paths given in the '.fs' file
-                         fsPaths.append(GpDB.getDataDirPrefix(filespaceDict[oid]))
-
-                     # Make sure all the filepace names are specified.
-                     if len(fsNameList) != len(self.gparray.getNonSystemFilespaces()):
-                        missingFsNames = []
-                        filespaces = self.gparray.getAllFilespaces()
-                        for fs in filespaces:
-                            if fs.getName() not in fsNameList:
-                               missingFsNames.append(fs.getName())
-                        raise ExpansionError("Missing filespaces: " + str(missingFsNames))
-                     
-                  else:
-                     fsLine = l.strip().split("|")
-
-                     # Verify if filespace path prefixs match input file entries
-                     for idx in range(1, len(fsLine)):
-                         if (fsPaths[idx - 1] != GpDB.getDataDirPrefix(fsLine[idx])):
-                             raise ExpansionError("Prefix mismatch for input filespace location "
-                                                  "(Expected = " + fsPaths[idx-1] +
-                                                  " Actual = " + fsLine[idx] + ")")
-                     try:
-                        fsDbid = int(fsLine[0])
-                        fsDictionary[fsLine[0]] = fsLine[1:]
-                     except Exception, e:
-                        raise ExpansionError("Problem with inputfile %s, line number %s, exceptin %s." % \
-                                             (fsInputFilename, str(lineNumber), str(e)))
-
-        except IOError, ioe:
-           raise ExpansionError('Problem with filespace input file: %s. Exception: %s' % (fsInputFilename, str(ioe)))
-        finally:
-           if f != None:
-              f.close()
-
-        try:
-            f = open(inputFilename, 'r')
-            try:
-                for line, l in line_reader(f):
-
-                    hostname, address, port, datadir, dbid, contentId, role, replicationPort \
-                        = parse_gpexpand_segment_line(inputFilename, line, l)
-
-                    filespaces = {}
-                    if len(fsDictionary) > 0:
-                       fileSpacesPathList = fsDictionary[dbid]
-                    else:
-                       fileSpacesPathList = []
-                    index = 0
-                    for oid in fsOidList:
-                        filespaces[oid] = fileSpacesPathList[index]
-                        index = index + 1 
-
-                    # Check that input values look reasonable.
-                    if hostname == None or len(hostname) == 0:
-                       raise ExpansionError("Invalid host name on line " + str(line))
-                    if address == None or len(address) == 0:
-                       raise ExpansionError("Invaid address on line " + str(line))
-                    if port == None or str(port).isdigit() == False or int(port) < 0: 
-                       raise ExpansionError("Invalid port number on line " + str(line))
-                    if datadir == None or len(datadir) == 0:
-                       raise ExpansionError("Invalid data directory on line " + str(line))
-                    if dbid == None or str(dbid).isdigit() == False or int(dbid) < 0:
-                       raise ExpansionError("Invalid dbid on line " + str(line))
-                    if contentId == None or str(contentId).isdigit() == False or int(contentId) < 0:
-                       raise ExpansionError("Invalid contentId on line " + str(line))
-                    if role == None or len(role) > 1 or (role != 'p' and role != 'm'):
-                       raise ExpansionError("Invalid role on line " + str(line))
-                    if replicationPort != None and int(replicationPort) < 0:
-                       raise ExpansionError("Invalid replicationPort on line " + str(line))
-
-                    retValue.append(NewSegmentInput( hostname = hostname
-                                                   , port = port
-                                                   , address = address
-                                                   , datadir = datadir
-                                                   , dbid = dbid
-                                                   , contentId = contentId
-                                                   , role = role
-                                                   , replicationPort = replicationPort
-                                                   , fileSpaces = filespaces
-                                                   ) )
-            except ValueError:
-                raise ExpansionError('Missing or invalid value on line %d.' % line)
-            except Exception, e:
-                raise ExpansionError('Invalid input file on line %d: %s' % (line, str(e)))
-            finally:
-                f.close()
-            return retValue
-        except IOError:
-            raise ExpansionError('Input file %s not found' % options.filename)
-
-
-    def add_segments(self):
-        """Starts the process of adding the new segments to the array"""
-        self.segTemplate = SegmentTemplate(self.logger, self.statusLogger, self.pool,
-                                           self.gparray, options.master_data_directory,
-                                           self.dburl,self.conn, self.tempDir)
-        try:
-            self.segTemplate.build_segment_template()
-            self.segTemplate.build_new_segments()
-        except SegmentTemplateError, msg:
-            raise ExpansionError(msg)
-
-
-    def update_original_segments(self):
-        """Updates the pg_hba.conf file and updates the gp_id catalog table
-        of existing hosts"""
-        self.statusLogger.set_status('UPDATE_OLD_SEGMENTS_STARTED', self.gparray.get_primary_count())
-
-        self.logger.info('Backing up pg_hba.conf file on original segments')
-
-        # backup pg_hba.conf file on original segments
-        for seg in self.old_segments:
-            if seg.isSegmentQD() or seg.getSegmentStatus() != 'u':
-                continue
-
-            hostname = seg.getSegmentHostName()
-            datadir = seg.getSegmentDataDirectory()
-
-            srcFile = datadir + '/pg_hba.conf'
-            dstFile = datadir + '/pg_hba.gpexpand.bak'
-            cpCmd = RemoteCopy('gpexpand back up pg_hba.conf file on original segments',
-                               srcFile, hostname, dstFile, ctxt=REMOTE, remoteHost=hostname)
-
-            self.pool.addCommand(cpCmd)
-
-        self.pool.join()
-
-        try:
-            self.pool.check_results()
-        except ExecutionError, msg:
-            raise ExpansionError('Failed to configure original segments: %s' % msg)
-
-
-        # Copy the new pg_hba.conf file to original segments
-        self.logger.info('Copying new pg_hba.conf file to original segments')
-        for seg in self.old_segments:
-            if seg.isSegmentQD() or seg.getSegmentStatus() != 'u':
-                continue
-
-            hostname = seg.getSegmentHostName()
-            datadir = seg.getSegmentDataDirectory()
-
-            cpCmd = RemoteCopy('gpexpand copy new pg_hba.conf file to original segments',
-                               self.tempDir + '/pg_hba.conf', hostname, datadir)
-
-            self.pool.addCommand(cpCmd)
-
-        self.pool.join()
-
-        try:
-            self.pool.check_results()
-        except ExecutionError, msg:
-            raise ExpansionError('Failed to configure original segments: %s' % msg)
-
-        # Update the gp_id of original segments
-        self.newPrimaryCount = 0;
-        for seg in self.gparray.getExpansionSegDbList():
-            if seg.isSegmentPrimary(False):
-                self.newPrimaryCount += 1
-
-        self.newPrimaryCount += self.gparray.get_primary_count()
-
-        self.logger.info('Configuring original segments')
-
-        orig_segment_info = ConfigureNewSegment.buildSegmentInfoForNewSegment(self.gparray.getSegDbList())
-
-        if self.segTemplate:
-            self.segTemplate.cleanup()
-
-        self.statusLogger.set_status('UPDATE_OLD_SEGMENTS_DONE')
-
-
-    def restore_original_segments(self):
-        """ Restores the original segments back to their state prior the expansion
-        setup.  This is only possible if the expansion setup has not completed
-        successfully."""
-        self.logger.info('Restoring original segments')
-        gp_segment_configuration_backup_file = self.statusLogger.get_gp_segment_configuration_backup();
-        if gp_segment_configuration_backup_file:
-            originalArray = GpArray.initFromFile(self.statusLogger.get_gp_segment_configuration_backup())
-        else:
-            originalArray = self.gparray
-
-        originalPrimaryCount = self.statusLogger.get_original_primary_count()
-
-        # Restore pg_hba.conf file from backup
-        self.logger.info('Restoring pg_hba.conf file on original segments')
-        for seg in originalArray.getSegDbList():
-
-            datadir = seg.getSegmentDataDirectory()
-            hostname = seg.getSegmentHostName()
-
-            srcFile = datadir + '/pg_hba.gpexpand.bak'
-            dstFile = datadir + '/pg_hba.conf'
-            cpCmd = RemoteCopy('gpexpand restore of pg_hba.conf file on original segments',
-                               srcFile, hostname, dstFile, ctxt=REMOTE,
-                               remoteHost=hostname)
-
-            self.pool.addCommand(cpCmd)
-
-        self.pool.join()
-
-        try:
-            self.pool.check_results()
-        except:
-            # Setup didn't get this far so no backup to restore.
-            self.pool.empty_completed_items()
-
-        # note: this code may not be needed -- it will NOT change gp_id
-        #       However, the call to gpconfigurenewsegment may still be doing some needed work (stopping the segment)
-        #       which could be unnecessary or could be moved here)
-        self.logger.info('Restoring original segments catalog tables')
-        orig_segment_info = ConfigureNewSegment.buildSegmentInfoForNewSegment(originalArray.getSegDbList())
-        for host in iter(orig_segment_info):
-            segCfgCmd = ConfigureNewSegment('gpexpand configure new segments', orig_segment_info[host],
-                                           verbose=gplog.logging_is_verbose(), batchSize=options.batch_size,
-                                           ctxt=REMOTE, remoteHost=host)
-            self.pool.addCommand(segCfgCmd)
-
-        self.pool.join()
-
-        try:
-            self.pool.check_results()
-        except ExecutionError:
-            raise ExpansionError('Failed to restore original segments')
-
-
-    def _construct_filespace_parameter(self, seg, gpFSobjList):
-        """ return a string containing a filespace parameter appropriate for use in sql functions. """
-        filespaces = []
-        segFilespaces = seg.getSegmentFilespaces()
-        filespaceNames = []
-        filespaceLocations = []
-        for entry in gpFSobjList:
-            name = entry.getName()
-            oid = entry.getOid()
-            location = segFilespaces[oid]
-            filespaceNames.append(name)
-            filespaceLocations.append(location)
-        for i in range(len(filespaceNames)):
-            entry = [filespaceNames[i] , filespaceLocations[i]]
-            filespaces.append(entry)
-        return str(filespaces)
-
-
-    def update_catalog(self):
-        """ 
-        Starts the database, calls updateSystemConfig() to setup 
-        the catalog tables and get the actual dbid and content id 
-        for the new segments. 
-        """
-        self.statusLogger.set_gp_segment_configuration_backup(options.master_data_directory + '/' + SEGMENT_CONFIGURATION_BACKUP_FILE)
-        gparray.dumpToFile(self.statusLogger.get_gp_segment_configuration_backup())
-        self.statusLogger.set_status('UPDATE_CATALOG_STARTED', self.statusLogger.get_gp_segment_configuration_backup())
-
-        self.logger.info('Starting HAWQ in restricted mode')
-        startCmd = GpStart('gpexpand update master start database restricted mode', restricted = True, verbose = True)
-        startCmd.run(validateAfter=True)
-
-        # Update the catalog
-        configurationInterface.getConfigurationProvider().updateSystemConfig(
-            self.gparray,
-            "%s: segment config for resync" % getProgramName(),
-            dbIdToForceMirrorRemoveAdd = {},
-            useUtilityMode = True,
-            allowPrimary = True
-        )
-
-        # The content IDs may have changed, so we must make sure the array is in proper order.
-        self.gparray.reOrderExpansionSegs()
-
-        # Issue checkpoint due to forced shutdown below
-        self.conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
-        dbconn.execSQL(self.conn, "CHECKPOINT")
-        self.conn.close()
-
-        self.logger.info('Stopping database')
-        stopCmd = GpStop('gpexpand update master stop database', verbose = True, ctxt = LOCAL, force=True)
-        # We do not check the results of GpStop becuase we will get errors for all the new segments.
-        stopCmd.run(validateAfter = False)
-
-        self.statusLogger.set_status('UPDATE_CATALOG_DONE')
-
-    #--------------------------------------------------------------------------
-    def restore_master(self):
-        """Restores the gp_segment_configuration catalog table for rollback"""
-        originalPrimaryCount = self.statusLogger.get_original_primary_count()
-        backupFile = self.statusLogger.get_gp_segment_configuration_backup()
-
-        if not os.path.exists(backupFile):
-            raise ExpansionError('gp_segment_configuration backup file %s does not exist' % backupFile)
-
-        # Create a new gpArray from the backup file
-        array = GpArray.initFromFile(backupFile)
-        arrayLen = len(array.getSegDbList())
-
-        originalDbIds = ""
-        originalDbIdsList = []
-        first = True
-        for seg in array.getDbList():
-            originalDbIdsList.append(int(seg.getSegmentDbId()))
-            if first == False:
-               originalDbIds += ", "
-            first = False
-            originalDbIds += str(seg.getSegmentDbId())
-        
-        if len(originalDbIds) > 0:
-           # Update the catalog with the contents of the backup
-           restore_conn = None
-           try:
-               restore_conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8', allowSystemTableMods='dml')
-
-               # Get a list of all the expand primary segments
-               sqlStr = "select dbid from pg_catalog.gp_segment_configuration where dbid not in (%s) and role = 'p'" % str(originalDbIds)
-               curs = dbconn.execSQL(restore_conn, sqlStr)
-               deleteDbIdList = []
-               rows = curs.fetchall()
-               for row in rows:
-                   deleteDbIdList.append(int(row[0])) 
-
-               #
-               # The following is a sanity check to make sure we don't do something bad here.
-               #
-               if len(originalDbIdsList) < 2:
-                  self.logger.error("The original DB DIS list is to small to be correct: %s " % str(len(originalDbIdsList)))
-                  raise Exception("Unable to complete rollback")
-
-               totalToDelete = len(deleteDbIdList)
-               if int(totalToDelete) > int(self.statusLogger.get_number_new_segments()):
-                  self.logger.error("There was a discrepancy between the number of expand segments to rollback (%s), and the expected number of segment to rollback (%s)" \
-                                    % (str(totalToDelete), str(self.statusLogger.get_number_new_segments())))
-                  self.logger.error("  Expanded segment dbids = %s", str(deleteDbIdList))
-                  raise Exception("Unable to complete rollback")
-
-               for dbid in deleteDbIdList:
-                  sqlStr = "select * from gp_remove_segment(%s::smallint)" % str(dbid)
-                  dbconn.execSQL(restore_conn, sqlStr)      
-
-               restore_conn.commit()
-           except Exception, e:
-               raise Exception("Unable to restore master. Exception: " + str(e))
-           finally:
-               if restore_conn != None:
-                  restore_conn.close()
-
-    def start_prepare(self):
-        """Inserts into gpexpand.status that expansion preparation has started."""
-        if options.filename:
-            self.statusLogger.create_status_file()
-            self.statusLogger.set_status('EXPANSION_PREPARE_STARTED', os.path.abspath(options.filename))
-
-    def finalize_prepare(self):
-        """Removes the gpexpand status file and segment configuration backup file"""
-        self.statusLogger.remove_status_file()
-        self.statusLogger.remove_segment_configuration_backup_file()
-        self.pastThePointOfNoReturn = True;
-
-    def setup_schema(self):
-        """Used to setup the gpexpand schema"""
-        startCmd = GpStart('gpexpand update master start database restricted', restricted=True, verbose=True)
-        startCmd.run(validateAfter=True)
-
-        # Need to restore the connection used by the expansion
-        self.conn = dbconn.connect(self.dburl, encoding='UTF8')
-        self.statusLogger.set_status('SETUP_EXPANSION_SCHEMA_STARTED')
-        self.logger.info('Creating expansion schema')
-        dbconn.execSQL(self.conn,create_schema_sql)
-        dbconn.execSQL(self.conn,status_table_sql)
-        dbconn.execSQL(self.conn,status_detail_table_sql)
-
-        # views
-        dbconn.execSQL(self.conn, logical_status_view_sql)
-        dbconn.execSQL(self.conn,progress_view_sql)
-
-        self.conn.commit()
-
-        self.statusLogger.set_status('SETUP_EXPANSION_SCHEMA_DONE')
-
-    def prepare_schema(self):
-        """Prepares the gpexpand schema"""
-        self.statusLogger.set_status('PREPARE_EXPANSION_SCHEMA_STARTED')
-
-        if not self.conn:
-            self.conn = dbconn.connect(self.dburl, encoding='UTF8', allowSystemTableMods='dml')
-            self.gparray = GpArray.initFromCatalog(self.dburl, useAllSegmentFileSpaces=True)
-
-        nowStr = datetime.datetime.now()
-        statusSQL = "INSERT INTO %s.%s VALUES ( 'SETUP', '%s' ) " % (gpexpand_schema,status_table,nowStr)
-
-        dbconn.execSQL(self.conn,statusSQL)
-
-        db_list = catalog.getDatabaseList(self.conn)
-
-        for db in db_list:
-            dbname=db[0]
-            if dbname == 'template0':
-                continue
-            self.logger.info('Populating %s.%s with data from database %s' % (gpexpand_schema, status_detail_table, dbname.decode('utf-8')) )
-            self._populate_regular_tables(dbname)
-            self._populate_partitioned_tables(dbname)
-            inject_fault('gpexpand MPP-14620 fault injection')
-            self._update_distribution_policy(dbname)
-
-        nowStr = datetime.datetime.now()
-        statusSQL = "INSERT INTO %s.%s VALUES ( 'SETUP DONE', '%s' ) " % (gpexpand_schema,status_table,nowStr)
-        dbconn.execSQL(self.conn, statusSQL)
-
-        self.conn.commit()
-
-
-        self.statusLogger.set_status('PREPARE_EXPANSION_SCHEMA_DONE')
-        self.statusLogger.set_status('EXPANSION_PREPARE_DONE')
-
-        # At this point, no rollback is possible and the the system
-        # including new segments has been started once before so finalize
-        self.finalize_prepare()
-
-        self.logger.info('Stopping HAWQ')
-        GpStop.local('gpexpand setup complete', fast=True)
-
-
-    def _populate_regular_tables(self,dbname):
-        """ we don't do 3.2+ style partitioned tables here, but we do
-            all other table types.
-        """
-
-        sql="""SELECT
-    n.nspname || '.' || c.relname as fq_name,
-    n.oid as schemaoid,
-    c.oid as tableoid,
-    p.attrnums as distribution_policy,
-    now() as last_updated,
-    pg_relation_size(quote_ident(n.nspname) || '.' || quote_ident(c.relname))
-FROM
-            pg_class c
-    JOIN pg_namespace n ON (c.relnamespace=n.oid)
-    JOIN pg_catalog.gp_distribution_policy p on (c.oid = p.localoid)
-WHERE
-    c.oid NOT IN ( SELECT parrelid as oid FROM pg_partition
-                    UNION
-                   SELECT parchildrelid as oid FROM pg_partition_rule
-                 )
-    AND n.nspname != 'gpexpand'
-    AND n.nspname != 'pg_bitmapindex'
-    AND c.relstorage != 'x';
-
-                  """
-        self.logger.debug(sql)
-        table_conn = self.connect_database(dbname)
-        curs = dbconn.execSQL(table_conn, sql)
-        rows = curs.fetchall()
-        try:
-            sql_file = os.path.abspath('./%s.dat' % status_detail_table)
-            self.logger.debug('status_detail data file: %s' % sql_file)
-            fp = open(sql_file, 'w')
-            for row in rows:
-                fqname = row[0]
-                schema_oid = row[1]
-                table_oid = row[2]
-                if row[3]:
-                    self.logger.debug("dist policy raw: %s " % row[3].decode('utf-8'))
-                else:
-                    self.logger.debug("dist policy raw: NULL")
-                dist_policy = row[3]
-                (policy_name,policy_oids) = self.form_dist_policy_name(table_conn, row[3], table_oid)
-                ts = datetime.datetime.now()
-                rel_bytes = int(row[5])
-
-                if dist_policy is None:
-                    dist_policy = 'NULL'
-
-                full_name = '%s.%s' % (dbname, fqname)
-                rank = 1 if self.unique_index_tables.has_key(full_name) else 2
-
-                fp.write("""%s\t%s\t%s\t%s\t%s\t%s\t%s\tNULL\t%d\t%s\tNULL\tNULL\t%d\t%s\n""" % (dbname,fqname,schema_oid,table_oid,
-                                                                                             dist_policy,policy_name,policy_oids,
-                                                                                             rank, undone_status, rel_bytes, ts))
-        except Exception, e:
-            raise ExpansionError(e)
-        finally:
-            if fp: fp.close()
-
-        try:
-            copySQL = """COPY %s.%s FROM '%s' NULL AS 'NULL'""" % (gpexpand_schema, status_detail_table, sql_file)
-
-            self.logger.debug(copySQL)
-            dbconn.execSQL(self.conn,copySQL)
-        except Exception, e:
-            raise ExpansionError(e)
-        finally:
-            os.unlink(sql_file)
-
-        table_conn.commit()
-        table_conn.close()
-
-
-    def _populate_partitioned_tables(self,dbname):
-        """population of status_detail for partitioned tables. """
-        sql="""
-SELECT
-    p.partitionschemaname || '.' || p.partitiontablename as fq_name,
-    n.oid as schemaoid,
-    c2.oid as tableoid,
-    d.attrnums as distributed_policy,
-    now() as last_updated,
-    pg_relation_size(quote_ident(p.partitionschemaname) || '.' || quote_ident(p.partitiontablename)),
-    partitiontype,partitionlevel,partitionrank,partitionposition,
-    partitionrangestart
-FROM
-    pg_partitions p,
-    pg_class c,
-    pg_class c2,
-    pg_namespace n,
-    pg_namespace n2,
-    gp_distribution_policy d
-WHERE
-    quote_ident(p.tablename) = quote_ident(c.relname)
-    AND    d.localoid = c2.oid
-    AND quote_ident(p.schemaname) = quote_ident(n.nspname)
-    AND c.relnamespace = n.oid
-    AND p.partitionlevel = (select max(parlevel) FROM pg_partition WHERE parrelid = c.oid)
-    AND quote_ident(p.partitionschemaname) = quote_ident(n2.nspname)
-    AND quote_ident(p.partitiontablename) = quote_ident(c2.relname)
-    AND c2.relnamespace = n2.oid
-ORDER BY tablename, c2.oid desc;
-                  """
-        self.logger.debug(sql)
-        table_conn = self.connect_database(dbname)
-        curs = dbconn.execSQL(table_conn, sql)
-        rows = curs.fetchall()
-
-        try:
-            sql_file = os.path.abspath('./%s.dat' % status_detail_table)
-            self.logger.debug('status_detail data file: %s' % sql_file)
-            fp = open(sql_file, 'w')
-
-            for row in rows:
-                fqname = row[0]
-                schema_oid = row[1]
-                table_oid = row[2]
-                if row[3]:
-                    self.logger.debug("dist policy raw: %s " % row[3])
-                else:
-                    self.logger.debug("dist policy raw: NULL")
-                dist_policy = row[3]
-                (policy_name,policy_oids) = self.form_dist_policy_name(table_conn, row[3], table_oid)
-                ts = datetime.datetime.now()
-                rel_bytes = int(row[5])
-
-                if dist_policy is None:
-                    dist_policy = 'NULL'
-
-                full_name = '%s.%s' % (dbname, fqname)
-                rank = 1 if self.unique_index_tables.has_key(full_name) else 2
-
-                fp.write("""%s\t%s\t%s\t%s\t%s\t%s\t%s\tNULL\t%d\t%s\tNULL\tNULL\t%d\t%s\n""" % (dbname,fqname,schema_oid,table_oid,
-                                                                                             dist_policy,policy_name,policy_oids,
-                                                                                             rank, undone_status, rel_bytes, ts))
-        except Exception:
-            raise
-        finally:
-            if fp: fp.close()
-
-        try:
-            copySQL = """COPY %s.%s FROM '%s' NULL AS 'NULL'""" % (gpexpand_schema, status_detail_table, sql_file)
-
-            self.logger.debug(copySQL)
-            dbconn.execSQL(self.conn,copySQL)
-        except Exception, e:
-            raise ExpansionError(e)
-        finally:
-            os.unlink(sql_file)
-
-        table_conn.commit()
-        table_conn.close()
-
-
-    def _update_distribution_policy(self, dbname):
-        """ NULL out the distribution policy for both
-            regular and paritioned table before expansion
-        """
-        
-        table_conn = self.connect_database(dbname)
-        #null out the dist policies
-        sql = """
-UPDATE  gp_distribution_policy
-  SET attrnums = NULL
-FROM pg_class c
-    JOIN pg_namespace n ON (c.relnamespace=n.oid)
-WHERE
-    localoid = c.oid
-    AND c.oid NOT IN ( SELECT parrelid as oid FROM pg_partition
-                    UNION
-                   SELECT parchildrelid as oid FROM pg_partition_rule
-                 )
-    AND n.nspname != 'gpexpand';
-        """
-
-        self.logger.debug(sql)
-        dbconn.execSQL(table_conn,sql)
-
-        sql = """
-UPDATE gp_distribution_policy
-    SET attrnums = NULL
-    FROM
-        ( SELECT pp.parrelid AS table

<TRUNCATED>


Mime
View raw message