hawq-commits mailing list archives

From lil...@apache.org
Subject incubator-hawq git commit: HAWQ-760. Add hawq register function and tests
Date Thu, 02 Jun 2016 02:29:22 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master ec098032e -> 2b50f7fee


HAWQ-760. Add hawq register function and tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/2b50f7fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/2b50f7fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/2b50f7fe

Branch: refs/heads/master
Commit: 2b50f7fee9a71ade3ca0e569e50562dde9a64bbc
Parents: ec09803
Author: Yancheng Luo <yluo@pivotal.io>
Authored: Thu Jun 2 10:13:30 2016 +0800
Committer: Lili Ma <ictmalili@gmail.com>
Committed: Thu Jun 2 10:28:56 2016 +0800

----------------------------------------------------------------------
 src/backend/cdb/cdbparquetfooterserializer.c    | 129 ++++++---
 src/backend/cdb/cdbparquetstoragewrite.c        |   5 +-
 src/backend/executor/execMain.c                 |   5 +-
 src/include/cdb/cdbparquetstoragewrite.h        |   5 +
 .../testhawqregister/test-hawq-register.cpp     | 149 ++++++++++
 src/test/feature/testhawqregister/test_hawq.paq | Bin 0 -> 657 bytes
 src/test/feature/testhawqregister/test_hive.paq | Bin 0 -> 212 bytes
 src/test/feature/testhawqregister/test_not_paq  | Bin 0 -> 48 bytes
 tools/bin/hawq                                  |   3 +
 tools/bin/hawqregister                          | 272 +++++++++++++++++++
 tools/doc/hawqregister_help                     | 135 +++++++++
 11 files changed, 669 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/backend/cdb/cdbparquetfooterserializer.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbparquetfooterserializer.c b/src/backend/cdb/cdbparquetfooterserializer.c
index 802793f..b142600 100644
--- a/src/backend/cdb/cdbparquetfooterserializer.c
+++ b/src/backend/cdb/cdbparquetfooterserializer.c
@@ -25,6 +25,7 @@
  */
 
 #include "cdb/cdbparquetfooterserializer.h"
+#include "cdb/cdbparquetstoragewrite.h"
 #include "access/parquetmetadata_c++/MetadataInterface.h"
 #include "lib/stringinfo.h"
 #include "postgres.h"
@@ -245,7 +246,21 @@ readParquetFileMetadata(
 				break;
 			}
 			break;
+		case 5:
+			/* Skip this optional field now */
+			if (ftype == T_LIST) {
+				xfer += skipType(prot, ftype);
+			}
+			break;
+		case 6:
+			/* Skip this optional field now */
+			if (ftype == T_STRING) {
+				xfer += skipType(prot, ftype);
+			}
+			break;
 		default:
+			ereport(ERROR,
+				(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("file metadata field not recognized with fid: %d", fid)));
 			break;
 		}
 
@@ -491,7 +506,27 @@ readSchemaElement_Single(
 				xfer += skipType(prot, ftype);
 			}
 			break;
+		case 7:
+			/* Skip this optional field now */
+			if (ftype == T_I32) {
+				xfer += skipType(prot, ftype);
+			}
+			break;
+		case 8:
+			/* Skip this optional field now */
+			if (ftype == T_I32) {
+				xfer += skipType(prot, ftype);
+			}
+			break;
+		case 9:
+			/* Skip this optional field now */
+			if (ftype == T_I32) {
+				xfer += skipType(prot, ftype);
+			}
+			break;
 		default:
+			ereport(ERROR,
+				(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("file metadata: schema element field not recognized with fid: %d", fid)));
 			break;
 		}
 	}
@@ -567,7 +602,15 @@ readRowGroupInfo(
 				isset_num_rows = true;
 			}
 			break;
+		case 4:
+			/* Skip this optional field now */
+			if (ftype == T_LIST) {
+				xfer += skipType(prot, ftype);
+			}
+			break;
 		default:
+			ereport(ERROR,
+				(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("file metadata: row group field not recognized with fid: %d", fid)));
 			break;
 		}
 	}
@@ -633,6 +676,8 @@ readColumnChunk(
 			}
 			break;
 		default:
+			ereport(ERROR,
+				(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("file metadata: column chunk field not recognized with fid: %d", fid)));
 			break;
 		}
 	}
@@ -773,7 +818,21 @@ readColumnMetadata(
 				xfer += skipType(prot, ftype);
 			}
 			break;
+		case 12:
+			/* Skip this optional field now */
+			if (ftype == T_STRUCT) {
+				xfer += skipType(prot, ftype);
+			}
+			break;
+		case 13:
+			/* Skip this optional field now */
+			if (ftype == T_LIST) {
+				xfer += skipType(prot, ftype);
+			}
+			break;
 		default:
+			ereport(ERROR,
+				(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("file metadata: column metadata field not recognized with fid: %d", fid)));
 			break;
 		}
 	}
@@ -864,14 +923,15 @@ uint32_t readKeyValue(
 			}
 			break;
 		default:
+			ereport(ERROR,
+				(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("file metadata: key value field not recognized with fid: %d", fid)));
 			break;
 		}
 	}
 	readStructEnd(prot);
 
 	if (!isset_key)
-		ereport(ERROR,
-				(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("file metadata key value: key not set")));
+		ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("file metadata key value: key not set")));
 	return xfer;
 }
 
@@ -912,39 +972,43 @@ endDeserializerFooter(
 	int16_t fid;
 	int xfer = 0;
 
-	xfer += readFieldBegin(*prot, &ftype, &fid);
-	if (ftype != T_STOP) {
-		/*should remain the last field, keyvalue*/
-		if (fid != 5) {
-			ereport(ERROR,
-					(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("incorrect file metadata format")));
-		}
 
-		/* process keyvalue part*/
-		if (ftype == T_LIST) {
-			uint32_t lsize;
-			TType etype;
-			xfer += readListBegin(*prot, &etype, &lsize);
-
-			for (int i = 0; i < lsize; i++) {
-				char *key = NULL;
-				char *value = NULL;
-				xfer += readKeyValue(*prot, &key, &value);
-
-				if ((key != NULL) && (strcmp(key, "hawq.schema") == 0)) {
-					int schemaLen = strlen(value);
-					parquetMetadata->hawqschemastr =
-							(char*) palloc0(schemaLen + 1);
-					strcpy(parquetMetadata->hawqschemastr, value);
+	while (true) {
+		xfer += readFieldBegin(*prot, &ftype, &fid);
+		if (ftype == T_STOP) {
+			break;
+		}
+		switch (fid) {
+		case 5:
+			if (ftype == T_LIST) {
+				uint32_t lsize;
+				TType etype;
+				xfer += readListBegin(*prot, &etype, &lsize);
+
+				for (int i = 0; i < lsize; i++) {
+					char *key = NULL;
+					char *value = NULL;
+					xfer += readKeyValue(*prot, &key, &value);
+
+					if ((key != NULL) && (strcmp(key, "hawq.schema") == 0)) {
+						int schemaLen = strlen(value);
+						parquetMetadata->hawqschemastr =
+								(char*) palloc0(schemaLen + 1);
+						strcpy(parquetMetadata->hawqschemastr, value);
+					}
 				}
 			}
-		}
-
-		/*Then should read the T_STOP*/
-		readFieldBegin(*prot, &ftype, &fid);
-		if (ftype != T_STOP) {
+			break;
+		case 6:
+			/* Skip this optional field now */
+			if (ftype == T_STRING) {
+				xfer += skipType(*prot, ftype);
+			}
+			break;
+		default:
 			ereport(ERROR,
-					(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("incorrect file metadata format")));
+					(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("incorrect file metadata format with fid: %d", fid)));
+			break;
 		}
 	}
 
@@ -1295,6 +1359,9 @@ writeEndofParquetMetadata(
 	xfer += writeString(prot, "hawq.schema", strlen("hawq.schema"));
 	/*write out value*/
 	xfer += writeFieldBegin(prot, T_STRING, 2);
+	if (parquetMetadata->hawqschemastr == NULL)
+		parquetMetadata->hawqschemastr = generateHAWQSchemaStr(parquetMetadata->pfield,
+							parquetMetadata->fieldCount);
 	xfer += writeString(prot, parquetMetadata->hawqschemastr, strlen(parquetMetadata->hawqschemastr));
 	/*write out end of key value*/
 	xfer += writeFieldStop(prot);
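
The new cases above all follow one dispatch pattern when deserializing the Thrift-encoded footer: optional fields the reader does not need yet are consumed with skipType(), and an unrecognized field id now raises an error instead of being silently ignored. A rough Python sketch of that loop (read_field_begin, skip_type and handle_field are hypothetical stand-ins for the compact-protocol helpers in cdbparquetfooterserializer.c, not the actual implementation):

T_STOP = 0  # Thrift "end of struct" marker

def read_struct_fields(read_field_begin, skip_type, handle_field, known_ids, skippable_ids):
    # Hedged sketch of the field-dispatch loop used in readParquetFileMetadata().
    while True:
        ftype, fid = read_field_begin()
        if ftype == T_STOP:            # end of the struct
            break
        if fid in known_ids:           # fields the reader actually uses
            handle_field(fid, ftype)
        elif fid in skippable_ids:     # optional fields we can safely ignore
            skip_type(ftype)           # consume the value and move on
        else:                          # unknown field id: fail loudly
            raise ValueError("field not recognized with fid: %d" % fid)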

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/backend/cdb/cdbparquetstoragewrite.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbparquetstoragewrite.c b/src/backend/cdb/cdbparquetstoragewrite.c
index 6b5f847..e57e875 100644
--- a/src/backend/cdb/cdbparquetstoragewrite.c
+++ b/src/backend/cdb/cdbparquetstoragewrite.c
@@ -173,7 +173,8 @@ static int appendParquetColumn_Circle(
 		int r,
 		int d);
 
-static char *generateHAWQSchemaStr(
+/* Used by cdbparquetfooterserializer.c */
+char *generateHAWQSchemaStr(
 		ParquetFileField pfields,
 		int fieldCount);
 
@@ -216,7 +217,7 @@ static bool ensureBufferCapacity(ParquetDataPage page,
 
  What about Array?
  */
-static char *
+char *
 generateHAWQSchemaStr(ParquetFileField pfields,
 					  int fieldCount)
 {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/backend/executor/execMain.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 3a43cec..578b9e8 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -2383,10 +2383,13 @@ initResultRelInfo(ResultRelInfo *resultRelInfo,
 						 RelationGetRelationName(resultRelationDesc))));
 			break;
 		case RELKIND_AOSEGMENTS:
-			ereport(ERROR,
+			/* Relax the constraint here to allow hawq register */
+			if (!allowSystemTableModsDML && IsSystemRelation(resultRelationDesc)) {
+				ereport(ERROR,
 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
 					 errmsg("cannot change AO segment listing relation \"%s\"",
 						 RelationGetRelationName(resultRelationDesc))));
+			}
 			break;
 		case RELKIND_AOBLOCKDIR:
 			ereport(ERROR,
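
The relaxed check above only lets a write through when allow_system_table_mods='dml' is set or the relation is not a system relation; this is what allows the new hawqregister tool (below) to append rows to a table's pg_aoseg segment catalog. A minimal sketch of that interaction, assuming an already open pygresql connection and borrowing the segment-table name pg_paqseg_77160 and file size 657 from the example material elsewhere in this commit:

# Sketch only: mirrors the statement built in insert_metadata_into_database()
# in tools/bin/hawqregister; `conn` is assumed to be an open pygresql connection.
def append_aoseg_rows(conn, seg_name, first_segno, eofs):
    sql = "SET allow_system_table_mods='dml';"   # required for pg_aoseg.* DML
    segno = first_segno
    for eof in eofs:
        sql += "insert into pg_aoseg.%s values(%d, %d, %d, %d);" % (seg_name, segno, eof, -1, -1)
        segno += 1
    conn.query(sql)

# e.g. append_aoseg_rows(conn, "pg_paqseg_77160", 8, [657])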

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/include/cdb/cdbparquetstoragewrite.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/cdbparquetstoragewrite.h b/src/include/cdb/cdbparquetstoragewrite.h
index c0832c0..c64a105 100644
--- a/src/include/cdb/cdbparquetstoragewrite.h
+++ b/src/include/cdb/cdbparquetstoragewrite.h
@@ -235,4 +235,9 @@ void estimateColumnWidth(int *columnWidths,
 					Form_pg_attribute att,
 					bool expandEmbeddingType);
 
+/*
+ * Generate key_value_metadata field used in FileMetaData.
+ */
+char * generateHAWQSchemaStr(ParquetFileField pfields,
+					int fieldCount);
 #endif /* CDBPARQUETSTORAGEWRITE_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/test/feature/testhawqregister/test-hawq-register.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/testhawqregister/test-hawq-register.cpp b/src/test/feature/testhawqregister/test-hawq-register.cpp
new file mode 100644
index 0000000..eb15d32
--- /dev/null
+++ b/src/test/feature/testhawqregister/test-hawq-register.cpp
@@ -0,0 +1,149 @@
+#include <string>
+
+#include "lib/command.h"
+#include "lib/common.h"
+#include "lib/sql-util.h"
+
+#include "gtest/gtest.h"
+
+using std::string;
+
+/* This test suite may consume more than 80 seconds. */
+class TestHawqRegister : public ::testing::Test {
+ public:
+  TestHawqRegister() {}
+  ~TestHawqRegister() {}
+};
+
+TEST_F(TestHawqRegister, TestSingleHawqFile) {
+	SQLUtility util;
+	string rootPath(util.getTestRootPath());
+	string relativePath("/testhawqregister/test_hawq.paq");
+	string filePath = rootPath + relativePath;
+
+	EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath + " /hawq_register_hawq.paq"));
+
+	util.execute("create table hawqregister(i int) with (appendonly=true, orientation=parquet);");
+	util.query("select * from hawqregister;", 0);
+
+	EXPECT_EQ(0, Command::getCommandStatus("hawq register postgres hawqregister /hawq_register_hawq.paq"));
+
+	util.query("select * from hawqregister;", 3);
+	util.execute("insert into hawqregister values(1);");
+	util.query("select * from hawqregister;", 4);
+	util.execute("drop table hawqregister;");
+}
+
+TEST_F(TestHawqRegister, TestSingleHiveFile) {
+	SQLUtility util;
+	string rootPath(util.getTestRootPath());
+	string relativePath("/testhawqregister/test_hive.paq");
+	string filePath = rootPath + relativePath;
+
+	EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath + " /hawq_register_hive.paq"));
+
+	util.execute("create table hawqregister(i int) with (appendonly=true, orientation=parquet);");
+	util.query("select * from hawqregister;", 0);
+
+	EXPECT_EQ(0, Command::getCommandStatus("hawq register postgres hawqregister /hawq_register_hive.paq"));
+
+	util.query("select * from hawqregister;", 1);
+	util.execute("insert into hawqregister values(1);");
+	util.query("select * from hawqregister;", 2);
+	util.execute("drop table hawqregister;");
+}
+
+TEST_F(TestHawqRegister, TestFiles) {
+	SQLUtility util;
+	string rootPath(util.getTestRootPath());
+	string relativePath("/testhawqregister/test_hawq.paq");
+	string filePath1 = rootPath + relativePath;
+	relativePath = "/testhawqregister/test_hive.paq";
+	string filePath2 = rootPath + relativePath;
+
+	EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -mkdir -p /hawq_register_test/t/t"));
+	EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath1 + " /hawq_register_test/hawq1.paq"));
+	EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath1 + " /hawq_register_test/hawq2.paq"));
+	EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath1 + " /hawq_register_test/t/hawq.paq"));
+
+	EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath2 + " /hawq_register_test/hive1.paq"));
+	EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath2 + " /hawq_register_test/hive2.paq"));
+	EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath2 + " /hawq_register_test/t/hive.paq"));
+
+	util.execute("create table hawqregister(i int) with (appendonly=true, orientation=parquet);");
+	util.query("select * from hawqregister;", 0);
+
+	EXPECT_EQ(0, Command::getCommandStatus("hawq register postgres hawqregister /hawq_register_test"));
+
+	util.query("select * from hawqregister;", 12);
+	util.execute("insert into hawqregister values(1);");
+	util.query("select * from hawqregister;", 13);
+	EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm -r /hawq_register_test"));
+	util.execute("drop table hawqregister;");
+}
+
+TEST_F(TestHawqRegister, TestHashDistributedTable) {
+	SQLUtility util;
+	string rootPath(util.getTestRootPath());
+	string relativePath("/testhawqregister/test_hawq.paq");
+	string filePath = rootPath + relativePath;
+
+	EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath + " /hawq_register_hawq.paq"));
+
+	util.execute("create table hawqregister(i int) with (appendonly=true, orientation=parquet) distributed by (i);");
+	util.query("select * from hawqregister;", 0);
+
+	EXPECT_EQ(1, Command::getCommandStatus("hawq register postgres hawqregister /hawq_register_hawq.paq"));
+	util.query("select * from hawqregister;", 0);
+
+	EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm /hawq_register_hawq.paq"));
+	util.execute("drop table hawqregister;");
+}
+
+TEST_F(TestHawqRegister, TestNotParquetFile) {
+	SQLUtility util;
+	string rootPath(util.getTestRootPath());
+	string relativePath("/testhawqregister/test_not_paq");
+	string filePath = rootPath + relativePath;
+
+	EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath + " /hawq_register_test_not_paq"));
+
+	util.execute("create table hawqregister(i int) with (appendonly=true, orientation=parquet);");
+	util.query("select * from hawqregister;", 0);
+
+	EXPECT_EQ(1, Command::getCommandStatus("hawq register postgres hawqregister /hawq_register_test_not_paq"));
+	util.query("select * from hawqregister;", 0);
+
+	EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm /hawq_register_test_not_paq"));
+	util.execute("drop table hawqregister;");
+}
+
+TEST_F(TestHawqRegister, TestNotParquetTable) {
+	SQLUtility util;
+	string rootPath(util.getTestRootPath());
+	string relativePath("/testhawqregister/test_hawq.paq");
+	string filePath = rootPath + relativePath;
+
+	EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -put " + filePath + " /hawq_register_hawq.paq"));
+
+	util.execute("create table hawqregister(i int);");
+	util.query("select * from hawqregister;", 0);
+
+	EXPECT_EQ(1, Command::getCommandStatus("hawq register postgres hawqregister /hawq_register_hawq.paq"));
+	util.query("select * from hawqregister;", 0);
+
+	EXPECT_EQ(0, Command::getCommandStatus("hadoop fs -rm /hawq_register_hawq.paq"));
+	util.execute("drop table hawqregister;");
+}
+
+TEST_F(TestHawqRegister, TestFileNotExist) {
+	SQLUtility util;
+
+	util.execute("create table hawqregister(i int);");
+	util.query("select * from hawqregister;", 0);
+
+	EXPECT_EQ(1, Command::getCommandStatus("hawq register postgres hawqregister /hawq_register_file_not_exist"));
+	util.query("select * from hawqregister;", 0);
+
+	util.execute("drop table hawqregister;");
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/test/feature/testhawqregister/test_hawq.paq
----------------------------------------------------------------------
diff --git a/src/test/feature/testhawqregister/test_hawq.paq b/src/test/feature/testhawqregister/test_hawq.paq
new file mode 100644
index 0000000..f2adb4b
Binary files /dev/null and b/src/test/feature/testhawqregister/test_hawq.paq differ

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/test/feature/testhawqregister/test_hive.paq
----------------------------------------------------------------------
diff --git a/src/test/feature/testhawqregister/test_hive.paq b/src/test/feature/testhawqregister/test_hive.paq
new file mode 100644
index 0000000..a356fc7
Binary files /dev/null and b/src/test/feature/testhawqregister/test_hive.paq differ

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/src/test/feature/testhawqregister/test_not_paq
----------------------------------------------------------------------
diff --git a/src/test/feature/testhawqregister/test_not_paq b/src/test/feature/testhawqregister/test_not_paq
new file mode 100644
index 0000000..dc75c44
Binary files /dev/null and b/src/test/feature/testhawqregister/test_not_paq differ

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/tools/bin/hawq
----------------------------------------------------------------------
diff --git a/tools/bin/hawq b/tools/bin/hawq
index 08c30e4..41130e9 100755
--- a/tools/bin/hawq
+++ b/tools/bin/hawq
@@ -175,6 +175,9 @@ def main():
     elif hawq_command == "scp":
         cmd = "%s; gpscp %s" % (source_hawq_env, sub_args)
         result = local_ssh(cmd)
+    elif hawq_command == "register":
+        cmd = "%s; hawqregister %s" % (source_hawq_env, sub_args)
+        result = local_ssh(cmd)
     elif hawq_command == "version" or hawq_command == "--version":
         print_version()
     else:

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
new file mode 100755
index 0000000..876aed7
--- /dev/null
+++ b/tools/bin/hawqregister
@@ -0,0 +1,272 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+'''
+hawq register [options] database_name table_name file_or_dir_path_in_hdfs
+'''
+import os, sys, optparse, getpass, re, urlparse
+try:
+    from gppylib.commands.unix import getLocalHostname, getUserName
+    from gppylib.db import dbconn
+    from gppylib.gplog import get_default_logger, setup_tool_logging
+    from gppylib.gpparseopts import OptParser, OptChecker
+    from pygresql import pg
+    from pygresql.pgdb import DatabaseError
+    from hawqpylib.hawqlib import local_ssh, local_ssh_output
+except ImportError, e:
+    print e
+    sys.stderr.write('cannot import module, please check that you have sourced greenplum_path.sh\n')
+    sys.exit(2)
+
+# setup logging
+logger = get_default_logger()
+EXECNAME = os.path.split(__file__)[-1]
+setup_tool_logging(EXECNAME,getLocalHostname(),getUserName())
+
+def create_opt_parser(version):
+    parser = OptParser(option_class=OptChecker,
+                       usage='usage: %prog [options] database_name table_name file_or_dir_path_in_hdfs',
+                       version=version)
+    parser.remove_option('-h')
+    parser.add_option('-?', '--help', action='help')
+    parser.add_option('-h', '--host', help="host of the target DB")
+    parser.add_option('-p', '--port', help="port of the target DB", type='int', default=0)
+    parser.add_option('-U', '--user', help="username of the target DB")
+    return parser
+
+
+def get_seg_name(options, databasename, tablename):
+    try:
+        relfilenode = 0
+        relname = ""
+        query = "select pg_class2.relname from pg_class as pg_class1, pg_appendonly, pg_class as pg_class2 where pg_class1.relname ='%s' and pg_class1.oid = pg_appendonly.relid and pg_appendonly.segrelid = pg_class2.oid;" % tablename
+        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename)
+        conn = dbconn.connect(dburl, True)
+        rows = dbconn.execSQL(conn, query)
+        conn.commit()
+        if rows.rowcount == 0:
+            logger.error("table '%s' not found in db '%s'" % (tablename, databasename));
+            sys.exit(1)
+        for row in rows:
+            relname = row[0]
+        conn.close()
+
+    except DatabaseError, ex:
+        logger.error("Failed to connect to database, this script can only be run when the database is up")
+        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query))
+        sys.exit(1)
+
+    # check whether the target table is parquet format
+    if relname.find("paq") == -1:
+        logger.error("table '%s' is not parquet format" % tablename)
+        sys.exit(1)
+
+    return relname
+
+
+def check_hash_type(options, databasename, tablename):
+    try:
+        query = "select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;" % tablename
+        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename)
+        conn = dbconn.connect(dburl, False)
+        rows = dbconn.execSQL(conn, query)
+        conn.commit()
+        if rows.rowcount == 0:
+            logger.error("target not found in table gp_distribution_policy")
+            sys.exit(1)
+        for row in rows:
+            if row[0] != None:
+                logger.error("Cannot register file(s) to a table which is hash-typed")
+                sys.exit(1)
+
+        conn.close()
+
+    except DatabaseError, ex:
+        logger.error("Failed to connect to database, this script can only be run when the database is up")
+        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query))
+        sys.exit(1)
+
+
+def get_files_in_hdfs(filepath):
+    files = []
+    sizes = []
+    hdfscmd = "hadoop fs -test -e %s" % filepath
+    result = local_ssh(hdfscmd)
+    if result != 0:
+        logger.error("Path '%s' does not exist in hdfs" % filepath)
+        sys.exit(1)
+
+    hdfscmd = "hadoop fs -ls -R %s" % filepath
+    result, out, err = local_ssh_output(hdfscmd)
+    outlines = out.splitlines()
+
+    # recursively search all the files under path 'filepath'
+    i = 0
+    for line in outlines:
+        lineargs = line.split()
+        if len(lineargs) == 8 and lineargs[0].find ("d") == -1:
+            files.append(lineargs[7])
+            sizes.append(int(lineargs[4]))
+
+    if len(files) == 0:
+        logger.error("Dir '%s' is empty" % filepath)
+        sys.exit(1)
+
+    return files, sizes
+
+
+def check_parquet_format(options, files):
+    # check whether the files are parquet format by checking the first and last four bytes
+    for file in files:
+        hdfscmd = "hadoop fs -cat %s | head -c 4 | grep PAR1" % file
+        result1 = local_ssh(hdfscmd)
+        hdfscmd = "hadoop fs -cat %s | tail -c 4 | grep PAR1" % file
+        result2 = local_ssh(hdfscmd)
+        if result1 or result2:
+            logger.error("File %s is not parquet format" % file)
+            sys.exit(1)
+
+
+def get_metadata_from_database(options, databasename, seg_name):
+    try:
+        query = "select segno from pg_aoseg.%s;" % seg_name
+        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename)
+        conn = dbconn.connect(dburl, False)
+        rows = dbconn.execSQL(conn, query)
+        conn.commit()
+        conn.close()
+
+    except DatabaseError, ex:
+        logger.error("Failed to connect to database, this script can only be run when the database is up")
+        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query))
+        sys.exit(1)
+
+    return rows.rowcount + 1
+
+
+def move_files_in_hdfs(options, databasename, tablename, files, firstsegno, normal):
+    # get the full path of the corresponding file for the target table
+    try:
+        query = "select location, gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from pg_class, gp_persistent_relation_node, gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;" % tablename
+        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename)
+        conn = dbconn.connect(dburl, False)
+        rows = dbconn.execSQL(conn, query)
+        conn.commit()
+        conn.close()
+
+    except DatabaseError, ex:
+        logger.error("Failed to connect to database, this script can only be run when the database is up")
+        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query))
+        sys.exit(1)
+
+
+    # move file(s) in src path into the folder corresponding to the target table
+    if (normal == True):
+        for row in rows:
+            destdir = row[0].strip() + "/" + str(row[1]) + "/" + str(row[2]) + "/" + str(row[3]) + "/"
+
+        segno = firstsegno
+        for file in files:
+            srcfile = file
+            dstfile = destdir + str(segno)
+            segno += 1
+            if srcfile != dstfile:
+                hdfscmd = "hadoop fs -mv %s %s" % (srcfile, dstfile)
+                sys.stdout.write("hdfscmd: '%s'\n" % hdfscmd)
+                result = local_ssh(hdfscmd)
+                if result != 0:
+                    logger.error("Fail to move '%s' to '%s'" % (srcfile, dstfile))
+                    sys.exit(1)
+    else:
+        for row in rows:
+            srcdir = row[0].strip() + "/" + str(row[1]) + "/" + str(row[2]) + "/" + str(row[3]) + "/"
+
+        segno = firstsegno
+        for file in files:
+            dstfile = file
+            srcfile = srcdir + str(segno)
+            segno += 1
+            if srcfile != dstfile:
+                hdfscmd = "hadoop fs -mv %s %s" % (srcfile, dstfile)
+                sys.stdout.write("hdfscmd: '%s'\n" % hdfscmd)
+                result = local_ssh(hdfscmd)
+                if result != 0:
+                    logger.error("Fail to move '%s' to '%s'" % (srcfile, dstfile))
+                    sys.exit(1)
+
+
+def insert_metadata_into_database(options, databasename, tablename, seg_name, firstsegno, eofs):
+    try:
+        query = "SET allow_system_table_mods='dml';"
+        segno = firstsegno
+        for eof in eofs:
+            query += "insert into pg_aoseg.%s values(%d, %d, %d, %d);" % (seg_name, segno, eof, -1, -1)
+            segno += 1
+
+        dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename)
+        conn = dbconn.connect(dburl, True)
+        rows = dbconn.execSQL(conn, query)
+        conn.commit()
+        conn.close()
+
+    except DatabaseError, ex:
+        logger.error("Failed to connect to database, this script can only be run when the database is up")
+        logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query))
+        move_files_in_hdfs(options, databasename, tablename, files, firstsegno, False)
+
+        sys.exit(1)
+
+
+def main(args=None):
+    parser = create_opt_parser('%prog version $Revision: #1 $')
+    options, args = parser.parse_args(args)
+    if len(args) != 3:
+        sys.stderr.write('Incorrect number of arguments\n\n')
+        parser.print_help(sys.stderr)
+        return 1
+
+    databasename = args[0]
+    tablename = args[1]
+    filepath = args[2]
+
+    # 1. get the seg_name from database
+    seg_name = get_seg_name(options, databasename, tablename)
+
+    # 2. check whether target table is hash-typed, in that case simple insertion does not work
+    result = check_hash_type(options, databasename, tablename)
+
+    # 3. get all the files referred to by 'filepath', which could be a file or a directory containing all the files
+    files, sizes = get_files_in_hdfs(filepath)
+    print "File(s) to be registered:"
+    print files
+
+    # 4. check whether the file to be registered is parquet format
+    check_parquet_format(options, files)
+
+    # 5. get the metadata to be inserted from hdfs
+    firstsegno = get_metadata_from_database(options, databasename, seg_name)
+
+    # 6. move the file in hdfs to proper location
+    move_files_in_hdfs(options, databasename, tablename, files, firstsegno, True)
+
+    # 7. insert the metadata into database
+    insert_metadata_into_database(options, databasename, tablename, seg_name, firstsegno, sizes)
+
+if __name__ == '__main__':
+    sys.exit(main())
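
check_parquet_format() above verifies the parquet magic bytes by piping `hadoop fs -cat` through head/tail and grep. The same sanity check for a file on the local filesystem can be sketched without Hadoop (assumption: a plain local path, not an HDFS URI):

import os

def looks_like_parquet(path):
    # A parquet file starts and ends with the 4-byte magic value "PAR1".
    if os.path.getsize(path) < 8:
        return False
    with open(path, 'rb') as f:
        head = f.read(4)
        f.seek(-4, os.SEEK_END)
        tail = f.read(4)
    return head == b'PAR1' and tail == b'PAR1'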

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2b50f7fe/tools/doc/hawqregister_help
----------------------------------------------------------------------
diff --git a/tools/doc/hawqregister_help b/tools/doc/hawqregister_help
new file mode 100644
index 0000000..d5a5a0d
--- /dev/null
+++ b/tools/doc/hawqregister_help
@@ -0,0 +1,135 @@
+COMMAND NAME: hawq register
+
+Register parquet files generated by other systems into the corresponding table in HAWQ.
+
+*****************************************************
+SYNOPSIS
+*****************************************************
+
+hawq register [-h hostname] [-p port] [-U username] <databasename> <tablename> <hdfspath>
+
+hawq register help
+hawq register -?
+
+hawq register --version
+
+*****************************************************
+DESCRIPTION
+*****************************************************
+
+"hawq register" is a utility that registers file(s) on HDFS into
+a table in HAWQ. It moves the file at the given path (if the path
+refers to a file), or the files under the path (if the path refers
+to a directory), into the HDFS directory corresponding to the table,
+and then updates the table metadata to include those files.
+
+To use "hawq register", HAWQ must have been started.
+
+Currently "hawq register" supports parquet tables only.
+Users have to make sure that the metadata of the parquet file(s)
+and of the table are consistent.
+The target table must not be hash distributed, i.e. it must not have
+been created with a "distributed by" clause.
+
+*****************************************************
+Arguments
+*****************************************************
+
+<databasename>
+
+Name of the database to be operated on.
+
+<tablename>
+
+Name of the table to be registered into.
+
+<hdfspath>
+
+The path of the file or the directory containing the files
+that will be registered.
+
+*****************************************************
+OPTIONS
+*****************************************************
+
+-? (help)
+
+Displays the online help.
+
+--version
+
+Displays the version of this utility.
+
+*****************************************************
+CONNECTION OPTIONS
+*****************************************************
+
+-h hostname
+
+  Specifies the host name of the machine on which the HAWQ master
+  database server is running. If not specified, reads from the
+  environment variable $PGHOST which defaults to localhost.
+
+-p port
+
+  Specifies the TCP port on which the HAWQ master database server
+  is listening for connections. If not specified, reads from the
+  environment variable $PGPORT which defaults to 5432.
+
+-U username
+
+  The database role name to connect as. If not specified, reads
+  from the environment variable $PGUSER which defaults to the current
+  system user name.
+
+*****************************************************
+EXAMPLES
+*****************************************************
+
+Run "hawq register" to register a parquet file at the HDFS path '/temp/hive.paq',
+generated by Hive, into the table 'parquet_table' in HAWQ, in the database
+named 'postgres'.
+Assume the location of the database is 'hdfs://localhost:8020/hawq_default',
+the tablespace id is '16385', the database id is '16387', the table filenode id
+is '77160', and the last file under that filenode is numbered '7'.
+
+$ hawq register postgres parquet_table /temp/hive.paq
+
+This will move the file '/temp/hive.paq' to its new location
+'hdfs://localhost:8020/hawq_default/16385/16387/77160/8' in HDFS, and then
+update the metadata of the table 'parquet_table', which is kept in the
+catalog table 'pg_aoseg.pg_paqseg_77160'.
+
+*****************************************************
+DATA TYPES
+*****************************************************
+The data types used in HAWQ and in the parquet format are not the same, so there
+is a mapping between them, summarized as follows:
+
+Data types in HAWQ              Data types in parquet
+bool                            boolean
+int2                            int32
+int4                            int32
+date                            int32
+int8                            int64
+time                            int64
+timestamptz                     int64
+timestamp                       int64
+money                           int64
+float4                          float
+float8                          double
+bit                             byte_array
+varbit                          byte_array
+byte                            byte_array
+numeric                         byte_array
+name                            byte_array
+char                            byte_array
+bpchar                          byte_array
+varchar                         byte_array
+text                            byte_array
+xml                             byte_array
+timetz                          byte_array
+interval                        byte_array
+macaddr                         byte_array
+inet                            byte_array
+cidr                            byte_array
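
For scripting against this mapping (for example, when validating the schema of files before registering them), the table above can be captured as a simple lookup; this is only a restatement of the table, not part of the utility:

# HAWQ type -> parquet physical type, as listed in the table above.
HAWQ_TO_PARQUET = {
    'bool': 'boolean',
    'int2': 'int32', 'int4': 'int32', 'date': 'int32',
    'int8': 'int64', 'time': 'int64', 'timestamptz': 'int64',
    'timestamp': 'int64', 'money': 'int64',
    'float4': 'float', 'float8': 'double',
    'bit': 'byte_array', 'varbit': 'byte_array', 'byte': 'byte_array',
    'numeric': 'byte_array', 'name': 'byte_array', 'char': 'byte_array',
    'bpchar': 'byte_array', 'varchar': 'byte_array', 'text': 'byte_array',
    'xml': 'byte_array', 'timetz': 'byte_array', 'interval': 'byte_array',
    'macaddr': 'byte_array', 'inet': 'byte_array', 'cidr': 'byte_array',
}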

