parquet-commits mailing list archives

From b...@apache.org
Subject parquet-mr git commit: PARQUET-306: Add row group alignment
Date Tue, 23 Jun 2015 00:11:30 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master 89321a2de -> 412ab9669


PARQUET-306: Add row group alignment

This adds an `AlignmentStrategy` to `ParquetFileWriter` that can alter the position of row groups and recommend a target size for the next row group. There are two strategies: `NoAlignment` and `PaddingAlignment`. Padding alignment is used for HDFS; no alignment is used for all other file systems. When HDFS-3689 is available, we can add a strategy that uses it.
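
In brief, the padding decision reduces to checking how many bytes remain before the next DFS block boundary. The sketch below condenses the `PaddingAlignment` logic from the diff; the `writeZeros` helper and the combined flow are illustrative only, as the real implementation splits this across `alignForRowGroup` and `nextRowGroupSize`:

```java
// Condensed sketch of the PaddingAlignment decision (see the full class in the diff).
long remaining = dfsBlockSize - (out.getPos() % dfsBlockSize);
if (remaining <= maxPaddingSize) {
  // Not enough room left in this DFS block for a useful row group:
  // pad with zeros so the next row group starts on a block boundary.
  writeZeros(out, remaining);            // illustrative helper
  nextRowGroupSize = rowGroupSize;
} else {
  // Enough room: cap the next row group so it fits in the current block.
  nextRowGroupSize = Math.min(remaining, rowGroupSize);
}
```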

The amount of padding is controlled by a maximum padding size in bytes. When the bytes left in the current HDFS block are no more than this maximum, the writer pads with zeros up to the block boundary so the next row group starts on a block boundary; otherwise the remaining bytes are used as the target size for the next row group. For example, setting this to 8 MB writes padding whenever fewer than 8 MB remain in the current block. The default is 0, which prevents padding from being added and matches the current behavior. The maximum is controlled by a new OutputFormat configuration property, `parquet.writer.max-padding`.
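
As a usage sketch (the 8 MB value is illustrative, not a recommended default), the maximum padding can be set through the new `ParquetOutputFormat` setter added in this patch, or directly on the configuration:

```java
Configuration conf = new Configuration();
// allow up to 8 MB of padding at each HDFS block boundary
ParquetOutputFormat.setMaxPaddingSize(conf, 8 * 1024 * 1024);
// equivalently: conf.setInt("parquet.writer.max-padding", 8 * 1024 * 1024);
```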

Author: Ryan Blue <blue@apache.org>

Closes #211 from rdblue/PARQUET-306-row-group-alignment and squashes the following commits:

0137ddf [Ryan Blue] PARQUET-306: Add MR test with padding.
6ce3f08 [Ryan Blue] PARQUET-306: Add parquet.writer.max-padding setting.
f1dc659 [Ryan Blue] PARQUET-306: Base next row group size on bytes remaining.
c6a3e97 [Ryan Blue] PARQUET-306: Add AlignmentStrategy to ParquetFileWriter.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/412ab966
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/412ab966
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/412ab966

Branch: refs/heads/master
Commit: 412ab9669810921d04f9feabfbeafa906d4de506
Parents: 89321a2
Author: Ryan Blue <blue@apache.org>
Authored: Mon Jun 22 17:11:27 2015 -0700
Committer: Ryan Blue <blue@apache.org>
Committed: Mon Jun 22 17:11:27 2015 -0700

----------------------------------------------------------------------
 .../hadoop/InternalParquetRecordWriter.java     |  20 +-
 .../parquet/hadoop/ParquetFileWriter.java       | 186 +++++++++--
 .../parquet/hadoop/ParquetOutputFormat.java     |  26 +-
 .../apache/parquet/hadoop/ParquetWriter.java    |   6 +-
 .../TestInputOutputFormatWithPadding.java       | 216 +++++++++++++
 .../parquet/hadoop/TestParquetFileWriter.java   | 315 ++++++++++++++++---
 6 files changed, 692 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/412ab966/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index 9968b5d..d12086d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -28,13 +28,10 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.parquet.Ints;
 import org.apache.parquet.Log;
 import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
-import org.apache.parquet.column.impl.ColumnWriteStoreV1;
-import org.apache.parquet.column.impl.ColumnWriteStoreV2;
 import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext;
@@ -54,6 +51,7 @@ class InternalParquetRecordWriter<T> {
   private final Map<String, String> extraMetaData;
   private final long rowGroupSize;
   private long rowGroupSizeThreshold;
+  private long nextRowGroupSize;
   private final int pageSize;
   private final BytesCompressor compressor;
   private final boolean validating;
@@ -92,6 +90,7 @@ class InternalParquetRecordWriter<T> {
     this.extraMetaData = extraMetaData;
     this.rowGroupSize = rowGroupSize;
     this.rowGroupSizeThreshold = rowGroupSize;
+    this.nextRowGroupSize = rowGroupSizeThreshold;
     this.pageSize = pageSize;
     this.compressor = compressor;
     this.validating = validating;
@@ -126,15 +125,17 @@ class InternalParquetRecordWriter<T> {
   private void checkBlockSizeReached() throws IOException {
     if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
       long memSize = columnStore.getBufferedSize();
-      if (memSize > rowGroupSizeThreshold) {
-        LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, rowGroupSizeThreshold, recordCount));
+      long recordSize = memSize / recordCount;
+      // flush the row group if it is within ~2 records of the limit
+      // it is much better to be slightly under size than to be over at all
+      if (memSize > (nextRowGroupSize - 2 * recordSize)) {
+        LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, nextRowGroupSize, recordCount));
         flushRowGroupToStore();
         initStore();
         recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
       } else {
-        float recordSize = (float) memSize / recordCount;
         recordCountForNextMemCheck = min(
-            max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(rowGroupSizeThreshold / recordSize)) / 2), // will check halfway
+            max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(nextRowGroupSize / ((float)recordSize))) / 2), // will check halfway
             recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
             );
         if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
@@ -145,7 +146,7 @@ class InternalParquetRecordWriter<T> {
   private void flushRowGroupToStore()
       throws IOException {
     LOG.info(format("Flushing mem columnStore to file. allocated memory: %,d", columnStore.getAllocatedSize()));
-    if (columnStore.getAllocatedSize() > 3 * (long)rowGroupSizeThreshold) {
+    if (columnStore.getAllocatedSize() > (3 * rowGroupSizeThreshold)) {
       LOG.warn("Too much memory used: " + columnStore.memUsageString());
     }
 
@@ -155,6 +156,9 @@ class InternalParquetRecordWriter<T> {
       pageStore.flushToFileWriter(parquetFileWriter);
       recordCount = 0;
       parquetFileWriter.endBlock();
+      this.nextRowGroupSize = Math.min(
+          parquetFileWriter.getNextRowGroupSize(),
+          rowGroupSizeThreshold);
     }
 
     columnStore = null;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/412ab966/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 7c034b7..65423b5 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -20,6 +20,8 @@ package org.apache.parquet.hadoop;
 
 import static org.apache.parquet.Log.DEBUG;
 import static org.apache.parquet.format.Util.writeFileMetaData;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -41,6 +43,7 @@ import org.apache.parquet.Version;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
@@ -69,6 +72,22 @@ public class ParquetFileWriter {
   public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
   public static final int CURRENT_VERSION = 1;
 
+  // need to supply a buffer size when setting block size. this is the default
+  // for hadoop 1 to present. copying it avoids loading DFSConfigKeys.
+  private static final int DFS_BUFFER_SIZE_DEFAULT = 4096;
+
+  // visible for testing
+  static final Set<String> BLOCK_FS_SCHEMES = new HashSet<String>();
+  static {
+    BLOCK_FS_SCHEMES.add("hdfs");
+    BLOCK_FS_SCHEMES.add("webhdfs");
+    BLOCK_FS_SCHEMES.add("viewfs");
+  }
+
+  private static boolean supportsBlockSize(FileSystem fs) {
+    return BLOCK_FS_SCHEMES.contains(fs.getUri().getScheme());
+  }
+
   // File creation modes
   public static enum Mode {
     CREATE,
@@ -79,13 +98,13 @@ public class ParquetFileWriter {
 
   private final MessageType schema;
   private final FSDataOutputStream out;
+  private final AlignmentStrategy alignment;
   private BlockMetaData currentBlock;
-  private ColumnChunkMetaData currentColumn;
   private long currentRecordCount;
   private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
   private long uncompressedLength;
   private long compressedLength;
-  private Set<org.apache.parquet.column.Encoding> currentEncodings;
+  private Set<Encoding> currentEncodings;
 
   private CompressionCodecName currentChunkCodec;
   private ColumnPath currentChunkPath;
@@ -157,7 +176,8 @@ public class ParquetFileWriter {
    */
   public ParquetFileWriter(Configuration configuration, MessageType schema,
       Path file) throws IOException {
-    this(configuration, schema, file, Mode.CREATE);
+    this(configuration, schema, file, Mode.CREATE, DEFAULT_BLOCK_SIZE,
+        MAX_PADDING_SIZE_DEFAULT);
   }
 
   /**
@@ -168,12 +188,60 @@ public class ParquetFileWriter {
    * @throws IOException if the file can not be created
    */
   public ParquetFileWriter(Configuration configuration, MessageType schema,
-      Path file, Mode mode) throws IOException {
-    super();
+                           Path file, Mode mode) throws IOException {
+    this(configuration, schema, file, mode, DEFAULT_BLOCK_SIZE,
+        MAX_PADDING_SIZE_DEFAULT);
+  }
+
+  /**
+   * @param configuration Hadoop configuration
+   * @param schema the schema of the data
+   * @param file the file to write to
+   * @param mode file creation mode
+   * @param rowGroupSize the row group size
+   * @throws IOException if the file can not be created
+   */
+  public ParquetFileWriter(Configuration configuration, MessageType schema,
+                           Path file, Mode mode, long rowGroupSize,
+                           int maxPaddingSize)
+      throws IOException {
     this.schema = schema;
     FileSystem fs = file.getFileSystem(configuration);
     boolean overwriteFlag = (mode == Mode.OVERWRITE);
-    this.out = fs.create(file, overwriteFlag);
+
+    if (supportsBlockSize(fs)) {
+      // use the default block size, unless row group size is larger
+      long dfsBlockSize = Math.max(fs.getDefaultBlockSize(file), rowGroupSize);
+
+      this.alignment = PaddingAlignment.get(
+          dfsBlockSize, rowGroupSize, maxPaddingSize);
+      this.out = fs.create(file, overwriteFlag, DFS_BUFFER_SIZE_DEFAULT,
+          fs.getDefaultReplication(file), dfsBlockSize);
+
+    } else {
+      this.alignment = NoAlignment.get(rowGroupSize);
+      this.out = fs.create(file, overwriteFlag);
+    }
+  }
+
+  /**
+   * FOR TESTING ONLY.
+   *
+   * @param configuration Hadoop configuration
+   * @param schema the schema of the data
+   * @param file the file to write to
+   * @param rowAndBlockSize the row group size
+   * @throws IOException if the file can not be created
+   */
+  ParquetFileWriter(Configuration configuration, MessageType schema,
+                    Path file, long rowAndBlockSize, int maxPaddingSize)
+      throws IOException {
+    FileSystem fs = file.getFileSystem(configuration);
+    this.schema = schema;
+    this.alignment = PaddingAlignment.get(
+        rowAndBlockSize, rowAndBlockSize, maxPaddingSize);
+    this.out = fs.create(file, true, DFS_BUFFER_SIZE_DEFAULT,
+        fs.getDefaultReplication(file), rowAndBlockSize);
   }
 
   /**
@@ -195,6 +263,9 @@ public class ParquetFileWriter {
     state = state.startBlock();
     if (DEBUG) LOG.debug(out.getPos() + ": start block");
 //    out.write(MAGIC); // TODO: add a magic delimiter
+
+    alignment.alignForRowGroup(out);
+
     currentBlock = new BlockMetaData();
     currentRecordCount = recordCount;
   }
@@ -203,7 +274,6 @@ public class ParquetFileWriter {
    * start a column inside a block
    * @param descriptor the column descriptor
    * @param valueCount the value count in this column
-   * @param statistics the statistics in this column
    * @param compressionCodecName
    * @throws IOException
    */
@@ -211,8 +281,7 @@ public class ParquetFileWriter {
                           long valueCount,
                           CompressionCodecName compressionCodecName) throws IOException {
     state = state.startColumn();
-    if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount);
-    currentEncodings = new HashSet<org.apache.parquet.column.Encoding>();
+    currentEncodings = new HashSet<Encoding>();
     currentChunkPath = ColumnPath.get(descriptor.getPath());
     currentChunkType = descriptor.getType();
     currentChunkCodec = compressionCodecName;
@@ -263,9 +332,9 @@ public class ParquetFileWriter {
   public void writeDataPage(
       int valueCount, int uncompressedPageSize,
       BytesInput bytes,
-      org.apache.parquet.column.Encoding rlEncoding,
-      org.apache.parquet.column.Encoding dlEncoding,
-      org.apache.parquet.column.Encoding valuesEncoding) throws IOException {
+      Encoding rlEncoding,
+      Encoding dlEncoding,
+      Encoding valuesEncoding) throws IOException {
     state = state.write();
     long beforeHeader = out.getPos();
     if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
@@ -300,9 +369,9 @@ public class ParquetFileWriter {
       int valueCount, int uncompressedPageSize,
       BytesInput bytes,
       Statistics statistics,
-      org.apache.parquet.column.Encoding rlEncoding,
-      org.apache.parquet.column.Encoding dlEncoding,
-      org.apache.parquet.column.Encoding valuesEncoding) throws IOException {
+      Encoding rlEncoding,
+      Encoding dlEncoding,
+      Encoding valuesEncoding) throws IOException {
     state = state.write();
     long beforeHeader = out.getPos();
     if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
@@ -337,7 +406,7 @@ public class ParquetFileWriter {
                        long uncompressedTotalPageSize,
                        long compressedTotalPageSize,
                        Statistics totalStats,
-                       List<org.apache.parquet.column.Encoding> encodings) throws IOException {
+                       List<Encoding> encodings) throws IOException {
     state = state.write();
     if (DEBUG) LOG.debug(out.getPos() + ": write data pages");
     long headersSize = bytes.size() - compressedTotalPageSize;
@@ -367,8 +436,6 @@ public class ParquetFileWriter {
         currentChunkValueCount,
         compressedLength,
         uncompressedLength));
-    if (DEBUG) LOG.info("ended Column chumk: " + currentColumn);
-    currentColumn = null;
     this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
     this.uncompressedLength = 0;
     this.compressedLength = 0;
@@ -464,6 +531,10 @@ public class ParquetFileWriter {
     return out.getPos();
   }
 
+  public long getNextRowGroupSize() throws IOException {
+    return alignment.nextRowGroupSize(out);
+  }
+
   /**
    * Will merge the metadata of all the footers together
    * @param footers the list files footers to merge
@@ -550,4 +621,83 @@ public class ParquetFileWriter {
     return mergedSchema.union(toMerge, strict);
   }
 
+  private interface AlignmentStrategy {
+    void alignForRowGroup(FSDataOutputStream out) throws IOException;
+
+    long nextRowGroupSize(FSDataOutputStream out) throws IOException;
+  }
+
+  private static class NoAlignment implements AlignmentStrategy {
+    public static NoAlignment get(long rowGroupSize) {
+      return new NoAlignment(rowGroupSize);
+    }
+
+    private final long rowGroupSize;
+
+    private NoAlignment(long rowGroupSize) {
+      this.rowGroupSize = rowGroupSize;
+    }
+
+    @Override
+    public void alignForRowGroup(FSDataOutputStream out) {
+    }
+
+    @Override
+    public long nextRowGroupSize(FSDataOutputStream out) {
+      return rowGroupSize;
+    }
+  }
+
+  /**
+   * Alignment strategy that pads when less than half the row group size is
+   * left before the next DFS block.
+   */
+  private static class PaddingAlignment implements AlignmentStrategy {
+    private static final byte[] zeros = new byte[4096];
+
+    public static PaddingAlignment get(long dfsBlockSize, long rowGroupSize,
+                                       int maxPaddingSize) {
+      return new PaddingAlignment(dfsBlockSize, rowGroupSize, maxPaddingSize);
+    }
+
+    protected final long dfsBlockSize;
+    protected final long rowGroupSize;
+    protected final int maxPaddingSize;
+
+    private PaddingAlignment(long dfsBlockSize, long rowGroupSize,
+                             int maxPaddingSize) {
+      this.dfsBlockSize = dfsBlockSize;
+      this.rowGroupSize = rowGroupSize;
+      this.maxPaddingSize = maxPaddingSize;
+    }
+
+    @Override
+    public void alignForRowGroup(FSDataOutputStream out) throws IOException {
+      long remaining = dfsBlockSize - (out.getPos() % dfsBlockSize);
+
+      if (isPaddingNeeded(remaining)) {
+        if (DEBUG) LOG.debug("Adding " + remaining + " bytes of padding (" +
+            "row group size=" + rowGroupSize + "B, " +
+            "block size=" + dfsBlockSize + "B)");
+        for (; remaining > 0; remaining -= zeros.length) {
+          out.write(zeros, 0, (int) Math.min((long) zeros.length, remaining));
+        }
+      }
+    }
+
+    @Override
+    public long nextRowGroupSize(FSDataOutputStream out) throws IOException {
+      long remaining = dfsBlockSize - (out.getPos() % dfsBlockSize);
+
+      if (isPaddingNeeded(remaining)) {
+        return rowGroupSize;
+      }
+
+      return Math.min(remaining, rowGroupSize);
+    }
+
+    protected boolean isPaddingNeeded(long remaining) {
+      return (remaining <= maxPaddingSize);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/412ab966/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index ea3101a..e075db3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 import org.apache.parquet.Log;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext;
 import org.apache.parquet.hadoop.codec.CodecConfig;
@@ -80,6 +81,10 @@ import org.apache.parquet.hadoop.util.ConfigurationUtil;
  * # To enable/disable summary metadata aggregation at the end of a MR job
  * # The default is true (enabled)
  * parquet.enable.summary-metadata=true # false to disable summary aggregation
+ *
+ * # Maximum size (in bytes) allowed as padding to align row groups
+ * # This is also the minimum size of a row group. Default: 0
+ * parquet.writer.max-padding=2097152 # 2 MB
  * </pre>
  *
  * If parquet.compression is not set, the following properties are checked (FileOutputFormat behavior).
@@ -109,6 +114,10 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
   public static final String ENABLE_JOB_SUMMARY   = "parquet.enable.summary-metadata";
   public static final String MEMORY_POOL_RATIO    = "parquet.memory.pool.ratio";
   public static final String MIN_MEMORY_ALLOCATION = "parquet.memory.min.chunk.size";
+  public static final String MAX_PADDING_BYTES    = "parquet.writer.max-padding";
+
+  // default to no padding for now
+  private static final int DEFAULT_MAX_PADDING_SIZE = 0;
 
   public static void setWriteSupportClass(Job job,  Class<?> writeSupportClass) {
     getConfiguration(job).set(WRITE_SUPPORT_CLASS, writeSupportClass.getName());
@@ -225,6 +234,18 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
     return CodecConfig.from(taskAttemptContext).getCodec();
   }
 
+  public static void setMaxPaddingSize(JobContext jobContext, int maxPaddingSize) {
+    setMaxPaddingSize(getConfiguration(jobContext), maxPaddingSize);
+  }
+
+  public static void setMaxPaddingSize(Configuration conf, int maxPaddingSize) {
+    conf.setInt(MAX_PADDING_BYTES, maxPaddingSize);
+  }
+
+  private static int getMaxPaddingSize(Configuration conf) {
+    // default to no padding, 0% of the row group size
+    return conf.getInt(MAX_PADDING_BYTES, DEFAULT_MAX_PADDING_SIZE);
+  }
 
 
   private WriteSupport<T> writeSupport;
@@ -284,9 +305,12 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
     if (INFO) LOG.info("Validation is " + (validating ? "on" : "off"));
     WriterVersion writerVersion = getWriterVersion(conf);
     if (INFO) LOG.info("Writer version is: " + writerVersion);
+    int maxPaddingSize = getMaxPaddingSize(conf);
+    if (INFO) LOG.info("Maximum row group padding size is " + maxPaddingSize + " bytes");
 
     WriteContext init = writeSupport.init(conf);
-    ParquetFileWriter w = new ParquetFileWriter(conf, init.getSchema(), file);
+    ParquetFileWriter w = new ParquetFileWriter(
+        conf, init.getSchema(), file, Mode.CREATE, blockSize, maxPaddingSize);
     w.start();
 
     float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/412ab966/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 6253c99..c9b9eac 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -44,6 +44,9 @@ public class ParquetWriter<T> implements Closeable {
   public static final WriterVersion DEFAULT_WRITER_VERSION =
       WriterVersion.PARQUET_1_0;
 
+  // max size (bytes) to write as padding and the min size of a row group
+  public static final int MAX_PADDING_SIZE_DEFAULT = 0;
+
   private final InternalParquetRecordWriter<T> writer;
 
   /**
@@ -208,8 +211,9 @@ public class ParquetWriter<T> implements Closeable {
     WriteSupport.WriteContext writeContext = writeSupport.init(conf);
     MessageType schema = writeContext.getSchema();
 
+    // TODO: in a follow-up issue, add max padding to the builder
     ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file,
-        mode);
+        mode, blockSize, MAX_PADDING_SIZE_DEFAULT);
     fileWriter.start();
 
     CodecFactory codecFactory = new CodecFactory(conf);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/412ab966/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java
new file mode 100644
index 0000000..dcb0c59
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java
@@ -0,0 +1,216 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.UUID;
+
+import static java.lang.Thread.sleep;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+
+public class TestInputOutputFormatWithPadding {
+  public static final String FILE_CONTENT = "" +
+      "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ," +
+      "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ," +
+      "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+
+  public static MessageType PARQUET_TYPE = Types.buildMessage()
+      .required(BINARY).as(UTF8).named("uuid")
+      .required(BINARY).as(UTF8).named("char")
+      .named("FormatTestObject");
+
+  /**
+   * ParquetInputFormat that will not split the input file (easier validation)
+   */
+  private static class NoSplits extends ParquetInputFormat {
+    @Override
+    protected boolean isSplitable(JobContext context, Path filename) {
+      return false;
+    }
+  }
+
+  public static class Writer extends Mapper<LongWritable, Text, Void, Group> {
+    public static final SimpleGroupFactory GROUP_FACTORY = new SimpleGroupFactory(PARQUET_TYPE);
+    @Override
+    protected void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+      // writes each character of the line with a UUID
+      String line = value.toString();
+      for (int i = 0; i < line.length(); i += 1) {
+        Group group = GROUP_FACTORY.newGroup();
+        group.add(0, Binary.fromString(UUID.randomUUID().toString()));
+        group.add(1, Binary.fromString(line.substring(i, i+1)));
+        context.write(null, group);
+      }
+    }
+  }
+
+  public static class Reader extends Mapper<Void, Group, LongWritable, Text> {
+    @Override
+    protected void map(Void key, Group value, Context context)
+        throws IOException, InterruptedException {
+      context.write(null, new Text(value.getString("char", 0)));
+    }
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Test
+  public void testBasicBehaviorWithPadding() throws Exception {
+    ParquetFileWriter.BLOCK_FS_SCHEMES.add("file");
+
+    File inputFile = temp.newFile();
+    FileOutputStream out = new FileOutputStream(inputFile);
+    out.write(FILE_CONTENT.getBytes("UTF-8"));
+    out.close();
+
+    File tempFolder = temp.newFolder();
+    tempFolder.delete();
+    Path tempPath = new Path(tempFolder.toURI());
+
+    File outputFolder = temp.newFile();
+    outputFolder.delete();
+
+    Configuration conf = new Configuration();
+    // May test against multiple hadoop versions
+    conf.set("dfs.block.size", "1024");
+    conf.set("dfs.blocksize", "1024");
+    conf.set("dfs.blockSize", "1024");
+    conf.set("fs.local.block.size", "1024");
+
+    // don't use a cached FS with a different block size
+    conf.set("fs.file.impl.disable.cache", "true");
+
+    // disable summary metadata, it isn't needed
+    conf.set("parquet.enable.summary-metadata", "false");
+    conf.set("parquet.example.schema", PARQUET_TYPE.toString());
+
+    {
+      Job writeJob = new Job(conf, "write");
+      writeJob.setInputFormatClass(TextInputFormat.class);
+      TextInputFormat.addInputPath(writeJob, new Path(inputFile.toString()));
+
+      writeJob.setOutputFormatClass(ParquetOutputFormat.class);
+      writeJob.setMapperClass(Writer.class);
+      writeJob.setNumReduceTasks(0); // write directly to Parquet without reduce
+      ParquetOutputFormat.setWriteSupportClass(writeJob, GroupWriteSupport.class);
+      ParquetOutputFormat.setBlockSize(writeJob, 1024);
+      ParquetOutputFormat.setPageSize(writeJob, 512);
+      ParquetOutputFormat.setDictionaryPageSize(writeJob, 512);
+      ParquetOutputFormat.setEnableDictionary(writeJob, true);
+      ParquetOutputFormat.setMaxPaddingSize(writeJob, 1023); // always pad
+      ParquetOutputFormat.setOutputPath(writeJob, tempPath);
+
+      waitForJob(writeJob);
+    }
+
+    // make sure padding was added
+    File parquetFile = getDataFile(tempFolder);
+    ParquetMetadata footer = ParquetFileReader.readFooter(conf,
+        new Path(parquetFile.toString()), ParquetMetadataConverter.NO_FILTER);
+    for (BlockMetaData block : footer.getBlocks()) {
+      Assert.assertTrue("Block should start at a multiple of the block size",
+          block.getStartingPos() % 1024 == 0);
+    }
+
+    {
+      Job readJob = new Job(conf, "read");
+      readJob.setInputFormatClass(NoSplits.class);
+      ParquetInputFormat.setReadSupportClass(readJob, GroupReadSupport.class);
+      TextInputFormat.addInputPath(readJob, tempPath);
+
+      readJob.setOutputFormatClass(TextOutputFormat.class);
+      readJob.setMapperClass(Reader.class);
+      readJob.setNumReduceTasks(0); // write directly to text without reduce
+      TextOutputFormat.setOutputPath(readJob, new Path(outputFolder.toString()));
+
+      waitForJob(readJob);
+    }
+
+    File dataFile = getDataFile(outputFolder);
+    Assert.assertNotNull("Should find a data file", dataFile);
+
+    StringBuilder contentBuilder = new StringBuilder();
+    for (String line : Files.readAllLines(
+        Paths.get(dataFile.toURI()), Charset.forName("UTF-8"))) {
+      contentBuilder.append(line);
+    }
+    String reconstructed = contentBuilder.toString();
+    Assert.assertEquals("Should match written file content",
+        FILE_CONTENT, reconstructed);
+
+    ParquetFileWriter.BLOCK_FS_SCHEMES.remove("file");
+  }
+
+  private void waitForJob(Job job) throws Exception {
+    job.submit();
+    while (!job.isComplete()) {
+      sleep(100);
+    }
+    if (!job.isSuccessful()) {
+      throw new RuntimeException("job failed " + job.getJobName());
+    }
+  }
+
+  private File getDataFile(File location) {
+    File[] files = location.listFiles();
+    File dataFile = null;
+    if (files != null) {
+      for (File file : files) {
+        if (file.getName().startsWith("part-")) {
+          dataFile = file;
+          break;
+        }
+      }
+    }
+    return dataFile;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/412ab966/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 20ad3e9..2c255d5 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -20,9 +20,12 @@ package org.apache.parquet.hadoop;
 
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.bytes.BytesUtils;
+import org.junit.Rule;
 import org.junit.Test;
 import org.apache.parquet.Log;
 import org.apache.parquet.bytes.BytesInput;
@@ -47,6 +50,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.*;
 
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
 import static org.junit.Assert.*;
 import static org.apache.parquet.column.Encoding.BIT_PACKED;
 import static org.apache.parquet.column.Encoding.PLAIN;
@@ -58,18 +62,43 @@ import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.simple.SimpleGroup;
 
 import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.junit.rules.TemporaryFolder;
 
 public class TestParquetFileWriter {
+
   private static final Log LOG = Log.getLog(TestParquetFileWriter.class);
+
+  private static final MessageType SCHEMA = MessageTypeParser.parseMessageType("" +
+      "message m {" +
+      "  required group a {" +
+      "    required binary b;" +
+      "  }" +
+      "  required group c {" +
+      "    required int64 d;" +
+      "  }" +
+      "}");
+  private static final String[] PATH1 = {"a", "b"};
+  private static final ColumnDescriptor C1 = SCHEMA.getColumnDescription(PATH1);
+  private static final String[] PATH2 = {"c", "d"};
+  private static final ColumnDescriptor C2 = SCHEMA.getColumnDescription(PATH2);
+
+  private static final byte[] BYTES1 = { 0, 1, 2, 3 };
+  private static final byte[] BYTES2 = { 1, 2, 3, 4 };
+  private static final byte[] BYTES3 = { 2, 3, 4, 5 };
+  private static final byte[] BYTES4 = { 3, 4, 5, 6 };
+  private static final CompressionCodecName CODEC = CompressionCodecName.UNCOMPRESSED;
+
+  private static final BinaryStatistics STATS1 = new BinaryStatistics();
+  private static final BinaryStatistics STATS2 = new BinaryStatistics();
+
   private String writeSchema;
 
+  @Rule
+  public final TemporaryFolder temp = new TemporaryFolder();
+
   @Test
   public void testWriteMode() throws Exception {
-    File testDir = new File("target/test/TestParquetFileWriter/");
-    testDir.mkdirs();
-    File testFile = new File(testDir, "testParquetFile");
-    testFile = testFile.getAbsoluteFile();
-    testFile.createNewFile();
+    File testFile = temp.newFile();
     MessageType schema = MessageTypeParser.parseMessageType(
         "message m { required group a {required binary b;} required group "
         + "c { required int64 d; }}");
@@ -88,7 +117,7 @@ public class TestParquetFileWriter {
     exceptionThrown = false;
     try {
       writer = new ParquetFileWriter(conf, schema, path,
-          ParquetFileWriter.Mode.OVERWRITE);
+          OVERWRITE);
     } catch(IOException ioe2) {
       exceptionThrown = true;
     }
@@ -98,56 +127,238 @@ public class TestParquetFileWriter {
 
   @Test
   public void testWriteRead() throws Exception {
-
-    File testFile = new File("target/test/TestParquetFileWriter/testParquetFile").getAbsoluteFile();
+    File testFile = temp.newFile();
     testFile.delete();
 
     Path path = new Path(testFile.toURI());
     Configuration configuration = new Configuration();
 
-    MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b;} required group c { required int64 d; }}");
-    String[] path1 = {"a", "b"};
-    ColumnDescriptor c1 = schema.getColumnDescription(path1);
-    String[] path2 = {"c", "d"};
-    ColumnDescriptor c2 = schema.getColumnDescription(path2);
+   ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path);
+    w.start();
+    w.startBlock(3);
+    w.startColumn(C1, 5, CODEC);
+    long c1Starts = w.getPos();
+    w.writeDataPage(2, 4, BytesInput.from(BYTES1), STATS1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(BYTES1), STATS1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    long c1Ends = w.getPos();
+    w.startColumn(C2, 6, CODEC);
+    long c2Starts = w.getPos();
+    w.writeDataPage(2, 4, BytesInput.from(BYTES2), STATS2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(BYTES2), STATS2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(1, 4, BytesInput.from(BYTES2), STATS2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    long c2Ends = w.getPos();
+    w.endBlock();
+    w.startBlock(4);
+    w.startColumn(C1, 7, CODEC);
+    w.writeDataPage(7, 4, BytesInput.from(BYTES3), STATS1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.startColumn(C2, 8, CODEC);
+    w.writeDataPage(8, 4, BytesInput.from(BYTES4), STATS2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.end(new HashMap<String, String>());
 
-    byte[] bytes1 = { 0, 1, 2, 3};
-    byte[] bytes2 = { 1, 2, 3, 4};
-    byte[] bytes3 = { 2, 3, 4, 5};
-    byte[] bytes4 = { 3, 4, 5, 6};
-    CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
+    ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
+    assertEquals("footer: "+ readFooter, 2, readFooter.getBlocks().size());
+    assertEquals(c1Ends - c1Starts, readFooter.getBlocks().get(0).getColumns().get(0).getTotalSize());
+    assertEquals(c2Ends - c2Starts, readFooter.getBlocks().get(0).getColumns().get(1).getTotalSize());
+    assertEquals(c2Ends - c1Starts, readFooter.getBlocks().get(0).getTotalByteSize());
+    HashSet<Encoding> expectedEncoding=new HashSet<Encoding>();
+    expectedEncoding.add(PLAIN);
+    expectedEncoding.add(BIT_PACKED);
+    assertEquals(expectedEncoding,readFooter.getBlocks().get(0).getColumns().get(0).getEncodings());
 
-    BinaryStatistics stats1 = new BinaryStatistics();
-    BinaryStatistics stats2 = new BinaryStatistics();
+    { // read first block of col #1
+      ParquetFileReader r = new ParquetFileReader(configuration, path, Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)));
+      PageReadStore pages = r.readNextRowGroup();
+      assertEquals(3, pages.getRowCount());
+      validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
+      validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
+      assertNull(r.readNextRowGroup());
+    }
+
+    { // read all blocks of col #1 and #2
+
+      ParquetFileReader r = new ParquetFileReader(configuration, path, readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)));
+
+      PageReadStore pages = r.readNextRowGroup();
+      assertEquals(3, pages.getRowCount());
+      validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
+      validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
+      validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2));
+      validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2));
+      validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2));
+
+      pages = r.readNextRowGroup();
+      assertEquals(4, pages.getRowCount());
+
+      validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3));
+      validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4));
+
+      assertNull(r.readNextRowGroup());
+    }
+    PrintFooter.main(new String[] {path.toString()});
+  }
+
+  @Test
+  public void testAlignmentWithPadding() throws Exception {
+    File testFile = temp.newFile();
+
+    Path path = new Path(testFile.toURI());
+    Configuration conf = new Configuration();
+
+    // uses the test constructor
+    ParquetFileWriter w = new ParquetFileWriter(conf, SCHEMA, path, 120, 60);
 
-    ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
     w.start();
     w.startBlock(3);
-    w.startColumn(c1, 5, codec);
+    w.startColumn(C1, 5, CODEC);
     long c1Starts = w.getPos();
-    w.writeDataPage(2, 4, BytesInput.from(bytes1), stats1, BIT_PACKED, BIT_PACKED, PLAIN);
-    w.writeDataPage(3, 4, BytesInput.from(bytes1), stats1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(2, 4, BytesInput.from(BYTES1), STATS1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(BYTES1), STATS1, BIT_PACKED, BIT_PACKED, PLAIN);
     w.endColumn();
     long c1Ends = w.getPos();
-    w.startColumn(c2, 6, codec);
+    w.startColumn(C2, 6, CODEC);
     long c2Starts = w.getPos();
-    w.writeDataPage(2, 4, BytesInput.from(bytes2), stats2, BIT_PACKED, BIT_PACKED, PLAIN);
-    w.writeDataPage(3, 4, BytesInput.from(bytes2), stats2, BIT_PACKED, BIT_PACKED, PLAIN);
-    w.writeDataPage(1, 4, BytesInput.from(bytes2), stats2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(2, 4, BytesInput.from(BYTES2), STATS2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(BYTES2), STATS2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(1, 4, BytesInput.from(BYTES2), STATS2, BIT_PACKED, BIT_PACKED, PLAIN);
     w.endColumn();
     long c2Ends = w.getPos();
     w.endBlock();
+
+    long firstRowGroupEnds = w.getPos(); // should be 109
+
     w.startBlock(4);
-    w.startColumn(c1, 7, codec);
-    w.writeDataPage(7, 4, BytesInput.from(bytes3), stats1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.startColumn(C1, 7, CODEC);
+    w.writeDataPage(7, 4, BytesInput.from(BYTES3), STATS1, BIT_PACKED, BIT_PACKED, PLAIN);
     w.endColumn();
-    w.startColumn(c2, 8, codec);
-    w.writeDataPage(8, 4, BytesInput.from(bytes4), stats2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.startColumn(C2, 8, CODEC);
+    w.writeDataPage(8, 4, BytesInput.from(BYTES4), STATS2, BIT_PACKED, BIT_PACKED, PLAIN);
     w.endColumn();
     w.endBlock();
+
+    long secondRowGroupEnds = w.getPos();
+
     w.end(new HashMap<String, String>());
 
-    ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
+    FileSystem fs = path.getFileSystem(conf);
+    long fileLen = fs.getFileStatus(path).getLen();
+
+    FSDataInputStream data = fs.open(path);
+    data.seek(fileLen - 8); // 4-byte offset + "PAR1"
+    long footerLen = BytesUtils.readIntLittleEndian(data);
+    long startFooter = fileLen - footerLen - 8;
+
+    assertEquals("Footer should start after second row group without padding",
+        secondRowGroupEnds, startFooter);
+
+    ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, path);
+    assertEquals("footer: "+ readFooter, 2, readFooter.getBlocks().size());
+    assertEquals(c1Ends - c1Starts, readFooter.getBlocks().get(0).getColumns().get(0).getTotalSize());
+    assertEquals(c2Ends - c2Starts, readFooter.getBlocks().get(0).getColumns().get(1).getTotalSize());
+    assertEquals(c2Ends - c1Starts, readFooter.getBlocks().get(0).getTotalByteSize());
+    HashSet<Encoding> expectedEncoding=new HashSet<Encoding>();
+    expectedEncoding.add(PLAIN);
+    expectedEncoding.add(BIT_PACKED);
+    assertEquals(expectedEncoding,readFooter.getBlocks().get(0).getColumns().get(0).getEncodings());
+
+    // verify block starting positions with padding
+    assertEquals("First row group should start after magic",
+        4, readFooter.getBlocks().get(0).getStartingPos());
+    assertTrue("First row group should end before the block size (120)",
+        firstRowGroupEnds < 120);
+    assertEquals("Second row group should start at the block size",
+        120, readFooter.getBlocks().get(1).getStartingPos());
+
+    { // read first block of col #1
+      ParquetFileReader r = new ParquetFileReader(conf, path, Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)));
+      PageReadStore pages = r.readNextRowGroup();
+      assertEquals(3, pages.getRowCount());
+      validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
+      validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
+      assertNull(r.readNextRowGroup());
+    }
+
+    { // read all blocks of col #1 and #2
+
+      ParquetFileReader r = new ParquetFileReader(conf, path, readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)));
+
+      PageReadStore pages = r.readNextRowGroup();
+      assertEquals(3, pages.getRowCount());
+      validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
+      validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
+      validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2));
+      validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2));
+      validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2));
+
+      pages = r.readNextRowGroup();
+      assertEquals(4, pages.getRowCount());
+
+      validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3));
+      validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4));
+
+      assertNull(r.readNextRowGroup());
+    }
+    PrintFooter.main(new String[] {path.toString()});
+  }
+
+  @Test
+  public void testAlignmentWithNoPaddingNeeded() throws Exception {
+    File testFile = temp.newFile();
+
+    Path path = new Path(testFile.toURI());
+    Configuration conf = new Configuration();
+
+    // uses the test constructor
+    ParquetFileWriter w = new ParquetFileWriter(conf, SCHEMA, path, 100, 50);
+
+    w.start();
+    w.startBlock(3);
+    w.startColumn(C1, 5, CODEC);
+    long c1Starts = w.getPos();
+    w.writeDataPage(2, 4, BytesInput.from(BYTES1), STATS1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(BYTES1), STATS1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    long c1Ends = w.getPos();
+    w.startColumn(C2, 6, CODEC);
+    long c2Starts = w.getPos();
+    w.writeDataPage(2, 4, BytesInput.from(BYTES2), STATS2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(BYTES2), STATS2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(1, 4, BytesInput.from(BYTES2), STATS2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    long c2Ends = w.getPos();
+    w.endBlock();
+
+    long firstRowGroupEnds = w.getPos(); // should be 109
+
+    w.startBlock(4);
+    w.startColumn(C1, 7, CODEC);
+    w.writeDataPage(7, 4, BytesInput.from(BYTES3), STATS1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.startColumn(C2, 8, CODEC);
+    w.writeDataPage(8, 4, BytesInput.from(BYTES4), STATS2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+
+    long secondRowGroupEnds = w.getPos();
+
+    w.end(new HashMap<String, String>());
+
+    FileSystem fs = path.getFileSystem(conf);
+    long fileLen = fs.getFileStatus(path).getLen();
+
+    FSDataInputStream data = fs.open(path);
+    data.seek(fileLen - 8); // 4-byte offset + "PAR1"
+    long footerLen = BytesUtils.readIntLittleEndian(data);
+    long startFooter = fileLen - footerLen - 8;
+
+    assertEquals("Footer should start after second row group without padding",
+        secondRowGroupEnds, startFooter);
+
+    ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, path);
     assertEquals("footer: "+ readFooter, 2, readFooter.getBlocks().size());
     assertEquals(c1Ends - c1Starts, readFooter.getBlocks().get(0).getColumns().get(0).getTotalSize());
     assertEquals(c2Ends - c2Starts, readFooter.getBlocks().get(0).getColumns().get(1).getTotalSize());
@@ -157,32 +368,40 @@ public class TestParquetFileWriter {
     expectedEncoding.add(BIT_PACKED);
     assertEquals(expectedEncoding,readFooter.getBlocks().get(0).getColumns().get(0).getEncodings());
 
+    // verify block starting positions without padding
+    assertEquals("First row group should start after magic",
+        4, readFooter.getBlocks().get(0).getStartingPos());
+    assertTrue("First row group should end before the block size (120)",
+        firstRowGroupEnds > 100);
+    assertEquals("Second row group should start after no padding",
+        109, readFooter.getBlocks().get(1).getStartingPos());
+
     { // read first block of col #1
-      ParquetFileReader r = new ParquetFileReader(configuration, path, Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(path1)));
+      ParquetFileReader r = new ParquetFileReader(conf, path, Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)));
       PageReadStore pages = r.readNextRowGroup();
       assertEquals(3, pages.getRowCount());
-      validateContains(schema, pages, path1, 2, BytesInput.from(bytes1));
-      validateContains(schema, pages, path1, 3, BytesInput.from(bytes1));
+      validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
+      validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
       assertNull(r.readNextRowGroup());
     }
 
     { // read all blocks of col #1 and #2
 
-      ParquetFileReader r = new ParquetFileReader(configuration, path, readFooter.getBlocks(), Arrays.asList(schema.getColumnDescription(path1), schema.getColumnDescription(path2)));
+      ParquetFileReader r = new ParquetFileReader(conf, path, readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)));
 
       PageReadStore pages = r.readNextRowGroup();
       assertEquals(3, pages.getRowCount());
-      validateContains(schema, pages, path1, 2, BytesInput.from(bytes1));
-      validateContains(schema, pages, path1, 3, BytesInput.from(bytes1));
-      validateContains(schema, pages, path2, 2, BytesInput.from(bytes2));
-      validateContains(schema, pages, path2, 3, BytesInput.from(bytes2));
-      validateContains(schema, pages, path2, 1, BytesInput.from(bytes2));
+      validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
+      validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
+      validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2));
+      validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2));
+      validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2));
 
       pages = r.readNextRowGroup();
       assertEquals(4, pages.getRowCount());
 
-      validateContains(schema, pages, path1, 7, BytesInput.from(bytes3));
-      validateContains(schema, pages, path2, 8, BytesInput.from(bytes4));
+      validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3));
+      validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4));
 
       assertNull(r.readNextRowGroup());
     }
@@ -207,8 +426,7 @@ public class TestParquetFileWriter {
 
   @Test
   public void testWriteReadStatistics() throws Exception {
-
-    File testFile = new File("target/test/TestParquetFileWriter/testParquetFile").getAbsoluteFile();
+    File testFile = temp.newFile();
     testFile.delete();
 
     Path path = new Path(testFile.toURI());
@@ -296,7 +514,7 @@ public class TestParquetFileWriter {
   @Test
   public void testMetaDataFile() throws Exception {
 
-    File testDir = new File("target/test/TestParquetFileWriter/testMetaDataFileDir").getAbsoluteFile();
+    File testDir = temp.newFolder();
 
     Path testDirPath = new Path(testDir.toURI());
     Configuration configuration = new Configuration();
@@ -338,8 +556,7 @@ public class TestParquetFileWriter {
 
   @Test
   public void testWriteReadStatisticsAllNulls() throws Exception {
-
-    File testFile = new File("target/test/TestParquetFileWriter/testParquetFile").getAbsoluteFile();
+    File testFile = temp.newFile();
     testFile.delete();
 
     writeSchema = "message example {\n" +

