hawq-dev mailing list archives

From Oliver-Luo <...@git.apache.org>
Subject [GitHub] incubator-hawq pull request: HAWQ-760. Add hawq register to suppor...
Date Mon, 30 May 2016 01:27:56 GMT
Github user Oliver-Luo commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/679#discussion_r65015976
  
    --- Diff: tools/bin/hawqregister ---
    @@ -0,0 +1,272 @@
    +#!/usr/bin/env python
    +# -*- coding: utf-8 -*-
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +# 
    +#   http://www.apache.org/licenses/LICENSE-2.0
    +# 
    +# Unless required by applicable law or agreed to in writing,
    +# software distributed under the License is distributed on an
    +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +# KIND, either express or implied.  See the License for the
    +# specific language governing permissions and limitations
    +# under the License.
    +
    +'''
    +hawq register [options] database_name table_name file_or_dir_path_in_hdfs
    +'''
    +import os, sys, optparse, getpass, re, urlparse
    +try:
    +    from gppylib.commands.unix import getLocalHostname, getUserName
    +    from gppylib.db import dbconn
    +    from gppylib.gplog import get_default_logger, setup_tool_logging
    +    from gppylib.gpparseopts import OptParser, OptChecker
    +    from pygresql import pg
    +    from pygresql.pgdb import DatabaseError
    +    from hawqpylib.hawqlib import local_ssh, local_ssh_output
    +except ImportError, e:
    +    print e
    +    sys.stderr.write('cannot import module, please check that you have sourced greenplum_path.sh\n')
    +    sys.exit(2)
    +
    +# setup logging
    +logger = get_default_logger()
    +EXECNAME = os.path.split(__file__)[-1]
    +setup_tool_logging(EXECNAME,getLocalHostname(),getUserName())
    +
    +def create_opt_parser(version):
    +    parser = OptParser(option_class=OptChecker,
    +                       usage='usage: %prog [options] database_name table_name file_or_dir_path_in_hdfs',
    +                       version=version)
    +    parser.remove_option('-h')
    +    parser.add_option('-?', '--help', action='help')
    +    parser.add_option('-h', '--host', help="host of the target DB")
    +    parser.add_option('-p', '--port', help="port of the target DB", type='int', default=0)
    +    parser.add_option('-U', '--user', help="username of the target DB")
    +    return parser
    +
    +
    +def get_seg_name(options, databasename, tablename):
    +    try:
    +        relfilenode = 0
    +        relname = ""
    +        query = "select pg_class2.relname from pg_class as pg_class1, pg_appendonly,
pg_class as pg_class2 where pg_class1.relname ='%s' and  pg_class1.oid = pg_appendonly.relid
and pg_appendonly.segrelid = pg_class2.oid;" % tablename
    +        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user,
dbname=databasename)
    +        conn = dbconn.connect(dburl, True)
    +        rows = dbconn.execSQL(conn, query)
    +        conn.commit()
    +        if rows.rowcount == 0:
    +            logger.error("table '%s' not found in db '%s'" % (tablename, databasename));
    +            sys.exit(1)
    +        for row in rows:
    +            relname = row[0]
    +        conn.close()
    +
    +    except DatabaseError, ex:
    +        logger.error("Failed to connect to database, this script can only be run when
the database is up")
    +        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host,
options.port, options.user, databasename, query))
    +        sys.exit(1)
    +
    +    # check whether the target table is parquet format
    +    if relname.find("paq") == -1:
    +        logger.error("table '%s' is not parquet format" % tablename)
    +        sys.exit(1)
    +
    +    return relname
    +
    +
    +def check_hash_type(options, databasename, tablename):
    +    try:
    +        query = "select attrnums from gp_distribution_policy, pg_class where pg_class.relname
= '%s' and pg_class.oid = gp_distribution_policy.localoid;" % tablename
    +        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user,
dbname=databasename)
    +        conn = dbconn.connect(dburl, False)
    +        rows = dbconn.execSQL(conn, query)
    +        conn.commit()
    +        if rows.rowcount == 0:
    +            logger.error("target not found in table gp_distribution_policy")
    +            sys.exit(1)
    +        for row in rows:
    +            if row[0] != None:
    +                logger.error("Cannot register file(s) to a table which is hash-typed")
    +                sys.exit(1)
    +
    +        conn.close()
    +
    +    except DatabaseError, ex:
    +        logger.error("Failed to connect to database, this script can only be run when
the database is up")
    +        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host,
options.port, options.user, databasename, query))
    +        sys.exit(1)
    +
    +
    +def get_files_in_hdfs(filepath):
    +    files = []
    +    sizes = []
    +    hdfscmd = "hadoop fs -test -e %s" % filepath
    +    result = local_ssh(hdfscmd)
    +    if result != 0:
    +        logger.error("Path '%s' does not exist in hdfs" % filepath)
    +        sys.exit(1)
    +    
    +    hdfscmd = "hadoop fs -ls -R %s" % filepath
    +    result, out, err = local_ssh_output(hdfscmd)
    +    outlines = out.splitlines()
    +
    +    # recursively search all the files under path 'filepath'
    +    i = 0
    +    for line in outlines:
    +        lineargs = line.split()
    +        if len(lineargs) == 8 and lineargs[0].find ("d") == -1:
    +            files.append(lineargs[7])
    +            sizes.append(int(lineargs[4]))
    +
    +    if len(files) == 0:
    +        logger.error("Dir '%s' is empty" % filepath)
    +        sys.exit(1)
    +
    +    return files, sizes
    +
    +
    +def check_parquet_format(options, files):
    +    # check whether the files are parquet format by checking the first and last four bytes
    +    for file in files:
    +        hdfscmd = "hadoop fs -cat %s | head -c 4 | grep PAR1" % file
    +        result1 = local_ssh(hdfscmd)
    +        hdfscmd = "hadoop fs -cat %s | tail -c 4 | grep PAR1" % file
    +        result2 = local_ssh(hdfscmd)
    +        if result1 or result2:
    +            logger.error("File %s is not parquet format" % file)
    +            sys.exit(1)
    +
    +
    +def get_metadata_from_database(options, databasename, seg_name):
    +    try:
    +        query = "select segno from pg_aoseg.%s;" % seg_name
    +        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename)
    +        conn = dbconn.connect(dburl, False)
    +        rows = dbconn.execSQL(conn, query)
    +        conn.commit()
    +        conn.close()
    +
    +    except DatabaseError, ex:
    +        logger.error("Failed to connect to database, this script can only be run when
the database is up")
    +        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host,
options.port, options.user, databasename, query))
    +        sys.exit(1)
    +
    --- End diff --
    
    > The main hawq register steps are as follows:
    > get_files_in_hdfs(filepath)
    > check_parquet_format(options, files)
    > firstsegno = get_metadata_from_database(options, databasename, seg_name)
    > move_files_in_hdfs(options, databasename, tablename, files, firstsegno, True)
    > insert_metadata_into_database(options, databasename, tablename, seg_name, firstsegno, sizes)
    
    If one of the read-only steps fails, we just exit, because no file has been changed yet. If move_files_in_hdfs fails, the metadata will not be inserted. If insert_metadata_into_database fails, we will move the files back to their original location.
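    
    A rough sketch of how that control flow could be wired up, using the helper functions from the diff above (the register() wrapper name is hypothetical, and it assumes move_files_in_hdfs moves the files back to their original location when its last argument is False; the actual signatures are in the full diff):
    
        def register(options, databasename, tablename, filepath):
            seg_name = get_seg_name(options, databasename, tablename)
            check_hash_type(options, databasename, tablename)
    
            # read-only steps: a failure here just exits, since nothing in HDFS
            # or the catalog has been changed yet
            files, sizes = get_files_in_hdfs(filepath)
            check_parquet_format(options, files)
            firstsegno = get_metadata_from_database(options, databasename, seg_name)
    
            # move the files into place; if this fails, no metadata is inserted
            move_files_in_hdfs(options, databasename, tablename, files, firstsegno, True)
            try:
                insert_metadata_into_database(options, databasename, tablename, seg_name, firstsegno, sizes)
            except Exception:
                # metadata insertion failed: move the files back (assumed revert mode)
                move_files_in_hdfs(options, databasename, tablename, files, firstsegno, False)
                sys.exit(1)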

