hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lil...@apache.org
Subject [1/2] incubator-hawq git commit: HAWQ-1024. Add rollback before all necessary exit.
Date Fri, 23 Sep 2016 06:19:10 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master 21b867a10 -> 921b908ef


HAWQ-1024. Add rollback before all necessary exit.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/31363940
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/31363940
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/31363940

Branch: refs/heads/master
Commit: 313639400b5a5bdf8afcb8c3123f141b580f238b
Parents: 21b867a
Author: xunzhang <xunzhangthu@gmail.com>
Authored: Fri Sep 23 11:23:58 2016 +0800
Committer: xunzhang <xunzhangthu@gmail.com>
Committed: Fri Sep 23 11:23:58 2016 +0800

----------------------------------------------------------------------
 tools/bin/hawqregister | 27 +++++++++++++++++++++++++++
 1 file changed, 27 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/31363940/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 08041b4..570253f 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -136,6 +136,7 @@ class FailureHandler(object):
         return ' '.join(lst[:-2] + [lst[-1], lst[-2]])
 
     def rollback(self):
+        logger.info('Error found, Hawqregister starts to rollback...')
         for (typ, cmd) in reversed(self.operations):
             if typ == 'SQL':
                 sql = self.assemble_SQL(cmd)
@@ -220,9 +221,11 @@ class GpRegisterAccessor(object):
         rows = self.exec_query(qry)
         if len(rows) == 0:
             logger.error('Table %s is not an append-only table. There is no record in gp_distribution_policy table.' % tablename)
+            self.failure_handler.rollback()
             sys.exit(1)
         if rows[0]['attrnums']:
             logger.error('Cannot register file(s) to a table which is hash distributed.')
+            self.failure_handler.rollback()
             sys.exit(1)
 
     # pg_paqseg_#
@@ -233,11 +236,13 @@ class GpRegisterAccessor(object):
         rows = self.exec_query(query)
         if len(rows) == 0:
             logger.error('table "%s" not found in db "%s"' % (tablename, database))
+            self.failure_handler.rollback()
             sys.exit(1)
         relname = rows[0]['relname']
         if fmt == 'Parquet':
             if relname.find('paq') == -1:
                 logger.error("table '%s' is not parquet format" % tablename)
+                self.failure_handler.rollback()
                 sys.exit(1)
         return relname
 
@@ -331,6 +336,7 @@ class HawqRegister(object):
             if self.distribution_policy.startswith('DISTRIBUTED BY'):
                 if len(self.files) % self.bucket_number != 0:
                     logger.error('Files to be registered must be multiple times to the bucket number of hash table.')
+                    self.failure_handler.rollback()
                     sys.exit(1)
 
         def check_database_encoding():
@@ -366,6 +372,7 @@ class HawqRegister(object):
                     return
                 else:
                     logger.error('Distribution policy of %s from yaml file is not consistent with the policy of existing table.' % self.tablename)
+                    self.failure_handler.rollback()
                     sys.exit(1)
             tmp_dict = {}
             for i, d in enumerate(self.schema):
@@ -375,6 +382,7 @@ class HawqRegister(object):
             original_policy = ','.join([str(tmp_dict[col]) for col in cols])
             if policy.strip('{').strip('}') != original_policy:
                 logger.error('Distribution policy of %s from yaml file is not consistent with the policy of existing table.' % self.tablename)
+                self.failure_handler.rollback()
                 sys.exit(1)
 
         def check_bucket_number():
@@ -383,6 +391,7 @@ class HawqRegister(object):
 
             if self.bucket_number != get_bucket_number():
                 logger.error('Bucket number of %s is not consistent with previous bucket number.' % self.tablename)
+                self.failure_handler.rollback()
                 sys.exit(1)
 
         def set_yml_dataa(file_format, files, sizes, tablename, schema, distribution_policy, file_locations,\
@@ -476,12 +485,14 @@ class HawqRegister(object):
             for sz in self.sizes:
                 if sz < 0:
                     logger.error('File size(%s) in yaml configuration file should not be less than 0.' % sz)
+                    self.failure_handler.rollback()
                     sys.exit(1)
             for k, fn in enumerate(self.files):
                 hdfscmd = 'hdfs dfs -du %s' % fn
                 _, out, _ = local_ssh_output(hdfscmd)
                 if self.sizes[k] > int(out.strip().split()[0]):
                     logger.error('File size(%s) in yaml configuration file should not exceed actual length(%s) of file %s.' % (self.sizes[k], out.strip().split()[0], fn))
+                    self.failure_handler.rollback()
                     sys.exit(1)
 
         def check_no_regex_filepath(files):
@@ -490,10 +501,12 @@ class HawqRegister(object):
                 for v in tmp_lst:
                     if v == '.':
                         logger.error('Hawq register does not support file path with regex: %s.' % fn)
+                        self.failure_handler.rollback()
                         sys.exit(1)
                 for ch in ['..', '*']:
                     if fn.find(ch) != -1:
                         logger.error('Hawq register does not support file path with regex: %s.' % fn)
+                        self.failure_handler.rollback()
                         sys.exit(1)
 
         if self.yml:
@@ -524,6 +537,7 @@ class HawqRegister(object):
         if self.mode == 'repair':
             if self.tabledir.strip('/') != self.filepath.strip('/'):
                 logger.error("In repair mode, file path from yaml file should be the same with table's path.")
+                self.failure_handler.rollback()
                 sys.exit(1)
             existed_files, existed_sizes = self._get_files_in_hdfs(self.filepath)
             existed_info = {}
@@ -532,14 +546,17 @@ class HawqRegister(object):
             for k, fn in enumerate(self.files):
                 if fn not in existed_files:
                     logger.error('Can not register in repair mode since giving non-existing file: %s.' % fn)
+                    self.failure_handler.rollback()
                     sys.exit(1)
                 if self.sizes[k] > existed_info[fn]:
                     logger.error('Can not register in repair mode since giving larger file size: %s' % self.sizes[k])
+                    self.failure_handler.rollback()
                     sys.exit(1)
 
         if self.mode == 'usage2_table_exist':
             if self.tabledir.strip('/') == self.filepath.strip('/'):
                 logger.error('Files to be registered should not be the same with table path.')
+                self.failure_handler.rollback()
                 sys.exit(1)
 
         self.do_not_move, self.files_update, self.sizes_update = False, [], []
@@ -548,12 +565,14 @@ class HawqRegister(object):
             if len(self.files) == len(existed_files):
                 if sorted(self.files) != sorted(existed_files):
                     logger.error('In force mode, you should include existing table files in yaml configuration file. Otherwise you should drop the previous table before register --force.')
+                    self.failure_handler.rollback()
                     sys.exit(1)
                 else:
                     self.do_not_move, self.files_update, self.sizes_update = True, self.files, self.sizes
                     self.files, self.sizes = [], []
             elif len(self.files) < len(existed_files):
                 logger.error('In force mode, you should include existing table files in yaml configuration file. Otherwise you should drop the previous table before register --force.')
+                self.failure_handler.rollback()
                 sys.exit(1)
             else:
                 files_old, sizes_old = [f for f in self.files], [sz for sz in self.sizes]
@@ -565,6 +584,7 @@ class HawqRegister(object):
                         self.sizes.remove(sizes_old[k])
                 if sorted(self.files_update) != sorted(existed_files):
                     logger.error('In force mode, you should include existing table files in yaml configuration file. Otherwise you should drop the previous table before register --force.')
+                    self.failure_handler.rollback()
                     sys.exit(1)
 
         elif self.mode == 'repair':
@@ -588,6 +608,7 @@ class HawqRegister(object):
         if self.filesize is not None:
             if len(self.files) != 1:
                 logger.error('-e option is only supported with single file case.')
+                self.failure_handler.rollback()
                 sys.exit(1)
             self.sizes = [self.filesize]
         check_sizes_valid()
@@ -609,12 +630,14 @@ class HawqRegister(object):
         filesystem = filepath.split('://')
         if filesystem[0] != 'hdfs':
             logger.error('Only support registering file(s) in hdfs.')
+            self.failure_handler.rollback()
             sys.exit(1)
         fileroot = filepath.split('/')
         tableroot = tabledir.split('/')
         # check the root url of them. eg: for 'hdfs://localhost:8020/temp/tempfile', we check 'hdfs://localohst:8020'
         if fileroot[0:3] != tableroot[0:3]:
             logger.error("Files to be registered and the table are not in the same hdfs cluster.\nFile(s) to be registered: '%s'\nTable path in HDFS: '%s'." % (filepath, tabledir))
+            self.failure_handler.rollback()
             sys.exit(1)
 
     def _get_files_in_hdfs(self, filepath):
@@ -624,6 +647,7 @@ class HawqRegister(object):
         result = local_ssh(hdfscmd, logger)
         if result != 0:
             logger.error("Path '%s' does not exist in hdfs" % filepath)
+            self.failure_handler.rollback()
             sys.exit(1)
         hdfscmd = "hdfs dfs -ls -R %s" % filepath
         result, out, err = local_ssh_output(hdfscmd)
@@ -636,6 +660,7 @@ class HawqRegister(object):
                 sizes.append(int(lineargs[4]))
         if len(files) == 0 and self.mode != 'force':
             logger.error("Dir '%s' is empty" % filepath)
+            self.failure_handler.rollback()
             sys.exit(1)
         return files, sizes
 
@@ -652,6 +677,7 @@ class HawqRegister(object):
             result2 = local_ssh(hdfscmd, logger)
             if result1 or result2:
                 logger.error('File %s is not parquet format' % f)
+                self.failure_handler.rollback()
                 sys.exit(1)
 
     def _move_files_in_hdfs(self):
@@ -678,6 +704,7 @@ class HawqRegister(object):
             result = local_ssh(hdfscmd, logger)
             if result != 0:
                 logger.error('Fail to delete %s ' % fn)
+                self.failure_handler.rollback()
                 sys.exit(1)
 
     def _modify_metadata(self, mode):


Mime
View raw message