From: hubertzhang@apache.org
To: commits@hawq.incubator.apache.org
Reply-To: dev@hawq.incubator.apache.org
Subject: incubator-hawq git commit: HAWQ-1033. Add --force option for hawq register, refactor related case using matrix style.
Date: Tue, 13 Sep 2016 03:41:02 +0000 (UTC)
archived-at: Tue, 13 Sep 2016 03:41:15 -0000

Repository: incubator-hawq
Updated Branches:
  refs/heads/master 4fbe8e2dd -> 0664852a8


HAWQ-1033. Add --force option for hawq register, refactor related case using matrix style.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/0664852a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/0664852a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/0664852a

Branch: refs/heads/master
Commit: 0664852a8e6fd8fb2ce9fed8316d10f3857e67e2
Parents: 4fbe8e2
Author: xunzhang
Authored: Tue Sep 13 10:55:07 2016 +0800
Committer: hzhang2
Committed: Tue Sep 13 11:39:28 2016 +0800

----------------------------------------------------------------------
 src/test/feature/Makefile                       |   2 +-
 .../ManagementTool/data/parquet200/dat.paq      | Bin 0 -> 1229 bytes
 .../ManagementTool/data/parquet200sum/a.paq     | Bin 0 -> 596 bytes
 .../ManagementTool/data/parquet200sum/b.paq     | Bin 0 -> 596 bytes
 .../ManagementTool/test_hawq_register.cpp       |  70 +-----------
 .../feature/ManagementTool/test_hawq_register.h |  20 ++++
 .../test_hawq_register_usage1.cpp               | 111 +++++++++++++++++++
 tools/bin/hawqregister                          |  86 +++++++++++---
 8 files changed, 205 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0664852a/src/test/feature/Makefile
----------------------------------------------------------------------
diff --git a/src/test/feature/Makefile b/src/test/feature/Makefile
index 7202ac3..d12b751 100644
--- a/src/test/feature/Makefile
+++ b/src/test/feature/Makefile
@@ -8,7 +8,7 @@ gmock_lib_path = $(abs_top_srcdir)/depends/thirdparty/googletest/build/googlemoc
 
 override CXX = g++
 override CXXFLAGS = -Wall -O0 -g -std=c++11
-override CPPFLAGS := -I/usr/include -I/usr/local/include -I/usr/include/libxml2 -I$(abs_top_srcdir)/src/test/feature/ -I$(abs_top_srcdir)/src/test/feature/lib/ -I$(abs_top_srcdir)/src/interfaces/libpq -I$(abs_top_srcdir)/src/interfaces -I$(abs_top_srcdir)/src/include -I$(gtest_include) -I$(gmock_include)
+override CPPFLAGS := -I/usr/include -I/usr/local/include -I/usr/include/libxml2 -I$(abs_top_srcdir)/src/test/feature/ -I$(abs_top_srcdir)/src/test/feature/ManagementTool/ -I$(abs_top_srcdir)/src/test/feature/lib/ -I$(abs_top_srcdir)/src/interfaces/libpq -I$(abs_top_srcdir)/src/interfaces -I$(abs_top_srcdir)/src/include -I$(gtest_include) -I$(gmock_include)
 override LIBS := $(LIBS) -lgtest -lpq -lxml2 -ltest
 override LDFLAGS += -L/usr/local/lib -L/usr/lib -L$(abs_top_srcdir)/src/test/feature/ -L$(abs_top_srcdir)/src/test/feature/lib/ -L$(abs_top_srcdir)/src/interfaces/libpq -L$(gtest_lib_path) -L$(gmock_lib_path)
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0664852a/src/test/feature/ManagementTool/data/parquet200/dat.paq
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/data/parquet200/dat.paq b/src/test/feature/ManagementTool/data/parquet200/dat.paq
new file mode 100644
index 0000000..0fefc6d
Binary files /dev/null and b/src/test/feature/ManagementTool/data/parquet200/dat.paq differ


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0664852a/src/test/feature/ManagementTool/data/parquet200sum/a.paq
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/data/parquet200sum/a.paq b/src/test/feature/ManagementTool/data/parquet200sum/a.paq
new file mode 100644
index 0000000..c42ade9
Binary files /dev/null and b/src/test/feature/ManagementTool/data/parquet200sum/a.paq differ


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0664852a/src/test/feature/ManagementTool/data/parquet200sum/b.paq
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/data/parquet200sum/b.paq b/src/test/feature/ManagementTool/data/parquet200sum/b.paq
new file mode 100644
index 0000000..6ccdc83
Binary files /dev/null and b/src/test/feature/ManagementTool/data/parquet200sum/b.paq differ
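
The refactored usage-1 test added in this commit (TestUsage1ExpectSuccess) derives its cases from three small parameter lists and numbers the tables it creates t_0, t_1, ... in loop order. The Python sketch below is illustrative only: `enumerate_cases` and `create_table_sql` are hypothetical helpers, not part of the HAWQ feature-test framework, and the lists simply copy the values used in the test. It shows how such a matrix expands as a Cartesian product, and why the bound used by the test's drop-table loops, |orientations| * |policies| * 2 + |orientations| * |policies| * |folders|, matches the number of tables created.

```python
from itertools import product

# Values copied from the matrices in TestUsage1ExpectSuccess.
ORIENTATIONS = ["parquet"]
POLICIES = ["", "DISTRIBUTED RANDOMLY"]
FOLDERS = ["/usage1tmp/", "/usage1tmp"]


def enumerate_cases(orientations, policies, folders):
    """Expand the matrix into (table, mode, orientation, policy, folder) tuples.

    Hypothetical helper: the three phases follow the three loop nests of the
    test, and tables are numbered across all phases like its `suffix` counter.
    """
    phases = [
        ("file", [None]),      # register a single file
        ("file_eof", [None]),  # register a single file with -e <eof>
        ("folder", folders),   # register a folder, with and without trailing '/'
    ]
    cases = []
    for mode, targets in phases:
        for orientation, policy, folder in product(orientations, policies, targets):
            cases.append(("t_%d" % len(cases), mode, orientation, policy, folder))
    return cases


def create_table_sql(table, orientation, policy):
    """DDL of the same shape as the test's CREATE TABLE; an empty policy adds no clause."""
    ddl = "CREATE TABLE %s(i int) with (appendonly=true, orientation=%s)" % (table, orientation)
    return (ddl + " " + policy if policy else ddl) + ";"


if __name__ == "__main__":
    cases = enumerate_cases(ORIENTATIONS, POLICIES, FOLDERS)
    # Same bound as the drop-table loops in the C++ test.
    bound = (len(ORIENTATIONS) * len(POLICIES) * 2
             + len(ORIENTATIONS) * len(POLICIES) * len(FOLDERS))
    assert len(cases) == bound
    for table, mode, orientation, policy, folder in cases:
        print(table, mode, folder, create_table_sql(table, orientation, policy))
```

Because `product` iterates its last argument fastest, the numbering follows the same outer-to-inner order as the C++ loops (orientation, then policy, then folder), so a given t_N refers to the same case in both.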

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0664852a/src/test/feature/ManagementTool/test_hawq_register.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/test_hawq_register.cpp b/src/test/feature/ManagementTool/test_hawq_register.cpp
index 385f959..791b522 100644
--- a/src/test/feature/ManagementTool/test_hawq_register.cpp
+++ b/src/test/feature/ManagementTool/test_hawq_register.cpp
@@ -1,30 +1,17 @@
 #include
+#include "gtest/gtest.h"
 #include "lib/command.h"
 #include "lib/sql_util.h"
 #include "lib/string_util.h"
 #include "lib/hdfs_config.h"
-
-#include "gtest/gtest.h"
+#include "test_hawq_register.h"
 
 using std::string;
 using hawq::test::SQLUtility;
 using hawq::test::Command;
 using hawq::test::HdfsConfig;
 
-/* This test suite may consume more than 80 seconds. */
-class TestHawqRegister : public ::testing::Test {
- public:
-  TestHawqRegister() {}
-  ~TestHawqRegister() {}
-  string getHdfsLocation() {
-    HdfsConfig hc;
-    string namenodehost = "";
-    EXPECT_EQ(true, hc.getNamenodeHost(namenodehost));
-    return hawq::test::stringFormat("hdfs://%s", namenodehost.c_str());
-  }
-};
-
 TEST_F(TestHawqRegister, TestUsage1SingleHawqFile) {
   SQLUtility util;
   string rootPath(util.getTestRootPath());
@@ -245,40 +232,6 @@ TEST_F(TestHawqRegister, TestUsage1NotHDFSPath) {
   util.execute("drop table hawqregister;");
 }
 
-TEST_F(TestHawqRegister, TestUsage1ParquetRandomly) {
-  SQLUtility util;
-  string rootPath(util.getTestRootPath());
-  string relativePath("/ManagementTool/test_hawq_register_hawq.paq");
-  string filePath = rootPath + relativePath;
-  auto cmd = hawq::test::stringFormat("hadoop fs -put -f %s %s/hawq_register_hawq.paq", filePath.c_str(), getHdfsLocation().c_str());
-  EXPECT_EQ(0, Command::getCommandStatus(cmd));
-  util.execute("drop table if exists nt;");
-  util.execute("create table nt(i int) with (appendonly=true, orientation=parquet);");
-  cmd = hawq::test::stringFormat("hawq register -d %s -f %s/hawq_register_hawq.paq nt", HAWQ_DB, getHdfsLocation().c_str());
-  EXPECT_EQ(0, Command::getCommandStatus(cmd));
-  util.query("select * from nt;", 3);
-  util.execute("insert into nt values(1);");
-  util.query("select * from nt;", 4);
-  util.execute("drop table nt;");
-}
-
-TEST_F(TestHawqRegister, TestUsage1ParquetRandomly2) {
-  SQLUtility util;
-  string rootPath(util.getTestRootPath());
-  string relativePath("/ManagementTool/test_hawq_register_hawq.paq");
-  string filePath = rootPath + relativePath;
-  auto cmd = hawq::test::stringFormat("hadoop fs -put -f %s %s/hawq_register_hawq.paq", filePath.c_str(), getHdfsLocation().c_str());
-  EXPECT_EQ(0, Command::getCommandStatus(cmd));
-  util.execute("drop table if exists nt;");
-  util.execute("create table nt(i int) with (appendonly=true, orientation=parquet) distributed randomly;");
-  cmd = hawq::test::stringFormat("hawq register -d %s -f %s/hawq_register_hawq.paq nt", HAWQ_DB, getHdfsLocation().c_str());
-  EXPECT_EQ(0, Command::getCommandStatus(cmd));
-  util.query("select * from nt;", 3);
-  util.execute("insert into nt values(1);");
-  util.query("select * from nt;", 4);
-  util.execute("drop table nt;");
-}
-
 TEST_F(TestHawqRegister, TestUsage2ParquetRandomly) {
   SQLUtility util;
   util.execute("drop table if exists t;");
@@ -392,22 +345,3 @@ TEST_F(TestHawqRegister, TestDismatchFileNumber) {
   string filePath = util.getTestRootPath() + "/ManagementTool/";
   EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "files_incomplete.yml xx"));
 }
-
-TEST_F(TestHawqRegister, TestUsage2Behavior2) {
-  SQLUtility util;
-  util.execute("drop table if exists simple_register_table;");
-  util.execute("create table simple_register_table(i int) with (appendonly=true, orientation=row) distributed randomly;");
-  util.execute("insert into simple_register_table values(1), (2), (3);");
-
-  EXPECT_EQ(0, Command::getCommandStatus("hawq extract -d " + (string) HAWQ_DB + " -o tmp.yml testhawqregister_testusage2behavior2.simple_register_table"));
-  EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c tmp.yml testhawqregister_testusage2behavior2.new_simple_register_table"));
-  util.query("select * from new_simple_register_table;", 3);
-
-  EXPECT_EQ(0, Command::getCommandStatus("hawq extract -d " + (string) HAWQ_DB + " -o new_tmp.yml testhawqregister_testusage2behavior2.new_simple_register_table"));
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c new_tmp.yml testhawqregister_testusage2behavior2.new_simple_register_table"));
-
-  EXPECT_EQ(0, Command::getCommandStatus("rm -rf tmp.yml"));
-  EXPECT_EQ(0, Command::getCommandStatus("rm -rf new_tmp.yml"));
-  util.execute("drop table simple_register_table;");
-  util.execute("drop table new_simple_register_table;");
-}


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0664852a/src/test/feature/ManagementTool/test_hawq_register.h
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/test_hawq_register.h b/src/test/feature/ManagementTool/test_hawq_register.h
new file mode 100644
index 0000000..3cdc288
--- /dev/null
+++ b/src/test/feature/ManagementTool/test_hawq_register.h
@@ -0,0 +1,20 @@
+#ifndef TEST_HAWQ_REGISTER_H
+#define TEST_HAWQ_REGISTER_H
+
+#include
+#include "lib/hdfs_config.h"
+#include "gtest/gtest.h"
+
+class TestHawqRegister : public ::testing::Test {
+ public:
+  TestHawqRegister() {}
+  ~TestHawqRegister() {}
+  std::string getHdfsLocation() {
+    hawq::test::HdfsConfig hc;
+    std::string namenodehost = "";
+    EXPECT_EQ(true, hc.getNamenodeHost(namenodehost));
+    return hawq::test::stringFormat("hdfs://%s", namenodehost.c_str());
+  }
+};
+
+#endif


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0664852a/src/test/feature/ManagementTool/test_hawq_register_usage1.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/test_hawq_register_usage1.cpp b/src/test/feature/ManagementTool/test_hawq_register_usage1.cpp
new file mode 100644
index 0000000..1843876
--- /dev/null
+++ b/src/test/feature/ManagementTool/test_hawq_register_usage1.cpp
@@ -0,0 +1,111 @@
+#include
+#include
+
+#include "gtest/gtest.h"
+#include "lib/command.h"
+#include "lib/sql_util.h"
+#include "lib/string_util.h"
+#include "lib/hdfs_config.h"
+#include "test_hawq_register.h"
+
+using std::vector;
+using std::string;
+using hawq::test::SQLUtility;
+using hawq::test::Command;
+using hawq::test::HdfsConfig;
+
+TEST_F(TestHawqRegister, TestUsage1ExpectSuccess) {
+  // Register file/folder into HAWQ by specific file/folder name
+
+  SQLUtility util;
+  string rootPath(util.getTestRootPath());
+  string filePath = rootPath + "/ManagementTool/data/parquet200/dat.paq";
+  string folderPath = rootPath + "/ManagementTool/data/parquet200sum/";
+
+  vector ddl_orientation_matrix = {"parquet"};
+  vector distribution_policy_matrix = {"", "DISTRIBUTED RANDOMLY"};
+  vector folder_matrix = {"/usage1tmp/", "/usage1tmp"};
+
+  for(int i = 0; i < ddl_orientation_matrix.size() * distribution_policy_matrix.size() * 2 + ddl_orientation_matrix.size() * distribution_policy_matrix.size() * folder_matrix.size(); ++i) {
+    util.execute(hawq::test::stringFormat("drop table if exists t_%s;", std::to_string(i).c_str()));
+  }
+  auto register_lambda = [&] () {
+    int suffix = 0;
+    // hawq register -d hawq_feature_test -f hdfs://localhost:8020/usage1dat.paq t_#
+    for(auto & ddl : ddl_orientation_matrix) {
+      for(auto & policy : distribution_policy_matrix) {
+        auto cmd = hawq::test::stringFormat("hdfs dfs -put -f %s %s/usage1dat.paq", filePath.c_str(), getHdfsLocation().c_str());
+        EXPECT_EQ(0, Command::getCommandStatus(cmd));
+
+        auto sql = hawq::test::stringFormat("CREATE TABLE t_%s(i int) with (appendonly=true, orientation=%s) %s;", std::to_string(suffix).c_str(), ddl.c_str(), policy.c_str());
+        util.execute(sql); util.query(hawq::test::stringFormat("SELECT * from t_%s", std::to_string(suffix).c_str()), 0);
+
+        cmd = hawq::test::stringFormat("hawq register -d %s -f %s/usage1dat.paq t_%s", HAWQ_DB, getHdfsLocation().c_str(), std::to_string(suffix).c_str());
+        EXPECT_EQ(0, Command::getCommandStatus(cmd));
+
+        util.query(hawq::test::stringFormat("select * from t_%s;", std::to_string(suffix).c_str()), 200);
+        util.execute(hawq::test::stringFormat("insert into t_%s values(201);", std::to_string(suffix).c_str()));
+        util.query(hawq::test::stringFormat("select * from t_%s;", std::to_string(suffix).c_str()), 201);
+
+        suffix ++;
+      }
+    }
+
+    // hawq register -d hawq_feature_test -f hdfs://localhost:8020/usage1dat.paq -e eof t_#
+    for(auto & ddl : ddl_orientation_matrix) {
+      for(auto & policy : distribution_policy_matrix) {
+        auto cmd = hawq::test::stringFormat("hdfs dfs -put -f %s %s/usage1dat.paq", filePath.c_str(), getHdfsLocation().c_str());
+        EXPECT_EQ(0, Command::getCommandStatus(cmd));
+
+        auto sql = hawq::test::stringFormat("CREATE TABLE t_%s(i int) with (appendonly=true, orientation=%s) %s;", std::to_string(suffix).c_str(), ddl.c_str(), policy.c_str());
+        util.execute(sql); util.query(hawq::test::stringFormat("SELECT * from t_%s", std::to_string(suffix).c_str()), 0);
+
+        cmd = hawq::test::stringFormat("hawq register -d %s -f %s/usage1dat.paq -e 596 t_%s", HAWQ_DB, getHdfsLocation().c_str(), std::to_string(suffix).c_str());
+        EXPECT_EQ(0, Command::getCommandStatus(cmd));
+
+        util.query(hawq::test::stringFormat("select * from t_%s;", std::to_string(suffix).c_str()), 100);
+        util.execute(hawq::test::stringFormat("insert into t_%s values(101);", std::to_string(suffix).c_str()));
+        util.query(hawq::test::stringFormat("select * from t_%s;", std::to_string(suffix).c_str()), 101);
+
+        suffix ++;
+      }
+    }
+
+    // hawq register -d hawq_feature_test -f hdfs://localhost:8020/usage1tmp/ t_#
+    // hawq register -d hawq_feature_test -f hdfs://localhost:8020/usage1tmp t_#
+    for(auto & ddl : ddl_orientation_matrix) {
+      for(auto & policy : distribution_policy_matrix) {
+        for(auto & folder : folder_matrix) {
+          auto cmd = hawq::test::stringFormat("hdfs dfs -mkdir -p %s/usage1tmp/", getHdfsLocation().c_str());
+          EXPECT_EQ(0, Command::getCommandStatus(cmd));
+          cmd = hawq::test::stringFormat("hdfs dfs -put -f %s/*.paq %s/usage1tmp/", folderPath.c_str(), getHdfsLocation().c_str());
+          EXPECT_EQ(0, Command::getCommandStatus(cmd));
+
+          auto sql = hawq::test::stringFormat("CREATE TABLE t_%s(i int) with (appendonly=true, orientation=%s) %s;", std::to_string(suffix).c_str(), ddl.c_str(), policy.c_str());
+          util.execute(sql); util.query(hawq::test::stringFormat("SELECT * from t_%s", std::to_string(suffix).c_str()), 0);
+
+          cmd = hawq::test::stringFormat("hawq register -d %s -f %s%s t_%s", HAWQ_DB, getHdfsLocation().c_str(), folder.c_str(), std::to_string(suffix).c_str());
+          EXPECT_EQ(0, Command::getCommandStatus(cmd));
+
+          util.query(hawq::test::stringFormat("select * from t_%s;", std::to_string(suffix).c_str()), 200);
+          util.execute(hawq::test::stringFormat("insert into t_%s values(201);", std::to_string(suffix).c_str()));
+          util.query(hawq::test::stringFormat("select * from t_%s;", std::to_string(suffix).c_str()), 201);
+
+          suffix ++;
+        }
+      }
+    }
+
+  }; // register_lambda
+
+  auto gc_lambda = [&] () {
+    auto sql = hawq::test::stringFormat("hdfs dfs -rm -r %s/usage1tmp/", getHdfsLocation().c_str());
+    EXPECT_EQ(0, Command::getCommandStatus(sql));
+    for(int i = 0; i < ddl_orientation_matrix.size() * distribution_policy_matrix.size() * 2 + ddl_orientation_matrix.size() * distribution_policy_matrix.size() * folder_matrix.size(); ++i) {
+      util.execute(hawq::test::stringFormat("drop table t_%s;", std::to_string(i).c_str()));
+    }
+  };
+
+  register_lambda();
+  gc_lambda();
+}


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0664852a/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 2b6fce5..b3e3493 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -128,10 +128,11 @@ def option_parser_yml(yml_file):
 
 def create_table(dburl, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number):
     try:
-        query = "select count(*) from pg_class where relname = '%s'" % tablename.split('.')[-1].lower()
+        query = "select count(*) from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
         conn = dbconn.connect(dburl, False)
         rows = dbconn.execSQL(conn, query)
         conn.commit()
+        conn.close()
         for row in rows:
             if row[0] != 0:
                 return False
@@ -151,6 +152,7 @@ def create_table(dburl, tablename, schema_info, fmt, distrbution_policy, file_lo
         conn = dbconn.connect(dburl, False)
         rows = dbconn.execSQL(conn, query)
         conn.commit()
+        conn.close()
         return True
     except DatabaseError, ex:
         print DatabaseError, ex
@@ -192,7 +194,7 @@ def check_hash_type(dburl, tablename):
         rows = dbconn.execSQL(conn, query)
         conn.commit()
         if not rows.rowcount:
-            logger.error('Table not found in table gp_distribution_policy.' % tablename)
+            logger.error('Table %s not found in table gp_distribution_policy.' % tablename)
             sys.exit(1)
         for row in rows:
             if row[0]:
@@ -323,21 +325,43 @@ def move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, nor
             sys.exit(1)
 
 
-def insert_metadata_into_database(dburl, databasename, tablename, seg_name, firstsegno, tabledir, eofs):
+def insert_metadata_into_database(dburl, databasename, tablename, seg_name, firstsegno, tabledir, eofs, fmt):
     '''Insert the metadata into database'''
     try:
-        query = "SET allow_system_table_mods='dml';"
-        query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1)
-        for k, eof in enumerate(eofs[1:]):
-            query += ',(%d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1)
+        query = "set allow_system_table_mods='dml';"
+        if fmt == 'Parquet':
+            query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1)
+            for k, eof in enumerate(eofs[1:]):
+                query += ',(%d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1)
+        else:
+            query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1, -1)
+            for k, eof in enumerate(eofs[1:]):
+                query += ',(%d, %d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1, -1)
         query += ';'
         conn = dbconn.connect(dburl, True)
         rows = dbconn.execSQL(conn, query)
         conn.commit()
         conn.close()
     except DatabaseError, ex:
-        logger.error('Failed to connect to database, this script can only be run when the database is up')
-        move_files_in_hdfs(database, tablename, files, firstsegno, tabledir, False)
+        logger.error('Failed to execute query "%s"' % query)
+        move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, False)
+        sys.exit(1)
+
+def update_metadata_into_database(dburl, seg_name, files, eofs):
+    '''Update the catalog table in --force case'''
+    try:
+        query = "set allow_system_table_mods='dml';"
+        query += "begin transaction;"
+        segno_lst = [f.split('/')[-1] for f in files]
+        for i, eof in enumerate(eofs):
+            query += "update pg_aoseg.%s set eof = '%s' where segno = '%s';" % (seg_name, eof, segno_lst[i])
+        query += "end transaction;"
+        conn = dbconn.connect(dburl, True)
+        rows = dbconn.execSQL(conn, query)
+        conn.commit()
+        conn.close()
+    except DatabaseError, ex:
+        logger.error('Failed to execute query "%s"' % query)
         sys.exit(1)
 
 
@@ -366,16 +390,24 @@ if __name__ == '__main__':
         second_normal_mode = True
         fileformat, files, sizes, schema, distribution_policy, file_locations, bucket_number = option_parser_yml(options.yml_config)
         filepath = files[0][:files[0].rfind('/')] if files else ''
+        # check conflicting distributed policy
         if distribution_policy.startswith('DISTRIBUTED BY'):
             if len(files) % bucket_number != 0:
                 logger.error('Files to be registered must be multiple times to the bucket number of hash table.')
                 sys.exit(1)
-        if not create_table(dburl, tablename, schema, fileformat, distribution_policy, file_locations, bucket_number):
-            second_normal_mode, second_exist_mode = False, True
+        if not force_mode:
+            if not create_table(dburl, tablename, schema, fileformat, distribution_policy, file_locations, bucket_number):
+                second_normal_mode, second_exist_mode = False, True
     else:
         fileformat = 'Parquet'
         check_hash_type(dburl, tablename) # Usage1 only support randomly distributed table
 
+    if repair_mode:
+        # check distribution policy consistency
+        # check bucketnum, pagesize, rowgroupsize, etc
+        # check filesize smaller
+        pass
+
     # check filepath
     if not filepath:
         sys.exit(0)
@@ -388,24 +420,48 @@ if __name__ == '__main__':
         logger.error('Files to be registered in this case should not be the same with table path.')
         sys.exit(1)
 
+    do_not_move, files_update, sizes_update = False, [], []
+    if force_mode:
+        existed_files, _ = get_files_in_hdfs(tabledir)
+        if len(files) == len(existed_files):
+            if sorted(files) != sorted(existed_files):
+                logger.error('In this case, you should include previous table files.\nOtherwise you should drop the previous table before registering --force.')
+                sys.exit(1)
+            else:
+                do_not_move, files_update, sizes_update = True, files, sizes
+                files, sizes = [], []
+        else:
+            for k, f in enumerate(files):
+                if f in existed_files:
+                    files_update.append(files[k])
+                    sizes_update.append(sizes[k])
+                    files.remove(files[k])
+                    sizes.remove(sizes[k])
+
     check_files_and_table_in_same_hdfs_cluster(filepath, tabledir)
 
     if not options.yml_config:
         files, sizes = get_files_in_hdfs(filepath)
-    print 'File(s) to be registered:', files
+    print 'New file(s) to be registered:', files
+    if files_update:
+        print 'Files(s) catalog info need to be update:', files_update
 
     # set specified eofs
     if options.filesize:
-        if options.filesize != len(files):
+        if len(files) != 1:
             logger.error('-e option is only supported with single file case.')
             sys.exit(1)
         sizes = [options.filesize]
 
     if fileformat == 'Parquet':
         check_parquet_format(files)
-    move_files_in_hdfs(database, tablename, files, firstsegno, tabledir, True)
+    if not do_not_move:
+        move_files_in_hdfs(database, tablename, files, firstsegno, tabledir, True)
 
     # update catalog table
-    insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes)
+    if not do_not_move:
+        insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes, fileformat)
+    if force_mode:
+        update_metadata_into_database(dburl, seg_name, files_update, sizes_update)
 
     logger.info('Hawq Register Succeed.')
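
The new --force path in hawqregister splits the files being registered into files that are new to the table (moved into the table directory and inserted into the pg_aoseg catalog table) and files already present in the table directory (only their eof is updated). The Python sketch below restates that split and the resulting catalog statements under stated assumptions: `partition_force_files` and `catalog_statements` are hypothetical names, errors are raised as exceptions instead of being logged before sys.exit(1), and the UPDATE uses integer literals where the committed code quotes eof and segno as strings. The row width (four values for Parquet, five otherwise, with -1 after eof) follows insert_metadata_into_database() as changed here, and the segno is taken from the file name, as update_metadata_into_database() does. One deliberate difference: the sketch builds new lists instead of calling files.remove() inside `for k, f in enumerate(files)`. Removing an element shifts the later ones down, so the entry right after each removed file is never examined; with files /t/1, /t/2, /x/a and existing files /t/1, /t/2, the committed loop would leave /t/2 in the new-file list.

```python
def partition_force_files(files, sizes, existed_files):
    """Split paired (files, sizes) for a hypothetical --force registration.

    Returns (new_files, new_sizes, update_files, update_sizes, do_not_move).
    """
    if len(files) == len(existed_files):
        if sorted(files) != sorted(existed_files):
            # The committed tool logs an error and exits here instead.
            raise ValueError("include the previous table files, or drop the table first")
        # Every file is already in place: nothing to move, only eofs to update.
        return [], [], list(files), list(sizes), True

    existed = set(existed_files)
    new_files, new_sizes, update_files, update_sizes = [], [], [], []
    # Build fresh lists instead of removing from `files` while iterating over it.
    for path, size in zip(files, sizes):
        if path in existed:
            update_files.append(path)
            update_sizes.append(size)
        else:
            new_files.append(path)
            new_sizes.append(size)
    return new_files, new_sizes, update_files, update_sizes, False


def catalog_statements(seg_name, firstsegno, new_sizes, update_files, update_sizes, fmt):
    """Return SQL for new segment files (one INSERT) and existing ones (UPDATEs)."""
    stmts = []
    if new_sizes:
        # Parquet rows carry 4 values, other formats 5; -1 fills the columns after eof.
        fillers = 2 if fmt == 'Parquet' else 3
        rows = []
        for k, eof in enumerate(new_sizes):
            values = [firstsegno + k, eof] + [-1] * fillers
            rows.append("(%s)" % ", ".join(str(v) for v in values))
        stmts.append("insert into pg_aoseg.%s values%s;" % (seg_name, ",".join(rows)))
    for path, eof in zip(update_files, update_sizes):
        segno = int(path.rsplit('/', 1)[-1])  # file name is the segno, as the tool assumes
        stmts.append("update pg_aoseg.%s set eof = %d where segno = %d;" % (seg_name, eof, segno))
    return stmts


if __name__ == "__main__":
    new_f, new_s, upd_f, upd_s, do_not_move = partition_force_files(
        ["/t/1", "/t/2", "/x/a"], [10, 20, 5], ["/t/1", "/t/2"])
    print(new_f, upd_f, do_not_move)
    for stmt in catalog_statements("segtbl", 3, new_s, upd_f, upd_s, 'Parquet'):
        print(stmt)
```

Because the split walks `files` and `sizes` together through zip(), each eof stays attached to its own file no matter how many of the entries already exist in the table directory.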