parquet-commits mailing list archives

From b...@apache.org
Subject [05/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
Date Mon, 27 Apr 2015 23:12:02 GMT
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java b/parquet-hadoop/src/main/java/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
deleted file mode 100644
index 75ec1a2..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop.util.counters.mapreduce;
-
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import parquet.hadoop.util.ContextUtil;
-import parquet.hadoop.util.counters.BenchmarkCounter;
-import parquet.hadoop.util.counters.CounterLoader;
-import parquet.hadoop.util.counters.ICounter;
-
-/**
- * Concrete factory for counters in the mapreduce API:
- * returns a counter obtained through the mapreduce API when the corresponding flag is set, otherwise returns a NullCounter.
- * @author Tianshuo Deng
- */
-public class MapReduceCounterLoader implements CounterLoader {
-  private TaskInputOutputContext<?, ?, ?, ?> context;
-
-  public MapReduceCounterLoader(TaskInputOutputContext<?, ?, ?, ?> context) {
-    this.context = context;
-  }
-
-  @Override
-  public ICounter getCounterByNameAndFlag(String groupName, String counterName, String counterFlag) {
-    if (ContextUtil.getConfiguration(context).getBoolean(counterFlag, true)) {
-      return new MapReduceCounterAdapter(ContextUtil.getCounter(context, groupName, counterName));
-    } else {
-      return new BenchmarkCounter.NullCounter();
-    }
-  }
-}
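
For reference, a rough sketch of how this loader is consumed, using the post-rename org.apache.parquet package names; the group, counter and flag strings below are the ones exercised by DeprecatedInputFormatTest later in this patch, and the helper method name is illustrative only:

    import org.apache.hadoop.mapreduce.TaskInputOutputContext;

    import org.apache.parquet.hadoop.util.counters.ICounter;
    import org.apache.parquet.hadoop.util.counters.mapreduce.MapReduceCounterLoader;

    public class CounterLoaderSketch {
      // Hypothetical helper: obtains the "bytesread" benchmark counter for a task.
      static ICounter bytesReadCounter(TaskInputOutputContext<?, ?, ?, ?> context) {
        MapReduceCounterLoader loader = new MapReduceCounterLoader(context);
        // When parquet.benchmark.bytes.read is set to false, this returns a
        // BenchmarkCounter.NullCounter and the read path records nothing.
        return loader.getCounterByNameAndFlag(
            "parquet", "bytesread", "parquet.benchmark.bytes.read");
      }
    }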

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/filter2/compat/TestRowGroupFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/compat/TestRowGroupFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/compat/TestRowGroupFilter.java
new file mode 100644
index 0000000..14877ab
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/compat/TestRowGroupFilter.java
@@ -0,0 +1,102 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.compat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.parquet.column.statistics.IntStatistics;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.hadoop.TestInputFormat.makeBlockFromStats;
+
+public class TestRowGroupFilter {
+  @Test
+  public void testApplyRowGroupFilters() {
+
+    List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+
+    IntStatistics stats1 = new IntStatistics();
+    stats1.setMinMax(10, 100);
+    stats1.setNumNulls(4);
+    BlockMetaData b1 = makeBlockFromStats(stats1, 301);
+    blocks.add(b1);
+
+    IntStatistics stats2 = new IntStatistics();
+    stats2.setMinMax(8, 102);
+    stats2.setNumNulls(0);
+    BlockMetaData b2 = makeBlockFromStats(stats2, 302);
+    blocks.add(b2);
+
+    IntStatistics stats3 = new IntStatistics();
+    stats3.setMinMax(100, 102);
+    stats3.setNumNulls(12);
+    BlockMetaData b3 = makeBlockFromStats(stats3, 303);
+    blocks.add(b3);
+
+
+    IntStatistics stats4 = new IntStatistics();
+    stats4.setMinMax(0, 0);
+    stats4.setNumNulls(304);
+    BlockMetaData b4 = makeBlockFromStats(stats4, 304);
+    blocks.add(b4);
+
+
+    IntStatistics stats5 = new IntStatistics();
+    stats5.setMinMax(50, 50);
+    stats5.setNumNulls(7);
+    BlockMetaData b5 = makeBlockFromStats(stats5, 305);
+    blocks.add(b5);
+
+    IntStatistics stats6 = new IntStatistics();
+    stats6.setMinMax(0, 0);
+    stats6.setNumNulls(12);
+    BlockMetaData b6 = makeBlockFromStats(stats6, 306);
+    blocks.add(b6);
+
+    MessageType schema = MessageTypeParser.parseMessageType("message Document { optional int32 foo; }");
+    IntColumn foo = intColumn("foo");
+
+    List<BlockMetaData> filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, 50)), blocks, schema);
+    assertEquals(Arrays.asList(b1, b2, b5), filtered);
+
+    filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(notEq(foo, 50)), blocks, schema);
+    assertEquals(Arrays.asList(b1, b2, b3, b4, b5, b6), filtered);
+
+    filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, null)), blocks, schema);
+    assertEquals(Arrays.asList(b1, b3, b4, b5, b6), filtered);
+
+    filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(notEq(foo, null)), blocks, schema);
+    assertEquals(Arrays.asList(b1, b2, b3, b5, b6), filtered);
+
+    filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, 0)), blocks, schema);
+    assertEquals(Arrays.asList(b6), filtered);
+  }
+
+}
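
The test above drives RowGroupFilter.filterRowGroups directly against hand-built statistics. A minimal sketch of the same call in a read-planning context, assuming the blocks and schema come from a file footer and that the schema contains an int32 column named foo:

    import java.util.List;

    import org.apache.parquet.filter2.compat.FilterCompat;
    import org.apache.parquet.filter2.compat.RowGroupFilter;
    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.schema.MessageType;

    import static org.apache.parquet.filter2.predicate.FilterApi.and;
    import static org.apache.parquet.filter2.predicate.FilterApi.gt;
    import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
    import static org.apache.parquet.filter2.predicate.FilterApi.notEq;

    public class RowGroupPruningSketch {
      // blocks and schema are assumed to come from the footer of the file being planned.
      static List<BlockMetaData> prune(List<BlockMetaData> blocks, MessageType schema) {
        FilterPredicate pred = and(gt(intColumn("foo"), 10), notEq(intColumn("foo"), null));
        // Row groups whose statistics prove the predicate can never match are dropped.
        return RowGroupFilter.filterRowGroups(FilterCompat.get(pred), blocks, schema);
      }
    }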

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
new file mode 100644
index 0000000..7acda93
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
@@ -0,0 +1,269 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+public class PhoneBookWriter {
+  private static final String schemaString =
+      "message user {\n"
+          + "  required int64 id;\n"
+          + "  optional binary name (UTF8);\n"
+          + "  optional group location {\n"
+          + "    optional double lon;\n"
+          + "    optional double lat;\n"
+          + "  }\n"
+          + "  optional group phoneNumbers {\n"
+          + "    repeated group phone {\n"
+          + "      required int64 number;\n"
+          + "      optional binary kind (UTF8);\n"
+          + "    }\n"
+          + "  }\n"
+          + "}\n";
+
+  private static final MessageType schema = MessageTypeParser.parseMessageType(schemaString);
+
+  public static class Location {
+    private final Double lon;
+    private final Double lat;
+
+    public Location(Double lon, Double lat) {
+      this.lon = lon;
+      this.lat = lat;
+    }
+
+    public Double getLon() {
+      return lon;
+    }
+
+    public Double getLat() {
+      return lat;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      Location location = (Location) o;
+
+      if (lat != null ? !lat.equals(location.lat) : location.lat != null) return false;
+      if (lon != null ? !lon.equals(location.lon) : location.lon != null) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = lon != null ? lon.hashCode() : 0;
+      result = 31 * result + (lat != null ? lat.hashCode() : 0);
+      return result;
+    }
+  }
+
+  public static class PhoneNumber {
+    private final long number;
+    private final String kind;
+
+    public PhoneNumber(long number, String kind) {
+      this.number = number;
+      this.kind = kind;
+    }
+
+    public long getNumber() {
+      return number;
+    }
+
+    public String getKind() {
+      return kind;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      PhoneNumber that = (PhoneNumber) o;
+
+      if (number != that.number) return false;
+      if (kind != null ? !kind.equals(that.kind) : that.kind != null) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = (int) (number ^ (number >>> 32));
+      result = 31 * result + (kind != null ? kind.hashCode() : 0);
+      return result;
+    }
+  }
+
+  public static class User {
+    private final long id;
+    private final String name;
+    private final List<PhoneNumber> phoneNumbers;
+    private final Location location;
+
+    public User(long id, String name, List<PhoneNumber> phoneNumbers, Location location) {
+      this.id = id;
+      this.name = name;
+      this.phoneNumbers = phoneNumbers;
+      this.location = location;
+    }
+
+    public long getId() {
+      return id;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public List<PhoneNumber> getPhoneNumbers() {
+      return phoneNumbers;
+    }
+
+    public Location getLocation() {
+      return location;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      User user = (User) o;
+
+      if (id != user.id) return false;
+      if (location != null ? !location.equals(user.location) : user.location != null) return false;
+      if (name != null ? !name.equals(user.name) : user.name != null) return false;
+      if (phoneNumbers != null ? !phoneNumbers.equals(user.phoneNumbers) : user.phoneNumbers != null) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = (int) (id ^ (id >>> 32));
+      result = 31 * result + (name != null ? name.hashCode() : 0);
+      result = 31 * result + (phoneNumbers != null ? phoneNumbers.hashCode() : 0);
+      result = 31 * result + (location != null ? location.hashCode() : 0);
+      return result;
+    }
+  }
+
+  public static SimpleGroup groupFromUser(User user) {
+    SimpleGroup root = new SimpleGroup(schema);
+    root.append("id", user.getId());
+
+    if (user.getName() != null) {
+      root.append("name", user.getName());
+    }
+
+    if (user.getPhoneNumbers() != null) {
+      Group phoneNumbers = root.addGroup("phoneNumbers");
+      for (PhoneNumber number : user.getPhoneNumbers()) {
+        Group phone = phoneNumbers.addGroup("phone");
+        phone.append("number", number.getNumber());
+        if (number.getKind() != null) {
+          phone.append("kind", number.getKind());
+        }
+      }
+    }
+
+    if (user.getLocation() != null) {
+      Group location = root.addGroup("location");
+      if (user.getLocation().getLon() != null) {
+        location.append("lon", user.getLocation().getLon());
+      }
+      if (user.getLocation().getLat() != null) {
+        location.append("lat", user.getLocation().getLat());
+      }
+    }
+    return root;
+  }
+
+  public static File writeToFile(List<User> users) throws IOException {
+    File f = File.createTempFile("phonebook", ".parquet");
+    f.deleteOnExit();
+    if (!f.delete()) {
+      throw new IOException("couldn't delete tmp file " + f);
+    }
+
+    writeToFile(f, users);
+
+    return f;
+  }
+
+  public static void writeToFile(File f, List<User> users) throws IOException {
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+
+    ParquetWriter<Group> writer = new ParquetWriter<Group>(new Path(f.getAbsolutePath()), conf, new GroupWriteSupport());
+    for (User u : users) {
+      writer.write(groupFromUser(u));
+    }
+    writer.close();
+  }
+
+  public static List<Group> readFile(File f, Filter filter) throws IOException {
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+
+    ParquetReader<Group> reader =
+        ParquetReader.builder(new GroupReadSupport(), new Path(f.getAbsolutePath()))
+                     .withConf(conf)
+                     .withFilter(filter)
+                     .build();
+
+    Group current;
+    List<Group> users = new ArrayList<Group>();
+
+    current = reader.read();
+    while (current != null) {
+      users.add(current);
+      current = reader.read();
+    }
+
+    return users;
+  }
+
+  public static void main(String[] args) throws IOException {
+    File f = new File(args[0]);
+    writeToFile(f, TestRecordLevelFilters.makeUsers());
+  }
+
+}
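
A short round-trip sketch through the helper above, assuming the parquet-hadoop example classes are on the classpath; the two users are illustrative data:

    import java.io.File;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.filter2.compat.FilterCompat;
    import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
    import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.User;
    import org.apache.parquet.io.api.Binary;

    import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
    import static org.apache.parquet.filter2.predicate.FilterApi.eq;

    public class PhoneBookSketch {
      public static void main(String[] args) throws Exception {
        List<User> users = Arrays.asList(
            new User(1, "alice", null, null),
            new User(2, "bob", null, null));
        File f = PhoneBookWriter.writeToFile(users);  // temp file, deleted on exit
        // Only records whose name column equals "alice" survive the record-level filter.
        List<Group> found = PhoneBookWriter.readFile(
            f, FilterCompat.get(eq(binaryColumn("name"), Binary.fromString("alice"))));
        System.out.println(found.size() + " matching record(s)");
      }
    }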

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
new file mode 100644
index 0000000..5a7d02f
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
@@ -0,0 +1,276 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.HashSet;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
+import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
+import org.apache.parquet.filter2.predicate.Operators.LongColumn;
+import org.apache.parquet.filter2.predicate.Statistics;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.Location;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.PhoneNumber;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.User;
+import org.apache.parquet.io.api.Binary;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.not;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
+
+public class TestRecordLevelFilters {
+
+  public static List<User> makeUsers() {
+    List<User> users = new ArrayList<User>();
+
+    users.add(new User(17, null, null, null));
+
+    users.add(new User(18, "bob", null, null));
+
+    users.add(new User(19, "alice", new ArrayList<PhoneNumber>(), null));
+
+    users.add(new User(20, "thing1", Arrays.asList(new PhoneNumber(5555555555L, null)), null));
+
+    users.add(new User(27, "thing2", Arrays.asList(new PhoneNumber(1111111111L, "home")), null));
+
+    users.add(new User(28, "popular", Arrays.asList(
+        new PhoneNumber(1111111111L, "home"),
+        new PhoneNumber(2222222222L, null),
+        new PhoneNumber(3333333333L, "mobile")
+    ), null));
+
+    users.add(new User(30, null, Arrays.asList(new PhoneNumber(1111111111L, "home")), null));
+
+    for (int i = 100; i < 200; i++) {
+      Location location = null;
+      if (i % 3 == 1) {
+        location = new Location((double)i, (double)i*2);
+      }
+      if (i % 3 == 2) {
+        location = new Location((double)i, null);
+      }
+      users.add(new User(i, "p" + i, Arrays.asList(new PhoneNumber(i, "cell")), location));
+    }
+
+    return users;
+  }
+
+  private static File phonebookFile;
+  private static List<User> users;
+
+  @BeforeClass
+  public static void setup() throws IOException{
+    users = makeUsers();
+    phonebookFile = PhoneBookWriter.writeToFile(users);
+  }
+
+  private static interface UserFilter {
+    boolean keep(User u);
+  }
+
+  private static List<Group> getExpected(UserFilter f) {
+    List<Group> expected = new ArrayList<Group>();
+    for (User u : users) {
+      if (f.keep(u)) {
+        expected.add(PhoneBookWriter.groupFromUser(u));
+      }
+    }
+    return expected;
+  }
+
+  private static void assertFilter(List<Group> found, UserFilter f) {
+    List<Group> expected = getExpected(f);
+    assertEquals(expected.size(), found.size());
+    Iterator<Group> expectedIter = expected.iterator();
+    Iterator<Group> foundIter = found.iterator();
+    while(expectedIter.hasNext()) {
+      assertEquals(expectedIter.next().toString(), foundIter.next().toString());
+    }
+  }
+
+  @Test
+  public void testNoFilter() throws Exception {
+    List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.NOOP);
+    assertFilter(found, new UserFilter() {
+      @Override
+      public boolean keep(User u) {
+        return true;
+      }
+    });
+  }
+
+  @Test
+  public void testAllFilter() throws Exception {
+    BinaryColumn name = binaryColumn("name");
+
+    FilterPredicate pred = eq(name, Binary.fromString("no matches"));
+
+    List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
+    assertEquals(new ArrayList<Group>(), found);
+  }
+
+  @Test
+  public void testNameNotNull() throws Exception {
+    BinaryColumn name = binaryColumn("name");
+
+    FilterPredicate pred = notEq(name, null);
+
+    List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
+
+    assertFilter(found, new UserFilter() {
+      @Override
+      public boolean keep(User u) {
+        return u.getName() != null;
+      }
+    });
+  }
+
+  public static class StartWithP extends UserDefinedPredicate<Binary> {
+
+    @Override
+    public boolean keep(Binary value) {
+      if (value == null) {
+        return false;
+      }
+      return value.toStringUsingUTF8().startsWith("p");
+    }
+
+    @Override
+    public boolean canDrop(Statistics<Binary> statistics) {
+      return false;
+    }
+
+    @Override
+    public boolean inverseCanDrop(Statistics<Binary> statistics) {
+      return false;
+    }
+  }
+  
+  public static class SetInFilter extends UserDefinedPredicate<Long> implements Serializable {
+
+    private HashSet<Long> hSet;
+
+    public SetInFilter(HashSet<Long> phSet) {
+      hSet = phSet;
+    }
+
+    @Override
+    public boolean keep(Long value) {
+      if (value == null) {
+        return false;
+      }
+
+      return hSet.contains(value);
+    }
+
+    @Override
+    public boolean canDrop(Statistics<Long> statistics) {
+      return false;
+    }
+
+    @Override
+    public boolean inverseCanDrop(Statistics<Long> statistics) {
+      return false;
+    }
+  }
+
+  @Test
+  public void testNameNotStartWithP() throws Exception {
+    BinaryColumn name = binaryColumn("name");
+
+    FilterPredicate pred = not(userDefined(name, StartWithP.class));
+
+    List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
+
+    assertFilter(found, new UserFilter() {
+      @Override
+      public boolean keep(User u) {
+        return u.getName() == null || !u.getName().startsWith("p");
+      }
+    });
+  }
+  
+  @Test
+  public void testUserDefinedByInstance() throws Exception {
+    LongColumn name = longColumn("id");
+
+    final HashSet<Long> h = new HashSet<Long>();
+    h.add(20L); 
+    h.add(27L);
+    h.add(28L);
+    
+    FilterPredicate pred = userDefined(name, new SetInFilter(h));
+
+    List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
+
+    assertFilter(found, new UserFilter() {
+      @Override
+      public boolean keep(User u) {
+        return u != null && h.contains(u.getId());
+      }
+    });
+  }
+
+  @Test
+  public void testComplex() throws Exception {
+    BinaryColumn name = binaryColumn("name");
+    DoubleColumn lon = doubleColumn("location.lon");
+    DoubleColumn lat = doubleColumn("location.lat");
+
+    FilterPredicate pred = or(and(gt(lon, 150.0), notEq(lat, null)), eq(name, Binary.fromString("alice")));
+
+    List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
+
+    assertFilter(found, new UserFilter() {
+      @Override
+      public boolean keep(User u) {
+        String name = u.getName();
+        Double lat = null;
+        Double lon = null;
+        if (u.getLocation() != null) {
+          lat = u.getLocation().getLat();
+          lon = u.getLocation().getLon();
+        }
+
+        return (lon != null && lon > 150.0 && lat != null) || "alice".equals(name);
+      }
+    });
+  }
+}
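
StartWithP and SetInFilter above illustrate the user-defined predicate contract. A self-contained sketch of the same three-method contract, using a hypothetical even-id check on the long id column:

    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.filter2.predicate.Statistics;
    import org.apache.parquet.filter2.predicate.UserDefinedPredicate;

    import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
    import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;

    public class EvenIdPredicate extends UserDefinedPredicate<Long> {
      @Override
      public boolean keep(Long value) {
        return value != null && value % 2 == 0;  // record-level check
      }

      @Override
      public boolean canDrop(Statistics<Long> statistics) {
        return false;  // conservatively keep every chunk regardless of statistics
      }

      @Override
      public boolean inverseCanDrop(Statistics<Long> statistics) {
        return false;  // same when the predicate appears under not()
      }

      // Class-based form, as with StartWithP above; pass an instance instead
      // (as SetInFilter does) when the predicate needs state.
      static FilterPredicate evenIds() {
        return userDefined(longColumn("id"), EvenIdPredicate.class);
      }
    }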

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
new file mode 100644
index 0000000..4e3fc7c
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
@@ -0,0 +1,325 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.statisticslevel;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.statistics.DoubleStatistics;
+import org.apache.parquet.column.statistics.IntStatistics;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.LogicalInverseRewriter;
+import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.filter2.predicate.Statistics;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.not;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
+import static org.apache.parquet.filter2.statisticslevel.StatisticsFilter.canDrop;
+
+public class TestStatisticsFilter {
+
+  private static ColumnChunkMetaData getIntColumnMeta(IntStatistics stats, long valueCount) {
+    return ColumnChunkMetaData.get(ColumnPath.get("int", "column"),
+        PrimitiveTypeName.INT32,
+        CompressionCodecName.GZIP,
+        new HashSet<Encoding>(Arrays.asList(Encoding.PLAIN)),
+        stats,
+        0L, 0L, valueCount, 0L, 0L);
+  }
+
+  private static ColumnChunkMetaData getDoubleColumnMeta(DoubleStatistics stats, long valueCount) {
+    return ColumnChunkMetaData.get(ColumnPath.get("double", "column"),
+        PrimitiveTypeName.DOUBLE,
+        CompressionCodecName.GZIP,
+        new HashSet<Encoding>(Arrays.asList(Encoding.PLAIN)),
+        stats,
+        0L, 0L, valueCount, 0L, 0L);
+  }
+
+  private static final IntColumn intColumn = intColumn("int.column");
+  private static final DoubleColumn doubleColumn = doubleColumn("double.column");
+
+  private static final IntStatistics intStats = new IntStatistics();
+  private static final IntStatistics nullIntStats = new IntStatistics();
+  private static final DoubleStatistics doubleStats = new DoubleStatistics();
+
+  static {
+    intStats.setMinMax(10, 100);
+    doubleStats.setMinMax(10, 100);
+
+    nullIntStats.setMinMax(0, 0);
+    nullIntStats.setNumNulls(177);
+  }
+
+  private static final List<ColumnChunkMetaData> columnMetas = Arrays.asList(
+      getIntColumnMeta(intStats, 177L),
+      getDoubleColumnMeta(doubleStats, 177L));
+
+  private static final List<ColumnChunkMetaData> nullColumnMetas = Arrays.asList(
+      getIntColumnMeta(nullIntStats, 177L), // column of all nulls
+      getDoubleColumnMeta(doubleStats, 177L));
+
+
+  @Test
+  public void testEqNonNull() {
+    assertTrue(canDrop(eq(intColumn, 9), columnMetas));
+    assertFalse(canDrop(eq(intColumn, 10), columnMetas));
+    assertFalse(canDrop(eq(intColumn, 100), columnMetas));
+    assertTrue(canDrop(eq(intColumn, 101), columnMetas));
+
+    // drop columns of all nulls when looking for non-null value
+    assertTrue(canDrop(eq(intColumn, 0), nullColumnMetas));
+  }
+
+  @Test
+  public void testEqNull() {
+    IntStatistics statsNoNulls = new IntStatistics();
+    statsNoNulls.setMinMax(10, 100);
+    statsNoNulls.setNumNulls(0);
+
+    IntStatistics statsSomeNulls = new IntStatistics();
+    statsSomeNulls.setMinMax(10, 100);
+    statsSomeNulls.setNumNulls(3);
+
+    assertTrue(canDrop(eq(intColumn, null), Arrays.asList(
+        getIntColumnMeta(statsNoNulls, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertFalse(canDrop(eq(intColumn, null), Arrays.asList(
+        getIntColumnMeta(statsSomeNulls, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+  }
+
+  @Test
+  public void testNotEqNonNull() {
+    assertFalse(canDrop(notEq(intColumn, 9), columnMetas));
+    assertFalse(canDrop(notEq(intColumn, 10), columnMetas));
+    assertFalse(canDrop(notEq(intColumn, 100), columnMetas));
+    assertFalse(canDrop(notEq(intColumn, 101), columnMetas));
+
+    IntStatistics allSevens = new IntStatistics();
+    allSevens.setMinMax(7, 7);
+    assertTrue(canDrop(notEq(intColumn, 7), Arrays.asList(
+        getIntColumnMeta(allSevens, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+  }
+
+  @Test
+  public void testNotEqNull() {
+    IntStatistics statsNoNulls = new IntStatistics();
+    statsNoNulls.setMinMax(10, 100);
+    statsNoNulls.setNumNulls(0);
+
+    IntStatistics statsSomeNulls = new IntStatistics();
+    statsSomeNulls.setMinMax(10, 100);
+    statsSomeNulls.setNumNulls(3);
+
+    IntStatistics statsAllNulls = new IntStatistics();
+    statsAllNulls.setMinMax(0, 0);
+    statsAllNulls.setNumNulls(177);
+
+    assertFalse(canDrop(notEq(intColumn, null), Arrays.asList(
+        getIntColumnMeta(statsNoNulls, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertFalse(canDrop(notEq(intColumn, null), Arrays.asList(
+        getIntColumnMeta(statsSomeNulls, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertTrue(canDrop(notEq(intColumn, null), Arrays.asList(
+        getIntColumnMeta(statsAllNulls, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+  }
+
+  @Test
+  public void testLt() {
+    assertTrue(canDrop(lt(intColumn, 9), columnMetas));
+    assertTrue(canDrop(lt(intColumn, 10), columnMetas));
+    assertFalse(canDrop(lt(intColumn, 100), columnMetas));
+    assertFalse(canDrop(lt(intColumn, 101), columnMetas));
+
+    assertTrue(canDrop(lt(intColumn, 0), nullColumnMetas));
+    assertTrue(canDrop(lt(intColumn, 7), nullColumnMetas));
+  }
+
+  @Test
+  public void testLtEq() {
+    assertTrue(canDrop(ltEq(intColumn, 9), columnMetas));
+    assertFalse(canDrop(ltEq(intColumn, 10), columnMetas));
+    assertFalse(canDrop(ltEq(intColumn, 100), columnMetas));
+    assertFalse(canDrop(ltEq(intColumn, 101), columnMetas));
+
+    assertTrue(canDrop(ltEq(intColumn, 0), nullColumnMetas));
+    assertTrue(canDrop(ltEq(intColumn, 7), nullColumnMetas));
+  }
+
+  @Test
+  public void testGt() {
+    assertFalse(canDrop(gt(intColumn, 9), columnMetas));
+    assertFalse(canDrop(gt(intColumn, 10), columnMetas));
+    assertTrue(canDrop(gt(intColumn, 100), columnMetas));
+    assertTrue(canDrop(gt(intColumn, 101), columnMetas));
+
+    assertTrue(canDrop(gt(intColumn, 0), nullColumnMetas));
+    assertTrue(canDrop(gt(intColumn, 7), nullColumnMetas));
+  }
+
+  @Test
+  public void testGtEq() {
+    assertFalse(canDrop(gtEq(intColumn, 9), columnMetas));
+    assertFalse(canDrop(gtEq(intColumn, 10), columnMetas));
+    assertFalse(canDrop(gtEq(intColumn, 100), columnMetas));
+    assertTrue(canDrop(gtEq(intColumn, 101), columnMetas));
+
+    assertTrue(canDrop(gtEq(intColumn, 0), nullColumnMetas));
+    assertTrue(canDrop(gtEq(intColumn, 7), nullColumnMetas));
+  }
+
+  @Test
+  public void testAnd() {
+    FilterPredicate yes = eq(intColumn, 9);
+    FilterPredicate no = eq(doubleColumn, 50D);
+    assertTrue(canDrop(and(yes, yes), columnMetas));
+    assertTrue(canDrop(and(yes, no), columnMetas));
+    assertTrue(canDrop(and(no, yes), columnMetas));
+    assertFalse(canDrop(and(no, no), columnMetas));
+  }
+
+  @Test
+  public void testOr() {
+    FilterPredicate yes = eq(intColumn, 9);
+    FilterPredicate no = eq(doubleColumn, 50D);
+    assertTrue(canDrop(or(yes, yes), columnMetas));
+    assertFalse(canDrop(or(yes, no), columnMetas));
+    assertFalse(canDrop(or(no, yes), columnMetas));
+    assertFalse(canDrop(or(no, no), columnMetas));
+  }
+
+  public static class SevensAndEightsUdp extends UserDefinedPredicate<Integer> {
+
+    @Override
+    public boolean keep(Integer value) {
+      throw new RuntimeException("this method should not be called");
+    }
+
+    @Override
+    public boolean canDrop(Statistics<Integer> statistics) {
+      return statistics.getMin() == 7 && statistics.getMax() == 7;
+    }
+
+    @Override
+    public boolean inverseCanDrop(Statistics<Integer> statistics) {
+      return statistics.getMin() == 8 && statistics.getMax() == 8;
+    }
+  }
+
+  @Test
+  public void testUdp() {
+    FilterPredicate pred = userDefined(intColumn, SevensAndEightsUdp.class);
+    FilterPredicate invPred = LogicalInverseRewriter.rewrite(not(userDefined(intColumn, SevensAndEightsUdp.class)));
+
+    IntStatistics seven = new IntStatistics();
+    seven.setMinMax(7, 7);
+
+    IntStatistics eight = new IntStatistics();
+    eight.setMinMax(8, 8);
+
+    IntStatistics neither = new IntStatistics();
+    neither.setMinMax(1, 2);
+
+    assertTrue(canDrop(pred, Arrays.asList(
+        getIntColumnMeta(seven, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertFalse(canDrop(pred, Arrays.asList(
+        getIntColumnMeta(eight, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertFalse(canDrop(pred, Arrays.asList(
+        getIntColumnMeta(neither, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertFalse(canDrop(invPred, Arrays.asList(
+        getIntColumnMeta(seven, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertTrue(canDrop(invPred, Arrays.asList(
+        getIntColumnMeta(eight, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertFalse(canDrop(invPred, Arrays.asList(
+        getIntColumnMeta(neither, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+  }
+
+  @Test
+  public void testClearExceptionForNots() {
+    List<ColumnChunkMetaData> columnMetas = Arrays.asList(
+        getDoubleColumnMeta(new DoubleStatistics(), 0L),
+        getIntColumnMeta(new IntStatistics(), 0L));
+
+    FilterPredicate pred = and(not(eq(doubleColumn, 12.0)), eq(intColumn, 17));
+
+    try {
+      canDrop(pred, columnMetas);
+      fail("This should throw");
+    } catch (IllegalArgumentException e) {
+      assertEquals("This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter?"
+          + " not(eq(double.column, 12.0))", e.getMessage());
+    }
+  }
+
+  @Test
+  public void testMissingColumn() {
+    List<ColumnChunkMetaData> columnMetas = Arrays.asList(getIntColumnMeta(new IntStatistics(), 0L));
+    try {
+      canDrop(and(eq(doubleColumn, 12.0), eq(intColumn, 17)), columnMetas);
+      fail("This should throw");
+    } catch (IllegalArgumentException e) {
+      assertEquals("Column double.column not found in schema!", e.getMessage());
+    }
+  }
+
+}
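
As testClearExceptionForNots asserts, StatisticsFilter.canDrop rejects predicates that still contain not(). A rough sketch of the intended call sequence, assuming the column chunk metadata comes from a single row group whose schema has the int.column used throughout this test:

    import java.util.List;

    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.filter2.predicate.LogicalInverseRewriter;
    import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;

    import static org.apache.parquet.filter2.predicate.FilterApi.eq;
    import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
    import static org.apache.parquet.filter2.predicate.FilterApi.not;
    import static org.apache.parquet.filter2.statisticslevel.StatisticsFilter.canDrop;

    public class StatisticsFilterSketch {
      static boolean canSkipRowGroup(List<ColumnChunkMetaData> columns) {
        FilterPredicate raw = not(eq(intColumn("int.column"), 7));
        // canDrop() throws on predicates containing not(); rewrite them away first.
        FilterPredicate rewritten = LogicalInverseRewriter.rewrite(raw);
        return canDrop(rewritten, columns);
      }
    }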

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
new file mode 100644
index 0000000..b4f56fa
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
@@ -0,0 +1,255 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.format.converter;
+
+import static java.util.Collections.emptyList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.apache.parquet.format.CompressionCodec.UNCOMPRESSED;
+import static org.apache.parquet.format.Type.INT32;
+import static org.apache.parquet.format.Util.readPageHeader;
+import static org.apache.parquet.format.Util.writePageHeader;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.filterFileMetaData;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.getOffset;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.example.Paper;
+import org.apache.parquet.format.ColumnChunk;
+import org.apache.parquet.format.ColumnMetaData;
+import org.apache.parquet.format.ConvertedType;
+import org.apache.parquet.format.FieldRepetitionType;
+import org.apache.parquet.format.FileMetaData;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.PageType;
+import org.apache.parquet.format.RowGroup;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.format.Type;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type.Repetition;
+import org.apache.parquet.schema.Types;
+
+import com.google.common.collect.Lists;
+
+public class TestParquetMetadataConverter {
+
+  @Test
+  public void testPageHeader() throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    PageType type = PageType.DATA_PAGE;
+    int compSize = 10;
+    int uncSize = 20;
+    PageHeader pageHeader = new PageHeader(type, uncSize, compSize);
+    writePageHeader(pageHeader, out);
+    PageHeader readPageHeader = readPageHeader(new ByteArrayInputStream(out.toByteArray()));
+    assertEquals(pageHeader, readPageHeader);
+  }
+
+  @Test
+  public void testSchemaConverter() {
+    ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
+    List<SchemaElement> parquetSchema = parquetMetadataConverter.toParquetSchema(Paper.schema);
+    MessageType schema = parquetMetadataConverter.fromParquetSchema(parquetSchema);
+    assertEquals(Paper.schema, schema);
+  }
+
+  @Test
+  public void testSchemaConverterDecimal() {
+    ParquetMetadataConverter converter = new ParquetMetadataConverter();
+    List<SchemaElement> schemaElements = converter.toParquetSchema(
+        Types.buildMessage()
+            .required(PrimitiveTypeName.BINARY)
+                .as(OriginalType.DECIMAL).precision(9).scale(2)
+                .named("aBinaryDecimal")
+            .optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(4)
+                .as(OriginalType.DECIMAL).precision(9).scale(2)
+                .named("aFixedDecimal")
+            .named("Message")
+    );
+    List<SchemaElement> expected = Lists.newArrayList(
+        new SchemaElement("Message").setNum_children(2),
+        new SchemaElement("aBinaryDecimal")
+            .setRepetition_type(FieldRepetitionType.REQUIRED)
+            .setType(Type.BYTE_ARRAY)
+            .setConverted_type(ConvertedType.DECIMAL)
+            .setPrecision(9).setScale(2),
+        new SchemaElement("aFixedDecimal")
+            .setRepetition_type(FieldRepetitionType.OPTIONAL)
+            .setType(Type.FIXED_LEN_BYTE_ARRAY)
+            .setType_length(4)
+            .setConverted_type(ConvertedType.DECIMAL)
+            .setPrecision(9).setScale(2)
+    );
+    Assert.assertEquals(expected, schemaElements);
+  }
+
+  @Test
+  public void testEnumEquivalence() {
+    ParquetMetadataConverter c = new ParquetMetadataConverter();
+    for (Encoding encoding : Encoding.values()) {
+      assertEquals(encoding, c.getEncoding(c.getEncoding(encoding)));
+    }
+    for (org.apache.parquet.format.Encoding encoding : org.apache.parquet.format.Encoding.values()) {
+      assertEquals(encoding, c.getEncoding(c.getEncoding(encoding)));
+    }
+    for (Repetition repetition : Repetition.values()) {
+      assertEquals(repetition, c.fromParquetRepetition(c.toParquetRepetition(repetition)));
+    }
+    for (FieldRepetitionType repetition : FieldRepetitionType.values()) {
+      assertEquals(repetition, c.toParquetRepetition(c.fromParquetRepetition(repetition)));
+    }
+    for (PrimitiveTypeName primitiveTypeName : PrimitiveTypeName.values()) {
+      assertEquals(primitiveTypeName, c.getPrimitive(c.getType(primitiveTypeName)));
+    }
+    for (Type type : Type.values()) {
+      assertEquals(type, c.getType(c.getPrimitive(type)));
+    }
+    for (OriginalType original : OriginalType.values()) {
+      assertEquals(original, c.getOriginalType(c.getConvertedType(original)));
+    }
+    for (ConvertedType converted : ConvertedType.values()) {
+      assertEquals(converted, c.getConvertedType(c.getOriginalType(converted)));
+    }
+  }
+
+  private FileMetaData metadata(long... sizes) {
+    List<SchemaElement> schema = emptyList();
+    List<RowGroup> rowGroups = new ArrayList<RowGroup>();
+    long offset = 0;
+    for (long size : sizes) {
+      ColumnChunk columnChunk = new ColumnChunk(offset);
+      columnChunk.setMeta_data(new ColumnMetaData(
+          INT32,
+          Collections.<org.apache.parquet.format.Encoding>emptyList(),
+          Collections.<String>emptyList(),
+          UNCOMPRESSED, 10l, size * 2, size, offset));
+      rowGroups.add(new RowGroup(Arrays.asList(columnChunk), size, 1));
+      offset += size;
+    }
+    return new FileMetaData(1, schema, sizes.length, rowGroups);
+  }
+
+  private FileMetaData filter(FileMetaData md, long start, long end) {
+    return filterFileMetaData(new FileMetaData(md), new ParquetMetadataConverter.RangeMetadataFilter(start, end));
+  }
+
+  private void verifyMD(FileMetaData md, long... offsets) {
+    assertEquals(offsets.length, md.row_groups.size());
+    for (int i = 0; i < offsets.length; i++) {
+      long offset = offsets[i];
+      RowGroup rowGroup = md.getRow_groups().get(i);
+      assertEquals(offset, getOffset(rowGroup));
+    }
+  }
+
+  /**
+   * verifies that splits will end up being a partition of the rowgroup
+   * they are all found only once
+   * @param md
+   * @param splitWidth
+   */
+  private void verifyAllFilters(FileMetaData md, long splitWidth) {
+    Set<Long> offsetsFound = new TreeSet<Long>();
+    for (long start = 0; start < fileSize(md); start += splitWidth) {
+      FileMetaData filtered = filter(md, start, start + splitWidth);
+      for (RowGroup rg : filtered.getRow_groups()) {
+        long o = getOffset(rg);
+        if (offsetsFound.contains(o)) {
+          fail("found the offset twice: " + o);
+        } else {
+          offsetsFound.add(o);
+        }
+      }
+    }
+    if (offsetsFound.size() != md.row_groups.size()) {
+      fail("missing row groups, "
+          + "found: " + offsetsFound
+          + "\nexpected " + md.getRow_groups());
+    }
+  }
+
+  private long fileSize(FileMetaData md) {
+    long size = 0;
+    for (RowGroup rg : md.getRow_groups()) {
+      size += rg.total_byte_size;
+    }
+    return size;
+  }
+
+  @Test
+  public void testFilterMetaData() {
+    verifyMD(filter(metadata(50, 50, 50), 0, 50), 0);
+    verifyMD(filter(metadata(50, 50, 50), 50, 100), 50);
+    verifyMD(filter(metadata(50, 50, 50), 100, 150), 100);
+    // picks up first RG
+    verifyMD(filter(metadata(50, 50, 50), 25, 75), 0);
+    // picks up no RG
+    verifyMD(filter(metadata(50, 50, 50), 26, 75));
+    // picks up second RG
+    verifyMD(filter(metadata(50, 50, 50), 26, 76), 50);
+
+    verifyAllFilters(metadata(50, 50, 50), 10);
+    verifyAllFilters(metadata(50, 50, 50), 51);
+    verifyAllFilters(metadata(50, 50, 50), 25); // corner cases are in the middle
+    verifyAllFilters(metadata(50, 50, 50), 24);
+    verifyAllFilters(metadata(50, 50, 50), 26);
+    verifyAllFilters(metadata(50, 50, 50), 110);
+    verifyAllFilters(metadata(10, 50, 500), 110);
+    verifyAllFilters(metadata(10, 50, 500), 10);
+    verifyAllFilters(metadata(10, 50, 500), 600);
+    verifyAllFilters(metadata(11, 9, 10), 10);
+    verifyAllFilters(metadata(11, 9, 10), 9);
+    verifyAllFilters(metadata(11, 9, 10), 8);
+  }
+
+  @Test
+  public void randomTestFilterMetaData() {
+    // randomized property based testing
+    // if it fails add the case above
+    Random random = new Random(System.currentTimeMillis());
+    for (int j = 0; j < 100; j++) {
+      long[] rgs = new long[random.nextInt(50)];
+      for (int i = 0; i < rgs.length; i++) {
+        rgs[i] = random.nextInt(10000) + 1; // No empty row groups
+      }
+      int splitSize = random.nextInt(10000);
+      try {
+        verifyAllFilters(metadata(rgs), splitSize);
+      } catch (AssertionError e) {
+        throw (AssertionError) new AssertionError("fail verifyAllFilters(metadata(" + Arrays.toString(rgs) + "), " + splitSize + ")").initCause(e);
+      }
+    }
+  }
+
+}
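
A rough sketch of the range filtering exercised above. It is placed in the converter's own package because the test accesses filterFileMetaData and RangeMetadataFilter the same way (their visibility outside the package is not shown here); footer and the split bounds are assumptions supplied by the caller:

    package org.apache.parquet.format.converter;

    import org.apache.parquet.format.FileMetaData;

    public class FooterRangeFilterSketch {
      // Mirrors the filter() helper above: keep only the row groups whose start offset
      // falls inside [splitStart, splitEnd). footer is the thrift FileMetaData of the file.
      static FileMetaData forSplit(FileMetaData footer, long splitStart, long splitEnd) {
        return ParquetMetadataConverter.filterFileMetaData(
            new FileMetaData(footer),
            new ParquetMetadataConverter.RangeMetadataFilter(splitStart, splitEnd));
      }
    }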

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java
new file mode 100644
index 0000000..e293483
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java
@@ -0,0 +1,182 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.example.ExampleOutputFormat;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.mapred.Container;
+import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import java.io.IOException;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * DeprecatedParquetInputFormat is used by Cascading. It initializes the recordReader through an initialize method
+ * that takes different parameters than ParquetInputFormat's.
+ * @author Tianshuo Deng
+ */
+public class DeprecatedInputFormatTest {
+  final Path parquetPath = new Path("target/test/example/TestInputOutputFormat/parquet");
+  final Path inputPath = new Path("src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java");
+  final Path outputPath = new Path("target/test/example/TestInputOutputFormat/out");
+  Job writeJob;
+  JobConf jobConf;
+  RunningJob mapRedJob;
+  private String writeSchema;
+  private String readSchema;
+  private Configuration conf;
+
+  @Before
+  public void setUp() {
+    conf = new Configuration();
+    jobConf = new JobConf();
+    writeSchema = "message example {\n" +
+            "required int32 line;\n" +
+            "required binary content;\n" +
+            "}";
+
+    readSchema = "message example {\n" +
+            "required int32 line;\n" +
+            "required binary content;\n" +
+            "}";
+  }
+
+  private void runMapReduceJob(CompressionCodecName codec) throws IOException, ClassNotFoundException, InterruptedException {
+
+    final FileSystem fileSystem = parquetPath.getFileSystem(conf);
+    fileSystem.delete(parquetPath, true);
+    fileSystem.delete(outputPath, true);
+    {
+      writeJob = new Job(conf, "write");
+      TextInputFormat.addInputPath(writeJob, inputPath);
+      writeJob.setInputFormatClass(TextInputFormat.class);
+      writeJob.setNumReduceTasks(0);
+      ExampleOutputFormat.setCompression(writeJob, codec);
+      ExampleOutputFormat.setOutputPath(writeJob, parquetPath);
+      writeJob.setOutputFormatClass(ExampleOutputFormat.class);
+      writeJob.setMapperClass(ReadMapper.class);
+      ExampleOutputFormat.setSchema(
+              writeJob,
+              MessageTypeParser.parseMessageType(
+                      writeSchema));
+      writeJob.submit();
+      waitForJob(writeJob);
+    }
+    {
+      jobConf.set(ReadSupport.PARQUET_READ_SCHEMA, readSchema);
+      jobConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, GroupReadSupport.class.getCanonicalName());
+      jobConf.setInputFormat(MyDeprecatedInputFormat.class);
+      MyDeprecatedInputFormat.setInputPaths(jobConf, parquetPath);
+      jobConf.setOutputFormat(org.apache.hadoop.mapred.TextOutputFormat.class);
+      org.apache.hadoop.mapred.TextOutputFormat.setOutputPath(jobConf, outputPath);
+      jobConf.setMapperClass(DeprecatedWriteMapper.class);
+      jobConf.setNumReduceTasks(0);
+      mapRedJob = JobClient.runJob(jobConf);
+    }
+  }
+
+  @Test
+  public void testReadWriteWithCountDeprecated() throws Exception {
+    runMapReduceJob(CompressionCodecName.GZIP);
+    assertTrue(mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytesread").getValue() > 0L);
+    assertTrue(mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytestotal").getValue() > 0L);
+    assertTrue(mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytesread").getValue()
+            == mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytestotal").getValue());
+    //not testing the time read counter since it could be zero due to the size of data is too small
+  }
+
+  @Test
+  public void testReadWriteWithoutCounter() throws Exception {
+    jobConf.set("parquet.benchmark.time.read", "false");
+    jobConf.set("parquet.benchmark.bytes.total", "false");
+    jobConf.set("parquet.benchmark.bytes.read", "false");
+    runMapReduceJob(CompressionCodecName.GZIP);
+    assertEquals(mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytesread").getValue(), 0L);
+    assertEquals(mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytestotal").getValue(), 0L);
+    assertEquals(mapRedJob.getCounters().getGroup("parquet").getCounterForName("timeread").getValue(), 0L);
+  }
+
+  private void waitForJob(Job job) throws InterruptedException, IOException {
+    while (!job.isComplete()) {
+      System.out.println("waiting for job " + job.getJobName());
+      sleep(100);
+    }
+    System.out.println("status for job " + job.getJobName() + ": " + (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
+    if (!job.isSuccessful()) {
+      throw new RuntimeException("job failed " + job.getJobName());
+    }
+  }
+
+  public static class ReadMapper extends Mapper<LongWritable, Text, Void, Group> {
+    private SimpleGroupFactory factory;
+
+    protected void setup(Context context) throws IOException, InterruptedException {
+      factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(ContextUtil.getConfiguration(context)));
+    }
+
+    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+      Group group = factory.newGroup()
+              .append("line", (int) key.get())
+              .append("content", value.toString());
+      context.write(null, group);
+    }
+  }
+
+  public static class DeprecatedWriteMapper implements org.apache.hadoop.mapred.Mapper<Void, Container<Group>, LongWritable, Text> {
+
+    @Override
+    public void map(Void aVoid, Container<Group> valueContainer, OutputCollector<LongWritable, Text> longWritableTextOutputCollector, Reporter reporter) throws IOException {
+      Group value = valueContainer.get();
+      longWritableTextOutputCollector.collect(new LongWritable(value.getInteger("line", 0)), new Text(value.getString("content", 0)));
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void configure(JobConf entries) {
+    }
+  }
+
+  static class MyDeprecatedInputFormat extends DeprecatedParquetInputFormat<Group> {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedOutputFormatTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedOutputFormatTest.java
new file mode 100644
index 0000000..73ae131
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedOutputFormatTest.java
@@ -0,0 +1,109 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import java.io.IOException;
+
+/**
+ * Tests writing Parquet files through DeprecatedParquetOutputFormat, the
+ * output format exposed for the old mapred API.
+ * @author Tianshuo Deng
+ */
+public class DeprecatedOutputFormatTest {
+  final Path parquetPath = new Path("target/test/example/TestInputOutputFormat/parquet");
+  final Path inputPath = new Path("src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java");
+  final Path outputPath = new Path("target/test/example/TestInputOutputFormat/out");
+  JobConf jobConf;
+  RunningJob mapRedJob;
+  private String writeSchema;
+  private Configuration conf;
+
+  @Before
+  public void setUp() {
+    conf = new Configuration();
+    jobConf = new JobConf();
+    writeSchema = "message example {\n" +
+            "required int32 line;\n" +
+            "required binary content;\n" +
+            "}";
+  }
+
+  private void runMapReduceJob(CompressionCodecName codec) throws IOException, ClassNotFoundException, InterruptedException {
+
+    final FileSystem fileSystem = parquetPath.getFileSystem(conf);
+    fileSystem.delete(parquetPath, true);
+    fileSystem.delete(outputPath, true);
+    {
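+      // single map-only job: read this test's source file as text and write it to Parquet through the deprecated mapred output format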
+      jobConf.setInputFormat(TextInputFormat.class);
+      TextInputFormat.addInputPath(jobConf, inputPath);
+      jobConf.setNumReduceTasks(0);
+
+      jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
+      DeprecatedParquetOutputFormat.setCompression(jobConf, codec);
+      DeprecatedParquetOutputFormat.setOutputPath(jobConf, parquetPath);
+      DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, GroupWriteSupport.class);
+      GroupWriteSupport.setSchema(MessageTypeParser.parseMessageType(writeSchema), jobConf);
+
+      jobConf.setMapperClass(DeprecatedMapper.class);
+      mapRedJob = JobClient.runJob(jobConf);
+    }
+  }
+
+  @Test
+  public void testReadWrite() throws Exception {
+    runMapReduceJob(CompressionCodecName.GZIP);
+    assertTrue(mapRedJob.isSuccessful());
+  }
+
+  public static class DeprecatedMapper implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, Void, Group> {
+    private SimpleGroupFactory factory;
+
+    public void configure(JobConf job) {
+      factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(job));
+    }
+
+    @Override
+    public void map(LongWritable key, Text value, OutputCollector<Void, Group> outputCollector, Reporter reporter) throws IOException {
+      Group group = factory.newGroup()
+              .append("line", (int) key.get())
+              .append("content", value.toString());
+      outputCollector.collect(null, group);
+    }
+
+    @Override
+    public void close() {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
new file mode 100644
index 0000000..c8b3778
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.inOrder;
+import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.apache.parquet.column.Encoding.RLE;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.LittleEndianDataInputStream;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.statistics.BinaryStatistics;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.Types;
+
+public class TestColumnChunkPageWriteStore {
+
+  private int pageSize = 1024;
+  private int initialSize = 1024;
+  private Configuration conf;
+
+  @Before
+  public void initConfiguration() {
+    this.conf = new Configuration();
+  }
+
+  @Test
+  public void test() throws Exception {
+    Path file = new Path("target/test/TestColumnChunkPageWriteStore/test.parquet");
+    Path root = file.getParent();
+    FileSystem fs = file.getFileSystem(conf);
+    if (fs.exists(root)) {
+      fs.delete(root, true);
+    }
+    fs.mkdirs(root);
+    MessageType schema = MessageTypeParser.parseMessageType("message test { repeated binary bar; }");
+    ColumnDescriptor col = schema.getColumns().get(0);
+    Encoding dataEncoding = PLAIN;
+    int valueCount = 10;
+    int d = 1;
+    int r = 2;
+    int v = 3;
+    BytesInput definitionLevels = BytesInput.fromInt(d);
+    BytesInput repetitionLevels = BytesInput.fromInt(r);
+    Statistics<?> statistics = new BinaryStatistics();
+    BytesInput data = BytesInput.fromInt(v);
+    int rowCount = 5;
+    int nullCount = 1;
+
+    {
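+      // write a single v2 data page for the only column into one row group, compressed with GZIP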
+      ParquetFileWriter writer = new ParquetFileWriter(conf, schema, file);
+      writer.start();
+      writer.startBlock(rowCount);
+      {
+        ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema, initialSize);
+        PageWriter pageWriter = store.getPageWriter(col);
+        pageWriter.writePageV2(
+            rowCount, nullCount, valueCount,
+            repetitionLevels, definitionLevels,
+            dataEncoding, data,
+            statistics);
+        store.flushToFileWriter(writer);
+      }
+      writer.endBlock();
+      writer.end(new HashMap<String, String>());
+    }
+
+    {
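+      // read the file back and verify the page contents, encodings and statistics round-trip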
+      ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, NO_FILTER);
+      ParquetFileReader reader = new ParquetFileReader(conf, file, footer.getBlocks(), schema.getColumns());
+      PageReadStore rowGroup = reader.readNextRowGroup();
+      PageReader pageReader = rowGroup.getPageReader(col);
+      DataPageV2 page = (DataPageV2)pageReader.readPage();
+      assertEquals(rowCount, page.getRowCount());
+      assertEquals(nullCount, page.getNullCount());
+      assertEquals(valueCount, page.getValueCount());
+      assertEquals(d, intValue(page.getDefinitionLevels()));
+      assertEquals(r, intValue(page.getRepetitionLevels()));
+      assertEquals(dataEncoding, page.getDataEncoding());
+      assertEquals(v, intValue(page.getData()));
+      assertEquals(statistics.toString(), page.getStatistics().toString());
+      reader.close();
+    }
+  }
+
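+  // reads back the single little-endian int written into the given BytesInput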
+  private int intValue(BytesInput in) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    in.writeAllTo(baos);
+    LittleEndianDataInputStream dis = new LittleEndianDataInputStream(new ByteArrayInputStream(baos.toByteArray()));
+    int i = dis.readInt();
+    dis.close();
+    return i;
+  }
+
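+  // verifies that flushToFileWriter hands the column chunks to the file writer in schema order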
+  @Test
+  public void testColumnOrderV1() throws IOException {
+    ParquetFileWriter mockFileWriter = Mockito.mock(ParquetFileWriter.class);
+    InOrder inOrder = inOrder(mockFileWriter);
+    MessageType schema = Types.buildMessage()
+        .required(BINARY).as(UTF8).named("a_string")
+        .required(INT32).named("an_int")
+        .required(INT64).named("a_long")
+        .required(FLOAT).named("a_float")
+        .required(DOUBLE).named("a_double")
+        .named("order_test");
+
+    BytesInput fakeData = BytesInput.fromInt(34);
+    int fakeCount = 3;
+    BinaryStatistics fakeStats = new BinaryStatistics();
+
+    ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(
+        compressor(UNCOMPRESSED), schema, initialSize);
+
+    for (ColumnDescriptor col : schema.getColumns()) {
+      PageWriter pageWriter = store.getPageWriter(col);
+      pageWriter.writePage(fakeData, fakeCount, fakeStats, RLE, RLE, PLAIN);
+    }
+
+    // flush to the mock writer
+    store.flushToFileWriter(mockFileWriter);
+
+    for (ColumnDescriptor col : schema.getColumns()) {
+      inOrder.verify(mockFileWriter).startColumn(
+          eq(col), eq((long) fakeCount), eq(UNCOMPRESSED));
+    }
+  }
+
+  private CodecFactory.BytesCompressor compressor(CompressionCodecName codec) {
+    return new CodecFactory(conf).getCompressor(codec, pageSize);
+  }
+}

