drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amansi...@apache.org
Subject drill git commit: DRILL-2408: Invalid (0 length) parquet file created by CTAS
Date Tue, 31 Mar 2015 14:00:12 GMT
Repository: drill
Updated Branches:
  refs/heads/master 0fbcddba1 -> 417ad38f7


DRILL-2408: Invalid (0 length) parquet file created by CTAS

Also updated TestExampleQueries and TestUnionAll so they use different names for the created
views because I kept hitting random errors.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/417ad38f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/417ad38f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/417ad38f

Branch: refs/heads/master
Commit: 417ad38f7239c6dc1ff0421cbd5a2358d1437c87
Parents: 0fbcddb
Author: adeneche <adeneche@gmail.com>
Authored: Thu Mar 19 12:34:28 2015 -0700
Committer: Aman Sinha <asinha@maprtech.com>
Committed: Tue Mar 31 00:19:31 2015 -0700

----------------------------------------------------------------------
 .../exec/store/parquet/ParquetRecordWriter.java | 25 ++++++++++-
 .../org/apache/drill/TestExampleQueries.java    | 27 ++++++------
 .../java/org/apache/drill/TestUnionAll.java     | 22 +++++-----
 .../physical/impl/writer/TestParquetWriter.java | 45 ++++++++++++++++++++
 4 files changed, 92 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/417ad38f/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 4a8ff5e..58f09f9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.ExecConstants;
@@ -148,7 +149,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     }
     schema = new MessageType("root", types);
 
-    Path fileName = new Path(location, prefix + "_" + index + ".parquet");
+    Path fileName = getPath();
     parquetFileWriter = new ParquetFileWriter(conf, schema, fileName);
     parquetFileWriter.start();
 
@@ -165,6 +166,13 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     setUp(schema, consumer);
   }
 
+  /**
+   * @return Path for the latest file created
+   */
+  private Path getPath() {
+    return new Path(location, prefix + "_" + index + ".parquet");
+  }
+
   private PrimitiveType getPrimitiveType(MaterializedField field) {
     MinorType minorType = field.getType().getMinorType();
     String name = field.getLastName();
@@ -307,7 +315,8 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
   @Override
   public void cleanup() throws IOException {
-    if (recordCount > 0) {
+    boolean hasRecords = recordCount > 0;
+    if (hasRecords) {
       parquetFileWriter.startBlock(recordCount);
       store.flush();
       ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, parquetFileWriter);
@@ -324,5 +333,17 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     if(oContext!=null){
       oContext.close();
     }
+
+    if (!hasRecords) {
+      // the very last file is empty, delete it (DRILL-2408)
+      Path path = getPath();
+      logger.debug("no record written, deleting parquet file {}", path);
+      FileSystem fs = path.getFileSystem(conf);
+      if (fs.exists(path)) {
+        if (!fs.delete(path, false)) {
+          throw new DrillRuntimeException("Couldn't delete empty file " + path);
+        }
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/417ad38f/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 1d59a11..862de5e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -23,14 +23,13 @@ import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import java.math.BigDecimal;
 
 public class TestExampleQueries extends BaseTestQuery{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestExampleQueries.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestExampleQueries.class);
 
   @Test // see DRILL-2328
   public void testConcatOnNull() throws Exception {
@@ -125,9 +124,9 @@ public class TestExampleQueries extends BaseTestQuery{
   @Test // see DRILL-985
   public void testViewFileName() throws Exception {
     test("use dfs.tmp");
-    test("create view nation_view as select * from cp.`tpch/nation.parquet`;");
-    test("select * from dfs.tmp.`nation_view.view.drill`");
-    test("drop view nation_view");
+    test("create view nation_view_testexamplequeries as select * from cp.`tpch/nation.parquet`;");
+    test("select * from dfs.tmp.`nation_view_testexamplequeries.view.drill`");
+    test("drop view nation_view_testexamplequeries");
   }
 
   @Test
@@ -378,29 +377,29 @@ public class TestExampleQueries extends BaseTestQuery{
   @Test // DRILL-811
   public void testDRILL_811View() throws Exception {
     test("use dfs.tmp");
-    test("create view nation_view as select * from cp.`tpch/nation.parquet`;");
+    test("create view nation_view_testexamplequeries as select * from cp.`tpch/nation.parquet`;");
 
-    test("select n.n_nationkey, n.n_name, n.n_regionkey from nation_view n where n.n_nationkey > 8 order by n.n_regionkey");
+    test("select n.n_nationkey, n.n_name, n.n_regionkey from nation_view_testexamplequeries n where n.n_nationkey > 8 order by n.n_regionkey");
 
-    test("select n.n_regionkey, count(*) as cnt from nation_view n where n.n_nationkey > 8 group by n.n_regionkey order by n.n_regionkey");
+    test("select n.n_regionkey, count(*) as cnt from nation_view_testexamplequeries n where n.n_nationkey > 8 group by n.n_regionkey order by n.n_regionkey");
 
-    test("drop view nation_view ");
+    test("drop view nation_view_testexamplequeries ");
   }
 
   @Test  // DRILL-811
   public void testDRILL_811ViewJoin() throws Exception {
     test("use dfs.tmp");
-    test("create view nation_view as select * from cp.`tpch/nation.parquet`;");
+    test("create view nation_view_testexamplequeries as select * from cp.`tpch/nation.parquet`;");
     test("create view region_view as select * from cp.`tpch/region.parquet`;");
 
-    test("select n.n_nationkey, n.n_regionkey, r.r_name from region_view r , nation_view n where r.r_regionkey = n.n_regionkey ");
+    test("select n.n_nationkey, n.n_regionkey, r.r_name from region_view r , nation_view_testexamplequeries n where r.r_regionkey = n.n_regionkey ");
 
-    test("select n.n_regionkey, count(*) as cnt from region_view r , nation_view n where r.r_regionkey = n.n_regionkey and n.n_nationkey > 8 group by n.n_regionkey order by n.n_regionkey");
+    test("select n.n_regionkey, count(*) as cnt from region_view r , nation_view_testexamplequeries n where r.r_regionkey = n.n_regionkey and n.n_nationkey > 8 group by n.n_regionkey order by n.n_regionkey");
 
-    test("select n.n_regionkey, count(*) as cnt from region_view r join nation_view n on r.r_regionkey = n.n_regionkey and n.n_nationkey > 8 group by n.n_regionkey order by n.n_regionkey");
+    test("select n.n_regionkey, count(*) as cnt from region_view r join nation_view_testexamplequeries n on r.r_regionkey = n.n_regionkey and n.n_nationkey > 8 group by n.n_regionkey order by n.n_regionkey");
 
     test("drop view region_view ");
-    test("drop view nation_view ");
+    test("drop view nation_view_testexamplequeries ");
   }
 
   @Test  // DRILL-811

http://git-wip-us.apache.org/repos/asf/drill/blob/417ad38f/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index 520c204..1ebb534 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -24,7 +24,7 @@ import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException;
 import org.junit.Test;
 
 public class TestUnionAll extends BaseTestQuery{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestUnionAll.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestUnionAll.class);
 
   @Test  // Simple Union-All over two scans
   public void testUnionAll1() throws Exception {
@@ -184,16 +184,16 @@ public class TestUnionAll extends BaseTestQuery{
   @Test
   public void testUnionAllViewExpandableStar() throws Exception {
     test("use dfs.tmp");
-    test("create view nation_view as select n_name, n_nationkey from cp.`tpch/nation.parquet`;");
+    test("create view nation_view_testunionall as select n_name, n_nationkey from cp.`tpch/nation.parquet`;");
     test("create view region_view as select r_name, r_regionkey from cp.`tpch/region.parquet`;");
 
-    String query1 = "(select * from dfs.tmp.`nation_view`) " +
+    String query1 = "(select * from dfs.tmp.`nation_view_testunionall`) " +
                     "union all " +
                     "(select * from dfs.tmp.`region_view`) ";
 
     String query2 =  "(select r_name, r_regionkey from cp.`tpch/region.parquet`) " +
                      "union all " +
-                     "(select * from dfs.tmp.`nation_view`)";
+                     "(select * from dfs.tmp.`nation_view_testunionall`)";
 
     try {
       testBuilder()
@@ -212,7 +212,7 @@ public class TestUnionAll extends BaseTestQuery{
           .baselineColumns("r_name", "r_regionkey")
           .build().run();
     } finally {
-      test("drop view nation_view");
+      test("drop view nation_view_testunionall");
       test("drop view region_view");
     }
   }
@@ -220,28 +220,28 @@ public class TestUnionAll extends BaseTestQuery{
   @Test(expected = UnsupportedRelOperatorException.class) // see DRILL-2002
   public void testUnionAllViewUnExpandableStar() throws Exception {
     test("use dfs.tmp");
-    test("create view nation_view as select * from cp.`tpch/nation.parquet`;");
+    test("create view nation_view_testunionall as select * from cp.`tpch/nation.parquet`;");
 
     try {
-      String query = "(select * from dfs.tmp.`nation_view`) " +
+      String query = "(select * from dfs.tmp.`nation_view_testunionall`) " +
                      "union all (select * from cp.`tpch/region.parquet`)";
       test(query);
     } catch(Exception ex) {
       SqlUnsupportedException.errorMessageToException(ex.getMessage());
       throw ex;
     } finally {
-      test("drop view nation_view");
+      test("drop view nation_view_testunionall");
     }
   }
 
   @Test
   public void testDiffDataTypesAndModes() throws Exception {
     test("use dfs.tmp");
-    test("create view nation_view as select n_name, n_nationkey from cp.`tpch/nation.parquet`;");
+    test("create view nation_view_testunionall as select n_name, n_nationkey from cp.`tpch/nation.parquet`;");
     test("create view region_view as select r_name, r_regionkey from cp.`tpch/region.parquet`;");
 
     String t1 = "(select n_comment, n_regionkey from cp.`tpch/nation.parquet` limit 5)";
-    String t2 = "(select * from nation_view  limit 5)";
+    String t2 = "(select * from nation_view_testunionall  limit 5)";
     String t3 = "(select full_name, store_id from cp.`employee.json` limit 5)";
     String t4 = "(select * from region_view  limit 5)";
 
@@ -256,7 +256,7 @@ public class TestUnionAll extends BaseTestQuery{
           .baselineColumns("n_comment", "n_regionkey")
           .build().run();
     } finally {
-      test("drop view nation_view");
+      test("drop view nation_view_testunionall");
       test("drop view region_view");
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/417ad38f/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 288a295..fbfb996 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -382,6 +383,50 @@ public class TestParquetWriter extends BaseTestQuery {
         .go();
   }
 
+  @Test // see DRILL-2408
+  public void testWriteEmptyFile() throws Exception {
+    String outputFile = "testparquetwriter_test_write_empty_file";
+
+    Path path = new Path("/tmp/" + outputFile);
+    if (fs.exists(path)) {
+      fs.delete(path, true);
+    }
+
+//    test("ALTER SESSION SET `planner.add_producer_consumer` = false");
+    test("CREATE TABLE dfs.tmp.%s AS SELECT * FROM cp.`employee.json` WHERE 1=0", outputFile);
+
+    Assert.assertEquals(fs.listStatus(path).length, 0);
+  }
+
+  @Test // see DRILL-2408
+  public void testWriteEmptyFileAfterFlush() throws Exception {
+    String outputFile = "testparquetwriter_test_write_empty_file_after_flush";
+
+    Path path = new Path("/tmp/" + outputFile);
+    if (fs.exists(path)) {
+      fs.delete(path, true);
+    }
+
+    try {
+      // this specific value will force a flush just after the final row is written
+      // this will cause the creation of a new "empty" parquet file
+      test("ALTER SESSION SET `store.parquet.block-size` = 19926");
+
+      String query = "SELECT * FROM cp.`employee.json` LIMIT 100";
+      test("CREATE TABLE dfs.tmp.%s AS %s", outputFile, query);
+
+      // this query will fail if the "empty" file wasn't deleted
+      testBuilder()
+        .unOrdered()
+        .sqlQuery("SELECT * FROM dfs.tmp.%s", outputFile)
+        .sqlBaselineQuery(query)
+        .go();
+    } finally {
+      // restore the session option
+      test("ALTER SESSION SET `store.parquet.block-size` = %d", ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR.getDefault().num_val);
+    }
+  }
+
   public void runTestAndValidate(String selection, String validationSelection, String inputTable, String outputFile) throws Exception {
 
     Path path = new Path("/tmp/" + outputFile);


Mime
View raw message