http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java b/parquet-hadoop/src/main/java/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
deleted file mode 100644
index 75ec1a2..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop.util.counters.mapreduce;
-
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import parquet.hadoop.util.ContextUtil;
-import parquet.hadoop.util.counters.BenchmarkCounter;
-import parquet.hadoop.util.counters.CounterLoader;
-import parquet.hadoop.util.counters.ICounter;
-
-/**
- * Concrete factory for counters in the mapreduce API:
- * gets a counter through the mapreduce API when the corresponding flag is set, otherwise returns a NullCounter
- * @author Tianshuo Deng
- */
-public class MapReduceCounterLoader implements CounterLoader {
- private TaskInputOutputContext<?, ?, ?, ?> context;
-
- public MapReduceCounterLoader(TaskInputOutputContext<?, ?, ?, ?> context) {
- this.context = context;
- }
-
- @Override
- public ICounter getCounterByNameAndFlag(String groupName, String counterName, String counterFlag) {
- if (ContextUtil.getConfiguration(context).getBoolean(counterFlag, true)) {
- return new MapReduceCounterAdapter(ContextUtil.getCounter(context, groupName, counterName));
- } else {
- return new BenchmarkCounter.NullCounter();
- }
- }
-}
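
For context, the loader deleted above gates counter creation on a boolean configuration flag. A minimal sketch of that pattern, assuming the "parquet" counter group and the benchmark flag names that appear in DeprecatedInputFormatTest later in this commit (the variable names are illustrative):

    // Resolve a real counter only when its flag is enabled; otherwise fall back to a no-op counter.
    Configuration conf = ContextUtil.getConfiguration(context);
    ICounter bytesRead;
    if (conf.getBoolean("parquet.benchmark.bytes.read", true)) {
      bytesRead = new MapReduceCounterAdapter(ContextUtil.getCounter(context, "parquet", "bytesread"));
    } else {
      bytesRead = new BenchmarkCounter.NullCounter();
    }
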
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/filter2/compat/TestRowGroupFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/compat/TestRowGroupFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/compat/TestRowGroupFilter.java
new file mode 100644
index 0000000..14877ab
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/compat/TestRowGroupFilter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.compat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.parquet.column.statistics.IntStatistics;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.hadoop.TestInputFormat.makeBlockFromStats;
+
+public class TestRowGroupFilter {
+ @Test
+ public void testApplyRowGroupFilters() {
+
+ List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+
+ IntStatistics stats1 = new IntStatistics();
+ stats1.setMinMax(10, 100);
+ stats1.setNumNulls(4);
+ BlockMetaData b1 = makeBlockFromStats(stats1, 301);
+ blocks.add(b1);
+
+ IntStatistics stats2 = new IntStatistics();
+ stats2.setMinMax(8, 102);
+ stats2.setNumNulls(0);
+ BlockMetaData b2 = makeBlockFromStats(stats2, 302);
+ blocks.add(b2);
+
+ IntStatistics stats3 = new IntStatistics();
+ stats3.setMinMax(100, 102);
+ stats3.setNumNulls(12);
+ BlockMetaData b3 = makeBlockFromStats(stats3, 303);
+ blocks.add(b3);
+
+
+ IntStatistics stats4 = new IntStatistics();
+ stats4.setMinMax(0, 0);
+ stats4.setNumNulls(304);
+ BlockMetaData b4 = makeBlockFromStats(stats4, 304);
+ blocks.add(b4);
+
+
+ IntStatistics stats5 = new IntStatistics();
+ stats5.setMinMax(50, 50);
+ stats5.setNumNulls(7);
+ BlockMetaData b5 = makeBlockFromStats(stats5, 305);
+ blocks.add(b5);
+
+ IntStatistics stats6 = new IntStatistics();
+ stats6.setMinMax(0, 0);
+ stats6.setNumNulls(12);
+ BlockMetaData b6 = makeBlockFromStats(stats6, 306);
+ blocks.add(b6);
+
+ MessageType schema = MessageTypeParser.parseMessageType("message Document { optional int32 foo; }");
+ IntColumn foo = intColumn("foo");
+
+ List<BlockMetaData> filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, 50)), blocks, schema);
+ assertEquals(Arrays.asList(b1, b2, b5), filtered);
+
+ filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(notEq(foo, 50)), blocks, schema);
+ assertEquals(Arrays.asList(b1, b2, b3, b4, b5, b6), filtered);
+
+ filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, null)), blocks, schema);
+ assertEquals(Arrays.asList(b1, b3, b4, b5, b6), filtered);
+
+ filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(notEq(foo, null)), blocks, schema);
+ assertEquals(Arrays.asList(b1, b2, b3, b5, b6), filtered);
+
+ filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, 0)), blocks, schema);
+ assertEquals(Arrays.asList(b6), filtered);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
new file mode 100644
index 0000000..7acda93
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+public class PhoneBookWriter {
+ private static final String schemaString =
+ "message user {\n"
+ + " required int64 id;\n"
+ + " optional binary name (UTF8);\n"
+ + " optional group location {\n"
+ + " optional double lon;\n"
+ + " optional double lat;\n"
+ + " }\n"
+ + " optional group phoneNumbers {\n"
+ + " repeated group phone {\n"
+ + " required int64 number;\n"
+ + " optional binary kind (UTF8);\n"
+ + " }\n"
+ + " }\n"
+ + "}\n";
+
+ private static final MessageType schema = MessageTypeParser.parseMessageType(schemaString);
+
+ public static class Location {
+ private final Double lon;
+ private final Double lat;
+
+ public Location(Double lon, Double lat) {
+ this.lon = lon;
+ this.lat = lat;
+ }
+
+ public Double getLon() {
+ return lon;
+ }
+
+ public Double getLat() {
+ return lat;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Location location = (Location) o;
+
+ if (lat != null ? !lat.equals(location.lat) : location.lat != null) return false;
+ if (lon != null ? !lon.equals(location.lon) : location.lon != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = lon != null ? lon.hashCode() : 0;
+ result = 31 * result + (lat != null ? lat.hashCode() : 0);
+ return result;
+ }
+ }
+
+ public static class PhoneNumber {
+ private final long number;
+ private final String kind;
+
+ public PhoneNumber(long number, String kind) {
+ this.number = number;
+ this.kind = kind;
+ }
+
+ public long getNumber() {
+ return number;
+ }
+
+ public String getKind() {
+ return kind;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ PhoneNumber that = (PhoneNumber) o;
+
+ if (number != that.number) return false;
+ if (kind != null ? !kind.equals(that.kind) : that.kind != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = (int) (number ^ (number >>> 32));
+ result = 31 * result + (kind != null ? kind.hashCode() : 0);
+ return result;
+ }
+ }
+
+ public static class User {
+ private final long id;
+ private final String name;
+ private final List<PhoneNumber> phoneNumbers;
+ private final Location location;
+
+ public User(long id, String name, List<PhoneNumber> phoneNumbers, Location location) {
+ this.id = id;
+ this.name = name;
+ this.phoneNumbers = phoneNumbers;
+ this.location = location;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public List<PhoneNumber> getPhoneNumbers() {
+ return phoneNumbers;
+ }
+
+ public Location getLocation() {
+ return location;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ User user = (User) o;
+
+ if (id != user.id) return false;
+ if (location != null ? !location.equals(user.location) : user.location != null) return false;
+ if (name != null ? !name.equals(user.name) : user.name != null) return false;
+ if (phoneNumbers != null ? !phoneNumbers.equals(user.phoneNumbers) : user.phoneNumbers != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = (int) (id ^ (id >>> 32));
+ result = 31 * result + (name != null ? name.hashCode() : 0);
+ result = 31 * result + (phoneNumbers != null ? phoneNumbers.hashCode() : 0);
+ result = 31 * result + (location != null ? location.hashCode() : 0);
+ return result;
+ }
+ }
+
+ public static SimpleGroup groupFromUser(User user) {
+ SimpleGroup root = new SimpleGroup(schema);
+ root.append("id", user.getId());
+
+ if (user.getName() != null) {
+ root.append("name", user.getName());
+ }
+
+ if (user.getPhoneNumbers() != null) {
+ Group phoneNumbers = root.addGroup("phoneNumbers");
+ for (PhoneNumber number : user.getPhoneNumbers()) {
+ Group phone = phoneNumbers.addGroup("phone");
+ phone.append("number", number.getNumber());
+ if (number.getKind() != null) {
+ phone.append("kind", number.getKind());
+ }
+ }
+ }
+
+ if (user.getLocation() != null) {
+ Group location = root.addGroup("location");
+ if (user.getLocation().getLon() != null) {
+ location.append("lon", user.getLocation().getLon());
+ }
+ if (user.getLocation().getLat() != null) {
+ location.append("lat", user.getLocation().getLat());
+ }
+ }
+ return root;
+ }
+
+ public static File writeToFile(List<User> users) throws IOException {
+ File f = File.createTempFile("phonebook", ".parquet");
+ f.deleteOnExit();
+ if (!f.delete()) {
+      throw new IOException("couldn't delete tmp file " + f);
+ }
+
+ writeToFile(f, users);
+
+ return f;
+ }
+
+ public static void writeToFile(File f, List<User> users) throws IOException {
+ Configuration conf = new Configuration();
+ GroupWriteSupport.setSchema(schema, conf);
+
+ ParquetWriter<Group> writer = new ParquetWriter<Group>(new Path(f.getAbsolutePath()), conf, new GroupWriteSupport());
+ for (User u : users) {
+ writer.write(groupFromUser(u));
+ }
+ writer.close();
+ }
+
+ public static List<Group> readFile(File f, Filter filter) throws IOException {
+ Configuration conf = new Configuration();
+ GroupWriteSupport.setSchema(schema, conf);
+
+ ParquetReader<Group> reader =
+ ParquetReader.builder(new GroupReadSupport(), new Path(f.getAbsolutePath()))
+ .withConf(conf)
+ .withFilter(filter)
+ .build();
+
+ Group current;
+ List<Group> users = new ArrayList<Group>();
+
+ current = reader.read();
+ while (current != null) {
+ users.add(current);
+ current = reader.read();
+ }
+
+ return users;
+ }
+
+ public static void main(String[] args) throws IOException {
+ File f = new File(args[0]);
+ writeToFile(f, TestRecordLevelFilters.makeUsers());
+ }
+
+}
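
The helpers above are exercised by the record-level filter tests that follow; a minimal usage sketch, mirroring what TestRecordLevelFilters does (no API beyond the code in this commit is assumed):

    // Write the sample users to a temp file, then read every record back with no filter applied.
    List<User> users = TestRecordLevelFilters.makeUsers();
    File f = PhoneBookWriter.writeToFile(users);
    List<Group> groups = PhoneBookWriter.readFile(f, FilterCompat.NOOP);  // one Group per user
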
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
new file mode 100644
index 0000000..5a7d02f
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.HashSet;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
+import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
+import org.apache.parquet.filter2.predicate.Operators.LongColumn;
+import org.apache.parquet.filter2.predicate.Statistics;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.Location;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.PhoneNumber;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.User;
+import org.apache.parquet.io.api.Binary;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.not;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
+
+public class TestRecordLevelFilters {
+
+ public static List<User> makeUsers() {
+ List<User> users = new ArrayList<User>();
+
+ users.add(new User(17, null, null, null));
+
+ users.add(new User(18, "bob", null, null));
+
+ users.add(new User(19, "alice", new ArrayList<PhoneNumber>(), null));
+
+ users.add(new User(20, "thing1", Arrays.asList(new PhoneNumber(5555555555L, null)), null));
+
+ users.add(new User(27, "thing2", Arrays.asList(new PhoneNumber(1111111111L, "home")), null));
+
+ users.add(new User(28, "popular", Arrays.asList(
+ new PhoneNumber(1111111111L, "home"),
+ new PhoneNumber(2222222222L, null),
+ new PhoneNumber(3333333333L, "mobile")
+ ), null));
+
+ users.add(new User(30, null, Arrays.asList(new PhoneNumber(1111111111L, "home")), null));
+
+ for (int i = 100; i < 200; i++) {
+ Location location = null;
+ if (i % 3 == 1) {
+ location = new Location((double)i, (double)i*2);
+ }
+ if (i % 3 == 2) {
+ location = new Location((double)i, null);
+ }
+ users.add(new User(i, "p" + i, Arrays.asList(new PhoneNumber(i, "cell")), location));
+ }
+
+ return users;
+ }
+
+ private static File phonebookFile;
+ private static List<User> users;
+
+ @BeforeClass
+  public static void setup() throws IOException {
+ users = makeUsers();
+ phonebookFile = PhoneBookWriter.writeToFile(users);
+ }
+
+ private static interface UserFilter {
+ boolean keep(User u);
+ }
+
+ private static List<Group> getExpected(UserFilter f) {
+ List<Group> expected = new ArrayList<Group>();
+ for (User u : users) {
+ if (f.keep(u)) {
+ expected.add(PhoneBookWriter.groupFromUser(u));
+ }
+ }
+ return expected;
+ }
+
+ private static void assertFilter(List<Group> found, UserFilter f) {
+ List<Group> expected = getExpected(f);
+ assertEquals(expected.size(), found.size());
+ Iterator<Group> expectedIter = expected.iterator();
+ Iterator<Group> foundIter = found.iterator();
+    while (expectedIter.hasNext()) {
+ assertEquals(expectedIter.next().toString(), foundIter.next().toString());
+ }
+ }
+
+ @Test
+ public void testNoFilter() throws Exception {
+ List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.NOOP);
+ assertFilter(found, new UserFilter() {
+ @Override
+ public boolean keep(User u) {
+ return true;
+ }
+ });
+ }
+
+ @Test
+ public void testAllFilter() throws Exception {
+ BinaryColumn name = binaryColumn("name");
+
+ FilterPredicate pred = eq(name, Binary.fromString("no matches"));
+
+ List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
+ assertEquals(new ArrayList<Group>(), found);
+ }
+
+ @Test
+ public void testNameNotNull() throws Exception {
+ BinaryColumn name = binaryColumn("name");
+
+ FilterPredicate pred = notEq(name, null);
+
+ List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
+
+ assertFilter(found, new UserFilter() {
+ @Override
+ public boolean keep(User u) {
+ return u.getName() != null;
+ }
+ });
+ }
+
+ public static class StartWithP extends UserDefinedPredicate<Binary> {
+
+ @Override
+ public boolean keep(Binary value) {
+ if (value == null) {
+ return false;
+ }
+ return value.toStringUsingUTF8().startsWith("p");
+ }
+
+ @Override
+ public boolean canDrop(Statistics<Binary> statistics) {
+ return false;
+ }
+
+ @Override
+ public boolean inverseCanDrop(Statistics<Binary> statistics) {
+ return false;
+ }
+ }
+
+ public static class SetInFilter extends UserDefinedPredicate<Long> implements Serializable {
+
+ private HashSet<Long> hSet;
+
+ public SetInFilter(HashSet<Long> phSet) {
+ hSet = phSet;
+ }
+
+ @Override
+ public boolean keep(Long value) {
+ if (value == null) {
+ return false;
+ }
+
+ return hSet.contains(value);
+ }
+
+ @Override
+ public boolean canDrop(Statistics<Long> statistics) {
+ return false;
+ }
+
+ @Override
+ public boolean inverseCanDrop(Statistics<Long> statistics) {
+ return false;
+ }
+ }
+
+ @Test
+ public void testNameNotStartWithP() throws Exception {
+ BinaryColumn name = binaryColumn("name");
+
+ FilterPredicate pred = not(userDefined(name, StartWithP.class));
+
+ List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
+
+ assertFilter(found, new UserFilter() {
+ @Override
+ public boolean keep(User u) {
+ return u.getName() == null || !u.getName().startsWith("p");
+ }
+ });
+ }
+
+ @Test
+ public void testUserDefinedByInstance() throws Exception {
+ LongColumn name = longColumn("id");
+
+ final HashSet<Long> h = new HashSet<Long>();
+ h.add(20L);
+ h.add(27L);
+ h.add(28L);
+
+ FilterPredicate pred = userDefined(name, new SetInFilter(h));
+
+ List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
+
+ assertFilter(found, new UserFilter() {
+ @Override
+ public boolean keep(User u) {
+ return u != null && h.contains(u.getId());
+ }
+ });
+ }
+
+ @Test
+ public void testComplex() throws Exception {
+ BinaryColumn name = binaryColumn("name");
+ DoubleColumn lon = doubleColumn("location.lon");
+ DoubleColumn lat = doubleColumn("location.lat");
+
+ FilterPredicate pred = or(and(gt(lon, 150.0), notEq(lat, null)), eq(name, Binary.fromString("alice")));
+
+ List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
+
+ assertFilter(found, new UserFilter() {
+ @Override
+ public boolean keep(User u) {
+ String name = u.getName();
+ Double lat = null;
+ Double lon = null;
+ if (u.getLocation() != null) {
+ lat = u.getLocation().getLat();
+ lon = u.getLocation().getLon();
+ }
+
+ return (lon != null && lon > 150.0 && lat != null) || "alice".equals(name);
+ }
+ });
+ }
+}
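
The FilterApi predicates built in these tests plug into a reader through FilterCompat.get, as PhoneBookWriter.readFile shows; a brief sketch of the same flow (the path variable is illustrative):

    // Keep users named "alice" or located east of longitude 150, as in testComplex above.
    FilterPredicate pred = or(
        and(gt(doubleColumn("location.lon"), 150.0), notEq(doubleColumn("location.lat"), null)),
        eq(binaryColumn("name"), Binary.fromString("alice")));
    ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path)
        .withFilter(FilterCompat.get(pred))
        .build();
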
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
new file mode 100644
index 0000000..4e3fc7c
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.statisticslevel;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.statistics.DoubleStatistics;
+import org.apache.parquet.column.statistics.IntStatistics;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.LogicalInverseRewriter;
+import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.filter2.predicate.Statistics;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.not;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
+import static org.apache.parquet.filter2.statisticslevel.StatisticsFilter.canDrop;
+
+public class TestStatisticsFilter {
+
+ private static ColumnChunkMetaData getIntColumnMeta(IntStatistics stats, long valueCount) {
+ return ColumnChunkMetaData.get(ColumnPath.get("int", "column"),
+ PrimitiveTypeName.INT32,
+ CompressionCodecName.GZIP,
+ new HashSet<Encoding>(Arrays.asList(Encoding.PLAIN)),
+ stats,
+ 0L, 0L, valueCount, 0L, 0L);
+ }
+
+ private static ColumnChunkMetaData getDoubleColumnMeta(DoubleStatistics stats, long valueCount) {
+ return ColumnChunkMetaData.get(ColumnPath.get("double", "column"),
+ PrimitiveTypeName.DOUBLE,
+ CompressionCodecName.GZIP,
+ new HashSet<Encoding>(Arrays.asList(Encoding.PLAIN)),
+ stats,
+ 0L, 0L, valueCount, 0L, 0L);
+ }
+
+ private static final IntColumn intColumn = intColumn("int.column");
+ private static final DoubleColumn doubleColumn = doubleColumn("double.column");
+
+ private static final IntStatistics intStats = new IntStatistics();
+ private static final IntStatistics nullIntStats = new IntStatistics();
+ private static final DoubleStatistics doubleStats = new DoubleStatistics();
+
+ static {
+ intStats.setMinMax(10, 100);
+ doubleStats.setMinMax(10, 100);
+
+ nullIntStats.setMinMax(0, 0);
+ nullIntStats.setNumNulls(177);
+ }
+
+ private static final List<ColumnChunkMetaData> columnMetas = Arrays.asList(
+ getIntColumnMeta(intStats, 177L),
+ getDoubleColumnMeta(doubleStats, 177L));
+
+ private static final List<ColumnChunkMetaData> nullColumnMetas = Arrays.asList(
+ getIntColumnMeta(nullIntStats, 177L), // column of all nulls
+ getDoubleColumnMeta(doubleStats, 177L));
+
+
+ @Test
+ public void testEqNonNull() {
+ assertTrue(canDrop(eq(intColumn, 9), columnMetas));
+ assertFalse(canDrop(eq(intColumn, 10), columnMetas));
+ assertFalse(canDrop(eq(intColumn, 100), columnMetas));
+ assertTrue(canDrop(eq(intColumn, 101), columnMetas));
+
+ // drop columns of all nulls when looking for non-null value
+ assertTrue(canDrop(eq(intColumn, 0), nullColumnMetas));
+ }
+
+ @Test
+ public void testEqNull() {
+ IntStatistics statsNoNulls = new IntStatistics();
+ statsNoNulls.setMinMax(10, 100);
+ statsNoNulls.setNumNulls(0);
+
+ IntStatistics statsSomeNulls = new IntStatistics();
+ statsSomeNulls.setMinMax(10, 100);
+ statsSomeNulls.setNumNulls(3);
+
+ assertTrue(canDrop(eq(intColumn, null), Arrays.asList(
+ getIntColumnMeta(statsNoNulls, 177L),
+ getDoubleColumnMeta(doubleStats, 177L))));
+
+ assertFalse(canDrop(eq(intColumn, null), Arrays.asList(
+ getIntColumnMeta(statsSomeNulls, 177L),
+ getDoubleColumnMeta(doubleStats, 177L))));
+
+ }
+
+ @Test
+ public void testNotEqNonNull() {
+ assertFalse(canDrop(notEq(intColumn, 9), columnMetas));
+ assertFalse(canDrop(notEq(intColumn, 10), columnMetas));
+ assertFalse(canDrop(notEq(intColumn, 100), columnMetas));
+ assertFalse(canDrop(notEq(intColumn, 101), columnMetas));
+
+ IntStatistics allSevens = new IntStatistics();
+ allSevens.setMinMax(7, 7);
+ assertTrue(canDrop(notEq(intColumn, 7), Arrays.asList(
+ getIntColumnMeta(allSevens, 177L),
+ getDoubleColumnMeta(doubleStats, 177L))));
+
+ }
+
+ @Test
+ public void testNotEqNull() {
+ IntStatistics statsNoNulls = new IntStatistics();
+ statsNoNulls.setMinMax(10, 100);
+ statsNoNulls.setNumNulls(0);
+
+ IntStatistics statsSomeNulls = new IntStatistics();
+ statsSomeNulls.setMinMax(10, 100);
+ statsSomeNulls.setNumNulls(3);
+
+ IntStatistics statsAllNulls = new IntStatistics();
+ statsAllNulls.setMinMax(0, 0);
+ statsAllNulls.setNumNulls(177);
+
+ assertFalse(canDrop(notEq(intColumn, null), Arrays.asList(
+ getIntColumnMeta(statsNoNulls, 177L),
+ getDoubleColumnMeta(doubleStats, 177L))));
+
+ assertFalse(canDrop(notEq(intColumn, null), Arrays.asList(
+ getIntColumnMeta(statsSomeNulls, 177L),
+ getDoubleColumnMeta(doubleStats, 177L))));
+
+ assertTrue(canDrop(notEq(intColumn, null), Arrays.asList(
+ getIntColumnMeta(statsAllNulls, 177L),
+ getDoubleColumnMeta(doubleStats, 177L))));
+ }
+
+ @Test
+ public void testLt() {
+ assertTrue(canDrop(lt(intColumn, 9), columnMetas));
+ assertTrue(canDrop(lt(intColumn, 10), columnMetas));
+ assertFalse(canDrop(lt(intColumn, 100), columnMetas));
+ assertFalse(canDrop(lt(intColumn, 101), columnMetas));
+
+ assertTrue(canDrop(lt(intColumn, 0), nullColumnMetas));
+ assertTrue(canDrop(lt(intColumn, 7), nullColumnMetas));
+ }
+
+ @Test
+ public void testLtEq() {
+ assertTrue(canDrop(ltEq(intColumn, 9), columnMetas));
+ assertFalse(canDrop(ltEq(intColumn, 10), columnMetas));
+ assertFalse(canDrop(ltEq(intColumn, 100), columnMetas));
+ assertFalse(canDrop(ltEq(intColumn, 101), columnMetas));
+
+ assertTrue(canDrop(ltEq(intColumn, 0), nullColumnMetas));
+ assertTrue(canDrop(ltEq(intColumn, 7), nullColumnMetas));
+ }
+
+ @Test
+ public void testGt() {
+ assertFalse(canDrop(gt(intColumn, 9), columnMetas));
+ assertFalse(canDrop(gt(intColumn, 10), columnMetas));
+ assertTrue(canDrop(gt(intColumn, 100), columnMetas));
+ assertTrue(canDrop(gt(intColumn, 101), columnMetas));
+
+ assertTrue(canDrop(gt(intColumn, 0), nullColumnMetas));
+ assertTrue(canDrop(gt(intColumn, 7), nullColumnMetas));
+ }
+
+ @Test
+ public void testGtEq() {
+ assertFalse(canDrop(gtEq(intColumn, 9), columnMetas));
+ assertFalse(canDrop(gtEq(intColumn, 10), columnMetas));
+ assertFalse(canDrop(gtEq(intColumn, 100), columnMetas));
+ assertTrue(canDrop(gtEq(intColumn, 101), columnMetas));
+
+ assertTrue(canDrop(gtEq(intColumn, 0), nullColumnMetas));
+ assertTrue(canDrop(gtEq(intColumn, 7), nullColumnMetas));
+ }
+
+ @Test
+ public void testAnd() {
+ FilterPredicate yes = eq(intColumn, 9);
+ FilterPredicate no = eq(doubleColumn, 50D);
+ assertTrue(canDrop(and(yes, yes), columnMetas));
+ assertTrue(canDrop(and(yes, no), columnMetas));
+ assertTrue(canDrop(and(no, yes), columnMetas));
+ assertFalse(canDrop(and(no, no), columnMetas));
+ }
+
+ @Test
+ public void testOr() {
+ FilterPredicate yes = eq(intColumn, 9);
+ FilterPredicate no = eq(doubleColumn, 50D);
+ assertTrue(canDrop(or(yes, yes), columnMetas));
+ assertFalse(canDrop(or(yes, no), columnMetas));
+ assertFalse(canDrop(or(no, yes), columnMetas));
+ assertFalse(canDrop(or(no, no), columnMetas));
+ }
+
+ public static class SevensAndEightsUdp extends UserDefinedPredicate<Integer> {
+
+ @Override
+ public boolean keep(Integer value) {
+ throw new RuntimeException("this method should not be called");
+ }
+
+ @Override
+ public boolean canDrop(Statistics<Integer> statistics) {
+ return statistics.getMin() == 7 && statistics.getMax() == 7;
+ }
+
+ @Override
+ public boolean inverseCanDrop(Statistics<Integer> statistics) {
+ return statistics.getMin() == 8 && statistics.getMax() == 8;
+ }
+ }
+
+ @Test
+ public void testUdp() {
+ FilterPredicate pred = userDefined(intColumn, SevensAndEightsUdp.class);
+ FilterPredicate invPred = LogicalInverseRewriter.rewrite(not(userDefined(intColumn, SevensAndEightsUdp.class)));
+
+ IntStatistics seven = new IntStatistics();
+ seven.setMinMax(7, 7);
+
+ IntStatistics eight = new IntStatistics();
+ eight.setMinMax(8, 8);
+
+ IntStatistics neither = new IntStatistics();
+    neither.setMinMax(1, 2);
+
+ assertTrue(canDrop(pred, Arrays.asList(
+ getIntColumnMeta(seven, 177L),
+ getDoubleColumnMeta(doubleStats, 177L))));
+
+ assertFalse(canDrop(pred, Arrays.asList(
+ getIntColumnMeta(eight, 177L),
+ getDoubleColumnMeta(doubleStats, 177L))));
+
+ assertFalse(canDrop(pred, Arrays.asList(
+ getIntColumnMeta(neither, 177L),
+ getDoubleColumnMeta(doubleStats, 177L))));
+
+ assertFalse(canDrop(invPred, Arrays.asList(
+ getIntColumnMeta(seven, 177L),
+ getDoubleColumnMeta(doubleStats, 177L))));
+
+ assertTrue(canDrop(invPred, Arrays.asList(
+ getIntColumnMeta(eight, 177L),
+ getDoubleColumnMeta(doubleStats, 177L))));
+
+ assertFalse(canDrop(invPred, Arrays.asList(
+ getIntColumnMeta(neither, 177L),
+ getDoubleColumnMeta(doubleStats, 177L))));
+ }
+
+ @Test
+ public void testClearExceptionForNots() {
+ List<ColumnChunkMetaData> columnMetas = Arrays.asList(
+ getDoubleColumnMeta(new DoubleStatistics(), 0L),
+ getIntColumnMeta(new IntStatistics(), 0L));
+
+ FilterPredicate pred = and(not(eq(doubleColumn, 12.0)), eq(intColumn, 17));
+
+ try {
+ canDrop(pred, columnMetas);
+ fail("This should throw");
+ } catch (IllegalArgumentException e) {
+ assertEquals("This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter?"
+ + " not(eq(double.column, 12.0))", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMissingColumn() {
+ List<ColumnChunkMetaData> columnMetas = Arrays.asList(getIntColumnMeta(new IntStatistics(), 0L));
+ try {
+ canDrop(and(eq(doubleColumn, 12.0), eq(intColumn, 17)), columnMetas);
+ fail("This should throw");
+ } catch (IllegalArgumentException e) {
+ assertEquals("Column double.column not found in schema!", e.getMessage());
+ }
+ }
+
+}
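
As testUdp and testClearExceptionForNots above illustrate, canDrop rejects predicates that still contain not(); they have to be rewritten first. A short sketch of that flow, reusing the columns and metadata defined in the test:

    // not(...) must be removed by LogicalInverseRewriter before asking whether a row group can be dropped.
    FilterPredicate raw = not(eq(intColumn, 7));
    FilterPredicate rewritten = LogicalInverseRewriter.rewrite(raw);  // logically equivalent, without not()
    boolean drop = canDrop(rewritten, columnMetas);
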
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
new file mode 100644
index 0000000..b4f56fa
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.format.converter;
+
+import static java.util.Collections.emptyList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.apache.parquet.format.CompressionCodec.UNCOMPRESSED;
+import static org.apache.parquet.format.Type.INT32;
+import static org.apache.parquet.format.Util.readPageHeader;
+import static org.apache.parquet.format.Util.writePageHeader;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.filterFileMetaData;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.getOffset;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.example.Paper;
+import org.apache.parquet.format.ColumnChunk;
+import org.apache.parquet.format.ColumnMetaData;
+import org.apache.parquet.format.ConvertedType;
+import org.apache.parquet.format.FieldRepetitionType;
+import org.apache.parquet.format.FileMetaData;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.PageType;
+import org.apache.parquet.format.RowGroup;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.format.Type;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type.Repetition;
+import org.apache.parquet.schema.Types;
+
+import com.google.common.collect.Lists;
+
+public class TestParquetMetadataConverter {
+
+ @Test
+ public void testPageHeader() throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ PageType type = PageType.DATA_PAGE;
+ int compSize = 10;
+ int uncSize = 20;
+ PageHeader pageHeader = new PageHeader(type, uncSize, compSize);
+ writePageHeader(pageHeader, out);
+ PageHeader readPageHeader = readPageHeader(new ByteArrayInputStream(out.toByteArray()));
+ assertEquals(pageHeader, readPageHeader);
+ }
+
+ @Test
+ public void testSchemaConverter() {
+ ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
+ List<SchemaElement> parquetSchema = parquetMetadataConverter.toParquetSchema(Paper.schema);
+ MessageType schema = parquetMetadataConverter.fromParquetSchema(parquetSchema);
+ assertEquals(Paper.schema, schema);
+ }
+
+ @Test
+ public void testSchemaConverterDecimal() {
+ ParquetMetadataConverter converter = new ParquetMetadataConverter();
+ List<SchemaElement> schemaElements = converter.toParquetSchema(
+ Types.buildMessage()
+ .required(PrimitiveTypeName.BINARY)
+ .as(OriginalType.DECIMAL).precision(9).scale(2)
+ .named("aBinaryDecimal")
+ .optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(4)
+ .as(OriginalType.DECIMAL).precision(9).scale(2)
+ .named("aFixedDecimal")
+ .named("Message")
+ );
+ List<SchemaElement> expected = Lists.newArrayList(
+ new SchemaElement("Message").setNum_children(2),
+ new SchemaElement("aBinaryDecimal")
+ .setRepetition_type(FieldRepetitionType.REQUIRED)
+ .setType(Type.BYTE_ARRAY)
+ .setConverted_type(ConvertedType.DECIMAL)
+ .setPrecision(9).setScale(2),
+ new SchemaElement("aFixedDecimal")
+ .setRepetition_type(FieldRepetitionType.OPTIONAL)
+ .setType(Type.FIXED_LEN_BYTE_ARRAY)
+ .setType_length(4)
+ .setConverted_type(ConvertedType.DECIMAL)
+ .setPrecision(9).setScale(2)
+ );
+ Assert.assertEquals(expected, schemaElements);
+ }
+
+ @Test
+ public void testEnumEquivalence() {
+ ParquetMetadataConverter c = new ParquetMetadataConverter();
+ for (Encoding encoding : Encoding.values()) {
+ assertEquals(encoding, c.getEncoding(c.getEncoding(encoding)));
+ }
+ for (org.apache.parquet.format.Encoding encoding : org.apache.parquet.format.Encoding.values()) {
+ assertEquals(encoding, c.getEncoding(c.getEncoding(encoding)));
+ }
+ for (Repetition repetition : Repetition.values()) {
+ assertEquals(repetition, c.fromParquetRepetition(c.toParquetRepetition(repetition)));
+ }
+ for (FieldRepetitionType repetition : FieldRepetitionType.values()) {
+ assertEquals(repetition, c.toParquetRepetition(c.fromParquetRepetition(repetition)));
+ }
+ for (PrimitiveTypeName primitiveTypeName : PrimitiveTypeName.values()) {
+ assertEquals(primitiveTypeName, c.getPrimitive(c.getType(primitiveTypeName)));
+ }
+ for (Type type : Type.values()) {
+ assertEquals(type, c.getType(c.getPrimitive(type)));
+ }
+ for (OriginalType original : OriginalType.values()) {
+ assertEquals(original, c.getOriginalType(c.getConvertedType(original)));
+ }
+ for (ConvertedType converted : ConvertedType.values()) {
+ assertEquals(converted, c.getConvertedType(c.getOriginalType(converted)));
+ }
+ }
+
+ private FileMetaData metadata(long... sizes) {
+ List<SchemaElement> schema = emptyList();
+ List<RowGroup> rowGroups = new ArrayList<RowGroup>();
+ long offset = 0;
+ for (long size : sizes) {
+ ColumnChunk columnChunk = new ColumnChunk(offset);
+ columnChunk.setMeta_data(new ColumnMetaData(
+ INT32,
+ Collections.<org.apache.parquet.format.Encoding>emptyList(),
+ Collections.<String>emptyList(),
+          UNCOMPRESSED, 10L, size * 2, size, offset));
+ rowGroups.add(new RowGroup(Arrays.asList(columnChunk), size, 1));
+ offset += size;
+ }
+ return new FileMetaData(1, schema, sizes.length, rowGroups);
+ }
+
+ private FileMetaData filter(FileMetaData md, long start, long end) {
+ return filterFileMetaData(new FileMetaData(md), new ParquetMetadataConverter.RangeMetadataFilter(start, end));
+ }
+
+ private void verifyMD(FileMetaData md, long... offsets) {
+ assertEquals(offsets.length, md.row_groups.size());
+ for (int i = 0; i < offsets.length; i++) {
+ long offset = offsets[i];
+ RowGroup rowGroup = md.getRow_groups().get(i);
+ assertEquals(offset, getOffset(rowGroup));
+ }
+ }
+
+ /**
+   * verifies that the splits form a partition of the row groups:
+   * every row group is found exactly once
+ * @param md
+ * @param splitWidth
+ */
+ private void verifyAllFilters(FileMetaData md, long splitWidth) {
+ Set<Long> offsetsFound = new TreeSet<Long>();
+ for (long start = 0; start < fileSize(md); start += splitWidth) {
+ FileMetaData filtered = filter(md, start, start + splitWidth);
+ for (RowGroup rg : filtered.getRow_groups()) {
+ long o = getOffset(rg);
+ if (offsetsFound.contains(o)) {
+ fail("found the offset twice: " + o);
+ } else {
+ offsetsFound.add(o);
+ }
+ }
+ }
+ if (offsetsFound.size() != md.row_groups.size()) {
+ fail("missing row groups, "
+ + "found: " + offsetsFound
+ + "\nexpected " + md.getRow_groups());
+ }
+ }
+
+ private long fileSize(FileMetaData md) {
+ long size = 0;
+ for (RowGroup rg : md.getRow_groups()) {
+ size += rg.total_byte_size;
+ }
+ return size;
+ }
+
+ @Test
+ public void testFilterMetaData() {
+ verifyMD(filter(metadata(50, 50, 50), 0, 50), 0);
+ verifyMD(filter(metadata(50, 50, 50), 50, 100), 50);
+ verifyMD(filter(metadata(50, 50, 50), 100, 150), 100);
+ // picks up first RG
+ verifyMD(filter(metadata(50, 50, 50), 25, 75), 0);
+ // picks up no RG
+ verifyMD(filter(metadata(50, 50, 50), 26, 75));
+ // picks up second RG
+ verifyMD(filter(metadata(50, 50, 50), 26, 76), 50);
+
+ verifyAllFilters(metadata(50, 50, 50), 10);
+ verifyAllFilters(metadata(50, 50, 50), 51);
+ verifyAllFilters(metadata(50, 50, 50), 25); // corner cases are in the middle
+ verifyAllFilters(metadata(50, 50, 50), 24);
+ verifyAllFilters(metadata(50, 50, 50), 26);
+ verifyAllFilters(metadata(50, 50, 50), 110);
+ verifyAllFilters(metadata(10, 50, 500), 110);
+ verifyAllFilters(metadata(10, 50, 500), 10);
+ verifyAllFilters(metadata(10, 50, 500), 600);
+ verifyAllFilters(metadata(11, 9, 10), 10);
+ verifyAllFilters(metadata(11, 9, 10), 9);
+ verifyAllFilters(metadata(11, 9, 10), 8);
+ }
+
+ @Test
+ public void randomTestFilterMetaData() {
+    // randomized property-based testing
+    // if it fails, add the failing case to testFilterMetaData above
+ Random random = new Random(System.currentTimeMillis());
+ for (int j = 0; j < 100; j++) {
+ long[] rgs = new long[random.nextInt(50)];
+ for (int i = 0; i < rgs.length; i++) {
+ rgs[i] = random.nextInt(10000) + 1; // No empty row groups
+ }
+ int splitSize = random.nextInt(10000);
+ try {
+ verifyAllFilters(metadata(rgs), splitSize);
+ } catch (AssertionError e) {
+ throw (AssertionError) new AssertionError("fail verifyAllFilters(metadata(" + Arrays.toString(rgs) + "), " + splitSize + ")").initCause(e);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java
new file mode 100644
index 0000000..e293483
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.example.ExampleOutputFormat;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.mapred.Container;
+import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import java.io.IOException;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * DeprecatedParquetInputFormat is used by Cascading. It initializes the record reader through an initialize method
+ * whose parameters differ from those of ParquetInputFormat.
+ * @author Tianshuo Deng
+ */
+public class DeprecatedInputFormatTest {
+ final Path parquetPath = new Path("target/test/example/TestInputOutputFormat/parquet");
+ final Path inputPath = new Path("src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java");
+ final Path outputPath = new Path("target/test/example/TestInputOutputFormat/out");
+ Job writeJob;
+ JobConf jobConf;
+ RunningJob mapRedJob;
+ private String writeSchema;
+ private String readSchema;
+ private Configuration conf;
+
+ @Before
+ public void setUp() {
+ conf = new Configuration();
+ jobConf = new JobConf();
+ writeSchema = "message example {\n" +
+ "required int32 line;\n" +
+ "required binary content;\n" +
+ "}";
+
+ readSchema = "message example {\n" +
+ "required int32 line;\n" +
+ "required binary content;\n" +
+ "}";
+ }
+
+ private void runMapReduceJob(CompressionCodecName codec) throws IOException, ClassNotFoundException, InterruptedException {
+
+ final FileSystem fileSystem = parquetPath.getFileSystem(conf);
+ fileSystem.delete(parquetPath, true);
+ fileSystem.delete(outputPath, true);
+ {
+ writeJob = new Job(conf, "write");
+ TextInputFormat.addInputPath(writeJob, inputPath);
+ writeJob.setInputFormatClass(TextInputFormat.class);
+ writeJob.setNumReduceTasks(0);
+ ExampleOutputFormat.setCompression(writeJob, codec);
+ ExampleOutputFormat.setOutputPath(writeJob, parquetPath);
+ writeJob.setOutputFormatClass(ExampleOutputFormat.class);
+ writeJob.setMapperClass(ReadMapper.class);
+ ExampleOutputFormat.setSchema(
+ writeJob,
+ MessageTypeParser.parseMessageType(
+ writeSchema));
+ writeJob.submit();
+ waitForJob(writeJob);
+ }
+ {
+ jobConf.set(ReadSupport.PARQUET_READ_SCHEMA, readSchema);
+ jobConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, GroupReadSupport.class.getCanonicalName());
+ jobConf.setInputFormat(MyDeprecatedInputFormat.class);
+ MyDeprecatedInputFormat.setInputPaths(jobConf, parquetPath);
+ jobConf.setOutputFormat(org.apache.hadoop.mapred.TextOutputFormat.class);
+ org.apache.hadoop.mapred.TextOutputFormat.setOutputPath(jobConf, outputPath);
+ jobConf.setMapperClass(DeprecatedWriteMapper.class);
+ jobConf.setNumReduceTasks(0);
+ mapRedJob = JobClient.runJob(jobConf);
+ }
+ }
+
+ @Test
+ public void testReadWriteWithCountDeprecated() throws Exception {
+ runMapReduceJob(CompressionCodecName.GZIP);
+ assertTrue(mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytesread").getValue() > 0L);
+ assertTrue(mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytestotal").getValue() > 0L);
+ assertTrue(mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytesread").getValue()
+ == mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytestotal").getValue());
+    // not testing the time-read counter since it could be zero when the data size is too small
+ }
+
+ @Test
+ public void testReadWriteWithoutCounter() throws Exception {
+ jobConf.set("parquet.benchmark.time.read", "false");
+ jobConf.set("parquet.benchmark.bytes.total", "false");
+ jobConf.set("parquet.benchmark.bytes.read", "false");
+ runMapReduceJob(CompressionCodecName.GZIP);
+ assertEquals(mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytesread").getValue(), 0L);
+ assertEquals(mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytestotal").getValue(), 0L);
+ assertEquals(mapRedJob.getCounters().getGroup("parquet").getCounterForName("timeread").getValue(), 0L);
+ }
+
+ private void waitForJob(Job job) throws InterruptedException, IOException {
+ while (!job.isComplete()) {
+ System.out.println("waiting for job " + job.getJobName());
+ sleep(100);
+ }
+ System.out.println("status for job " + job.getJobName() + ": " + (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
+ if (!job.isSuccessful()) {
+ throw new RuntimeException("job failed " + job.getJobName());
+ }
+ }
+
+ public static class ReadMapper extends Mapper<LongWritable, Text, Void, Group> {
+ private SimpleGroupFactory factory;
+
+ protected void setup(Context context) throws IOException, InterruptedException {
+ factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(ContextUtil.getConfiguration(context)));
+ }
+
+ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ Group group = factory.newGroup()
+ .append("line", (int) key.get())
+ .append("content", value.toString());
+ context.write(null, group);
+ }
+ }
+
+ public static class DeprecatedWriteMapper implements org.apache.hadoop.mapred.Mapper<Void, Container<Group>, LongWritable, Text> {
+
+ @Override
+ public void map(Void aVoid, Container<Group> valueContainer, OutputCollector<LongWritable, Text> longWritableTextOutputCollector, Reporter reporter) throws IOException {
+ Group value = valueContainer.get();
+ longWritableTextOutputCollector.collect(new LongWritable(value.getInteger("line", 0)), new Text(value.getString("content", 0)));
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void configure(JobConf entries) {
+ }
+ }
+
+ static class MyDeprecatedInputFormat extends DeprecatedParquetInputFormat<Group> {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedOutputFormatTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedOutputFormatTest.java
new file mode 100644
index 0000000..73ae131
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedOutputFormatTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import java.io.IOException;
+
+/**
+ * DeprecatedParquetOutputFormat is the mapred-API counterpart of ParquetOutputFormat; it is used by Cascading.
+ * This test writes Parquet output through it via a mapred JobConf.
+ * @author Tianshuo Deng
+ */
+public class DeprecatedOutputFormatTest {
+ final Path parquetPath = new Path("target/test/example/TestInputOutputFormat/parquet");
+ final Path inputPath = new Path("src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java");
+ final Path outputPath = new Path("target/test/example/TestInputOutputFormat/out");
+ JobConf jobConf;
+ RunningJob mapRedJob;
+ private String writeSchema;
+ private Configuration conf;
+
+ @Before
+ public void setUp() {
+ conf = new Configuration();
+ jobConf = new JobConf();
+ writeSchema = "message example {\n" +
+ "required int32 line;\n" +
+ "required binary content;\n" +
+ "}";
+ }
+
+ private void runMapReduceJob(CompressionCodecName codec) throws IOException, ClassNotFoundException, InterruptedException {
+
+ final FileSystem fileSystem = parquetPath.getFileSystem(conf);
+ fileSystem.delete(parquetPath, true);
+ fileSystem.delete(outputPath, true);
+ {
+ jobConf.setInputFormat(TextInputFormat.class);
+ TextInputFormat.addInputPath(jobConf, inputPath);
+ jobConf.setNumReduceTasks(0);
+
+ jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
+ DeprecatedParquetOutputFormat.setCompression(jobConf, codec);
+ DeprecatedParquetOutputFormat.setOutputPath(jobConf, parquetPath);
+ DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, GroupWriteSupport.class);
+ GroupWriteSupport.setSchema(MessageTypeParser.parseMessageType(writeSchema), jobConf);
+
+ jobConf.setMapperClass(DeprecatedMapper.class);
+ mapRedJob = JobClient.runJob(jobConf);
+ }
+ }
+
+ @Test
+ public void testReadWrite() throws Exception {
+ runMapReduceJob(CompressionCodecName.GZIP);
+ org.junit.Assert.assertTrue(mapRedJob.isSuccessful());
+ }
+
+ public static class DeprecatedMapper implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, Void, Group> {
+ private SimpleGroupFactory factory;
+
+ public void configure(JobConf job) {
+ factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(job));
+ }
+
+ @Override
+ public void map(LongWritable key, Text value, OutputCollector<Void, Group> outputCollector, Reporter reporter) throws IOException {
+ Group group = factory.newGroup()
+ .append("line", (int) key.get())
+ .append("content", value.toString());
+ outputCollector.collect(null, group);
+ }
+
+ @Override
+ public void close() {
+ }
+ }
+}
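For comparison, a hedged sketch of the same write job expressed against the non-deprecated mapreduce API; it assumes the ParquetOutputFormat and GroupWriteSupport helpers in org.apache.parquet.hadoop and a hypothetical mapreduce-API mapper (WriteMapper) emitting (Void, Group) pairs, and is not part of this commit:

    // Sketch only: mapreduce-API counterpart of runMapReduceJob above.
    Configuration conf = new Configuration();
    GroupWriteSupport.setSchema(MessageTypeParser.parseMessageType(writeSchema), conf);
    Job job = Job.getInstance(conf);                                 // Hadoop 2.x; use new Job(conf) on Hadoop 1
    job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, inputPath);
    job.setNumReduceTasks(0);
    job.setMapperClass(WriteMapper.class);                           // hypothetical mapper: (LongWritable, Text) -> (Void, Group)
    job.setOutputFormatClass(ParquetOutputFormat.class);
    ParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);
    ParquetOutputFormat.setCompression(job, CompressionCodecName.GZIP);
    org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, parquetPath);
    job.waitForCompletion(true);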
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
new file mode 100644
index 0000000..c8b3778
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.inOrder;
+import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.apache.parquet.column.Encoding.RLE;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.LittleEndianDataInputStream;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.statistics.BinaryStatistics;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.Types;
+
+public class TestColumnChunkPageWriteStore {
+
+ private int pageSize = 1024;
+ private int initialSize = 1024;
+ private Configuration conf;
+
+ @Before
+ public void initConfiguration() {
+ this.conf = new Configuration();
+ }
+
+ @Test
+ public void test() throws Exception {
+ Path file = new Path("target/test/TestColumnChunkPageWriteStore/test.parquet");
+ Path root = file.getParent();
+ FileSystem fs = file.getFileSystem(conf);
+ if (fs.exists(root)) {
+ fs.delete(root, true);
+ }
+ fs.mkdirs(root);
+ MessageType schema = MessageTypeParser.parseMessageType("message test { repeated binary bar; }");
+ ColumnDescriptor col = schema.getColumns().get(0);
+ Encoding dataEncoding = PLAIN;
+ int valueCount = 10;
+ int d = 1;
+ int r = 2;
+ int v = 3;
+ BytesInput definitionLevels = BytesInput.fromInt(d);
+ BytesInput repetitionLevels = BytesInput.fromInt(r);
+ Statistics<?> statistics = new BinaryStatistics();
+ BytesInput data = BytesInput.fromInt(v);
+ int rowCount = 5;
+ int nullCount = 1;
+
+ {
+ ParquetFileWriter writer = new ParquetFileWriter(conf, schema, file);
+ writer.start();
+ writer.startBlock(rowCount);
+ {
+ ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema, initialSize);
+ PageWriter pageWriter = store.getPageWriter(col);
+ pageWriter.writePageV2(
+ rowCount, nullCount, valueCount,
+ repetitionLevels, definitionLevels,
+ dataEncoding, data,
+ statistics);
+ store.flushToFileWriter(writer);
+ }
+ writer.endBlock();
+ writer.end(new HashMap<String, String>());
+ }
+
+ {
+ ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, NO_FILTER);
+ ParquetFileReader reader = new ParquetFileReader(conf, file, footer.getBlocks(), schema.getColumns());
+ PageReadStore rowGroup = reader.readNextRowGroup();
+ PageReader pageReader = rowGroup.getPageReader(col);
+ DataPageV2 page = (DataPageV2)pageReader.readPage();
+ assertEquals(rowCount, page.getRowCount());
+ assertEquals(nullCount, page.getNullCount());
+ assertEquals(valueCount, page.getValueCount());
+ assertEquals(d, intValue(page.getDefinitionLevels()));
+ assertEquals(r, intValue(page.getRepetitionLevels()));
+ assertEquals(dataEncoding, page.getDataEncoding());
+ assertEquals(v, intValue(page.getData()));
+ assertEquals(statistics.toString(), page.getStatistics().toString());
+ reader.close();
+ }
+ }
+
+ private int intValue(BytesInput in) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ in.writeAllTo(baos);
+ LittleEndianDataInputStream dis = new LittleEndianDataInputStream(new ByteArrayInputStream(baos.toByteArray()));
+ int i = dis.readInt();
+ dis.close();
+ return i;
+ }
+
+ @Test
+ public void testColumnOrderV1() throws IOException {
+ ParquetFileWriter mockFileWriter = Mockito.mock(ParquetFileWriter.class);
+ InOrder inOrder = inOrder(mockFileWriter);
+ MessageType schema = Types.buildMessage()
+ .required(BINARY).as(UTF8).named("a_string")
+ .required(INT32).named("an_int")
+ .required(INT64).named("a_long")
+ .required(FLOAT).named("a_float")
+ .required(DOUBLE).named("a_double")
+ .named("order_test");
+
+ BytesInput fakeData = BytesInput.fromInt(34);
+ int fakeCount = 3;
+ BinaryStatistics fakeStats = new BinaryStatistics();
+
+ ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(
+ compressor(UNCOMPRESSED), schema, initialSize);
+
+ for (ColumnDescriptor col : schema.getColumns()) {
+ PageWriter pageWriter = store.getPageWriter(col);
+ pageWriter.writePage(fakeData, fakeCount, fakeStats, RLE, RLE, PLAIN);
+ }
+
+ // flush to the mock writer
+ store.flushToFileWriter(mockFileWriter);
+
+ for (ColumnDescriptor col : schema.getColumns()) {
+ inOrder.verify(mockFileWriter).startColumn(
+ eq(col), eq((long) fakeCount), eq(UNCOMPRESSED));
+ }
+ }
+
+ private CodecFactory.BytesCompressor compressor(CompressionCodecName codec) {
+ return new CodecFactory(conf).getCompressor(codec, pageSize);
+ }
+}
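The ordering check in testColumnOrderV1 relies on Mockito's InOrder API: each verify must match the sequence in which the mock actually received the calls, so the loop proves that flushToFileWriter starts the column chunks in schema order. A minimal, self-contained illustration of that pattern (names are illustrative and unrelated to Parquet):

    import java.util.List;
    import org.mockito.InOrder;
    import org.mockito.Mockito;

    public class InOrderExample {
      @SuppressWarnings("unchecked")
      public static void main(String[] args) {
        List<String> mock = Mockito.mock(List.class);
        mock.add("a_string");
        mock.add("an_int");

        InOrder order = Mockito.inOrder(mock);
        order.verify(mock).add("a_string");
        order.verify(mock).add("an_int");   // fails if the calls happened in the other order
      }
    }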