drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amansi...@apache.org
Subject [1/2] drill git commit: DRILL-2408 (part 2): CTAS should not create empty folders when underlying query returns no results
Date Thu, 07 May 2015 21:19:58 GMT
Repository: drill
Updated Branches:
  refs/heads/master d12bee05a -> 8c706e6fa


DRILL-2408 (part 2): CTAS should not create empty folders when the underlying query returns no results

- Changed ParquetRecordWriter to avoid creating the parquet file until the first row of data
  is available
- Moved the unit tests into a separate test class that starts 3 drillbits, to test the case
  where multiple fragments attempt to write empty parquet files
- Changed BaseTestQuery to update the storage plugin in all started drillbits, not just the
  first one


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/868ce4de
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/868ce4de
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/868ce4de

Branch: refs/heads/master
Commit: 868ce4de25a9ed2153de96a6504272be9d1f4a2a
Parents: d12bee0
Author: adeneche <adeneche@gmail.com>
Authored: Mon Apr 20 13:03:07 2015 -0700
Committer: Aman Sinha <asinha@maprtech.com>
Committed: Thu May 7 13:51:17 2015 -0700

----------------------------------------------------------------------
 .../exec/store/parquet/ParquetRecordWriter.java |  77 ++++--------
 .../apache/drill/exec/util/TestUtilities.java   |  21 ++--
 .../java/org/apache/drill/BaseTestQuery.java    |  10 +-
 .../physical/impl/writer/TestParquetWriter.java |  43 -------
 .../writer/TestParquetWriterEmptyFiles.java     | 118 +++++++++++++++++++
 .../apache/drill/jdbc/DrillConnectionImpl.java  |   3 +-
 6 files changed, 166 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/868ce4de/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 8615eb7..621f05c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.ExecConstants;
@@ -64,7 +63,7 @@ import parquet.schema.Type.Repetition;
 import com.google.common.collect.Lists;
 
 public class ParquetRecordWriter extends ParquetOutputRecordWriter {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordWriter.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordWriter.class);
 
   private static final int MINIMUM_BUFFER_SIZE = 64 * 1024;
   private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
@@ -72,12 +71,11 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
   private ParquetFileWriter parquetFileWriter;
   private MessageType schema;
-  private Map<String, String> extraMetaData = new HashMap();
+  private Map<String, String> extraMetaData = new HashMap<>();
   private int blockSize;
-  private int pageSize = 1 * 1024 * 1024;
+  private int pageSize = 1024 * 1024;
   private int dictionaryPageSize = pageSize;
   private boolean enableDictionary = false;
-  private boolean validating = false;
   private CompressionCodecName codec = CompressionCodecName.SNAPPY;
   private WriterVersion writerVersion = WriterVersion.PARQUET_1_0;
   private DirectCodecFactory codecFactory;
@@ -96,7 +94,6 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private String prefix;
   private int index = 0;
   private OperatorContext oContext;
-  private ParquetDirectByteBufferAllocator allocator;
 
   public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException{
     super();
@@ -152,10 +149,6 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     }
     schema = new MessageType("root", types);
 
-    Path fileName = getPath();
-    parquetFileWriter = new ParquetFileWriter(conf, schema, fileName);
-    parquetFileWriter.start();
-
     int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / this.schema.getColumns().size()
/ 5);
     pageStore = ColumnChunkPageWriteStoreExposer.newColumnChunkPageWriteStore(this.oContext,
         codecFactory.getCompressor(codec, pageSize),
@@ -163,18 +156,11 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
         initialBlockBufferSize);
     int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
     store = new ColumnWriteStoreV1(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize,
enableDictionary, writerVersion);
-    MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(this.schema);
+    MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema);
     consumer = columnIO.getRecordWriter(store);
     setUp(schema, consumer);
   }
 
-  /**
-   * @return Path for the latest file created
-   */
-  private Path getPath() {
-    return new Path(location, prefix + "_" + index + ".parquet");
-  }
-
   private PrimitiveType getPrimitiveType(MaterializedField field) {
     MinorType minorType = field.getType().getMinorType();
     String name = field.getLastName();
@@ -204,12 +190,18 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   }
 
   private void flush() throws IOException {
-    parquetFileWriter.startBlock(recordCount);
-    store.flush();
-    ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, parquetFileWriter);
-    recordCount = 0;
-    parquetFileWriter.endBlock();
-    parquetFileWriter.end(extraMetaData);
+    if (recordCount > 0) {
+      parquetFileWriter.startBlock(recordCount);
+      store.flush();
+      ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, parquetFileWriter);
+      recordCount = 0;
+      parquetFileWriter.endBlock();
+
+      // we are writing one single block per file
+      parquetFileWriter.end(extraMetaData);
+      parquetFileWriter = null;
+    }
+
     store.close();
     ColumnChunkPageWriteStoreExposer.close(pageStore);
     store = null;
@@ -307,7 +299,16 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   @Override
   public void endRecord() throws IOException {
     consumer.endMessage();
+
+    // we wait until there is at least one record before creating the parquet file
+    if (parquetFileWriter == null) {
+      Path path = new Path(location, prefix + "_" + index + ".parquet");
+      parquetFileWriter = new ParquetFileWriter(conf, schema, path);
+      parquetFileWriter.start();
+    }
+
     recordCount++;
+
     checkBlockSizeReached();
   }
 
@@ -317,34 +318,8 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
   @Override
   public void cleanup() throws IOException {
-    boolean hasRecords = recordCount > 0;
-    if (hasRecords) {
-      parquetFileWriter.startBlock(recordCount);
-      store.flush();
-      ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, parquetFileWriter);
-      recordCount = 0;
-      parquetFileWriter.endBlock();
-      parquetFileWriter.end(extraMetaData);
-    }
-    if (store != null) {
-      store.close();
-    }
-    if (pageStore != null) {
-      ColumnChunkPageWriteStoreExposer.close(pageStore);
-    }
+    flush();
 
     codecFactory.close();
-
-    if (!hasRecords) {
-      // the very last file is empty, delete it (DRILL-2408)
-      Path path = getPath();
-      logger.debug("no record written, deleting parquet file {}", path);
-      FileSystem fs = path.getFileSystem(conf);
-      if (fs.exists(path)) {
-        if (!fs.delete(path, false)) {
-          throw new DrillRuntimeException("Couldn't delete empty file " + path);
-        }
-      }
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/868ce4de/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
index a1fcc2a..cb687af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
@@ -44,29 +44,36 @@ public class TestUtilities {
   private static final String dfsTestTmpSchema = "tmp";
 
   /**
+   * Create and removes a temporary folder
+   *
+   * @return absolute path to temporary folder
+   */
+  public static String createTempDir() {
+    final File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+    return tmpDir.getAbsolutePath();
+  }
+
+  /**
    * Update the location of dfs_test.tmp location. Get the "dfs_test.tmp" workspace and update
the location with an
    * exclusive temp directory just for use in the current test jvm.
    *
    * @param pluginRegistry
    * @return JVM exclusive temporary directory location.
    */
-  public static String updateDfsTestTmpSchemaLocation(final StoragePluginRegistry pluginRegistry)
+  public static void updateDfsTestTmpSchemaLocation(final StoragePluginRegistry pluginRegistry,
+                                                      final String tmpDirPath)
       throws ExecutionSetupException {
     final FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin(dfsTestPluginName);
     final FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig();
     final WorkspaceConfig tmpWSConfig = pluginConfig.workspaces.get(dfsTestTmpSchema);
 
-    final File tmpDir = Files.createTempDir();
-    tmpDir.deleteOnExit();
-    final WorkspaceConfig newTmpWSConfig = new WorkspaceConfig(tmpDir.getAbsolutePath(),
-        true, tmpWSConfig.getDefaultInputFormat());
+    final WorkspaceConfig newTmpWSConfig = new WorkspaceConfig(tmpDirPath, true, tmpWSConfig.getDefaultInputFormat());
 
     pluginConfig.workspaces.remove(dfsTestTmpSchema);
     pluginConfig.workspaces.put(dfsTestTmpSchema, newTmpWSConfig);
 
     pluginRegistry.createOrUpdate(dfsTestPluginName, pluginConfig, true);
-
-    return tmpDir.getAbsolutePath();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/868ce4de/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 200bbc6..f8ec090 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -166,15 +166,17 @@ public class BaseTestQuery extends ExecTest {
       serviceSet = RemoteServiceSet.getLocalServiceSet();
     }
 
+    dfsTestTmpSchemaLocation = TestUtilities.createTempDir();
+
     bits = new Drillbit[drillbitCount];
     for(int i = 0; i < drillbitCount; i++) {
       bits[i] = new Drillbit(config, serviceSet);
       bits[i].run();
-    }
 
-    final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
-    dfsTestTmpSchemaLocation = TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry);
-    TestUtilities.makeDfsTmpSchemaImmutable(pluginRegistry);
+      final StoragePluginRegistry pluginRegistry = bits[i].getContext().getStorage();
+      TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, dfsTestTmpSchemaLocation);
+      TestUtilities.makeDfsTmpSchemaImmutable(pluginRegistry);
+    }
 
     client = QueryTestUtil.createClient(config,  serviceSet, MAX_WIDTH_PER_NODE, null);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/868ce4de/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 5670e1e..4a41669 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.writer;
 
-import java.io.File;
 import java.math.BigDecimal;
 import java.sql.Date;
 
@@ -28,7 +27,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.joda.time.DateTime;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
@@ -445,22 +443,6 @@ public class TestParquetWriter extends BaseTestQuery {
     }
   }
 
-
-  @Test // see DRILL-2408
-  public void testWriteEmptyFile() throws Exception {
-    String outputFile = "testparquetwriter_test_write_empty_file";
-
-    try {
-      Path path = new Path(getDfsTestTmpSchemaLocation(), outputFile);
-      //    test("ALTER SESSION SET `planner.add_producer_consumer` = false");
-      test("CREATE TABLE dfs_test.tmp.%s AS SELECT * FROM cp.`employee.json` WHERE 1=0",
outputFile);
-
-      Assert.assertEquals(fs.listStatus(path).length, 0);
-    } finally {
-      deleteTableIfExists(outputFile);
-    }
-  }
-
   @Test // DRILL-2341
   public void tableSchemaWhenSelectFieldsInDef_SelectFieldsInView() throws Exception {
     final String newTblName = "testTableOutputSchema";
@@ -531,31 +513,6 @@ public class TestParquetWriter extends BaseTestQuery {
     }
   }
 
-  @Test // see DRILL-2408
-  public void testWriteEmptyFileAfterFlush() throws Exception {
-    String outputFile = "testparquetwriter_test_write_empty_file_after_flush";
-
-    try {
-      // this specific value will force a flush just after the final row is written
-      // this will cause the creation of a new "empty" parquet file
-      test("ALTER SESSION SET `store.parquet.block-size` = 19926");
-
-      String query = "SELECT * FROM cp.`employee.json` LIMIT 100";
-      test("CREATE TABLE dfs_test.tmp.%s AS %s", outputFile, query);
-
-      // this query will fail if the "empty" file wasn't deleted
-      testBuilder()
-        .unOrdered()
-        .sqlQuery("SELECT * FROM dfs_test.tmp.%s", outputFile)
-        .sqlBaselineQuery(query)
-        .go();
-    } finally {
-      // restore the session option
-      test("ALTER SESSION SET `store.parquet.block-size` = %d", ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR.getDefault().num_val);
-      deleteTableIfExists(outputFile);
-    }
-  }
-
   private static void deleteTableIfExists(String tableName) {
     try {
       Path path = new Path(getDfsTestTmpSchemaLocation(), tableName);

http://git-wip-us.apache.org/repos/asf/drill/blob/868ce4de/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
new file mode 100644
index 0000000..2848b68
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.writer;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestParquetWriterEmptyFiles extends BaseTestQuery {
+
+  private static FileSystem fs;
+
+  @BeforeClass
+  public static void initFs() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
+
+    fs = FileSystem.get(conf);
+
+    updateTestCluster(3, null);
+  }
+
+  @Test // see DRILL-2408
+  public void testWriteEmptyFile() throws Exception {
+    final String outputFile = "testparquetwriteremptyfiles_testwriteemptyfile";
+
+    try {
+      test("CREATE TABLE dfs_test.tmp.%s AS SELECT * FROM cp.`employee.json` WHERE 1=0",
outputFile);
+
+      final Path path = new Path(getDfsTestTmpSchemaLocation(), outputFile);
+      Assert.assertFalse(fs.exists(path));
+    } finally {
+      deleteTableIfExists(outputFile);
+    }
+  }
+
+  @Test
+  public void testMultipleWriters() throws Exception {
+    final String outputFile = "testparquetwriteremptyfiles_testmultiplewriters";
+
+    runSQL("alter session set `planner.slice_target` = 1");
+
+    try {
+      final String query = "SELECT position_id FROM cp.`employee.json` WHERE position_id
IN (15, 16) GROUP BY position_id";
+      test("CREATE TABLE dfs_test.tmp.%s AS %s", outputFile, query);
+
+      // this query will fail if an "empty" file was created
+      testBuilder()
+        .unOrdered()
+        .sqlQuery("SELECT * FROM dfs_test.tmp.%s", outputFile)
+        .sqlBaselineQuery(query)
+        .go();
+    } finally {
+      runSQL("alter session set `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+      deleteTableIfExists(outputFile);
+    }
+  }
+
+  @Test // see DRILL-2408
+  public void testWriteEmptyFileAfterFlush() throws Exception {
+    final String outputFile = "testparquetwriteremptyfiles_test_write_empty_file_after_flush";
+    deleteTableIfExists(outputFile);
+
+    try {
+      // this specific value will force a flush just after the final row is written
+      // this may cause the creation of a new "empty" parquet file
+      test("ALTER SESSION SET `store.parquet.block-size` = 19926");
+
+      final String query = "SELECT * FROM cp.`employee.json` LIMIT 100";
+      test("CREATE TABLE dfs_test.tmp.%s AS %s", outputFile, query);
+
+      // this query will fail if an "empty" file was created
+      testBuilder()
+        .unOrdered()
+        .sqlQuery("SELECT * FROM dfs_test.tmp.%s", outputFile)
+        .sqlBaselineQuery(query)
+        .go();
+    } finally {
+      // restore the session option
+      test("ALTER SESSION SET `store.parquet.block-size` = %d", ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR.getDefault().num_val);
+      deleteTableIfExists(outputFile);
+    }
+  }
+
+  private static boolean deleteTableIfExists(String tableName) {
+    try {
+      Path path = new Path(getDfsTestTmpSchemaLocation(), tableName);
+      if (fs.exists(path)) {
+        return fs.delete(path, true);
+      }
+    } catch (Exception e) {
+      // ignore exceptions.
+      return false;
+    }
+
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/868ce4de/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
index c73eb50..74c6655 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
@@ -332,7 +332,8 @@ public abstract class DrillConnectionImpl extends AvaticaConnection
   private static void makeTmpSchemaLocationsUnique(StoragePluginRegistry pluginRegistry,
Properties props) {
     try {
       if (props != null && "true".equalsIgnoreCase(props.getProperty("drillJDBCUnitTests")))
{
-        TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry);
+        final String tmpDirPath = TestUtilities.createTempDir();
+        TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, tmpDirPath);
         TestUtilities.makeDfsTmpSchemaImmutable(pluginRegistry);
       }
     } catch(Throwable e) {


Mime
View raw message