hawq-commits mailing list archives

From r...@apache.org
Subject [2/3] incubator-hawq git commit: HAWQ-991. Rewrite hawqregister to support registering from yaml file.
Date Fri, 19 Aug 2016 02:59:13 GMT
HAWQ-991. Rewrite hawqregister to support registering from yaml file.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/2596be6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/2596be6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/2596be6e

Branch: refs/heads/master
Commit: 2596be6e9c87da3d23a13f92e85307b785bee5d5
Parents: 7661dec
Author: xunzhang <xunzhangthu@gmail.com>
Authored: Tue Aug 9 19:39:13 2016 +0800
Committer: rlei <rlei@pivotal.io>
Committed: Fri Aug 19 10:57:09 2016 +0800

----------------------------------------------------------------------
 .../ManagementTool/test_hawq_register.cpp       |  20 +-
 tools/bin/hawqextract                           |   0
 tools/bin/hawqregister                          | 256 +++++++++----------
 3 files changed, 130 insertions(+), 146 deletions(-)
----------------------------------------------------------------------
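Note for readers: this change replaces the old positional invocation (hawq register database_name table_name file_or_dir_path_in_hdfs) with option flags (-d/-f) and adds a YAML-driven mode (-c). Based on the keys read by option_parser_yml() in the diff below (FileFormat, DFS_URL, Parquet_FileLocations/Files/path, Parquet_Schema, Distribution_Policy), a Parquet registration config might look like the following sketch; the concrete paths, column names, and values are illustrative assumptions, not part of this commit:

    # example.yml -- hypothetical input for "hawq register -d postgres -c example.yml hawqregister"
    FileFormat: Parquet
    DFS_URL: hdfs://localhost:8020
    Parquet_FileLocations:
      Files:
        - path: /hawq_register_test/sample0.paq
        - path: /hawq_register_test/sample1.paq
    Parquet_Schema:
      - name: i
        type: int4
    Distribution_Policy: DISTRIBUTED RANDOMLY

With more than one entry under Files, option_parser_yml() registers the parent directory of the first entry; with exactly one entry it registers that file directly.
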


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2596be6e/src/test/feature/ManagementTool/test_hawq_register.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/test_hawq_register.cpp b/src/test/feature/ManagementTool/test_hawq_register.cpp
index a7982b3..afc2cb4 100644
--- a/src/test/feature/ManagementTool/test_hawq_register.cpp
+++ b/src/test/feature/ManagementTool/test_hawq_register.cpp
@@ -27,7 +27,7 @@ TEST_F(TestHawqRegister, TestSingleHawqFile) {
 	util.execute("create table hawqregister(i int) with (appendonly=true, orientation=parquet);");
 	util.query("select * from hawqregister;", 0);
 
-	EXPECT_EQ(0, Command::getCommandStatus("hawq register " + (string) HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_hawq.paq"));
+	EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_hawq.paq hawqregister"));
 
 	util.query("select * from hawqregister;", 3);
 	util.execute("insert into hawqregister values(1);");
@@ -46,7 +46,7 @@ TEST_F(TestHawqRegister, TestSingleHiveFile) {
 	util.execute("create table hawqregister(i int) with (appendonly=true, orientation=parquet);");
 	util.query("select * from hawqregister;", 0);
 
-	EXPECT_EQ(0, Command::getCommandStatus("hawq register " + (string) HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_hive.paq"));
+	EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_hive.paq hawqregister"));
 
 	util.query("select * from hawqregister;", 1);
 	util.execute("insert into hawqregister values(1);");
@@ -67,7 +67,7 @@ TEST_F(TestHawqRegister, TestDataTypes) {
 	util.execute("create table hawqregister(a bool, b int2, c int2, d int4, e int8, f date,
g float4, h float8, i varchar, j bytea, k char, l varchar) with (appendonly=true, orientation=parquet);");
 	util.query("select * from hawqregister;", 0);
 
-	EXPECT_EQ(0, Command::getCommandStatus("hawq register " + (string) HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_data_types.paq"));
+	EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_data_types.paq hawqregister"));
 
 	util.query("select * from hawqregister;", 1);
 	util.execute("drop table hawqregister;");
@@ -87,7 +87,7 @@ TEST_F(TestHawqRegister, TestAllNULL) {
 	util.execute("create table hawqregister(a bool, b int2, c int2, d int4, e int8, f date,
g float4, h float8, i varchar, j bytea, k char, l varchar) with (appendonly=true, orientation=parquet);");
 	util.query("select * from hawqregister;", 0);
 
-	EXPECT_EQ(0, Command::getCommandStatus("hawq register " + (string) HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_data_types.paq"));
+	EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_data_types.paq hawqregister"));
 
 	util.query("select * from hawqregister;", 1);
 	util.execute("drop table hawqregister;");
@@ -113,7 +113,7 @@ TEST_F(TestHawqRegister, TestFiles) {
 	util.execute("create table hawqregister(i int) with (appendonly=true, orientation=parquet);");
 	util.query("select * from hawqregister;", 0);
 
-	EXPECT_EQ(0, Command::getCommandStatus("hawq register " + (string) HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_test"));
+	EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_test hawqregister"));
 
 	util.query("select * from hawqregister;", 12);
 	util.execute("insert into hawqregister values(1);");
@@ -133,7 +133,7 @@ TEST_F(TestHawqRegister, TestHashDistributedTable) {
 	util.execute("create table hawqregister(i int) with (appendonly=true, orientation=parquet)
distributed by (i);");
 	util.query("select * from hawqregister;", 0);
 
-	EXPECT_EQ(1, Command::getCommandStatus("hawq register " + (string) HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_hawq.paq"));
+	EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_hawq.paq hawqregister"));
 	util.query("select * from hawqregister;", 0);
 
 	EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm hdfs://localhost:8020/hawq_register_hawq.paq"));
@@ -151,7 +151,7 @@ TEST_F(TestHawqRegister, TestNotParquetFile) {
 	util.execute("create table hawqregister(i int) with (appendonly=true, orientation=parquet);");
 	util.query("select * from hawqregister;", 0);
 
-	EXPECT_EQ(1, Command::getCommandStatus("hawq register " + (string) HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_test_not_paq"));
+	EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_test_not_paq hawqregister"));
 	util.query("select * from hawqregister;", 0);
 
 	EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm hdfs://localhost:8020/hawq_register_test_not_paq"));
@@ -169,7 +169,7 @@ TEST_F(TestHawqRegister, TestNotParquetTable) {
 	util.execute("create table hawqregister(i int);");
 	util.query("select * from hawqregister;", 0);
 
-	EXPECT_EQ(1, Command::getCommandStatus("hawq register " + (string) HAWQ_DB + " hawqregister hdfs://localhost:8020/hawq_register_hawq.paq"));
+	EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -f hdfs://localhost:8020/hawq_register_hawq.paq hawqregister"));
 	util.query("select * from hawqregister;", 0);
 
 	EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm hdfs://localhost:8020/hawq_register_hawq.paq"));
@@ -182,7 +182,7 @@ TEST_F(TestHawqRegister, TestFileNotExist) {
 	util.execute("create table hawqregister(i int);");
 	util.query("select * from hawqregister;", 0);
 
-	EXPECT_EQ(1, Command::getCommandStatus("hawq register " + (string) HAWQ_DB + " hawqregister /hdfs://localhost:8020hawq_register_file_not_exist"));
+	EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -f /hdfs://localhost:8020hawq_register_file_not_exist hawqregister"));
 	util.query("select * from hawqregister;", 0);
 
 	util.execute("drop table hawqregister;");
@@ -199,7 +199,7 @@ TEST_F(TestHawqRegister, TestNotHDFSPath) {
 	util.execute("create table hawqregister(i int);");
 	util.query("select * from hawqregister;", 0);
 
-	EXPECT_EQ(1, Command::getCommandStatus("hawq register " + (string) HAWQ_DB + "hawqregister /hawq_register_hawq.paq"));
+	EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -f /hawq_register_hawq.paq hawqregister"));
 	util.query("select * from hawqregister;", 0);
 
 	EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm hdfs://localhost:8020/hawq_register_hawq.paq"));

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2596be6e/tools/bin/hawqextract
----------------------------------------------------------------------
diff --git a/tools/bin/hawqextract b/tools/bin/hawqextract
old mode 100755
new mode 100644

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2596be6e/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 380a548..6700f54 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -17,9 +17,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-'''
-hawq register [options] database_name table_name file_or_dir_path_in_hdfs
-'''
+# Usage1: hawq register [-h hostname] [-p port] [-U username] [-d database] [-f filepath] tablename
+# Usage2: hawq register [-h hostname] [-p port] [-U username] [-d database] [-c config] tablename
 import os, sys, optparse, getpass, re, urlparse
 try:
     from gppylib.commands.unix import getLocalHostname, getUserName
@@ -40,133 +39,137 @@ EXECNAME = os.path.split(__file__)[-1]
 setup_tool_logging(EXECNAME,getLocalHostname(),getUserName())
 
 
-def create_opt_parser(version):
+def option_parser():
     parser = OptParser(option_class=OptChecker,
-                       usage='usage: %prog [options] database_name table_name file_or_dir_path_in_hdfs',
-                       version=version)
+                       usage='usage: %prog [options] table_name',
+                       version='%prog version $Revision: #1 $')
     parser.remove_option('-h')
     parser.add_option('-?', '--help', action='help')
-    parser.add_option('-h', '--host', help="host of the target DB")
-    parser.add_option('-p', '--port', help="port of the target DB", type='int', default=0)
-    parser.add_option('-U', '--user', help="username of the target DB")
-    return parser
-
-
-def check_hadoop_command():
-    hdfscmd = "hadoop"
-    result = local_ssh(hdfscmd);
-    if result != 0:
-        logger.error("command 'hadoop' is not available, please set environment variable $PATH to fix this")
+    parser.add_option('-h', '--host', help='host of the target DB')
+    parser.add_option('-p', '--port', help='port of the target DB', type='int', default=0)
+    parser.add_option('-U', '--user', help='username of the target DB')
+    parser.add_option('-d', '--database', default = 'postgres', dest = 'database', help='database name')
+    parser.add_option('-f', '--filepath', dest = 'filepath', help='file name in HDFS')
+    parser.add_option('-c', '--config', dest = 'yml_config', default = '', help='configuration file in YAML format')
+    return parser.parse_args()
+
+
+def option_parser_yml(yml_file):
+    import yaml
+    with open(yml_file, 'r') as f:
+        params = yaml.load(f)
+    if params['FileFormat'] == 'Parquet':
+        offset = params['Parquet_FileLocations']['Files'][0]['path'].rfind('/')
+        filepath = params['DFS_URL'] + params['Parquet_FileLocations']['Files'][0]['path'][:offset] if len(params['Parquet_FileLocations']['Files']) != 1 else params['DFS_URL'] + params['Parquet_FileLocations']['Files'][0]['path']
+        return 'Parquet', filepath, params['Parquet_Schema'], params['Distribution_Policy']
+    offset = params['AO_FileLocations']['Files'][0]['path'].rfind('/')
+    filepath = params['DFS_URL'] + params['AO_FileLocations']['Files'][0]['path'][:offset] if len(params['AO_FileLocations']['Files']) != 1 else params['DFS_URL'] + params['AO_FileLocations']['Files'][0]['path']
+    return 'AO', filepath, params['AO_Schema'], params['Distribution_Policy']
+
+
+def create_table(dburl, tablename, schema_info, fmt, distrbution_policy):
+    try:
+        schema = ','.join([k['name'] + ' ' + k['type'] for k in schema_info])
+        fmt = 'ROW' if fmt == 'AO' else fmt
+        query = 'create table %s(%s) with (appendonly=true, orientation=%s) %s;' % (tablename, schema, fmt, distrbution_policy)
+        conn = dbconn.connect(dburl, False)
+        rows = dbconn.execSQL(conn, query)
+        conn.commit()
+    except DatabaseError, ex:
+        logger.error('Failed to execute query ""%s"' % query)
         sys.exit(1)
 
 
-def get_seg_name(options, databasename, tablename):
+def get_seg_name(dburl, tablename, database):
     try:
-        relfilenode = 0
-        relname = ""
-        query = ("select pg_class2.relname from pg_class as pg_class1, pg_appendonly, pg_class as pg_class2 where pg_class1.relname ='%s' "
-                 "and  pg_class1.oid = pg_appendonly.relid and pg_appendonly.segrelid = pg_class2.oid;") % tablename
-        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename)
+        relname = ''
+        query = ("select pg_class2.relname from pg_class as pg_class1, pg_appendonly, pg_class as pg_class2 "
+                 "where pg_class1.relname ='%s' and pg_class1.oid = pg_appendonly.relid and pg_appendonly.segrelid = pg_class2.oid;") % tablename
         conn = dbconn.connect(dburl, True)
         rows = dbconn.execSQL(conn, query)
-	conn.commit()
-        if rows.rowcount == 0:
-            logger.error("table '%s' not found in db '%s'" % (tablename, databasename));
+        conn.commit()
+        if not rows.rowcount:
+            logger.error('table "%s" not found in db "%s"' % (tablename, database))
             sys.exit(1)
         for row in rows:
             relname = row[0]
         conn.close()
-
     except DatabaseError, ex:
-        logger.error("Failed to connect to database, this script can only be run when the database is up")
-        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query))
-        sys.exit(1)
-
-    # check whether the target table is parquet format
-    if relname.find("paq") == -1:
-        logger.error("table '%s' is not parquet format" % tablename)
+        logger.error('Failed to run query "%s" with dbname "%s"' % (query, database))
         sys.exit(1)
-
     return relname
 
 
-def check_hash_type(options, databasename, tablename):
+def check_hash_type(dburl, tablename):
+    '''Check whether target table is hash-typed, in that case simple insertion does not work'''
     try:
         query = "select attrnums from gp_distribution_policy, pg_class where pg_class.relname
= '%s' and pg_class.oid = gp_distribution_policy.localoid;" % tablename
-        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user,
dbname=databasename)
         conn = dbconn.connect(dburl, False)
         rows = dbconn.execSQL(conn, query)
-	conn.commit()
-        if rows.rowcount == 0:
-            logger.error("target not found in table gp_distribution_policy")
+        conn.commit()
+        if not rows.rowcount:
+            logger.error('Target not found in table gp_distribution_policy.')
             sys.exit(1)
         for row in rows:
-            if row[0] != None:
-                logger.error("Cannot register file(s) to a table which is hash-typed")
+            if row[0]:
+                logger.error('Cannot register file(s) to a table which is hash-typed.')
                 sys.exit(1)
-
         conn.close()
-
     except DatabaseError, ex:
-        logger.error("Failed to connect to database, this script can only be run when the database is up")
-        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query))
+        logger.error('Failed to execute query "%s"' % query)
         sys.exit(1)
 
 
-def get_metadata_from_database(options, databasename, tablename, seg_name):
+def get_metadata_from_database(dburl, tablename, seg_name):
+    '''Get the metadata to be inserted from hdfs'''
     try:
-        query = "select segno from pg_aoseg.%s;" % seg_name
-        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename)
+        query = 'select segno from pg_aoseg.%s;' % seg_name
         conn = dbconn.connect(dburl, False)
         rows = dbconn.execSQL(conn, query)
-	conn.commit()
+        conn.commit()
         conn.close()
-
     except DatabaseError, ex:
-        logger.error("Failed to connect to database, this script can only be run when the database is up")
-        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query))
+        logger.error('Failed to execute query "%s"' % query)
         sys.exit(1)
 
     firstsegno = rows.rowcount + 1
 
-    # get the full path of correspoding file for target table
     try:
+        # get the full path of correspoding file for target table
         query = ("select location, gp_persistent_tablespace_node.tablespace_oid, database_oid,
relfilenode from pg_class, gp_persistent_relation_node, "
-                 "gp_persistent_tablespace_node, gp_persistent_filespace_node where relname
= '%s' and pg_class.relfilenode = "
-                 "gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid
= gp_persistent_tablespace_node.tablespace_oid "
-                 "and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;")
% tablename
-        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user,
dbname=databasename)
+             "gp_persistent_tablespace_node, gp_persistent_filespace_node where relname =
'%s' and pg_class.relfilenode = "
+             "gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid
= gp_persistent_tablespace_node.tablespace_oid "
+             "and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;")
% tablename
         conn = dbconn.connect(dburl, False)
         rows = dbconn.execSQL(conn, query)
-	conn.commit()
+        conn.commit()
         conn.close()
-
     except DatabaseError, ex:
-        logger.error("Failed to connect to database, this script can only be run when the database is up")
-        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query))
+        logger.error('Failed to execute query "%s"' % query)
         sys.exit(1)
-
     for row in rows:
         tabledir = row[0].strip() + "/" + str(row[1]) + "/" + str(row[2]) + "/" + str(row[3]) + "/"
-
+        #tabledir = '/'.join([row[0], str(row[1]), str(row[2]), str(row[3]), ''])
     return firstsegno, tabledir
 
 
 def check_files_and_table_in_same_hdfs_cluster(filepath, tabledir):
+    '''Check whether all the files refered by 'filepath' and the location corresponding to the table are in the same hdfs cluster'''
     # check whether the files to be registered is in hdfs
     filesystem = filepath.split('://')
     if filesystem[0] != 'hdfs':
-        logger.error("Only support to register file(s) in hdfs")
+        logger.error('Only support to register file(s) in hdfs')
         sys.exit(1)
     fileroot = filepath.split('/')
     tableroot = tabledir.split('/')
     # check the root url of them. eg: for 'hdfs://localhost:8020/temp/tempfile', we check 'hdfs://localohst:8020'
-    if fileroot[0] != tableroot[0] or fileroot[1] != tableroot[1] or fileroot[2] != tableroot[2]:
+    if fileroot[0:3] != tableroot[0:3]:
         logger.error("Files to be registered and the table are not in the same hdfs cluster.\nFile(s)
to be registered: '%s'\nTable path in HDFS: '%s'" % (filepath, tabledir))
         sys.exit(1)
 
 
 def get_files_in_hdfs(filepath):
+    '''Get all the files refered by 'filepath', which could be a file or a directory containing all the files'''
     files = []
     sizes = []
     hdfscmd = "hadoop fs -test -e %s" % filepath
@@ -174,52 +177,52 @@ def get_files_in_hdfs(filepath):
     if result != 0:
         logger.error("Path '%s' does not exist in hdfs" % filepath)
         sys.exit(1)
-
     hdfscmd = "hadoop fs -ls -R %s" % filepath
+    print filepath
     result, out, err = local_ssh_output(hdfscmd)
     outlines = out.splitlines()
-
     # recursively search all the files under path 'filepath'
-    i = 0
     for line in outlines:
         lineargs = line.split()
         if len(lineargs) == 8 and lineargs[0].find ("d") == -1:
             files.append(lineargs[7])
             sizes.append(int(lineargs[4]))
-
     if len(files) == 0:
         logger.error("Dir '%s' is empty" % filepath)
         sys.exit(1)
-
     return files, sizes
 
 
-def check_parquet_format(options, files):
-    # check whether the files are parquet format by checking the first and last four bytes
-    for file in files:
-        hdfscmd = "hadoop fs -cat %s | head -c 4 | grep PAR1" % file
+def check_parquet_format(files):
+    '''Check whether the file to be registered is parquet format'''
+    for f in files:
+        hdfscmd = 'hadoop fs -du -h %s | head -c 1' % f
+        rc, out, err = local_ssh_output(hdfscmd)
+        if out == '0':
+            continue
+        hdfscmd = 'hadoop fs -cat %s | head -c 4 | grep PAR1' % f
         result1 = local_ssh(hdfscmd)
-        hdfscmd = "hadoop fs -cat %s | tail -c 4 | grep PAR1" % file
+        hdfscmd = 'hadoop fs -cat %s | tail -c 4 | grep PAR1' % f
         result2 = local_ssh(hdfscmd)
         if result1 or result2:
-            logger.error("File %s is not parquet format" % file)
+            logger.error('File %s is not parquet format' % f)
             sys.exit(1)
 
 
-def move_files_in_hdfs(options, databasename, tablename, files, firstsegno, tabledir, normal):
-    # move file(s) in src path into the folder correspoding to the target table
-    if (normal == True):
+def move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, normal):
+    '''Move file(s) in src path into the folder correspoding to the target table'''
+    if normal:
         segno = firstsegno
         for file in files:
             srcfile = file
             dstfile = tabledir + str(segno)
             segno += 1
             if srcfile != dstfile:
-                hdfscmd = "hadoop fs -mv %s %s" % (srcfile, dstfile)
-                sys.stdout.write("hdfscmd: '%s'\n" % hdfscmd)
+                hdfscmd = 'hadoop fs -mv %s %s' % (srcfile, dstfile)
+                sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
                 result = local_ssh(hdfscmd)
                 if result != 0:
-                    logger.error("Fail to move '%s' to '%s'" % (srcfile, dstfile))
+                    logger.error('Fail to move %s to %s' % (srcfile, dstfile))
                     sys.exit(1)
     else:
         segno = firstsegno
@@ -228,79 +231,60 @@ def move_files_in_hdfs(options, databasename, tablename, files, firstsegno, tabl
             srcfile = tabledir + str(segno)
             segno += 1
             if srcfile != dstfile:
-                hdfscmd = "hadoop fs -mv %s %s" % (srcfile, dstfile)
-                sys.stdout.write("hdfscmd: '%s'\n" % hdfscmd)
+                hdfscmd = 'hadoop fs -mv %s %s' % (srcfile, dstfile)
+                sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
                 result = local_ssh(hdfscmd)
                 if result != 0:
-                    logger.error("Fail to move '%s' to '%s'" % (srcfile, dstfile))
+                    logger.error('Fail to move "%s" to "%s"' % (srcfile, dstfile))
                     sys.exit(1)
 
 
-def insert_metadata_into_database(options, databasename, tablename, seg_name, firstsegno, tabledir, eofs):
+def insert_metadata_into_database(dburl, databasename, tablename, seg_name, firstsegno, tabledir, eofs):
+    '''Insert the metadata into database'''
     try:
         query = "SET allow_system_table_mods='dml';"
         segno = firstsegno
         for eof in eofs:
             query += "insert into pg_aoseg.%s values(%d, %d, %d, %d);" % (seg_name, segno,
eof, -1, -1)
             segno += 1
-
-        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename)
         conn = dbconn.connect(dburl, True)
         rows = dbconn.execSQL(conn, query)
-	conn.commit()
+        conn.commit()
         conn.close()
-
     except DatabaseError, ex:
-        logger.error("Failed to connect to database, this script can only be run when the database is up")
-        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query))
-	move_files_in_hdfs(options, databasename, tablename, files, firstsegno, tabledir, False)
-
+        logger.error('Failed to connect to database, this script can only be run when the database is up')
+        move_files_in_hdfs(options.database, options.tablename, files, firstsegno, tabledir, False)
         sys.exit(1)
 
 
-def main(args=None):
-    parser = create_opt_parser('%prog version $Revision: #1 $')
-    options, args = parser.parse_args(args)
-    if len(args) != 3:
-        sys.stderr.write('Incorrect number of arguments\n\n')
-        parser.print_help(sys.stderr)
-        return 1
-
-    databasename = args[0]
-    tablename = args[1]
-    filepath = args[2]
-
-    # 1. check whether the path of shell command 'hadoop' is set.
-    check_hadoop_command()
-
-    # 2. get the seg_name from database
-    seg_name = get_seg_name(options, databasename, tablename)
+if __name__ == '__main__':
+    options, args = option_parser()
+    if len(args) != 1 or (options.yml_config and options.filepath):
+        logger.error('Incorrect usage!\n Correct usage: "hawq register [-h hostname] [-p port] [-U username] [-d database] [-f filepath] tablename"\n or "hawq register [-h hostname] [-p port] [-U username] [-d database] [-c config] tablename"\n')
+        sys.exit(1)
+    if local_ssh('hadoop'):
+        logger.error('command "hadoop" is not available.')
+        sys.exit(1)
 
-    # 3. check whether target table is hash-typed, in that case simple insertion does not work
-    result = check_hash_type(options, databasename, tablename)
+    dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=options.database)
+    filepath, database, tablename = options.filepath, options.database, args[0]
 
-    # 4. get the metadata to be inserted from hdfs
-    firstsegno, tabledir = get_metadata_from_database(options, databasename, tablename, seg_name)
+    if options.yml_config: # Usage2
+        fileformat, filepath, schema, distribution_policy = option_parser_yml(options.yml_config)
+        create_table(dburl, tablename, schema, fileformat, distribution_policy)
+    else:
+        fileformat = 'Parquet'
+        check_hash_type(dburl, tablename) # Usage1 only support randomly distributed table
 
-    # 5. check whether all the files refered by 'filepath' and the location corresponding to the table are in the same hdfs cluster
+    seg_name = get_seg_name(dburl, tablename, database)
+    firstsegno, tabledir = get_metadata_from_database(dburl, tablename, seg_name)
     check_files_and_table_in_same_hdfs_cluster(filepath, tabledir)
 
-    # 6. get all the files refered by 'filepath', which could be a file or a directory containing all the files
     files, sizes = get_files_in_hdfs(filepath)
-    print "File(s) to be registered:"
+    print 'File(s) to be registered:', files
+    if fileformat == 'Parquet':
+        check_parquet_format(files)
     print files
-
-    # 7. check whether the file to be registered is parquet format
-    check_parquet_format(options, files)
-
-    # 8. move the file in hdfs to proper location
-    move_files_in_hdfs(options, databasename, tablename, files, firstsegno, tabledir, True)
-
-    # 9. insert the metadata into database
-    insert_metadata_into_database(options, databasename, tablename, seg_name, firstsegno, tabledir, sizes)
-
-    # 10. report the final status of hawq register
-    logger.info("Hawq register succeed.")
-
-if __name__ == '__main__':
-    sys.exit(main())
+    move_files_in_hdfs(database, tablename, files, firstsegno, tabledir, True)
+    insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes)
+    logger.info('Hawq Register Succeed.')
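
A note on the dense conditional in option_parser_yml() above: when the YAML lists exactly one file, that file's own path (prefixed with DFS_URL) is registered; when it lists several, the parent directory of the first entry (assumed to be shared by all of them) is used instead. A minimal standalone sketch of that rule, with function and variable names that are illustrative rather than taken from the commit:

    # Hypothetical helper mirroring the path-selection logic in option_parser_yml().
    def resolve_register_path(dfs_url, files):
        # files is the list under Parquet_FileLocations/Files (or AO_FileLocations/Files),
        # each entry a dict with a 'path' key relative to DFS_URL.
        first = files[0]['path']
        if len(files) == 1:
            return dfs_url + first                   # single file: register it directly
        return dfs_url + first[:first.rfind('/')]    # several files: register their parent directory
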

