hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject incubator-hawq git commit: HAWQ-1104. extract and register tupcount/varblocknumber/eofuncompressed
Date Thu, 27 Oct 2016 03:26:27 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master 85690dc92 -> 00286d7f2


HAWQ-1104. extract and register tupcount/varblocknumber/eofuncompressed


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/00286d7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/00286d7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/00286d7f

Branch: refs/heads/master
Commit: 00286d7f2e3528efc57f494f4269a146f5eff5d7
Parents: 85690dc
Author: Wen Lin <wlin@pivotal.io>
Authored: Thu Oct 27 11:25:55 2016 +0800
Committer: Wen Lin <wlin@pivotal.io>
Committed: Thu Oct 27 11:25:55 2016 +0800

----------------------------------------------------------------------
 .../test_hawq_register_usage2_case1.cpp         |  64 +++++++----
 src/test/feature/lib/sql_util.cpp               |  11 ++
 src/test/feature/lib/sql_util.h                 |   2 +
 tools/bin/hawqextract                           |  24 ++---
 tools/bin/hawqregister                          | 106 +++++++++++++------
 5 files changed, 140 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/00286d7f/src/test/feature/ManagementTool/test_hawq_register_usage2_case1.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/test_hawq_register_usage2_case1.cpp b/src/test/feature/ManagementTool/test_hawq_register_usage2_case1.cpp
index 0b5eabf..8411f0c 100644
--- a/src/test/feature/ManagementTool/test_hawq_register_usage2_case1.cpp
+++ b/src/test/feature/ManagementTool/test_hawq_register_usage2_case1.cpp
@@ -15,44 +15,54 @@ using hawq::test::Command;
 using hawq::test::HdfsConfig;
 
 TEST_F(TestHawqRegister, TestUsage2Case1EmptyTable) {
-  SQLUtility util;
-  util.execute("drop table if exists t9;");
-  util.execute("create table t9(i int) with (appendonly=true, orientation=row) distributed randomly;");
-  EXPECT_EQ(0, Command::getCommandStatus("hawq extract -d " + (string) HAWQ_DB + " -o t9.yml testhawqregister_testusage2case1emptytable.t9"));
-  EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c t9.yml testhawqregister_testusage2case1emptytable.nt9"));
-  util.query("select * from nt9;", 0);
-  EXPECT_EQ(0, Command::getCommandStatus("rm -rf t9.yml"));
-  util.execute("drop table t9;");
-  util.execute("drop table nt9;");
+    SQLUtility util;
+    util.execute("drop table if exists t9;");
+    util.execute("create table t9(i int) with (appendonly=true, orientation=row) distributed randomly;");
+    EXPECT_EQ(0, Command::getCommandStatus("hawq extract -d " + (string) HAWQ_DB + " -o t9.yml testhawqregister_testusage2case1emptytable.t9"));
+    EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c t9.yml testhawqregister_testusage2case1emptytable.nt9"));
+    util.query("select * from nt9;", 0);
+    std::string reloid = getTableOid("nt9");
+    /* An empty table has no row in pg_aoseg.pg_aoseg_xxx table */
+    util.query(hawq::test::stringFormat("select * from pg_aoseg.pg_aoseg_%s;", reloid.c_str()), 0);
+    EXPECT_EQ(0, Command::getCommandStatus("rm -rf t9.yml"));
+    util.execute("drop table t9;");
+    util.execute("drop table nt9;");
 }
 
 TEST_F(TestHawqRegister, TestUsage2Case1IncorrectYaml) {
-  SQLUtility util;
-  string filePath = util.getTestRootPath() + "/ManagementTool/";
-
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "missing_pagesize.yml xx"));
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "missing_rowgroupsize.yml xx"));
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "missing_filesize.yml xx"));
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "wrong_schema.yml xx"));
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "missing_checksum.yml xx"));
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "wrong_dfs_url.yml xx"));
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "missing_bucketnum.yml xx"));
+    SQLUtility util;
+    string filePath = util.getTestRootPath() + "/ManagementTool/";
+
+    EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "missing_pagesize.yml xx"));
+    EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "missing_rowgroupsize.yml xx"));
+    EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "missing_filesize.yml xx"));
+    EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "wrong_schema.yml xx"));
+    EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "missing_checksum.yml xx"));
+    EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "wrong_dfs_url.yml xx"));
+    EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "missing_bucketnum.yml xx"));
 }
 
 TEST_F(TestHawqRegister, TestUsage2Case1MismatchFileNumber) {
-  SQLUtility util;
-  string filePath = util.getTestRootPath() + "/ManagementTool/";
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "files_incomplete.yml xx"));
+    SQLUtility util;
+    string filePath = util.getTestRootPath() + "/ManagementTool/";
+    EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "files_incomplete.yml xx"));
 }
 
 
 TEST_F(TestHawqRegister, TestUsage2Case1Expected) {
     SQLUtility util;
+    string fmt_prefix;
     std::vector<string> create_table_matrix = {"distributed by (i)", "distributed randomly"};
     std::vector<string> fmt_matrix = {"row", "parquet"};
     int suffix=0;
+
     for (auto & ddl : create_table_matrix) {
         for (auto & fmt : fmt_matrix) {
+            if (fmt.compare("row") == 0)
+                fmt_prefix = "aoseg";
+            else
+                fmt_prefix = "paqseg";
+
             suffix++;
             auto t = hawq::test::stringFormat("t_usage2_case1_%s", std::to_string(suffix).c_str());
             auto nt = hawq::test::stringFormat("nt_usage2_case1_%s", std::to_string(suffix).c_str());
@@ -63,10 +73,20 @@ TEST_F(TestHawqRegister, TestUsage2Case1Expected) {
             util.execute(hawq::test::stringFormat("create table %s(i int) with (appendonly=true, orientation=%s) %s;", t.c_str(), fmt.c_str(), ddl.c_str()));
             util.execute(hawq::test::stringFormat("insert into %s select generate_series(1, 100);", t.c_str()));
             util.query(hawq::test::stringFormat("select * from %s;", t.c_str()), 100);
+
+            // get pg_aoseg.pg_xxxseg_xxx table
+            std::string reloid1 = getTableOid(t.c_str());
+            string result1 = util.getQueryResultSetString(hawq::test::stringFormat("select * from pg_aoseg.pg_%s_%s order by segno;", fmt_prefix.c_str(), reloid1.c_str()));
+
             EXPECT_EQ(0, Command::getCommandStatus(hawq::test::stringFormat("hawq extract -d %s -o t_%s.yml testhawqregister_testusage2case1expected.%s", HAWQ_DB, std::to_string(suffix).c_str(), t.c_str())));
             EXPECT_EQ(0, Command::getCommandStatus(hawq::test::stringFormat("hawq register -d %s -c t_%s.yml testhawqregister_testusage2case1expected.%s", HAWQ_DB, std::to_string(suffix).c_str(), nt.c_str())));
             util.query(hawq::test::stringFormat("select * from %s;", nt.c_str()), 100);
 
+            // check pg_aoseg.pg_xxxseg_xxx table
+            std::string reloid2 = getTableOid(nt.c_str());
+            string result2 = util.getQueryResultSetString(hawq::test::stringFormat("select * from pg_aoseg.pg_%s_%s order by segno;", fmt_prefix.c_str(), reloid2.c_str()));
+            EXPECT_EQ(result1, result2);
+
             // hawq register -d hawq_feature_test -c t_usage2_case1_#.yml nt_usage2_case1_#, where nt_usage2_case1_# exists
             util.execute(hawq::test::stringFormat("drop table if exists %s;", t.c_str()));
             util.execute(hawq::test::stringFormat("create table %s(i int) with (appendonly=true, orientation=%s) %s;", t.c_str(), fmt.c_str(), ddl.c_str()));
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/00286d7f/src/test/feature/lib/sql_util.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/lib/sql_util.cpp b/src/test/feature/lib/sql_util.cpp
index f0568a2..6eb34eb 100644
--- a/src/test/feature/lib/sql_util.cpp
+++ b/src/test/feature/lib/sql_util.cpp
@@ -268,6 +268,17 @@ std::string SQLUtility::getQueryResult(const std::string &query)
{
   return value;
 }
 
+std::string SQLUtility::getQueryResultSetString(const std::string &query) {
+  const hawq::test::PSQLQueryResult &result = executeQuery(query);
+  std::vector<std::vector<string> > resultString = result.getRows();
+  string resultStr;
+  for (auto row : result.getRows()) {
+    for (auto column : row) resultStr += column + "|";
+    resultStr += "\n";
+  }
+  return resultStr;
+}
+
 FilePath SQLUtility::splitFilePath(const string &filePath) const {
   FilePath fp;
   size_t found1 = filePath.find_last_of("/");

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/00286d7f/src/test/feature/lib/sql_util.h
----------------------------------------------------------------------
diff --git a/src/test/feature/lib/sql_util.h b/src/test/feature/lib/sql_util.h
index 9bf1f90..e6d4768 100644
--- a/src/test/feature/lib/sql_util.h
+++ b/src/test/feature/lib/sql_util.h
@@ -91,6 +91,8 @@ class SQLUtility {
   // @return the query result
   std::string getQueryResult(const std::string &query);
 
+  std::string getQueryResultSetString(const std::string &query);
+
   // execute expect error message
   // @param sql the given sql command
   // @param errmsg the expected sql error message

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/00286d7f/tools/bin/hawqextract
----------------------------------------------------------------------
diff --git a/tools/bin/hawqextract b/tools/bin/hawqextract
index 3a6ef66..255db84 100644
--- a/tools/bin/hawqextract
+++ b/tools/bin/hawqextract
@@ -147,17 +147,17 @@ class GpMetadataAccessor:
 
     def get_aoseg_files(self, oid):
         '''
-        Return rows in pg_aoseg_`oid` table, excluding
-        rows whose content id is -1.
+        Return rows in pg_aoseg_`oid` table
 
         Example:
         >>> accessor.get_aoseg_files(35709)
-        >>> [{'fileno':'1', 'filesize':'320'},
-        ...  {'fileno':'2', 'filesize':'880'},
-        ...  {'fileno':'3', 'filesize':'160'}]
+        >>> [{'fileno':'1', 'filesize':'320', 'tupcount':'10', 'varblockcount':'2', eofuncompressed:'320'},
+        ...  {'fileno':'2', 'filesize':'880', 'tupcount':'27', 'varblockcount':'3', eofuncompressed:'880'},
+        ...  {'fileno':'3', 'filesize':'160', 'tupcount':'5', 'varblockcount':'2', eofuncompressed:'160'}]
         '''
         qry = """
-        SELECT segno as fileno, eof as filesize
+        SELECT segno as fileno, eof as filesize, tupcount as tupcount,
+        varblockcount as varblockcount, eofuncompressed as eofuncompressed
         FROM pg_aoseg.pg_aoseg_%d
         ORDER by fileno;
         """ % oid
@@ -170,12 +170,12 @@ class GpMetadataAccessor:
 
         Example:
         >>> accessor.get_paqseg_files(35709)
-        >>> [{'fileno':'1', 'filesize':'320'},
-        ...  {'fileno':'2', 'filesize':'880'},
-        ...  {'fileno':'3', 'filesize':'160'}]
+        >>> [{'fileno':'1', 'filesize':'320', 'tupcount':'10', 'eofuncompressed':'320'},
+        ...  {'fileno':'2', 'filesize':'880', 'tupcount':'27', 'eofuncompressed':'880'},
+        ...  {'fileno':'3', 'filesize':'160', 'tupcount':'5', 'eofuncompressed':'160'}]
         '''
         qry = """
-        SELECT segno as fileno, eof as filesize
+        SELECT segno as fileno, eof as filesize, tupcount, eofuncompressed
         FROM pg_aoseg.pg_paqseg_%d
         ORDER by fileno;
         """ % oid
@@ -357,7 +357,7 @@ def extract_metadata(conn, tbname):
                     relfilenode,
                     f['fileno']
             )
-            files.append({'path': path, 'size': int(f['filesize'])})
+            files.append({'path': path, 'size': int(f['filesize']), 'tupcount': int(f['tupcount']), 'varblockcount': int(f['varblockcount']), 'eofuncompressed': int(f['eofuncompressed'])})
         return files
 
     def get_parquet_table_files(oid, relfilenode):
@@ -375,7 +375,7 @@ def extract_metadata(conn, tbname):
                     relfilenode,
                     f['fileno']
             )
-            files.append({'path': path, 'size': int(f['filesize'])})
+            files.append({'path': path, 'size': int(f['filesize']), 'tupcount': int(f['tupcount']), 'eofuncompressed': int(f['eofuncompressed'])})
         return files
 
     def extract_AO_metadata():

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/00286d7f/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 82ce274..46a9a8f 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -372,12 +372,15 @@ class HawqRegister(object):
             self.failure_handler.rollback()
             sys.exit(1)
 
-    def _set_yml_dataa(self, file_format, files, sizes, tablename, schema, distribution_policy, file_locations,\
+    def _set_yml_data(self, file_format, files, sizes, tupcounts, eofuncompresseds, varblockcounts, tablename, schema, distribution_policy, file_locations,\
                       bucket_number, partitionby, partitions_constraint, partitions_name, partitions_compression_level,\
                       partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding):
         self.file_format = file_format
         self.files = files
         self.sizes = sizes
+        self.tupcounts = tupcounts
+        self.eofuncompresseds = eofuncompresseds
+        self.varblockcounts = varblockcounts
         self.src_table_name = tablename
         self.schema = schema
         self.distribution_policy = distribution_policy
@@ -392,9 +395,6 @@ class HawqRegister(object):
         self.partitions_filepaths = partitions_filepaths
         self.partitions_filesizes = partitions_filesizes
         self.encoding = encoding
-        self.files_same_path = []
-        self.sizes_same_path = []
-        self.segnos_same_path = []
 
     def _option_parser_yml(self, yml_file):
         import yaml
@@ -414,7 +414,7 @@ class HawqRegister(object):
         partitions_checksum = []
         partitions_compression_level = []
         partitions_compression_type = []
-        files, sizes = [], []
+        files, sizes, tupcounts, eofuncompresseds, varblockcounts = [], [], [], [], []
 
         if params['FileFormat'].lower() == 'parquet':
             Format = 'Parquet'
@@ -437,11 +437,16 @@ class HawqRegister(object):
                     partitions_filesizes.append([item['size'] for item in pfile])
             partitions_name = [d['Name'] for d in params[Format_FileLocations]['Partitions']]
         if len(params[Format_FileLocations]['Files']):
-            files, sizes = [params['DFS_URL'] + d['path'] for d in params[Format_FileLocations]['Files']],
[d['size'] for d in params[Format_FileLocations]['Files']]
+            for ele in params[Format_FileLocations]['Files']:
+                files.append(params['DFS_URL'] + ele['path'])
+                sizes.append(ele['size'])
+                tupcounts.append(ele['tupcount'] if ele.has_key('tupcount') else -1)
+                eofuncompresseds.append(ele['eofuncompressed'] if ele.has_key('eofuncompressed')
else -1)
+                varblockcounts.append(ele['varblockcount'] if ele.has_key('varblockcount')
else -1)
+
         encoding = params['Encoding']
         bucketNum = params['Bucketnum'] if params['Distribution_Policy'].startswith('DISTRIBUTED
BY') else 6
-        self._set_yml_dataa(Format, files, sizes, params['TableName'], params['%s_Schema'
% Format], params['Distribution_Policy'], params[Format_FileLocations], bucketNum, partitionby,\
-                      partitions_constraint, partitions_name, partitions_compression_level,
partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes,
encoding)
+        self._set_yml_data(Format, files, sizes, tupcounts, eofuncompresseds, varblockcounts,
params['TableName'], params['%s_Schema' % Format], params['Distribution_Policy'], params[Format_FileLocations],
bucketNum, partitionby, partitions_constraint, partitions_name, partitions_compression_level,
partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes,
encoding)
 
 
     # check conflicting distributed policy
@@ -524,6 +529,12 @@ class HawqRegister(object):
                     sys.exit(1)
 
     def _init(self):
+        self.files_same_path = []
+        self.sizes_same_path = []
+        self.tupcounts_same_path = []
+        self.varblockcounts_same_path = []
+        self.eofuncompresseds_same_path = []
+        self.segnos_same_path = []
         if self.yml:
             self._option_parser_yml(options.yml_config)
             self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else ''
@@ -537,9 +548,6 @@ class HawqRegister(object):
                 logger.error('-e option is only supported with single file case.')
                 sys.exit(1)
             self.file_format = 'Parquet'
-            self.files_same_path = []
-            self.sizes_same_path = []
-            self.segnos_same_path = []
             self._check_hash_type() # Usage1 only support randomly distributed table
         self.queries = "set allow_system_table_mods='dml';"
         self.queries += "begin transaction;"
@@ -602,9 +610,10 @@ class HawqRegister(object):
         if not self.yml:
             self._check_no_regex_filepath([self.filepath])
             self.files, self.sizes = self._get_files_in_hdfs(self.filepath)
+            self.tupcounts = self.eofuncompresseds = self.varblockcounts = [-1 for i in range(len(self.files))]
 
-        self.do_not_move, self.files_update, self.sizes_update = False, [], []
-        self.files_append, self.sizes_append = [f for f in self.files], [sz for sz in self.sizes]
+        self.do_not_move, self.files_update, self.sizes_update, self.tupcounts_update, self.eofuncompresseds_update,
self.varblockcounts_update = False, [], [], [], [], []
+        self.files_append, self.sizes_append, self.tupcounts_append, self.eofuncompresseds_append,
self.varblockcounts_append = [f for f in self.files], [sz for sz in self.sizes], [tc for tc
in self.tupcounts], [eof for eof in self.eofuncompresseds], [v for v in self.varblockcounts]
         if self.mode == 'force':
             if len(self.files) == len(existed_files):
                 if sorted(self.files) != sorted(existed_files):
@@ -612,8 +621,8 @@ class HawqRegister(object):
                     self.failure_handler.rollback()
                     sys.exit(1)
                 else:
-                    self.do_not_move, self.files_update, self.sizes_update = True, self.files,
self.sizes
-                self.files_append, self.sizes_append = [],[]
+                    self.do_not_move, self.files_update, self.sizes_update, self.tupcounts_update,
self.eofuncompresseds_update, self.varblockcounts_update = True, self.files, self.sizes, self.tupcounts,
self.eofuncompresseds, self.varblockcounts
+                self.files_append, self.sizes_append, self.tupcounts_append, self.eofuncompresseds_append,
self.varblockcounts_append = [], [], [], [], []
             elif len(self.files) < len(existed_files):
                 logger.error('In force mode, you should include existing table files in yaml
configuration file. Otherwise you should drop the previous table before register --force.')
                 self.failure_handler.rollback()
@@ -623,8 +632,14 @@ class HawqRegister(object):
                     if f in existed_files:
                         self.files_update.append(self.files[k])
                         self.sizes_update.append(self.sizes[k])
+                        self.tupcounts_update.append(self.tupcounts[k])
+                        self.eofuncompresseds_update.append(self.eofuncompresseds[k])
+                        self.varblockcounts_update.append(self.varblockcounts[k])
                         self.files_append.remove(self.files[k])
                         self.sizes_append.remove(self.sizes[k])
+                        self.tupcounts_append.remove(self.tupcounts[k])
+                        self.eofuncompresseds_append.remove(self.eofuncompresseds[k])
+                        self.varblockcounts_append.remove(self.varblockcounts[k])
                 if sorted(self.files_update) != sorted(existed_files):
                     logger.error('In force mode, you should include existing table files
in yaml configuration file. Otherwise you should drop the previous table before register --force.')
                     self.failure_handler.rollback()
@@ -729,6 +744,9 @@ class HawqRegister(object):
             if seg not in catalog_lst:
                 self.files_same_path.append(self.files_update[k])
                 self.sizes_same_path.append(self.sizes_update[k])
+                self.tupcounts_same_path.append(self.tupcounts_update[k])
+                self.eofuncompresseds_same_path.append(self.eofuncompresseds_update[k])
+                self.varblockcounts_same_path.append(self.varblockcounts_update[k])
             if seg in new_catalog_lst:
                 exist_catalog_lst.append(seg)
         for seg in update_segno_lst:
@@ -778,45 +796,67 @@ class HawqRegister(object):
         append_eofs = self.sizes_append
         update_eofs = self.sizes_update
         same_path_eofs = self.sizes_same_path
+        append_tupcounts = self.tupcounts_append
+        update_tupcounts = self.tupcounts_update
+        same_path_tupcounts = self.tupcounts_same_path
+        append_eofuncompresseds = self.eofuncompresseds_append
+        update_eofuncompresseds = self.eofuncompresseds_update
+        same_path_eofuncompresseds = self.eofuncompresseds_same_path
+        append_varblockcounts = self.varblockcounts_append
+        update_varblockcounts = self.varblockcounts_update
+        same_path_varblockcounts = self.varblockcounts_same_path
         update_segno_lst = [f.split('/')[-1] for f in self.files_update]
         same_path_segno_lst = [seg for seg in self.segnos_same_path]
         query = ""
+        
         if mode == 'force':
             query += "delete from pg_aoseg.%s;" % (self.seg_name)
         
         if self.file_format == 'Parquet': 
             if len(update_eofs) > 0:
-                query += 'insert into pg_aoseg.%s values(%s, %d, %d, %d)' % (self.seg_name,
update_segno_lst[0], update_eofs[0], -1, -1)
-                for k, update_eof in enumerate(update_eofs[1:]):
-                    query += ',(%s, %d, %d, %d)' % (update_segno_lst[k + 1], update_eof,
-1, -1)
+                query += 'insert into pg_aoseg.%s values(%s, %d, %d, %d)' % (self.seg_name,
update_segno_lst[0], update_eofs[0], update_tupcounts[0], update_eofuncompresseds[0])
+                k = 0
+                for update_eof, update_tupcount, update_eofuncompressed in zip(update_eofs[1:],
update_tupcounts[1:], update_eofuncompresseds[1:]):
+                    query += ',(%s, %d, %d, %d)' % (update_segno_lst[k + 1], update_eof,
update_tupcount, update_eofuncompressed)
+                    k += 1
                 query += ';'
             if len(same_path_eofs) > 0:
-                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name,
same_path_segno_lst[0], same_path_eofs[0], -1, -1)
-                for k, same_path_eof in enumerate(same_path_eofs[1:]):
-                    query += ',(%d, %d, %d, %d)' % (same_path_segno_lst[k + 1], same_path_eof,
-1, -1)
+                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name,
same_path_segno_lst[0], same_path_eofs[0], same_path_tupcounts[0], same_path_eofuncompresseds[0])
+                k = 0
+                for same_path_eof, same_path_tupcount, same_path_eofuncompressed in zip(same_path_eofs[1:],
same_path_tupcounts[1:], same_path_eofuncompresseds[1]):
+                    query += ',(%d, %d, %d, %d)' % (same_path_segno_lst[k + 1], same_path_eof,
same_path_tupcount, same_path_eofuncompressed)
+                    k += 1
                 query += ';'
             segno += len(same_path_eofs)
             if len(append_eofs) > 0:
-                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name,
segno, append_eofs[0], -1, -1)
-                for k, append_eof in enumerate(append_eofs[1:]):
-                    query += ',(%d, %d, %d, %d)' % (segno + k + 1, append_eof, -1, -1)
+                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name,
segno, append_eofs[0], append_tupcounts[0], append_eofuncompresseds[0])
+                k = 0
+                for append_eof, append_tupcount, append_eofuncompressed in zip(append_eofs[1:],
append_tupcounts[1:], append_eofuncompresseds[1:]):
+                    query += ',(%d, %d, %d, %d)' % (segno + k + 1, append_eof, append_tupcount,
append_eofuncompressed)
+                    k += 1
                 query += ';'
         else:
             if len(update_eofs) > 0:
-                query += 'insert into pg_aoseg.%s values(%s, %d, %d, %d, %d)' % (self.seg_name,
update_segno_lst[0], update_eofs[0], -1, -1, -1)
-                for k, update_eof in enumerate(update_eofs[1:]):
-                    query += ',(%s, %d, %d, %d, %d)' % (update_segno_lst[k + 1], update_eof,
-1, -1, -1)
+                query += 'insert into pg_aoseg.%s values(%s, %d, %d, %d, %d)' % (self.seg_name,
update_segno_lst[0], update_eofs[0], update_tupcounts[0], update_varblockcounts[0], update_eofuncompresseds[0])
+                k = 0
+                for update_eof, update_tupcount, update_varblockcount, update_eofuncompresseds
in zip(update_eofs[1:], update_tupcounts[1:], update_varblockcounts[1:], update_eofuncompresseds[1:]):
+                    query += ',(%s, %d, %d, %d, %d)' % (update_segno_lst[k + 1], update_eof,
update_tupcount, update_varblockcount, update_eofuncompresseds)
+                    k += 1
                 query += ';'
             if len(same_path_eofs) > 0:
-                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name,
same_path_segno_lst[0], same_path_eofs[0], -1, -1, -1)
-                for k, same_path_eof in enumerate(same_path_eofs[1:]):
-                    query += ',(%d, %d, %d, %d, %d)' % (same_path_segno_lst[k + 1], same_path_eof,
-1, -1, -1)
+                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name,
same_path_segno_lst[0], same_path_eofs[0], same_path_tupcounts[0], same_path_varblockcounts[0],
same_path_eofuncompresseds[0])
+                k = 0
+                for same_path_eof, same_path_tupcount, same_path_varblockcount, same_path_eofuncompressed
in zip(same_path_eofs[1:], same_path_tupcounts[1:], same_path_varblockcounts[1:], same_path_eofuncompresseds[1:]):
+                    query += ',(%d, %d, %d, %d, %d)' % (same_path_segno_lst[k + 1], same_path_eof,
same_path_tupcount, same_path_varblockcount, same_path_eofuncompressed)
+                    k += 1
                 query += ';'
             segno += len(same_path_eofs)
             if len(append_eofs) > 0:
-                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name,
segno, append_eofs[0], -1, -1, -1)
-                for k, append_eof in enumerate(append_eofs[1:]):
-                    query += ',(%d, %d, %d, %d, %d)' % (segno + k + 1, append_eof, -1, -1,
-1)
+                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name,
segno, append_eofs[0], append_tupcounts[0], append_varblockcounts[0], append_eofuncompresseds[0])
+                k = 0
+                for append_eof, append_tupcount, append_varblockcount, append_eofuncompressed
in zip(append_eofs[1:], append_tupcounts[1:], append_varblockcounts[1:], append_eofuncompresseds[1:]):
+                    query += ',(%d, %d, %d, %d, %d)' % (segno + k + 1, append_eof, append_tupcount,
append_varblockcount, append_eofuncompressed)
+                    k += 1
                 query += ';'
         self.queries += query
     


Mime
View raw message