From: tarmstrong@apache.org
To: commits@impala.incubator.apache.org
Date: Tue, 01 Aug 2017 15:03:18 -0000
Subject: [2/4] incubator-impala git commit: IMPALA-5009: Clean up test_insert_parquet.py

IMPALA-5009: Clean up test_insert_parquet.py

Replace make_tmp_dir with py.test's own tmpdir

Change-Id: Ia84c78d7ff74cc7fdb3d782060caa5e52d0cd7d2
Reviewed-on: http://gerrit.cloudera.org:8080/7518
Reviewed-by: David Knupp
Tested-by: Impala Public Jenkins
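[Editor's note: the cleanup below swaps hand-managed temporary directories
(tempfile.mkdtemp plus shutil.rmtree in try/finally) for py.test's built-in
tmpdir fixture. The following minimal sketch contrasts the two patterns; it is
illustrative only and not code from this commit -- the test names and file
contents are made up.]

import os
from shutil import rmtree
from tempfile import mkdtemp


def test_with_make_tmp_dir():
  # Old pattern: create and delete the scratch directory by hand. The
  # try/finally is easy to get wrong, and an early failure leaks the directory.
  tmp_dir = mkdtemp()
  try:
    with open(os.path.join(tmp_dir, 'data.parq'), 'w') as f:
      f.write('stub')
  finally:
    rmtree(tmp_dir)


def test_with_tmpdir(tmpdir):
  # New pattern: py.test injects a per-test py.path.local object and prunes
  # old temporary directories itself, so the test needs no cleanup code.
  f = tmpdir.join('data.parq')
  f.write('stub')
  assert f.check(file=1)
  # tmpdir.strpath yields a plain string path for APIs that expect one.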
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b6d400c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b6d400c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b6d400c9

Branch: refs/heads/master
Commit: b6d400c9d80d5e69d827d2a05f2a5fbae6e492c5
Parents: 229f12c
Author: Lars Volker
Authored: Wed Jul 26 16:32:48 2017 -0700
Committer: Impala Public Jenkins
Committed: Tue Aug 1 00:41:09 2017 +0000

----------------------------------------------------------------------
 tests/query_test/test_insert_parquet.py | 147 +++++++++++++--------------
 1 file changed, 72 insertions(+), 75 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b6d400c9/tests/query_test/test_insert_parquet.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index 7cd23f7..dc73a2d 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -24,7 +24,6 @@ from datetime import datetime
 from decimal import Decimal
 from shutil import rmtree
 from subprocess import check_call
-from tempfile import mkdtemp as make_tmp_dir

 from parquet.ttypes import ColumnOrder, SortingColumn, TypeDefinedOrder
 from tests.common.environ import impalad_basedir
@@ -97,10 +96,10 @@ class TestInsertParquetQueries(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension("file_size", *PARQUET_FILE_SIZES))
-    cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').file_format == 'parquet')
-    cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').compression_codec == 'none')
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'parquet')
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').compression_codec == 'none')

   @SkipIfLocal.multiple_impalad
   @UniqueDatabase.parametrize(sync_ddl=True)
@@ -127,10 +126,10 @@ class TestInsertParquetInvalidCodec(ImpalaTestSuite):
         sync_ddl=[1]))
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension("compression_codec", 'bzip2'))
-    cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').file_format == 'parquet')
-    cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').compression_codec == 'none')
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'parquet')
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').compression_codec == 'none')

   @SkipIfLocal.multiple_impalad
   def test_insert_parquet_invalid_codec(self, vector):
@@ -153,10 +152,10 @@ class TestInsertParquetVerifySize(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
         cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0],
         sync_ddl=[1]))
-    cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').file_format == 'parquet')
-    cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').compression_codec == 'none')
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'parquet')
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').compression_codec == 'none')
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension("compression_codec", *PARQUET_CODECS))
@@ -204,7 +203,7 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_constraint(
         lambda v: v.get_value('table_format').file_format == 'parquet')

-  def test_def_level_encoding(self, vector, unique_database):
+  def test_def_level_encoding(self, vector, unique_database, tmpdir):
"""IMPALA-3376: Tests that parquet files are written to HDFS correctly by generating a parquet table and running the parquet-reader tool on it, which performs sanity checking, such as that the correct number of definition levels were encoded. @@ -214,21 +213,16 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite): self.execute_query("create table %s stored as parquet as select l_linenumber from " "tpch_parquet.lineitem limit 180000" % qualified_table_name) - tmp_dir = make_tmp_dir() - try: - hdfs_file = get_fs_path('/test-warehouse/%s.db/%s/*.parq' - % (unique_database, table_name)) - check_call(['hdfs', 'dfs', '-copyToLocal', hdfs_file, tmp_dir]) - - for root, subdirs, files in os.walk(tmp_dir): - for f in files: - if not f.endswith('parq'): - continue - check_call([os.path.join(impalad_basedir, 'util/parquet-reader'), '--file', - os.path.join(tmp_dir, str(f))]) - finally: - self.execute_query("drop table %s" % qualified_table_name) - rmtree(tmp_dir) + hdfs_file = get_fs_path('/test-warehouse/%s.db/%s/*.parq' + % (unique_database, table_name)) + check_call(['hdfs', 'dfs', '-copyToLocal', hdfs_file, tmpdir.strpath]) + + for root, subdirs, files in os.walk(tmpdir.strpath): + for f in files: + if not f.endswith('parq'): + continue + check_call([os.path.join(impalad_basedir, 'util/parquet-reader'), '--file', + os.path.join(tmpdir.strpath, str(f))]) def test_sorting_columns(self, vector, unique_database, tmpdir): """Tests that RowGroup::sorting_columns gets populated when the table has SORT BY @@ -348,36 +342,35 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite): return file_stats - def _get_row_group_stats_from_hdfs_folder(self, hdfs_path): - """Returns a list of statistics for each row group in all parquet files in - 'hdfs_path'. The result is a two-dimensional list, containing stats by row group and - column.""" + def _get_row_group_stats_from_hdfs_folder(self, hdfs_path, tmp_dir): + """Returns a list of statistics for each row group in all parquet files i 'hdfs_path'. + 'tmp_dir' needs to be supplied by the caller and will be used to store temporary + files. The caller is responsible for cleaning up 'tmp_dir'. The result is a + two-dimensional list, containing stats by row group and column.""" row_group_stats = [] - try: - tmp_dir = make_tmp_dir() - check_call(['hdfs', 'dfs', '-get', hdfs_path, tmp_dir]) + check_call(['hdfs', 'dfs', '-get', hdfs_path, tmp_dir]) - for root, subdirs, files in os.walk(tmp_dir): - for f in files: - parquet_file = os.path.join(root, str(f)) - row_group_stats.extend(self._get_row_group_stats_from_file(parquet_file)) - - finally: - rmtree(tmp_dir) + for root, subdirs, files in os.walk(tmp_dir): + for f in files: + parquet_file = os.path.join(root, str(f)) + row_group_stats.extend(self._get_row_group_stats_from_file(parquet_file)) return row_group_stats - def _validate_parquet_stats(self, hdfs_path, expected_values, skip_col_idxs = None): + def _validate_parquet_stats(self, hdfs_path, tmp_dir, expected_values, + skip_col_idxs = None): """Validates that 'hdfs_path' contains exactly one parquet file and that the rowgroup statistics in that file match the values in 'expected_values'. Columns indexed by - 'skip_col_idx' are excluded from the verification of the expected values. + 'skip_col_idx' are excluded from the verification of the expected values. 'tmp_dir' + needs to be supplied by the caller and will be used to store temporary files. The + caller is responsible for cleaning up 'tmp_dir'. 
""" skip_col_idxs = skip_col_idxs or [] # The caller has to make sure that the table fits into a single row group. We enforce # it here to make sure the results are predictable and independent of how the data # could get written across multiple files. - row_group_stats = self._get_row_group_stats_from_hdfs_folder(hdfs_path) + row_group_stats = self._get_row_group_stats_from_hdfs_folder(hdfs_path, tmp_dir) assert(len(row_group_stats)) == 1 table_stats = row_group_stats[0] @@ -392,10 +385,12 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite): continue assert stats == expected - def _ctas_table_and_verify_stats(self, vector, unique_database, source_table, + def _ctas_table_and_verify_stats(self, vector, unique_database, tmp_dir, source_table, expected_values): """Copies 'source_table' into a parquet table and makes sure that the row group - statistics in the resulting parquet file match those in 'expected_values'. + statistics in the resulting parquet file match those in 'expected_values'. 'tmp_dir' + needs to be supplied by the caller and will be used to store temporary files. The + caller is responsible for cleaning up 'tmp_dir'. """ table_name = "test_hdfs_parquet_table_writer" qualified_table_name = "{0}.{1}".format(unique_database, table_name) @@ -409,9 +404,9 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite): qualified_table_name, source_table) vector.get_value('exec_option')['num_nodes'] = 1 self.execute_query(query, vector.get_value('exec_option')) - self._validate_parquet_stats(hdfs_path, expected_values) + self._validate_parquet_stats(hdfs_path, tmp_dir, expected_values) - def test_write_statistics_alltypes(self, vector, unique_database): + def test_write_statistics_alltypes(self, vector, unique_database, tmpdir): """Test that writing a parquet file populates the rowgroup statistics with the correct values. """ @@ -433,10 +428,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite): ColumnStats('month', 1, 12, 0), ] - self._ctas_table_and_verify_stats(vector, unique_database, "functional.alltypes", - expected_min_max_values) + self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath, + "functional.alltypes", expected_min_max_values) - def test_write_statistics_decimal(self, vector, unique_database): + def test_write_statistics_decimal(self, vector, unique_database, tmpdir): """Test that writing a parquet file populates the rowgroup statistics with the correct values for decimal columns. """ @@ -450,10 +445,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite): ColumnStats('d6', 1, 1, 0) ] - self._ctas_table_and_verify_stats(vector, unique_database, "functional.decimal_tbl", - expected_min_max_values) + self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath, + "functional.decimal_tbl", expected_min_max_values) - def test_write_statistics_multi_page(self, vector, unique_database): + def test_write_statistics_multi_page(self, vector, unique_database, tmpdir): """Test that writing a parquet file populates the rowgroup statistics with the correct values. This test write a single parquet file with several pages per column. """ @@ -471,10 +466,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite): 'zzle. 
           'zzle. blithely regular instructions cajol', 0),
     ]

-    self._ctas_table_and_verify_stats(vector, unique_database, "tpch_parquet.customer",
-        expected_min_max_values)
+    self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+        "tpch_parquet.customer", expected_min_max_values)

-  def test_write_statistics_null(self, vector, unique_database):
+  def test_write_statistics_null(self, vector, unique_database, tmpdir):
     """Test that we don't write min/max statistics for null columns. Ensure null_count
     is set for columns with null values."""
     expected_min_max_values = [
@@ -487,10 +482,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
       ColumnStats('g', '\x00', '\x00', 0)
     ]

-    self._ctas_table_and_verify_stats(vector, unique_database, "functional.nulltable",
-        expected_min_max_values)
+    self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+        "functional.nulltable", expected_min_max_values)

-  def test_write_statistics_char_types(self, vector, unique_database):
+  def test_write_statistics_char_types(self, vector, unique_database, tmpdir):
     """Test that Impala correctly writes statistics for char columns."""
     table_name = "test_char_types"
     qualified_table_name = "{0}.{1}".format(unique_database, table_name)
@@ -502,7 +497,8 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     insert_stmt = """insert into {0} values
         (cast("def" as char(3)), "ghj xyz", "abc xyz"),
         (cast("abc" as char(3)), "def 123 xyz", "lorem ipsum"),
-        (cast("xy" as char(3)), "abc banana", "dolor dis amet")""".format(qualified_table_name)
+        (cast("xy" as char(3)), "abc banana", "dolor dis amet")
+        """.format(qualified_table_name)
     self.execute_query(insert_stmt)
     expected_min_max_values = [
       ColumnStats('c3', 'abc', 'xy', 0),
@@ -510,10 +506,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
       ColumnStats('st', 'abc xyz', 'lorem ipsum', 0)
     ]

-    self._ctas_table_and_verify_stats(vector, unique_database, qualified_table_name,
-        expected_min_max_values)
+    self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+        qualified_table_name, expected_min_max_values)

-  def test_write_statistics_negative(self, vector, unique_database):
+  def test_write_statistics_negative(self, vector, unique_database, tmpdir):
     """Test that Impala correctly writes statistics for negative values."""
     view_name = "test_negative_view"
     qualified_view_name = "{0}.{1}".format(unique_database, view_name)
@@ -537,10 +533,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
       ColumnStats('double_col', RoundFloat(-90.9, 1), RoundFloat(80.8, 1), 0),
     ]

-    self._ctas_table_and_verify_stats(vector, unique_database, qualified_view_name,
-        expected_min_max_values)
+    self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+        qualified_view_name, expected_min_max_values)

-  def test_write_statistics_multiple_row_groups(self, vector, unique_database):
+  def test_write_statistics_multiple_row_groups(self, vector, unique_database, tmpdir):
     """Test that writing multiple row groups works as expected.
     This is done by inserting into a table using the SORT BY clause and then making sure
     that the min and max values of row groups don't overlap."""
@@ -563,7 +559,8 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     self.execute_query(query, vector.get_value('exec_option'))

     # Get all stats for the o_orderkey column
-    row_group_stats = self._get_row_group_stats_from_hdfs_folder(hdfs_path)
+    row_group_stats = self._get_row_group_stats_from_hdfs_folder(hdfs_path,
+        tmpdir.strpath)
     assert len(row_group_stats) > 1
     orderkey_stats = [s[0] for s in row_group_stats]
@@ -573,7 +570,7 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     for l, r in zip(orderkey_stats, orderkey_stats[1:]):
       assert l.max <= r.min

-  def test_write_statistics_float_infinity(self, vector, unique_database):
+  def test_write_statistics_float_infinity(self, vector, unique_database, tmpdir):
     """Test that statistics for -inf and inf are written correctly."""
     table_name = "test_float_infinity"
     qualified_table_name = "{0}.{1}".format(unique_database, table_name)
@@ -592,10 +589,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
       ColumnStats('d', float('-inf'), float('inf'), 0),
     ]

-    self._ctas_table_and_verify_stats(vector, unique_database, qualified_table_name,
-        expected_min_max_values)
+    self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+        qualified_table_name, expected_min_max_values)

-  def test_write_null_count_statistics(self, vector, unique_database):
+  def test_write_null_count_statistics(self, vector, unique_database, tmpdir):
     """Test that writing a parquet file populates the rowgroup statistics with the
     correct null_count. This test ensures that the null_count is correct for a table
     with multiple null values."""
@@ -609,5 +606,5 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
       ColumnStats('income', 0, 189570, 29),
     ]

-    self._ctas_table_and_verify_stats(vector, unique_database,
+    self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
         "functional_parquet.zipcode_incomes", expected_min_max_values)
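[Editor's note: the diff converges on one calling convention: helper methods
take a plain directory path ('tmp_dir') and never create or delete it, while
only the outermost test function requests the fixture and passes tmpdir.strpath
down. A minimal sketch of that convention, using hypothetical names rather than
the actual Impala helpers:]

import os


def collect_files(tmp_dir):
  """Walks 'tmp_dir' and returns the paths of all files found. The caller
  supplies and owns 'tmp_dir'; this helper neither creates nor removes it."""
  found = []
  for root, subdirs, files in os.walk(tmp_dir):
    for f in files:
      found.append(os.path.join(root, f))
  return found


def test_collect(tmpdir):
  # Only the test asks py.test for the fixture; the helper just sees a string.
  tmpdir.join('part-0.parq').write('stub')
  assert len(collect_files(tmpdir.strpath)) == 1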