hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vihan...@apache.org
Subject [2/2] hive git commit: HIVE-16672: Parquet vectorization doesn't work for tables with partition info (Colin Ma, reviewed by Ferdinand Xu)
Date Fri, 27 Oct 2017 00:08:44 GMT
HIVE-16672: Parquet vectorization doesn't work for tables with partition info (Colin Ma, reviewed by Ferdinand Xu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c14ae685
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c14ae685
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c14ae685

Branch: refs/heads/branch-2
Commit: c14ae6857bc8e2bc97bf78c7a06329e9e317bc8c
Parents: c47b2a0
Author: Ferdinand Xu <cheng.a.xu@intel.com>
Authored: Thu Jun 1 14:14:50 2017 +0800
Committer: Vihang Karajgaonkar <vihang@cloudera.com>
Committed: Thu Oct 26 17:04:49 2017 -0700

----------------------------------------------------------------------
 .../vector/VectorizedParquetRecordReader.java   |   32 +-
 .../vector_partitioned_date_time.q              |  113 +
 .../llap/vector_partitioned_date_time.q.out     | 2434 ++++++++++++++++++
 3 files changed, 2573 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c14ae685/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 5423bd0..846f7c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -72,6 +73,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   private List<TypeInfo> columnTypesList;
   private VectorizedRowBatchCtx rbCtx;
   private List<Integer> indexColumnsWanted;
+  private Object[] partitionValues;
 
   /**
    * For each request column, the reader to read this column. This is NULL if this column
@@ -123,12 +125,23 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
       }
       colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
       rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+      initPartitionValues((FileSplit) oldInputSplit, conf);
     } catch (Throwable e) {
       LOG.error("Failed to create the vectorized reader due to exception " + e);
       throw new RuntimeException(e);
     }
   }
 
+  private void initPartitionValues(FileSplit fileSplit, JobConf conf) throws IOException {
+    int partitionColumnCount = rbCtx.getPartitionColumnCount();
+    if (partitionColumnCount > 0) {
+      partitionValues = new Object[partitionColumnCount];
+      rbCtx.getPartitionValues(rbCtx, conf, fileSplit, partitionValues);
+    } else {
+      partitionValues = null;
+    }
+   }
+
   public void initialize(
     InputSplit oldSplit,
     JobConf configuration) throws IOException, InterruptedException {
@@ -265,16 +278,23 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     if (rowsReturned >= totalRowCount) {
       return false;
     }
+
+    // Add partition cols if necessary (see VectorizedOrcInputFormat for details).
+    if (partitionValues != null) {
+      rbCtx.addPartitionColsToBatch(columnarBatch, partitionValues);
+    }
     checkEndOfRowGroup();
 
     int num = (int) Math.min(VectorizedRowBatch.DEFAULT_SIZE, totalCountLoadedSoFar - rowsReturned);
-    for (int i = 0; i < columnReaders.length; ++i) {
-      if (columnReaders[i] == null) {
-        continue;
+    if (colsToInclude.size() > 0) {
+      for (int i = 0; i < columnReaders.length; ++i) {
+        if (columnReaders[i] == null) {
+          continue;
+        }
+        columnarBatch.cols[colsToInclude.get(i)].isRepeating = true;
+        columnReaders[i].readBatch(num, columnarBatch.cols[colsToInclude.get(i)],
+            columnTypesList.get(colsToInclude.get(i)));
       }
-      columnarBatch.cols[colsToInclude.get(i)].isRepeating = true;
-      columnReaders[i].readBatch(num, columnarBatch.cols[colsToInclude.get(i)],
-        columnTypesList.get(colsToInclude.get(i)));
     }
     rowsReturned += num;
     columnarBatch.size = num;

http://git-wip-us.apache.org/repos/asf/hive/blob/c14ae685/ql/src/test/queries/clientpositive/vector_partitioned_date_time.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/vector_partitioned_date_time.q b/ql/src/test/queries/clientpositive/vector_partitioned_date_time.q
index bf4c461..107fe7c 100644
--- a/ql/src/test/queries/clientpositive/vector_partitioned_date_time.q
+++ b/ql/src/test/queries/clientpositive/vector_partitioned_date_time.q
@@ -126,3 +126,116 @@ explain vectorization expression
 select fl_time, count(*) from flights_tiny_orc_partitioned_timestamp group by fl_time;
 
 select fl_time, count(*) from flights_tiny_orc_partitioned_timestamp group by fl_time;
+
+-- test for Parquet file format
+CREATE TABLE flights_tiny_parquet STORED AS PARQUET AS
+SELECT origin_city_name, dest_city_name, fl_date, to_utc_timestamp(fl_date, 'America/Los_Angeles') as fl_time, arr_delay, fl_num
+FROM flights_tiny;
+
+SELECT * FROM flights_tiny_parquet;
+
+SET hive.vectorized.execution.enabled=false;
+
+select * from flights_tiny_parquet sort by fl_num, fl_date limit 25;
+
+select fl_date, count(*) from flights_tiny_parquet group by fl_date;
+
+SET hive.vectorized.execution.enabled=true;
+
+explain vectorization expression
+select * from flights_tiny_parquet sort by fl_num, fl_date limit 25;
+
+select * from flights_tiny_parquet sort by fl_num, fl_date limit 25;
+
+explain vectorization expression
+select fl_date, count(*) from flights_tiny_parquet group by fl_date;
+
+select fl_date, count(*) from flights_tiny_parquet group by fl_date;
+
+
+SET hive.vectorized.execution.enabled=false;
+
+CREATE TABLE flights_tiny_parquet_partitioned_date (
+  origin_city_name STRING,
+  dest_city_name STRING,
+  fl_time TIMESTAMP,
+  arr_delay FLOAT,
+  fl_num INT
+)
+PARTITIONED BY (fl_date DATE)
+STORED AS PARQUET;
+
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+INSERT INTO TABLE flights_tiny_parquet_partitioned_date
+PARTITION (fl_date)
+SELECT  origin_city_name, dest_city_name, fl_time, arr_delay, fl_num, fl_date
+FROM flights_tiny_parquet;
+
+
+select * from flights_tiny_parquet_partitioned_date;
+
+select * from flights_tiny_parquet_partitioned_date sort by fl_num, fl_date limit 25;
+
+select fl_date, count(*) from flights_tiny_parquet_partitioned_date group by fl_date;
+
+SET hive.vectorized.execution.enabled=true;
+
+explain vectorization expression
+select * from flights_tiny_parquet_partitioned_date;
+
+select * from flights_tiny_parquet_partitioned_date;
+
+explain vectorization expression
+select * from flights_tiny_parquet_partitioned_date sort by fl_num, fl_date limit 25;
+
+select * from flights_tiny_parquet_partitioned_date sort by fl_num, fl_date limit 25;
+
+explain vectorization expression
+select fl_date, count(*) from flights_tiny_parquet_partitioned_date group by fl_date;
+
+select fl_date, count(*) from flights_tiny_parquet_partitioned_date group by fl_date;
+
+
+SET hive.vectorized.execution.enabled=false;
+
+CREATE TABLE flights_tiny_parquet_partitioned_timestamp (
+  origin_city_name STRING,
+  dest_city_name STRING,
+  fl_date DATE,
+  arr_delay FLOAT,
+  fl_num INT
+)
+PARTITIONED BY (fl_time TIMESTAMP)
+STORED AS PARQUET;
+
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+INSERT INTO TABLE flights_tiny_parquet_partitioned_timestamp
+PARTITION (fl_time)
+SELECT  origin_city_name, dest_city_name, fl_date, arr_delay, fl_num, fl_time
+FROM flights_tiny_parquet;
+
+
+select * from flights_tiny_parquet_partitioned_timestamp;
+
+select * from flights_tiny_parquet_partitioned_timestamp sort by fl_num, fl_time limit 25;
+
+select fl_time, count(*) from flights_tiny_parquet_partitioned_timestamp group by fl_time;
+
+SET hive.vectorized.execution.enabled=true;
+
+explain vectorization expression
+select * from flights_tiny_parquet_partitioned_timestamp;
+
+select * from flights_tiny_parquet_partitioned_timestamp;
+
+explain vectorization expression
+select * from flights_tiny_parquet_partitioned_timestamp sort by fl_num, fl_time limit 25;
+
+select * from flights_tiny_parquet_partitioned_timestamp sort by fl_num, fl_time limit 25;
+
+explain vectorization expression
+select fl_time, count(*) from flights_tiny_parquet_partitioned_timestamp group by fl_time;
+
+select fl_time, count(*) from flights_tiny_parquet_partitioned_timestamp group by fl_time;
\ No newline at end of file


Mime
View raw message