parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tians...@apache.org
Subject [2/2] git commit: PARQUET-84: Avoid reading rowgroup metadata in memory on the client side.
Date Fri, 05 Sep 2014 18:33:08 GMT
PARQUET-84: Avoid reading rowgroup metadata in memory on the client side.

This will improve reading big datasets with a large schema (thousands of columns).
Instead, rowgroup metadata can be read in the tasks, where each task reads only the metadata of the file it's reading.

Author: julien <julien@twitter.com>

Closes #45 from julienledem/skip_reading_row_groups and squashes the following commits:

ccdd08c [julien] fix parquet-hive
24a2050 [julien] Merge branch 'master' into skip_reading_row_groups
3d7e35a [julien] address review feedback
5b6bd1b [julien] more tests
323d254 [julien] add unit tests
f599259 [julien] review feedback
fb11f02 [julien] fix backward compatibility check
2c20b46 [julien] cleanup readFooters methods
3da37d8 [julien] fix read summary
ab95a45 [julien] cleanup
4d16df3 [julien] implement task side metadata
9bb8059 [julien] first stab at integrating skipping row groups


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/5dafd127
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/5dafd127
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/5dafd127

Branch: refs/heads/master
Commit: 5dafd127f3de7c516ce9c1b7329087a01ab2fc57
Parents: 647b8a7
Author: julien <julien@twitter.com>
Authored: Fri Sep 5 11:32:46 2014 -0700
Committer: Tianshuo Deng <tdeng@twitter.com>
Committed: Fri Sep 5 11:32:46 2014 -0700

----------------------------------------------------------------------
 .../converter/ParquetMetadataConverter.java     | 181 +++++-
 .../java/parquet/hadoop/ParquetFileReader.java  | 213 +++++--
 .../java/parquet/hadoop/ParquetFileWriter.java  |  31 +-
 .../java/parquet/hadoop/ParquetInputFormat.java | 638 ++++++++++++-------
 .../java/parquet/hadoop/ParquetInputSplit.java  | 340 ++++------
 .../parquet/hadoop/ParquetOutputCommitter.java  |   1 -
 .../parquet/hadoop/ParquetOutputFormat.java     |   1 -
 .../main/java/parquet/hadoop/ParquetReader.java |   6 +-
 .../parquet/hadoop/ParquetRecordReader.java     |  70 +-
 .../main/java/parquet/hadoop/PrintFooter.java   |   3 +-
 .../mapred/DeprecatedParquetInputFormat.java    |  30 +-
 .../mapred/DeprecatedParquetOutputFormat.java   |  17 +-
 .../converter/TestParquetMetadataConverter.java | 128 +++-
 .../hadoop/DeprecatedInputFormatTest.java       |   2 +-
 .../java/parquet/hadoop/TestInputFormat.java    | 235 ++++++-
 .../parquet/hadoop/TestParquetFileWriter.java   |  14 +-
 .../parquet/hadoop/TestParquetInputSplit.java   |  45 --
 .../hadoop/example/TestInputOutputFormat.java   |  24 +-
 .../read/ParquetRecordReaderWrapper.java        |  51 +-
 .../main/java/parquet/pig/ParquetLoader.java    |  38 +-
 .../main/java/parquet/pig/TupleReadSupport.java |  14 +-
 pom.xml                                         |   3 +-
 22 files changed, 1361 insertions(+), 724 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
index 5bd6869..76834d5 100644
--- a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import parquet.Log;
 import parquet.common.schema.ColumnPath;
 import parquet.format.ColumnChunk;
+import parquet.format.ColumnMetaData;
 import parquet.format.ConvertedType;
 import parquet.format.DataPageHeader;
 import parquet.format.DictionaryPageHeader;
@@ -58,7 +59,7 @@ import parquet.schema.PrimitiveType.PrimitiveTypeName;
 import parquet.schema.Type.Repetition;
 import parquet.schema.TypeVisitor;
 import parquet.schema.Types;
-
+import static java.lang.Math.min;
 import static parquet.format.Util.readFileMetaData;
 import static parquet.format.Util.writePageHeader;
 
@@ -340,8 +341,124 @@ public class ParquetMetadataConverter {
     fileMetaData.addToKey_value_metadata(keyValue);
   }
 
+  private static interface MetadataFilterVisitor<T, E extends Throwable> {
+    T visit(NoFilter filter) throws E;
+    T visit(SkipMetadataFilter filter) throws E;
+    T visit(RangeMetadataFilter filter) throws E;
+  }
+
+  public abstract static class MetadataFilter {
+    private MetadataFilter() {}
+    abstract <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E;
+  }
+  public static final MetadataFilter NO_FILTER = new NoFilter();
+  public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
+  /**
+   * [ startOffset, endOffset )
+   * @param startOffset
+   * @param endOffset
+   * @return the filter
+   */
+  public static final MetadataFilter range(long startOffset, long endOffset) {
+    return new RangeMetadataFilter(startOffset, endOffset);
+  }
+  private static final class NoFilter extends MetadataFilter {
+    private NoFilter() {}
+    @Override
+    <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
+      return visitor.visit(this);
+    }
+    @Override
+    public String toString() {
+      return "NO_FILTER";
+    }
+  }
+  private static final class SkipMetadataFilter extends MetadataFilter {
+    private SkipMetadataFilter() {}
+    @Override
+    <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
+      return visitor.visit(this);
+    }
+    @Override
+    public String toString() {
+      return "SKIP_ROW_GROUPS";
+    }
+  }
+  /**
+   * [ startOffset, endOffset )
+   * @author Julien Le Dem
+   */
+  static final class RangeMetadataFilter extends MetadataFilter {
+    final long startOffset;
+    final long endOffset;
+    RangeMetadataFilter(long startOffset, long endOffset) {
+      super();
+      this.startOffset = startOffset;
+      this.endOffset = endOffset;
+    }
+    @Override
+    <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
+      return visitor.visit(this);
+    }
+    boolean contains(long offset) {
+      return offset >= this.startOffset && offset < this.endOffset;
+    }
+    @Override
+    public String toString() {
+      return "range(s:" + startOffset + ", e:" + endOffset + ")";
+    }
+  }
+
+  @Deprecated
   public ParquetMetadata readParquetMetadata(InputStream from) throws IOException {
-    FileMetaData fileMetaData = readFileMetaData(from);
+    return readParquetMetadata(from, NO_FILTER);
+  }
+
+  static FileMetaData filterFileMetaData(FileMetaData metaData, RangeMetadataFilter filter) {
+    List<RowGroup> rowGroups = metaData.getRow_groups();
+    List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
+    for (RowGroup rowGroup : rowGroups) {
+      long totalSize = 0;
+      long startIndex = getOffset(rowGroup.getColumns().get(0));
+      for (ColumnChunk col : rowGroup.getColumns()) {
+        totalSize += col.getMeta_data().getTotal_compressed_size();
+      }
+      long midPoint = startIndex + totalSize / 2;
+      if (filter.contains(midPoint)) {
+        newRowGroups.add(rowGroup);
+      }
+    }
+    metaData.setRow_groups(newRowGroups);
+    return metaData;
+  }
+
+  static long getOffset(RowGroup rowGroup) {
+    return getOffset(rowGroup.getColumns().get(0));
+  }
+  static long getOffset(ColumnChunk columnChunk) {
+    ColumnMetaData md = columnChunk.getMeta_data();
+    long offset = md.getData_page_offset();
+    if (md.isSetDictionary_page_offset() && offset > md.getDictionary_page_offset()) {
+      offset = md.getDictionary_page_offset();
+    }
+    return offset;
+  }
+
+  public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
+    FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
+      @Override
+      public FileMetaData visit(NoFilter filter) throws IOException {
+        return readFileMetaData(from);
+      }
+      @Override
+      public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
+        return readFileMetaData(from, true);
+      }
+      @Override
+      public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
+        return filterFileMetaData(readFileMetaData(from), filter);
+      }
+    });
     if (Log.DEBUG) LOG.debug(fileMetaData);
     ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData);
     if (Log.DEBUG) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
@@ -352,37 +469,39 @@ public class ParquetMetadataConverter {
     MessageType messageType = fromParquetSchema(parquetMetadata.getSchema());
     List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
     List<RowGroup> row_groups = parquetMetadata.getRow_groups();
-    for (RowGroup rowGroup : row_groups) {
-      BlockMetaData blockMetaData = new BlockMetaData();
-      blockMetaData.setRowCount(rowGroup.getNum_rows());
-      blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
-      List<ColumnChunk> columns = rowGroup.getColumns();
-      String filePath = columns.get(0).getFile_path();
-      for (ColumnChunk columnChunk : columns) {
-        if ((filePath == null && columnChunk.getFile_path() != null)
-            || (filePath != null && !filePath.equals(columnChunk.getFile_path()))) {
-          throw new ParquetDecodingException("all column chunks of the same row group must be in the same file for now");
+    if (row_groups != null) {
+      for (RowGroup rowGroup : row_groups) {
+        BlockMetaData blockMetaData = new BlockMetaData();
+        blockMetaData.setRowCount(rowGroup.getNum_rows());
+        blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
+        List<ColumnChunk> columns = rowGroup.getColumns();
+        String filePath = columns.get(0).getFile_path();
+        for (ColumnChunk columnChunk : columns) {
+          if ((filePath == null && columnChunk.getFile_path() != null)
+              || (filePath != null && !filePath.equals(columnChunk.getFile_path()))) {
+            throw new ParquetDecodingException("all column chunks of the same row group must be in the same file for now");
+          }
+          parquet.format.ColumnMetaData metaData = columnChunk.meta_data;
+          ColumnPath path = getPath(metaData);
+          ColumnChunkMetaData column = ColumnChunkMetaData.get(
+              path,
+              messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName(),
+              CompressionCodecName.fromParquet(metaData.codec),
+              fromFormatEncodings(metaData.encodings),
+              fromParquetStatistics(metaData.statistics, messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName()),
+              metaData.data_page_offset,
+              metaData.dictionary_page_offset,
+              metaData.num_values,
+              metaData.total_compressed_size,
+              metaData.total_uncompressed_size);
+          // TODO
+          // index_page_offset
+          // key_value_metadata
+          blockMetaData.addColumn(column);
         }
-        parquet.format.ColumnMetaData metaData = columnChunk.meta_data;
-        ColumnPath path = getPath(metaData);
-        ColumnChunkMetaData column = ColumnChunkMetaData.get(
-            path,
-            messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName(),
-            CompressionCodecName.fromParquet(metaData.codec),
-            fromFormatEncodings(metaData.encodings),
-            fromParquetStatistics(metaData.statistics, messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName()),
-            metaData.data_page_offset,
-            metaData.dictionary_page_offset,
-            metaData.num_values,
-            metaData.total_compressed_size,
-            metaData.total_uncompressed_size);
-        // TODO
-        // index_page_offset
-        // key_value_metadata
-        blockMetaData.addColumn(column);
+        blockMetaData.setPath(filePath);
+        blocks.add(blockMetaData);
       }
-      blockMetaData.setPath(filePath);
-      blocks.add(blockMetaData);
     }
     Map<String, String> keyValueMetaData = new HashMap<String, String>();
     List<KeyValue> key_value_metadata = parquetMetadata.getKey_value_metadata();

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
index e660c9f..49f1fab 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
@@ -15,13 +15,21 @@
  */
 package parquet.hadoop;
 
+import static parquet.Log.DEBUG;
+import static parquet.bytes.BytesUtils.readIntLittleEndian;
+import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
+import static parquet.hadoop.ParquetFileWriter.*;
+
 import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.SequenceInputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -39,6 +47,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapred.Utils;
 
 import parquet.Log;
@@ -51,6 +60,7 @@ import parquet.common.schema.ColumnPath;
 import parquet.format.PageHeader;
 import parquet.format.Util;
 import parquet.format.converter.ParquetMetadataConverter;
+import parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
 import parquet.hadoop.CodecFactory.BytesDecompressor;
 import parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader;
 import parquet.hadoop.metadata.BlockMetaData;
@@ -59,11 +69,6 @@ import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.hadoop.util.counters.BenchmarkCounter;
 import parquet.io.ParquetDecodingException;
 
-import static parquet.Log.DEBUG;
-import static parquet.bytes.BytesUtils.readIntLittleEndian;
-import static parquet.hadoop.ParquetFileWriter.MAGIC;
-import static parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
-
 /**
  * Internal implementation of the Parquet file reader as a block container
  *
@@ -84,7 +89,28 @@ public class ParquetFileReader implements Closeable {
    * @return the footers for those files using the summary file if possible.
    * @throws IOException
    */
-  public static List<Footer> readAllFootersInParallelUsingSummaryFiles(final Configuration configuration, List<FileStatus> partFiles) throws IOException {
+  @Deprecated
+  public static List<Footer> readAllFootersInParallelUsingSummaryFiles(Configuration configuration, List<FileStatus> partFiles) throws IOException {
+    return readAllFootersInParallelUsingSummaryFiles(configuration, partFiles, false);
+  }
+
+  private static MetadataFilter filter(boolean skipRowGroups) {
+    return skipRowGroups ? SKIP_ROW_GROUPS : NO_FILTER;
+  }
+
+  /**
+   * for files provided, check if there's a summary file.
+   * If a summary file is found it is used otherwise the file footer is used.
+   * @param configuration the hadoop conf to connect to the file system;
+   * @param partFiles the part files to read
+   * @param skipRowGroups whether to skip the row groups in the footers
+   * @return the footers for those files using the summary file if possible.
+   * @throws IOException
+   */
+  public static List<Footer> readAllFootersInParallelUsingSummaryFiles(
+      final Configuration configuration,
+      final Collection<FileStatus> partFiles,
+      final boolean skipRowGroups) throws IOException {
 
     // figure out list of all parents to part files
     Set<Path> parents = new HashSet<Path>();
@@ -98,12 +124,17 @@ public class ParquetFileReader implements Closeable {
       summaries.add(new Callable<Map<Path, Footer>>() {
         @Override
         public Map<Path, Footer> call() throws Exception {
-          // fileSystem is thread-safe
-          FileSystem fileSystem = path.getFileSystem(configuration);
-          Path summaryFile = new Path(path, PARQUET_METADATA_FILE);
-          if (fileSystem.exists(summaryFile)) {
-            if (Log.INFO) LOG.info("reading summary file: " + summaryFile);
-            final List<Footer> footers = readSummaryFile(configuration, fileSystem.getFileStatus(summaryFile));
+          ParquetMetadata mergedMetadata = readSummaryMetadata(configuration, path, skipRowGroups);
+          if (mergedMetadata != null) {
+            final List<Footer> footers;
+            if (skipRowGroups) {
+              footers = new ArrayList<Footer>();
+              for (FileStatus f : partFiles) {
+                footers.add(new Footer(f.getPath(), mergedMetadata));
+              }
+            } else {
+              footers = footersFromSummaryFile(path, mergedMetadata);
+            }
             Map<Path, Footer> map = new HashMap<Path, Footer>();
             for (Footer footer : footers) {
               // the folder may have been moved
@@ -143,7 +174,7 @@ public class ParquetFileReader implements Closeable {
     if (toRead.size() > 0) {
       // read the footers of the files that did not have a summary file
       if (Log.INFO) LOG.info("reading another " + toRead.size() + " footers");
-      result.addAll(readAllFootersInParallel(configuration, toRead));
+      result.addAll(readAllFootersInParallel(configuration, toRead, skipRowGroups));
     }
 
     return result;
@@ -170,14 +201,28 @@ public class ParquetFileReader implements Closeable {
     }
   }
 
+  @Deprecated
   public static List<Footer> readAllFootersInParallel(final Configuration configuration, List<FileStatus> partFiles) throws IOException {
+    return readAllFootersInParallel(configuration, partFiles, false);
+  }
+
+  /**
+   * read all the footers of the files provided
+   * (not using summary files)
+   * @param configuration the conf to access the File System
+   * @param partFiles the files to read
+   * @param skipRowGroups to skip the rowGroup info
+   * @return the footers
+   * @throws IOException
+   */
+  public static List<Footer> readAllFootersInParallel(final Configuration configuration, List<FileStatus> partFiles, final boolean skipRowGroups) throws IOException {
     List<Callable<Footer>> footers = new ArrayList<Callable<Footer>>();
     for (final FileStatus currentFile : partFiles) {
       footers.add(new Callable<Footer>() {
         @Override
         public Footer call() throws Exception {
           try {
-            return new Footer(currentFile.getPath(), ParquetFileReader.readFooter(configuration, currentFile));
+            return new Footer(currentFile.getPath(), readFooter(configuration, currentFile, filter(skipRowGroups)));
           } catch (IOException e) {
             throw new IOException("Could not read footer for file " + currentFile, e);
           }
@@ -191,38 +236,103 @@ public class ParquetFileReader implements Closeable {
     }
   }
 
+  /**
+   * Read the footers of all the files under that path (recursively)
+   * not using summary files.
+   * rowGroups are not skipped
+   * @param configuration the configuration to access the FS
+   * @param fileStatus the root dir
+   * @return all the footers
+   * @throws IOException
+   */
   public static List<Footer> readAllFootersInParallel(Configuration configuration, FileStatus fileStatus) throws IOException {
-    final FileSystem fs = fileStatus.getPath().getFileSystem(configuration);
-    List<FileStatus> statuses;
-    if (fileStatus.isDir()) {
-      statuses = Arrays.asList(fs.listStatus(fileStatus.getPath(), new Utils.OutputFileUtils.OutputFilesFilter()));
-    } else {
-      statuses = new ArrayList<FileStatus>();
-      statuses.add(fileStatus);
-    }
-    return readAllFootersInParallel(configuration, statuses);
+    List<FileStatus> statuses = listFiles(configuration, fileStatus);
+    return readAllFootersInParallel(configuration, statuses, false);
   }
 
+  @Deprecated
+  public static List<Footer> readFooters(Configuration configuration, Path path) throws IOException {
+    return readFooters(configuration, status(configuration, path));
+  }
+
+  private static FileStatus status(Configuration configuration, Path path) throws IOException {
+    return path.getFileSystem(configuration).getFileStatus(path);
+  }
+
+  /**
+   * this always returns the row groups
+   * @param configuration
+   * @param pathStatus
+   * @return
+   * @throws IOException
+   */
+  @Deprecated
   public static List<Footer> readFooters(Configuration configuration, FileStatus pathStatus) throws IOException {
-    try {
-      if (pathStatus.isDir()) {
-        Path summaryPath = new Path(pathStatus.getPath(), PARQUET_METADATA_FILE);
-        FileSystem fs = summaryPath.getFileSystem(configuration);
-        if (fs.exists(summaryPath)) {
-          FileStatus summaryStatus = fs.getFileStatus(summaryPath);
-          return readSummaryFile(configuration, summaryStatus);
+    return readFooters(configuration, pathStatus, false);
+  }
+
+  /**
+   * Read the footers of all the files under that path (recursively)
+   * using summary files if possible
+   * @param configuration the configuration to access the FS
+   * @param pathStatus the root dir
+   * @return all the footers
+   * @throws IOException
+   */
+  public static List<Footer> readFooters(Configuration configuration, FileStatus pathStatus, boolean skipRowGroups) throws IOException {
+    List<FileStatus> files = listFiles(configuration, pathStatus);
+    return readAllFootersInParallelUsingSummaryFiles(configuration, files, skipRowGroups);
+  }
+
+  private static List<FileStatus> listFiles(Configuration conf, FileStatus fileStatus) throws IOException {
+    if (fileStatus.isDir()) {
+      FileSystem fs = fileStatus.getPath().getFileSystem(conf);
+      FileStatus[] list = fs.listStatus(fileStatus.getPath(), new PathFilter() {
+        @Override
+        public boolean accept(Path p) {
+          return !p.getName().startsWith("_") && !p.getName().startsWith(".");
         }
+      });
+      List<FileStatus> result = new ArrayList<FileStatus>();
+      for (FileStatus sub : list) {
+        result.addAll(listFiles(conf, sub));
       }
-    } catch (IOException e) {
-      LOG.warn("can not read summary file for " + pathStatus.getPath(), e);
+      return result;
+    } else {
+      return Arrays.asList(fileStatus);
     }
-    return readAllFootersInParallel(configuration, pathStatus);
-
   }
 
+  /**
+   * Specifically reads a given summary file
+   * @param configuration
+   * @param summaryStatus
+   * @return the metadata translated for each file
+   * @throws IOException
+   */
   public static List<Footer> readSummaryFile(Configuration configuration, FileStatus summaryStatus) throws IOException {
     final Path parent = summaryStatus.getPath().getParent();
-    ParquetMetadata mergedFooters = readFooter(configuration, summaryStatus);
+    ParquetMetadata mergedFooters = readFooter(configuration, summaryStatus, filter(false));
+    return footersFromSummaryFile(parent, mergedFooters);
+  }
+
+  static ParquetMetadata readSummaryMetadata(Configuration configuration, Path basePath, boolean skipRowGroups) throws IOException {
+    Path metadataFile = new Path(basePath, PARQUET_METADATA_FILE);
+    Path commonMetaDataFile = new Path(basePath, PARQUET_COMMON_METADATA_FILE);
+    FileSystem fileSystem = basePath.getFileSystem(configuration);
+    if (skipRowGroups && fileSystem.exists(commonMetaDataFile)) {
+      // reading the summary file that does not contain the row groups
+      if (Log.INFO) LOG.info("reading summary file: " + commonMetaDataFile);
+      return readFooter(configuration, commonMetaDataFile, filter(skipRowGroups));
+    } else if (fileSystem.exists(metadataFile)) {
+      if (Log.INFO) LOG.info("reading summary file: " + metadataFile);
+      return readFooter(configuration, metadataFile, filter(skipRowGroups));
+    } else {
+      return null;
+    }
+  }
+
+  static List<Footer> footersFromSummaryFile(final Path parent, ParquetMetadata mergedFooters) {
     Map<Path, ParquetMetadata> footers = new HashMap<Path, ParquetMetadata>();
     List<BlockMetaData> blocks = mergedFooters.getBlocks();
     for (BlockMetaData block : blocks) {
@@ -249,25 +359,42 @@ public class ParquetFileReader implements Closeable {
    * @return the metadata blocks in the footer
    * @throws IOException if an error occurs while reading the file
    */
+  @Deprecated
   public static final ParquetMetadata readFooter(Configuration configuration, Path file) throws IOException {
-    FileSystem fileSystem = file.getFileSystem(configuration);
-    return readFooter(configuration, fileSystem.getFileStatus(file));
+    return readFooter(configuration, file, NO_FILTER);
   }
 
-
-  public static final List<Footer> readFooters(Configuration configuration, Path file) throws IOException {
+  /**
+   * Reads the meta data in the footer of the file.
+   * Skipping row groups (or not) based on the provided filter
+   * @param configuration
+   * @param file the Parquet File
+   * @param filter the filter to apply to row groups
+   * @return the metadata with row groups filtered.
+   * @throws IOException  if an error occurs while reading the file
+   */
+  public static ParquetMetadata readFooter(Configuration configuration, Path file, MetadataFilter filter) throws IOException {
     FileSystem fileSystem = file.getFileSystem(configuration);
-    return readFooters(configuration, fileSystem.getFileStatus(file));
+    return readFooter(configuration, fileSystem.getFileStatus(file), filter);
+  }
+
+  /**
+   * @deprecated use {@link ParquetFileReader#readFooter(Configuration, FileStatus, MetadataFilter)}
+   */
+  @Deprecated
+  public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file) throws IOException {
+    return readFooter(configuration, file, NO_FILTER);
   }
 
   /**
    * Reads the meta data block in the footer of the file
    * @param configuration
    * @param file the parquet File
+   * @param filter the filter to apply to row groups
    * @return the metadata blocks in the footer
    * @throws IOException if an error occurs while reading the file
    */
-  public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file) throws IOException {
+  public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file, MetadataFilter filter) throws IOException {
     FileSystem fileSystem = file.getPath().getFileSystem(configuration);
     FSDataInputStream f = fileSystem.open(file.getPath());
     try {
@@ -293,7 +420,7 @@ public class ParquetFileReader implements Closeable {
         throw new RuntimeException("corrupted file: the footer index is not within the file");
       }
       f.seek(footerIndex);
-      return parquetMetadataConverter.readParquetMetadata(f);
+      return parquetMetadataConverter.readParquetMetadata(f, filter);
     } finally {
       f.close();
     }
@@ -430,7 +557,7 @@ public class ParquetFileReader implements Closeable {
                     this.readAsBytesInput(pageHeader.compressed_page_size),
                     pageHeader.data_page_header.num_values,
                     pageHeader.uncompressed_page_size,
-                    parquetMetadataConverter.fromParquetStatistics(pageHeader.data_page_header.statistics, descriptor.col.getType()),
+                    ParquetMetadataConverter.fromParquetStatistics(pageHeader.data_page_header.statistics, descriptor.col.getType()),
                     parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
                     parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
                     parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
index f3ef61b..42d91a4 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
@@ -15,6 +15,9 @@
  */
 package parquet.hadoop;
 
+import static parquet.Log.DEBUG;
+import static parquet.format.Util.writeFileMetaData;
+
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -49,9 +52,6 @@ import parquet.io.ParquetEncodingException;
 import parquet.schema.MessageType;
 import parquet.schema.PrimitiveType.PrimitiveTypeName;
 
-import static parquet.Log.DEBUG;
-import static parquet.format.Util.writeFileMetaData;
-
 /**
  * Internal implementation of the Parquet file writer as a block container
  *
@@ -62,6 +62,7 @@ public class ParquetFileWriter {
   private static final Log LOG = Log.getLog(ParquetFileWriter.class);
 
   public static final String PARQUET_METADATA_FILE = "_metadata";
+  public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata";
   public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
   public static final int CURRENT_VERSION = 1;
 
@@ -83,7 +84,7 @@ public class ParquetFileWriter {
   private long currentChunkFirstDataPage;
   private long currentChunkDictionaryPageOffset;
   private long currentChunkValueCount;
-  
+
   private Statistics currentStatistics;
 
   /**
@@ -387,19 +388,26 @@ public class ParquetFileWriter {
   }
 
   /**
-   * writes a _metadata file
+   * writes a _metadata and _common_metadata file
    * @param configuration the configuration to use to get the FileSystem
    * @param outputPath the directory to write the _metadata file to
    * @param footers the list of footers to merge
    * @throws IOException
    */
   public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException {
-    Path metaDataPath = new Path(outputPath, PARQUET_METADATA_FILE);
+    ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
     FileSystem fs = outputPath.getFileSystem(configuration);
     outputPath = outputPath.makeQualified(fs);
+    writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_METADATA_FILE);
+    metadataFooter.getBlocks().clear();
+    writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_COMMON_METADATA_FILE);
+  }
+
+  private static void writeMetadataFile(Path outputPath, ParquetMetadata metadataFooter, FileSystem fs, String parquetMetadataFile)
+      throws IOException {
+    Path metaDataPath = new Path(outputPath, parquetMetadataFile);
     FSDataOutputStream metadata = fs.create(metaDataPath);
     metadata.write(MAGIC);
-    ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
     serializeFooter(metadataFooter, metadata);
     metadata.close();
   }
@@ -439,11 +447,10 @@ public class ParquetFileWriter {
    * @param footers the list files footers to merge
    * @return the global meta data for all the footers
    */
-  
   static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
     return getGlobalMetaData(footers, true);
   }
-  
+
   static GlobalMetaData getGlobalMetaData(List<Footer> footers, boolean strict) {
     GlobalMetaData fileMetaData = null;
     for (Footer footer : footers) {
@@ -464,7 +471,7 @@ public class ParquetFileWriter {
       GlobalMetaData mergedMetadata) {
     return mergeInto(toMerge, mergedMetadata, true);
   }
-  
+
   static GlobalMetaData mergeInto(
       FileMetaData toMerge,
       GlobalMetaData mergedMetadata,
@@ -505,7 +512,7 @@ public class ParquetFileWriter {
   static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
     return mergeInto(toMerge, mergedSchema, true);
   }
-  
+
   /**
    * will return the result of merging toMerge into mergedSchema
    * @param toMerge the schema to merge into mergedSchema
@@ -517,7 +524,7 @@ public class ParquetFileWriter {
     if (mergedSchema == null) {
       return toMerge;
     }
-    
+
     return mergedSchema.union(toMerge, strict);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
index 0231ccd..d79ca51 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
@@ -15,9 +15,12 @@
  */
 package parquet.hadoop;
 
+import static parquet.Preconditions.checkArgument;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -62,8 +65,6 @@ import parquet.io.ParquetDecodingException;
 import parquet.schema.MessageType;
 import parquet.schema.MessageTypeParser;
 
-import static parquet.Preconditions.checkArgument;
-
 /**
  * The input format to read a Parquet file.
  *
@@ -89,7 +90,7 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
    * key to configure the filter
    */
   public static final String UNBOUND_RECORD_FILTER = "parquet.read.filter";
-  
+
   /**
    * key to configure type checking for conflicting schemas (default: true)
    */
@@ -100,11 +101,17 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
    */
   public static final String FILTER_PREDICATE = "parquet.private.read.filter.predicate";
 
+  public static final String TASK_SIDE_METADATA = "parquet.task.side.metadata";
+
   private static final int MIN_FOOTER_CACHE_SIZE = 100;
 
-  private LruCache<FileStatusWrapper, FootersCacheValue> footersCache;
+  public static void setTaskSideMetaData(Job job,  boolean taskSideMetadata) {
+    ContextUtil.getConfiguration(job).setBoolean(TASK_SIDE_METADATA, taskSideMetadata);
+  }
 
-  private Class<?> readSupportClass;
+  public static boolean isTaskSideMetaData(Configuration configuration) {
+    return configuration.getBoolean(TASK_SIDE_METADATA, Boolean.FALSE);
+  }
 
   public static void setReadSupportClass(Job job,  Class<?> readSupportClass) {
     ContextUtil.getConfiguration(job).set(READ_SUPPORT_CLASS, readSupportClass.getName());
@@ -181,6 +188,10 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
     return FilterCompat.get(getFilterPredicate(conf), getUnboundRecordFilterInstance(conf));
   }
 
+  private LruCache<FileStatusWrapper, FootersCacheValue> footersCache;
+
+  private Class<?> readSupportClass;
+
   /**
    * Hadoop will instantiate using this constructor
    */
@@ -202,11 +213,8 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
   public RecordReader<Void, T> createRecordReader(
       InputSplit inputSplit,
       TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-
-    ReadSupport<T> readSupport = getReadSupport(ContextUtil.getConfiguration(taskAttemptContext));
-
     Configuration conf = ContextUtil.getConfiguration(taskAttemptContext);
-
+    ReadSupport<T> readSupport = getReadSupport(conf);
     return new ParquetRecordReader<T>(readSupport, getFilter(conf));
   }
 
@@ -217,6 +225,7 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
   public ReadSupport<T> getReadSupport(Configuration configuration){
     try {
       if (readSupportClass == null) {
+        // TODO: fix this weird caching independent of the conf parameter
         readSupportClass = getReadSupportClass(configuration);
       }
       return (ReadSupport<T>)readSupportClass.newInstance();
@@ -227,195 +236,13 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
     }
   }
 
-  //Wrapper of hdfs blocks, keep track of which HDFS block is being used
-  private static class HDFSBlocks {
-    BlockLocation[] hdfsBlocks;
-    int currentStartHdfsBlockIndex = 0;//the hdfs block index corresponding to the start of a row group
-    int currentMidPointHDFSBlockIndex = 0;// the hdfs block index corresponding to the mid-point of a row group, a split might be created only when the midpoint of the rowgroup enters a new hdfs block
-
-    private HDFSBlocks(BlockLocation[] hdfsBlocks) {
-      this.hdfsBlocks = hdfsBlocks;
-      Comparator<BlockLocation> comparator = new Comparator<BlockLocation>() {
-        @Override
-        public int compare(BlockLocation b1, BlockLocation b2) {
-          return Long.signum(b1.getOffset() - b2.getOffset());
-        }
-      };
-      Arrays.sort(hdfsBlocks, comparator);
-    }
-
-    private long getHDFSBlockEndingPosition(int hdfsBlockIndex) {
-      BlockLocation hdfsBlock = hdfsBlocks[hdfsBlockIndex];
-      return hdfsBlock.getOffset() + hdfsBlock.getLength() - 1;
-    }
-
-    /**
-     * @param rowGroupMetadata
-     * @return true if the mid point of row group is in a new hdfs block, and also move the currentHDFSBlock pointer to the correct index that contains the row group;
-     * return false if the mid point of row group is in the same hdfs block
-     */
-    private boolean checkBelongingToANewHDFSBlock(BlockMetaData rowGroupMetadata) {
-      boolean isNewHdfsBlock = false;
-      long rowGroupMidPoint = rowGroupMetadata.getStartingPos() + (rowGroupMetadata.getCompressedSize() / 2);
-
-      //if mid point is not in the current HDFS block any more, return true
-      while (rowGroupMidPoint > getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex)) {
-        isNewHdfsBlock = true;
-        currentMidPointHDFSBlockIndex++;
-        if (currentMidPointHDFSBlockIndex >= hdfsBlocks.length)
-          throw new ParquetDecodingException("the row group is not in hdfs blocks in the file: midpoint of row groups is "
-                  + rowGroupMidPoint
-                  + ", the end of the hdfs block is "
-                  + getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex - 1));
-      }
-
-      while (rowGroupMetadata.getStartingPos() > getHDFSBlockEndingPosition(currentStartHdfsBlockIndex)) {
-        currentStartHdfsBlockIndex++;
-        if (currentStartHdfsBlockIndex >= hdfsBlocks.length)
-          throw new ParquetDecodingException("The row group does not start in this file: row group offset is "
-                  + rowGroupMetadata.getStartingPos()
-                  + " but the end of hdfs blocks of file is "
-                  + getHDFSBlockEndingPosition(currentStartHdfsBlockIndex));
-      }
-      return isNewHdfsBlock;
-    }
-
-    public BlockLocation get(int hdfsBlockIndex) {
-      return hdfsBlocks[hdfsBlockIndex];
-    }
-
-    public BlockLocation getCurrentBlock() {
-      return hdfsBlocks[currentStartHdfsBlockIndex];
-    }
-  }
-
-  private static class SplitInfo {
-    List<BlockMetaData> rowGroups = new ArrayList<BlockMetaData>();
-    BlockLocation hdfsBlock;
-    long compressedByteSize = 0L;
-
-    public SplitInfo(BlockLocation currentBlock) {
-      this.hdfsBlock = currentBlock;
-    }
-
-    private void addRowGroup(BlockMetaData rowGroup) {
-      this.rowGroups.add(rowGroup);
-      this.compressedByteSize += rowGroup.getCompressedSize();
-    }
-
-    public long getCompressedByteSize() {
-      return compressedByteSize;
-    }
-
-    public List<BlockMetaData> getRowGroups() {
-      return rowGroups;
-    }
-
-    int getRowGroupCount() {
-      return rowGroups.size();
-    }
-
-    public ParquetInputSplit getParquetInputSplit(FileStatus fileStatus, FileMetaData fileMetaData, String requestedSchema, Map<String, String> readSupportMetadata, String fileSchema) throws IOException {
-      MessageType requested = MessageTypeParser.parseMessageType(requestedSchema);
-      long length = 0;
-
-      for (BlockMetaData block : this.getRowGroups()) {
-        List<ColumnChunkMetaData> columns = block.getColumns();
-        for (ColumnChunkMetaData column : columns) {
-          if (requested.containsPath(column.getPath().toArray())) {
-            length += column.getTotalSize();
-          }
-        }
-      }
-      return new ParquetInputSplit(
-              fileStatus.getPath(),
-              hdfsBlock.getOffset(),
-              length,
-              hdfsBlock.getHosts(),
-              this.getRowGroups(),
-              requestedSchema,
-              fileSchema,
-              fileMetaData.getKeyValueMetaData(),
-              readSupportMetadata
-      );
-    }
-  }
-
-  /**
-   * groups together all the data blocks for the same HDFS block
-   *
-   * @param rowGroupBlocks      data blocks (row groups)
-   * @param hdfsBlocksArray     hdfs blocks
-   * @param fileStatus          the containing file
-   * @param fileMetaData        file level meta data
-   * @param requestedSchema     the schema requested by the user
-   * @param readSupportMetadata the metadata provided by the readSupport implementation in init
-   * @param minSplitSize        the mapred.min.split.size
-   * @param maxSplitSize        the mapred.max.split.size
-   * @return the splits (one per HDFS block)
-   * @throws IOException If hosts can't be retrieved for the HDFS block
-   */
-  static <T> List<ParquetInputSplit> generateSplits(
-          List<BlockMetaData> rowGroupBlocks,
-          BlockLocation[] hdfsBlocksArray,
-          FileStatus fileStatus,
-          FileMetaData fileMetaData,
-          String requestedSchema,
-          Map<String, String> readSupportMetadata, long minSplitSize, long maxSplitSize) throws IOException {
-    if (maxSplitSize < minSplitSize || maxSplitSize < 0 || minSplitSize < 0) {
-      throw new ParquetDecodingException("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = " + maxSplitSize + "; minSplitSize is " + minSplitSize);
-    }
-    String fileSchema = fileMetaData.getSchema().toString().intern();
-    HDFSBlocks hdfsBlocks = new HDFSBlocks(hdfsBlocksArray);
-    hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupBlocks.get(0));
-    SplitInfo currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
-
-    //assign rowGroups to splits
-    List<SplitInfo> splitRowGroups = new ArrayList<SplitInfo>();
-    checkSorted(rowGroupBlocks);//assert row groups are sorted
-    for (BlockMetaData rowGroupMetadata : rowGroupBlocks) {
-      if ((hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupMetadata)
-             && currentSplit.getCompressedByteSize() >= minSplitSize
-             && currentSplit.getCompressedByteSize() > 0)
-           || currentSplit.getCompressedByteSize() >= maxSplitSize) {
-        //create a new split
-        splitRowGroups.add(currentSplit);//finish previous split
-        currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
-      }
-      currentSplit.addRowGroup(rowGroupMetadata);
-    }
-
-    if (currentSplit.getRowGroupCount() > 0) {
-      splitRowGroups.add(currentSplit);
-    }
-
-    //generate splits from rowGroups of each split
-    List<ParquetInputSplit> resultSplits = new ArrayList<ParquetInputSplit>();
-    for (SplitInfo splitInfo : splitRowGroups) {
-      ParquetInputSplit split = splitInfo.getParquetInputSplit(fileStatus, fileMetaData, requestedSchema, readSupportMetadata, fileSchema);
-      resultSplits.add(split);
-    }
-    return resultSplits;
-  }
-
-  private static void checkSorted(List<BlockMetaData> rowGroupBlocks) {
-    long previousOffset = 0L;
-    for(BlockMetaData rowGroup: rowGroupBlocks) {
-      long currentOffset = rowGroup.getStartingPos();
-      if (currentOffset < previousOffset) {
-        throw new ParquetDecodingException("row groups are not sorted: previous row groups starts at " + previousOffset + ", current row group starts at " + currentOffset);
-      }
-    }
-  }
-
   /**
    * {@inheritDoc}
    */
   @Override
   public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
-    List<InputSplit> splits = new ArrayList<InputSplit>();
-    splits.addAll(getSplits(ContextUtil.getConfiguration(jobContext), getFooters(jobContext)));
-    return splits;
+    Configuration configuration = ContextUtil.getConfiguration(jobContext);
+    return new ArrayList<InputSplit>(getSplits(configuration, getFooters(jobContext)));
   }
 
   /**
@@ -425,63 +252,20 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
    * @throws IOException
    */
   public List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers) throws IOException {
+    boolean taskSideMetaData = isTaskSideMetaData(configuration);
+    boolean strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
     final long maxSplitSize = configuration.getLong("mapred.max.split.size", Long.MAX_VALUE);
     final long minSplitSize = Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L));
     if (maxSplitSize < 0 || minSplitSize < 0) {
-      throw new ParquetDecodingException("maxSplitSize or minSplitSie should not be negative: maxSplitSize = " + maxSplitSize + "; minSplitSize = " + minSplitSize);
+      throw new ParquetDecodingException("maxSplitSize or minSplitSize should not be negative: maxSplitSize = " + maxSplitSize + "; minSplitSize = " + minSplitSize);
     }
-    List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
-    GlobalMetaData globalMetaData = ParquetFileWriter.getGlobalMetaData(footers, configuration.getBoolean(STRICT_TYPE_CHECKING, true));
+    GlobalMetaData globalMetaData = ParquetFileWriter.getGlobalMetaData(footers, strictTypeChecking);
     ReadContext readContext = getReadSupport(configuration).init(new InitContext(
         configuration,
         globalMetaData.getKeyValueMetaData(),
         globalMetaData.getSchema()));
 
-    Filter filter = getFilter(configuration);
-
-    long rowGroupsDropped = 0;
-    long totalRowGroups = 0;
-
-    for (Footer footer : footers) {
-      final Path file = footer.getFile();
-      LOG.debug(file);
-      FileSystem fs = file.getFileSystem(configuration);
-      FileStatus fileStatus = fs.getFileStatus(file);
-      ParquetMetadata parquetMetaData = footer.getParquetMetadata();
-      List<BlockMetaData> blocks = parquetMetaData.getBlocks();
-
-      List<BlockMetaData> filteredBlocks = blocks;
-
-      totalRowGroups += blocks.size();
-      filteredBlocks = RowGroupFilter.filterRowGroups(filter, blocks, parquetMetaData.getFileMetaData().getSchema());
-      rowGroupsDropped += blocks.size() - filteredBlocks.size();
-
-      if (filteredBlocks.isEmpty()) {
-        continue;
-      }
-
-      BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
-      splits.addAll(
-          generateSplits(
-              filteredBlocks,
-              fileBlockLocations,
-              fileStatus,
-              parquetMetaData.getFileMetaData(),
-              readContext.getRequestedSchema().toString(),
-              readContext.getReadSupportMetadata(),
-              minSplitSize,
-              maxSplitSize)
-          );
-    }
-
-    if (rowGroupsDropped > 0 && totalRowGroups > 0) {
-      int percentDropped = (int) ((((double) rowGroupsDropped) / totalRowGroups) * 100);
-      LOG.info("Dropping " + rowGroupsDropped + " row groups that do not pass filter predicate! (" + percentDropped + "%)");
-    } else {
-      LOG.info("There were no row groups that could be dropped due to filter predicates");
-    }
-
-    return splits;
+    return SplitStrategy.getSplitStrategy(taskSideMetaData).getSplits(configuration, footers, maxSplitSize, minSplitSize, readContext);
   }
 
   /*
@@ -539,7 +323,6 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
     if (statuses.isEmpty()) {
       return Collections.emptyList();
     }
-
     Configuration config = ContextUtil.getConfiguration(jobContext);
     List<Footer> footers = new ArrayList<Footer>(statuses.size());
     Set<FileStatus> missingStatuses = new HashSet<FileStatus>();
@@ -575,8 +358,7 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
       return footers;
     }
 
-    List<Footer> newFooters =
-            getFooters(config, new ArrayList<FileStatus>(missingStatuses));
+    List<Footer> newFooters = getFooters(config, missingStatuses);
     for (Footer newFooter : newFooters) {
       // Use the original file status objects to make sure we store a
       // conservative (older) modification time (i.e. in case the files and
@@ -590,6 +372,10 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
     return footers;
   }
 
+  public List<Footer> getFooters(Configuration configuration, List<FileStatus> statuses) throws IOException {
+    return getFooters(configuration, (Collection<FileStatus>)statuses);
+  }
+
   /**
    * the footers for the files
    * @param configuration to connect to the file system
@@ -597,9 +383,10 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
    * @return the footers of the files
    * @throws IOException
    */
-  public List<Footer> getFooters(Configuration configuration, List<FileStatus> statuses) throws IOException {
+  public List<Footer> getFooters(Configuration configuration, Collection<FileStatus> statuses) throws IOException {
     if (Log.DEBUG) LOG.debug("reading " + statuses.size() + " files");
-    return ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, statuses);
+    boolean taskSideMetaData = isTaskSideMetaData(configuration);
+    return ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, statuses, taskSideMetaData);
   }
 
   /**
@@ -688,3 +475,362 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
   }
 
 }
+abstract class SplitStrategy {
+  private static final Log LOG = Log.getLog(SplitStrategy.class);
+
+  static SplitStrategy getSplitStrategy(boolean taskSideMetaData) {
+    if (taskSideMetaData) {
+      LOG.info("Using Task Side Metadata Split Strategy");
+      return new TaskSideMetadataSplitStrategy();
+    } else {
+      LOG.info("Using Client Side Metadata Split Strategy");
+      return new ClientSideMetadataSplitStrategy();
+    }
+  }
+
+  abstract List<ParquetInputSplit> getSplits(
+      Configuration configuration,
+      List<Footer> footers,
+      final long maxSplitSize, final long minSplitSize,
+      ReadContext readContext) throws IOException;
+}
+class TaskSideMetadataSplitStrategy extends SplitStrategy {
+
+  @Override
+  List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers,
+      long maxSplitSize, long minSplitSize, ReadContext readContext) throws IOException {
+    List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
+    for (Footer footer : footers) {
+      // TODO: keep status in Footer
+      final Path file = footer.getFile();
+      FileSystem fs = file.getFileSystem(configuration);
+      FileStatus fileStatus = fs.getFileStatus(file);
+      BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+      splits.addAll(generateTaskSideMDSplits(
+          fileBlockLocations,
+          fileStatus,
+          readContext.getRequestedSchema().toString(),
+          readContext.getReadSupportMetadata(),
+          minSplitSize,
+          maxSplitSize));
+
+    }
+    return splits;
+  }
+
+  private static int findBlockIndex(BlockLocation[] hdfsBlocksArray, long endOffset) {
+    for (int i = 0; i < hdfsBlocksArray.length; i++) {
+      BlockLocation block = hdfsBlocksArray[i];
+      // end offset is exclusive. We want the block that contains the point right before.
+      if (endOffset > block.getOffset() && endOffset <= (block.getOffset() + block.getLength())) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  static <T> List<ParquetInputSplit> generateTaskSideMDSplits(
+      BlockLocation[] hdfsBlocksArray,
+      FileStatus fileStatus,
+      String requestedSchema,
+      Map<String, String> readSupportMetadata, long minSplitSize, long maxSplitSize) throws IOException {
+    if (maxSplitSize < minSplitSize || maxSplitSize < 0 || minSplitSize < 0) {
+      throw new ParquetDecodingException("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = " + maxSplitSize + "; minSplitSize is " + minSplitSize);
+    }
+    //generate splits from rowGroups of each split
+    List<ParquetInputSplit> resultSplits = new ArrayList<ParquetInputSplit>();
+    // [startOffset, endOffset)
+    long startOffset = 0;
+    long endOffset = 0;
+    // they should already be sorted
+    Arrays.sort(hdfsBlocksArray, new Comparator<BlockLocation>() {
+      @Override
+      public int compare(BlockLocation o1, BlockLocation o2) {
+        return compare(o1.getOffset(), o2.getOffset());
+      }
+      private int compare(long x, long y) {
+        return (x < y) ? -1 : ((x == y) ? 0 : 1);
+      }
+    });
+    final BlockLocation lastBlock =
+        hdfsBlocksArray.length == 0 ? null : hdfsBlocksArray[hdfsBlocksArray.length - 1];
+    while (endOffset < fileStatus.getLen()) {
+      startOffset = endOffset;
+      BlockLocation blockLocation;
+      final int nextBlockMin = findBlockIndex(hdfsBlocksArray, startOffset + minSplitSize);
+      final int nextBlockMax = findBlockIndex(hdfsBlocksArray, startOffset + maxSplitSize);
+      if (nextBlockMax == nextBlockMin && nextBlockMax != -1) {
+        // no block boundary between min and max
+        // => use max for the size of the split
+        endOffset = startOffset + maxSplitSize;
+        blockLocation = hdfsBlocksArray[nextBlockMax];
+      } else if (nextBlockMin > -1) {
+        // block boundary between min and max
+        // we end the split at the first block boundary
+        blockLocation = hdfsBlocksArray[nextBlockMin];
+        endOffset = blockLocation.getOffset() + blockLocation.getLength();
+      } else {
+        // min and max after last block
+        // small last split
+        endOffset = fileStatus.getLen();
+        blockLocation = lastBlock;
+      }
+      resultSplits.add(
+          new ParquetInputSplit(
+              fileStatus.getPath(),
+              startOffset, endOffset, endOffset - startOffset,
+              blockLocation == null ? new String[0] : blockLocation.getHosts(),
+              null,
+              requestedSchema, readSupportMetadata));
+    }
+    return resultSplits;
+  }
+}
+class ClientSideMetadataSplitStrategy extends SplitStrategy {
+  //Wrapper of hdfs blocks, keep track of which HDFS block is being used
+  private static class HDFSBlocks {
+    BlockLocation[] hdfsBlocks;
+    int currentStartHdfsBlockIndex = 0;//the hdfs block index corresponding to the start of a row group
+    int currentMidPointHDFSBlockIndex = 0;// the hdfs block index corresponding to the mid-point of a row group, a split might be created only when the midpoint of the rowgroup enters a new hdfs block
+
+    private HDFSBlocks(BlockLocation[] hdfsBlocks) {
+      this.hdfsBlocks = hdfsBlocks;
+      Comparator<BlockLocation> comparator = new Comparator<BlockLocation>() {
+        @Override
+        public int compare(BlockLocation b1, BlockLocation b2) {
+          return Long.signum(b1.getOffset() - b2.getOffset());
+        }
+      };
+      Arrays.sort(hdfsBlocks, comparator);
+    }
+
+    private long getHDFSBlockEndingPosition(int hdfsBlockIndex) {
+      BlockLocation hdfsBlock = hdfsBlocks[hdfsBlockIndex];
+      return hdfsBlock.getOffset() + hdfsBlock.getLength() - 1;
+    }
+
+    /**
+     * @param rowGroupMetadata
+     * @return true if the mid point of row group is in a new hdfs block, and also move the currentHDFSBlock pointer to the correct index that contains the row group;
+     * return false if the mid point of row group is in the same hdfs block
+     */
+    private boolean checkBelongingToANewHDFSBlock(BlockMetaData rowGroupMetadata) {
+      boolean isNewHdfsBlock = false;
+      long rowGroupMidPoint = rowGroupMetadata.getStartingPos() + (rowGroupMetadata.getCompressedSize() / 2);
+
+      //if mid point is not in the current HDFS block any more, return true
+      while (rowGroupMidPoint > getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex)) {
+        isNewHdfsBlock = true;
+        currentMidPointHDFSBlockIndex++;
+        if (currentMidPointHDFSBlockIndex >= hdfsBlocks.length)
+          throw new ParquetDecodingException("the row group is not in hdfs blocks in the file: midpoint of row groups is "
+                  + rowGroupMidPoint
+                  + ", the end of the hdfs block is "
+                  + getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex - 1));
+      }
+
+      while (rowGroupMetadata.getStartingPos() > getHDFSBlockEndingPosition(currentStartHdfsBlockIndex)) {
+        currentStartHdfsBlockIndex++;
+        if (currentStartHdfsBlockIndex >= hdfsBlocks.length)
+          throw new ParquetDecodingException("The row group does not start in this file: row group offset is "
+                  + rowGroupMetadata.getStartingPos()
+                  + " but the end of hdfs blocks of file is "
+                  + getHDFSBlockEndingPosition(currentStartHdfsBlockIndex));
+      }
+      return isNewHdfsBlock;
+    }
+
+    public BlockLocation getCurrentBlock() {
+      return hdfsBlocks[currentStartHdfsBlockIndex];
+    }
+  }
+
+  static class SplitInfo {
+    List<BlockMetaData> rowGroups = new ArrayList<BlockMetaData>();
+    BlockLocation hdfsBlock;
+    long compressedByteSize = 0L;
+
+    public SplitInfo(BlockLocation currentBlock) {
+      this.hdfsBlock = currentBlock;
+    }
+
+    private void addRowGroup(BlockMetaData rowGroup) {
+      this.rowGroups.add(rowGroup);
+      this.compressedByteSize += rowGroup.getCompressedSize();
+    }
+
+    public long getCompressedByteSize() {
+      return compressedByteSize;
+    }
+
+    public List<BlockMetaData> getRowGroups() {
+      return rowGroups;
+    }
+
+    int getRowGroupCount() {
+      return rowGroups.size();
+    }
+
+    public ParquetInputSplit getParquetInputSplit(FileStatus fileStatus, String requestedSchema, Map<String, String> readSupportMetadata) throws IOException {
+      MessageType requested = MessageTypeParser.parseMessageType(requestedSchema);
+      long length = 0;
+
+      for (BlockMetaData block : this.getRowGroups()) {
+        List<ColumnChunkMetaData> columns = block.getColumns();
+        for (ColumnChunkMetaData column : columns) {
+          if (requested.containsPath(column.getPath().toArray())) {
+            length += column.getTotalSize();
+          }
+        }
+      }
+
+      BlockMetaData lastRowGroup = this.getRowGroups().get(this.getRowGroupCount() - 1);
+      long end = lastRowGroup.getStartingPos() + lastRowGroup.getTotalByteSize();
+
+      long[] rowGroupOffsets = new long[this.getRowGroupCount()];
+      for (int i = 0; i < rowGroupOffsets.length; i++) {
+        rowGroupOffsets[i] = this.getRowGroups().get(i).getStartingPos();
+      }
+
+      return new ParquetInputSplit(
+              fileStatus.getPath(),
+              hdfsBlock.getOffset(),
+              end,
+              length,
+              hdfsBlock.getHosts(),
+              rowGroupOffsets,
+              requestedSchema,
+              readSupportMetadata
+      );
+    }
+  }
+
+  private static final Log LOG = Log.getLog(ClientSideMetadataSplitStrategy.class);
+
+  @Override
+  List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers,
+      long maxSplitSize, long minSplitSize, ReadContext readContext)
+      throws IOException {
+    List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
+    Filter filter = ParquetInputFormat.getFilter(configuration);
+
+    long rowGroupsDropped = 0;
+    long totalRowGroups = 0;
+
+    for (Footer footer : footers) {
+      final Path file = footer.getFile();
+      LOG.debug(file);
+      FileSystem fs = file.getFileSystem(configuration);
+      FileStatus fileStatus = fs.getFileStatus(file);
+      ParquetMetadata parquetMetaData = footer.getParquetMetadata();
+      List<BlockMetaData> blocks = parquetMetaData.getBlocks();
+
+      List<BlockMetaData> filteredBlocks;
+
+      totalRowGroups += blocks.size();
+      filteredBlocks = RowGroupFilter.filterRowGroups(filter, blocks, parquetMetaData.getFileMetaData().getSchema());
+      rowGroupsDropped += blocks.size() - filteredBlocks.size();
+
+      if (filteredBlocks.isEmpty()) {
+        continue;
+      }
+
+      BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+      splits.addAll(
+          generateSplits(
+              filteredBlocks,
+              fileBlockLocations,
+              fileStatus,
+              readContext.getRequestedSchema().toString(),
+              readContext.getReadSupportMetadata(),
+              minSplitSize,
+              maxSplitSize)
+          );
+    }
+
+    if (rowGroupsDropped > 0 && totalRowGroups > 0) {
+      int percentDropped = (int) ((((double) rowGroupsDropped) / totalRowGroups) * 100);
+      LOG.info("Dropping " + rowGroupsDropped + " row groups that do not pass filter predicate! (" + percentDropped + "%)");
+    } else {
+      LOG.info("There were no row groups that could be dropped due to filter predicates");
+    }
+    return splits;
+  }
+
+  /**
+   * groups together all the data blocks for the same HDFS block
+   *
+   * @param rowGroupBlocks      data blocks (row groups)
+   * @param hdfsBlocksArray     hdfs blocks
+   * @param fileStatus          the containing file
+   * @param requestedSchema     the schema requested by the user
+   * @param readSupportMetadata the metadata provided by the readSupport implementation in init
+   * @param minSplitSize        the mapred.min.split.size
+   * @param maxSplitSize        the mapred.max.split.size
+   * @return the splits (one per HDFS block)
+   * @throws IOException If hosts can't be retrieved for the HDFS block
+   */
+  static <T> List<ParquetInputSplit> generateSplits(
+          List<BlockMetaData> rowGroupBlocks,
+          BlockLocation[] hdfsBlocksArray,
+          FileStatus fileStatus,
+          String requestedSchema,
+          Map<String, String> readSupportMetadata, long minSplitSize, long maxSplitSize) throws IOException {
+
+    List<SplitInfo> splitRowGroups =
+        generateSplitInfo(rowGroupBlocks, hdfsBlocksArray, minSplitSize, maxSplitSize);
+
+    //generate splits from rowGroups of each split
+    List<ParquetInputSplit> resultSplits = new ArrayList<ParquetInputSplit>();
+    for (SplitInfo splitInfo : splitRowGroups) {
+      ParquetInputSplit split = splitInfo.getParquetInputSplit(fileStatus, requestedSchema, readSupportMetadata);
+      resultSplits.add(split);
+    }
+    return resultSplits;
+  }
+
+  static List<SplitInfo> generateSplitInfo(
+      List<BlockMetaData> rowGroupBlocks,
+      BlockLocation[] hdfsBlocksArray,
+      long minSplitSize, long maxSplitSize) {
+    List<SplitInfo> splitRowGroups;
+
+    if (maxSplitSize < minSplitSize || maxSplitSize < 0 || minSplitSize < 0) {
+      throw new ParquetDecodingException("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = " + maxSplitSize + "; minSplitSize is " + minSplitSize);
+    }
+    HDFSBlocks hdfsBlocks = new HDFSBlocks(hdfsBlocksArray);
+    hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupBlocks.get(0));
+    SplitInfo currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
+
+    //assign rowGroups to splits
+    splitRowGroups = new ArrayList<SplitInfo>();
+    checkSorted(rowGroupBlocks);//assert row groups are sorted
+    for (BlockMetaData rowGroupMetadata : rowGroupBlocks) {
+      if ((hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupMetadata)
+             && currentSplit.getCompressedByteSize() >= minSplitSize
+             && currentSplit.getCompressedByteSize() > 0)
+           || currentSplit.getCompressedByteSize() >= maxSplitSize) {
+        //create a new split
+        splitRowGroups.add(currentSplit);//finish previous split
+        currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
+      }
+      currentSplit.addRowGroup(rowGroupMetadata);
+    }
+
+    if (currentSplit.getRowGroupCount() > 0) {
+      splitRowGroups.add(currentSplit);
+    }
+
+    return splitRowGroups;
+  }
+
+  private static void checkSorted(List<BlockMetaData> rowGroupBlocks) {
+    long previousOffset = 0L;
+    for(BlockMetaData rowGroup: rowGroupBlocks) {
+      long currentOffset = rowGroup.getStartingPos();
+      if (currentOffset < previousOffset) {
+        throw new ParquetDecodingException("row groups are not sorted: previous row groups starts at " + previousOffset + ", current row group starts at " + currentOffset);
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
index da0c2ec..399be64 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
@@ -15,54 +15,47 @@
  */
 package parquet.hadoop;
 
-import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
-import parquet.Log;
-import parquet.column.Encoding;
-import parquet.column.statistics.IntStatistics;
-import parquet.common.schema.ColumnPath;
+import parquet.bytes.BytesUtils;
 import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
 
 /**
  * An input split for the Parquet format
  * It contains the information to read one block of the file.
  *
+ * This class is private to the ParquetInputFormat.
+ * Backward compatibility is not maintained.
+ *
  * @author Julien Le Dem
  */
+@Private
 public class ParquetInputSplit extends FileSplit implements Writable {
 
-  private static final Log LOG = Log.getLog(ParquetInputSplit.class);
 
-  private List<BlockMetaData> blocks;
+  private long end;
+  private long[] rowGroupOffsets;
   private String requestedSchema;
-  private String fileSchema;
-  private Map<String, String> extraMetadata;
   private Map<String, String> readSupportMetadata;
 
-
   /**
    * Writables must have a parameterless constructor
    */
@@ -71,19 +64,19 @@ public class ParquetInputSplit extends FileSplit implements Writable {
   }
 
   /**
-   * Used by {@link ParquetInputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)}
-   * @param path the path to the file
-   * @param start the offset of the block in the file
-   * @param length the size of the block in the file
-   * @param hosts the hosts where this block can be found
-   * @param blocks the block meta data (Columns locations)
-   * @param schema the file schema
-   * @param readSupportClass the class used to materialize records
-   * @param requestedSchema the requested schema for materialization
-   * @param fileSchema the schema of the file
-   * @param extraMetadata the app specific meta data in the file
-   * @param readSupportMetadata the read support specific metadata
+   * For compatibility only
+   * use {@link ParquetInputSplit#ParquetInputSplit(Path, long, long, long, String[], long[], String, Map)}
+   * @param path
+   * @param start
+   * @param length
+   * @param hosts
+   * @param blocks
+   * @param requestedSchema
+   * @param fileSchema
+   * @param extraMetadata
+   * @param readSupportMetadata
    */
+  @Deprecated
   public ParquetInputSplit(
       Path path,
       long start,
@@ -94,212 +87,168 @@ public class ParquetInputSplit extends FileSplit implements Writable {
       String fileSchema,
       Map<String, String> extraMetadata,
       Map<String, String> readSupportMetadata) {
-    super(path, start, length, hosts);
-    this.blocks = blocks;
-    this.requestedSchema = requestedSchema;
-    this.fileSchema = fileSchema;
-    this.extraMetadata = extraMetadata;
-    this.readSupportMetadata = readSupportMetadata;
+    this( // delegates to (file, start, end, length, hosts, rowGroupOffsets, requestedSchema, readSupportMetadata)
+        path, start, end(blocks), length, hosts, // end offset goes before length, matching the target constructor's parameter order
+        offsets(blocks),
+        requestedSchema, readSupportMetadata
+        );
+  }
+
+  private static long end(List<BlockMetaData> blocks) {
+    BlockMetaData last = blocks.get(blocks.size() - 1);
+    return last.getStartingPos() + last.getCompressedSize();
+  }
+
+  private static long[] offsets(List<BlockMetaData> blocks) { // returns the starting position of each row group, in list order
+    long[] offsets = new long[blocks.size()];
+    for (int i = 0; i < offsets.length; i++) {
+      offsets[i] = blocks.get(i).getStartingPos(); // index with i; get(0) repeated the first row group's offset in every slot
+    }
+    return offsets;
   }
 
   /**
-   * @return the block meta data
+   * @param file the path of the file for that split
+   * @param start the start offset in the file
+   * @param end the end offset in the file
+   * @param length the actual size in bytes that we expect to read
+   * @param hosts the hosts with the replicas of this data
+   * @param rowGroupOffsets the offsets of the rowgroups selected if loaded on the client
+   * @param requestedSchema the user requested schema
+   * @param readSupportMetadata metadata from the read support
    */
-  public List<BlockMetaData> getBlocks() {
-    return blocks;
+  public ParquetInputSplit(
+      Path file, long start, long end, long length, String[] hosts,
+      long[] rowGroupOffsets,
+      String requestedSchema,
+      Map<String, String> readSupportMetadata) {
+    super(file, start, length, hosts);
+    this.end = end;
+    this.rowGroupOffsets = rowGroupOffsets;
+    this.requestedSchema = requestedSchema;
+    this.readSupportMetadata = readSupportMetadata;
   }
 
   /**
    * @return the requested schema
    */
-  public String getRequestedSchema() {
+  String getRequestedSchema() {
     return requestedSchema;
   }
 
   /**
-   * @return the file schema
+   * @return the end offset of that split
    */
-  public String getFileSchema() {
-    return fileSchema;
+  long getEnd() {
+    return end;
   }
 
   /**
-   * @return app specific metadata from the file
+   * @return app specific metadata provided by the read support in the init phase
    */
-  public Map<String, String> getExtraMetadata() {
-    return extraMetadata;
+  Map<String, String> getReadSupportMetadata() {
+    return readSupportMetadata;
   }
 
   /**
-   * @return app specific metadata provided by the read support in the init phase
+   * @return the offsets of the row group selected if this has been determined on the client side
    */
-  public Map<String, String> getReadSupportMetadata() {
-    return readSupportMetadata;
+  long[] getRowGroupOffsets() {
+    return rowGroupOffsets;
+  }
+
+  @Override
+  public String toString() {
+    String hosts;
+    try{
+       hosts = Arrays.toString(getLocations());
+    } catch (Exception e) {
+      // IOException/InterruptedException could be thrown
+      hosts = "(" + e + ")";
+    }
+
+    return this.getClass().getSimpleName() + "{" +
+           "part: " + getPath()
+        + " start: " + getStart()
+        + " end: " + getEnd()
+        + " length: " + getLength()
+        + " hosts: " + hosts
+        + (rowGroupOffsets == null ? "" : (" row groups: " + Arrays.toString(rowGroupOffsets)))
+        + " requestedSchema: " +  requestedSchema
+        + " readSupportMetadata: " + readSupportMetadata
+        + "}";
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public void readFields(DataInput in) throws IOException {
+  final public void readFields(DataInput hin) throws IOException {
+    byte[] bytes = readArray(hin);
+    DataInputStream in = new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)));
     super.readFields(in);
-    int blocksSize = in.readInt();
-    this.blocks = new ArrayList<BlockMetaData>(blocksSize);
-    for (int i = 0; i < blocksSize; i++) {
-      blocks.add(readBlock(in));
+    this.end = in.readLong();
+    if (in.readBoolean()) {
+      this.rowGroupOffsets = new long[in.readInt()];
+      for (int i = 0; i < rowGroupOffsets.length; i++) {
+        rowGroupOffsets[i] = in.readLong();
+      }
     }
-    this.requestedSchema = decompressString(in);
-    this.fileSchema = decompressString(in);
-    this.extraMetadata = readKeyValues(in);
+    this.requestedSchema = readUTF8(in);
     this.readSupportMetadata = readKeyValues(in);
+    in.close();
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public void write(DataOutput out) throws IOException {
+  final public void write(DataOutput hout) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(new GZIPOutputStream(baos));
     super.write(out);
-    out.writeInt(blocks.size());
-    for (BlockMetaData block : blocks) {
-      writeBlock(out, block);
-    }
-    byte[] compressedSchema = compressString(requestedSchema);
-    out.writeInt(compressedSchema.length);
-    out.write(compressedSchema);
-    compressedSchema = compressString(fileSchema);
-    out.writeInt(compressedSchema.length);
-    out.write(compressedSchema);
-    writeKeyValues(out, extraMetadata);
-    writeKeyValues(out, readSupportMetadata);
-  }
-
-  byte[] compressString(String str) {
-    ByteArrayOutputStream obj = new ByteArrayOutputStream();
-    GZIPOutputStream gzip;
-    try {
-      gzip = new GZIPOutputStream(obj);
-      gzip.write(str.getBytes("UTF-8"));
-      gzip.close();
-    } catch (IOException e) {
-      // Not really sure how we can get here. I guess the best thing to do is to croak.
-      LOG.error("Unable to gzip InputSplit string " + str, e);
-      throw new RuntimeException("Unable to gzip InputSplit string", e);
-    }
-    return obj.toByteArray();
-  }
-
-  String decompressString(DataInput in) throws IOException {
-    int len = in.readInt();
-    byte[] bytes = new byte[len];
-    in.readFully(bytes);
-    return decompressString(bytes);
-  }
-
-  String decompressString(byte[] bytes) {
-    ByteArrayInputStream obj = new ByteArrayInputStream(bytes);
-    GZIPInputStream gzip = null;
-    String outStr = "";
-    try {
-      gzip = new GZIPInputStream(obj);
-      BufferedReader reader = new BufferedReader(new InputStreamReader(gzip, "UTF-8"));
-      char[] buffer = new char[1024];
-      int n = 0;
-      StringBuilder sb = new StringBuilder();
-      while (-1 != (n = reader.read(buffer))) {
-        sb.append(buffer, 0, n);
-      }
-      outStr = sb.toString();
-    } catch (IOException e) {
-      // Not really sure how we can get here. I guess the best thing to do is to croak.
-      LOG.error("Unable to uncompress InputSplit string", e);
-      throw new RuntimeException("Unable to uncompress InputSplit String", e);
-    } finally {
-      if (null != gzip) {
-        try {
-          gzip.close();
-        } catch (IOException e) {
-          LOG.error("Unable to uncompress InputSplit", e);
-          throw new RuntimeException("Unable to uncompress InputSplit String", e);
-        }
+    out.writeLong(end);
+    out.writeBoolean(rowGroupOffsets != null);
+    if (rowGroupOffsets != null) {
+      out.writeInt(rowGroupOffsets.length);
+      for (long o : rowGroupOffsets) {
+        out.writeLong(o);
       }
     }
-    return outStr;
+    writeUTF8(out, requestedSchema);
+    writeKeyValues(out, readSupportMetadata);
+    out.close();
+    writeArray(hout, baos.toByteArray());
   }
 
-  private BlockMetaData readBlock(DataInput in) throws IOException {
-    final BlockMetaData block = new BlockMetaData();
-    int size = in.readInt();
-    for (int i = 0; i < size; i++) {
-      block.addColumn(readColumn(in));
-    }
-    block.setRowCount(in.readLong());
-    block.setTotalByteSize(in.readLong());
-    if (!in.readBoolean()) {
-      block.setPath(in.readUTF().intern());
-    }
-    return block;
+  private static void writeUTF8(DataOutput out, String string) throws IOException {
+    byte[] bytes = string.getBytes(BytesUtils.UTF8);
+    writeArray(out, bytes);
   }
 
-  private void writeBlock(DataOutput out, BlockMetaData block)
-      throws IOException {
-    out.writeInt(block.getColumns().size());
-    for (ColumnChunkMetaData column : block.getColumns()) {
-      writeColumn(out, column);
-    }
-    out.writeLong(block.getRowCount());
-    out.writeLong(block.getTotalByteSize());
-    out.writeBoolean(block.getPath() == null);
-    if (block.getPath() != null) {
-      out.writeUTF(block.getPath());
-    }
+  private static String readUTF8(DataInput in) throws IOException {
+    byte[] bytes = readArray(in);
+    return new String(bytes, BytesUtils.UTF8).intern();
   }
 
-  private ColumnChunkMetaData readColumn(DataInput in)
-      throws IOException {
-    CompressionCodecName codec = CompressionCodecName.values()[in.readInt()];
-    String[] columnPath = new String[in.readInt()];
-    for (int i = 0; i < columnPath.length; i++) {
-      columnPath[i] = in.readUTF().intern();
-    }
-    PrimitiveTypeName type = PrimitiveTypeName.values()[in.readInt()];
-    int encodingsSize = in.readInt();
-    Set<Encoding> encodings = new HashSet<Encoding>(encodingsSize);
-    for (int i = 0; i < encodingsSize; i++) {
-      encodings.add(Encoding.values()[in.readInt()]);
-    }
-    IntStatistics emptyStats = new IntStatistics();
-    ColumnChunkMetaData column = ColumnChunkMetaData.get(
-        ColumnPath.get(columnPath), type, codec, encodings, emptyStats,
-        in.readLong(), in.readLong(), in.readLong(), in.readLong(), in.readLong());
-    return column;
+  private static void writeArray(DataOutput out, byte[] bytes) throws IOException {
+    out.writeInt(bytes.length);
+    out.write(bytes, 0, bytes.length);
   }
 
-  private void writeColumn(DataOutput out, ColumnChunkMetaData column)
-      throws IOException {
-    out.writeInt(column.getCodec().ordinal());
-    out.writeInt(column.getPath().size());
-    for (String s : column.getPath()) {
-      out.writeUTF(s);
-    }
-    out.writeInt(column.getType().ordinal());
-    out.writeInt(column.getEncodings().size());
-    for (Encoding encoding : column.getEncodings()) {
-      out.writeInt(encoding.ordinal());
-    }
-    out.writeLong(column.getFirstDataPageOffset());
-    out.writeLong(column.getDictionaryPageOffset());
-    out.writeLong(column.getValueCount());
-    out.writeLong(column.getTotalSize());
-    out.writeLong(column.getTotalUncompressedSize());
+  private static byte[] readArray(DataInput in) throws IOException {
+    int len = in.readInt();
+    byte[] bytes = new byte[len];
+    in.readFully(bytes);
+    return bytes;
   }
 
   private Map<String, String> readKeyValues(DataInput in) throws IOException {
     int size = in.readInt();
     Map<String, String> map = new HashMap<String, String>(size);
     for (int i = 0; i < size; i++) {
-      String key = decompressString(in).intern();
-      String value = decompressString(in).intern();
+      String key = readUTF8(in).intern();
+      String value = readUTF8(in).intern();
       map.put(key, value);
     }
     return map;
@@ -311,35 +260,10 @@ public class ParquetInputSplit extends FileSplit implements Writable {
     } else {
       out.writeInt(map.size());
       for (Entry<String, String> entry : map.entrySet()) {
-        byte[] compr = compressString(entry.getKey());
-        out.writeInt(compr.length);
-        out.write(compr);
-        compr = compressString(entry.getValue());
-        out.writeInt(compr.length);
-        out.write(compr);
+        writeUTF8(out, entry.getKey());
+        writeUTF8(out, entry.getValue());
       }
     }
   }
 
-
-  @Override
-  public String toString() {
-    String hosts[] = {};
-    try{
-       hosts = getLocations();
-    }catch(Exception ignore){} // IOException/InterruptedException could be thrown
-
-    return this.getClass().getSimpleName() + "{" +
-           "part: " + getPath()
-        + " start: " + getStart()
-        + " length: " + getLength()
-        + " hosts: " + Arrays.toString(hosts)
-        + " blocks: " + blocks.size()
-        + " requestedSchema: " + (fileSchema.equals(requestedSchema) ? "same as file" : requestedSchema)
-        + " fileSchema: " + fileSchema
-        + " extraMetadata: " + extraMetadata
-        + " readSupportMetadata: " + readSupportMetadata
-        + "}";
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
index 31917d2..940b893 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
@@ -18,7 +18,6 @@ package parquet.hadoop;
 import java.io.IOException;
 import java.util.List;
 
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
index 6703001..74f4051 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 import parquet.Log;
-import parquet.Preconditions;
 import parquet.column.ParquetProperties.WriterVersion;
 import parquet.hadoop.api.WriteSupport;
 import parquet.hadoop.api.WriteSupport.WriteContext;

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
index c56a402..ec839e2 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
@@ -15,6 +15,8 @@
  */
 package parquet.hadoop;
 
+import static parquet.Preconditions.checkNotNull;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
@@ -39,8 +41,6 @@ import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.GlobalMetaData;
 import parquet.schema.MessageType;
 
-import static parquet.Preconditions.checkNotNull;
-
 /**
  * Read records from a Parquet file.
  * TODO: too many constructors (https://issues.apache.org/jira/browse/PARQUET-39)
@@ -114,7 +114,7 @@ public class ParquetReader<T> implements Closeable {
 
     FileSystem fs = file.getFileSystem(conf);
     List<FileStatus> statuses = Arrays.asList(fs.listStatus(file));
-    List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses);
+    List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses, false);
     this.footersIterator = footers.iterator();
     globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
     MessageType schema = globalMetaData.getSchema();


Mime
View raw message