parquet-commits mailing list archives

From: tians...@apache.org
Subject: [1/2] PARQUET-84: Avoid reading rowgroup metadata in memory on the client side.
Date: Fri, 05 Sep 2014 18:33:07 GMT
Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master 647b8a70f -> 5dafd127f
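
PARQUET-84 stops the client from deserializing per-row-group metadata: a split now carries only row-group start offsets (or none at all when parquet.task.side.metadata is set), and each task re-reads the footer itself with a metadata filter that keeps just the row groups in its byte range. A minimal sketch of the three footer filters used in the hunks below; the filter names and the readFooter signature are taken from the diff, while the wrapper class, sample path and sample range are illustrative assumptions:

import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
import static parquet.format.converter.ParquetMetadataConverter.range;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;

public class FooterFilterSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path(args[0]);

    // Full footer, row-group metadata included (the pre-PARQUET-84 behaviour).
    ParquetMetadata full = ParquetFileReader.readFooter(conf, file, NO_FILTER);

    // File-level metadata only, as the Hive wrapper below now does on the client.
    ParquetMetadata light = ParquetFileReader.readFooter(conf, file, SKIP_ROW_GROUPS);

    // Only the row groups that fall in a split's byte range, as the task-side
    // ParquetRecordReader now does (here an assumed 0..128MB split).
    ParquetMetadata ranged =
        ParquetFileReader.readFooter(conf, file, range(0, 128L * 1024 * 1024));

    System.out.println(full.getBlocks().size() + " / "
        + light.getBlocks().size() + " / " + ranged.getBlocks().size());
  }
}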


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
index 67c7dd7..6b89e37 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
@@ -15,9 +15,18 @@
  */
 package parquet.hadoop;
 
+import static parquet.format.converter.ParquetMetadataConverter.range;
+
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -27,10 +36,14 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import parquet.Log;
 import parquet.filter.UnboundRecordFilter;
 import parquet.filter2.compat.FilterCompat;
+import parquet.filter2.compat.RowGroupFilter;
 import parquet.filter2.compat.FilterCompat.Filter;
 import parquet.hadoop.api.ReadSupport;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.hadoop.util.ContextUtil;
 import parquet.hadoop.util.counters.BenchmarkCounter;
+import parquet.schema.MessageType;
 import parquet.schema.MessageTypeParser;
 
 /**
@@ -44,7 +57,7 @@ import parquet.schema.MessageTypeParser;
  */
 public class ParquetRecordReader<T> extends RecordReader<Void, T> {
 
-  private static final Log LOG= Log.getLog(ParquetRecordReader.class);
+  private static final Log LOG = Log.getLog(ParquetRecordReader.class);
   private final InternalParquetRecordReader<T> internalReader;
 
   /**
@@ -113,9 +126,9 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
       throws IOException, InterruptedException {
     if (context instanceof TaskInputOutputContext<?, ?, ?, ?>) {
       BenchmarkCounter.initCounterFromContext((TaskInputOutputContext<?, ?, ?, ?>) context);
-    }else{
+    } else {
       LOG.error("Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is "
-              +context.getClass().getCanonicalName());
+              + context.getClass().getCanonicalName());
     }
 
     initializeInternalReader((ParquetInputSplit)inputSplit, ContextUtil.getConfiguration(context));
@@ -128,12 +141,53 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
   }
 
   private void initializeInternalReader(ParquetInputSplit split, Configuration configuration) throws IOException {
-
+    Path path = split.getPath();
+    ParquetMetadata footer = ParquetFileReader.readFooter(
+        configuration, path, range(split.getStart(), split.getEnd()));
+    long[] rowGroupOffsets = split.getRowGroupOffsets();
+    List<BlockMetaData> filteredBlocks;
+    // if task.side.metadata is set, rowGroupOffsets is null
+    MessageType fileSchema = footer.getFileMetaData().getSchema();
+    if (rowGroupOffsets == null) {
+      // then we need to apply the predicate push down filter
+      Filter filter = ParquetInputFormat.getFilter(configuration);
+      filteredBlocks = RowGroupFilter.filterRowGroups(filter, footer.getBlocks(), fileSchema);
+    } else {
+      // otherwise we find the row groups that were selected on the client
+      Set<Long> offsets = new HashSet<Long>();
+      for (long offset : rowGroupOffsets) {
+        offsets.add(offset);
+      }
+      filteredBlocks = new ArrayList<BlockMetaData>();
+      for (BlockMetaData block : footer.getBlocks()) {
+        if (offsets.contains(block.getStartingPos())) {
+          filteredBlocks.add(block);
+        }
+      }
+      // verify we found them all
+      if (filteredBlocks.size() != rowGroupOffsets.length) {
+        long[] foundRowGroupOffsets = new long[footer.getBlocks().size()];
+        for (int i = 0; i < foundRowGroupOffsets.length; i++) {
+          foundRowGroupOffsets[i] = footer.getBlocks().get(i).getStartingPos();
+        }
+        // this should never happen.
+        // provide a good error message in case there's a bug
+        throw new IllegalStateException(
+            "All the offsets listed in the split should be found in the file."
+            + " expected: " + Arrays.toString(rowGroupOffsets)
+            + " found: " + filteredBlocks
+            + " out of: " + Arrays.toString(foundRowGroupOffsets)
+            + " in range " + split.getStart() + ", " + split.getEnd());
+      }
+    }
+    MessageType requestedSchema = MessageTypeParser.parseMessageType(split.getRequestedSchema());
+    Map<String, String> fileMetaData = footer.getFileMetaData().getKeyValueMetaData();
+    Map<String, String> readSupportMetadata = split.getReadSupportMetadata();
     internalReader.initialize(
-        MessageTypeParser.parseMessageType(split.getRequestedSchema()),
-        MessageTypeParser.parseMessageType(split.getFileSchema()),
-        split.getExtraMetadata(), split.getReadSupportMetadata(), split.getPath(),
-        split.getBlocks(), configuration);
+        requestedSchema,fileSchema,
+        fileMetaData, readSupportMetadata,
+        path,
+        filteredBlocks, configuration);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/PrintFooter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/PrintFooter.java b/parquet-hadoop/src/main/java/parquet/hadoop/PrintFooter.java
index 68a2fa5..500b016 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/PrintFooter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/PrintFooter.java
@@ -15,6 +15,7 @@
  */
 package parquet.hadoop;
 
+import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 import static parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
 
 import java.net.URI;
@@ -99,7 +100,7 @@ public class PrintFooter {
             @Override
             public ParquetMetadata call() throws Exception {
               try {
-                ParquetMetadata footer = ParquetFileReader.readFooter(configuration, currentFile);
+                ParquetMetadata footer = ParquetFileReader.readFooter(configuration, currentFile, NO_FILTER);
                 return footer;
               } catch (Exception e) {
                 throw new ParquetDecodingException("could not read footer", e);

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
index 9544865..11c7c38 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
@@ -15,10 +15,11 @@
  */
 package parquet.hadoop.mapred;
 
+import static java.util.Arrays.asList;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.mapred.InputSplit;
@@ -31,7 +32,6 @@ import parquet.hadoop.ParquetInputFormat;
 import parquet.hadoop.ParquetInputSplit;
 import parquet.hadoop.ParquetRecordReader;
 
-@SuppressWarnings("deprecation")
 public class DeprecatedParquetInputFormat<V> extends org.apache.hadoop.mapred.FileInputFormat<Void, Container<V>> {
 
   protected ParquetInputFormat<V> realInputFormat = new ParquetInputFormat<V>();
@@ -46,22 +46,19 @@ public class DeprecatedParquetInputFormat<V> extends org.apache.hadoop.mapred.Fi
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     List<Footer> footers = getFooters(job);
     List<ParquetInputSplit> splits = realInputFormat.getSplits(job, footers);
-
-      if (splits == null) {
-        return null;
-      }
-
-      InputSplit[] resultSplits = new InputSplit[splits.size()];
-      int i = 0;
-      for (ParquetInputSplit split : splits) {
-          resultSplits[i++] = new ParquetInputSplitWrapper(split);
-      }
-
-      return resultSplits;
+    if (splits == null) {
+      return null;
+    }
+    InputSplit[] resultSplits = new InputSplit[splits.size()];
+    int i = 0;
+    for (ParquetInputSplit split : splits) {
+      resultSplits[i++] = new ParquetInputSplitWrapper(split);
+    }
+    return resultSplits;
   }
 
   public List<Footer> getFooters(JobConf job) throws IOException {
-    return realInputFormat.getFooters(job, Arrays.asList(super.listStatus(job)));
+    return realInputFormat.getFooters(job, asList(super.listStatus(job)));
   }
 
   private static class RecordReaderWrapper<V> implements RecordReader<Void, Container<V>> {
@@ -157,13 +154,10 @@ public class DeprecatedParquetInputFormat<V> extends org.apache.hadoop.mapred.Fi
     }
   }
 
-
-
   private static class ParquetInputSplitWrapper implements InputSplit {
 
     ParquetInputSplit realSplit;
 
-
     @SuppressWarnings("unused") // MapReduce instantiates this.
     public ParquetInputSplitWrapper() {}
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
index 69bf36b..cbc9318 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
@@ -15,12 +15,6 @@
  */
 package parquet.hadoop.mapred;
 
-import parquet.Log;
-import parquet.hadoop.ParquetOutputFormat;
-import parquet.hadoop.ParquetRecordWriter;
-import parquet.hadoop.codec.CodecConfig;
-import parquet.hadoop.metadata.CompressionCodecName;
-
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -31,9 +25,12 @@ import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Progressable;
 
-@SuppressWarnings("deprecation")
+import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.ParquetRecordWriter;
+import parquet.hadoop.codec.CodecConfig;
+import parquet.hadoop.metadata.CompressionCodecName;
+
 public class DeprecatedParquetOutputFormat<V> extends org.apache.hadoop.mapred.FileOutputFormat<Void, V> {
-  private static final Log LOG = Log.getLog(DeprecatedParquetOutputFormat.class);
 
   public static void setWriteSupportClass(Configuration configuration,  Class<?> writeSupportClass) {
     configuration.set(ParquetOutputFormat.WRITE_SUPPORT_CLASS, writeSupportClass.getName());
@@ -69,10 +66,10 @@ public class DeprecatedParquetOutputFormat<V> extends org.apache.hadoop.mapred.F
   @Override
   public RecordWriter<Void, V> getRecordWriter(FileSystem fs,
       JobConf conf, String name, Progressable progress) throws IOException {
-    return new RecordWriterWrapper<V>(realOutputFormat, fs, conf, name, progress);
+    return new RecordWriterWrapper(realOutputFormat, fs, conf, name, progress);
   }
 
-  private class RecordWriterWrapper<V> implements RecordWriter<Void, V> {
+  private class RecordWriterWrapper implements RecordWriter<Void, V> {
 
     private ParquetRecordWriter<V> realWriter;
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/test/java/parquet/format/converter/TestParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/parquet/format/converter/TestParquetMetadataConverter.java
index 7cb6d73..6d5979a 100644
--- a/parquet-hadoop/src/test/java/parquet/format/converter/TestParquetMetadataConverter.java
+++ b/parquet-hadoop/src/test/java/parquet/format/converter/TestParquetMetadataConverter.java
@@ -15,25 +15,40 @@
  */
 package parquet.format.converter;
 
+import static java.util.Collections.emptyList;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static parquet.format.CompressionCodec.UNCOMPRESSED;
+import static parquet.format.Type.INT32;
 import static parquet.format.Util.readPageHeader;
 import static parquet.format.Util.writePageHeader;
+import static parquet.format.converter.ParquetMetadataConverter.filterFileMetaData;
+import static parquet.format.converter.ParquetMetadataConverter.getOffset;
 
-import com.google.common.collect.Lists;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import parquet.column.Encoding;
 import parquet.example.Paper;
+import parquet.format.ColumnChunk;
+import parquet.format.ColumnMetaData;
 import parquet.format.ConvertedType;
 import parquet.format.FieldRepetitionType;
+import parquet.format.FileMetaData;
 import parquet.format.PageHeader;
 import parquet.format.PageType;
+import parquet.format.RowGroup;
 import parquet.format.SchemaElement;
 import parquet.format.Type;
 import parquet.schema.MessageType;
@@ -42,6 +57,8 @@ import parquet.schema.PrimitiveType.PrimitiveTypeName;
 import parquet.schema.Type.Repetition;
 import parquet.schema.Types;
 
+import com.google.common.collect.Lists;
+
 public class TestParquetMetadataConverter {
 
   @Test
@@ -117,4 +134,113 @@ public class TestParquetMetadataConverter {
     }
   }
 
+  private FileMetaData metadata(long... sizes) {
+    List<SchemaElement> schema = emptyList();
+    List<RowGroup> rowGroups = new ArrayList<RowGroup>();
+    long offset = 0;
+    for (long size : sizes) {
+      ColumnChunk columnChunk = new ColumnChunk(offset);
+      columnChunk.setMeta_data(new ColumnMetaData(
+          INT32,
+          Collections.<parquet.format.Encoding>emptyList(),
+          Collections.<String>emptyList(),
+          UNCOMPRESSED, 10l, size * 2, size, offset));
+      rowGroups.add(new RowGroup(Arrays.asList(columnChunk), size, 1));
+      offset += size;
+    }
+    return new FileMetaData(1, schema, sizes.length, rowGroups);
+  }
+
+  private FileMetaData filter(FileMetaData md, long start, long end) {
+    return filterFileMetaData(new FileMetaData(md), new ParquetMetadataConverter.RangeMetadataFilter(start, end));
+  }
+
+  private void verifyMD(FileMetaData md, long... offsets) {
+    assertEquals(offsets.length, md.row_groups.size());
+    for (int i = 0; i < offsets.length; i++) {
+      long offset = offsets[i];
+      RowGroup rowGroup = md.getRow_groups().get(i);
+      assertEquals(offset, getOffset(rowGroup));
+    }
+  }
+
+  /**
+   * verifies that splits will end up being a partition of the rowgroup
+   * they are all found only once
+   * @param md
+   * @param splitWidth
+   */
+  private void verifyAllFilters(FileMetaData md, long splitWidth) {
+    Set<Long> offsetsFound = new TreeSet<Long>();
+    for (long start = 0; start < fileSize(md); start += splitWidth) {
+      FileMetaData filtered = filter(md, start, start + splitWidth);
+      for (RowGroup rg : filtered.getRow_groups()) {
+        long o = getOffset(rg);
+        if (offsetsFound.contains(o)) {
+          fail("found the offset twice: " + o);
+        } else {
+          offsetsFound.add(o);
+        }
+      }
+    }
+    if (offsetsFound.size() != md.row_groups.size()) {
+      fail("missing row groups, "
+          + "found: " + offsetsFound
+          + "\nexpected " + md.getRow_groups());
+    }
+  }
+
+  private long fileSize(FileMetaData md) {
+    long size = 0;
+    for (RowGroup rg : md.getRow_groups()) {
+      size += rg.total_byte_size;
+    }
+    return size;
+  }
+
+  @Test
+  public void testFilterMetaData() {
+    verifyMD(filter(metadata(50, 50, 50), 0, 50), 0);
+    verifyMD(filter(metadata(50, 50, 50), 50, 100), 50);
+    verifyMD(filter(metadata(50, 50, 50), 100, 150), 100);
+    // picks up first RG
+    verifyMD(filter(metadata(50, 50, 50), 25, 75), 0);
+    // picks up no RG
+    verifyMD(filter(metadata(50, 50, 50), 26, 75));
+    // picks up second RG
+    verifyMD(filter(metadata(50, 50, 50), 26, 76), 50);
+
+    verifyAllFilters(metadata(50, 50, 50), 10);
+    verifyAllFilters(metadata(50, 50, 50), 51);
+    verifyAllFilters(metadata(50, 50, 50), 25); // corner cases are in the middle
+    verifyAllFilters(metadata(50, 50, 50), 24);
+    verifyAllFilters(metadata(50, 50, 50), 26);
+    verifyAllFilters(metadata(50, 50, 50), 110);
+    verifyAllFilters(metadata(10, 50, 500), 110);
+    verifyAllFilters(metadata(10, 50, 500), 10);
+    verifyAllFilters(metadata(10, 50, 500), 600);
+    verifyAllFilters(metadata(11, 9, 10), 10);
+    verifyAllFilters(metadata(11, 9, 10), 9);
+    verifyAllFilters(metadata(11, 9, 10), 8);
+  }
+
+  @Test
+  public void randomTestFilterMetaData() {
+    // randomized property based testing
+    // if it fails add the case above
+    Random random = new Random(System.currentTimeMillis());
+    for (int j = 0; j < 100; j++) {
+      long[] rgs = new long[random.nextInt(50)];
+      for (int i = 0; i < rgs.length; i++) {
+        rgs[i] = random.nextInt(10000) + 1; // No empty row groups
+      }
+      int splitSize = random.nextInt(10000);
+      try {
+        verifyAllFilters(metadata(rgs), splitSize);
+      } catch (AssertionError e) {
+        throw new AssertionError("fail verifyAllFilters(metadata(" + Arrays.toString(rgs) + "), " + splitSize + ")", e);
+      }
+    }
+  }
+
 }
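
The fixtures in testFilterMetaData above pin down the semantics of the range filter: with three 50-byte row groups, the range [25, 75) keeps only the first group, [26, 75) keeps none, and [26, 76) keeps only the second, so a row group appears to be assigned to the range containing its midpoint, which is what makes the "each offset found exactly once" property checked by verifyAllFilters hold. A standalone restatement of that inferred midpoint rule (plain Java, not the project's RangeMetadataFilter implementation):

public class MidpointRuleSketch {
  /** Would a row group spanning [offset, offset + size) be kept by the range [start, end)? */
  static boolean kept(long offset, long size, long start, long end) {
    long mid = offset + size / 2;
    return start <= mid && mid < end;
  }

  public static void main(String[] args) {
    // Mirrors verifyMD(filter(metadata(50, 50, 50), 25, 75), 0): only the group at offset 0 is kept.
    System.out.println(kept(0, 50, 25, 75) && !kept(50, 50, 25, 75));   // true
    // Mirrors filter(metadata(50, 50, 50), 26, 75): no group is kept.
    System.out.println(!kept(0, 50, 26, 75) && !kept(50, 50, 26, 75));  // true
    // Mirrors verifyMD(filter(metadata(50, 50, 50), 26, 76), 50): only the group at offset 50 is kept.
    System.out.println(!kept(0, 50, 26, 76) && kept(50, 50, 26, 76));   // true
  }
}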

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/test/java/parquet/hadoop/DeprecatedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/DeprecatedInputFormatTest.java b/parquet-hadoop/src/test/java/parquet/hadoop/DeprecatedInputFormatTest.java
index 696c7b3..936c7c7 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/DeprecatedInputFormatTest.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/DeprecatedInputFormatTest.java
@@ -110,7 +110,7 @@ public class DeprecatedInputFormatTest {
   }
 
   @Test
-  public void testReadWriteWithCounteDeprecated() throws Exception {
+  public void testReadWriteWithCountDeprecated() throws Exception {
     runMapReduceJob(CompressionCodecName.GZIP);
     assertTrue(mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytesread").getValue() > 0L);
     assertTrue(mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytestotal").getValue() > 0L);

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
index 1823716..bac4fb0 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
@@ -15,13 +15,28 @@
  */
 package parquet.hadoop;
 
+import static java.util.Collections.unmodifiableMap;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static parquet.filter2.predicate.FilterApi.and;
+import static parquet.filter2.predicate.FilterApi.eq;
+import static parquet.filter2.predicate.FilterApi.intColumn;
+import static parquet.filter2.predicate.FilterApi.not;
+import static parquet.filter2.predicate.FilterApi.notEq;
+import static parquet.filter2.predicate.FilterApi.or;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -54,18 +69,6 @@ import parquet.schema.MessageType;
 import parquet.schema.MessageTypeParser;
 import parquet.schema.PrimitiveType.PrimitiveTypeName;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static parquet.filter2.predicate.FilterApi.and;
-import static parquet.filter2.predicate.FilterApi.eq;
-import static parquet.filter2.predicate.FilterApi.intColumn;
-import static parquet.filter2.predicate.FilterApi.not;
-import static parquet.filter2.predicate.FilterApi.notEq;
-import static parquet.filter2.predicate.FilterApi.or;
-
 public class TestInputFormat {
 
   List<BlockMetaData> blocks;
@@ -86,7 +89,6 @@ public class TestInputFormat {
     for (int i = 0; i < 10; i++) {
       blocks.add(newBlock(i * 10, 10));
     }
-    fileStatus = new FileStatus(100, false, 2, 50, 0, new Path("hdfs://foo.namenode:1234/bar"));
     schema = MessageTypeParser.parseMessageType("message doc { required binary foo; }");
     fileMetaData = new FileMetaData(schema, new HashMap<String, String>(), "parquet-mr");
   }
@@ -103,6 +105,17 @@ public class TestInputFormat {
   }
 
   @Test
+  public void testThrowExceptionWhenMaxSplitSizeIsSmallerThanMinSplitSizeTaskSide() throws IOException {
+    try {
+      generateTSSplitByMinMaxSize(50, 49);
+      fail("should throw exception when max split size is smaller than the min split size");
+    } catch (ParquetDecodingException e) {
+      assertEquals("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = 49; minSplitSize is 50"
+              , e.getMessage());
+    }
+  }
+
+  @Test
   public void testThrowExceptionWhenMaxSplitSizeIsNegative() throws IOException {
     try {
       generateSplitByMinMaxSize(-100, -50);
@@ -114,6 +127,17 @@ public class TestInputFormat {
   }
 
   @Test
+  public void testTSThrowExceptionWhenMaxSplitSizeIsNegative() throws IOException {
+    try {
+      generateTSSplitByMinMaxSize(-100, -50);
+      fail("should throw exception when max split size is negative");
+    } catch (ParquetDecodingException e) {
+      assertEquals("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = -50; minSplitSize is -100"
+              , e.getMessage());
+    }
+  }
+
+  @Test
   public void testGetFilter() throws IOException {
     IntColumn intColumn = intColumn("foo");
     FilterPredicate p = or(eq(intColumn, 7), eq(intColumn, 12));
@@ -151,6 +175,21 @@ public class TestInputFormat {
   }
 
   @Test
+  public void testTSGenerateSplitsAlignedWithHDFSBlock() throws IOException {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateTSSplitByMinMaxSize(50, 50);
+    shouldSplitStartBe(splits, 0, 50);
+    shouldSplitLocationBe(splits, 0, 1);
+    shouldSplitLengthBe(splits, 50, 50);
+
+    splits = generateSplitByMinMaxSize(0, Long.MAX_VALUE);
+    shouldSplitStartBe(splits, 0, 50);
+    shouldSplitLocationBe(splits, 0, 1);
+    shouldSplitLengthBe(splits, 50, 50);
+
+  }
+
+  @Test
   public void testRowGroupNotAlignToHDFSBlock() throws IOException {
     //Test HDFS blocks size(51) is not multiple of row group size(10)
     withHDFSBlockSize(51, 51);
@@ -179,6 +218,32 @@ public class TestInputFormat {
     shouldSplitLengthBe(splits, 40, 50, 10);
   }
 
+  @Test
+  public void testTSRowGroupNotAlignToHDFSBlock() throws IOException {
+    //Test HDFS blocks size(51) is not multiple of row group size(10)
+    withHDFSBlockSize(51, 51);
+    List<ParquetInputSplit> splits = generateTSSplitByMinMaxSize(50, 50);
+    shouldSplitStartBe(splits, 0, 50, 100);
+    shouldSplitLocationBe(splits, 0, 1, 1);
+    shouldSplitLengthBe(splits, 50, 50, 2);
+
+    //Test a rowgroup is greater than the hdfsBlock boundary
+    withHDFSBlockSize(49, 49);
+    splits = generateTSSplitByMinMaxSize(50, 50);
+    shouldSplitStartBe(splits, 0, 50);
+    shouldSplitLocationBe(splits, 1, 1);
+    shouldSplitLengthBe(splits, 50, 48);
+
+    /*
+    aaaa bbbbb c
+     */
+    withHDFSBlockSize(44,44,44);
+    splits = generateTSSplitByMinMaxSize(40, 50);
+    shouldSplitStartBe(splits, 0, 44, 88);
+    shouldSplitLocationBe(splits, 0, 1, 2);
+    shouldSplitLengthBe(splits, 44, 44, 44);
+  }
+
   /*
     when min size is 55, max size is 56, the first split will be generated with 6 row groups(size of 10 each), which satisfies split.size>min.size, but not split.size<max.size
     aaaaa abbbb
@@ -205,6 +270,28 @@ public class TestInputFormat {
 
   }
 
+  @Test
+  public void testTSGenerateSplitsNotAlignedWithHDFSBlock() throws IOException, InterruptedException {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateTSSplitByMinMaxSize(55, 56);
+    shouldSplitStartBe(splits, 0, 56);
+    shouldSplitLocationBe(splits, 1, 1);
+    shouldSplitLengthBe(splits, 56, 44);
+
+    withHDFSBlockSize(51, 51);
+    splits = generateTSSplitByMinMaxSize(55, 56);
+    shouldSplitStartBe(splits, 0, 56);
+    shouldSplitLocationBe(splits, 1, 1);
+    shouldSplitLengthBe(splits, 56, 46);
+
+    withHDFSBlockSize(49, 49, 49);
+    splits = generateTSSplitByMinMaxSize(55, 56);
+    shouldSplitStartBe(splits, 0, 56, 112);
+    shouldSplitLocationBe(splits, 1, 2, 2);
+    shouldSplitLengthBe(splits, 56, 56, 35);
+
+  }
+
   /*
     when the max size is set to be 30, first split will be of size 30,
     and when creating second split, it will try to align it to second hdfsBlock, and therefore generates a split of size 20
@@ -237,6 +324,33 @@ public class TestInputFormat {
     shouldSplitLengthBe(splits, 30, 20, 30, 20);
   }
 
+  @Test
+  public void testTSGenerateSplitsSmallerThanMaxSizeAndAlignToHDFS() throws Exception {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateTSSplitByMinMaxSize(18, 30);
+    shouldSplitStartBe(splits, 0, 30, 50, 80);
+    shouldSplitLocationBe(splits, 0, 0, 1, 1);
+    shouldSplitLengthBe(splits, 30, 20, 30, 20);
+
+    /*
+    aaabb cccdd
+         */
+    withHDFSBlockSize(51, 51);
+    splits = generateTSSplitByMinMaxSize(18, 30);
+    shouldSplitStartBe(splits, 0, 30, 51, 81);
+    shouldSplitLocationBe(splits, 0, 0, 1, 1);
+    shouldSplitLengthBe(splits, 30, 21, 30, 21);
+
+    /*
+    aaabb cccdd
+     */
+    withHDFSBlockSize(49, 49, 49);
+    splits = generateTSSplitByMinMaxSize(18, 30);
+    shouldSplitStartBe(splits, 0, 30, 49, 79, 98, 128);
+    shouldSplitLocationBe(splits, 0, 0, 1, 1, 2, 2);
+    shouldSplitLengthBe(splits, 30, 19, 30, 19, 30, 19);
+  }
+
   /*
     when the min size is set to be 25, so the second split can not be aligned with the boundary of hdfs block, there for split of size 30 will be created as the 3rd split.
     aaabb bcccd
@@ -250,6 +364,15 @@ public class TestInputFormat {
     shouldSplitLengthBe(splits, 30, 30, 30, 10);
   }
 
+  @Test
+  public void testTSGenerateSplitsCrossHDFSBlockBoundaryToSatisfyMinSize() throws Exception {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateTSSplitByMinMaxSize(25, 30);
+    shouldSplitStartBe(splits, 0, 30, 60, 90);
+    shouldSplitLocationBe(splits, 0, 1, 1, 1);
+    shouldSplitLengthBe(splits, 30, 30, 30, 10);
+  }
+
   /*
     when rowGroups size is 10, but min split size is 10, max split size is 18, it will create splits of size 20 and of size 10 and align with hdfsBlocks
     aabbc ddeef
@@ -285,6 +408,37 @@ public class TestInputFormat {
     shouldSplitLengthBe(splits, 20, 20, 10, 20, 20, 10);
   }
 
+  @Test
+  public void testTSMultipleRowGroupsInABlockToAlignHDFSBlock() throws Exception {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateTSSplitByMinMaxSize(10, 18);
+    shouldSplitStartBe(splits, 0, 18, 36, 50, 68, 86);
+    shouldSplitLocationBe(splits, 0, 0, 0, 1, 1, 1);
+    shouldSplitLengthBe(splits, 18, 18, 14, 18, 18, 14);
+
+    /*
+    aabbc ddeef
+    notice the first byte of split d is in the first hdfs block:
+    when adding the 6th row group, although the first byte of it is in the first hdfs block
+    , but the mid point of the row group is in the second hdfs block, there for a new split(d) is created including that row group
+     */
+    withHDFSBlockSize(51, 51);
+    splits = generateTSSplitByMinMaxSize(10, 18);
+    shouldSplitStartBe(splits, 0, 18, 36, 51, 69, 87);
+    shouldSplitLocationBe(splits, 0, 0, 0, 1, 1, 1);
+    shouldSplitLengthBe(splits, 18, 18, 15, 18, 18, 15);
+
+    /*
+    aabbc ddeef
+    same as the case where block sizes are 50 50
+     */
+    withHDFSBlockSize(49, 49);
+    splits = generateTSSplitByMinMaxSize(10, 18);
+    shouldSplitStartBe(splits, 0, 18, 36, 49, 67, 85);
+    shouldSplitLocationBe(splits, 0, 0, 0, 1, 1, 1);
+    shouldSplitLengthBe(splits, 18, 18, 13, 18, 18, 13);
+  }
+
   public static final class DummyUnboundRecordFilter implements UnboundRecordFilter {
     @Override
     public RecordFilter bind(Iterable<ColumnReader> readers) {
@@ -379,33 +533,63 @@ public class TestInputFormat {
     return cacheValue;
   }
 
+  private static final Map<String, String> extramd;
+  static {
+    Map<String, String> md = new HashMap<String, String>();
+    md.put("specific", "foo");
+    extramd = unmodifiableMap(md);
+  }
+
   private List<ParquetInputSplit> generateSplitByMinMaxSize(long min, long max) throws IOException {
-    return ParquetInputFormat.generateSplits(
-            blocks, hdfsBlocks, fileStatus, fileMetaData, schema.toString(), new HashMap<String, String>() {{
-              put("specific", "foo");
-            }}, min, max
-    );
+    return ClientSideMetadataSplitStrategy.generateSplits(
+        blocks, hdfsBlocks,
+        fileStatus,
+        schema.toString(),
+        extramd,
+        min, max);
+  }
+
+  private List<ParquetInputSplit> generateTSSplitByMinMaxSize(long min, long max) throws IOException {
+    return TaskSideMetadataSplitStrategy.generateTaskSideMDSplits(
+        hdfsBlocks,
+        fileStatus,
+        schema.toString(),
+        extramd,
+        min, max);
+  }
+
+  private void shouldSplitStartBe(List<ParquetInputSplit> splits, long... offsets) {
+    assertEquals(message(splits), offsets.length, splits.size());
+    for (int i = 0; i < offsets.length; i++) {
+      assertEquals(message(splits) + i, offsets[i], splits.get(i).getStart());
+    }
   }
 
   private void shouldSplitBlockSizeBe(List<ParquetInputSplit> splits, int... sizes) {
-    assertEquals(sizes.length, splits.size());
+    assertEquals(message(splits), sizes.length, splits.size());
     for (int i = 0; i < sizes.length; i++) {
-      assertEquals(sizes[i], splits.get(i).getBlocks().size());
-      assertEquals("foo", splits.get(i).getReadSupportMetadata().get("specific"));
+      assertEquals(message(splits) + i, sizes[i], splits.get(i).getRowGroupOffsets().length);
     }
   }
 
   private void shouldSplitLocationBe(List<ParquetInputSplit> splits, int... locations) throws IOException {
-    assertEquals(locations.length, splits.size());
+    assertEquals(message(splits), locations.length, splits.size());
     for (int i = 0; i < locations.length; i++) {
-      assertEquals("[foo" + locations[i] + ".datanode, bar" + locations[i] + ".datanode]", Arrays.toString(splits.get(i).getLocations()));
+      int loc = locations[i];
+      ParquetInputSplit split = splits.get(i);
+      assertEquals(message(splits) + i, "foo", split.getReadSupportMetadata().get("specific"));
+      assertEquals(message(splits) + i, "[foo" + loc + ".datanode, bar" + loc + ".datanode]", Arrays.toString(split.getLocations()));
     }
   }
 
+  private String message(List<ParquetInputSplit> splits) {
+    return String.valueOf(splits) + " " + Arrays.toString(hdfsBlocks) + "\n";
+  }
+
   private void shouldSplitLengthBe(List<ParquetInputSplit> splits, int... lengths) {
-    assertEquals(lengths.length, splits.size());
+    assertEquals(message(splits), lengths.length, splits.size());
     for (int i = 0; i < lengths.length; i++) {
-      assertEquals(lengths[i], splits.get(i).getLength());
+      assertEquals(message(splits) + i, lengths[i], splits.get(i).getLength());
     }
   }
 
@@ -417,6 +601,7 @@ public class TestInputFormat {
       hdfsBlocks[i] = new BlockLocation(new String[0], new String[]{"foo" + i + ".datanode", "bar" + i + ".datanode"}, offset, blockSize);
       offset += blockSize;
     }
+    fileStatus = new FileStatus(offset, false, 2, 50, 0, new Path("hdfs://foo.namenode:1234/bar"));
   }
 
   private BlockMetaData newBlock(long start, long compressedBlockSize) {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java
index df593b0..908d2b1 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java
@@ -278,16 +278,17 @@ public class TestParquetFileWriter {
     createFile(configuration, new Path(testDirPath, "part2"), schema);
 
     FileStatus outputStatus = fs.getFileStatus(testDirPath);
-    List<Footer> footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus);
+    List<Footer> footers = ParquetFileReader.readFooters(configuration, outputStatus, false);
     validateFooters(footers);
     ParquetFileWriter.writeMetadataFile(configuration, testDirPath, footers);
 
-    footers = ParquetFileReader.readFooters(configuration, outputStatus);
+    footers = ParquetFileReader.readFooters(configuration, outputStatus, false);
     validateFooters(footers);
-    footers = ParquetFileReader.readFooters(configuration, fs.getFileStatus(new Path(testDirPath, "part0")));
+    footers = ParquetFileReader.readFooters(configuration, fs.getFileStatus(new Path(testDirPath, "part0")), false);
     assertEquals(1, footers.size());
 
     final FileStatus metadataFile = fs.getFileStatus(new Path(testDirPath, ParquetFileWriter.PARQUET_METADATA_FILE));
+    final FileStatus metadataFileLight = fs.getFileStatus(new Path(testDirPath, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE));
     final List<Footer> metadata = ParquetFileReader.readSummaryFile(configuration, metadataFile);
 
     validateFooters(metadata);
@@ -297,19 +298,20 @@ public class TestParquetFileWriter {
       public boolean accept(Path p) {
         return !p.getName().startsWith("_");
       }
-    })));
+    })), false);
     validateFooters(footers);
 
     fs.delete(metadataFile.getPath(), false);
+    fs.delete(metadataFileLight.getPath(), false);
 
-    footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, Arrays.asList(fs.listStatus(testDirPath)));
+    footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, Arrays.asList(fs.listStatus(testDirPath)), false);
     validateFooters(footers);
 
   }
 
   private void validateFooters(final List<Footer> metadata) {
     LOG.debug(metadata);
-    assertEquals(3, metadata.size());
+    assertEquals(String.valueOf(metadata), 3, metadata.size());
     for (Footer footer : metadata) {
       final File file = new File(footer.getFile().toUri());
       assertTrue(file.getName(), file.getName().startsWith("part"));

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetInputSplit.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetInputSplit.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetInputSplit.java
deleted file mode 100644
index f94b90a..0000000
--- a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetInputSplit.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Copyright 2012 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.hadoop;
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-public class TestParquetInputSplit {
-
-    @Test
-    public void testStringCompression() {
-      String[] strings = {"this is a string",
-         "this is a string with a \n newline",
-         "a:chararray, b:{t:(c:chararray, d:chararray)}",
-         "message pig_schema {\n" +
-         "  optional binary a;\n" +
-         "  optional group b {\n" +
-         "    repeated group t {\n" +
-         "      optional binary c;\n" +
-         "      optional binary d;\n" +
-         "    }\n" +
-         "  }\n" +
-         "}\n"
-         };
-      ParquetInputSplit split = new ParquetInputSplit();
-      for (String s : strings) {
-        byte[] bytes = split.compressString(s);
-        String uncs = split.decompressString(bytes);
-        assertEquals("strings should be same after decompressing", s, uncs);
-      }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java b/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java
index a541a9f..e65f03c 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java
@@ -24,6 +24,7 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -152,9 +153,14 @@ public class TestInputOutputFormat {
       context.write(new LongWritable(value.getInteger("line", 0)), new Text("dummy"));
     }
   }
-
   private void runMapReduceJob(CompressionCodecName codec) throws IOException, ClassNotFoundException, InterruptedException {
-
+    runMapReduceJob(codec, Collections.<String, String>emptyMap());
+  }
+  private void runMapReduceJob(CompressionCodecName codec, Map<String, String> extraConf) throws IOException, ClassNotFoundException, InterruptedException {
+    Configuration conf = new Configuration(this.conf);
+    for (Map.Entry<String, String> entry : extraConf.entrySet()) {
+      conf.set(entry.getKey(), entry.getValue());
+    }
     final FileSystem fileSystem = parquetPath.getFileSystem(conf);
     fileSystem.delete(parquetPath, true);
     fileSystem.delete(outputPath, true);
@@ -193,7 +199,10 @@ public class TestInputOutputFormat {
   }
 
   private void testReadWrite(CompressionCodecName codec) throws IOException, ClassNotFoundException, InterruptedException {
-    runMapReduceJob(codec);
+    testReadWrite(codec, Collections.<String, String>emptyMap());
+  }
+  private void testReadWrite(CompressionCodecName codec, Map<String, String> conf) throws IOException, ClassNotFoundException, InterruptedException {
+    runMapReduceJob(codec, conf);
     final BufferedReader in = new BufferedReader(new FileReader(new File(inputPath.toString())));
     final BufferedReader out = new BufferedReader(new FileReader(new File(outputPath.toString(), "part-m-00000")));
     String lineIn;
@@ -204,8 +213,8 @@ public class TestInputOutputFormat {
       lineOut = lineOut.substring(lineOut.indexOf("\t") + 1);
       assertEquals("line " + lineNumber, lineIn, lineOut);
     }
-    assertNull("line " + lineNumber, lineIn);
     assertNull("line " + lineNumber, out.readLine());
+    assertNull("line " + lineNumber, lineIn);
     in.close();
     out.close();
   }
@@ -219,9 +228,14 @@ public class TestInputOutputFormat {
   }
 
   @Test
+  public void testReadWriteTaskSideMD() throws IOException, ClassNotFoundException, InterruptedException {
+    testReadWrite(CompressionCodecName.UNCOMPRESSED, new HashMap<String, String>() {{ put("parquet.task.side.metadata", "true"); }});
+  }
+
+  @Test
   public void testProjection() throws Exception{
     readSchema=partialSchema;
-    writeMapperClass=PartialWriteMapper.class;
+    writeMapperClass = PartialWriteMapper.class;
     runMapReduceJob(CompressionCodecName.GZIP);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
----------------------------------------------------------------------
diff --git a/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index 9e98781..796a0f5 100644
--- a/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -13,9 +13,9 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.read;
 
+import static parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
+
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,7 +35,6 @@ import parquet.hadoop.ParquetFileReader;
 import parquet.hadoop.ParquetInputFormat;
 import parquet.hadoop.ParquetInputSplit;
 import parquet.hadoop.api.ReadSupport.ReadContext;
-import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.FileMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.hadoop.util.ContextUtil;
@@ -194,45 +193,33 @@ public class ParquetRecordReaderWrapper  implements RecordReader<Void, ArrayWrit
       final InputSplit oldSplit,
       final JobConf conf
       ) throws IOException {
-    ParquetInputSplit split;
     if (oldSplit instanceof FileSplit) {
-      final Path finalPath = ((FileSplit) oldSplit).getPath();
+      FileSplit fileSplit = (FileSplit) oldSplit;
+      final long splitStart = fileSplit.getStart();
+      final long splitLength = fileSplit.getLength();
+      final Path finalPath = fileSplit.getPath();
       final JobConf cloneJob = hiveBinding.pushProjectionsAndFilters(conf, finalPath.getParent());
 
-      final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(cloneJob, finalPath);
-      final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
+      final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(cloneJob, finalPath, SKIP_ROW_GROUPS);
       final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
-
-      final ReadContext readContext = new DataWritableReadSupport()
-          .init(cloneJob, fileMetaData.getKeyValueMetaData(), fileMetaData.getSchema());
-      schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
-          .get(DataWritableReadSupport.HIVE_SCHEMA_KEY)).getFieldCount();
-      final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
-      final long splitStart = ((FileSplit) oldSplit).getStart();
-      final long splitLength = ((FileSplit) oldSplit).getLength();
-      for (final BlockMetaData block : blocks) {
-        final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
-        if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
-          splitGroup.add(block);
-        }
-      }
-      if (splitGroup.isEmpty()) {
-        LOG.warn("Skipping split, could not find row group in: " + (FileSplit) oldSplit);
-        split = null;
-      } else {
-        split = new ParquetInputSplit(finalPath,
+      final ReadContext readContext =
+          new DataWritableReadSupport()
+            .init(cloneJob, fileMetaData.getKeyValueMetaData(), fileMetaData.getSchema());
+
+      schemaSize = MessageTypeParser.parseMessageType(
+            readContext.getReadSupportMetadata().get(DataWritableReadSupport.HIVE_SCHEMA_KEY)
+          ).getFieldCount();
+       return new ParquetInputSplit(
+                finalPath,
                 splitStart,
+                splitStart + splitLength,
                 splitLength,
-                ((FileSplit) oldSplit).getLocations(),
-                splitGroup,
+                fileSplit.getLocations(),
+                null,
                 readContext.getRequestedSchema().toString(),
-                fileMetaData.getSchema().toString(),
-                fileMetaData.getKeyValueMetaData(),
                 readContext.getReadSupportMetadata());
-      }
     } else {
       throw new IllegalArgumentException("Unknown split type: " + oldSplit);
     }
-    return split;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java b/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
index 91dc0da..2e96ecf 100644
--- a/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
+++ b/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
@@ -98,34 +98,34 @@ public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDow
   public ParquetLoader(String requestedSchemaStr) {
     this(parsePigSchema(requestedSchemaStr), false);
   }
-  
+
   /**
-   * To read only a subset of the columns in the file optionally assigned by 
+   * To read only a subset of the columns in the file optionally assigned by
    * column positions.  Using column positions allows for renaming the fields
    * and is more inline with the "schema-on-read" approach to accessing file
    * data.
-   * 
-   * Example: 
+   *
+   * Example:
    * File Schema:  'c1:int, c2:float, c3:double, c4:long'
    * ParquetLoader('n1:int, n2:float, n3:double, n4:long', 'true');
-   * 
+   *
    * This will use the names provided in the requested schema and assign them
    * to column positions indicated by order.
-   * 
+   *
    * @param requestedSchemaStr a subset of the original pig schema in the file
    * @param columnIndexAccess use column index positions as opposed to name (default: false)
    */
   public ParquetLoader(String requestedSchemaStr, String columnIndexAccess) {
     this(parsePigSchema(requestedSchemaStr), Boolean.parseBoolean(columnIndexAccess));
   }
-  
+
   /**
    * Use the provided schema to access the underlying file data.
-   * 
+   *
    * The same as the string based constructor but for programmatic use.
-   * 
+   *
    * @param requestedSchema a subset of the original pig schema in the file
-   * @param columnIndexAccess  
+   * @param columnIndexAccess
    */
   public ParquetLoader(Schema requestedSchema, boolean columnIndexAccess) {
     this.requestedSchema = requestedSchema;
@@ -135,7 +135,7 @@ public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDow
   @Override
   public void setLocation(String location, Job job) throws IOException {
     if (DEBUG) LOG.debug("LoadFunc.setLocation(" + location + ", " + job + ")");
-    
+
     setInput(location, job);
   }
 
@@ -143,25 +143,25 @@ public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDow
     this.setLocationHasBeenCalled  = true;
     this.location = location;
     setInputPaths(job, location);
-    
+
     //This is prior to load because the initial value comes from the constructor
     //not file metadata or pig framework and would get overwritten in initSchema().
     if(UDFContext.getUDFContext().isFrontend()) {
       storeInUDFContext(PARQUET_COLUMN_INDEX_ACCESS, Boolean.toString(columnIndexAccess));
     }
-    
+
     schema = PigSchemaConverter.parsePigSchema(getPropertyFromUDFContext(PARQUET_PIG_SCHEMA));
     requiredFieldList = PigSchemaConverter.deserializeRequiredFieldList(getPropertyFromUDFContext(PARQUET_PIG_REQUIRED_FIELDS));
     columnIndexAccess = Boolean.parseBoolean(getPropertyFromUDFContext(PARQUET_COLUMN_INDEX_ACCESS));
-    
+
     initSchema(job);
-    
+
     if(UDFContext.getUDFContext().isFrontend()) {
       //Setting for task-side loading via initSchema()
       storeInUDFContext(PARQUET_PIG_SCHEMA, pigSchemaToString(schema));
       storeInUDFContext(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(requiredFieldList));
     }
-    
+
     //Used by task-side loader via TupleReadSupport
     getConfiguration(job).set(PARQUET_PIG_SCHEMA, pigSchemaToString(schema));
     getConfiguration(job).set(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(requiredFieldList));
@@ -335,14 +335,14 @@ public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDow
   public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
       throws FrontendException {
     this.requiredFieldList = requiredFieldList;
-    
+
     if (requiredFieldList == null)
       return null;
-    
+
     schema = getSchemaFromRequiredFieldList(schema, requiredFieldList.getFields());
     storeInUDFContext(PARQUET_PIG_SCHEMA, pigSchemaToString(schema));
     storeInUDFContext(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(requiredFieldList));
-    
+
     return new RequiredFieldResponse(true);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java b/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java
index 4076134..7f07be7 100644
--- a/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java
+++ b/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java
@@ -69,18 +69,18 @@ public class TupleReadSupport extends ReadSupport<Tuple> {
    */
   static RequiredFieldList getRequiredFields(Configuration configuration) {
     String requiredFieldString = configuration.get(PARQUET_PIG_REQUIRED_FIELDS);
-    
+
     if(requiredFieldString == null) {
       return null;
     }
-    
+
     try {
       return (RequiredFieldList) ObjectSerializer.deserialize(requiredFieldString);
     } catch (IOException iOException) {
       throw new RuntimeException("Failed to deserialize pushProjection");
     }
   }
-  
+
   /**
    * @param fileSchema the parquet schema from the file
    * @param keyValueMetaData the extra meta data from the files
@@ -156,13 +156,13 @@ public class TupleReadSupport extends ReadSupport<Tuple> {
     Schema pigSchema = getPigSchema(initContext.getConfiguration());
     RequiredFieldList requiredFields = getRequiredFields(initContext.getConfiguration());
     boolean columnIndexAccess = initContext.getConfiguration().getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
-    
+
     if (pigSchema == null) {
       return new ReadContext(initContext.getFileSchema());
     } else {
-      
+
       // project the file schema according to the requested Pig schema
-      MessageType parquetRequestedSchema = new PigSchemaConverter(columnIndexAccess).filter(initContext.getFileSchema(), pigSchema, requiredFields);;
+      MessageType parquetRequestedSchema = new PigSchemaConverter(columnIndexAccess).filter(initContext.getFileSchema(), pigSchema, requiredFields);
       return new ReadContext(parquetRequestedSchema);
     }
   }
@@ -175,7 +175,7 @@ public class TupleReadSupport extends ReadSupport<Tuple> {
       ReadContext readContext) {
     MessageType requestedSchema = readContext.getRequestedSchema();
     Schema requestedPigSchema = getPigSchema(configuration);
-    
+
     if (requestedPigSchema == null) {
       throw new ParquetDecodingException("Missing Pig schema: ParquetLoader sets the schema in the job conf");
     }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7b2e59f..65969e7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,7 +85,7 @@
     <shade.prefix>parquet</shade.prefix>
     <hadoop.version>1.1.0</hadoop.version>
     <cascading.version>2.5.3</cascading.version>
-    <parquet.format.version>2.1.0</parquet.format.version>
+    <parquet.format.version>2.2.0-rc1</parquet.format.version>
     <log4j.version>1.2.17</log4j.version>
     <previous.version>1.6.0rc1</previous.version>
     <thrift.executable>thrift</thrift.executable>
@@ -209,6 +209,7 @@
                    <previousVersion>${previous.version}</previousVersion>
                    <excludes>
                      <exclude>parquet/org/**</exclude>
+                     <exclude>parquet/hadoop/ParquetInputSplit</exclude>
                    </excludes>
                  </requireBackwardCompatibility>
                </rules>

