hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject incubator-hawq git commit: HAWQ-991. Bugfix for hawq register under --force mode.
Date Tue, 27 Sep 2016 07:58:55 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master 9b7f90b74 -> fd0b25dd7


HAWQ-991. Bugfix for hawq register under --force mode.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/fd0b25dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/fd0b25dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/fd0b25dd

Branch: refs/heads/master
Commit: fd0b25dd7aa9ce964dad8453d341b8ab97ec8f7c
Parents: 9b7f90b
Author: xunzhang <xunzhangthu@gmail.com>
Authored: Tue Sep 27 15:34:36 2016 +0800
Committer: rlei <rlei@pivotal.io>
Committed: Tue Sep 27 15:58:22 2016 +0800

----------------------------------------------------------------------
 tools/bin/hawqregister | 32 +++++++++++++++++++++-----------
 1 file changed, 21 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd0b25dd/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 6c01b77..cc65491 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -117,7 +117,7 @@ def register_yaml_dict_check(D, table_column_num, src_tablename):
         yml_column_num = len(D['AO_Schema'])
     if table_column_num != yml_column_num and table_column_num > 0:
         logger.error('Column number of table in yaml file is not equals to the column number
of table %s.' % src_tablename)
-        sys.exit(1) 
+        sys.exit(1)
 
 
 class FailureHandler(object):
@@ -178,7 +178,7 @@ class GpRegisterAccessor(object):
     def get_table_existed(self, tablename):
         qry = """select count(*) from pg_class where relname = '%s';""" % tablename.split('.')[-1].lower()
         return self.exec_query(qry)[0]['count'] == 1
-        
+
     def get_table_column_num(self, tablename):
         qry = """select count(*) from pg_attribute ,pg_class where pg_class.relname = '%s'
and pg_class.oid = pg_attribute.attrelid and attnum > 0;""" % tablename.split('.')[-1].lower()
         return self.exec_query(qry)[0]['count']
@@ -277,6 +277,11 @@ class GpRegisterAccessor(object):
         tabledir = '/'.join([D['location'].strip(), str(D['tablespace_oid']), str(D['database_oid']),
str(D['relfilenode']), ''])
         return firstsegno, tabledir
 
+    def get_metadata_from_seg_name(self, seg_name):
+        query = 'select segno, eof from pg_aoseg.%s;' % seg_name
+        rows = self.exec_query(query)
+        return [str(row['segno']) for row in rows], [int(row['eof']) for row in rows]
+
     def get_database_encoding_indx(self, database):
         query = "select encoding from pg_database where datname = '%s';" % database
         return self.exec_query(query)[0]['encoding']
@@ -361,6 +366,9 @@ class HawqRegister(object):
         def get_metadata():
             return self.accessor.get_metadata_from_database(self.tablename, self.seg_name)
 
+        def get_metadata_from_table():
+            return self.accessor.get_metadata_from_seg_name(self.seg_name)
+
         def get_distribution_policy():
             return self.accessor.get_distribution_policy_info(self.tablename)
 
@@ -544,7 +552,8 @@ class HawqRegister(object):
                 logger.error("In repair mode, file path from yaml file should be the same
with table's path.")
                 self.failure_handler.rollback()
                 sys.exit(1)
-            existed_files, existed_sizes = self._get_files_in_hdfs(self.filepath)
+            seg_list, existed_sizes = get_metadata_from_table()
+            existed_files = [self.tabledir + seg for seg in seg_list]
             existed_info = {}
             for k, fn in enumerate(existed_files):
                 existed_info[fn] = existed_sizes[k]
@@ -567,11 +576,12 @@ class HawqRegister(object):
         if not self.yml:
             check_no_regex_filepath([self.filepath])
             self.files, self.sizes = self._get_files_in_hdfs(self.filepath)
-       
+
         self.do_not_move, self.files_update, self.sizes_update = False, [], []
         self.newfiles, self.newsizes = [f for f in self.files], [sz for sz in self.sizes]
         if self.mode == 'force':
-            existed_files, _ = self._get_files_in_hdfs(self.tabledir)
+            seg_list, _ = get_metadata_from_table()
+            existed_files = [self.tabledir + seg for seg in seg_list]
             if len(self.files) == len(existed_files):
                 if sorted(self.files) != sorted(existed_files):
                     logger.error('In force mode, you should include existing table files
in yaml configuration file. Otherwise you should drop the previous table before register --force.')
@@ -605,7 +615,7 @@ class HawqRegister(object):
 
         self._check_files_and_table_in_same_hdfs_cluster(self.filepath, self.tabledir)
 
-        print 'New file(s) to be registered: ', self.files
+        print 'New file(s) to be registered: ', self.newfiles
         if self.files_update:
             print 'Catalog info need to be updated for these files: ', self.files_update
 
@@ -734,15 +744,15 @@ class HawqRegister(object):
             query += "begin transaction;"
             segno_lst = [f.split('/')[-1] for f in self.files]
             query += "delete from pg_aoseg.%s;" % (self.seg_name)
-            self.firstsegno = 1
+            firstsegno = 1
             if self.file_format == 'Parquet':
-                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name,
self.firstsegno, eofs[0], -1, -1)
+                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name,
firstsegno, eofs[0], -1, -1)
                 for k, eof in enumerate(eofs[1:]):
-                    query += ',(%d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1)
+                    query += ',(%d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1)
             else:
-                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name,
self.firstsegno, eofs[0], -1, -1, -1)
+                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name,
firstsegno, eofs[0], -1, -1, -1)
                 for k, eof in enumerate(eofs[1:]):
-                    query += ',(%d, %d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1,
-1, -1)
+                    query += ',(%d, %d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1,
-1)
             query += ';'
             query += "end transaction;"
         elif mode == 'update':


Mime
View raw message