tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject git commit: TAJO-714: Enable setting Parquet tuning parameters. (David Chen via hyunsik)
Date Wed, 02 Apr 2014 02:55:01 GMT
Repository: tajo
Updated Branches:
  refs/heads/master a3a178a93 -> dc40d849e


TAJO-714: Enable setting Parquet tuning parameters. (David Chen via hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/dc40d849
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/dc40d849
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/dc40d849

Branch: refs/heads/master
Commit: dc40d849e4f52b93753627ae699e4c161e4ebdf1
Parents: a3a178a
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Wed Apr 2 11:54:05 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Wed Apr 2 11:54:05 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 pom.xml                                         |  2 ++
 tajo-catalog/tajo-catalog-common/pom.xml        | 16 +++++++++++
 .../apache/tajo/catalog/CatalogConstants.java   | 29 ++++++++++++++++++++
 .../org/apache/tajo/catalog/CatalogUtil.java    | 13 +++++++++
 tajo-storage/pom.xml                            |  3 +-
 .../tajo/storage/parquet/ParquetAppender.java   | 27 +++++++++++++++++-
 .../tajo/storage/parquet/ParquetScanner.java    |  1 +
 .../apache/tajo/storage/TestMergeScanner.java   |  1 +
 .../org/apache/tajo/storage/TestStorages.java   |  3 ++
 .../apache/tajo/storage/v2/TestStorages.java    |  3 ++
 11 files changed, 97 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 83990ff..ffeceb0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -143,6 +143,8 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-714: Enable setting Parquet tuning parameters. (David Chen via hyunsik)
+
     TAJO-691: HashJoin or HashAggregation is too slow if there is many unique keys.
     (hyoungjunkim via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0421119..d09559e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,8 @@
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <tajo.version>0.8.0-SNAPSHOT</tajo.version>
     <tajo.root>${basedir}</tajo.root>
+    <parquet.version>1.3.2</parquet.version>
+    <parquet.format.version>2.0.0</parquet.format.version>
   </properties>
 
   <modules>

http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/tajo-catalog/tajo-catalog-common/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/pom.xml b/tajo-catalog/tajo-catalog-common/pom.xml
index a4db647..fa90237 100644
--- a/tajo-catalog/tajo-catalog-common/pom.xml
+++ b/tajo-catalog/tajo-catalog-common/pom.xml
@@ -160,6 +160,22 @@
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging-api</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>parquet-column</artifactId>
+      <version>${parquet.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>parquet-hadoop</artifactId>
+      <version>${parquet.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>parquet-format</artifactId>
+      <version>${parquet.format.version}</version>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
index c5e0dd4..c695fc8 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
@@ -18,6 +18,9 @@
 
 package org.apache.tajo.catalog;
 
+import parquet.hadoop.ParquetWriter;
+import parquet.hadoop.metadata.CompressionCodecName;
+
 public class CatalogConstants {
   public static final String STORE_CLASS="tajo.catalog.store.class";
 
@@ -64,4 +67,30 @@ public class CatalogConstants {
   public static final String DEFAULT_FIELD_DELIMITER = "|";
   public static final String DEFAULT_BINARY_SERDE = "org.apache.tajo.storage.BinarySerializerDeserializer";
   public static final String DEFAULT_TEXT_SERDE = "org.apache.tajo.storage.TextSerializerDeserializer";
+
+  public static final String PARQUET_DEFAULT_BLOCK_SIZE;
+  public static final String PARQUET_DEFAULT_PAGE_SIZE;
+  public static final String PARQUET_DEFAULT_COMPRESSION_CODEC_NAME;
+  public static final String PARQUET_DEFAULT_IS_DICTIONARY_ENABLED;
+  public static final String PARQUET_DEFAULT_IS_VALIDATION_ENABLED;
+
+  static {
+    PARQUET_DEFAULT_BLOCK_SIZE =
+        Integer.toString(ParquetWriter.DEFAULT_BLOCK_SIZE);
+    PARQUET_DEFAULT_PAGE_SIZE =
+        Integer.toString(ParquetWriter.DEFAULT_PAGE_SIZE);
+
+    // When parquet-hadoop 1.3.3 is available, this should be changed to
+    // ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME.
+    PARQUET_DEFAULT_COMPRESSION_CODEC_NAME =
+        CompressionCodecName.UNCOMPRESSED.name().toLowerCase();
+
+    // When parquet-hadoop 1.3.3 is available, this should be changed to
+    // ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED.
+    PARQUET_DEFAULT_IS_DICTIONARY_ENABLED = "true";
+
+    // When parquet-hadoop 1.3.3 is available, this should be changed to
+    // ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED.
+    PARQUET_DEFAULT_IS_VALIDATION_ENABLED = "false";
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index 6e7d2a5..f9f92f0 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -36,6 +36,8 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
+import parquet.hadoop.ParquetOutputFormat;
+
 import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import static org.apache.tajo.common.TajoDataTypes.Type;
 
@@ -296,6 +298,17 @@ public class CatalogUtil {
     } else if(StoreType.SEQUENCEFILE == type){
       options.put(CatalogConstants.SEQUENCEFILE_SERDE, CatalogConstants.DEFAULT_TEXT_SERDE);
       options.put(CatalogConstants.SEQUENCEFILE_DELIMITER, CatalogConstants.DEFAULT_FIELD_DELIMITER);
+    } else if (type == StoreType.PARQUET) {
+      options.put(ParquetOutputFormat.BLOCK_SIZE,
+          CatalogConstants.PARQUET_DEFAULT_BLOCK_SIZE);
+      options.put(ParquetOutputFormat.PAGE_SIZE,
+          CatalogConstants.PARQUET_DEFAULT_PAGE_SIZE);
+      options.put(ParquetOutputFormat.COMPRESSION,
+          CatalogConstants.PARQUET_DEFAULT_COMPRESSION_CODEC_NAME);
+      options.put(ParquetOutputFormat.ENABLE_DICTIONARY,
+          CatalogConstants.PARQUET_DEFAULT_IS_DICTIONARY_ENABLED);
+      options.put(ParquetOutputFormat.VALIDATION,
+          CatalogConstants.PARQUET_DEFAULT_IS_VALIDATION_ENABLED);
     }
 
     return options;

http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/tajo-storage/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml
index c3f08cb..b9a162a 100644
--- a/tajo-storage/pom.xml
+++ b/tajo-storage/pom.xml
@@ -35,8 +35,6 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    <parquet.version>1.3.2</parquet.version>
-    <parquet.format.version>2.0.0</parquet.format.version>
   </properties>
 
   <repositories>
@@ -265,6 +263,7 @@
       <artifactId>commons-lang</artifactId>
       <version>2.6</version>
     </dependency>
+
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-column</artifactId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
index cb2f243..10b9331 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
@@ -18,6 +18,9 @@
 
 package org.apache.tajo.storage.parquet;
 
+import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.metadata.CompressionCodecName;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Schema;
@@ -34,6 +37,11 @@ import java.io.IOException;
  */
 public class ParquetAppender extends FileAppender {
   private TajoParquetWriter writer;
+  private int blockSize;
+  private int pageSize;
+  private CompressionCodecName compressionCodecName;
+  private boolean enableDictionary;
+  private boolean validating;
   private TableStatistics stats;
 
   /**
@@ -47,6 +55,16 @@ public class ParquetAppender extends FileAppender {
   public ParquetAppender(Configuration conf, Schema schema, TableMeta meta,
                          Path path) throws IOException {
     super(conf, schema, meta, path);
+    this.blockSize = Integer.parseInt(
+        meta.getOption(ParquetOutputFormat.BLOCK_SIZE));
+    this.pageSize = Integer.parseInt(
+        meta.getOption(ParquetOutputFormat.PAGE_SIZE));
+    this.compressionCodecName = CompressionCodecName.fromConf(
+        meta.getOption(ParquetOutputFormat.COMPRESSION));
+    this.enableDictionary = Boolean.parseBoolean(
+        meta.getOption(ParquetOutputFormat.ENABLE_DICTIONARY));
+    this.validating = Boolean.parseBoolean(
+        meta.getOption(ParquetOutputFormat.VALIDATION));
   }
 
   /**
@@ -54,10 +72,17 @@ public class ParquetAppender extends FileAppender {
    * and initializes the table statistics if enabled.
    */
   public void init() throws IOException {
-    writer = new TajoParquetWriter(path, schema);
+    writer = new TajoParquetWriter(path,
+                                   schema,
+                                   compressionCodecName,
+                                   blockSize,
+                                   pageSize,
+                                   enableDictionary,
+                                   validating);
     if (enabledStats) {
       this.stats = new TableStatistics(schema);
     }
+    super.init();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
index 086f490..38d8ca4 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
@@ -57,6 +57,7 @@ public class ParquetScanner extends FileScanner {
     }
     reader = new TajoParquetReader(fragment.getPath(), schema,
                                    new Schema(targets));
+    super.init();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index 5427592..354fbc2 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -94,6 +94,7 @@ public class TestMergeScanner {
 
     Options options = new Options();
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.setOptions(CatalogUtil.newOptionsWithDefault(storeType));
 
     Path table1Path = new Path(testDir, storeType + "_1.data");
     Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, table1Path);

http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index 0a38985..a500f09 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -146,6 +146,7 @@ public class TestStorages {
     schema.addColumn("score", Type.FLOAT4);
 
     TableMeta meta = CatalogUtil.newTableMeta(storeType);
+    meta.setOptions(CatalogUtil.newOptionsWithDefault(storeType));
 
     Path tablePath = new Path(testDir, "testProjection.data");
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
@@ -208,6 +209,7 @@ public class TestStorages {
 
     Options options = new Options();
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.setOptions(CatalogUtil.newOptionsWithDefault(storeType));
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
@@ -269,6 +271,7 @@ public class TestStorages {
 
     Options options = new Options();
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.setOptions(CatalogUtil.newOptionsWithDefault(storeType));
     meta.putOption(CatalogConstants.CSVFILE_NULL, "\\\\N");
     meta.putOption(CatalogConstants.RCFILE_NULL, "\\\\N");
     meta.putOption(CatalogConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName());

http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
index ad49b36..140aa09 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
@@ -95,6 +95,7 @@ public class TestStorages {
       schema.addColumn("age", Type.INT8);
 
       TableMeta meta = CatalogUtil.newTableMeta(storeType);
+      meta.setOptions(CatalogUtil.newOptionsWithDefault(storeType));
       Path tablePath = new Path(testDir, "Splitable.data");
       Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
       appender.enableStats();
@@ -147,6 +148,7 @@ public class TestStorages {
     schema.addColumn("score", Type.FLOAT4);
 
     TableMeta meta = CatalogUtil.newTableMeta(storeType);
+    meta.setOptions(CatalogUtil.newOptionsWithDefault(storeType));
 
     Path tablePath = new Path(testDir, "testProjection.data");
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
@@ -207,6 +209,7 @@ public class TestStorages {
 
     Options options = new Options();
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.setOptions(CatalogUtil.newOptionsWithDefault(storeType));
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);


Mime
View raw message