parquet-commits mailing list archives

From ziva...@apache.org
Subject [parquet-mr] branch column-indexes updated: PARQUET-1211: Column indexes: read/write API (#456)
Date Thu, 17 May 2018 10:57:52 GMT
This is an automated email from the ASF dual-hosted git repository.

zivanfi pushed a commit to branch column-indexes
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/column-indexes by this push:
     new aa571d7  PARQUET-1211: Column indexes: read/write API (#456)
aa571d7 is described below

commit aa571d7e16ad2872f9b62bab8175303f4ecc8b67
Author: Gabor Szadovszky <gabor@apache.org>
AuthorDate: Thu May 17 12:57:50 2018 +0200

    PARQUET-1211: Column indexes: read/write API (#456)
---
 .../apache/parquet/column/ParquetProperties.java   |   2 +-
 ...WriteStoreV2.java => ColumnWriteStoreBase.java} | 108 ++-
 .../parquet/column/impl/ColumnWriteStoreV1.java    | 115 +--
 .../parquet/column/impl/ColumnWriteStoreV2.java    | 148 +---
 .../{ColumnWriterV2.java => ColumnWriterBase.java} | 172 ++--
 .../apache/parquet/column/impl/ColumnWriterV1.java | 255 +-----
 .../apache/parquet/column/impl/ColumnWriterV2.java | 294 +------
 .../org/apache/parquet/column/page/PageWriter.java |  17 +-
 .../rle/RunLengthBitPackingHybridValuesWriter.java |   9 +-
 .../columnindex/BinaryColumnIndexBuilder.java      | 105 +++
 .../columnindex/BooleanColumnIndexBuilder.java     | 106 +++
 .../internal/column/columnindex/BoundaryOrder.java |  26 +
 .../internal/column/columnindex/ColumnIndex.java   |  55 ++
 .../column/columnindex/ColumnIndexBuilder.java     | 429 ++++++++++
 .../columnindex/DoubleColumnIndexBuilder.java      | 108 +++
 .../columnindex/FloatColumnIndexBuilder.java       | 108 +++
 .../column/columnindex/IntColumnIndexBuilder.java  | 108 +++
 .../column/columnindex/LongColumnIndexBuilder.java | 108 +++
 .../internal/column/columnindex/OffsetIndex.java   |  52 ++
 .../column/columnindex/OffsetIndexBuilder.java     | 175 ++++
 .../apache/parquet/column/mem/TestMemColumn.java   |   9 +-
 .../parquet/column/page/mem/MemPageWriter.java     |   6 +
 .../column/columnindex/TestColumnIndexBuilder.java | 949 +++++++++++++++++++++
 .../column/columnindex/TestOffsetIndexBuilder.java | 111 +++
 parquet-common/pom.xml                             |   6 +
 .../format/converter/ParquetMetadataConverter.java | 108 +++
 .../parquet/hadoop/ColumnChunkPageWriteStore.java  |  46 +-
 .../apache/parquet/hadoop/ParquetFileReader.java   |  38 +
 .../apache/parquet/hadoop/ParquetFileWriter.java   | 179 +++-
 .../hadoop/metadata/ColumnChunkMetaData.java       |  39 +
 .../internal/hadoop/metadata/IndexReference.java   |  41 +
 .../converter/TestParquetMetadataConverter.java    |  62 ++
 .../hadoop/TestColumnChunkPageWriteStore.java      |  89 +-
 .../parquet/hadoop/TestParquetFileWriter.java      | 138 +++
 pom.xml                                            |   2 +-
 35 files changed, 3439 insertions(+), 884 deletions(-)
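
The bulk of this diff is a template-method refactoring: the logic shared by
ColumnWriteStoreV1/V2 and ColumnWriterV1/V2 moves into the new
ColumnWriteStoreBase and ColumnWriterBase classes, and the V1/V2 subclasses
keep only the construction hooks (createColumnWriter, createRLWriter,
createDLWriter) and the physical page layout (writePage). A minimal,
self-contained sketch of that shape, using simplified stand-in names rather
than the real parquet-mr types:

    import java.util.Map;
    import java.util.TreeMap;

    interface Writer {
      void write(long value);
    }

    abstract class WriteStoreBase {
      final Map<String, Writer> columns = new TreeMap<>();

      WriteStoreBase(String... paths) {
        for (String path : paths) {
          // Shared bookkeeping lives in the base; only construction differs.
          // (The real ColumnWriteStoreBase calls createColumnWriter the same way.)
          columns.put(path, createWriter(path));
        }
      }

      // The hook each version implements, mirroring createColumnWriter below.
      abstract Writer createWriter(String path);
    }

    class WriteStoreV1 extends WriteStoreBase {
      WriteStoreV1(String... paths) { super(paths); }

      @Override
      Writer createWriter(String path) {
        return value -> System.out.println("v1 " + path + ": " + value);
      }
    }

    class WriteStoreV2 extends WriteStoreBase {
      WriteStoreV2(String... paths) { super(paths); }

      @Override
      Writer createWriter(String path) {
        return value -> System.out.println("v2 " + path + ": " + value);
      }
    }

    public class TemplateMethodSketch {
      public static void main(String[] args) {
        WriteStoreBase store = new WriteStoreV2("a.b", "a.c");
        store.columns.get("a.b").write(42L); // prints: v2 a.b: 42
      }
    }

The real base classes additionally share the flush, memory-accounting and
size-check logic, as the first file diff below shows.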

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 39b65da..f965511 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -163,7 +163,7 @@ public class ParquetProperties {
                                               PageWriteStore pageStore) {
     switch (writerVersion) {
     case PARQUET_1_0:
-      return new ColumnWriteStoreV1(pageStore, this);
+      return new ColumnWriteStoreV1(schema, pageStore, this);
     case PARQUET_2_0:
       return new ColumnWriteStoreV2(schema, pageStore, this);
     default:
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
similarity index 60%
copy from parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
copy to parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
index 7574ced..d04192f 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,7 +23,6 @@ import static java.lang.Math.min;
 import static java.util.Collections.unmodifiableMap;
 
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -37,37 +36,79 @@ import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.schema.MessageType;
 
-public class ColumnWriteStoreV2 implements ColumnWriteStore {
+/**
+ * Base implementation for {@link ColumnWriteStore} to be extended to specialize for V1 and V2 pages.
+ */
+abstract class ColumnWriteStoreBase implements ColumnWriteStore {
+
+  // Used to support the deprecated workflow of ColumnWriteStoreV1 (lazy init of ColumnWriters)
+  private interface ColumnWriterProvider {
+    ColumnWriter getColumnWriter(ColumnDescriptor path);
+  }
+
+  private final ColumnWriterProvider columnWriterProvider;
 
   // will flush even if size below the threshold by this much to facilitate page alignment
   private static final float THRESHOLD_TOLERANCE_RATIO = 0.1f; // 10 %
 
-  private final Map<ColumnDescriptor, ColumnWriterV2> columns;
-  private final Collection<ColumnWriterV2> writers;
+  private final Map<ColumnDescriptor, ColumnWriterBase> columns;
   private final ParquetProperties props;
   private final long thresholdTolerance;
   private long rowCount;
   private long rowCountForNextSizeCheck;
 
-  public ColumnWriteStoreV2(
+  // To be used by the deprecated constructor of ColumnWriteStoreV1
+  @Deprecated
+  ColumnWriteStoreBase(
+      final PageWriteStore pageWriteStore,
+      final ParquetProperties props) {
+    this.props = props;
+    this.thresholdTolerance = (long) (props.getPageSizeThreshold() * THRESHOLD_TOLERANCE_RATIO);
+
+    this.columns = new TreeMap<>();
+
+    this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
+
+    columnWriterProvider = new ColumnWriterProvider() {
+      @Override
+      public ColumnWriter getColumnWriter(ColumnDescriptor path) {
+        ColumnWriterBase column = columns.get(path);
+        if (column == null) {
+          column = createColumnWriter(path, pageWriteStore.getPageWriter(path), props);
+          columns.put(path, column);
+        }
+        return column;
+      }
+    };
+  }
+
+  ColumnWriteStoreBase(
       MessageType schema,
       PageWriteStore pageWriteStore,
       ParquetProperties props) {
     this.props = props;
-    this.thresholdTolerance = (long)(props.getPageSizeThreshold() * THRESHOLD_TOLERANCE_RATIO);
-    Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();
+    this.thresholdTolerance = (long) (props.getPageSizeThreshold() * THRESHOLD_TOLERANCE_RATIO);
+    Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
     for (ColumnDescriptor path : schema.getColumns()) {
       PageWriter pageWriter = pageWriteStore.getPageWriter(path);
-      mcolumns.put(path, new ColumnWriterV2(path, pageWriter, props));
+      mcolumns.put(path, createColumnWriter(path, pageWriter, props));
     }
     this.columns = unmodifiableMap(mcolumns);
-    this.writers = this.columns.values();
 
     this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
+
+    columnWriterProvider = new ColumnWriterProvider() {
+      @Override
+      public ColumnWriter getColumnWriter(ColumnDescriptor path) {
+        return columns.get(path);
+      }
+    };
   }
 
+  abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props);
+
   public ColumnWriter getColumnWriter(ColumnDescriptor path) {
-    return columns.get(path);
+    return columnWriterProvider.getColumnWriter(path);
   }
 
   public Set<ColumnDescriptor> getColumnDescriptors() {
@@ -76,19 +117,19 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore {
 
   @Override
   public String toString() {
-      StringBuilder sb = new StringBuilder();
-      for (Entry<ColumnDescriptor, ColumnWriterV2> entry : columns.entrySet()) {
-        sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
-        sb.append(entry.getValue().getTotalBufferedSize()).append(" bytes");
-        sb.append("\n");
-      }
-      return sb.toString();
+    StringBuilder sb = new StringBuilder();
+    for (Entry<ColumnDescriptor, ColumnWriterBase> entry : columns.entrySet()) {
+      sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
+      sb.append(entry.getValue().getTotalBufferedSize()).append(" bytes");
+      sb.append("\n");
+    }
+    return sb.toString();
   }
 
   @Override
   public long getAllocatedSize() {
     long total = 0;
-    for (ColumnWriterV2 memColumn : columns.values()) {
+    for (ColumnWriterBase memColumn : columns.values()) {
       total += memColumn.allocatedSize();
     }
     return total;
@@ -97,7 +138,7 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore {
   @Override
   public long getBufferedSize() {
     long total = 0;
-    for (ColumnWriterV2 memColumn : columns.values()) {
+    for (ColumnWriterBase memColumn : columns.values()) {
       total += memColumn.getTotalBufferedSize();
     }
     return total;
@@ -105,7 +146,7 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore {
 
   @Override
   public void flush() {
-    for (ColumnWriterV2 memColumn : columns.values()) {
+    for (ColumnWriterBase memColumn : columns.values()) {
       long rows = rowCount - memColumn.getRowsWrittenSoFar();
       if (rows > 0) {
         memColumn.writePage(rowCount);
@@ -116,24 +157,32 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore {
 
   public String memUsageString() {
     StringBuilder b = new StringBuilder("Store {\n");
-    for (ColumnWriterV2 memColumn : columns.values()) {
+    for (ColumnWriterBase memColumn : columns.values()) {
       b.append(memColumn.memUsageString(" "));
     }
     b.append("}\n");
     return b.toString();
   }
 
+  public long maxColMemSize() {
+    long max = 0;
+    for (ColumnWriterBase memColumn : columns.values()) {
+      max = Math.max(max, memColumn.getBufferedSizeInMemory());
+    }
+    return max;
+  }
+
   @Override
   public void close() {
     flush(); // calling flush() here to keep it consistent with the behavior before merging with master
-    for (ColumnWriterV2 memColumn : columns.values()) {
+    for (ColumnWriterBase memColumn : columns.values()) {
       memColumn.close();
     }
   }
 
   @Override
   public void endRecord() {
-    ++ rowCount;
+    ++rowCount;
     if (rowCount >= rowCountForNextSizeCheck) {
       sizeCheck();
     }
@@ -141,7 +190,7 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore {
 
   private void sizeCheck() {
     long minRecordToWait = Long.MAX_VALUE;
-    for (ColumnWriterV2 writer : writers) {
+    for (ColumnWriterBase writer : columns.values()) {
       long usedMem = writer.getCurrentPageBufferedSize();
       long rows = rowCount - writer.getRowsWrittenSoFar();
       long remainingMem = props.getPageSizeThreshold() - usedMem;
@@ -152,7 +201,7 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore {
       long rowsToFillPage =
           usedMem == 0 ?
               props.getMaxRowCountForPageSizeCheck()
-              : (long)((float)rows) / usedMem * remainingMem;
+              : (long) ((float) rows) / usedMem * remainingMem;
       if (rowsToFillPage < minRecordToWait) {
         minRecordToWait = rowsToFillPage;
       }
@@ -161,7 +210,7 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore {
       minRecordToWait = props.getMinRowCountForPageSizeCheck();
     }
 
-    if(props.estimateNextSizeCheck()) {
+    if (props.estimateNextSizeCheck()) {
       // will check again halfway if between min and max
       rowCountForNextSizeCheck = rowCount +
           min(
@@ -171,5 +220,4 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore {
       rowCountForNextSizeCheck = rowCount + props.getMinRowCountForPageSizeCheck();
     }
   }
-
 }
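
The ColumnWriterProvider indirection above exists only to keep the deprecated
schema-less ColumnWriteStoreV1 constructor working: that path creates column
writers lazily on first access, while the schema-based constructor builds the
whole column map up front and only does lookups afterwards. A rough sketch of
the two strategies (simplified types; plain Strings stand in for the writers):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    public class ProviderSketch {
      interface Provider { String get(String path); }

      // Deprecated path: create the writer on first access, as the old V1 store did.
      static Provider lazy(Map<String, String> cache, Function<String, String> factory) {
        return path -> cache.computeIfAbsent(path, factory);
      }

      // New path: the map was fully populated from the schema; lookup only.
      static Provider eager(Map<String, String> prebuilt) {
        return prebuilt::get;
      }

      public static void main(String[] args) {
        Provider p = lazy(new HashMap<>(), path -> "writer(" + path + ")");
        System.out.println(p.get("a.b")); // created on demand: writer(a.b)
      }
    }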
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
index 93a497f..7258423 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
@@ -18,121 +18,26 @@
  */
 package org.apache.parquet.column.impl;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.ColumnWriteStore;
-import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.schema.MessageType;
 
-public class ColumnWriteStoreV1 implements ColumnWriteStore {
-
-  private final Map<ColumnDescriptor, ColumnWriterV1> columns = new TreeMap<ColumnDescriptor, ColumnWriterV1>();
-  private final PageWriteStore pageWriteStore;
-  private final ParquetProperties props;
-
-  public ColumnWriteStoreV1(PageWriteStore pageWriteStore,
-                            ParquetProperties props) {
-    this.pageWriteStore = pageWriteStore;
-    this.props = props;
-  }
-
-  public ColumnWriter getColumnWriter(ColumnDescriptor path) {
-    ColumnWriterV1 column = columns.get(path);
-    if (column == null) {
-      column = newMemColumn(path);
-      columns.put(path, column);
-    }
-    return column;
-  }
-
-  public Set<ColumnDescriptor> getColumnDescriptors() {
-    return columns.keySet();
-  }
-
-  private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
-    PageWriter pageWriter = pageWriteStore.getPageWriter(path);
-    return new ColumnWriterV1(path, pageWriter, props);
-  }
-
-  @Override
-  public String toString() {
-      StringBuilder sb = new StringBuilder();
-      for (Entry<ColumnDescriptor, ColumnWriterV1> entry : columns.entrySet()) {
-        sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
-        sb.append(entry.getValue().getBufferedSizeInMemory()).append(" bytes");
-        sb.append("\n");
-      }
-      return sb.toString();
-  }
-
-  @Override
-  public long getAllocatedSize() {
-    Collection<ColumnWriterV1> values = columns.values();
-    long total = 0;
-    for (ColumnWriterV1 memColumn : values) {
-      total += memColumn.allocatedSize();
-    }
-    return total;
-  }
-
-  @Override
-  public long getBufferedSize() {
-    Collection<ColumnWriterV1> values = columns.values();
-    long total = 0;
-    for (ColumnWriterV1 memColumn : values) {
-      total += memColumn.getBufferedSizeInMemory();
-    }
-    return total;
-  }
-
-  @Override
-  public String memUsageString() {
-    StringBuilder b = new StringBuilder("Store {\n");
-    Collection<ColumnWriterV1> values = columns.values();
-    for (ColumnWriterV1 memColumn : values) {
-      b.append(memColumn.memUsageString(" "));
-    }
-    b.append("}\n");
-    return b.toString();
-  }
+public class ColumnWriteStoreV1 extends ColumnWriteStoreBase {
 
-  public long maxColMemSize() {
-    Collection<ColumnWriterV1> values = columns.values();
-    long max = 0;
-    for (ColumnWriterV1 memColumn : values) {
-      max = Math.max(max, memColumn.getBufferedSizeInMemory());
-    }
-    return max;
+  public ColumnWriteStoreV1(MessageType schema, PageWriteStore pageWriteStore, ParquetProperties props) {
+    super(schema, pageWriteStore, props);
   }
 
-  @Override
-  public void flush() {
-    Collection<ColumnWriterV1> values = columns.values();
-    for (ColumnWriterV1 memColumn : values) {
-      memColumn.flush();
-    }
+  @Deprecated
+  public ColumnWriteStoreV1(final PageWriteStore pageWriteStore,
+      final ParquetProperties props) {
+    super(pageWriteStore, props);
   }
 
   @Override
-  public void endRecord() {
-    // V1 does not take record boundaries into account
-  }
-
-  public void close() {
-    Collection<ColumnWriterV1> values = columns.values();
-    for (ColumnWriterV1 memColumn : values) {
-      memColumn.close();
-    }
+  ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
+    return new ColumnWriterV1(path, pageWriter, props);
   }
-
 }
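
Worth noting from this file: the old V1 store's endRecord() was an explicit
no-op ("V1 does not take record boundaries into account"), whereas the
inherited base implementation counts rows and runs the size check. Judging
from this diff, the V1 writer's page-flush decision now follows the same
per-record heuristic as V2 instead of the per-value accounting removed from
ColumnWriterV1 further down.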
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
index 7574ced..bf1090d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
@@ -18,158 +18,20 @@
  */
 package org.apache.parquet.column.impl;
 
-import static java.lang.Math.max;
-import static java.lang.Math.min;
-import static java.util.Collections.unmodifiableMap;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.ColumnWriteStore;
-import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.schema.MessageType;
 
-public class ColumnWriteStoreV2 implements ColumnWriteStore {
-
-  // will flush even if size below the threshold by this much to facilitate page alignment
-  private static final float THRESHOLD_TOLERANCE_RATIO = 0.1f; // 10 %
-
-  private final Map<ColumnDescriptor, ColumnWriterV2> columns;
-  private final Collection<ColumnWriterV2> writers;
-  private final ParquetProperties props;
-  private final long thresholdTolerance;
-  private long rowCount;
-  private long rowCountForNextSizeCheck;
-
-  public ColumnWriteStoreV2(
-      MessageType schema,
-      PageWriteStore pageWriteStore,
-      ParquetProperties props) {
-    this.props = props;
-    this.thresholdTolerance = (long)(props.getPageSizeThreshold() * THRESHOLD_TOLERANCE_RATIO);
-    Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();
-    for (ColumnDescriptor path : schema.getColumns()) {
-      PageWriter pageWriter = pageWriteStore.getPageWriter(path);
-      mcolumns.put(path, new ColumnWriterV2(path, pageWriter, props));
-    }
-    this.columns = unmodifiableMap(mcolumns);
-    this.writers = this.columns.values();
-
-    this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
-  }
-
-  public ColumnWriter getColumnWriter(ColumnDescriptor path) {
-    return columns.get(path);
-  }
-
-  public Set<ColumnDescriptor> getColumnDescriptors() {
-    return columns.keySet();
-  }
-
-  @Override
-  public String toString() {
-      StringBuilder sb = new StringBuilder();
-      for (Entry<ColumnDescriptor, ColumnWriterV2> entry : columns.entrySet()) {
-        sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
-        sb.append(entry.getValue().getTotalBufferedSize()).append(" bytes");
-        sb.append("\n");
-      }
-      return sb.toString();
-  }
-
-  @Override
-  public long getAllocatedSize() {
-    long total = 0;
-    for (ColumnWriterV2 memColumn : columns.values()) {
-      total += memColumn.allocatedSize();
-    }
-    return total;
-  }
-
-  @Override
-  public long getBufferedSize() {
-    long total = 0;
-    for (ColumnWriterV2 memColumn : columns.values()) {
-      total += memColumn.getTotalBufferedSize();
-    }
-    return total;
-  }
-
-  @Override
-  public void flush() {
-    for (ColumnWriterV2 memColumn : columns.values()) {
-      long rows = rowCount - memColumn.getRowsWrittenSoFar();
-      if (rows > 0) {
-        memColumn.writePage(rowCount);
-      }
-      memColumn.finalizeColumnChunk();
-    }
-  }
+public class ColumnWriteStoreV2 extends ColumnWriteStoreBase {
 
-  public String memUsageString() {
-    StringBuilder b = new StringBuilder("Store {\n");
-    for (ColumnWriterV2 memColumn : columns.values()) {
-      b.append(memColumn.memUsageString(" "));
-    }
-    b.append("}\n");
-    return b.toString();
+  public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore, ParquetProperties props) {
+    super(schema, pageWriteStore, props);
   }
 
   @Override
-  public void close() {
-    flush(); // calling flush() here to keep it consistent with the behavior before merging with master
-    for (ColumnWriterV2 memColumn : columns.values()) {
-      memColumn.close();
-    }
+  ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
+    return new ColumnWriterV2(path, pageWriter, props);
   }
-
-  @Override
-  public void endRecord() {
-    ++ rowCount;
-    if (rowCount >= rowCountForNextSizeCheck) {
-      sizeCheck();
-    }
-  }
-
-  private void sizeCheck() {
-    long minRecordToWait = Long.MAX_VALUE;
-    for (ColumnWriterV2 writer : writers) {
-      long usedMem = writer.getCurrentPageBufferedSize();
-      long rows = rowCount - writer.getRowsWrittenSoFar();
-      long remainingMem = props.getPageSizeThreshold() - usedMem;
-      if (remainingMem <= thresholdTolerance) {
-        writer.writePage(rowCount);
-        remainingMem = props.getPageSizeThreshold();
-      }
-      long rowsToFillPage =
-          usedMem == 0 ?
-              props.getMaxRowCountForPageSizeCheck()
-              : (long)((float)rows) / usedMem * remainingMem;
-      if (rowsToFillPage < minRecordToWait) {
-        minRecordToWait = rowsToFillPage;
-      }
-    }
-    if (minRecordToWait == Long.MAX_VALUE) {
-      minRecordToWait = props.getMinRowCountForPageSizeCheck();
-    }
-
-    if(props.estimateNextSizeCheck()) {
-      // will check again halfway if between min and max
-      rowCountForNextSizeCheck = rowCount +
-          min(
-              max(minRecordToWait / 2, props.getMinRowCountForPageSizeCheck()),
-              props.getMaxRowCountForPageSizeCheck());
-    } else {
-      rowCountForNextSizeCheck = rowCount + props.getMinRowCountForPageSizeCheck();
-    }
-  }
-
 }
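
The sizeCheck() logic deleted here moves essentially unchanged into
ColumnWriteStoreBase. Its estimate extrapolates the rows written so far over
the remaining page budget to decide when to look at the buffers again; a rough
worked example of the intended calculation (illustrative constants, not
parquet-mr defaults):

    public class SizeCheckSketch {
      public static void main(String[] args) {
        long pageSizeThreshold = 1024 * 1024; // assume a 1 MiB page threshold
        long usedMem = 256 * 1024;            // bytes buffered in the current page
        long rows = 10_000;                   // rows written into the current page

        long remainingMem = pageSizeThreshold - usedMem;
        // Extrapolate the observed rows-per-byte rate over the remaining budget.
        long rowsToFillPage = (long) ((float) rows / usedMem * remainingMem);
        System.out.println(rowsToFillPage); // 30000
      }
    }

sizeCheck() then halves that estimate and clamps it between the min and max
row-count-for-page-size-check properties before scheduling the next check.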
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
similarity index 67%
copy from parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
copy to parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index 9abdee8..16085bb 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,42 +21,39 @@ package org.apache.parquet.column.impl;
 import java.io.IOException;
 
 import org.apache.parquet.Ints;
-import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnWriter;
-import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.ValuesWriter;
-import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
 import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.io.api.Binary;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
+ * Base implementation for {@link ColumnWriter} to be extended to specialize for V1 and V2 pages.
  */
-final class ColumnWriterV2 implements ColumnWriter {
-  private static final Logger LOG = LoggerFactory.getLogger(ColumnWriterV2.class);
+abstract class ColumnWriterBase implements ColumnWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(ColumnWriterBase.class);
 
   // By default: Debugging disabled this way (using the "if (DEBUG)" IN the methods) to allow
   // the java compiler (not the JIT) to remove the unused statements during build time.
   private static final boolean DEBUG = false;
 
-  private final ColumnDescriptor path;
-  private final PageWriter pageWriter;
-  private RunLengthBitPackingHybridEncoder repetitionLevelColumn;
-  private RunLengthBitPackingHybridEncoder definitionLevelColumn;
+  final ColumnDescriptor path;
+  final PageWriter pageWriter;
+  private ValuesWriter repetitionLevelColumn;
+  private ValuesWriter definitionLevelColumn;
   private ValuesWriter dataColumn;
   private int valueCount;
 
   private Statistics<?> statistics;
   private long rowsWrittenSoFar = 0;
 
-  public ColumnWriterV2(
+  ColumnWriterBase(
       ColumnDescriptor path,
       PageWriter pageWriter,
       ParquetProperties props) {
@@ -64,11 +61,15 @@ final class ColumnWriterV2 implements ColumnWriter {
     this.pageWriter = pageWriter;
     resetStatistics();
 
-    this.repetitionLevelColumn = props.newRepetitionLevelEncoder(path);
-    this.definitionLevelColumn = props.newDefinitionLevelEncoder(path);
+    this.repetitionLevelColumn = createRLWriter(props, path);
+    this.definitionLevelColumn = createDLWriter(props, path);
     this.dataColumn = props.newValuesWriter(path);
   }
 
+  abstract ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path);
+
+  abstract ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path);
+
   private void log(Object value, int r, int d) {
     LOG.debug("{} {} r:{} d:{}", path, value, r, d);
   }
@@ -78,32 +79,27 @@ final class ColumnWriterV2 implements ColumnWriter {
   }
 
   private void definitionLevel(int definitionLevel) {
-    try {
-      definitionLevelColumn.writeInt(definitionLevel);
-    } catch (IOException e) {
-      throw new ParquetEncodingException("illegal definition level " + definitionLevel + " for column " + path, e);
-    }
+    definitionLevelColumn.writeInteger(definitionLevel);
   }
 
   private void repetitionLevel(int repetitionLevel) {
-    try {
-      repetitionLevelColumn.writeInt(repetitionLevel);
-    } catch (IOException e) {
-      throw new ParquetEncodingException("illegal repetition level " + repetitionLevel + " for column " + path, e);
-    }
+    repetitionLevelColumn.writeInteger(repetitionLevel);
   }
 
   /**
-   * writes the current null value
+   * Writes the current null value
+   *
    * @param repetitionLevel
    * @param definitionLevel
    */
+  @Override
   public void writeNull(int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(null, repetitionLevel, definitionLevel);
+    if (DEBUG)
+      log(null, repetitionLevel, definitionLevel);
     repetitionLevel(repetitionLevel);
     definitionLevel(definitionLevel);
     statistics.incrementNumNulls();
-    ++ valueCount;
+    ++valueCount;
   }
 
   @Override
@@ -117,109 +113,128 @@ final class ColumnWriterV2 implements ColumnWriter {
   @Override
   public long getBufferedSizeInMemory() {
     return repetitionLevelColumn.getBufferedSize()
-      + definitionLevelColumn.getBufferedSize()
-      + dataColumn.getBufferedSize()
-      + pageWriter.getMemSize();
+        + definitionLevelColumn.getBufferedSize()
+        + dataColumn.getBufferedSize()
+        + pageWriter.getMemSize();
   }
 
   /**
-   * writes the current value
+   * Writes the current value
+   *
    * @param value
    * @param repetitionLevel
    * @param definitionLevel
    */
+  @Override
   public void write(double value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    if (DEBUG)
+      log(value, repetitionLevel, definitionLevel);
     repetitionLevel(repetitionLevel);
     definitionLevel(definitionLevel);
     dataColumn.writeDouble(value);
     statistics.updateStats(value);
-    ++ valueCount;
+    ++valueCount;
   }
 
   /**
-   * writes the current value
+   * Writes the current value
+   *
    * @param value
    * @param repetitionLevel
    * @param definitionLevel
    */
+  @Override
   public void write(float value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    if (DEBUG)
+      log(value, repetitionLevel, definitionLevel);
     repetitionLevel(repetitionLevel);
     definitionLevel(definitionLevel);
     dataColumn.writeFloat(value);
     statistics.updateStats(value);
-    ++ valueCount;
+    ++valueCount;
   }
 
   /**
-   * writes the current value
+   * Writes the current value
+   *
    * @param value
    * @param repetitionLevel
    * @param definitionLevel
    */
+  @Override
   public void write(Binary value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    if (DEBUG)
+      log(value, repetitionLevel, definitionLevel);
     repetitionLevel(repetitionLevel);
     definitionLevel(definitionLevel);
     dataColumn.writeBytes(value);
     statistics.updateStats(value);
-    ++ valueCount;
+    ++valueCount;
   }
 
   /**
-   * writes the current value
+   * Writes the current value
+   *
    * @param value
    * @param repetitionLevel
    * @param definitionLevel
    */
+  @Override
   public void write(boolean value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    if (DEBUG)
+      log(value, repetitionLevel, definitionLevel);
     repetitionLevel(repetitionLevel);
     definitionLevel(definitionLevel);
     dataColumn.writeBoolean(value);
     statistics.updateStats(value);
-    ++ valueCount;
+    ++valueCount;
   }
 
   /**
-   * writes the current value
+   * Writes the current value
+   *
    * @param value
    * @param repetitionLevel
    * @param definitionLevel
    */
+  @Override
   public void write(int value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    if (DEBUG)
+      log(value, repetitionLevel, definitionLevel);
     repetitionLevel(repetitionLevel);
     definitionLevel(definitionLevel);
     dataColumn.writeInteger(value);
     statistics.updateStats(value);
-    ++ valueCount;
+    ++valueCount;
   }
 
   /**
-   * writes the current value
+   * Writes the current value
+   *
    * @param value
    * @param repetitionLevel
    * @param definitionLevel
    */
+  @Override
   public void write(long value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
+    if (DEBUG)
+      log(value, repetitionLevel, definitionLevel);
     repetitionLevel(repetitionLevel);
     definitionLevel(definitionLevel);
     dataColumn.writeLong(value);
     statistics.updateStats(value);
-    ++ valueCount;
+    ++valueCount;
   }
 
   /**
    * Finalizes the Column chunk. Possibly adding extra pages if needed (dictionary, ...)
    * Is called right after writePage
    */
-  public void finalizeColumnChunk() {
+  void finalizeColumnChunk() {
     final DictionaryPage dictionaryPage = dataColumn.toDictPageAndClose();
     if (dictionaryPage != null) {
-      if (DEBUG) LOG.debug("write dictionary");
+      if (DEBUG)
+        LOG.debug("write dictionary");
       try {
         pageWriter.writeDictionaryPage(dictionaryPage);
       } catch (IOException e) {
@@ -230,20 +245,22 @@ final class ColumnWriterV2 implements ColumnWriter {
   }
 
   /**
-   * used to decide when to write a page
+   * Used to decide when to write a page
+   *
    * @return the number of bytes of memory used to buffer the current data
    */
-  public long getCurrentPageBufferedSize() {
+  long getCurrentPageBufferedSize() {
     return repetitionLevelColumn.getBufferedSize()
         + definitionLevelColumn.getBufferedSize()
         + dataColumn.getBufferedSize();
   }
 
   /**
-   * used to decide when to write a page or row group
+   * Used to decide when to write a page or row group
+   *
    * @return the number of bytes of memory used to buffer the current data and the previously written pages
    */
-  public long getTotalBufferedSize() {
+  long getTotalBufferedSize() {
     return repetitionLevelColumn.getBufferedSize()
         + definitionLevelColumn.getBufferedSize()
         + dataColumn.getBufferedSize()
@@ -253,18 +270,19 @@ final class ColumnWriterV2 implements ColumnWriter {
   /**
    * @return actual memory used
    */
-  public long allocatedSize() {
+  long allocatedSize() {
     return repetitionLevelColumn.getAllocatedSize()
-    + definitionLevelColumn.getAllocatedSize()
-    + dataColumn.getAllocatedSize()
-    + pageWriter.allocatedSize();
+        + definitionLevelColumn.getAllocatedSize()
+        + dataColumn.getAllocatedSize()
+        + pageWriter.allocatedSize();
   }
 
   /**
-   * @param indent a prefix to format lines
+   * @param indent
+   *          a prefix to format lines
    * @return a formatted string showing how memory is used
    */
-  public String memUsageString(String indent) {
+  String memUsageString(String indent) {
     StringBuilder b = new StringBuilder(indent).append(path).append(" {\n");
     b.append(indent).append(" r:").append(repetitionLevelColumn.getAllocatedSize()).append(" bytes\n");
     b.append(indent).append(" d:").append(definitionLevelColumn.getAllocatedSize()).append(" bytes\n");
@@ -275,32 +293,23 @@ final class ColumnWriterV2 implements ColumnWriter {
     return b.toString();
   }
 
-  public long getRowsWrittenSoFar() {
+  long getRowsWrittenSoFar() {
     return this.rowsWrittenSoFar;
   }
 
   /**
-   * writes the current data to a new page in the page store
-   * @param rowCount how many rows have been written so far
+   * Writes the current data to a new page in the page store
+   *
+   * @param rowCount
+   *          how many rows have been written so far
    */
-  public void writePage(long rowCount) {
+  void writePage(long rowCount) {
     int pageRowCount = Ints.checkedCast(rowCount - rowsWrittenSoFar);
     this.rowsWrittenSoFar = rowCount;
-    if (DEBUG) LOG.debug("write page");
+    if (DEBUG)
+      LOG.debug("write page");
     try {
-      // TODO: rework this API. Those must be called *in that order*
-      BytesInput bytes = dataColumn.getBytes();
-      Encoding encoding = dataColumn.getEncoding();
-      pageWriter.writePageV2(
-          pageRowCount,
-          Ints.checkedCast(statistics.getNumNulls()),
-          valueCount,
-          path.getMaxRepetitionLevel() == 0 ? BytesInput.empty() : repetitionLevelColumn.toBytes(),
-          path.getMaxDefinitionLevel() == 0 ? BytesInput.empty() : definitionLevelColumn.toBytes(),
-          encoding,
-          bytes,
-          statistics
-          );
+      writePage(pageRowCount, valueCount, statistics, repetitionLevelColumn, definitionLevelColumn, dataColumn);
     } catch (IOException e) {
       throw new ParquetEncodingException("could not write page for " + path, e);
     }
@@ -310,4 +319,7 @@ final class ColumnWriterV2 implements ColumnWriter {
     valueCount = 0;
     resetStatistics();
   }
+
+  abstract void writePage(int rowCount, int valueCount, Statistics<?> statistics, ValuesWriter repetitionLevels,
+      ValuesWriter definitionLevels, ValuesWriter values) throws IOException;
 }
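
The abstract writePage hook added at the end of ColumnWriterBase is what lets
the two writer diffs below shrink so much: the base class owns the buffering,
statistics and row accounting, while each subclass only decides the physical
page layout. A simplified sketch of that split (byte arrays standing in for
BytesInput, println for the real PageWriter calls):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    abstract class PageSink {
      // Subclasses receive the already-encoded sections and choose the layout.
      abstract void writePage(byte[] repetition, byte[] definition, byte[] values)
          throws IOException;
    }

    // V1-style: levels and values are concatenated into a single page buffer.
    class V1PageSink extends PageSink {
      @Override
      void writePage(byte[] r, byte[] d, byte[] v) throws IOException {
        ByteArrayOutputStream page = new ByteArrayOutputStream();
        page.write(r);
        page.write(d);
        page.write(v);
        System.out.println("v1 page: " + page.size() + " bytes");
      }
    }

    // V2-style: the sections are kept separate in the page.
    class V2PageSink extends PageSink {
      @Override
      void writePage(byte[] r, byte[] d, byte[] v) {
        System.out.println("v2 page: " + r.length + "+" + d.length + "+" + v.length + " bytes");
      }
    }

    public class PageLayoutSketch {
      public static void main(String[] args) throws IOException {
        byte[] r = {0}, d = {1}, v = {42};
        new V1PageSink().writePage(r, d, v);
        new V2PageSink().writePage(r, d, v);
      }
    }

In the actual diff, ColumnWriterV1 concatenates levels and values with
concat(...) for a V1 page, while ColumnWriterV2 passes the sections separately
to writePageV2.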
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
index c1f5d67..646e31a 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
@@ -23,261 +23,40 @@ import static org.apache.parquet.bytes.BytesInput.concat;
 import java.io.IOException;
 
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.ValuesWriter;
-import org.apache.parquet.io.ParquetEncodingException;
-import org.apache.parquet.io.api.Binary;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
  */
-final class ColumnWriterV1 implements ColumnWriter {
-  private static final Logger LOG = LoggerFactory.getLogger(ColumnWriterV1.class);
+final class ColumnWriterV1 extends ColumnWriterBase {
 
-  // By default: Debugging disabled this way (using the "if (DEBUG)" IN the methods) to allow
-  // the java compiler (not the JIT) to remove the unused statements during build time.
-  private static final boolean DEBUG = false;
-
-  private final ColumnDescriptor path;
-  private final PageWriter pageWriter;
-  private final ParquetProperties props;
-
-  private ValuesWriter repetitionLevelColumn;
-  private ValuesWriter definitionLevelColumn;
-  private ValuesWriter dataColumn;
-  private int valueCount;
-  private int valueCountForNextSizeCheck;
-
-  private Statistics statistics;
-
-  public ColumnWriterV1(ColumnDescriptor path, PageWriter pageWriter,
-                        ParquetProperties props) {
-    this.path = path;
-    this.pageWriter = pageWriter;
-    this.props = props;
-
-    // initial check of memory usage. So that we have enough data to make an initial prediction
-    this.valueCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
-
-    resetStatistics();
-
-    this.repetitionLevelColumn = props.newRepetitionLevelWriter(path);
-    this.definitionLevelColumn = props.newDefinitionLevelWriter(path);
-    this.dataColumn = props.newValuesWriter(path);
-  }
-
-  private void log(Object value, int r, int d) {
-    if (DEBUG) LOG.debug( "{} {} r:{} d:{}", path, value, r, d);
-  }
-
-  private void resetStatistics() {
-    this.statistics = Statistics.createStats(this.path.getPrimitiveType());
-  }
-
-  /**
-   * Counts how many values have been written and checks the memory usage to flush the page when we reach the page threshold.
-   *
-   * We measure the memory used when we reach the mid point toward our estimated count.
-   * We then update the estimate and flush the page if we reached the threshold.
-   *
-   * That way we check the memory size log2(n) times.
-   *
-   */
-  private void accountForValueWritten() {
-    ++ valueCount;
-    if (valueCount > valueCountForNextSizeCheck) {
-      // not checking the memory used for every value
-      long memSize = repetitionLevelColumn.getBufferedSize()
-          + definitionLevelColumn.getBufferedSize()
-          + dataColumn.getBufferedSize();
-      if (memSize > props.getPageSizeThreshold()) {
-        // we will write the current page and check again the size at the predicted middle of next page
-        if (props.estimateNextSizeCheck()) {
-          valueCountForNextSizeCheck = valueCount / 2;
-        } else {
-          valueCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
-        }
-        writePage();
-      } else if (props.estimateNextSizeCheck()) {
-        // not reached the threshold, will check again midway
-        valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * props.getPageSizeThreshold() / memSize)) / 2 + 1;
-      } else {
-        valueCountForNextSizeCheck += props.getMinRowCountForPageSizeCheck();
-      }
-    }
-  }
-
-  private void updateStatisticsNumNulls() {
-    statistics.incrementNumNulls();
-  }
-
-  private void updateStatistics(int value) {
-    statistics.updateStats(value);
-  }
-
-  private void updateStatistics(long value) {
-    statistics.updateStats(value);
-  }
-
-  private void updateStatistics(float value) {
-    statistics.updateStats(value);
-  }
-
-  private void updateStatistics(double value) {
-   statistics.updateStats(value);
-  }
-
-  private void updateStatistics(Binary value) {
-   statistics.updateStats(value);
-  }
-
-  private void updateStatistics(boolean value) {
-   statistics.updateStats(value);
-  }
-
-  private void writePage() {
-    if (DEBUG) LOG.debug("write page");
-    try {
-      pageWriter.writePage(
-          concat(repetitionLevelColumn.getBytes(), definitionLevelColumn.getBytes(), dataColumn.getBytes()),
-          valueCount,
-          statistics,
-          repetitionLevelColumn.getEncoding(),
-          definitionLevelColumn.getEncoding(),
-          dataColumn.getEncoding());
-    } catch (IOException e) {
-      throw new ParquetEncodingException("could not write page for " + path, e);
-    }
-    repetitionLevelColumn.reset();
-    definitionLevelColumn.reset();
-    dataColumn.reset();
-    valueCount = 0;
-    resetStatistics();
-  }
-
-  @Override
-  public void writeNull(int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(null, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    updateStatisticsNumNulls();
-    accountForValueWritten();
-  }
-
-  @Override
-  public void write(double value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    dataColumn.writeDouble(value);
-    updateStatistics(value);
-    accountForValueWritten();
-  }
-
-  @Override
-  public void write(float value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    dataColumn.writeFloat(value);
-    updateStatistics(value);
-    accountForValueWritten();
-  }
-
-  @Override
-  public void write(Binary value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    dataColumn.writeBytes(value);
-    updateStatistics(value);
-    accountForValueWritten();
+  ColumnWriterV1(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
+    super(path, pageWriter, props);
   }
 
   @Override
-  public void write(boolean value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    dataColumn.writeBoolean(value);
-    updateStatistics(value);
-    accountForValueWritten();
+  ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path) {
+    return props.newRepetitionLevelWriter(path);
   }
 
   @Override
-  public void write(int value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    dataColumn.writeInteger(value);
-    updateStatistics(value);
-    accountForValueWritten();
+  ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path) {
+    return props.newDefinitionLevelWriter(path);
   }
 
   @Override
-  public void write(long value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    dataColumn.writeLong(value);
-    updateStatistics(value);
-    accountForValueWritten();
-  }
-
-  public void flush() {
-    if (valueCount > 0) {
-      writePage();
-    }
-    final DictionaryPage dictionaryPage = dataColumn.toDictPageAndClose();
-    if (dictionaryPage != null) {
-      if (DEBUG) LOG.debug("write dictionary");
-      try {
-        pageWriter.writeDictionaryPage(dictionaryPage);
-      } catch (IOException e) {
-        throw new ParquetEncodingException("could not write dictionary page for " + path, e);
-      }
-      dataColumn.resetDictionary();
-    }
-  }
-
-  @Override
-  public void close() {
-    flush();
-    // Close the Values writers.
-    repetitionLevelColumn.close();
-    definitionLevelColumn.close();
-    dataColumn.close();
-  }
-
-  @Override
-  public long getBufferedSizeInMemory() {
-    return repetitionLevelColumn.getBufferedSize()
-        + definitionLevelColumn.getBufferedSize()
-        + dataColumn.getBufferedSize()
-        + pageWriter.getMemSize();
-  }
-
-  public long allocatedSize() {
-    return repetitionLevelColumn.getAllocatedSize()
-    + definitionLevelColumn.getAllocatedSize()
-    + dataColumn.getAllocatedSize()
-    + pageWriter.allocatedSize();
-  }
-
-  public String memUsageString(String indent) {
-    StringBuilder b = new StringBuilder(indent).append(path).append(" {\n");
-    b.append(repetitionLevelColumn.memUsageString(indent + "  r:")).append("\n");
-    b.append(definitionLevelColumn.memUsageString(indent + "  d:")).append("\n");
-    b.append(dataColumn.memUsageString(indent + "  data:")).append("\n");
-    b.append(pageWriter.memUsageString(indent + "  pages:")).append("\n");
-    b.append(indent).append(String.format("  total: %,d/%,d", getBufferedSizeInMemory(), allocatedSize())).append("\n");
-    b.append(indent).append("}\n");
-    return b.toString();
+  void writePage(int rowCount, int valueCount, Statistics<?> statistics, ValuesWriter repetitionLevels,
+      ValuesWriter definitionLevels, ValuesWriter values) throws IOException {
+    pageWriter.writePage(
+        concat(repetitionLevels.getBytes(), definitionLevels.getBytes(), values.getBytes()),
+        valueCount,
+        rowCount,
+        statistics,
+        repetitionLevels.getEncoding(),
+        definitionLevels.getEncoding(),
+        values.getEncoding());
   }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
index 9abdee8..04076c9 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
@@ -23,291 +23,67 @@ import java.io.IOException;
 import org.apache.parquet.Ints;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
 import org.apache.parquet.io.ParquetEncodingException;
-import org.apache.parquet.io.api.Binary;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
  */
-final class ColumnWriterV2 implements ColumnWriter {
-  private static final Logger LOG = LoggerFactory.getLogger(ColumnWriterV2.class);
+final class ColumnWriterV2 extends ColumnWriterBase {
 
-  // By default: Debugging disabled this way (using the "if (DEBUG)" IN the methods) to allow
-  // the java compiler (not the JIT) to remove the unused statements during build time.
-  private static final boolean DEBUG = false;
-
-  private final ColumnDescriptor path;
-  private final PageWriter pageWriter;
-  private RunLengthBitPackingHybridEncoder repetitionLevelColumn;
-  private RunLengthBitPackingHybridEncoder definitionLevelColumn;
-  private ValuesWriter dataColumn;
-  private int valueCount;
-
-  private Statistics<?> statistics;
-  private long rowsWrittenSoFar = 0;
-
-  public ColumnWriterV2(
-      ColumnDescriptor path,
-      PageWriter pageWriter,
-      ParquetProperties props) {
-    this.path = path;
-    this.pageWriter = pageWriter;
-    resetStatistics();
-
-    this.repetitionLevelColumn = props.newRepetitionLevelEncoder(path);
-    this.definitionLevelColumn = props.newDefinitionLevelEncoder(path);
-    this.dataColumn = props.newValuesWriter(path);
-  }
-
-  private void log(Object value, int r, int d) {
-    LOG.debug("{} {} r:{} d:{}", path, value, r, d);
-  }
-
-  private void resetStatistics() {
-    this.statistics = Statistics.createStats(path.getPrimitiveType());
-  }
-
-  private void definitionLevel(int definitionLevel) {
-    try {
-      definitionLevelColumn.writeInt(definitionLevel);
-    } catch (IOException e) {
-      throw new ParquetEncodingException("illegal definition level " + definitionLevel + " for column " + path, e);
+  // Extending the original implementation not to write the size of the data as the original writer would
+  private static class RLEWriterForV2 extends RunLengthBitPackingHybridValuesWriter {
+    public RLEWriterForV2(RunLengthBitPackingHybridEncoder encoder) {
+      super(encoder);
     }
-  }
 
-  private void repetitionLevel(int repetitionLevel) {
-    try {
-      repetitionLevelColumn.writeInt(repetitionLevel);
-    } catch (IOException e) {
-      throw new ParquetEncodingException("illegal repetition level " + repetitionLevel + " for column " + path, e);
-    }
-  }
-
-  /**
-   * writes the current null value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void writeNull(int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(null, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    statistics.incrementNumNulls();
-    ++ valueCount;
-  }
-
-  @Override
-  public void close() {
-    // Close the Values writers.
-    repetitionLevelColumn.close();
-    definitionLevelColumn.close();
-    dataColumn.close();
-  }
-
-  @Override
-  public long getBufferedSizeInMemory() {
-    return repetitionLevelColumn.getBufferedSize()
-      + definitionLevelColumn.getBufferedSize()
-      + dataColumn.getBufferedSize()
-      + pageWriter.getMemSize();
-  }
-
-  /**
-   * writes the current value
-   * @param value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void write(double value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeDouble(value);
-    statistics.updateStats(value);
-    ++ valueCount;
-  }
-
-  /**
-   * writes the current value
-   * @param value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void write(float value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeFloat(value);
-    statistics.updateStats(value);
-    ++ valueCount;
-  }
-
-  /**
-   * writes the current value
-   * @param value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void write(Binary value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeBytes(value);
-    statistics.updateStats(value);
-    ++ valueCount;
-  }
-
-  /**
-   * writes the current value
-   * @param value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void write(boolean value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeBoolean(value);
-    statistics.updateStats(value);
-    ++ valueCount;
-  }
-
-  /**
-   * writes the current value
-   * @param value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void write(int value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeInteger(value);
-    statistics.updateStats(value);
-    ++ valueCount;
-  }
-
-  /**
-   * writes the current value
-   * @param value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void write(long value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeLong(value);
-    statistics.updateStats(value);
-    ++ valueCount;
-  }
-
-  /**
-   * Finalizes the Column chunk. Possibly adding extra pages if needed (dictionary, ...)
-   * Is called right after writePage
-   */
-  public void finalizeColumnChunk() {
-    final DictionaryPage dictionaryPage = dataColumn.toDictPageAndClose();
-    if (dictionaryPage != null) {
-      if (DEBUG) LOG.debug("write dictionary");
+    @Override
+    public BytesInput getBytes() {
       try {
-        pageWriter.writeDictionaryPage(dictionaryPage);
+        return encoder.toBytes();
       } catch (IOException e) {
-        throw new ParquetEncodingException("could not write dictionary page for " + path, e);
+        throw new ParquetEncodingException(e);
       }
-      dataColumn.resetDictionary();
     }
   }
 
-  /**
-   * used to decide when to write a page
-   * @return the number of bytes of memory used to buffer the current data
-   */
-  public long getCurrentPageBufferedSize() {
-    return repetitionLevelColumn.getBufferedSize()
-        + definitionLevelColumn.getBufferedSize()
-        + dataColumn.getBufferedSize();
-  }
-
-  /**
-   * used to decide when to write a page or row group
-   * @return the number of bytes of memory used to buffer the current data and the previously written pages
-   */
-  public long getTotalBufferedSize() {
-    return repetitionLevelColumn.getBufferedSize()
-        + definitionLevelColumn.getBufferedSize()
-        + dataColumn.getBufferedSize()
-        + pageWriter.getMemSize();
-  }
+  private static final ValuesWriter NULL_WRITER = new DevNullValuesWriter();
 
-  /**
-   * @return actual memory used
-   */
-  public long allocatedSize() {
-    return repetitionLevelColumn.getAllocatedSize()
-    + definitionLevelColumn.getAllocatedSize()
-    + dataColumn.getAllocatedSize()
-    + pageWriter.allocatedSize();
+  ColumnWriterV2(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
+    super(path, pageWriter, props);
   }
 
-  /**
-   * @param indent a prefix to format lines
-   * @return a formatted string showing how memory is used
-   */
-  public String memUsageString(String indent) {
-    StringBuilder b = new StringBuilder(indent).append(path).append(" {\n");
-    b.append(indent).append(" r:").append(repetitionLevelColumn.getAllocatedSize()).append(" bytes\n");
-    b.append(indent).append(" d:").append(definitionLevelColumn.getAllocatedSize()).append(" bytes\n");
-    b.append(dataColumn.memUsageString(indent + "  data:")).append("\n");
-    b.append(pageWriter.memUsageString(indent + "  pages:")).append("\n");
-    b.append(indent).append(String.format("  total: %,d/%,d", getTotalBufferedSize(), allocatedSize())).append("\n");
-    b.append(indent).append("}\n");
-    return b.toString();
+  @Override
+  ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path) {
+    return path.getMaxRepetitionLevel() == 0 ? NULL_WRITER : new RLEWriterForV2(props.newRepetitionLevelEncoder(path));
   }
 
-  public long getRowsWrittenSoFar() {
-    return this.rowsWrittenSoFar;
+  @Override
+  ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path) {
+    return path.getMaxDefinitionLevel() == 0 ? NULL_WRITER : new RLEWriterForV2(props.newDefinitionLevelEncoder(path));
   }
 
-  /**
-   * writes the current data to a new page in the page store
-   * @param rowCount how many rows have been written so far
-   */
-  public void writePage(long rowCount) {
-    int pageRowCount = Ints.checkedCast(rowCount - rowsWrittenSoFar);
-    this.rowsWrittenSoFar = rowCount;
-    if (DEBUG) LOG.debug("write page");
-    try {
-      // TODO: rework this API. Those must be called *in that order*
-      BytesInput bytes = dataColumn.getBytes();
-      Encoding encoding = dataColumn.getEncoding();
-      pageWriter.writePageV2(
-          pageRowCount,
-          Ints.checkedCast(statistics.getNumNulls()),
-          valueCount,
-          path.getMaxRepetitionLevel() == 0 ? BytesInput.empty() : repetitionLevelColumn.toBytes(),
-          path.getMaxDefinitionLevel() == 0 ? BytesInput.empty() : definitionLevelColumn.toBytes(),
-          encoding,
-          bytes,
-          statistics
-          );
-    } catch (IOException e) {
-      throw new ParquetEncodingException("could not write page for " + path, e);
-    }
-    repetitionLevelColumn.reset();
-    definitionLevelColumn.reset();
-    dataColumn.reset();
-    valueCount = 0;
-    resetStatistics();
+  @Override
+  void writePage(int rowCount, int valueCount, Statistics<?> statistics, ValuesWriter repetitionLevels,
+      ValuesWriter definitionLevels, ValuesWriter values) throws IOException {
+    // TODO: rework this API. The bytes shall be retrieved before the encoding (encoding might be different otherwise)
+    BytesInput bytes = values.getBytes();
+    Encoding encoding = values.getEncoding();
+    pageWriter.writePageV2(
+        rowCount,
+        Ints.checkedCast(statistics.getNumNulls()),
+        valueCount,
+        repetitionLevels.getBytes(),
+        definitionLevels.getBytes(),
+        encoding,
+        bytes,
+        statistics);
   }
 }
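
For illustration (not part of the patch): the TODO above flags an ordering constraint in this flow. A values writer may fall back from dictionary to plain encoding while serializing, so the page bytes must be materialized before the encoding is queried. A minimal sketch of that contract, using only parquet-column types that appear in this patch; the helper class and method names are hypothetical:

    import java.io.IOException;

    import org.apache.parquet.bytes.BytesInput;
    import org.apache.parquet.column.Encoding;
    import org.apache.parquet.column.page.PageWriter;
    import org.apache.parquet.column.statistics.Statistics;
    import org.apache.parquet.column.values.ValuesWriter;

    class V2PageFlushSketch {
      // Hypothetical helper mirroring the writePage above: serialize first, then
      // read the (possibly fallen-back) encoding, then hand off to the page store.
      static void flushV2Page(PageWriter pageWriter, ValuesWriter values,
          ValuesWriter repetitionLevels, ValuesWriter definitionLevels,
          int rowCount, int valueCount, Statistics<?> statistics) throws IOException {
        BytesInput bytes = values.getBytes();     // 1) materialize the value bytes
        Encoding encoding = values.getEncoding(); // 2) only now is the encoding final
        pageWriter.writePageV2(rowCount, Math.toIntExact(statistics.getNumNulls()),
            valueCount, repetitionLevels.getBytes(), definitionLevels.getBytes(),
            encoding, bytes, statistics);
      }
    }
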
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java
index a2d079f..a72be48 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java
@@ -20,7 +20,6 @@ package org.apache.parquet.column.page;
 
 import java.io.IOException;
 
-import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.statistics.Statistics;
@@ -39,10 +38,26 @@ public interface PageWriter {
    * @param dlEncoding definition level encoding
    * @param valuesEncoding values encoding
    * @throws IOException if there is an exception while writing page data
+   * @deprecated will be removed in 2.0.0. This method does not support writing column indexes; use
+   *             {@link #writePage(BytesInput, int, int, Statistics, Encoding, Encoding, Encoding)} instead
    */
+  @Deprecated
   void writePage(BytesInput bytesInput, int valueCount, Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException;
 
   /**
+   * writes a single page
+   * @param bytesInput the bytes for the page
+   * @param valueCount the number of values in that page
+   * @param rowCount the number of rows in that page
+   * @param statistics the statistics for that page
+   * @param rlEncoding repetition level encoding
+   * @param dlEncoding definition level encoding
+   * @param valuesEncoding values encoding
+   * @throws IOException if there is an exception while writing page data
+   */
+  void writePage(BytesInput bytesInput, int valueCount, int rowCount, Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException;
+
+  /**
    * writes a single page in the new format
    * @param rowCount the number of rows in this page
    * @param nullCount the number of null values (out of valueCount)
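
An existing PageWriter implementation can adopt the new row-count-aware overload by delegating to the deprecated method whenever it has no use for the row count; the MemPageWriter change at the end of this patch does exactly that. A sketch of such an adapter (the class name is hypothetical):

    import java.io.IOException;

    import org.apache.parquet.bytes.BytesInput;
    import org.apache.parquet.column.Encoding;
    import org.apache.parquet.column.page.PageWriter;
    import org.apache.parquet.column.statistics.Statistics;

    abstract class LegacyPageWriterAdapter implements PageWriter {
      @Override
      public void writePage(BytesInput bytesInput, int valueCount, int rowCount,
          Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding,
          Encoding valuesEncoding) throws IOException {
        // The row count only matters for building offset indexes; a writer that
        // does not build them can simply forward to the old entry point.
        writePage(bytesInput, valueCount, statistics, rlEncoding, dlEncoding, valuesEncoding);
      }
    }
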
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
index 3b7a5de..a51a8c4 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.column.values.rle;
 
 import java.io.IOException;
+import java.util.Objects;
 
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.Ints;
@@ -28,10 +29,14 @@ import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.io.ParquetEncodingException;
 
 public class RunLengthBitPackingHybridValuesWriter extends ValuesWriter {
-  private final RunLengthBitPackingHybridEncoder encoder;
+  protected final RunLengthBitPackingHybridEncoder encoder;
 
   public RunLengthBitPackingHybridValuesWriter(int bitWidth, int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
-    this.encoder = new RunLengthBitPackingHybridEncoder(bitWidth, initialCapacity, pageSize, allocator);
+    this(new RunLengthBitPackingHybridEncoder(bitWidth, initialCapacity, pageSize, allocator));
+  }
+
+  protected RunLengthBitPackingHybridValuesWriter(RunLengthBitPackingHybridEncoder encoder) {
+    this.encoder = Objects.requireNonNull(encoder);
   }
 
   @Override
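
Making the encoder field protected and adding the protected constructor exists to support subclasses like RLEWriterForV2 earlier in this patch, which return the raw RLE/bit-packed bytes instead of the length-framed bytes the base class produces for V1 pages, since the V2 data page header records the level lengths itself. A sketch of such a subclass under that assumption (the class name is made up):

    import java.io.IOException;

    import org.apache.parquet.bytes.BytesInput;
    import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
    import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
    import org.apache.parquet.io.ParquetEncodingException;

    // Hypothetical subclass in the spirit of RLEWriterForV2: emit the raw encoder
    // output; the surrounding V2 page header carries the length.
    class RawRleValuesWriter extends RunLengthBitPackingHybridValuesWriter {
      RawRleValuesWriter(RunLengthBitPackingHybridEncoder encoder) {
        super(encoder);
      }

      @Override
      public BytesInput getBytes() {
        try {
          return encoder.toBytes(); // no length framing, unlike the base class
        } catch (IOException e) {
          throw new ParquetEncodingException(e);
        }
      }
    }
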
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
new file mode 100644
index 0000000..c352516
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveComparator;
+import org.apache.parquet.schema.PrimitiveType;
+
+class BinaryColumnIndexBuilder extends ColumnIndexBuilder {
+  private static class BinaryColumnIndex extends ColumnIndexBase {
+    private Binary[] minValues;
+    private Binary[] maxValues;
+
+    private BinaryColumnIndex(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    ByteBuffer getMinValueAsBytes(int pageIndex) {
+      return convert(minValues[pageIndex]);
+    }
+
+    @Override
+    ByteBuffer getMaxValueAsBytes(int pageIndex) {
+      return convert(maxValues[pageIndex]);
+    }
+
+    @Override
+    String getMinValueAsString(int pageIndex) {
+      return stringifier.stringify(minValues[pageIndex]);
+    }
+
+    @Override
+    String getMaxValueAsString(int pageIndex) {
+      return stringifier.stringify(maxValues[pageIndex]);
+    }
+  }
+
+  private final List<Binary> minValues = new ArrayList<>();
+  private final List<Binary> maxValues = new ArrayList<>();
+
+  private static Binary convert(ByteBuffer buffer) {
+    return Binary.fromReusedByteBuffer(buffer);
+  }
+
+  private static ByteBuffer convert(Binary value) {
+    return value.toByteBuffer();
+  }
+
+  @Override
+  void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
+    minValues.add(min == null ? null : convert(min));
+    maxValues.add(max == null ? null : convert(max));
+  }
+
+  @Override
+  void addMinMax(Object min, Object max) {
+    minValues.add((Binary) min);
+    maxValues.add((Binary) max);
+  }
+
+  @Override
+  ColumnIndexBase createColumnIndex(PrimitiveType type) {
+    BinaryColumnIndex columnIndex = new BinaryColumnIndex(type);
+    columnIndex.minValues = minValues.toArray(new Binary[minValues.size()]);
+    columnIndex.maxValues = maxValues.toArray(new Binary[maxValues.size()]);
+    return columnIndex;
+  }
+
+  @Override
+  void clearMinMax() {
+    minValues.clear();
+    maxValues.clear();
+  }
+
+  @Override
+  int compareMinValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(minValues.get(index1), minValues.get(index2));
+  }
+
+  @Override
+  int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(maxValues.get(index1), maxValues.get(index2));
+  }
+}
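
Putting the builder to work on the write path looks roughly like the sketch below: one Statistics object per page is fed to a type-specific builder obtained via ColumnIndexBuilder.getBuilder, and build() returns null when nothing was collected. The column name and values are made up, and Statistics.createStats is assumed to be available in parquet-column:

    import org.apache.parquet.column.statistics.Statistics;
    import org.apache.parquet.internal.column.columnindex.ColumnIndex;
    import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
    import org.apache.parquet.io.api.Binary;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Types;

    public class ColumnIndexWriteSketch {
      public static void main(String[] args) {
        PrimitiveType type = Types.required(PrimitiveTypeName.BINARY).named("name");
        ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);

        Statistics<?> page0 = Statistics.createStats(type); // stats of the first page
        page0.updateStats(Binary.fromString("apple"));
        page0.updateStats(Binary.fromString("cherry"));
        builder.add(page0);

        Statistics<?> page1 = Statistics.createStats(type); // stats of the second page
        page1.updateStats(Binary.fromString("melon"));
        builder.add(page1);

        ColumnIndex columnIndex = builder.build(); // boundary order: ASCENDING here
        System.out.println(columnIndex);
      }
    }
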
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BooleanColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BooleanColumnIndexBuilder.java
new file mode 100644
index 0000000..9a4ea89
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BooleanColumnIndexBuilder.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import java.nio.ByteBuffer;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveComparator;
+import org.apache.parquet.schema.PrimitiveType;
+
+import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
+import it.unimi.dsi.fastutil.booleans.BooleanList;
+
+class BooleanColumnIndexBuilder extends ColumnIndexBuilder {
+  private static class BooleanColumnIndex extends ColumnIndexBase {
+    private boolean[] minValues;
+    private boolean[] maxValues;
+
+    private BooleanColumnIndex(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    ByteBuffer getMinValueAsBytes(int pageIndex) {
+      return convert(minValues[pageIndex]);
+    }
+
+    @Override
+    ByteBuffer getMaxValueAsBytes(int pageIndex) {
+      return convert(maxValues[pageIndex]);
+    }
+
+    @Override
+    String getMinValueAsString(int pageIndex) {
+      return stringifier.stringify(minValues[pageIndex]);
+    }
+
+    @Override
+    String getMaxValueAsString(int pageIndex) {
+      return stringifier.stringify(maxValues[pageIndex]);
+    }
+  }
+
+  private final BooleanList minValues = new BooleanArrayList();
+  private final BooleanList maxValues = new BooleanArrayList();
+
+  private static boolean convert(ByteBuffer buffer) {
+    return buffer.get(0) != 0;
+  }
+
+  private static ByteBuffer convert(boolean value) {
+    return ByteBuffer.allocate(1).put(0, value ? (byte) 1 : 0);
+  }
+
+  @Override
+  void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
+    minValues.add(min == null ? false : convert(min));
+    maxValues.add(max == null ? false : convert(max));
+  }
+
+  @Override
+  void addMinMax(Object min, Object max) {
+    minValues.add(min == null ? false : (boolean) min);
+    maxValues.add(max == null ? false : (boolean) max);
+  }
+
+  @Override
+  ColumnIndexBase createColumnIndex(PrimitiveType type) {
+    BooleanColumnIndex columnIndex = new BooleanColumnIndex(type);
+    columnIndex.minValues = minValues.toBooleanArray();
+    columnIndex.maxValues = maxValues.toBooleanArray();
+    return columnIndex;
+  }
+
+  @Override
+  void clearMinMax() {
+    minValues.clear();
+    maxValues.clear();
+  }
+
+  @Override
+  int compareMinValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(minValues.get(index1), minValues.get(index2));
+  }
+
+  @Override
+  int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(maxValues.get(index1), maxValues.get(index2));
+  }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BoundaryOrder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BoundaryOrder.java
new file mode 100644
index 0000000..5d82815
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BoundaryOrder.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+/**
+ * Enum for {@link org.apache.parquet.format.BoundaryOrder}.
+ */
+public enum BoundaryOrder {
+  UNORDERED, ASCENDING, DESCENDING;
+}
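
Since this enum mirrors the thrift-generated org.apache.parquet.format.BoundaryOrder one-to-one, the metadata converter needs a mapping along these lines (a sketch, not the converter code from this patch):

    import org.apache.parquet.internal.column.columnindex.BoundaryOrder;

    class BoundaryOrderConverterSketch {
      // Map the internal enum to the thrift enum; fully qualified to avoid the
      // name clash between the two BoundaryOrder types.
      static org.apache.parquet.format.BoundaryOrder toParquetFormat(BoundaryOrder order) {
        switch (order) {
          case ASCENDING:
            return org.apache.parquet.format.BoundaryOrder.ASCENDING;
          case DESCENDING:
            return org.apache.parquet.format.BoundaryOrder.DESCENDING;
          case UNORDERED:
          default:
            return org.apache.parquet.format.BoundaryOrder.UNORDERED;
        }
      }
    }
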
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java
new file mode 100644
index 0000000..f7bd16b
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Column index containing min/max and null count values for the pages in a column chunk.
+ *
+ * @see org.apache.parquet.format.ColumnIndex
+ */
+public interface ColumnIndex {
+  /**
+   * @return the boundary order of the min/max values; used for converting to the related thrift object
+   */
+  public BoundaryOrder getBoundaryOrder();
+
+  /**
+   * @return the unmodifiable list of null counts; used for converting to the related thrift object
+   */
+  public List<Long> getNullCounts();
+
+  /**
+   * @return the unmodifiable list of null pages; used for converting to the related thrift object
+   */
+  public List<Boolean> getNullPages();
+
+  /**
+   * @return the list of the min values as {@link ByteBuffer}s; used for converting to the related thrift object
+   */
+  public List<ByteBuffer> getMinValues();
+
+  /**
+   * @return the list of the max values as {@link ByteBuffer}s; used for converting to the related thrift object
+   */
+  public List<ByteBuffer> getMaxValues();
+
+}
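
A reader-side consumer of this interface might use it to skip pages that cannot match a predicate, along the lines of the hypothetical helper below; the comparator is assumed to match the column's sort order, and none of these names come from the patch:

    import java.nio.ByteBuffer;
    import java.util.Comparator;

    import org.apache.parquet.internal.column.columnindex.ColumnIndex;

    class PageFilterSketch {
      // True if the page may contain `value` for an equality predicate.
      static boolean pageMightContain(ColumnIndex index, int pageIndex,
          ByteBuffer value, Comparator<ByteBuffer> comparator) {
        if (index.getNullPages().get(pageIndex)) {
          return false; // the page holds only nulls
        }
        ByteBuffer min = index.getMinValues().get(pageIndex);
        ByteBuffer max = index.getMaxValues().get(pageIndex);
        return comparator.compare(min, value) <= 0
            && comparator.compare(value, max) <= 0;
      }
    }
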
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
new file mode 100644
index 0000000..6edd753
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import static java.util.Objects.requireNonNull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.Formatter;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveComparator;
+import org.apache.parquet.schema.PrimitiveStringifier;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
+import it.unimi.dsi.fastutil.booleans.BooleanList;
+import it.unimi.dsi.fastutil.booleans.BooleanLists;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongList;
+import it.unimi.dsi.fastutil.longs.LongLists;
+
+/**
+ * Builder implementation to create {@link ColumnIndex} objects while writing a Parquet file.
+ */
+public abstract class ColumnIndexBuilder {
+
+  static abstract class ColumnIndexBase implements ColumnIndex {
+    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
+    private static final int MAX_VALUE_LENGTH_FOR_TOSTRING = 40;
+    private static final String TOSTRING_TRUNCATION_MARKER = "(...)";
+    private static final int TOSTRING_TRUNCATION_START_POS =
+        (MAX_VALUE_LENGTH_FOR_TOSTRING - TOSTRING_TRUNCATION_MARKER.length()) / 2;
+    private static final int TOSTRING_TRUNCATION_END_POS =
+        MAX_VALUE_LENGTH_FOR_TOSTRING - TOSTRING_TRUNCATION_MARKER.length() - TOSTRING_TRUNCATION_START_POS;
+    private static final String TOSTRING_MISSING_VALUE_MARKER = "<none>";
+
+    final PrimitiveStringifier stringifier;
+    final PrimitiveComparator<Binary> comparator;
+    private boolean[] nullPages;
+    private BoundaryOrder boundaryOrder;
+    // might be null
+    private long[] nullCounts;
+
+    static String truncate(String str) {
+      if (str.length() <= MAX_VALUE_LENGTH_FOR_TOSTRING) {
+        return str;
+      }
+      return str.substring(0, TOSTRING_TRUNCATION_START_POS) + TOSTRING_TRUNCATION_MARKER
+          + str.substring(str.length() - TOSTRING_TRUNCATION_END_POS);
+    }
+
+    ColumnIndexBase(PrimitiveType type) {
+      comparator = type.comparator();
+      stringifier = type.stringifier();
+    }
+
+    @Override
+    public BoundaryOrder getBoundaryOrder() {
+      return boundaryOrder;
+    }
+
+    @Override
+    public List<Long> getNullCounts() {
+      if (nullCounts == null) {
+        return null;
+      }
+      return LongLists.unmodifiable(LongArrayList.wrap(nullCounts));
+    }
+
+    @Override
+    public List<Boolean> getNullPages() {
+      return BooleanLists.unmodifiable(BooleanArrayList.wrap(nullPages));
+    }
+
+    @Override
+    public List<ByteBuffer> getMinValues() {
+      List<ByteBuffer> list = new ArrayList<>(getPageCount());
+      for (int i = 0, n = getPageCount(); i < n; ++i) {
+        if (isNullPage(i)) {
+          list.add(EMPTY_BYTE_BUFFER);
+        } else {
+          list.add(getMinValueAsBytes(i));
+        }
+      }
+      return list;
+    }
+
+    @Override
+    public List<ByteBuffer> getMaxValues() {
+      List<ByteBuffer> list = new ArrayList<>(getPageCount());
+      for (int i = 0, n = getPageCount(); i < n; ++i) {
+        if (isNullPage(i)) {
+          list.add(EMPTY_BYTE_BUFFER);
+        } else {
+          list.add(getMaxValueAsBytes(i));
+        }
+      }
+      return list;
+    }
+
+    @Override
+    public String toString() {
+      try (Formatter formatter = new Formatter()) {
+        formatter.format("Boudary order: %s\n", boundaryOrder);
+        String minMaxPart = "  %-" + MAX_VALUE_LENGTH_FOR_TOSTRING + "s  %-" + MAX_VALUE_LENGTH_FOR_TOSTRING + "s\n";
+        formatter.format("%-10s  %20s" + minMaxPart, "", "null count", "min", "max");
+        String format = "page-%-5d  %20s" + minMaxPart;
+        for (int i = 0, n = nullPages.length; i < n; ++i) {
+          String nullCount = nullCounts == null ? TOSTRING_MISSING_VALUE_MARKER : Long.toString(nullCounts[i]);
+          String min, max;
+          if (nullPages[i]) {
+            min = max = TOSTRING_MISSING_VALUE_MARKER;
+          } else {
+            min = truncate(getMinValueAsString(i));
+            max = truncate(getMaxValueAsString(i));
+          }
+          formatter.format(format, i, nullCount, min, max);
+        }
+        return formatter.toString();
+      }
+    }
+
+    int getPageCount() {
+      return nullPages.length;
+    }
+
+    boolean isNullPage(int pageIndex) {
+      return nullPages[pageIndex];
+    }
+
+    abstract ByteBuffer getMinValueAsBytes(int pageIndex);
+
+    abstract ByteBuffer getMaxValueAsBytes(int pageIndex);
+
+    abstract String getMinValueAsString(int pageIndex);
+
+    abstract String getMaxValueAsString(int pageIndex);
+  }
+
+  private static final ColumnIndexBuilder NO_OP_BUILDER = new ColumnIndexBuilder() {
+    @Override
+    public ColumnIndex build() {
+      return null;
+    }
+
+    @Override
+    public void add(Statistics<?> stats) {
+    }
+
+    @Override
+    void addMinMax(Object min, Object max) {
+    }
+
+    @Override
+    ColumnIndexBase createColumnIndex(PrimitiveType type) {
+      return null;
+    }
+
+    @Override
+    void clearMinMax() {
+    }
+
+    @Override
+    void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
+    }
+
+    @Override
+    int compareMinValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+      return 0;
+    }
+
+    @Override
+    int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+      return 0;
+    }
+  };
+
+  private static final Map<PrimitiveTypeName, ColumnIndexBuilder> BUILDERS = new EnumMap<>(PrimitiveTypeName.class);
+
+  private PrimitiveType type;
+  private final BooleanList nullPages = new BooleanArrayList();
+  private final LongList nullCounts = new LongArrayList();
+
+  /**
+   * @return a no-op builder that does not collect statistics objects and therefore returns {@code null} at
+   *         {@link #build()}.
+   */
+  public static ColumnIndexBuilder getNoOpBuilder() {
+    return NO_OP_BUILDER;
+  }
+
+  /**
+   * @param type
+   *          the type this builder is to be created for
+   * @return a {@link ColumnIndexBuilder} instance to be used for creating {@link ColumnIndex} objects
+   */
+  public static ColumnIndexBuilder getBuilder(PrimitiveType type) {
+    ColumnIndexBuilder builder = createNewBuilder(type.getPrimitiveTypeName());
+    builder.type = type;
+    return builder;
+  }
+
+  private static ColumnIndexBuilder createNewBuilder(PrimitiveTypeName type) {
+    switch (type) {
+      case BINARY:
+      case FIXED_LEN_BYTE_ARRAY:
+      case INT96:
+        return new BinaryColumnIndexBuilder();
+      case BOOLEAN:
+        return new BooleanColumnIndexBuilder();
+      case DOUBLE:
+        return new DoubleColumnIndexBuilder();
+      case FLOAT:
+        return new FloatColumnIndexBuilder();
+      case INT32:
+        return new IntColumnIndexBuilder();
+      case INT64:
+        return new LongColumnIndexBuilder();
+      default:
+        throw new IllegalArgumentException("Unsupported type for column index: " + type);
+    }
+  }
+
+  /**
+   * @param type
+   *          the primitive type
+   * @param boundaryOrder
+   *          the boundary order of the min/max values
+   * @param nullPages
+   *          the null pages (one boolean value for each page that signifies whether the page consists of nulls
+   *          entirely)
+   * @param nullCounts
+   *          the number of null values for each page
+   * @param minValues
+   *          the min values for each page
+   * @param maxValues
+   *          the max values for each page
+   * @return the newly created {@link ColumnIndex} object based on the specified arguments
+   */
+  public static ColumnIndex build(
+      PrimitiveType type,
+      BoundaryOrder boundaryOrder,
+      List<Boolean> nullPages,
+      List<Long> nullCounts,
+      List<ByteBuffer> minValues,
+      List<ByteBuffer> maxValues) {
+
+    PrimitiveTypeName typeName = type.getPrimitiveTypeName();
+    ColumnIndexBuilder builder = BUILDERS.get(typeName);
+    if (builder == null) {
+      builder = createNewBuilder(typeName);
+      BUILDERS.put(typeName, builder);
+    }
+
+    builder.fill(nullPages, nullCounts, minValues, maxValues);
+    ColumnIndexBase columnIndex = builder.build(type);
+    columnIndex.boundaryOrder = requireNonNull(boundaryOrder);
+    return columnIndex;
+  }
+
+  ColumnIndexBuilder() {
+    // Instances shall be created inside this package only
+  }
+
+  /**
+   * Adds the data from the specified statistics to this builder
+   *
+   * @param stats
+   *          the statistics to be added
+   */
+  public void add(Statistics<?> stats) {
+    if (stats.hasNonNullValue()) {
+      nullPages.add(false);
+      addMinMax(stats.genericGetMin(), stats.genericGetMax());
+    } else {
+      nullPages.add(true);
+      addMinMax(null, null);
+    }
+    nullCounts.add(stats.getNumNulls());
+  }
+
+  abstract void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max);
+
+  abstract void addMinMax(Object min, Object max);
+
+  private void fill(List<Boolean> nullPages, List<Long> nullCounts, List<ByteBuffer> minValues,
+      List<ByteBuffer> maxValues) {
+    clear();
+    int requiredSize = nullPages.size();
+    if ((nullCounts != null && nullCounts.size() != requiredSize) || minValues.size() != requiredSize
+        || maxValues.size() != requiredSize) {
+      throw new IllegalArgumentException(
+          String.format("Not all sizes are equal (nullPages:%d, nullCounts:%s, minValues:%d, maxValues:%d",
+              nullPages.size(), nullCounts == null ? "null" : nullCounts.size(), minValues.size(), maxValues.size()));
+    }
+    this.nullPages.addAll(nullPages);
+    // Null counts is optional in the format
+    if (nullCounts != null) {
+      this.nullCounts.addAll(nullCounts);
+    }
+
+    for (int i = 0; i < requiredSize; ++i) {
+      if (nullPages.get(i)) {
+        addMinMaxFromBytes(null, null);
+      } else {
+        addMinMaxFromBytes(minValues.get(i), maxValues.get(i));
+      }
+    }
+  }
+
+  /**
+   * @return the newly created column index or {@code null} if the {@link ColumnIndex} would be empty
+   */
+  public ColumnIndex build() {
+    ColumnIndexBase columnIndex = build(type);
+    if (columnIndex == null) {
+      return null;
+    }
+    columnIndex.boundaryOrder = calculateBoundaryOrder(type.comparator());
+    return columnIndex;
+  }
+
+  private ColumnIndexBase build(PrimitiveType type) {
+    if (nullPages.isEmpty()) {
+      return null;
+    }
+    ColumnIndexBase columnIndex = createColumnIndex(type);
+    columnIndex.nullPages = nullPages.toBooleanArray();
+    // Null counts is optional so keep it null if the builder has no values
+    if (!nullCounts.isEmpty()) {
+      columnIndex.nullCounts = nullCounts.toLongArray();
+    }
+
+    return columnIndex;
+  }
+
+  private BoundaryOrder calculateBoundaryOrder(PrimitiveComparator<Binary> comparator) {
+    if (isAscending(comparator)) {
+      return BoundaryOrder.ASCENDING;
+    } else if (isDescending(comparator)) {
+      return BoundaryOrder.DESCENDING;
+    } else {
+      return BoundaryOrder.UNORDERED;
+    }
+  }
+
+  // min[i] <= min[i+1] && max[i] <= max[i+1]
+  private boolean isAscending(PrimitiveComparator<Binary> comparator) {
+    int prevPage = nextNonNullPage(0);
+    // All pages are null-page
+    if (prevPage < 0) {
+      return false;
+    }
+    int nextPage = nextNonNullPage(prevPage + 1);
+    while (nextPage > 0) {
+      if (compareMinValues(comparator, prevPage, nextPage) > 0
+          || compareMaxValues(comparator, prevPage, nextPage) > 0) {
+        return false;
+      }
+      prevPage = nextPage;
+      nextPage = nextNonNullPage(nextPage + 1);
+    }
+    return true;
+  }
+
+  // min[i] >= min[i+1] && max[i] >= max[i+1]
+  private boolean isDescending(PrimitiveComparator<Binary> comparator) {
+    int prevPage = nextNonNullPage(0);
+    // All pages are null-page
+    if (prevPage < 0) {
+      return false;
+    }
+    int nextPage = nextNonNullPage(prevPage + 1);
+    while (nextPage > 0) {
+      if (compareMinValues(comparator, prevPage, nextPage) < 0
+          || compareMaxValues(comparator, prevPage, nextPage) < 0) {
+        return false;
+      }
+      prevPage = nextPage;
+      nextPage = nextNonNullPage(nextPage + 1);
+    }
+    return true;
+  }
+
+  private int nextNonNullPage(int startIndex) {
+    for (int i = startIndex, n = nullPages.size(); i < n; ++i) {
+      if (!nullPages.get(i)) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  abstract int compareMinValues(PrimitiveComparator<Binary> comparator, int index1, int index2);
+
+  abstract int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int index2);
+
+  private void clear() {
+    nullPages.clear();
+    nullCounts.clear();
+    clearMinMax();
+  }
+
+  abstract void clearMinMax();
+
+  abstract ColumnIndexBase createColumnIndex(PrimitiveType type);
+}
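
The static build(...) overload serves the read path: after decoding the thrift ColumnIndex struct, the converter can rebuild the in-memory object from plain lists. A sketch with made-up INT32 data; the min/max bytes are plain-encoded little-endian, matching the IntColumnIndexBuilder later in this patch:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.util.Arrays;

    import org.apache.parquet.internal.column.columnindex.BoundaryOrder;
    import org.apache.parquet.internal.column.columnindex.ColumnIndex;
    import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Types;

    class ColumnIndexReadSketch {
      static ByteBuffer intBytes(int v) { // plain encoding is little-endian
        return ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(0, v);
      }

      public static void main(String[] args) {
        PrimitiveType type = Types.required(PrimitiveTypeName.INT32).named("id");
        ColumnIndex columnIndex = ColumnIndexBuilder.build(
            type,
            BoundaryOrder.ASCENDING,
            Arrays.asList(false, false),               // nullPages
            Arrays.asList(0L, 1L),                     // nullCounts (optional in the format)
            Arrays.asList(intBytes(1), intBytes(10)),  // minValues
            Arrays.asList(intBytes(9), intBytes(42))); // maxValues
        System.out.println(columnIndex);
      }
    }
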
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java
new file mode 100644
index 0000000..249652a
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import static java.nio.ByteOrder.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveComparator;
+import org.apache.parquet.schema.PrimitiveType;
+
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import it.unimi.dsi.fastutil.doubles.DoubleList;
+
+class DoubleColumnIndexBuilder extends ColumnIndexBuilder {
+  private static class DoubleColumnIndex extends ColumnIndexBase {
+    private double[] minValues;
+    private double[] maxValues;
+
+    private DoubleColumnIndex(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    ByteBuffer getMinValueAsBytes(int pageIndex) {
+      return convert(minValues[pageIndex]);
+    }
+
+    @Override
+    ByteBuffer getMaxValueAsBytes(int pageIndex) {
+      return convert(maxValues[pageIndex]);
+    }
+
+    @Override
+    String getMinValueAsString(int pageIndex) {
+      return stringifier.stringify(minValues[pageIndex]);
+    }
+
+    @Override
+    String getMaxValueAsString(int pageIndex) {
+      return stringifier.stringify(maxValues[pageIndex]);
+    }
+  }
+
+  private final DoubleList minValues = new DoubleArrayList();
+  private final DoubleList maxValues = new DoubleArrayList();
+
+  private static double convert(ByteBuffer buffer) {
+    return buffer.order(LITTLE_ENDIAN).getDouble(0);
+  }
+
+  private static ByteBuffer convert(double value) {
+    return ByteBuffer.allocate(Double.SIZE / 8).order(LITTLE_ENDIAN).putDouble(0, value);
+  }
+
+  @Override
+  void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
+    minValues.add(min == null ? 0 : convert(min));
+    maxValues.add(max == null ? 0 : convert(max));
+  }
+
+  @Override
+  void addMinMax(Object min, Object max) {
+    minValues.add(min == null ? 0 : (double) min);
+    maxValues.add(max == null ? 0 : (double) max);
+  }
+
+  @Override
+  ColumnIndexBase createColumnIndex(PrimitiveType type) {
+    DoubleColumnIndex columnIndex = new DoubleColumnIndex(type);
+    columnIndex.minValues = minValues.toDoubleArray();
+    columnIndex.maxValues = maxValues.toDoubleArray();
+    return columnIndex;
+  }
+
+  @Override
+  void clearMinMax() {
+    minValues.clear();
+    maxValues.clear();
+  }
+
+  @Override
+  int compareMinValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(minValues.get(index1), minValues.get(index2));
+  }
+
+  @Override
+  int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(maxValues.get(index1), maxValues.get(index2));
+  }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java
new file mode 100644
index 0000000..24c911f
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import static java.nio.ByteOrder.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveComparator;
+import org.apache.parquet.schema.PrimitiveType;
+
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import it.unimi.dsi.fastutil.floats.FloatList;
+
+class FloatColumnIndexBuilder extends ColumnIndexBuilder {
+  private static class FloatColumnIndex extends ColumnIndexBase {
+    private float[] minValues;
+    private float[] maxValues;
+
+    private FloatColumnIndex(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    ByteBuffer getMinValueAsBytes(int pageIndex) {
+      return convert(minValues[pageIndex]);
+    }
+
+    @Override
+    ByteBuffer getMaxValueAsBytes(int pageIndex) {
+      return convert(maxValues[pageIndex]);
+    }
+
+    @Override
+    String getMinValueAsString(int pageIndex) {
+      return stringifier.stringify(minValues[pageIndex]);
+    }
+
+    @Override
+    String getMaxValueAsString(int pageIndex) {
+      return stringifier.stringify(maxValues[pageIndex]);
+    }
+  }
+
+  private final FloatList minValues = new FloatArrayList();
+  private final FloatList maxValues = new FloatArrayList();
+
+  private static float convert(ByteBuffer buffer) {
+    return buffer.order(LITTLE_ENDIAN).getFloat(0);
+  }
+
+  private static ByteBuffer convert(float value) {
+    return ByteBuffer.allocate(Float.SIZE / 8).order(LITTLE_ENDIAN).putFloat(0, value);
+  }
+
+  @Override
+  void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
+    minValues.add(min == null ? 0 : convert(min));
+    maxValues.add(max == null ? 0 : convert(max));
+  }
+
+  @Override
+  void addMinMax(Object min, Object max) {
+    minValues.add(min == null ? 0 : (float) min);
+    maxValues.add(max == null ? 0 : (float) max);
+  }
+
+  @Override
+  ColumnIndexBase createColumnIndex(PrimitiveType type) {
+    FloatColumnIndex columnIndex = new FloatColumnIndex(type);
+    columnIndex.minValues = minValues.toFloatArray();
+    columnIndex.maxValues = maxValues.toFloatArray();
+    return columnIndex;
+  }
+
+  @Override
+  void clearMinMax() {
+    minValues.clear();
+    maxValues.clear();
+  }
+
+  @Override
+  int compareMinValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(minValues.get(index1), minValues.get(index2));
+  }
+
+  @Override
+  int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(maxValues.get(index1), maxValues.get(index2));
+  }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IntColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IntColumnIndexBuilder.java
new file mode 100644
index 0000000..e4a117c
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IntColumnIndexBuilder.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import static java.nio.ByteOrder.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveComparator;
+import org.apache.parquet.schema.PrimitiveType;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+
+class IntColumnIndexBuilder extends ColumnIndexBuilder {
+  private static class IntColumnIndex extends ColumnIndexBase {
+    private int[] minValues;
+    private int[] maxValues;
+
+    private IntColumnIndex(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    ByteBuffer getMinValueAsBytes(int pageIndex) {
+      return convert(minValues[pageIndex]);
+    }
+
+    @Override
+    ByteBuffer getMaxValueAsBytes(int pageIndex) {
+      return convert(maxValues[pageIndex]);
+    }
+
+    @Override
+    String getMinValueAsString(int pageIndex) {
+      return stringifier.stringify(minValues[pageIndex]);
+    }
+
+    @Override
+    String getMaxValueAsString(int pageIndex) {
+      return stringifier.stringify(maxValues[pageIndex]);
+    }
+  }
+
+  private final IntList minValues = new IntArrayList();
+  private final IntList maxValues = new IntArrayList();
+
+  private static int convert(ByteBuffer buffer) {
+    return buffer.order(LITTLE_ENDIAN).getInt(0);
+  }
+
+  private static ByteBuffer convert(int value) {
+    return ByteBuffer.allocate(Integer.SIZE / 8).order(LITTLE_ENDIAN).putInt(0, value);
+  }
+
+  @Override
+  void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
+    minValues.add(min == null ? 0 : convert(min));
+    maxValues.add(max == null ? 0 : convert(max));
+  }
+
+  @Override
+  void addMinMax(Object min, Object max) {
+    minValues.add(min == null ? 0 : (int) min);
+    maxValues.add(max == null ? 0 : (int) max);
+  }
+
+  @Override
+  ColumnIndexBase createColumnIndex(PrimitiveType type) {
+    IntColumnIndex columnIndex = new IntColumnIndex(type);
+    columnIndex.minValues = minValues.toIntArray();
+    columnIndex.maxValues = maxValues.toIntArray();
+    return columnIndex;
+  }
+
+  @Override
+  void clearMinMax() {
+    minValues.clear();
+    maxValues.clear();
+  }
+
+  @Override
+  int compareMinValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(minValues.get(index1), minValues.get(index2));
+  }
+
+  @Override
+  int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(maxValues.get(index1), maxValues.get(index2));
+  }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/LongColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/LongColumnIndexBuilder.java
new file mode 100644
index 0000000..94e7e0f
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/LongColumnIndexBuilder.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import static java.nio.ByteOrder.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveComparator;
+import org.apache.parquet.schema.PrimitiveType;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongList;
+
+class LongColumnIndexBuilder extends ColumnIndexBuilder {
+  private static class LongColumnIndex extends ColumnIndexBase {
+    private long[] minValues;
+    private long[] maxValues;
+
+    private LongColumnIndex(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    ByteBuffer getMinValueAsBytes(int pageIndex) {
+      return convert(minValues[pageIndex]);
+    }
+
+    @Override
+    ByteBuffer getMaxValueAsBytes(int pageIndex) {
+      return convert(maxValues[pageIndex]);
+    }
+
+    @Override
+    String getMinValueAsString(int pageIndex) {
+      return stringifier.stringify(minValues[pageIndex]);
+    }
+
+    @Override
+    String getMaxValueAsString(int pageIndex) {
+      return stringifier.stringify(maxValues[pageIndex]);
+    }
+  }
+
+  private final LongList minValues = new LongArrayList();
+  private final LongList maxValues = new LongArrayList();
+
+  private static long convert(ByteBuffer buffer) {
+    return buffer.order(LITTLE_ENDIAN).getLong(0);
+  }
+
+  private static ByteBuffer convert(long value) {
+    return ByteBuffer.allocate(Long.SIZE / 8).order(LITTLE_ENDIAN).putLong(0, value);
+  }
+
+  @Override
+  void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
+    minValues.add(min == null ? 0 : convert(min));
+    maxValues.add(max == null ? 0 : convert(max));
+  }
+
+  @Override
+  void addMinMax(Object min, Object max) {
+    minValues.add(min == null ? 0 : (long) min);
+    maxValues.add(max == null ? 0 : (long) max);
+  }
+
+  @Override
+  ColumnIndexBase createColumnIndex(PrimitiveType type) {
+    LongColumnIndex columnIndex = new LongColumnIndex(type);
+    columnIndex.minValues = minValues.toLongArray();
+    columnIndex.maxValues = maxValues.toLongArray();
+    return columnIndex;
+  }
+
+  @Override
+  void clearMinMax() {
+    minValues.clear();
+    maxValues.clear();
+  }
+
+  @Override
+  int compareMinValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(minValues.get(index1), minValues.get(index2));
+  }
+
+  @Override
+  int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(maxValues.get(index1), maxValues.get(index2));
+  }
+}
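
All four numeric builders (double, float, int, long) share the same serialization convention: min/max values travel as plain-encoded little-endian bytes, read and written with absolute-index ByteBuffer access so the buffer position is never consumed. A self-contained round trip of that convention:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    class PlainEncodingRoundTrip {
      public static void main(String[] args) {
        // Write a long at absolute index 0, little-endian, without moving position.
        ByteBuffer buf = ByteBuffer.allocate(Long.SIZE / 8)
            .order(ByteOrder.LITTLE_ENDIAN).putLong(0, 42L);
        // Read it back the same way; the buffer position is untouched throughout.
        long back = buf.order(ByteOrder.LITTLE_ENDIAN).getLong(0);
        System.out.println(back); // 42
      }
    }
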
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java
new file mode 100644
index 0000000..fd99ef3
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+/**
+ * Offset index containing the offset and size of each page and the index of the first row in each page.
+ *
+ * @see org.apache.parquet.format.OffsetIndex
+ */
+public interface OffsetIndex {
+  /**
+   * @return the number of pages
+   */
+  public int getPageCount();
+
+  /**
+   * @param pageIndex
+   *          the index of the page
+   * @return the offset of the page in the file
+   */
+  public long getOffset(int pageIndex);
+
+  /**
+   * @param pageIndex
+   *          the index of the page
+   * @return the compressed size of the page (including page header)
+   */
+  public int getCompressedPageSize(int pageIndex);
+
+  /**
+   * @param pageIndex
+   *          the index of the page
+   * @return the index of the first row in the page
+   */
+  public long getFirstRowIndex(int pageIndex);
+}
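
Note that a page's row count is not stored directly; it falls out of consecutive first-row indexes, with the row group's total row count closing off the last page. A hypothetical helper capturing that:

    import org.apache.parquet.internal.column.columnindex.OffsetIndex;

    class OffsetIndexMath {
      // Row count of a page, derived from first-row indexes; the caller supplies
      // the total row count of the row group to bound the last page.
      static long rowCountOfPage(OffsetIndex offsetIndex, int pageIndex, long rowGroupRowCount) {
        long nextFirstRow = pageIndex + 1 < offsetIndex.getPageCount()
            ? offsetIndex.getFirstRowIndex(pageIndex + 1)
            : rowGroupRowCount;
        return nextFirstRow - offsetIndex.getFirstRowIndex(pageIndex);
      }
    }
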
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java
new file mode 100644
index 0000000..e4907b5
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import java.util.Formatter;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongList;
+
+/**
+ * Builder implementation to create {@link OffsetIndex} objects while writing a Parquet file.
+ */
+public class OffsetIndexBuilder {
+
+  private static class OffsetIndexImpl implements OffsetIndex {
+    private long[] offsets;
+    private int[] compressedPageSizes;
+    private long[] firstRowIndexes;
+
+    @Override
+    public String toString() {
+      try (Formatter formatter = new Formatter()) {
+        formatter.format("%-10s  %20s  %16s  %20s\n", "", "offset", "compressed size", "first row index");
+        for (int i = 0, n = offsets.length; i < n; ++i) {
+          formatter.format("page-%-5d  %20d  %16d  %20d\n", i, offsets[i], compressedPageSizes[i], firstRowIndexes[i]);
+        }
+        return formatter.toString();
+      }
+    }
+
+    @Override
+    public int getPageCount() {
+      return offsets.length;
+    }
+
+    @Override
+    public long getOffset(int pageIndex) {
+      return offsets[pageIndex];
+    }
+
+    @Override
+    public int getCompressedPageSize(int pageIndex) {
+      return compressedPageSizes[pageIndex];
+    }
+
+    @Override
+    public long getFirstRowIndex(int pageIndex) {
+      return firstRowIndexes[pageIndex];
+    }
+  }
+
+  private static final OffsetIndexBuilder NO_OP_BUILDER = new OffsetIndexBuilder() {
+    @Override
+    public void add(int compressedPageSize, long rowCount) {
+    }
+
+    @Override
+    public void add(long offset, int compressedPageSize, long rowCount) {
+    }
+  };
+
+  private final LongList offsets = new LongArrayList();
+  private final IntList compressedPageSizes = new IntArrayList();
+  private final LongList firstRowIndexes = new LongArrayList();
+  private long previousOffset;
+  private int previousPageSize;
+  private long previousRowIndex;
+  private long previousRowCount;
+
+  /**
+   * @return a no-op builder that does not collect values and therefore returns {@code null} at {@link #build(long)}
+   */
+  public static OffsetIndexBuilder getNoOpBuilder() {
+    return NO_OP_BUILDER;
+  }
+
+  /**
+   * @return an {@link OffsetIndexBuilder} instance to build an {@link OffsetIndex} object
+   */
+  public static OffsetIndexBuilder getBuilder() {
+    return new OffsetIndexBuilder();
+  }
+
+  private OffsetIndexBuilder() {
+  }
+
+  /**
+   * Adds the specified parameters to this builder. Used by the writers to build up {@link OffsetIndex} objects to be
+   * written to the Parquet file.
+   *
+   * @param compressedPageSize
+   *          the size of the page (including header)
+   * @param rowCount
+   *          the number of rows in the page
+   */
+  public void add(int compressedPageSize, long rowCount) {
+    add(previousOffset + previousPageSize, compressedPageSize, previousRowIndex + previousRowCount);
+    previousRowCount = rowCount;
+  }
+
+  /**
+   * Adds the specified parameters to this builder. Used by the metadata converter to build up {@link OffsetIndex}
+   * objects read from the Parquet file.
+   *
+   * @param offset
+   *          the offset of the page in the file
+   * @param compressedPageSize
+   *          the size of the page (including header)
+   * @param firstRowIndex
+   *          the index of the first row in the page (within the row group)
+   */
+  public void add(long offset, int compressedPageSize, long firstRowIndex) {
+    previousOffset = offset;
+    offsets.add(offset);
+    previousPageSize = compressedPageSize;
+    compressedPageSizes.add(compressedPageSize);
+    previousRowIndex = firstRowIndex;
+    firstRowIndexes.add(firstRowIndex);
+  }
+
+  /**
+   * Builds the offset index. Used by the metadata converter to build up {@link OffsetIndex}
+   * objects read from the Parquet file.
+   *
+   * @return the newly created offset index or {@code null} if the {@link OffsetIndex} object would be empty
+   */
+  public OffsetIndex build() {
+    return build(0);
+  }
+
+  /**
+   * Builds the offset index. Used by the writers to build up {@link OffsetIndex} objects to be
+   * written to the Parquet file.
+   *
+   * @param firstPageOffset
+   *          the actual offset in the file to be used to translate all the collected offsets
+   * @return the newly created offset index or {@code null} if the {@link OffsetIndex} object would be empty
+   */
+  public OffsetIndex build(long firstPageOffset) {
+    if (compressedPageSizes.isEmpty()) {
+      return null;
+    }
+    long[] offsets = this.offsets.toLongArray();
+    if (firstPageOffset != 0) {
+      for (int i = 0, n = offsets.length; i < n; ++i) {
+        offsets[i] += firstPageOffset;
+      }
+    }
+    OffsetIndexImpl offsetIndex = new OffsetIndexImpl();
+    offsetIndex.offsets = offsets;
+    offsetIndex.compressedPageSizes = compressedPageSizes.toIntArray();
+    offsetIndex.firstRowIndexes = firstRowIndexes.toLongArray();
+
+    return offsetIndex;
+  }
+
+}
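
A usage sketch for the two builder flavors above (illustrative only, not part of this change; the numbers are taken from TestOffsetIndexBuilder further down). On the write side, pages are registered with their compressed size and row count, and the collected offsets are translated once the file position of the first page is known:

    OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder();
    builder.add(1000, 10);  // page 0: 1000 bytes, 10 rows
    builder.add(2000, 19);  // page 1: 2000 bytes, 19 rows
    builder.add(3000, 27);  // page 2: 3000 bytes, 27 rows

    // Offsets were collected relative to 0; build(firstPageOffset)
    // shifts them to absolute file positions.
    OffsetIndex offsetIndex = builder.build(10000);
    // offsetIndex.getOffset(0)        -> 10000
    // offsetIndex.getFirstRowIndex(1) -> 10
    // offsetIndex.getFirstRowIndex(2) -> 29 (10 + 19)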
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
index c855339..c28649e 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
@@ -20,12 +20,10 @@ package org.apache.parquet.column.mem;
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.parquet.column.ParquetProperties;
-import org.junit.Test;
-
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnReader;
 import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.impl.ColumnReadStoreImpl;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
 import org.apache.parquet.column.page.mem.MemPageStore;
@@ -33,6 +31,7 @@ import org.apache.parquet.example.DummyRecordConverter;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +46,7 @@ public class TestMemColumn {
     ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
     ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);
     columnWriter.write(42l, 0, 0);
+    memColumnsStore.endRecord();
     memColumnsStore.flush();
 
     ColumnReader columnReader = getColumnReader(memPageStore, path, schema);
@@ -85,6 +85,7 @@ public class TestMemColumn {
 
     ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);
     columnWriter.write(Binary.fromString("42"), 0, 0);
+    memColumnsStore.endRecord();
     memColumnsStore.flush();
 
     ColumnReader columnReader = getColumnReader(memPageStore, path, mt);
@@ -108,6 +109,7 @@ public class TestMemColumn {
     ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);
     for (int i = 0; i < 2000; i++) {
       columnWriter.write(42l, 0, 0);
+      memColumnsStore.endRecord();
     }
     memColumnsStore.flush();
 
@@ -141,6 +143,7 @@ public class TestMemColumn {
       } else {
         columnWriter.writeNull(r, d);
       }
+      memColumnsStore.endRecord();
     }
     memColumnsStore.flush();
 
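The endRecord() calls added above reflect the new contract of the write stores: with column indexes being collected, the store has to be told where each record ends, so that per-page row counts can be tracked for the offset indexes. A minimal sketch of the resulting write pattern (names as in the test above):

    ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);
    for (int i = 0; i < 2000; i++) {
      columnWriter.write(42L, 0, 0); // value, repetition level, definition level
      memColumnsStore.endRecord();   // mark the record boundary for the store
    }
    memColumnsStore.flush();
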
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java
index be3a0f9..706b001 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java
@@ -57,6 +57,12 @@ public class MemPageWriter implements PageWriter {
   }
 
   @Override
+  public void writePage(BytesInput bytesInput, int valueCount, int rowCount, Statistics<?> statistics,
+      Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException {
+    writePage(bytesInput, valueCount, statistics, rlEncoding, dlEncoding, valuesEncoding);
+  }
+
+  @Override
   public void writePageV2(int rowCount, int nullCount, int valueCount,
       BytesInput repetitionLevels, BytesInput definitionLevels,
       Encoding dataEncoding, BytesInput data, Statistics<?> statistics) throws IOException {
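
MemPageWriter simply forwards the new rowCount-aware overload to the existing writePage(), dropping the row count. A real page writer would be expected to record it for the offset index; a hedged sketch of that idea (offsetIndexBuilder is a hypothetical field here, and the size bookkeeping is simplified compared to the actual ColumnChunkPageWriteStore):

    @Override
    public void writePage(BytesInput bytesInput, int valueCount, int rowCount,
        Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding,
        Encoding valuesEncoding) throws IOException {
      // Register the page size and row count so that build(firstPageOffset)
      // can later produce absolute page locations.
      offsetIndexBuilder.add(Math.toIntExact(bytesInput.size()), rowCount);
      writePage(bytesInput, valueCount, statistics, rlEncoding, dlEncoding, valuesEncoding);
    }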
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
new file mode 100644
index 0000000..f1706a1
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
@@ -0,0 +1,949 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import static java.util.Arrays.asList;
+import static org.apache.parquet.schema.OriginalType.DECIMAL;
+import static org.apache.parquet.schema.OriginalType.UINT_8;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.internal.column.columnindex.BinaryColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.BooleanColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.BoundaryOrder;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.DoubleColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.FloatColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.IntColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.LongColumnIndexBuilder;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ColumnIndexBuilder}.
+ */
+public class TestColumnIndexBuilder {
+
+  @Test
+  public void testBuildBinaryDecimal() {
+    PrimitiveType type = Types.required(BINARY).as(DECIMAL).precision(12).scale(2).named("test_binary_decimal");
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class));
+    assertNull(builder.build());
+
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, decimalBinary("-0.17"), decimalBinary("1234567890.12")));
+    builder.add(stats(type, decimalBinary("-234.23"), null, null, null));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, decimalBinary("-9999293.23"), decimalBinary("2348978.45")));
+    builder.add(stats(type, null, null, null, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, decimalBinary("87656273")));
+    ColumnIndex columnIndex = builder.build();
+    assertEquals(BoundaryOrder.UNORDERED, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 2, 0, 3, 3, 0, 4, 2, 0);
+    assertCorrectNullPages(columnIndex, true, false, false, true, false, true, true, false);
+    assertCorrectValues(columnIndex.getMaxValues(),
+        null,
+        decimalBinary("1234567890.12"),
+        decimalBinary("-234.23"),
+        null,
+        decimalBinary("2348978.45"),
+        null,
+        null,
+        decimalBinary("87656273"));
+    assertCorrectValues(columnIndex.getMinValues(),
+        null,
+        decimalBinary("-0.17"),
+        decimalBinary("-234.23"),
+        null,
+        decimalBinary("-9999293.23"),
+        null,
+        null,
+        decimalBinary("87656273"));
+
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, null, null, null, null));
+    builder.add(stats(type, decimalBinary("-9999293.23"), decimalBinary("-234.23")));
+    builder.add(stats(type, decimalBinary("-0.17"), decimalBinary("87656273")));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, decimalBinary("87656273")));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, decimalBinary("1234567890.12"), null, null, null));
+    builder.add(stats(type, null, null, null));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 4, 0, 0, 2, 0, 2, 3, 3);
+    assertCorrectNullPages(columnIndex, true, false, false, true, false, true, false, true);
+    assertCorrectValues(columnIndex.getMaxValues(),
+        null,
+        decimalBinary("-234.23"),
+        decimalBinary("87656273"),
+        null,
+        decimalBinary("87656273"),
+        null,
+        decimalBinary("1234567890.12"),
+        null);
+    assertCorrectValues(columnIndex.getMinValues(),
+        null,
+        decimalBinary("-9999293.23"),
+        decimalBinary("-0.17"),
+        null,
+        decimalBinary("87656273"),
+        null,
+        decimalBinary("1234567890.12"),
+        null);
+
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, decimalBinary("1234567890.12"), null, null, null));
+    builder.add(stats(type, null, null, null, null));
+    builder.add(stats(type, decimalBinary("1234567890.12"), decimalBinary("87656273")));
+    builder.add(stats(type, decimalBinary("987656273"), decimalBinary("-0.17")));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, decimalBinary("-234.23"), decimalBinary("-9999293.23")));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.DESCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 3, 2, 3, 4, 0, 0, 2, 0);
+    assertCorrectNullPages(columnIndex, true, true, false, true, false, false, true, false);
+    assertCorrectValues(columnIndex.getMaxValues(),
+        null,
+        null,
+        decimalBinary("1234567890.12"),
+        null,
+        decimalBinary("1234567890.12"),
+        decimalBinary("987656273"),
+        null,
+        decimalBinary("-234.23"));
+    assertCorrectValues(columnIndex.getMinValues(),
+        null,
+        null,
+        decimalBinary("1234567890.12"),
+        null,
+        decimalBinary("87656273"),
+        decimalBinary("-0.17"),
+        null,
+        decimalBinary("-9999293.23"));
+  }
+
+  @Test
+  public void testBuildBinaryUtf8() {
+    PrimitiveType type = Types.required(BINARY).as(UTF8).named("test_binary_utf8");
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class));
+    assertNull(builder.build());
+
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, stringBinary("Jeltz"), stringBinary("Slartibartfast"), null, null));
+    builder.add(stats(type, null, null, null, null, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, stringBinary("Beeblebrox"), stringBinary("Prefect")));
+    builder.add(stats(type, stringBinary("Dent"), stringBinary("Trilian"), null));
+    builder.add(stats(type, stringBinary("Beeblebrox")));
+    builder.add(stats(type, null, null));
+    ColumnIndex columnIndex = builder.build();
+    assertEquals(BoundaryOrder.UNORDERED, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 2, 2, 5, 2, 0, 1, 0, 2);
+    assertCorrectNullPages(columnIndex, true, false, true, true, false, false, false, true);
+    assertCorrectValues(columnIndex.getMaxValues(),
+        null,
+        stringBinary("Slartibartfast"),
+        null,
+        null,
+        stringBinary("Prefect"),
+        stringBinary("Trilian"),
+        stringBinary("Beeblebrox"),
+        null);
+    assertCorrectValues(columnIndex.getMinValues(),
+        null,
+        stringBinary("Jeltz"),
+        null,
+        null,
+        stringBinary("Beeblebrox"),
+        stringBinary("Dent"),
+        stringBinary("Beeblebrox"),
+        null);
+
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, stringBinary("Beeblebrox"), stringBinary("Dent"), null, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, null, null, null, null, null));
+    builder.add(stats(type, stringBinary("Dent"), stringBinary("Jeltz")));
+    builder.add(stats(type, stringBinary("Dent"), stringBinary("Prefect"), null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, stringBinary("Slartibartfast")));
+    builder.add(stats(type, null, null));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 2, 2, 5, 0, 1, 2, 0, 2);
+    assertCorrectNullPages(columnIndex, false, true, true, false, false, true, false, true);
+    assertCorrectValues(columnIndex.getMaxValues(),
+        stringBinary("Dent"),
+        null,
+        null,
+        stringBinary("Jeltz"),
+        stringBinary("Prefect"),
+        null,
+        stringBinary("Slartibartfast"),
+        null);
+    assertCorrectValues(columnIndex.getMinValues(),
+        stringBinary("Beeblebrox"),
+        null,
+        null,
+        stringBinary("Dent"),
+        stringBinary("Dent"),
+        null,
+        stringBinary("Slartibartfast"),
+        null);
+
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, stringBinary("Slartibartfast")));
+    builder.add(stats(type, null, null, null, null, null));
+    builder.add(stats(type, stringBinary("Prefect"), stringBinary("Jeltz"), null));
+    builder.add(stats(type, stringBinary("Dent"), stringBinary("Dent")));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, stringBinary("Dent"), stringBinary("Beeblebrox"), null, null));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.DESCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 2, 0, 5, 1, 0, 2, 2, 2);
+    assertCorrectNullPages(columnIndex, true, false, true, false, false, true, true, false);
+    assertCorrectValues(columnIndex.getMaxValues(),
+        null,
+        stringBinary("Slartibartfast"),
+        null,
+        stringBinary("Prefect"),
+        stringBinary("Dent"),
+        null,
+        null,
+        stringBinary("Dent"));
+    assertCorrectValues(columnIndex.getMinValues(),
+        null,
+        stringBinary("Slartibartfast"),
+        null,
+        stringBinary("Jeltz"),
+        stringBinary("Dent"),
+        null,
+        null,
+        stringBinary("Beeblebrox"));
+  }
+
+  @Test
+  public void testStaticBuildBinary() {
+    ColumnIndex columnIndex = ColumnIndexBuilder.build(
+        Types.required(BINARY).as(UTF8).named("test_binary_utf8"),
+        BoundaryOrder.ASCENDING,
+        asList(true, true, false, false, true, false, true, false),
+        asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 8l),
+        toBBList(
+            null,
+            null,
+            stringBinary("Beeblebrox"),
+            stringBinary("Dent"),
+            null,
+            stringBinary("Jeltz"),
+            null,
+            stringBinary("Slartibartfast")),
+        toBBList(
+            null,
+            null,
+            stringBinary("Dent"),
+            stringBinary("Dent"),
+            null,
+            stringBinary("Prefect"),
+            null,
+            stringBinary("Slartibartfast")));
+    assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 1, 2, 3, 4, 5, 6, 7, 8);
+    assertCorrectNullPages(columnIndex, true, true, false, false, true, false, true, false);
+    assertCorrectValues(columnIndex.getMaxValues(),
+        null,
+        null,
+        stringBinary("Dent"),
+        stringBinary("Dent"),
+        null,
+        stringBinary("Prefect"),
+        null,
+        stringBinary("Slartibartfast"));
+    assertCorrectValues(columnIndex.getMinValues(),
+        null,
+        null,
+        stringBinary("Beeblebrox"),
+        stringBinary("Dent"),
+        null,
+        stringBinary("Jeltz"),
+        null,
+        stringBinary("Slartibartfast"));
+  }
+
+  @Test
+  public void testBuildBoolean() {
+    PrimitiveType type = Types.required(BOOLEAN).named("test_boolean");
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    assertThat(builder, instanceOf(BooleanColumnIndexBuilder.class));
+    assertNull(builder.build());
+
+    builder.add(stats(type, false, true));
+    builder.add(stats(type, true, false, null));
+    builder.add(stats(type, true, true, null, null));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, false, false));
+    ColumnIndex columnIndex = builder.build();
+    assertEquals(BoundaryOrder.UNORDERED, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 0, 1, 2, 3, 0);
+    assertCorrectNullPages(columnIndex, false, false, false, true, false);
+    assertCorrectValues(columnIndex.getMaxValues(), true, true, true, null, false);
+    assertCorrectValues(columnIndex.getMinValues(), false, false, true, null, false);
+
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, false, false));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, null, null, null, null));
+    builder.add(stats(type, false, true, null));
+    builder.add(stats(type, false, true, null, null));
+    builder.add(stats(type, null, null, null));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 2, 0, 3, 4, 1, 2, 3);
+    assertCorrectNullPages(columnIndex, true, false, true, true, false, false, true);
+    assertCorrectValues(columnIndex.getMaxValues(), null, false, null, null, true, true, null);
+    assertCorrectValues(columnIndex.getMinValues(), null, false, null, null, false, false, null);
+
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, true, true));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, null, null, null, null));
+    builder.add(stats(type, true, false, null));
+    builder.add(stats(type, false, false, null, null));
+    builder.add(stats(type, null, null, null));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.DESCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 2, 0, 3, 4, 1, 2, 3);
+    assertCorrectNullPages(columnIndex, true, false, true, true, false, false, true);
+    assertCorrectValues(columnIndex.getMaxValues(), null, true, null, null, true, false, null);
+    assertCorrectValues(columnIndex.getMinValues(), null, true, null, null, false, false, null);
+  }
+
+  @Test
+  public void testStaticBuildBoolean() {
+    ColumnIndex columnIndex = ColumnIndexBuilder.build(
+        Types.required(BOOLEAN).named("test_boolean"),
+        BoundaryOrder.DESCENDING,
+        asList(false, true, false, true, false, true),
+        asList(9l, 8l, 7l, 6l, 5l, 0l),
+        toBBList(false, null, false, null, true, null),
+        toBBList(true, null, false, null, true, null));
+    assertEquals(BoundaryOrder.DESCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 9, 8, 7, 6, 5, 0);
+    assertCorrectNullPages(columnIndex, false, true, false, true, false, true);
+    assertCorrectValues(columnIndex.getMaxValues(), true, null, false, null, true, null);
+    assertCorrectValues(columnIndex.getMinValues(), false, null, false, null, true, null);
+  }
+
+  @Test
+  public void testBuildDouble() {
+    PrimitiveType type = Types.required(DOUBLE).named("test_double");
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    assertThat(builder, instanceOf(DoubleColumnIndexBuilder.class));
+    assertNull(builder.build());
+
+    builder.add(stats(type, -4.2, -4.1));
+    builder.add(stats(type, -11.7, 7.0, null));
+    builder.add(stats(type, 2.2, 2.2, null, null));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, 1.9, 2.32));
+    builder.add(stats(type, -21.0, 8.1));
+    ColumnIndex columnIndex = builder.build();
+    assertEquals(BoundaryOrder.UNORDERED, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 0, 1, 2, 3, 0, 0);
+    assertCorrectNullPages(columnIndex, false, false, false, true, false, false);
+    assertCorrectValues(columnIndex.getMaxValues(), -4.1, 7.0, 2.2, null, 2.32, 8.1);
+    assertCorrectValues(columnIndex.getMinValues(), -4.2, -11.7, 2.2, null, 1.9, -21.0);
+
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, -532.3, -345.2, null, null));
+    builder.add(stats(type, -234.7, -234.6, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, -234.6, 2.99999));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, 3.0, 42.83));
+    builder.add(stats(type, null, null));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 2, 2, 1, 2, 3, 0, 2, 0, 2);
+    assertCorrectNullPages(columnIndex, true, false, false, true, true, false, true, false, true);
+    assertCorrectValues(columnIndex.getMaxValues(), null, -345.2, -234.6, null, null, 2.99999, null, 42.83, null);
+    assertCorrectValues(columnIndex.getMinValues(), null, -532.3, -234.7, null, null, -234.6, null, 3.0, null);
+
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, null, null, null, null, null));
+    builder.add(stats(type, 532.3, 345.2));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, 234.7, 234.6, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, 234.69, -2.99999));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, -3.0, -42.83));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.DESCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 5, 0, 3, 1, 2, 0, 2, 2, 0);
+    assertCorrectNullPages(columnIndex, true, false, true, false, true, false, true, true, false);
+    assertCorrectValues(columnIndex.getMaxValues(), null, 532.3, null, 234.7, null, 234.69, null, null, -3.0);
+    assertCorrectValues(columnIndex.getMinValues(), null, 345.2, null, 234.6, null, -2.99999, null, null, -42.83);
+  }
+
+  @Test
+  public void testStaticBuildDouble() {
+    ColumnIndex columnIndex = ColumnIndexBuilder.build(
+        Types.required(DOUBLE).named("test_double"),
+        BoundaryOrder.UNORDERED,
+        asList(false, false, false, false, false, false),
+        asList(0l, 1l, 2l, 3l, 4l, 5l),
+        toBBList(-1.0, -2.0, -3.0, -4.0, -5.0, -6.0),
+        toBBList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0));
+    assertEquals(BoundaryOrder.UNORDERED, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 0, 1, 2, 3, 4, 5);
+    assertCorrectNullPages(columnIndex, false, false, false, false, false, false);
+    assertCorrectValues(columnIndex.getMaxValues(), 1.0, 2.0, 3.0, 4.0, 5.0, 6.0);
+    assertCorrectValues(columnIndex.getMinValues(), -1.0, -2.0, -3.0, -4.0, -5.0, -6.0);
+  }
+
+  @Test
+  public void testBuildFloat() {
+    PrimitiveType type = Types.required(FLOAT).named("test_float");
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    assertThat(builder, instanceOf(FloatColumnIndexBuilder.class));
+    assertNull(builder.build());
+
+    builder.add(stats(type, -4.2f, -4.1f));
+    builder.add(stats(type, -11.7f, 7.0f, null));
+    builder.add(stats(type, 2.2f, 2.2f, null, null));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, 1.9f, 2.32f));
+    builder.add(stats(type, -21.0f, 8.1f));
+    ColumnIndex columnIndex = builder.build();
+    assertEquals(BoundaryOrder.UNORDERED, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 0, 1, 2, 3, 0, 0);
+    assertCorrectNullPages(columnIndex, false, false, false, true, false, false);
+    assertCorrectValues(columnIndex.getMaxValues(), -4.1f, 7.0f, 2.2f, null, 2.32f, 8.1f);
+    assertCorrectValues(columnIndex.getMinValues(), -4.2f, -11.7f, 2.2f, null, 1.9f, -21.0f);
+
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, -532.3f, -345.2f, null, null));
+    builder.add(stats(type, -300.6f, -234.7f, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, -234.6f, 2.99999f));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, 3.0f, 42.83f));
+    builder.add(stats(type, null, null));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 2, 2, 1, 2, 3, 0, 2, 0, 2);
+    assertCorrectNullPages(columnIndex, true, false, false, true, true, false, true, false, true);
+    assertCorrectValues(columnIndex.getMaxValues(), null, -345.2f, -234.7f, null, null, 2.99999f, null, 42.83f, null);
+    assertCorrectValues(columnIndex.getMinValues(), null, -532.3f, -300.6f, null, null, -234.6f, null, 3.0f, null);
+
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, null, null, null, null, null));
+    builder.add(stats(type, 532.3f, 345.2f));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, 234.7f, 234.6f, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, 234.6f, -2.99999f));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, -3.0f, -42.83f));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.DESCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 5, 0, 3, 1, 2, 0, 2, 2, 0);
+    assertCorrectNullPages(columnIndex, true, false, true, false, true, false, true, true, false);
+    assertCorrectValues(columnIndex.getMaxValues(), null, 532.3f, null, 234.7f, null, 234.6f, null, null, -3.0f);
+    assertCorrectValues(columnIndex.getMinValues(), null, 345.2f, null, 234.6f, null, -2.99999f, null, null, -42.83f);
+  }
+
+  @Test
+  public void testStaticBuildFloat() {
+    ColumnIndex columnIndex = ColumnIndexBuilder.build(
+        Types.required(FLOAT).named("test_float"),
+        BoundaryOrder.ASCENDING,
+        asList(true, true, true, false, false, false),
+        asList(9l, 8l, 7l, 6l, 0l, 0l),
+        toBBList(null, null, null, -3.0f, -2.0f, 0.1f),
+        toBBList(null, null, null, -2.0f, 0.0f, 6.0f));
+    assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 9, 8, 7, 6, 0, 0);
+    assertCorrectNullPages(columnIndex, true, true, true, false, false, false);
+    assertCorrectValues(columnIndex.getMaxValues(), null, null, null, -2.0f, 0.0f, 6.0f);
+    assertCorrectValues(columnIndex.getMinValues(), null, null, null, -3.0f, -2.0f, 0.1f);
+  }
+
+  @Test
+  public void testBuildInt32() {
+    PrimitiveType type = Types.required(INT32).named("test_int32");
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    assertThat(builder, instanceOf(IntColumnIndexBuilder.class));
+    assertNull(builder.build());
+
+    builder.add(stats(type, -4, 10));
+    builder.add(stats(type, -11, 7, null));
+    builder.add(stats(type, 2, 2, null, null));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, 1, 2));
+    builder.add(stats(type, -21, 8));
+    ColumnIndex columnIndex = builder.build();
+    assertEquals(BoundaryOrder.UNORDERED, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 0, 1, 2, 3, 0, 0);
+    assertCorrectNullPages(columnIndex, false, false, false, true, false, false);
+    assertCorrectValues(columnIndex.getMaxValues(), 10, 7, 2, null, 2, 8);
+    assertCorrectValues(columnIndex.getMinValues(), -4, -11, 2, null, 1, -21);
+
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, -532, -345, null, null));
+    builder.add(stats(type, -500, -42, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, -42, 2));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, 3, 42));
+    builder.add(stats(type, null, null));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 2, 2, 1, 2, 3, 0, 2, 0, 2);
+    assertCorrectNullPages(columnIndex, true, false, false, true, true, false, true, false, true);
+    assertCorrectValues(columnIndex.getMaxValues(), null, -345, -42, null, null, 2, null, 42, null);
+    assertCorrectValues(columnIndex.getMinValues(), null, -532, -500, null, null, -42, null, 3, null);
+
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, null, null, null, null, null));
+    builder.add(stats(type, 532, 345));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, 234, 42, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, 42, -2));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, -3, -42));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.DESCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 5, 0, 3, 1, 2, 0, 2, 2, 0);
+    assertCorrectNullPages(columnIndex, true, false, true, false, true, false, true, true, false);
+    assertCorrectValues(columnIndex.getMaxValues(), null, 532, null, 234, null, 42, null, null, -3);
+    assertCorrectValues(columnIndex.getMinValues(), null, 345, null, 42, null, -2, null, null, -42);
+  }
+
+  @Test
+  public void testStaticBuildInt32() {
+    ColumnIndex columnIndex = ColumnIndexBuilder.build(
+        Types.required(INT32).named("test_int32"),
+        BoundaryOrder.DESCENDING,
+        asList(false, false, false, true, true, true),
+        asList(0l, 10l, 0l, 3l, 5l, 7l),
+        toBBList(10, 8, 6, null, null, null),
+        toBBList(9, 7, 5, null, null, null));
+    assertEquals(BoundaryOrder.DESCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 0, 10, 0, 3, 5, 7);
+    assertCorrectNullPages(columnIndex, false, false, false, true, true, true);
+    assertCorrectValues(columnIndex.getMaxValues(), 9, 7, 5, null, null, null);
+    assertCorrectValues(columnIndex.getMinValues(), 10, 8, 6, null, null, null);
+  }
+
+  @Test
+  public void testBuildUInt8() {
+    PrimitiveType type = Types.required(INT32).as(UINT_8).named("test_uint8");
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    assertThat(builder, instanceOf(IntColumnIndexBuilder.class));
+    assertNull(builder.build());
+
+    builder.add(stats(type, 4, 10));
+    builder.add(stats(type, 11, 17, null));
+    builder.add(stats(type, 2, 2, null, null));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, 1, 0xFF));
+    builder.add(stats(type, 0xEF, 0xFA));
+    ColumnIndex columnIndex = builder.build();
+    assertEquals(BoundaryOrder.UNORDERED, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 0, 1, 2, 3, 0, 0);
+    assertCorrectNullPages(columnIndex, false, false, false, true, false, false);
+    assertCorrectValues(columnIndex.getMaxValues(), 10, 17, 2, null, 0xFF, 0xFA);
+    assertCorrectValues(columnIndex.getMinValues(), 4, 11, 2, null, 1, 0xEF);
+
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, 0, 0, null, null));
+    builder.add(stats(type, 0, 42, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, 42, 0xEE));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, 0xEF, 0xFF));
+    builder.add(stats(type, null, null));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 2, 2, 1, 2, 3, 0, 2, 0, 2);
+    assertCorrectNullPages(columnIndex, true, false, false, true, true, false, true, false, true);
+    assertCorrectValues(columnIndex.getMaxValues(), null, 0, 42, null, null, 0xEE, null, 0xFF, null);
+    assertCorrectValues(columnIndex.getMinValues(), null, 0, 0, null, null, 42, null, 0xEF, null);
+
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, null, null, null, null, null));
+    builder.add(stats(type, 0xFF, 0xFF));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, 0xEF, 0xEA, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, 0xEE, 42));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, 41, 0));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.DESCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 5, 0, 3, 1, 2, 0, 2, 2, 0);
+    assertCorrectNullPages(columnIndex, true, false, true, false, true, false, true, true, false);
+    assertCorrectValues(columnIndex.getMaxValues(), null, 0xFF, null, 0xEF, null, 0xEE, null, null, 41);
+    assertCorrectValues(columnIndex.getMinValues(), null, 0xFF, null, 0xEA, null, 42, null, null, 0);
+  }
+
+  @Test
+  public void testBuildInt64() {
+    PrimitiveType type = Types.required(INT64).named("test_int64");
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    assertThat(builder, instanceOf(LongColumnIndexBuilder.class));
+    assertNull(builder.build());
+
+    builder.add(stats(type, -4l, 10l));
+    builder.add(stats(type, -11l, 7l, null));
+    builder.add(stats(type, 2l, 2l, null, null));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, 1l, 2l));
+    builder.add(stats(type, -21l, 8l));
+    ColumnIndex columnIndex = builder.build();
+    assertEquals(BoundaryOrder.UNORDERED, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 0l, 1l, 2l, 3l, 0l, 0l);
+    assertCorrectNullPages(columnIndex, false, false, false, true, false, false);
+    assertCorrectValues(columnIndex.getMaxValues(), 10l, 7l, 2l, null, 2l, 8l);
+    assertCorrectValues(columnIndex.getMinValues(), -4l, -11l, 2l, null, 1l, -21l);
+
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, -532l, -345l, null, null));
+    builder.add(stats(type, -234l, -42l, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, -42l, 2l));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, -3l, 42l));
+    builder.add(stats(type, null, null));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 2, 2, 1, 2, 3, 0, 2, 0, 2);
+    assertCorrectNullPages(columnIndex, true, false, false, true, true, false, true, false, true);
+    assertCorrectValues(columnIndex.getMaxValues(), null, -345l, -42l, null, null, 2l, null, 42l, null);
+    assertCorrectValues(columnIndex.getMinValues(), null, -532l, -234l, null, null, -42l, null, -3l, null);
+
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, null, null, null, null, null));
+    builder.add(stats(type, 532l, 345l));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, 234l, 42l, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, 42l, -2l));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, -3l, -42l));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.DESCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 5, 0, 3, 1, 2, 0, 2, 2, 0);
+    assertCorrectNullPages(columnIndex, true, false, true, false, true, false, true, true, false);
+    assertCorrectValues(columnIndex.getMaxValues(), null, 532l, null, 234l, null, 42l, null, null, -3l);
+    assertCorrectValues(columnIndex.getMinValues(), null, 345l, null, 42l, null, -2l, null, null, -42l);
+  }
+
+  @Test
+  public void testStaticBuildInt64() {
+    ColumnIndex columnIndex = ColumnIndexBuilder.build(
+        Types.required(INT64).named("test_int64"),
+        BoundaryOrder.UNORDERED,
+        asList(true, false, true, false, true, false),
+        asList(1l, 2l, 3l, 4l, 5l, 6l),
+        toBBList(null, 2l, null, 4l, null, 9l),
+        toBBList(null, 3l, null, 15l, null, 10l));
+    assertEquals(BoundaryOrder.UNORDERED, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 1, 2, 3, 4, 5, 6);
+    assertCorrectNullPages(columnIndex, true, false, true, false, true, false);
+    assertCorrectValues(columnIndex.getMaxValues(), null, 3l, null, 15l, null, 10l);
+    assertCorrectValues(columnIndex.getMinValues(), null, 2l, null, 4l, null, 9l);
+  }
+
+  @Test
+  public void testNoOpBuilder() {
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getNoOpBuilder();
+    builder.add(stats(Types.required(BINARY).as(UTF8).named("test_binary_utf8"), stringBinary("Jeltz"),
+        stringBinary("Slartibartfast"), null, null));
+    builder.add(stats(Types.required(BOOLEAN).named("test_boolean"), true, true, null, null));
+    builder.add(stats(Types.required(DOUBLE).named("test_double"), null, null, null));
+    builder.add(stats(Types.required(INT32).named("test_int32"), null, null));
+    builder.add(stats(Types.required(INT64).named("test_int64"), -234l, -42l, null));
+    assertNull(builder.build());
+  }
+
+  private static List<ByteBuffer> toBBList(Binary... values) {
+    List<ByteBuffer> buffers = new ArrayList<>(values.length);
+    for (Binary value : values) {
+      if (value == null) {
+        buffers.add(ByteBuffer.allocate(0));
+      } else {
+        buffers.add(value.toByteBuffer());
+      }
+    }
+    return buffers;
+  }
+
+  private static List<ByteBuffer> toBBList(Boolean... values) {
+    List<ByteBuffer> buffers = new ArrayList<>(values.length);
+    for (Boolean value : values) {
+      if (value == null) {
+        buffers.add(ByteBuffer.allocate(0));
+      } else {
+        buffers.add(ByteBuffer.wrap(BytesUtils.booleanToBytes(value)));
+      }
+    }
+    return buffers;
+  }
+
+  private static List<ByteBuffer> toBBList(Double... values) {
+    List<ByteBuffer> buffers = new ArrayList<>(values.length);
+    for (Double value : values) {
+      if (value == null) {
+        buffers.add(ByteBuffer.allocate(0));
+      } else {
+        buffers.add(ByteBuffer.wrap(BytesUtils.longToBytes(Double.doubleToLongBits(value))));
+      }
+    }
+    return buffers;
+  }
+
+  private static List<ByteBuffer> toBBList(Float... values) {
+    List<ByteBuffer> buffers = new ArrayList<>(values.length);
+    for (Float value : values) {
+      if (value == null) {
+        buffers.add(ByteBuffer.allocate(0));
+      } else {
+        buffers.add(ByteBuffer.wrap(BytesUtils.intToBytes(Float.floatToIntBits(value))));
+      }
+    }
+    return buffers;
+  }
+
+  private static List<ByteBuffer> toBBList(Integer... values) {
+    List<ByteBuffer> buffers = new ArrayList<>(values.length);
+    for (Integer value : values) {
+      if (value == null) {
+        buffers.add(ByteBuffer.allocate(0));
+      } else {
+        buffers.add(ByteBuffer.wrap(BytesUtils.intToBytes(value)));
+      }
+    }
+    return buffers;
+  }
+
+  private static List<ByteBuffer> toBBList(Long... values) {
+    List<ByteBuffer> buffers = new ArrayList<>(values.length);
+    for (Long value : values) {
+      if (value == null) {
+        buffers.add(ByteBuffer.allocate(0));
+      } else {
+        buffers.add(ByteBuffer.wrap(BytesUtils.longToBytes(value)));
+      }
+    }
+    return buffers;
+  }
+
+  private static Binary decimalBinary(String num) {
+    return Binary.fromConstantByteArray(new BigDecimal(num).unscaledValue().toByteArray());
+  }
+
+  private static Binary stringBinary(String str) {
+    return Binary.fromString(str);
+  }
+
+  private static void assertCorrectValues(List<ByteBuffer> values, Binary... expectedValues) {
+    assertEquals(expectedValues.length, values.size());
+    for (int i = 0; i < expectedValues.length; ++i) {
+      Binary expectedValue = expectedValues[i];
+      ByteBuffer value = values.get(i);
+      if (expectedValue == null) {
+        assertFalse("The byte buffer should be empty for null pages", value.hasRemaining());
+      } else {
+        assertArrayEquals("Invalid value for page " + i, expectedValue.getBytesUnsafe(), value.array());
+      }
+    }
+  }
+
+  private static void assertCorrectValues(List<ByteBuffer> values, Boolean... expectedValues) {
+    assertEquals(expectedValues.length, values.size());
+    for (int i = 0; i < expectedValues.length; ++i) {
+      Boolean expectedValue = expectedValues[i];
+      ByteBuffer value = values.get(i);
+      if (expectedValue == null) {
+        assertFalse("The byte buffer should be empty for null pages", value.hasRemaining());
+      } else {
+        assertEquals("The byte buffer should be 1 byte long for boolean", 1, value.remaining());
+        assertEquals("Invalid value for page " + i, expectedValue.booleanValue(), value.get(0) != 0);
+      }
+    }
+  }
+
+  private static void assertCorrectValues(List<ByteBuffer> values, Double... expectedValues) {
+    assertEquals(expectedValues.length, values.size());
+    for (int i = 0; i < expectedValues.length; ++i) {
+      Double expectedValue = expectedValues[i];
+      ByteBuffer value = values.get(i);
+      if (expectedValue == null) {
+        assertFalse("The byte buffer should be empty for null pages", value.hasRemaining());
+      } else {
+        assertEquals("The byte buffer should be 8 bytes long for double", 8, value.remaining());
+        assertEquals("Invalid value for page " + i, expectedValue.doubleValue(), value.getDouble(0), 0.0);
+      }
+    }
+  }
+
+  private static void assertCorrectValues(List<ByteBuffer> values, Float... expectedValues) {
+    assertEquals(expectedValues.length, values.size());
+    for (int i = 0; i < expectedValues.length; ++i) {
+      Float expectedValue = expectedValues[i];
+      ByteBuffer value = values.get(i);
+      if (expectedValue == null) {
+        assertFalse("The byte buffer should be empty for null pages", value.hasRemaining());
+      } else {
+        assertEquals("The byte buffer should be 4 bytes long for double", 4, value.remaining());
+        assertEquals("Invalid value for page " + i, expectedValue.floatValue(), value.getFloat(0), 0.0f);
+      }
+    }
+  }
+
+  private static void assertCorrectValues(List<ByteBuffer> values, Integer... expectedValues) {
+    assertEquals(expectedValues.length, values.size());
+    for (int i = 0; i < expectedValues.length; ++i) {
+      Integer expectedValue = expectedValues[i];
+      ByteBuffer value = values.get(i);
+      if (expectedValue == null) {
+        assertFalse("The byte buffer should be empty for null pages", value.hasRemaining());
+      } else {
+        assertEquals("The byte buffer should be 4 bytes long for int32", 4, value.remaining());
+        assertEquals("Invalid value for page " + i, expectedValue.intValue(), value.getInt(0));
+      }
+    }
+  }
+
+  private static void assertCorrectValues(List<ByteBuffer> values, Long... expectedValues) {
+    assertEquals(expectedValues.length, values.size());
+    for (int i = 0; i < expectedValues.length; ++i) {
+      Long expectedValue = expectedValues[i];
+      ByteBuffer value = values.get(i);
+      if (expectedValue == null) {
+        assertFalse("The byte buffer should be empty for null pages", value.hasRemaining());
+      } else {
+        assertEquals("The byte buffer should be 8 bytes long for int64", 8, value.remaining());
+        assertEquals("Invalid value for page " + i, expectedValue.intValue(), value.getLong(0));
+      }
+    }
+  }
+
+  private static void assertCorrectNullCounts(ColumnIndex columnIndex, long... expectedNullCounts) {
+    List<Long> nullCounts = columnIndex.getNullCounts();
+    assertEquals(expectedNullCounts.length, nullCounts.size());
+    for (int i = 0; i < expectedNullCounts.length; ++i) {
+      assertEquals("Invalid null count at page " + i, expectedNullCounts[i], nullCounts.get(i).longValue());
+    }
+  }
+
+  private static void assertCorrectNullPages(ColumnIndex columnIndex, boolean... expectedNullPages) {
+    List<Boolean> nullPages = columnIndex.getNullPages();
+    assertEquals(expectedNullPages.length, nullPages.size());
+    for (int i = 0; i < expectedNullPages.length; ++i) {
+      assertEquals("Invalid null pages at page " + i, expectedNullPages[i], nullPages.get(i).booleanValue());
+    }
+  }
+
+  private static Statistics<?> stats(PrimitiveType type, Object... values) {
+    Statistics<?> stats = Statistics.createStats(type);
+    for (Object value : values) {
+      if (value == null) {
+        stats.incrementNumNulls();
+        continue;
+      }
+      switch (type.getPrimitiveTypeName()) {
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+        case INT96:
+          stats.updateStats((Binary) value);
+          break;
+        case BOOLEAN:
+          stats.updateStats((boolean) value);
+          break;
+        case DOUBLE:
+          stats.updateStats((double) value);
+          break;
+        case FLOAT:
+          stats.updateStats((float) value);
+          break;
+        case INT32:
+          stats.updateStats((int) value);
+          break;
+        case INT64:
+          stats.updateStats((long) value);
+          break;
+        default:
+          fail("Unsupported value type for stats: " + value.getClass());
+      }
+    }
+    return stats;
+  }
+}
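
Condensed, the write-path usage these tests exercise looks as follows (values invented for illustration; stats(...) is the helper defined above):

    PrimitiveType type = Types.required(INT32).named("my_int32"); // hypothetical column
    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
    builder.add(stats(type, -4, 10));     // page 0: min -4, max 10
    builder.add(stats(type, null, null)); // page 1: nulls only
    builder.add(stats(type, 11, 42));     // page 2: min 11, max 42

    ColumnIndex columnIndex = builder.build();
    // Null-only pages are flagged in getNullPages() and carry empty
    // min/max buffers; the boundary order is derived from the remaining
    // pages (ASCENDING here, since -4..10 precedes 11..42).

The static ColumnIndexBuilder.build(...) overload covers the inverse, read-path direction, assembling a ColumnIndex from already-decoded null pages, null counts and min/max byte buffers.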
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestOffsetIndexBuilder.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestOffsetIndexBuilder.java
new file mode 100644
index 0000000..6207084
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestOffsetIndexBuilder.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
+import org.junit.Test;
+
+/**
+ * Tests for {@link OffsetIndexBuilder}.
+ */
+public class TestOffsetIndexBuilder {
+  @Test
+  public void testBuilderWithSizeAndRowCount() {
+    OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder();
+    assertNull(builder.build());
+    assertNull(builder.build(1234));
+
+    builder.add(1000, 10);
+    builder.add(2000, 19);
+    builder.add(3000, 27);
+    builder.add(1200, 9);
+    assertCorrectValues(builder.build(),
+        0, 1000, 0,
+        1000, 2000, 10,
+        3000, 3000, 29,
+        6000, 1200, 56);
+    assertCorrectValues(builder.build(10000),
+        10000, 1000, 0,
+        11000, 2000, 10,
+        13000, 3000, 29,
+        16000, 1200, 56);
+  }
+
+  @Test
+  public void testNoOpBuilderWithSizeAndRowCount() {
+    OffsetIndexBuilder builder = OffsetIndexBuilder.getNoOpBuilder();
+    builder.add(1, 2);
+    builder.add(3, 4);
+    builder.add(5, 6);
+    builder.add(7, 8);
+    assertNull(builder.build());
+    assertNull(builder.build(1000));
+  }
+
+  @Test
+  public void testBuilderWithOffsetSizeIndex() {
+    OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder();
+    assertNull(builder.build());
+    assertNull(builder.build(1234));
+
+    builder.add(1000, 10000, 0);
+    builder.add(22000, 12000, 100);
+    builder.add(48000, 22000, 211);
+    builder.add(90000, 30000, 361);
+    assertCorrectValues(builder.build(),
+        1000, 10000, 0,
+        22000, 12000, 100,
+        48000, 22000, 211,
+        90000, 30000, 361);
+    assertCorrectValues(builder.build(100000),
+        101000, 10000, 0,
+        122000, 12000, 100,
+        148000, 22000, 211,
+        190000, 30000, 361);
+  }
+
+  @Test
+  public void testNoOpBuilderWithOffsetSizeIndex() {
+    OffsetIndexBuilder builder = OffsetIndexBuilder.getNoOpBuilder();
+    builder.add(1, 2, 3);
+    builder.add(4, 5, 6);
+    builder.add(7, 8, 9);
+    builder.add(10, 11, 12);
+    assertNull(builder.build());
+    assertNull(builder.build(1000));
+  }
+
+  private void assertCorrectValues(OffsetIndex offsetIndex, long... offset_size_rowIndex_triplets) {
+    assertEquals(0, offset_size_rowIndex_triplets.length % 3);
+    int pageCount = offset_size_rowIndex_triplets.length / 3;
+    assertEquals("Invalid pageCount", pageCount, offsetIndex.getPageCount());
+    for (int i = 0; i < pageCount; ++i) {
+      assertEquals("Invalid offsetIndex at page " + i, offset_size_rowIndex_triplets[3 * i],
+          offsetIndex.getOffset(i));
+      assertEquals("Invalid compressedPageSize at page " + i, offset_size_rowIndex_triplets[3 * i + 1],
+          offsetIndex.getCompressedPageSize(i));
+      assertEquals("Invalid firstRowIndex at page " + i, offset_size_rowIndex_triplets[3 * i + 2],
+          offsetIndex.getFirstRowIndex(i));
+    }
+  }
+}
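
The read-path flavor exercised by testBuilderWithOffsetSizeIndex feeds absolute offsets straight in, so the no-argument build() keeps them untranslated:

    OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder();
    builder.add(1000, 10000, 0);    // offset, compressed size, first row index
    builder.add(22000, 12000, 100);
    OffsetIndex offsetIndex = builder.build(); // offsets kept as given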
diff --git a/parquet-common/pom.xml b/parquet-common/pom.xml
index e7b2446..1e8124d 100644
--- a/parquet-common/pom.xml
+++ b/parquet-common/pom.xml
@@ -61,6 +61,12 @@
       <version>${slf4j.version}</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.yetus</groupId>
+      <artifactId>audience-annotations</artifactId>
+      <version>0.7.0</version>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 555b856..6fce6f2 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -42,7 +42,9 @@ import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.format.CompressionCodec;
 import org.apache.parquet.format.PageEncodingStats;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.format.BoundaryOrder;
 import org.apache.parquet.format.ColumnChunk;
+import org.apache.parquet.format.ColumnIndex;
 import org.apache.parquet.format.ColumnMetaData;
 import org.apache.parquet.format.ColumnOrder;
 import org.apache.parquet.format.ConvertedType;
@@ -53,7 +55,9 @@ import org.apache.parquet.format.Encoding;
 import org.apache.parquet.format.FieldRepetitionType;
 import org.apache.parquet.format.FileMetaData;
 import org.apache.parquet.format.KeyValue;
+import org.apache.parquet.format.OffsetIndex;
 import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.PageLocation;
 import org.apache.parquet.format.PageType;
 import org.apache.parquet.format.RowGroup;
 import org.apache.parquet.format.SchemaElement;
@@ -65,6 +69,9 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.column.EncodingStats;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
+import org.apache.parquet.internal.hadoop.metadata.IndexReference;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.schema.ColumnOrder.ColumnOrderName;
 import org.apache.parquet.schema.GroupType;
@@ -247,6 +254,17 @@ public class ParquetMetadataConverter {
 //      columnChunk.meta_data.index_page_offset = ;
 //      columnChunk.meta_data.key_value_metadata = ; // nothing yet
 
+      IndexReference columnIndexRef = columnMetaData.getColumnIndexReference();
+      if (columnIndexRef != null) {
+        columnChunk.setColumn_index_offset(columnIndexRef.getOffset());
+        columnChunk.setColumn_index_length(columnIndexRef.getLength());
+      }
+      IndexReference offsetIndexRef = columnMetaData.getOffsetIndexReference();
+      if (offsetIndexRef != null) {
+        columnChunk.setOffset_index_offset(offsetIndexRef.getOffset());
+        columnChunk.setOffset_index_length(offsetIndexRef.getLength());
+      }
+
       parquetColumns.add(columnChunk);
     }
     RowGroup rowGroup = new RowGroup(parquetColumns, block.getTotalByteSize(), block.getRowCount());
@@ -920,6 +938,8 @@ public class ParquetMetadataConverter {
               metaData.num_values,
               metaData.total_compressed_size,
               metaData.total_uncompressed_size);
+          column.setColumnIndexReference(toColumnIndexReference(columnChunk));
+          column.setOffsetIndexReference(toOffsetIndexReference(columnChunk));
           // TODO
           // index_page_offset
           // key_value_metadata
@@ -941,6 +961,20 @@ public class ParquetMetadataConverter {
         blocks);
   }
 
+  private static IndexReference toColumnIndexReference(ColumnChunk columnChunk) {
+    if (columnChunk.isSetColumn_index_offset() && columnChunk.isSetColumn_index_length()) {
+      return new IndexReference(columnChunk.getColumn_index_offset(), columnChunk.getColumn_index_length());
+    }
+    return null;
+  }
+
+  private static IndexReference toOffsetIndexReference(ColumnChunk columnChunk) {
+    if (columnChunk.isSetOffset_index_offset() && columnChunk.isSetOffset_index_length()) {
+      return new IndexReference(columnChunk.getOffset_index_offset(), columnChunk.getOffset_index_length());
+    }
+    return null;
+  }
+
   private static ColumnPath getPath(ColumnMetaData metaData) {
     String[] path = metaData.path_in_schema.toArray(new String[metaData.path_in_schema.size()]);
     return ColumnPath.get(path);
@@ -1125,4 +1159,78 @@ public class ParquetMetadataConverter {
     writePageHeader(pageHeader, to);
   }
 
+  private static BoundaryOrder toParquetBoundaryOrder(
+      org.apache.parquet.internal.column.columnindex.BoundaryOrder boundaryOrder) {
+    switch (boundaryOrder) {
+      case ASCENDING:
+        return BoundaryOrder.ASCENDING;
+      case DESCENDING:
+        return BoundaryOrder.DESCENDING;
+      case UNORDERED:
+        return BoundaryOrder.UNORDERED;
+      default:
+        throw new IllegalArgumentException("Unsupported boundary order: " + boundaryOrder);
+    }
+  }
+
+  private static org.apache.parquet.internal.column.columnindex.BoundaryOrder fromParquetBoundaryOrder(
+      BoundaryOrder boundaryOrder) {
+    switch (boundaryOrder) {
+      case ASCENDING:
+        return org.apache.parquet.internal.column.columnindex.BoundaryOrder.ASCENDING;
+      case DESCENDING:
+        return org.apache.parquet.internal.column.columnindex.BoundaryOrder.DESCENDING;
+      case UNORDERED:
+        return org.apache.parquet.internal.column.columnindex.BoundaryOrder.UNORDERED;
+      default:
+        throw new IllegalArgumentException("Unsupported boundary order: " + boundaryOrder);
+    }
+  }
+
+  public static ColumnIndex toParquetColumnIndex(PrimitiveType type,
+      org.apache.parquet.internal.column.columnindex.ColumnIndex columnIndex) {
+    if (!isMinMaxStatsSupported(type) || columnIndex == null) {
+      return null;
+    }
+    ColumnIndex parquetColumnIndex = new ColumnIndex(
+        columnIndex.getNullPages(),
+        columnIndex.getMinValues(),
+        columnIndex.getMaxValues(),
+        toParquetBoundaryOrder(columnIndex.getBoundaryOrder()));
+    parquetColumnIndex.setNull_counts(columnIndex.getNullCounts());
+    return parquetColumnIndex;
+  }
+
+  public static org.apache.parquet.internal.column.columnindex.ColumnIndex fromParquetColumnIndex(PrimitiveType type,
+      ColumnIndex parquetColumnIndex) {
+    if (!isMinMaxStatsSupported(type)) {
+      return null;
+    }
+    return ColumnIndexBuilder.build(type,
+        fromParquetBoundaryOrder(parquetColumnIndex.getBoundary_order()),
+        parquetColumnIndex.getNull_pages(),
+        parquetColumnIndex.getNull_counts(),
+        parquetColumnIndex.getMin_values(),
+        parquetColumnIndex.getMax_values());
+  }
+
+  public static OffsetIndex toParquetOffsetIndex(org.apache.parquet.internal.column.columnindex.OffsetIndex offsetIndex) {
+    List<PageLocation> pageLocations = new ArrayList<>(offsetIndex.getPageCount());
+    for (int i = 0, n = offsetIndex.getPageCount(); i < n; ++i) {
+      pageLocations.add(new PageLocation(
+          offsetIndex.getOffset(i),
+          offsetIndex.getCompressedPageSize(i),
+          offsetIndex.getFirstRowIndex(i)));
+    }
+    return new OffsetIndex(pageLocations);
+  }
+
+  public static org.apache.parquet.internal.column.columnindex.OffsetIndex fromParquetOffsetIndex(
+      OffsetIndex parquetOffsetIndex) {
+    OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder();
+    for (PageLocation pageLocation : parquetOffsetIndex.getPage_locations()) {
+      builder.add(pageLocation.getOffset(), pageLocation.getCompressed_page_size(), pageLocation.getFirst_row_index());
+    }
+    return builder.build();
+  }
 }
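
For reference, a minimal sketch of the round-trip these converter methods enable; it mirrors the new testOffsetIndexConversion case further down, so the builder calls and the 100000 base offset are taken from that test:

    OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder();
    builder.add(1000, 10000, 0);      // offset, compressed page size, first row index
    builder.add(22000, 12000, 100);
    // to parquet-format and back; build(100000) shifts the page offsets by the given base
    org.apache.parquet.internal.column.columnindex.OffsetIndex offsetIndex =
        ParquetMetadataConverter.fromParquetOffsetIndex(
            ParquetMetadataConverter.toParquetOffsetIndex(builder.build(100000)));
    // offsetIndex.getOffset(0) == 101000; getCompressedPageSize(0) == 10000; getFirstRowIndex(0) == 0
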
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 82c288f..0646493 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -37,6 +37,8 @@ import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
 import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.bytes.ByteBufferAllocator;
@@ -67,6 +69,8 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
     private Set<Encoding> dlEncodings = new HashSet<Encoding>();
     private List<Encoding> dataEncodings = new ArrayList<Encoding>();
 
+    private ColumnIndexBuilder columnIndexBuilder;
+    private OffsetIndexBuilder offsetIndexBuilder;
     private Statistics totalStatistics;
     private final ByteBufferAllocator allocator;
 
@@ -77,11 +81,25 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
       this.compressor = compressor;
       this.allocator = allocator;
       this.buf = new ConcatenatingByteArrayCollector();
+      this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType());
+      this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
+    }
+
+    @Override
+    @Deprecated
+    public void writePage(BytesInput bytesInput, int valueCount, Statistics<?> statistics, Encoding rlEncoding,
+        Encoding dlEncoding, Encoding valuesEncoding) throws IOException {
+      // Set the builders to the no-op ones so no column/offset indexes are written for this column chunk
+      columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+      offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder();
+
+      writePage(bytesInput, valueCount, -1, statistics, rlEncoding, dlEncoding, valuesEncoding);
     }
 
     @Override
     public void writePage(BytesInput bytes,
                           int valueCount,
+                          int rowCount,
                           Statistics statistics,
                           Encoding rlEncoding,
                           Encoding dlEncoding,
@@ -121,6 +139,9 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
         totalStatistics.mergeStatistics(statistics);
       }
 
+      columnIndexBuilder.add(statistics);
+      offsetIndexBuilder.add(toIntWithCheck(tempOutputStream.size() + compressedSize), rowCount);
+
       // by concatenating before collecting instead of collecting twice,
       // we only allocate one buffer to copy into instead of multiple.
       buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes));
@@ -166,6 +187,9 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
         totalStatistics.mergeStatistics(statistics);
       }
 
+      columnIndexBuilder.add(statistics);
+      offsetIndexBuilder.add(toIntWithCheck((long) tempOutputStream.size() + compressedSize), rowCount);
+
       // by concatenating before collecting instead of collecting twice,
       // we only allocate one buffer to copy into instead of multiple.
       buf.collect(
@@ -193,14 +217,20 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
     }
 
     public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
-      writer.startColumn(path, totalValueCount, compressor.getCodecName());
-      if (dictionaryPage != null) {
-        writer.writeDictionaryPage(dictionaryPage);
-        // tracking the dictionary encoding is handled in writeDictionaryPage
-      }
-      writer.writeDataPages(buf, uncompressedLength, compressedLength, totalStatistics,
-          rlEncodings, dlEncodings, dataEncodings);
-      writer.endColumn();
+      writer.writeColumnChunk(
+          path,
+          totalValueCount,
+          compressor.getCodecName(),
+          dictionaryPage,
+          buf,
+          uncompressedLength,
+          compressedLength,
+          totalStatistics,
+          columnIndexBuilder,
+          offsetIndexBuilder,
+          rlEncodings,
+          dlEncodings,
+          dataEncodings);
       if (LOG.isDebugEnabled()) {
         LOG.debug(
             String.format(
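
A note on the deprecated writePage overload above: it swaps in no-op builders. Their assumed contract (inferred from this diff; a null index is simply skipped by the serialization code in ParquetFileWriter below) is sketched here, with statistics standing in for a real Statistics object:

    ColumnIndexBuilder columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
    columnIndexBuilder.add(statistics);                   // ignored by the no-op builder
    ColumnIndex columnIndex = columnIndexBuilder.build(); // null, so no column index gets serialized
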
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 95aaec4..eda4745 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -85,10 +85,14 @@ import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
 import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.hadoop.metadata.IndexReference;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
+import org.apache.yetus.audience.InterfaceAudience.Private;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -903,6 +907,40 @@ public class ParquetFileReader implements Closeable {
         converter.getEncoding(dictHeader.getEncoding()));
   }
 
+  /**
+   * @param column
+   *          the column chunk for which the column index is to be returned
+   * @return the column index for the specified column chunk, or {@code null} if there is no index
+   * @throws IOException
+   *           if any I/O error occurs while reading the file
+   */
+  @Private
+  public ColumnIndex readColumnIndex(ColumnChunkMetaData column) throws IOException {
+    IndexReference ref = column.getColumnIndexReference();
+    if (ref == null) {
+      return null;
+    }
+    f.seek(ref.getOffset());
+    return ParquetMetadataConverter.fromParquetColumnIndex(column.getPrimitiveType(), Util.readColumnIndex(f));
+  }
+
+  /**
+   * @param column
+   *          the column chunk for which the offset index is to be returned
+   * @return the offset index for the specified column chunk, or {@code null} if there is no index
+   * @throws IOException
+   *           if any I/O error occurs while reading the file
+   */
+  @Private
+  public OffsetIndex readOffsetIndex(ColumnChunkMetaData column) throws IOException {
+    IndexReference ref = column.getOffsetIndexReference();
+    if (ref == null) {
+      return null;
+    }
+    f.seek(ref.getOffset());
+    return ParquetMetadataConverter.fromParquetOffsetIndex(Util.readOffsetIndex(f));
+  }
+
   @Override
   public void close() throws IOException {
     try {
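
A usage sketch for the two new reader methods (the footer and metadata accessors follow the tests further down; reader is an open ParquetFileReader):

    ParquetMetadata footer = reader.getFooter();
    for (BlockMetaData block : footer.getBlocks()) {
      for (ColumnChunkMetaData column : block.getColumns()) {
        ColumnIndex columnIndex = reader.readColumnIndex(column);   // null if none was written
        OffsetIndex offsetIndex = reader.readOffsetIndex(column);   // null if none was written
      }
    }
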
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index c98c247..bd0f683 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -50,6 +50,7 @@ import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.format.Util;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -59,6 +60,11 @@ import org.apache.parquet.hadoop.metadata.GlobalMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
+import org.apache.parquet.internal.hadoop.metadata.IndexReference;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.OutputFile;
 import org.apache.parquet.io.SeekableInputStream;
@@ -97,9 +103,17 @@ public class ParquetFileWriter {
   // file data
   private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
 
+  // The column/offset indexes per block per column chunk
+  private final List<List<ColumnIndex>> columnIndexes = new ArrayList<>();
+  private final List<List<OffsetIndex>> offsetIndexes = new ArrayList<>();
+
   // row group data
   private BlockMetaData currentBlock; // appended to by endColumn
 
+  // The column/offset indexes for the current block
+  private List<ColumnIndex> currentColumnIndexes;
+  private List<OffsetIndex> currentOffsetIndexes;
+
   // row group data set at the start of a row group
   private long currentRecordCount; // set in startBlock
 
@@ -109,6 +123,9 @@ public class ParquetFileWriter {
   private long uncompressedLength;
   private long compressedLength;
   private Statistics currentStatistics; // accumulated in writePage(s)
+  private ColumnIndexBuilder columnIndexBuilder;
+  private OffsetIndexBuilder offsetIndexBuilder;
+  private long firstPageOffset;
 
   // column chunk data set at the start of a column
   private CompressionCodecName currentChunkCodec; // set in startColumn
@@ -296,6 +313,9 @@ public class ParquetFileWriter {
 
     currentBlock = new BlockMetaData();
     currentRecordCount = recordCount;
+
+    currentColumnIndexes = new ArrayList<>();
+    currentOffsetIndexes = new ArrayList<>();
   }
 
   /**
@@ -320,6 +340,10 @@ public class ParquetFileWriter {
     uncompressedLength = 0;
     // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one
     currentStatistics = null;
+
+    columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType);
+    offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
+    firstPageOffset = -1;
   }
 
   /**
@@ -367,6 +391,9 @@ public class ParquetFileWriter {
       Encoding dlEncoding,
       Encoding valuesEncoding) throws IOException {
     state = state.write();
+    // We are unable to build indexes without rowCount, so skip them for this column chunk
+    offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder();
+    columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
     long beforeHeader = out.getPos();
     LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
     int compressedPageSize = (int)bytes.size();
@@ -398,18 +425,63 @@ public class ParquetFileWriter {
    * @param dlEncoding encoding of the definition level
    * @param valuesEncoding encoding of values
    * @throws IOException if there is an error while writing
+   * @deprecated this method does not support writing column indexes; use
+   *             {@link #writeDataPage(int, int, BytesInput, Statistics, long, Encoding, Encoding, Encoding)} instead
+   */
+  @Deprecated
+  public void writeDataPage(
+      int valueCount, int uncompressedPageSize,
+      BytesInput bytes,
+      Statistics statistics,
+      Encoding rlEncoding,
+      Encoding dlEncoding,
+      Encoding valuesEncoding) throws IOException {
+    // We are unable to build indexes without rowCount, so skip them for this column chunk
+    offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder();
+    columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+    innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding);
+  }
+
+  /**
+   * Writes a single page
+   * @param valueCount count of values
+   * @param uncompressedPageSize the size of the data once uncompressed
+   * @param bytes the compressed data for the page without header
+   * @param statistics the statistics of the page
+   * @param rowCount the number of rows in the page
+   * @param rlEncoding encoding of the repetition level
+   * @param dlEncoding encoding of the definition level
+   * @param valuesEncoding encoding of values
+   * @throws IOException if any I/O error occurs while writing the file
    */
   public void writeDataPage(
       int valueCount, int uncompressedPageSize,
       BytesInput bytes,
       Statistics statistics,
+      long rowCount,
+      Encoding rlEncoding,
+      Encoding dlEncoding,
+      Encoding valuesEncoding) throws IOException {
+    long beforeHeader = out.getPos();
+    innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding);
+
+    offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount);
+  }
+
+  private void innerWriteDataPage(
+      int valueCount, int uncompressedPageSize,
+      BytesInput bytes,
+      Statistics statistics,
       Encoding rlEncoding,
       Encoding dlEncoding,
       Encoding valuesEncoding) throws IOException {
     state = state.write();
     long beforeHeader = out.getPos();
+    if (firstPageOffset == -1) {
+      firstPageOffset = beforeHeader;
+    }
     LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
-    int compressedPageSize = (int)bytes.size();
+    int compressedPageSize = (int) bytes.size();
     metadataConverter.writeDataPageHeader(
         uncompressedPageSize, compressedPageSize,
         valueCount,
@@ -431,6 +503,8 @@ public class ParquetFileWriter {
       currentStatistics.mergeStatistics(statistics);
     }
 
+    columnIndexBuilder.add(statistics);
+
     encodingStatsBuilder.addDataEncoding(valuesEncoding);
     currentEncodings.add(rlEncoding);
     currentEncodings.add(dlEncoding);
@@ -438,25 +512,47 @@ public class ParquetFileWriter {
   }
 
   /**
-   * writes a number of pages at once
-   * @param bytes bytes to be written including page headers
+   * Writes a complete column chunk at once
+   * @param descriptor the descriptor of the column
+   * @param valueCount the value count in this column
+   * @param compressionCodecName the name of the compression codec used for compressing the pages
+   * @param dictionaryPage the dictionary page for this column chunk (might be null)
+   * @param bytes the encoded pages including page headers to be written as is
    * @param uncompressedTotalPageSize total uncompressed size (without page headers)
    * @param compressedTotalPageSize total compressed size (without page headers)
+   * @param totalStats accumulated statistics for the column chunk
+   * @param columnIndexBuilder the builder object for the column index
+   * @param offsetIndexBuilder the builder object for the offset index
+   * @param rlEncodings the RL encodings used in this column chunk
+   * @param dlEncodings the DL encodings used in this column chunk
+   * @param dataEncodings the data encodings used in this column chunk
    * @throws IOException if there is an error while writing
    */
-  void writeDataPages(BytesInput bytes,
-                      long uncompressedTotalPageSize,
-                      long compressedTotalPageSize,
-                      Statistics totalStats,
-                      Set<Encoding> rlEncodings,
-                      Set<Encoding> dlEncodings,
-                      List<Encoding> dataEncodings) throws IOException {
+  void writeColumnChunk(ColumnDescriptor descriptor,
+      long valueCount,
+      CompressionCodecName compressionCodecName,
+      DictionaryPage dictionaryPage,
+      BytesInput bytes,
+      long uncompressedTotalPageSize,
+      long compressedTotalPageSize,
+      Statistics<?> totalStats,
+      ColumnIndexBuilder columnIndexBuilder,
+      OffsetIndexBuilder offsetIndexBuilder,
+      Set<Encoding> rlEncodings,
+      Set<Encoding> dlEncodings,
+      List<Encoding> dataEncodings) throws IOException {
+    startColumn(descriptor, valueCount, compressionCodecName);
+
     state = state.write();
+    if (dictionaryPage != null) {
+      writeDictionaryPage(dictionaryPage);
+    }
     LOG.debug("{}: write data pages", out.getPos());
     long headersSize = bytes.size() - compressedTotalPageSize;
     this.uncompressedLength += uncompressedTotalPageSize + headersSize;
     this.compressedLength += compressedTotalPageSize + headersSize;
     LOG.debug("{}: write data pages content", out.getPos());
+    firstPageOffset = out.getPos();
     bytes.writeAllTo(out);
     encodingStatsBuilder.addDataEncodings(dataEncodings);
     if (rlEncodings.isEmpty()) {
@@ -466,6 +562,11 @@ public class ParquetFileWriter {
     currentEncodings.addAll(dlEncodings);
     currentEncodings.addAll(dataEncodings);
     currentStatistics = totalStats;
+
+    this.columnIndexBuilder = columnIndexBuilder;
+    this.offsetIndexBuilder = offsetIndexBuilder;
+
+    endColumn();
   }
 
   /**
@@ -475,6 +576,8 @@ public class ParquetFileWriter {
   public void endColumn() throws IOException {
     state = state.endColumn();
     LOG.debug("{}: end column", out.getPos());
+    currentColumnIndexes.add(columnIndexBuilder.build());
+    currentOffsetIndexes.add(offsetIndexBuilder.build(firstPageOffset));
     currentBlock.addColumn(ColumnChunkMetaData.get(
         currentChunkPath,
         currentChunkType,
@@ -490,6 +593,8 @@ public class ParquetFileWriter {
     this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
     this.uncompressedLength = 0;
     this.compressedLength = 0;
+    columnIndexBuilder = null;
+    offsetIndexBuilder = null;
   }
 
   /**
@@ -501,6 +606,10 @@ public class ParquetFileWriter {
     LOG.debug("{}: end block", out.getPos());
     currentBlock.setRowCount(currentRecordCount);
     blocks.add(currentBlock);
+    columnIndexes.add(currentColumnIndexes);
+    offsetIndexes.add(currentOffsetIndexes);
+    currentColumnIndexes = null;
+    currentOffsetIndexes = null;
     currentBlock = null;
   }
 
@@ -613,6 +722,11 @@ public class ParquetFileWriter {
         length = 0;
       }
 
+      // TODO: column/offset indexes are not copied
+      // (it would require seeking to the end of the file for each row group)
+      currentColumnIndexes.add(null);
+      currentOffsetIndexes.add(null);
+
       currentBlock.addColumn(ColumnChunkMetaData.get(
           chunk.getPath(),
           chunk.getPrimitiveType(),
@@ -679,12 +793,57 @@ public class ParquetFileWriter {
    */
   public void end(Map<String, String> extraMetaData) throws IOException {
     state = state.end();
+    serializeColumnIndexes(columnIndexes, blocks, out);
+    serializeOffsetIndexes(offsetIndexes, blocks, out);
     LOG.debug("{}: end", out.getPos());
     this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
     serializeFooter(footer, out);
     out.close();
   }
 
+  private static void serializeColumnIndexes(
+      List<List<ColumnIndex>> columnIndexes,
+      List<BlockMetaData> blocks,
+      PositionOutputStream out) throws IOException {
+    LOG.debug("{}: column indexes", out.getPos());
+    for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
+      List<ColumnChunkMetaData> columns = blocks.get(bIndex).getColumns();
+      List<ColumnIndex> blockColumnIndexes = columnIndexes.get(bIndex);
+      for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
+        ColumnChunkMetaData column = columns.get(cIndex);
+        org.apache.parquet.format.ColumnIndex columnIndex = ParquetMetadataConverter
+            .toParquetColumnIndex(column.getPrimitiveType(), blockColumnIndexes.get(cIndex));
+        if (columnIndex == null) {
+          continue;
+        }
+        long offset = out.getPos();
+        Util.writeColumnIndex(columnIndex, out);
+        column.setColumnIndexReference(new IndexReference(offset, (int) (out.getPos() - offset)));
+      }
+    }
+  }
+
+  private static void serializeOffsetIndexes(
+      List<List<OffsetIndex>> offsetIndexes,
+      List<BlockMetaData> blocks,
+      PositionOutputStream out) throws IOException {
+    LOG.debug("{}: offset indexes", out.getPos());
+    for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
+      List<ColumnChunkMetaData> columns = blocks.get(bIndex).getColumns();
+      List<OffsetIndex> blockOffsetIndexes = offsetIndexes.get(bIndex);
+      for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
+        OffsetIndex offsetIndex = blockOffsetIndexes.get(cIndex);
+        if (offsetIndex == null) {
+          continue;
+        }
+        ColumnChunkMetaData column = columns.get(cIndex);
+        long offset = out.getPos();
+        Util.writeOffsetIndex(ParquetMetadataConverter.toParquetOffsetIndex(offsetIndex), out);
+        column.setOffsetIndexReference(new IndexReference(offset, (int) (out.getPos() - offset)));
+      }
+    }
+  }
+
   private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out) throws IOException {
     long footerIndex = out.getPos();
     org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
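
A condensed sketch of the rowCount-aware write path added above (the calls and constants follow testColumnIndexWriteRead further down; w is a started ParquetFileWriter, and the per-page statistics stats1/stats2 are placeholders):

    w.startBlock(4);                      // 4 rows in the row group
    w.startColumn(C1, 5, CODEC);          // 5 values in this column chunk
    w.writeDataPage(2, 4, BytesInput.from(BYTES1), stats1, 1, BIT_PACKED, BIT_PACKED, PLAIN); // 1 row
    w.writeDataPage(3, 4, BytesInput.from(BYTES1), stats2, 3, BIT_PACKED, BIT_PACKED, PLAIN); // 3 rows
    w.endColumn();                        // builds the column/offset indexes for the chunk
    w.endBlock();
    w.end(extraMetaData);                 // serializes the indexes right before the footer
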
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
index fb94247..5f474fd 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -24,9 +24,11 @@ import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.EncodingStats;
 import org.apache.parquet.column.statistics.BooleanStatistics;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.internal.hadoop.metadata.IndexReference;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Types;
+import org.apache.yetus.audience.InterfaceAudience.Private;
 
 /**
  * Column meta data for a block stored in the file footer and passed in the InputSplit
@@ -168,6 +170,9 @@ abstract public class ColumnChunkMetaData {
   // we save 3 references by storing together the column properties that have few distinct values
   private final ColumnChunkProperties properties;
 
+  private IndexReference columnIndexReference;
+  private IndexReference offsetIndexReference;
+
   protected ColumnChunkMetaData(ColumnChunkProperties columnChunkProperties) {
     this(null, columnChunkProperties);
   }
@@ -238,6 +243,40 @@ abstract public class ColumnChunkMetaData {
   abstract public Statistics getStatistics();
 
   /**
+   * @return the reference to the column index
+   */
+  @Private
+  public IndexReference getColumnIndexReference() {
+    return columnIndexReference;
+  }
+
+  /**
+   * @param indexReference
+   *          the reference to the column index
+   */
+  @Private
+  public void setColumnIndexReference(IndexReference indexReference) {
+    this.columnIndexReference = indexReference;
+  }
+
+  /**
+   * @return the reference to the offset index
+   */
+  @Private
+  public IndexReference getOffsetIndexReference() {
+    return offsetIndexReference;
+  }
+
+  /**
+   * @param offsetIndexReference
+   *          the reference to the offset index
+   */
+  @Private
+  public void setOffsetIndexReference(IndexReference offsetIndexReference) {
+    this.offsetIndexReference = offsetIndexReference;
+  }
+
+  /**
    * @return all the encodings used in this column
    */
   public Set<Encoding> getEncodings() {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/internal/hadoop/metadata/IndexReference.java b/parquet-hadoop/src/main/java/org/apache/parquet/internal/hadoop/metadata/IndexReference.java
new file mode 100644
index 0000000..5e02f1e
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/internal/hadoop/metadata/IndexReference.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.hadoop.metadata;
+
+/**
+ * Reference to an index (ColumnIndex or OffsetIndex) of a column chunk, containing the offset and length values so
+ * the reader can read the referenced data.
+ */
+public class IndexReference {
+  private final long offset;
+  private final int length;
+
+  public IndexReference(long offset, int length) {
+    this.offset = offset;
+    this.length = length;
+  }
+
+  public long getOffset() {
+    return offset;
+  }
+
+  public int getLength() {
+    return length;
+  }
+}
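
How these references are produced and consumed is visible elsewhere in this diff; in short (writer side from serializeColumnIndexes, reader side from readColumnIndex):

    // writer side: remember where the index was written and how long it is
    long offset = out.getPos();
    Util.writeColumnIndex(columnIndex, out);
    column.setColumnIndexReference(new IndexReference(offset, (int) (out.getPos() - offset)));

    // reader side: seek to the recorded offset before decoding the index
    f.seek(ref.getOffset());
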
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
index 6cce32f..b9382fc 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
@@ -23,6 +23,7 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.filte
 import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -65,6 +66,11 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.BoundaryOrder;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveType;
 import org.junit.Assert;
@@ -892,4 +898,60 @@ public class TestParquetMetadataConverter {
     assertEquals(ColumnOrder.undefined(), columns.get(1).getPrimitiveType().columnOrder());
     assertEquals(ColumnOrder.undefined(), columns.get(2).getPrimitiveType().columnOrder());
   }
+
+  @Test
+  public void testOffsetIndexConversion() {
+    OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder();
+    builder.add(1000, 10000, 0);
+    builder.add(22000, 12000, 100);
+    OffsetIndex offsetIndex = ParquetMetadataConverter
+        .fromParquetOffsetIndex(ParquetMetadataConverter.toParquetOffsetIndex(builder.build(100000)));
+    assertEquals(2, offsetIndex.getPageCount());
+    assertEquals(101000, offsetIndex.getOffset(0));
+    assertEquals(10000, offsetIndex.getCompressedPageSize(0));
+    assertEquals(0, offsetIndex.getFirstRowIndex(0));
+    assertEquals(122000, offsetIndex.getOffset(1));
+    assertEquals(12000, offsetIndex.getCompressedPageSize(1));
+    assertEquals(100, offsetIndex.getFirstRowIndex(1));
+  }
+
+  @Test
+  public void testColumnIndexConversion() {
+    PrimitiveType type = Types.required(PrimitiveTypeName.INT64).named("test_int64");
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    Statistics<?> stats = Statistics.createStats(type);
+    stats.incrementNumNulls(16);
+    stats.updateStats(-100L);
+    stats.updateStats(100L);
+    builder.add(stats);
+    stats = Statistics.createStats(type);
+    stats.incrementNumNulls(111);
+    builder.add(stats);
+    stats = Statistics.createStats(type);
+    stats.updateStats(200L);
+    stats.updateStats(500L);
+    builder.add(stats);
+    org.apache.parquet.format.ColumnIndex parquetColumnIndex =
+        ParquetMetadataConverter.toParquetColumnIndex(type, builder.build());
+    ColumnIndex columnIndex = ParquetMetadataConverter.fromParquetColumnIndex(type, parquetColumnIndex);
+    assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder());
+    assertTrue(Arrays.asList(false, true, false).equals(columnIndex.getNullPages()));
+    assertTrue(Arrays.asList(16L, 111L, 0L).equals(columnIndex.getNullCounts()));
+    assertTrue(Arrays.asList(
+        ByteBuffer.wrap(BytesUtils.longToBytes(-100L)),
+        ByteBuffer.allocate(0),
+        ByteBuffer.wrap(BytesUtils.longToBytes(200L))).equals(columnIndex.getMinValues()));
+    assertTrue(Arrays.asList(
+        ByteBuffer.wrap(BytesUtils.longToBytes(100L)),
+        ByteBuffer.allocate(0),
+        ByteBuffer.wrap(BytesUtils.longToBytes(500L))).equals(columnIndex.getMaxValues()));
+
+    assertNull("Should handle null column index", ParquetMetadataConverter
+        .toParquetColumnIndex(Types.required(PrimitiveTypeName.INT32).named("test_int32"), null));
+    assertNull("Should ignore unsupported types", ParquetMetadataConverter
+        .toParquetColumnIndex(Types.required(PrimitiveTypeName.INT96).named("test_int96"), columnIndex));
+    assertNull("Should ignore unsupported types",
+        ParquetMetadataConverter.fromParquetColumnIndex(Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+            .length(12).as(OriginalType.INTERVAL).named("test_interval"), parquetColumnIndex));
+  }
 }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
index a5381f0..b787268 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -18,8 +18,13 @@
  */
 package org.apache.parquet.hadoop;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.inOrder;
 import static org.apache.parquet.column.Encoding.PLAIN;
 import static org.apache.parquet.column.Encoding.RLE;
@@ -51,13 +56,23 @@ import org.apache.parquet.bytes.LittleEndianDataInputStream;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.column.statistics.BinaryStatistics;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
@@ -66,6 +81,40 @@ import org.apache.parquet.bytes.HeapByteBufferAllocator;
 
 public class TestColumnChunkPageWriteStore {
 
+  // OutputFile implementation to expose the PositionOutputStream internally used by the writer
+  private static class OutputFileForTesting implements OutputFile {
+    private PositionOutputStream out;
+    private final HadoopOutputFile file;
+
+    OutputFileForTesting(Path path, Configuration conf) throws IOException {
+      file = HadoopOutputFile.fromPath(path, conf);
+    }
+
+    PositionOutputStream out() {
+      return out;
+    }
+
+    @Override
+    public PositionOutputStream create(long blockSizeHint) throws IOException {
+      return out = file.create(blockSizeHint);
+    }
+
+    @Override
+    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
+      return out = file.createOrOverwrite(blockSizeHint);
+    }
+
+    @Override
+    public boolean supportsBlockSize() {
+      return file.supportsBlockSize();
+    }
+
+    @Override
+    public long defaultBlockSize() {
+      return file.defaultBlockSize();
+    }
+  }
+
   private int pageSize = 1024;
   private int initialSize = 1024;
   private Configuration conf;
@@ -98,11 +147,18 @@ public class TestColumnChunkPageWriteStore {
     BytesInput data = BytesInput.fromInt(v);
     int rowCount = 5;
     int nullCount = 1;
+    statistics.incrementNumNulls(nullCount);
+    statistics.setMinMaxFromBytes(new byte[] {0, 1, 2}, new byte[] {0, 1, 2, 3});
+    long pageOffset;
+    long pageSize;
 
     {
-      ParquetFileWriter writer = new ParquetFileWriter(conf, schema, file);
+      OutputFileForTesting outputFile = new OutputFileForTesting(file, conf);
+      ParquetFileWriter writer = new ParquetFileWriter(outputFile, schema, Mode.CREATE,
+          ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.MAX_PADDING_SIZE_DEFAULT);
       writer.start();
       writer.startBlock(rowCount);
+      pageOffset = outputFile.out().getPos();
       {
         ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema , new HeapByteBufferAllocator());
         PageWriter pageWriter = store.getPageWriter(col);
@@ -112,6 +168,7 @@ public class TestColumnChunkPageWriteStore {
             dataEncoding, data,
             statistics);
         store.flushToFileWriter(writer);
+        pageSize = outputFile.out().getPos() - pageOffset;
       }
       writer.endBlock();
       writer.end(new HashMap<String, String>());
@@ -132,6 +189,20 @@ public class TestColumnChunkPageWriteStore {
       assertEquals(dataEncoding, page.getDataEncoding());
       assertEquals(v, intValue(page.getData()));
       assertEquals(statistics.toString(), page.getStatistics().toString());
+
+      // Checking column/offset indexes for the one page
+      ColumnChunkMetaData column = footer.getBlocks().get(0).getColumns().get(0);
+      ColumnIndex columnIndex = reader.readColumnIndex(column);
+      assertArrayEquals(statistics.getMinBytes(), columnIndex.getMinValues().get(0).array());
+      assertArrayEquals(statistics.getMaxBytes(), columnIndex.getMaxValues().get(0).array());
+      assertEquals(statistics.getNumNulls(), columnIndex.getNullCounts().get(0).longValue());
+      assertFalse(columnIndex.getNullPages().get(0));
+      OffsetIndex offsetIndex = reader.readOffsetIndex(column);
+      assertEquals(1, offsetIndex.getPageCount());
+      assertEquals(pageSize, offsetIndex.getCompressedPageSize(0));
+      assertEquals(0, offsetIndex.getFirstRowIndex(0));
+      assertEquals(pageOffset, offsetIndex.getOffset(0));
+
       reader.close();
     }
   }
@@ -175,8 +246,20 @@ public class TestColumnChunkPageWriteStore {
     store.flushToFileWriter(mockFileWriter);
 
     for (ColumnDescriptor col : schema.getColumns()) {
-      inOrder.verify(mockFileWriter).startColumn(
-          eq(col), eq((long) fakeCount), eq(UNCOMPRESSED));
+      inOrder.verify(mockFileWriter).writeColumnChunk(
+          eq(col),
+          eq((long) fakeCount),
+          eq(UNCOMPRESSED),
+          isNull(DictionaryPage.class),
+          any(),
+          eq(fakeData.size()),
+          eq(fakeData.size()),
+          eq(fakeStats),
+          same(ColumnIndexBuilder.getNoOpBuilder()), // Deprecated writePage -> no column index
+          same(OffsetIndexBuilder.getNoOpBuilder()), // Deprecated writePage -> no offset index
+          any(),
+          any(),
+          any());
     }
   }
 
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 095b575..a8de38c 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Version;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
@@ -41,7 +42,11 @@ import org.apache.parquet.column.statistics.BinaryStatistics;
 import org.apache.parquet.column.statistics.LongStatistics;
 import org.apache.parquet.format.Statistics;
 import org.apache.parquet.hadoop.metadata.*;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.apache.parquet.internal.column.columnindex.BoundaryOrder;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
@@ -51,6 +56,8 @@ import org.apache.parquet.schema.Types;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.*;
 
 import static org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics;
@@ -766,4 +773,135 @@ public class TestParquetFileWriter {
     ParquetFileWriter.writeMetadataFile(conf, relativeRoot, footers, JobSummaryLevel.ALL);
   }
 
+  @Test
+  public void testColumnIndexWriteRead() throws Exception {
+    File testFile = temp.newFile();
+    testFile.delete();
+
+    Path path = new Path(testFile.toURI());
+    Configuration configuration = new Configuration();
+
+    ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path);
+    w.start();
+    w.startBlock(4);
+    w.startColumn(C1, 7, CODEC);
+    w.writeDataPage(7, 4, BytesInput.from(BYTES3), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.startColumn(C2, 8, CODEC);
+    w.writeDataPage(8, 4, BytesInput.from(BYTES4), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.startBlock(4);
+    w.startColumn(C1, 5, CODEC);
+    long c1p1Starts = w.getPos();
+    w.writeDataPage(2, 4, BytesInput.from(BYTES1), statsC1(null, Binary.fromString("aaa")), 1, BIT_PACKED, BIT_PACKED,
+        PLAIN);
+    long c1p2Starts = w.getPos();
+    w.writeDataPage(3, 4, BytesInput.from(BYTES1), statsC1(Binary.fromString("bbb"), Binary.fromString("ccc")), 3,
+        BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    long c1Ends = w.getPos();
+    w.startColumn(C2, 6, CODEC);
+    long c2p1Starts = w.getPos();
+    w.writeDataPage(2, 4, BytesInput.from(BYTES2), statsC2(117L, 100L), 1, BIT_PACKED, BIT_PACKED, PLAIN);
+    long c2p2Starts = w.getPos();
+    w.writeDataPage(3, 4, BytesInput.from(BYTES2), statsC2(null, null, null), 2, BIT_PACKED, BIT_PACKED, PLAIN);
+    long c2p3Starts = w.getPos();
+    w.writeDataPage(1, 4, BytesInput.from(BYTES2), statsC2(0L), 1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    long c2Ends = w.getPos();
+    w.endBlock();
+    w.startBlock(4);
+    w.startColumn(C1, 7, CODEC);
+    w.writeDataPage(7, 4, BytesInput.from(BYTES3), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.startColumn(C2, 8, CODEC);
+    w.writeDataPage(8, 4, BytesInput.from(BYTES4), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.end(new HashMap<String, String>());
+
+    try (ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(path, configuration),
+        ParquetReadOptions.builder().build())) {
+      ParquetMetadata footer = reader.getFooter();
+      assertEquals(3, footer.getBlocks().size());
+      BlockMetaData blockMeta = footer.getBlocks().get(1);
+      assertEquals(2, blockMeta.getColumns().size());
+
+      ColumnIndex columnIndex = reader.readColumnIndex(blockMeta.getColumns().get(0));
+      assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder());
+      assertTrue(Arrays.asList(1L, 0L).equals(columnIndex.getNullCounts()));
+      assertTrue(Arrays.asList(false, false).equals(columnIndex.getNullPages()));
+      List<ByteBuffer> minValues = columnIndex.getMinValues();
+      assertEquals(2, minValues.size());
+      List<ByteBuffer> maxValues = columnIndex.getMaxValues();
+      assertEquals(2, maxValues.size());
+      assertEquals("aaa", new String(minValues.get(0).array(), StandardCharsets.UTF_8));
+      assertEquals("aaa", new String(maxValues.get(0).array(), StandardCharsets.UTF_8));
+      assertEquals("bbb", new String(minValues.get(1).array(), StandardCharsets.UTF_8));
+      assertEquals("ccc", new String(maxValues.get(1).array(), StandardCharsets.UTF_8));
+
+      columnIndex = reader.readColumnIndex(blockMeta.getColumns().get(1));
+      assertEquals(BoundaryOrder.DESCENDING, columnIndex.getBoundaryOrder());
+      assertTrue(Arrays.asList(0L, 3L, 0L).equals(columnIndex.getNullCounts()));
+      assertTrue(Arrays.asList(false, true, false).equals(columnIndex.getNullPages()));
+      minValues = columnIndex.getMinValues();
+      assertEquals(3, minValues.size());
+      maxValues = columnIndex.getMaxValues();
+      assertEquals(3, maxValues.size());
+      assertEquals(100, BytesUtils.bytesToLong(minValues.get(0).array()));
+      assertEquals(117, BytesUtils.bytesToLong(maxValues.get(0).array()));
+      assertEquals(0, minValues.get(1).array().length);
+      assertEquals(0, maxValues.get(1).array().length);
+      assertEquals(0, BytesUtils.bytesToLong(minValues.get(2).array()));
+      assertEquals(0, BytesUtils.bytesToLong(maxValues.get(2).array()));
+
+      OffsetIndex offsetIndex = reader.readOffsetIndex(blockMeta.getColumns().get(0));
+      assertEquals(2, offsetIndex.getPageCount());
+      assertEquals(c1p1Starts, offsetIndex.getOffset(0));
+      assertEquals(c1p2Starts, offsetIndex.getOffset(1));
+      assertEquals(c1p2Starts - c1p1Starts, offsetIndex.getCompressedPageSize(0));
+      assertEquals(c1Ends - c1p2Starts, offsetIndex.getCompressedPageSize(1));
+      assertEquals(0, offsetIndex.getFirstRowIndex(0));
+      assertEquals(1, offsetIndex.getFirstRowIndex(1));
+
+      offsetIndex = reader.readOffsetIndex(blockMeta.getColumns().get(1));
+      assertEquals(3, offsetIndex.getPageCount());
+      assertEquals(c2p1Starts, offsetIndex.getOffset(0));
+      assertEquals(c2p2Starts, offsetIndex.getOffset(1));
+      assertEquals(c2p3Starts, offsetIndex.getOffset(2));
+      assertEquals(c2p2Starts - c2p1Starts, offsetIndex.getCompressedPageSize(0));
+      assertEquals(c2p3Starts - c2p2Starts, offsetIndex.getCompressedPageSize(1));
+      assertEquals(c2Ends - c2p3Starts, offsetIndex.getCompressedPageSize(2));
+      assertEquals(0, offsetIndex.getFirstRowIndex(0));
+      assertEquals(1, offsetIndex.getFirstRowIndex(1));
+      assertEquals(3, offsetIndex.getFirstRowIndex(2));
+    }
+  }
+
+  private org.apache.parquet.column.statistics.Statistics<?> statsC1(Binary... values) {
+    org.apache.parquet.column.statistics.Statistics<?> stats = org.apache.parquet.column.statistics.Statistics
+        .createStats(C1.getPrimitiveType());
+    for (Binary value : values) {
+      if (value == null) {
+        stats.incrementNumNulls();
+      } else {
+        stats.updateStats(value);
+      }
+    }
+    return stats;
+  }
+
+  private org.apache.parquet.column.statistics.Statistics<?> statsC2(Long... values) {
+    org.apache.parquet.column.statistics.Statistics<?> stats = org.apache.parquet.column.statistics.Statistics
+        .createStats(C2.getPrimitiveType());
+    for (Long value : values) {
+      if (value == null) {
+        stats.incrementNumNulls();
+      } else {
+        stats.updateStats(value);
+      }
+    }
+    return stats;
+  }
 }
diff --git a/pom.xml b/pom.xml
index 7b3f36f..dbd68bb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,7 +81,7 @@
     <hadoop1.version>1.2.1</hadoop1.version>
     <cascading.version>2.7.1</cascading.version>
     <cascading3.version>3.1.2</cascading3.version>
-    <parquet.format.version>2.4.0</parquet.format.version>
+    <parquet.format.version>2.5.0</parquet.format.version>
     <previous.version>1.7.0</previous.version>
     <thrift.executable>thrift</thrift.executable>
     <scala.version>2.10.6</scala.version>

-- 
To stop receiving notification emails like this one, please contact
zivanfi@apache.org.
