Return-Path: X-Original-To: apmail-drill-commits-archive@www.apache.org Delivered-To: apmail-drill-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A4E8717533 for ; Tue, 31 Mar 2015 14:00:12 +0000 (UTC) Received: (qmail 17176 invoked by uid 500); 31 Mar 2015 14:00:12 -0000 Delivered-To: apmail-drill-commits-archive@drill.apache.org Received: (qmail 17146 invoked by uid 500); 31 Mar 2015 14:00:12 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 17137 invoked by uid 99); 31 Mar 2015 14:00:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 31 Mar 2015 14:00:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 63F79E0EF3; Tue, 31 Mar 2015 14:00:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: amansinha@apache.org To: commits@drill.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: drill git commit: DRILL-2408: Invalid (0 length) parquet file created by CTAS Date: Tue, 31 Mar 2015 14:00:12 +0000 (UTC) Repository: drill Updated Branches: refs/heads/master 0fbcddba1 -> 417ad38f7 DRILL-2408: Invalid (0 length) parquet file created by CTAS Also updated TestExampleQueries and TestUnionAll so they use different names for the created views because I kept hitting random errors. Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/417ad38f Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/417ad38f Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/417ad38f Branch: refs/heads/master Commit: 417ad38f7239c6dc1ff0421cbd5a2358d1437c87 Parents: 0fbcddb Author: adeneche Authored: Thu Mar 19 12:34:28 2015 -0700 Committer: Aman Sinha Committed: Tue Mar 31 00:19:31 2015 -0700 ---------------------------------------------------------------------- .../exec/store/parquet/ParquetRecordWriter.java | 25 ++++++++++- .../org/apache/drill/TestExampleQueries.java | 27 ++++++------ .../java/org/apache/drill/TestUnionAll.java | 22 +++++----- .../physical/impl/writer/TestParquetWriter.java | 45 ++++++++++++++++++++ 4 files changed, 92 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/417ad38f/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java index 4a8ff5e..58f09f9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.ExecConstants; @@ -148,7 +149,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { } schema = new MessageType("root", types); - Path fileName = new Path(location, prefix + "_" + index + ".parquet"); + Path fileName = getPath(); parquetFileWriter = new ParquetFileWriter(conf, schema, fileName); parquetFileWriter.start(); @@ -165,6 +166,13 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { setUp(schema, consumer); } + /** + * @return Path for the latest file created + */ + private Path getPath() { + return new Path(location, prefix + "_" + index + ".parquet"); + } + private PrimitiveType getPrimitiveType(MaterializedField field) { MinorType minorType = field.getType().getMinorType(); String name = field.getLastName(); @@ -307,7 +315,8 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { @Override public void cleanup() throws IOException { - if (recordCount > 0) { + boolean hasRecords = recordCount > 0; + if (hasRecords) { parquetFileWriter.startBlock(recordCount); store.flush(); ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, parquetFileWriter); @@ -324,5 +333,17 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { if(oContext!=null){ oContext.close(); } + + if (!hasRecords) { + // the very last file is empty, delete it (DRILL-2408) + Path path = getPath(); + logger.debug("no record written, deleting parquet file {}", path); + FileSystem fs = path.getFileSystem(conf); + if (fs.exists(path)) { + if (!fs.delete(path, false)) { + throw new DrillRuntimeException("Couldn't delete empty file " + path); + } + } + } } } http://git-wip-us.apache.org/repos/asf/drill/blob/417ad38f/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java index 1d59a11..862de5e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java @@ -23,14 +23,13 @@ import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.util.FileUtils; import org.apache.drill.common.util.TestTools; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.planner.physical.PlannerSettings; import org.junit.Ignore; import org.junit.Test; import java.math.BigDecimal; public class TestExampleQueries extends BaseTestQuery{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestExampleQueries.class); +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestExampleQueries.class); @Test // see DRILL-2328 public void testConcatOnNull() throws Exception { @@ -125,9 +124,9 @@ public class TestExampleQueries extends BaseTestQuery{ @Test // see DRILL-985 public void testViewFileName() throws Exception { test("use dfs.tmp"); - test("create view nation_view as select * from cp.`tpch/nation.parquet`;"); - test("select * from dfs.tmp.`nation_view.view.drill`"); - test("drop view nation_view"); + test("create view nation_view_testexamplequeries as select * from cp.`tpch/nation.parquet`;"); + test("select * from dfs.tmp.`nation_view_testexamplequeries.view.drill`"); + test("drop view nation_view_testexamplequeries"); } @Test @@ -378,29 +377,29 @@ public class TestExampleQueries extends BaseTestQuery{ @Test // DRILL-811 public void testDRILL_811View() throws Exception { test("use dfs.tmp"); - test("create view nation_view as select * from cp.`tpch/nation.parquet`;"); + test("create view nation_view_testexamplequeries as select * from cp.`tpch/nation.parquet`;"); - test("select n.n_nationkey, n.n_name, n.n_regionkey from nation_view n where n.n_nationkey > 8 order by n.n_regionkey"); + test("select n.n_nationkey, n.n_name, n.n_regionkey from nation_view_testexamplequeries n where n.n_nationkey > 8 order by n.n_regionkey"); - test("select n.n_regionkey, count(*) as cnt from nation_view n where n.n_nationkey > 8 group by n.n_regionkey order by n.n_regionkey"); + test("select n.n_regionkey, count(*) as cnt from nation_view_testexamplequeries n where n.n_nationkey > 8 group by n.n_regionkey order by n.n_regionkey"); - test("drop view nation_view "); + test("drop view nation_view_testexamplequeries "); } @Test // DRILL-811 public void testDRILL_811ViewJoin() throws Exception { test("use dfs.tmp"); - test("create view nation_view as select * from cp.`tpch/nation.parquet`;"); + test("create view nation_view_testexamplequeries as select * from cp.`tpch/nation.parquet`;"); test("create view region_view as select * from cp.`tpch/region.parquet`;"); - test("select n.n_nationkey, n.n_regionkey, r.r_name from region_view r , nation_view n where r.r_regionkey = n.n_regionkey "); + test("select n.n_nationkey, n.n_regionkey, r.r_name from region_view r , nation_view_testexamplequeries n where r.r_regionkey = n.n_regionkey "); - test("select n.n_regionkey, count(*) as cnt from region_view r , nation_view n where r.r_regionkey = n.n_regionkey and n.n_nationkey > 8 group by n.n_regionkey order by n.n_regionkey"); + test("select n.n_regionkey, count(*) as cnt from region_view r , nation_view_testexamplequeries n where r.r_regionkey = n.n_regionkey and n.n_nationkey > 8 group by n.n_regionkey order by n.n_regionkey"); - test("select n.n_regionkey, count(*) as cnt from region_view r join nation_view n on r.r_regionkey = n.n_regionkey and n.n_nationkey > 8 group by n.n_regionkey order by n.n_regionkey"); + test("select n.n_regionkey, count(*) as cnt from region_view r join nation_view_testexamplequeries n on r.r_regionkey = n.n_regionkey and n.n_nationkey > 8 group by n.n_regionkey order by n.n_regionkey"); test("drop view region_view "); - test("drop view nation_view "); + test("drop view nation_view_testexamplequeries "); } @Test // DRILL-811 http://git-wip-us.apache.org/repos/asf/drill/blob/417ad38f/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java index 520c204..1ebb534 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java @@ -24,7 +24,7 @@ import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException; import org.junit.Test; public class TestUnionAll extends BaseTestQuery{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestUnionAll.class); +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestUnionAll.class); @Test // Simple Union-All over two scans public void testUnionAll1() throws Exception { @@ -184,16 +184,16 @@ public class TestUnionAll extends BaseTestQuery{ @Test public void testUnionAllViewExpandableStar() throws Exception { test("use dfs.tmp"); - test("create view nation_view as select n_name, n_nationkey from cp.`tpch/nation.parquet`;"); + test("create view nation_view_testunionall as select n_name, n_nationkey from cp.`tpch/nation.parquet`;"); test("create view region_view as select r_name, r_regionkey from cp.`tpch/region.parquet`;"); - String query1 = "(select * from dfs.tmp.`nation_view`) " + + String query1 = "(select * from dfs.tmp.`nation_view_testunionall`) " + "union all " + "(select * from dfs.tmp.`region_view`) "; String query2 = "(select r_name, r_regionkey from cp.`tpch/region.parquet`) " + "union all " + - "(select * from dfs.tmp.`nation_view`)"; + "(select * from dfs.tmp.`nation_view_testunionall`)"; try { testBuilder() @@ -212,7 +212,7 @@ public class TestUnionAll extends BaseTestQuery{ .baselineColumns("r_name", "r_regionkey") .build().run(); } finally { - test("drop view nation_view"); + test("drop view nation_view_testunionall"); test("drop view region_view"); } } @@ -220,28 +220,28 @@ public class TestUnionAll extends BaseTestQuery{ @Test(expected = UnsupportedRelOperatorException.class) // see DRILL-2002 public void testUnionAllViewUnExpandableStar() throws Exception { test("use dfs.tmp"); - test("create view nation_view as select * from cp.`tpch/nation.parquet`;"); + test("create view nation_view_testunionall as select * from cp.`tpch/nation.parquet`;"); try { - String query = "(select * from dfs.tmp.`nation_view`) " + + String query = "(select * from dfs.tmp.`nation_view_testunionall`) " + "union all (select * from cp.`tpch/region.parquet`)"; test(query); } catch(Exception ex) { SqlUnsupportedException.errorMessageToException(ex.getMessage()); throw ex; } finally { - test("drop view nation_view"); + test("drop view nation_view_testunionall"); } } @Test public void testDiffDataTypesAndModes() throws Exception { test("use dfs.tmp"); - test("create view nation_view as select n_name, n_nationkey from cp.`tpch/nation.parquet`;"); + test("create view nation_view_testunionall as select n_name, n_nationkey from cp.`tpch/nation.parquet`;"); test("create view region_view as select r_name, r_regionkey from cp.`tpch/region.parquet`;"); String t1 = "(select n_comment, n_regionkey from cp.`tpch/nation.parquet` limit 5)"; - String t2 = "(select * from nation_view limit 5)"; + String t2 = "(select * from nation_view_testunionall limit 5)"; String t3 = "(select full_name, store_id from cp.`employee.json` limit 5)"; String t4 = "(select * from region_view limit 5)"; @@ -256,7 +256,7 @@ public class TestUnionAll extends BaseTestQuery{ .baselineColumns("n_comment", "n_regionkey") .build().run(); } finally { - test("drop view nation_view"); + test("drop view nation_view_testunionall"); test("drop view region_view"); } } http://git-wip-us.apache.org/repos/asf/drill/blob/417ad38f/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java index 288a295..fbfb996 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java @@ -24,6 +24,7 @@ import org.apache.drill.exec.ExecConstants; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; @@ -382,6 +383,50 @@ public class TestParquetWriter extends BaseTestQuery { .go(); } + @Test // see DRILL-2408 + public void testWriteEmptyFile() throws Exception { + String outputFile = "testparquetwriter_test_write_empty_file"; + + Path path = new Path("/tmp/" + outputFile); + if (fs.exists(path)) { + fs.delete(path, true); + } + +// test("ALTER SESSION SET `planner.add_producer_consumer` = false"); + test("CREATE TABLE dfs.tmp.%s AS SELECT * FROM cp.`employee.json` WHERE 1=0", outputFile); + + Assert.assertEquals(fs.listStatus(path).length, 0); + } + + @Test // see DRILL-2408 + public void testWriteEmptyFileAfterFlush() throws Exception { + String outputFile = "testparquetwriter_test_write_empty_file_after_flush"; + + Path path = new Path("/tmp/" + outputFile); + if (fs.exists(path)) { + fs.delete(path, true); + } + + try { + // this specific value will force a flush just after the final row is written + // this will cause the creation of a new "empty" parquet file + test("ALTER SESSION SET `store.parquet.block-size` = 19926"); + + String query = "SELECT * FROM cp.`employee.json` LIMIT 100"; + test("CREATE TABLE dfs.tmp.%s AS %s", outputFile, query); + + // this query will fail if the "empty" file wasn't deleted + testBuilder() + .unOrdered() + .sqlQuery("SELECT * FROM dfs.tmp.%s", outputFile) + .sqlBaselineQuery(query) + .go(); + } finally { + // restore the session option + test("ALTER SESSION SET `store.parquet.block-size` = %d", ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR.getDefault().num_val); + } + } + public void runTestAndValidate(String selection, String validationSelection, String inputTable, String outputFile) throws Exception { Path path = new Path("/tmp/" + outputFile);