parquet-commits mailing list archives

From tians...@apache.org
Subject parquet-mr git commit: PARQUET-341 improve write performance for wide schema sparse data
Date Wed, 05 Aug 2015 23:29:24 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master b86f68e39 -> 2f956f465


PARQUET-341 improve write performance for wide schema sparse data

In the write path, when the data is very sparse, most of the time is spent writing nulls.
Currently, writing nulls takes the same code path as writing values, which recursively traverses
all the leaves whenever a group is null.
But when a group is null, every leaf beneath it is written as null with the same repetition
level and definition level (for example, if an optional group g containing optional fields a and
b is null in a top-level record, both a and b are written as null with repetition level 0 and
definition level 0), so the recursive calls used to reach the leaves can be eliminated.

This PR caches the list of leaf writers for each group node, so when a group node is null its
leaves can be flushed with null values directly.
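
To make the idea concrete, here is a minimal, self-contained sketch (illustrative only, not the
patch itself; Node and NullFlushSketch are hypothetical stand-ins for parquet-mr's GroupColumnIO
and ColumnWriter): the leaves under each group are collected once up front, so flushing nulls for
a group becomes a flat loop instead of a subtree traversal.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Node is a hypothetical stand-in for a column IO node (GroupColumnIO /
// PrimitiveColumnIO in parquet-mr); counting writes stands in for
// leafWriter.writeNull(r, d).
class Node {
  final List<Node> children = new ArrayList<Node>();
  final boolean isLeaf;
  Node(boolean isLeaf) { this.isLeaf = isLeaf; }
}

public class NullFlushSketch {
  // Built once up front: each group node maps to every leaf beneath it.
  private final Map<Node, List<Node>> groupToLeaves = new HashMap<Node, List<Node>>();

  // One-time recursive pass that populates the cache, analogous to
  // buildGroupToLeafWriterMap in the patch.
  List<Node> collectLeaves(Node node) {
    List<Node> leaves = new ArrayList<Node>();
    if (node.isLeaf) {
      leaves.add(node);
    } else {
      for (Node child : node.children) {
        leaves.addAll(collectLeaves(child));
      }
      groupToLeaves.put(node, leaves);
    }
    return leaves;
  }

  // Old path: recurse through the whole subtree for every null group.
  static int writeNullRecursive(Node node) {
    if (node.isLeaf) return 1;
    int writes = 0;
    for (Node child : node.children) writes += writeNullRecursive(child);
    return writes;
  }

  // New path: one flat loop over the cached leaves, no recursion.
  int writeNullToLeaves(Node group) {
    int writes = 0;
    for (Node leaf : groupToLeaves.get(group)) writes++;
    return writes;
  }

  public static void main(String[] args) {
    // Build a toy tree: 3 groups with 4 leaves each.
    Node root = new Node(false);
    for (int i = 0; i < 3; i++) {
      Node group = new Node(false);
      for (int j = 0; j < 4; j++) group.children.add(new Node(true));
      root.children.add(group);
    }
    NullFlushSketch sketch = new NullFlushSketch();
    sketch.collectLeaves(root);
    // Both paths write the same 12 nulls; the cached path skips the traversal.
    System.out.println("recursive=" + writeNullRecursive(root)
        + " cached=" + sketch.writeNullToLeaves(root));
  }
}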

We tested it with a really wide schema on one of our production datasets; it improves write
performance by ~20%.

Author: Tianshuo Deng <tdeng@twitter.com>

Closes #247 from tsdeng/flush_null_directly and squashes the following commits:

253f2e3 [Tianshuo Deng] address comments
8676cd7 [Tianshuo Deng] flush null directly to leaves


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/2f956f46
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/2f956f46
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/2f956f46

Branch: refs/heads/master
Commit: 2f956f46580e5b4752173e885d37a20fe31a78d8
Parents: b86f68e
Author: Tianshuo Deng <tdeng@twitter.com>
Authored: Wed Aug 5 16:29:00 2015 -0700
Committer: Tianshuo Deng <tdeng@twitter.com>
Committed: Wed Aug 5 16:29:00 2015 -0700

----------------------------------------------------------------------
 .../org/apache/parquet/io/MessageColumnIO.java  | 39 +++++++++++++++++---
 .../hadoop/InternalParquetRecordReader.java     |  3 +-
 2 files changed, 36 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/2f956f46/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
index 9a8f88e..048dcc3 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
@@ -18,9 +18,12 @@
  */
 package org.apache.parquet.io;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.parquet.Log;
 import org.apache.parquet.column.ColumnWriteStore;
@@ -172,16 +175,39 @@ public class MessageColumnIO extends GroupColumnIO {
     private final FieldsMarker[] fieldsWritten;
     private final int[] r;
     private final ColumnWriter[] columnWriter;
+    /** maintain a map of a group and all the leaf nodes underneath it. It's used to optimize writing null for a group node
+     * all the leaves can be called directly without traversing the sub tree of the group node */
+    private Map<GroupColumnIO, List<ColumnWriter>>  groupToLeafWriter = new HashMap<GroupColumnIO, List<ColumnWriter>>();
     private final ColumnWriteStore columns;
     private boolean emptyField = true;
 
+    private void buildGroupToLeafWriterMap(PrimitiveColumnIO primitive, ColumnWriter writer) {
+      GroupColumnIO  parent = primitive.getParent();
+      do {
+        getLeafWriters(parent).add(writer);
+        parent = parent.getParent();
+      } while (parent != null);
+    }
+
+    private List<ColumnWriter> getLeafWriters(GroupColumnIO group) {
+      List<ColumnWriter> writers = groupToLeafWriter.get(group);
+      if (writers == null) {
+        writers = new ArrayList<ColumnWriter>();
+        groupToLeafWriter.put(group, writers);
+      }
+      return writers;
+    }
+
     public MessageColumnIORecordConsumer(ColumnWriteStore columns) {
       this.columns = columns;
       int maxDepth = 0;
       this.columnWriter = new ColumnWriter[MessageColumnIO.this.getLeaves().size()];
+
       for (PrimitiveColumnIO primitiveColumnIO : MessageColumnIO.this.getLeaves()) {
+        ColumnWriter w = columns.getColumnWriter(primitiveColumnIO.getColumnDescriptor());
         maxDepth = Math.max(maxDepth, primitiveColumnIO.getFieldPath().length);
-        columnWriter[primitiveColumnIO.getId()] = columns.getColumnWriter(primitiveColumnIO.getColumnDescriptor());
+        columnWriter[primitiveColumnIO.getId()] = w;
+        buildGroupToLeafWriterMap(primitiveColumnIO, w);
       }
 
       fieldsWritten = new FieldsMarker[maxDepth];
@@ -271,10 +297,13 @@ public class MessageColumnIO extends GroupColumnIO {
         columnWriter[((PrimitiveColumnIO)undefinedField).getId()].writeNull(r, d);
       } else {
         GroupColumnIO groupColumnIO = (GroupColumnIO)undefinedField;
-        int childrenCount = groupColumnIO.getChildrenCount();
-        for (int i = 0; i < childrenCount; i++) {
-          writeNull(groupColumnIO.getChild(i), r, d);
-        }
+        writeNullToLeaves(groupColumnIO, r, d);
+      }
+    }
+
+    private void writeNullToLeaves(GroupColumnIO group, int r, int d) {
+      for(ColumnWriter leafWriter: groupToLeafWriter.get(group)) {
+        leafWriter.writeNull(r,d);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/2f956f46/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index c1bd037..21e69b7 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -62,6 +62,7 @@ class InternalParquetRecordReader<T> {
 
   private MessageType requestedSchema;
   private MessageType fileSchema;
+  private MessageColumnIO columnIO;
   private int columnCount;
   private final ReadSupport<T> readSupport;
 
@@ -136,7 +137,6 @@ class InternalParquetRecordReader<T> {
       BenchmarkCounter.incrementTime(timeSpentReading);
       if (Log.INFO) LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
       if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
-      MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
       recordReader = columnIO.getRecordReader(pages, recordConverter, filter);
       startedAssemblingCurrentBlockAt = System.currentTimeMillis();
       totalCountLoadedSoFar += pages.getRowCount();
@@ -174,6 +174,7 @@ class InternalParquetRecordReader<T> {
     this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
     this.requestedSchema = readContext.getRequestedSchema();
     this.fileSchema = fileSchema;
+    this.columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
     this.file = file;
     this.columnCount = requestedSchema.getPaths().size();
     this.recordConverter = readSupport.prepareForRead(

