hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hubertzh...@apache.org
Subject incubator-hawq git commit: HAWQ-1035. Add yml parser to support partition table.
Date Sun, 18 Sep 2016 08:32:57 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master ef2aef879 -> 77e245dde


HAWQ-1035. Add yml parser to support partition table.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/77e245dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/77e245dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/77e245dd

Branch: refs/heads/master
Commit: 77e245ddeb3e619159eec90e4934eadc38ed259c
Parents: ef2aef8
Author: hzhang2 <zhanghuan929@163.com>
Authored: Sun Sep 18 16:32:33 2016 +0800
Committer: hzhang2 <zhanghuan929@163.com>
Committed: Sun Sep 18 16:32:33 2016 +0800

----------------------------------------------------------------------
 tools/bin/hawqregister | 92 +++++++++++++++++++++++++++++++++++----------
 1 file changed, 72 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/77e245dd/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index ffae437..0c75662 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -39,7 +39,6 @@ logger = get_default_logger()
 EXECNAME = os.path.split(__file__)[-1]
 setup_tool_logging(EXECNAME, getLocalHostname(), getUserName())
 
-
 def option_parser():
     '''option parser'''
     parser = OptParser(option_class=OptChecker,
@@ -110,23 +109,6 @@ def register_yaml_dict_check(D):
                 logger.error('Wrong configuration yaml file format: "%s" attribute does not
exist.\n See example in "hawq register --help".' % 'AO_FileLocations.%s' % attr)
                 sys.exit(1)
 
-
-def option_parser_yml(yml_file):
-    import yaml
-    with open(yml_file, 'r') as f:
-        params = yaml.load(f)
-    register_yaml_dict_check(params)
-    if params['FileFormat'].lower() == 'parquet':
-        if not len(params['Parquet_FileLocations']['Files']):
-            return 'Parquet', [], [], params['Parquet_Schema'], params['Distribution_Policy'],
params['Parquet_FileLocations'], params['Bucketnum']
-        files, sizes = [params['DFS_URL'] + d['path'] for d in params['Parquet_FileLocations']['Files']],
[d['size'] for d in params['Parquet_FileLocations']['Files']]
-        return 'Parquet', files, sizes, params['Parquet_Schema'], params['Distribution_Policy'],
params['Parquet_FileLocations'], params['Bucketnum']
-    if not len(params['AO_FileLocations']['Files']):
-        return 'AO', [], [], params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'],
params['Bucketnum']
-    files, sizes = [params['DFS_URL'] + d['path'] for d in params['AO_FileLocations']['Files']],
[d['size'] for d in params['AO_FileLocations']['Files']]
-    return 'AO', files, sizes, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'],
params['Bucketnum']
-
-
 class GpRegisterAccessor(object):
     def __init__(self, conn):
         self.conn = conn
@@ -297,9 +279,79 @@ class HawqRegister(object):
             if self.bucket_number != get_bucket_number():
                 logger.error('Bucket number of %s is not consistent with previous bucket
number.' % self.tablename)
                 sys.exit(1)
-
+        
+        def set_yml_dataa(file_format, files, sizes, schema, distribution_policy, file_locations,\
+                          bucket_number, partitionby, partitions_constraint, partitions_name,
partitions_compression_level,\
+                          partitions_compression_type, partitions_checksum, partitions_filepaths,
partitions_filesizes):
+            self.file_format = file_format
+            self.files = files
+            self.sizes = sizes
+            self.schema = schema
+            self.distribution_policy = distribution_policy
+            self.file_locations = file_locations
+            self.bucket_number = bucket_number
+            self.partitionby = partitionby
+            self.partitions_constraint = partitions_constraint
+            self.partitions_name = partitions_name 
+            self.partitions_compression_level = partitions_compression_level
+            self.partitions_compression_type = partitions_compression_type
+            self.partitions_checksum = partitions_checksum
+            self.partitions_filepaths = partitions_filepaths 
+            self.partitions_filesizes = partitions_filesizes
+
+        def option_parser_yml(yml_file):
+            import yaml
+            with open(yml_file, 'r') as f:
+                params = yaml.load(f)
+            register_yaml_dict_check(params)
+            partitions_filepaths = []
+            partitions_filesizes = []
+            partitions_constraint = []
+            partitions_name = []
+            partitions_checksum = []
+            partitions_compression_level = []
+            partitions_compression_type = []
+            files = []
+            sizes = []
+            
+            if params['FileFormat'].lower() == 'parquet':
+                partitionby = params.get('Parquet_FileLocations').get('PartitionBy')
+                if params.get('Parquet_FileLocations').get('Partitions') and len(params['Parquet_FileLocations']['Partitions']):
+                    partitions_checksum = [d['Checksum'] for d in params['Parquet_FileLocations']['Partitions']]
+                    partitions_compression_level = [d['CompressionLevel'] for d in params['Parquet_FileLocations']['Partitions']]
+                    partitions_compression_type = [d['CompressionType'] for d in params['Parquet_FileLocations']['Partitions']]
+                    partitions_constraint = [d['Constraint'] for d in params['Parquet_FileLocations']['Partitions']]
+                    partitions_files = [d['Files'] for d in params['Parquet_FileLocations']['Partitions']]
+                    if len(partitions_files):
+                        for pfile in partitions_files:
+                            partitions_filepaths.append([params['DFS_URL'] + item['path']
for item in pfile])
+                            partitions_filesizes.append([item['size'] for item in pfile])
+                    partitions_name = [d['Name'] for d in params['Parquet_FileLocations']['Partitions']]
+                if len(params['Parquet_FileLocations']['Files']):
+                    files, sizes = [params['DFS_URL'] + d['path'] for d in params['Parquet_FileLocations']['Files']],
[d['size'] for d in params['Parquet_FileLocations']['Files']]
+                set_yml_dataa('Parquet', files, sizes, params['Parquet_Schema'], params['Distribution_Policy'],
params['Parquet_FileLocations'], params['Bucketnum'], partitionby,\
+                              partitions_constraint, partitions_name, partitions_compression_level,
partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes)
+                
+            else: #AO format
+                partitionby = params.get('AO_FileLocations').get('PartitionBy')
+                if params.get('AO_FileLocations').get('Partitions') and len(params['AO_FileLocations']['Partitions']):
+                    partitions_checksum = [d['Checksum'] for d in params['AO_FileLocations']['Partitions']]
+                    partitions_compressionLevel = [d['CompressionLevel'] for d in params['AO_FileLocations']['Partitions']]
+                    partitions_compressionType = [d['CompressionType'] for d in params['AO_FileLocations']['Partitions']]
+                    partitions_constraint = [d['Constraint'] for d in params['AO_FileLocations']['Partitions']]
+                    partitions_files = [d['Files'] for d in params['AO_FileLocations']['Partitions']]
+                    if len(partitions_files):
+                        for pfile in partitions_files:
+                            partitions_filepaths.append([params['DFS_URL'] + item['path']
for item in pfile])
+                            partitions_filesizes.append([item['size'] for item in pfile])
+                    partitions_name = [d['Name'] for d in params['AO_FileLocations']['Partitions']]
+                if len(params['AO_FileLocations']['Files']):
+                    files, sizes = [params['DFS_URL'] + d['path'] for d in params['AO_FileLocations']['Files']],
[d['size'] for d in params['AO_FileLocations']['Files']]
+                set_yml_dataa('AO', files, sizes, params['AO_Schema'], params['Distribution_Policy'],
params['AO_FileLocations'], params['Bucketnum'], partitionby, partitions_constraint,\
+                              partitions_name, partitions_compression_level, partitions_compression_type,
partitions_checksum, partitions_filepaths, partitions_filesizes)
+                
         if self.yml:
-            self.file_format, self.files, self.sizes, self.schema, self.distribution_policy,
self.file_locations, self.bucket_number = option_parser_yml(self.yml)
+            option_parser_yml(options.yml_config)
             self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else ''
             check_distribution_policy()
             if self.mode != 'force' and self.mode != 'repair':


Mime
View raw message