parquet-commits mailing list archives

From tians...@apache.org
Subject parquet-mr git commit: PARQUET-343 Caching nulls on group node to improve write performance on wide schema sparse data
Date Thu, 20 Aug 2015 21:21:27 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master 3f36b7b50 -> 01fbf81e3


PARQUET-343 Caching nulls on group node to improve write performance on wide schema sparse data

For a really wide schema with sparse data, a null group node can have a huge number of leaves
underneath it. Calling writeNull for each leaf every time its ancestor group node is null is
inefficient and bad for memory locality, especially when the number of leaves is huge.

Instead, nulls can be cached on the group node. A flush is only triggered when a group node
goes from null to non-null. This way, all the cached null values are flushed to the leaf
nodes in a tight loop, which improves write performance.

We tested this approach combined with PARQUET-341 on a really large schema, and it gave us a
~2x improvement in write performance.
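
In code, the approach amounts to the following minimal sketch (hypothetical names throughout;
the patch itself keys the cache by GroupColumnIO and stores levels in fastutil's IntArrayList,
as the diff below shows):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the caching idea, not the patch's actual API.
class GroupNullCacheSketch {

  interface LeafWriter {
    void writeNull(int repetitionLevel, int definitionLevel);
  }

  // per group: repetition levels of the nulls seen while the group stayed null
  private final Map<String, List<Integer>> groupNullCache = new HashMap<String, List<Integer>>();
  // per group: every leaf writer underneath it, registered at setup time
  private final Map<String, List<LeafWriter>> groupToLeafWriter = new HashMap<String, List<LeafWriter>>();

  // Group hit a null: O(1) work instead of one writeNull call per leaf.
  // Only the repetition level is cached; the definition level of a cached null
  // is always the same for a given group, so it need not be stored.
  void cacheNullForGroup(String group, int repetitionLevel) {
    List<Integer> nulls = groupNullCache.get(group);
    if (nulls == null) {
      nulls = new ArrayList<Integer>();
      groupNullCache.put(group, nulls);
    }
    nulls.add(repetitionLevel);
  }

  // Group turned non-null: replay every cached null into each leaf writer in a
  // tight loop, touching each writer's buffer once for the whole batch.
  void flushNullsToLeaves(String group, int parentDefinitionLevel) {
    List<Integer> nulls = groupNullCache.get(group);
    if (nulls == null || nulls.isEmpty()) {
      return;
    }
    List<LeafWriter> leaves = groupToLeafWriter.get(group);
    if (leaves != null) {
      for (LeafWriter leaf : leaves) {
        for (int r : nulls) {
          leaf.writeNull(r, parentDefinitionLevel);
        }
      }
    }
    nulls.clear();
  }
}

Caching only the repetition level keeps each cache entry to a single int; the definition level
of every cached null is implied by the group's position in the schema.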

Author: Tianshuo Deng <tdeng@twitter.com>

Closes #249 from tsdeng/batch_null and squashes the following commits:

0a61646 [Tianshuo Deng] use curly braces even for 1 line if statements
a8964c0 [Tianshuo Deng] optimize writeNullToLeaves
5309612 [Tianshuo Deng] optimize cacheNullForGroup
ecbdfca [Tianshuo Deng] add comments
ed692c0 [Tianshuo Deng] WIP
0cae1b6 [Tianshuo Deng] remove unused class
8e07db4 [Tianshuo Deng] refactor
dead618 [Tianshuo Deng] reformat
c3c0c70 [Tianshuo Deng] refactor
636ab52 [Tianshuo Deng] remove unused method
767b4fd [Tianshuo Deng] use parent definition level
8f251a0 [Tianshuo Deng] use IntArrayList
c549c84 [Tianshuo Deng] fix
9583d04 [Tianshuo Deng] wIP
d8cb878 [Tianshuo Deng] WIP
35f1fa1 [Tianshuo Deng] cache columnWriter for each parent
46fd464 [Tianshuo Deng] address comments
8c83964 [Tianshuo Deng] flush null directly to leaves


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/01fbf81e
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/01fbf81e
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/01fbf81e

Branch: refs/heads/master
Commit: 01fbf81e34a36cedf505f20b1c52306afceedc3e
Parents: 3f36b7b
Author: Tianshuo Deng <tdeng@twitter.com>
Authored: Thu Aug 20 14:21:12 2015 -0700
Committer: Tianshuo Deng <tdeng@twitter.com>
Committed: Thu Aug 20 14:21:12 2015 -0700

----------------------------------------------------------------------
 .../org/apache/parquet/io/MessageColumnIO.java  | 139 ++++++++++++++++---
 .../parquet/io/ValidatingRecordConsumer.java    |   4 +
 .../apache/parquet/io/api/RecordConsumer.java   |   7 +
 .../org/apache/parquet/io/TestColumnIO.java     |  21 ++-
 .../org/apache/parquet/io/TestFiltered.java     |   5 +-
 .../hadoop/InternalParquetRecordReader.java     |   3 +-
 .../hadoop/InternalParquetRecordWriter.java     |   6 +-
 .../parquet/thrift/TestParquetReadProtocol.java |   1 +
 8 files changed, 154 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/01fbf81e/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
index 048dcc3..cb1c8d6 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
@@ -30,6 +30,7 @@ import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.impl.ColumnReadStoreImpl;
 import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.values.dictionary.IntList;
 import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
@@ -46,6 +47,8 @@ import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.io.api.RecordMaterializer;
 import org.apache.parquet.schema.MessageType;
 
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntIterator;
 import static org.apache.parquet.Preconditions.checkNotNull;
 
 /**
@@ -53,7 +56,6 @@ import static org.apache.parquet.Preconditions.checkNotNull;
  *
  *
  * @author Julien Le Dem
- *
  */
 public class MessageColumnIO extends GroupColumnIO {
   private static final Log logger = Log.getLog(MessageColumnIO.class);
@@ -144,6 +146,38 @@ public class MessageColumnIO extends GroupColumnIO {
     });
   }
 
+  /**
+   * To improve null writing performance, we cache null values on group nodes. We flush nulls when a
+   * non-null value hits the group node.
+   *
+   * Intuitively, when a group node hits a null value, all the leaves underneath it should be null.
+   * A direct way of doing this is to write nulls for all the leaves underneath it when a group node
+   * is null. This approach is not optimal; consider the following case:
+   *
+   *    - When the schema is really wide, each group node can have thousands of leaf
+   *    nodes underneath it.
+   *    - When the data being written is really sparse, group nodes can hit nulls frequently.
+   *
+   * With the direct approach, if a group node hits null values a thousand times, and there are a
+   * thousand leaf nodes underneath it,
+   * then for each null value it iterates over a thousand leaf writers to write null values, and it
+   * will do that for each of the thousand null values.
+   *
+   * In the above case, each leaf writer maintains its own buffer of values, so calling thousands of
+   * them in turn is very bad for memory locality. Instead, each group node can remember the null values
+   * encountered and flush only when a non-null value hits the group node. This way, when we flush
+   * null values, we iterate through all the leaves only once, and multiple cached null values are
+   * flushed to each leaf in a tight loop. This implementation has the following characteristics:
+   *
+   *    1. When a group node hits a null value, it adds the repetition level of the null value to
+   *    the groupNullCache. The definition level of the cached nulls is always the same as
+   *    the definition level of the group node, so there is no need to store it.
+   *
+   *    2. When a group node hits a non-null value and has null values cached, it flushes the null
+   *    values starting with its children group nodes first. This makes sure the order of null values
+   *    being flushed is correct.
+   *
+   */
   private class MessageColumnIORecordConsumer extends RecordConsumer {
     private ColumnIO currentColumnIO;
     private int currentLevel = 0;
@@ -154,8 +188,8 @@ public class MessageColumnIO extends GroupColumnIO {
       @Override
       public String toString() {
         return "VistedIndex{" +
-                "vistedIndexes=" + vistedIndexes +
-                '}';
+            "vistedIndexes=" + vistedIndexes +
+            '}';
       }
 
       public void reset(int fieldsCount) {
@@ -175,14 +209,24 @@ public class MessageColumnIO extends GroupColumnIO {
     private final FieldsMarker[] fieldsWritten;
     private final int[] r;
     private final ColumnWriter[] columnWriter;
-    /** maintain a map of a group and all the leaf nodes underneath it. It's used to optimize writing null for a group node
-     * all the leaves can be called directly without traversing the sub tree of the group node */
-    private Map<GroupColumnIO, List<ColumnWriter>>  groupToLeafWriter = new HashMap<GroupColumnIO, List<ColumnWriter>>();
+
+    /**
+     * Maintain a map from each group to all the leaf nodes underneath it. It's used to optimize writing nulls for a group node.
+     * Instead of using recursive calls, all the leaves can be written directly without traversing the subtree of the group node.
+     */
+    private Map<GroupColumnIO, List<ColumnWriter>> groupToLeafWriter = new HashMap<GroupColumnIO, List<ColumnWriter>>();
+
+
+    /*
+     * Cache nulls for each group node. It only stores the repetition level, since the definition level
+     * should always be the definition level of the group node.
+     */
+    private Map<GroupColumnIO, IntArrayList> groupNullCache = new HashMap<GroupColumnIO, IntArrayList>();
     private final ColumnWriteStore columns;
     private boolean emptyField = true;
 
    private void buildGroupToLeafWriterMap(PrimitiveColumnIO primitive, ColumnWriter writer) {
-      GroupColumnIO  parent = primitive.getParent();
+      GroupColumnIO parent = primitive.getParent();
       do {
         getLeafWriters(parent).add(writer);
         parent = parent.getParent();
@@ -227,7 +271,7 @@ public class MessageColumnIO extends GroupColumnIO {
 
     private void log(Object m) {
       String indent = "";
-      for (int i = 0; i<currentLevel; ++i) {
+      for (int i = 0; i < currentLevel; ++i) {
         indent += "  ";
       }
       logger.debug(indent + m);
@@ -238,7 +282,7 @@ public class MessageColumnIO extends GroupColumnIO {
       if (DEBUG) log("< MESSAGE START >");
       currentColumnIO = MessageColumnIO.this;
       r[0] = 0;
-      int numberOfFieldsToVisit = ((GroupColumnIO)currentColumnIO).getChildrenCount();
+      int numberOfFieldsToVisit = ((GroupColumnIO) currentColumnIO).getChildrenCount();
       fieldsWritten[0].reset(numberOfFieldsToVisit);
       if (DEBUG) printState();
     }
@@ -255,7 +299,7 @@ public class MessageColumnIO extends GroupColumnIO {
     public void startField(String field, int index) {
       try {
         if (DEBUG) log("startField(" + field + ", " + index + ")");
-        currentColumnIO = ((GroupColumnIO)currentColumnIO).getChild(index);
+        currentColumnIO = ((GroupColumnIO) currentColumnIO).getChild(index);
         emptyField = true;
         if (DEBUG) printState();
       } catch (RuntimeException e) {
@@ -276,11 +320,11 @@ public class MessageColumnIO extends GroupColumnIO {
     }
 
     private void writeNullForMissingFieldsAtCurrentLevel() {
-      int currentFieldsCount = ((GroupColumnIO)currentColumnIO).getChildrenCount();
+      int currentFieldsCount = ((GroupColumnIO) currentColumnIO).getChildrenCount();
       for (int i = 0; i < currentFieldsCount; i++) {
         if (!fieldsWritten[currentLevel].isWritten(i)) {
           try {
-            ColumnIO undefinedField = ((GroupColumnIO)currentColumnIO).getChild(i);
+            ColumnIO undefinedField = ((GroupColumnIO) currentColumnIO).getChild(i);
             int d = currentColumnIO.getDefinitionLevel();
             if (DEBUG)
              log(Arrays.toString(undefinedField.getFieldPath()) + ".writeNull(" + r[currentLevel] + "," + d + ")");
@@ -294,17 +338,36 @@ public class MessageColumnIO extends GroupColumnIO {
 
     private void writeNull(ColumnIO undefinedField, int r, int d) {
       if (undefinedField.getType().isPrimitive()) {
-        columnWriter[((PrimitiveColumnIO)undefinedField).getId()].writeNull(r, d);
+        columnWriter[((PrimitiveColumnIO) undefinedField).getId()].writeNull(r, d);
       } else {
-        GroupColumnIO groupColumnIO = (GroupColumnIO)undefinedField;
-        writeNullToLeaves(groupColumnIO, r, d);
+        GroupColumnIO groupColumnIO = (GroupColumnIO) undefinedField;
+        // only cache the repetition level, the definition level should always be the definition level of the parent node
+        cacheNullForGroup(groupColumnIO, r);
       }
     }
 
-    private void writeNullToLeaves(GroupColumnIO group, int r, int d) {
-      for(ColumnWriter leafWriter: groupToLeafWriter.get(group)) {
-        leafWriter.writeNull(r,d);
+    private void cacheNullForGroup(GroupColumnIO group, int r) {
+      IntArrayList nulls = groupNullCache.get(group);
+      if (nulls == null) {
+        nulls = new IntArrayList();
+        groupNullCache.put(group, nulls);
       }
+      nulls.add(r);
+    }
+
+    private void writeNullToLeaves(GroupColumnIO group) {
+      IntArrayList nullCache = groupNullCache.get(group);
+      if (nullCache == null || nullCache.isEmpty())
+        return;
+
+      int parentDefinitionLevel = group.getParent().getDefinitionLevel();
+      for (ColumnWriter leafWriter : groupToLeafWriter.get(group)) {
+        for (IntIterator iter = nullCache.iterator(); iter.hasNext();) {
+          int repetitionLevel = iter.nextInt();
+          leafWriter.writeNull(repetitionLevel, parentDefinitionLevel);
+        }
+      }
+      nullCache.clear();
     }
 
     private void setRepetitionLevel() {
@@ -315,28 +378,52 @@ public class MessageColumnIO extends GroupColumnIO {
     @Override
     public void startGroup() {
       if (DEBUG) log("startGroup()");
+      GroupColumnIO group = (GroupColumnIO) currentColumnIO;
 
-      ++ currentLevel;
+      // current group is not null, need to flush all the nulls that were cached before
+      if (hasNullCache(group)) {
+        flushCachedNulls(group);
+      }
+
+      ++currentLevel;
       r[currentLevel] = r[currentLevel - 1];
 
-      int fieldsCount = ((GroupColumnIO)currentColumnIO).getChildrenCount();
+      int fieldsCount = ((GroupColumnIO) currentColumnIO).getChildrenCount();
       fieldsWritten[currentLevel].reset(fieldsCount);
       if (DEBUG) printState();
     }
 
+    private boolean hasNullCache(GroupColumnIO group) {
+      IntArrayList nulls = groupNullCache.get(group);
+      return nulls != null && !nulls.isEmpty();
+    }
+
+
+    private void flushCachedNulls(GroupColumnIO group) {
+      //flush children first
+      for (int i = 0; i < group.getChildrenCount(); i++) {
+        ColumnIO child = group.getChild(i);
+        if (child instanceof GroupColumnIO) {
+          flushCachedNulls((GroupColumnIO) child);
+        }
+      }
+      //then flush itself
+      writeNullToLeaves(group);
+    }
+
     @Override
     public void endGroup() {
       if (DEBUG) log("endGroup()");
       emptyField = false;
       writeNullForMissingFieldsAtCurrentLevel();
-      -- currentLevel;
+      --currentLevel;
 
       setRepetitionLevel();
       if (DEBUG) printState();
     }
 
     private ColumnWriter getColumnWriter() {
-      return columnWriter[((PrimitiveColumnIO)currentColumnIO).getId()];
+      return columnWriter[((PrimitiveColumnIO) currentColumnIO).getId()];
     }
 
     @Override
@@ -399,6 +486,12 @@ public class MessageColumnIO extends GroupColumnIO {
       if (DEBUG) printState();
     }
 
+
+    // flushes cached nulls for all groups
+    @Override
+    public void flush() {
+      flushCachedNulls(MessageColumnIO.this);
+    }
   }
 
   public RecordConsumer getRecordWriter(ColumnWriteStore columns) {
@@ -421,6 +514,6 @@ public class MessageColumnIO extends GroupColumnIO {
 
   @Override
   public MessageType getType() {
-    return (MessageType)super.getType();
+    return (MessageType) super.getType();
   }
 }
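
The recursion in flushCachedNulls above is the point of characteristic 2 in the new Javadoc.
A stripped-down sketch of the same depth-first rule, with a hypothetical Node type standing in
for GroupColumnIO:

import java.util.ArrayList;
import java.util.List;

// Drain the children's caches before replaying this group's own cache, so
// cached nulls reach the leaf writers in the correct order.
class FlushOrderSketch {

  static class Node {
    final List<Node> children = new ArrayList<Node>();
    final List<Integer> cachedNullRepetitionLevels = new ArrayList<Integer>();

    void replayCachedNullsToLeaves() {
      // write each cached repetition level to every leaf underneath, then forget
      cachedNullRepetitionLevels.clear();
    }
  }

  static void flushDepthFirst(Node group) {
    for (Node child : group.children) {
      flushDepthFirst(child);          // deepest caches drain first
    }
    group.replayCachedNullsToLeaves(); // then this group's own cache
  }
}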

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/01fbf81e/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java b/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java
index e1d3ba7..bf4c196 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java
@@ -126,6 +126,10 @@ public class ValidatingRecordConsumer extends RecordConsumer {
     types.pop();
     previousField.pop();
   }
+  @Override
+  public void flush() {
+    delegate.flush();
+  }
 
   private void validate(PrimitiveTypeName p) {
     Type currentType = types.peek().asGroupType().getType(fields.peek());

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/01fbf81e/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java b/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java
index 953d87a..e11d763 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java
@@ -125,4 +125,11 @@ abstract public class RecordConsumer {
    */
   abstract public void addDouble(double value);
 
+  /**
+   * No-op by default.
+   * Subclasses can implement their own flushing logic.
+   */
+  public void flush() {
+  }
+
 }
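
The new flush() hook implies a calling contract that the test changes below all follow: drain
the RecordConsumer before flushing the column store. A hedged sketch, assuming only the
MessageColumnIO and ColumnWriteStore APIs used elsewhere in this diff:

import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.api.RecordConsumer;

// Records written through the consumer may leave nulls cached on group nodes;
// flush() drains them to the leaf writers, so it must run before the column
// store flush.
class FlushContractSketch {
  void writeSparseRecord(MessageColumnIO columnIO, ColumnWriteStore columns) {
    RecordConsumer writer = columnIO.getRecordWriter(columns);
    writer.startMessage();
    // ... write the populated fields; omitted groups accumulate cached nulls ...
    writer.endMessage();
    writer.flush();   // new in this commit: drain cached nulls to leaf writers
    columns.flush();  // then flush the column store itself
  }
}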

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/01fbf81e/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java b/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
index 7c7b72c..e7274cc 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
@@ -288,10 +288,12 @@ public class TestColumnIO {
     ColumnIOFactory columnIOFactory = new ColumnIOFactory(true);
     ColumnWriteStoreV1 columns = newColumnWriteStore(memPageStore);
     MessageColumnIO columnIO = columnIOFactory.getColumnIO(writtenSchema);
-    GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), writtenSchema);
+    RecordConsumer recordWriter = columnIO.getRecordWriter(columns);
+    GroupWriter groupWriter = new GroupWriter(recordWriter, writtenSchema);
     for (Group group : groups) {
       groupWriter.write(group);
     }
+    recordWriter.flush();
     columns.flush();
   }
 
@@ -310,9 +312,12 @@ public class TestColumnIO {
     {
       MessageColumnIO columnIO = columnIOFactory.getColumnIO(schema);
       log(columnIO);
-      GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
+      RecordConsumer recordWriter = columnIO.getRecordWriter(columns);
+      GroupWriter groupWriter = new GroupWriter(recordWriter, schema);
       groupWriter.write(r1);
       groupWriter.write(r2);
+
+      recordWriter.flush();
       columns.flush();
       log(columns);
       log("=========");
@@ -461,11 +466,13 @@ public class TestColumnIO {
     log(columnIO);
 
     // Write groups.
+    RecordConsumer recordWriter = columnIO.getRecordWriter(columns);
     GroupWriter groupWriter =
-        new GroupWriter(columnIO.getRecordWriter(columns), messageSchema);
+        new GroupWriter(recordWriter, messageSchema);
     for (Group group : groups) {
       groupWriter.write(group);
     }
+    recordWriter.flush();
     columns.flush();
 
     // Read groups and verify.
@@ -508,7 +515,9 @@ public class TestColumnIO {
     MemPageStore memPageStore = new MemPageStore(1);
     ColumnWriteStoreV1 columns = newColumnWriteStore(memPageStore);
     MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
-    new GroupWriter(columnIO.getRecordWriter(columns), schema).write(r1);
+    RecordConsumer recordWriter = columnIO.getRecordWriter(columns);
+    new GroupWriter(recordWriter, schema).write(r1);
+    recordWriter.flush();
     columns.flush();
 
    RecordReader<Void> recordReader = columnIO.getRecordReader(memPageStore, new ExpectationValidatingConverter(expectedEventsForR1, schema));
@@ -584,9 +593,11 @@ public class TestColumnIO {
 
     ValidatingColumnWriteStore columns = new ValidatingColumnWriteStore(expected);
     MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
-    GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
+    RecordConsumer recordWriter = columnIO.getRecordWriter(columns);
+    GroupWriter groupWriter = new GroupWriter(recordWriter, schema);
     groupWriter.write(r1);
     groupWriter.write(r2);
+    recordWriter.flush();
     columns.validate();
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/01fbf81e/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java b/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java
index 479b138..9fde4b1 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java
@@ -21,6 +21,7 @@ package org.apache.parquet.io;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.parquet.io.api.RecordConsumer;
 import org.junit.Test;
 
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
@@ -259,11 +260,13 @@ public class TestFiltered {
     MemPageStore memPageStore = new MemPageStore(number * 2);
     ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, false, WriterVersion.PARQUET_1_0);
 
-    GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
+    RecordConsumer recordWriter = columnIO.getRecordWriter(columns);
+    GroupWriter groupWriter = new GroupWriter(recordWriter, schema);
     for ( int i = 0; i < number; i++ ) {
       groupWriter.write(r1);
       groupWriter.write(r2);
     }
+    recordWriter.flush();
     columns.flush();
     return memPageStore;
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/01fbf81e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index 21e69b7..c1bd037 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -62,7 +62,6 @@ class InternalParquetRecordReader<T> {
 
   private MessageType requestedSchema;
   private MessageType fileSchema;
-  private MessageColumnIO columnIO;
   private int columnCount;
   private final ReadSupport<T> readSupport;
 
@@ -137,6 +136,7 @@ class InternalParquetRecordReader<T> {
       BenchmarkCounter.incrementTime(timeSpentReading);
      if (Log.INFO) LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
       if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
+      MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
       recordReader = columnIO.getRecordReader(pages, recordConverter, filter);
       startedAssemblingCurrentBlockAt = System.currentTimeMillis();
       totalCountLoadedSoFar += pages.getRowCount();
@@ -174,7 +174,6 @@ class InternalParquetRecordReader<T> {
     this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
     this.requestedSchema = readContext.getRequestedSchema();
     this.fileSchema = fileSchema;
-    this.columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
     this.file = file;
     this.columnCount = requestedSchema.getPaths().size();
     this.recordConverter = readSupport.prepareForRead(

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/01fbf81e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index 37e8db5..ab9cb3e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -37,6 +37,7 @@ import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext;
 import org.apache.parquet.io.ColumnIOFactory;
 import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.MessageType;
 
 class InternalParquetRecordWriter<T> {
@@ -63,6 +64,7 @@ class InternalParquetRecordWriter<T> {
 
   private ColumnWriteStore columnStore;
   private ColumnChunkPageWriteStore pageStore;
+  private RecordConsumer recordConsumer;
 
 
   /**
@@ -106,7 +108,8 @@ class InternalParquetRecordWriter<T> {
         pageStore,
         pageSize);
     MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
-    writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore));
+    this.recordConsumer = columnIO.getRecordWriter(columnStore);
+    writeSupport.prepareForWrite(recordConsumer);
   }
 
   public void close() throws IOException, InterruptedException {
@@ -154,6 +157,7 @@ class InternalParquetRecordWriter<T> {
 
   private void flushRowGroupToStore()
       throws IOException {
+    recordConsumer.flush();
     LOG.info(format("Flushing mem columnStore to file. allocated memory: %,d", columnStore.getAllocatedSize()));
     if (columnStore.getAllocatedSize() > (3 * rowGroupSizeThreshold)) {
       LOG.warn("Too much memory used: " + columnStore.memUsageString());

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/01fbf81e/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java
index cc55a3f..f5f3ff1 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java
@@ -154,6 +154,7 @@ public class TestParquetReadProtocol {
    ParquetWriteProtocol parquetWriteProtocol = new ParquetWriteProtocol(recordWriter, columnIO, thriftType);
 
     expected.write(parquetWriteProtocol);
+    recordWriter.flush();
     columns.flush();
 
    ThriftRecordConverter<T> converter = new TBaseRecordConverter<T>(thriftClass, schema, thriftType);

